package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

/* loaded from: input_file:org/apache/hadoop/mapred/TestKillSubProcesses.class */
public class TestKillSubProcesses extends TestCase {
    /** Logger shared by the test driver and by the mapper classes nested in this file. */
    private static volatile Log LOG = LogFactory.getLog(TestKillSubProcesses.class);
    /** Absolute path taken from the {@code test.build.data} system property, defaulting to {@code /tmp}. */
    private static String BASE_TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
    /** Working directory of this test underneath {@link #BASE_TEST_ROOT_DIR}. */
    private static String TEST_ROOT_DIR = BASE_TEST_ROOT_DIR + "/killSubProcesses";
    /** Directory that receives the generated shell script and the {@code childPidFile<level>} files. */
    private static Path scriptDir = new Path(TEST_ROOT_DIR, "script");
    /** Path component of {@link #scriptDir}'s URI, used when building file names and the script text. */
    private static String scriptDirName = scriptDir.toUri().getPath();
    /** File whose existence the waiting mappers poll for; it is created by {@code signalTask}. */
    private static Path signalFile = new Path(TEST_ROOT_DIR + "/script/signalFile");
    /** Client used to fetch the map task reports; (re)created for every job in {@code runJobAndSetProcessHandle}. */
    private static JobClient jobClient = null;
    /** Mini MR cluster created by {@code testJobKillFailAndSucceed} and shut down when that test ends. */
    static MiniMRCluster mr = null;
    /** Pid of the map task attempt as returned by the task tracker; reset to {@code null} before every job. */
    private static String pid = null;
    /** Argument passed to the generated script, which recurses from this level down to level 0. */
    private static int numLevelsOfSubProcesses = 4;

    /* loaded from: input_file:org/apache/hadoop/mapred/TestKillSubProcesses$FailingMapperWithChildren.class */
    /**
     * Mapper that waits for the signal file to appear and then fails the task by
     * throwing a {@link RuntimeException}.
     */
    static class FailingMapperWithChildren extends MapperWithChildren {
        FailingMapperWithChildren() {
        }

        /**
         * Delegates to {@link MapperWithChildren#configure(JobConf)}.
         *
         * @param jobConf configuration of the running job
         */
        @Override
        public void configure(JobConf jobConf) {
            super.configure(jobConf);
        }

        /**
         * Reports progress once per second until the signal file exists (or the
         * thread is interrupted) and then always fails.
         *
         * @param writableComparable input key (unused)
         * @param writable input value (unused)
         * @param outputCollector output collector (unused)
         * @param reporter used to report progress while waiting
         * @throws IOException if the existence check on the signal file fails
         * @throws RuntimeException always, once the wait is over
         */
        @Override
        public void map(WritableComparable writableComparable, Writable writable, OutputCollector<WritableComparable, Writable> outputCollector, Reporter reporter) throws IOException {
            while (!this.fs.exists(TestKillSubProcesses.signalFile)) {
                try {
                    reporter.progress();
                    synchronized (this) {
                        wait(1000L);
                    }
                } catch (InterruptedException e) {
                    System.out.println("Interrupted while the map was waiting for  the signal.");
                    // Stop polling instead of swallowing the interrupt; the flag is
                    // restored so callers further up can still observe it.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            throw new RuntimeException("failing map");
        }

        // The compiler generates the map(Object, Object, ...) bridge itself; declaring it
        // by hand clashes with Mapper.map because both have the same erasure.
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/TestKillSubProcesses$KillingMapperWithChildren.class */
    /**
     * Mapper that never finishes on its own: it sleeps in one-second steps until
     * its thread is interrupted. Used by the test that kills the job.
     */
    static class KillingMapperWithChildren extends MapperWithChildren {
        KillingMapperWithChildren() {
        }

        /**
         * Delegates to {@link MapperWithChildren#configure(JobConf)}.
         *
         * @param jobConf configuration of the running job
         */
        @Override
        public void configure(JobConf jobConf) {
            super.configure(jobConf);
        }

        /**
         * Sleeps until interrupted; none of the arguments are used.
         *
         * @param writableComparable input key (unused)
         * @param writable input value (unused)
         * @param outputCollector output collector (unused)
         * @param reporter progress reporter (unused)
         * @throws IOException declared by the overridden method; not thrown by this body
         */
        @Override
        public void map(WritableComparable writableComparable, Writable writable, OutputCollector<WritableComparable, Writable> outputCollector, Reporter reporter) throws IOException {
            while (true) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    TestKillSubProcesses.LOG.warn("Exception in KillMapperWithChild.map:" + e);
                    return;
                }
            }
        }

        // The compiler generates the map(Object, Object, ...) bridge itself; declaring it
        // by hand clashes with Mapper.map because both have the same erasure.
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/TestKillSubProcesses$MapperWithChildren.class */
    /**
     * Mapper that spawns a chain of shell subprocesses while it is being configured
     * and then waits in {@code map} until the signal file shows up.
     */
    public static class MapperWithChildren extends MapReduceBase implements Mapper<WritableComparable, Writable, WritableComparable, Writable> {
        /** Local file system used to poll for the signal file; assigned in {@link #configure(JobConf)}. */
        FileSystem fs = null;

        MapperWithChildren() {
        }

        /**
         * Opens the local file system and launches the child processes. Any
         * exception is logged as a warning and not propagated.
         *
         * @param jobConf configuration of the running job
         */
        @Override
        public void configure(JobConf jobConf) {
            try {
                this.fs = FileSystem.getLocal(jobConf);
                TestKillSubProcesses.runChildren(jobConf);
            } catch (Exception e) {
                TestKillSubProcesses.LOG.warn("Exception in configure: " + StringUtils.stringifyException(e));
            }
        }

        /**
         * Reports progress once per second until the signal file exists, or returns
         * early when the waiting thread is interrupted.
         *
         * @param writableComparable input key (unused)
         * @param writable input value (unused)
         * @param outputCollector output collector (unused)
         * @param reporter used to report progress while waiting
         * @throws IOException if the existence check on the signal file fails
         */
        @Override
        public void map(WritableComparable writableComparable, Writable writable, OutputCollector<WritableComparable, Writable> outputCollector, Reporter reporter) throws IOException {
            while (!this.fs.exists(TestKillSubProcesses.signalFile)) {
                try {
                    reporter.progress();
                    synchronized (this) {
                        wait(1000L);
                    }
                } catch (InterruptedException e) {
                    System.out.println("Interrupted while the map was waiting for  the signal.");
                    return;
                }
            }
        }

        // The compiler generates the map(Object, Object, ...) bridge itself; declaring it
        // by hand clashes with Mapper.map because both have the same erasure.
    }

    /**
     * Runs a job whose mapper never finishes on its own, kills it, and verifies that
     * the map task and the processes it spawned are gone afterwards.
     *
     * @param jobTracker job tracker used to locate the running task attempt
     * @param jobConf job configuration; its job name and mapper class are overwritten here
     * @throws IOException if submitting, querying or killing the job fails
     */
    private static void runKillingJobAndValidate(JobTracker jobTracker, JobConf jobConf) throws IOException {
        jobConf.setJobName("testkilljobsubprocesses");
        jobConf.setMapperClass(KillingMapperWithChildren.class);
        RunningJob job = runJobAndSetProcessHandle(jobTracker, jobConf);
        job.killJob();
        // Poll until the job reports a non-zero cleanup progress.
        while (job.cleanupProgress() == 0.0f) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                LOG.warn("sleep is interrupted:" + e);
            }
        }
        validateKillingSubprocesses(job, jobConf);
        // JUnit expects (message, expected, actual); 5 is the value of JobStatus.KILLED.
        assertEquals("Unexpected final state of the killed job", 5, job.getJobState());
    }

    /**
     * Runs a job whose single map attempt fails once it is signalled, and verifies
     * that the map task and the processes it spawned are gone afterwards.
     *
     * @param jobTracker job tracker used to locate the running task attempt
     * @param jobConf job configuration; job name, mapper class and max map attempts are overwritten here
     * @throws IOException if submitting or querying the job fails
     */
    private static void runFailingJobAndValidate(JobTracker jobTracker, JobConf jobConf) throws IOException {
        jobConf.setJobName("testfailjobsubprocesses");
        jobConf.setMapperClass(FailingMapperWithChildren.class);
        // One attempt only, so the first failure of the map fails the job.
        jobConf.setMaxMapAttempts(1);
        RunningJob job = runJobAndSetProcessHandle(jobTracker, jobConf);
        signalTask(signalFile.toString(), jobConf);
        validateKillingSubprocesses(job, jobConf);
        // JUnit expects (message, expected, actual); 3 is the value of JobStatus.FAILED.
        assertEquals("Unexpected final state of the failing job", 3, job.getJobState());
    }

    /**
     * Runs a job whose mapper returns normally once it is signalled, and verifies
     * that the map task and the processes it spawned are gone afterwards.
     *
     * @param jobTracker job tracker used to locate the running task attempt
     * @param jobConf job configuration; its job name and mapper class are overwritten here
     * @throws IOException if submitting or querying the job fails
     */
    private static void runSuccessfulJobAndValidate(JobTracker jobTracker, JobConf jobConf) throws IOException {
        jobConf.setJobName("testsucceedjobsubprocesses");
        jobConf.setMapperClass(MapperWithChildren.class);
        RunningJob job = runJobAndSetProcessHandle(jobTracker, jobConf);
        signalTask(signalFile.toString(), jobConf);
        validateKillingSubprocesses(job, jobConf);
        // JUnit expects (message, expected, actual); 2 is the value of JobStatus.SUCCEEDED.
        assertEquals("Unexpected final state of the successful job", 2, job.getJobState());
    }

    /**
     * Submits the job, waits until it is running, records the pid of its map task
     * attempt in {@link #pid} and asserts that the map task is alive. When setsid is
     * available it also asserts that every spawned subprocess is alive.
     *
     * @param jobTracker job tracker used to look up the job's tasks in progress
     * @param jobConf configuration of the job to submit
     * @return handle of the submitted job
     * @throws IOException if submitting or querying the job, or listing processes, fails
     */
    private static RunningJob runJobAndSetProcessHandle(JobTracker jobTracker, JobConf jobConf) throws IOException {
        RunningJob submitted = runJob(jobConf);

        // Wait for job state 1 (JobStatus.RUNNING); interrupts are ignored.
        while (submitted.getJobState() != 1) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException ignored) {
            }
        }

        pid = null;
        jobClient = new JobClient(jobConf);
        TaskReport[] mapReports = jobClient.getMapTaskReports(submitted.getID());
        JobInProgress jobInProgress = jobTracker.getJob(submitted.getID());

        for (int r = 0; r < mapReports.length; r++) {
            TaskInProgress tip = jobInProgress.getTaskInProgress(mapReports[r].getTaskID());

            // Wait until at least one attempt of this task is active.
            while (tip.getActiveTasks().size() == 0) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException interrupted) {
                    LOG.warn("sleep is interrupted:" + interrupted);
                }
            }

            for (TaskAttemptID attemptId : tip.getActiveTasks().keySet()) {
                LOG.info("taskAttemptID of map task is " + attemptId);

                // Ask the first task tracker for the pid until it knows one.
                while (pid == null) {
                    pid = mr.getTaskTrackerRunner(0).getTaskTracker().getPid(attemptId);
                    if (pid != null) {
                        break;
                    }
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException ignored) {
                    }
                }

                LOG.info("pid of map task is " + pid);
                assertTrue("Map is no more alive", isAlive(pid));
                LOG.info("The map task is alive before Job completion, as expected.");
            }
        }

        if (!ProcessTree.isSetsidAvailable) {
            return submitted;
        }

        // The level-0 pid file is the last one the script chain writes, so wait for it.
        while (UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile0") == null) {
            LOG.warn(scriptDirName + "/childPidFile0 is null; Sleeping...");
            try {
                Thread.sleep(500L);
            } catch (InterruptedException interrupted) {
                LOG.warn("sleep is interrupted:" + interrupted);
            }
        }

        int level = 0;
        while (level <= numLevelsOfSubProcesses) {
            String childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + level);
            LOG.info("pid of the descendant process at level " + level + "in the subtree of processes(with the map task as the root) is " + childPid);
            assertTrue("Unexpected: The subprocess at level " + level + " in the subtree is not alive before Job completion", isAlive(childPid));
            level++;
        }
        return submitted;
    }

    /**
     * Waits for the job to complete and asserts that neither the map task recorded
     * in {@link #pid} nor (when setsid is available) any spawned subprocess is still
     * alive. Finally removes the script directory from the local file system.
     *
     * @param runningJob handle of the job to wait for
     * @param jobConf job configuration (not read by this method)
     * @throws IOException if querying the job, listing processes or deleting the directory fails
     */
    private static void validateKillingSubprocesses(RunningJob runningJob, JobConf jobConf) throws IOException {
        // Poll for completion; interrupts are ignored.
        while (!runningJob.isComplete()) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException ignored) {
            }
        }

        boolean mapTaskAlive = ProcessTree.isAlive(pid);
        assertTrue(!mapTaskAlive);
        LOG.info("The map task is not alive after Job is completed, as expected.");

        if (ProcessTree.isSetsidAvailable) {
            int level = 0;
            while (level <= numLevelsOfSubProcesses) {
                String childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + level);
                LOG.info("pid of the descendant process at level " + level + "in the subtree of processes(with the map task as the root) is " + childPid);
                String failureMessage = "Unexpected: The subprocess at level " + level + " in the subtree is alive after Job completion";
                assertTrue(failureMessage, !isAlive(childPid));
                level++;
            }
        }

        // Remove the script and pid files so the next job starts from a clean directory.
        LocalFileSystem localFs = FileSystem.getLocal(mr.createJobConf());
        boolean scriptDirPresent = localFs.exists(scriptDir);
        if (scriptDirPresent) {
            localFs.delete(scriptDir, true);
        }
    }

    /**
     * Prepares the job configuration (one map, no reduces, {@code test.build.data}
     * forwarded to the child JVMs), clears a stale script directory and submits the job.
     *
     * @param jobConf configuration of the job to submit; modified in place
     * @return handle of the submitted job
     * @throws IOException if a file system operation or the submission fails
     */
    private static RunningJob runJob(JobConf jobConf) throws IOException {
        LocalFileSystem local = FileSystem.getLocal(jobConf);
        // Use the test directory when the default file system is the local one,
        // otherwise relative paths on the default file system.
        boolean defaultFsIsLocal = FileSystem.get(jobConf).getUri().toASCIIString().equals(local.getUri().toASCIIString());
        Path inDir;
        Path outDir;
        if (defaultFsIsLocal) {
            inDir = new Path(TEST_ROOT_DIR, "input");
            outDir = new Path(TEST_ROOT_DIR, "output");
        } else {
            inDir = new Path("killjob/input");
            outDir = new Path("killjob/output");
        }
        if (local.exists(scriptDir)) {
            local.delete(scriptDir, true);
        }
        jobConf.setNumMapTasks(1);
        jobConf.setNumReduceTasks(0);

        // Fall back to an empty string rather than null: concatenating a null default
        // would put the literal text "null" in front of the child JVM options.
        String sharedOpts = jobConf.get(JobConf.MAPRED_TASK_JAVA_OPTS);
        if (sharedOpts == null) {
            sharedOpts = "";
        }
        String testDataOpt = " -Dtest.build.data=" + BASE_TEST_ROOT_DIR;
        jobConf.set("mapred.map.child.java.opts", jobConf.get("mapred.map.child.java.opts", sharedOpts) + testDataOpt);
        jobConf.set("mapred.reduce.child.java.opts", jobConf.get("mapred.reduce.child.java.opts", sharedOpts) + testDataOpt);
        return UtilsForTests.runJob(jobConf, inDir, outDir);
    }

    /**
     * Starts a mini MR cluster on the local file system, runs the kill, fail and
     * succeed scenarios against it and shuts the cluster down afterwards. Does
     * nothing on Windows.
     *
     * @throws IOException if starting the cluster or running a scenario fails
     */
    public void testJobKillFailAndSucceed() throws IOException {
        if (Shell.WINDOWS) {
            System.out.println("setsid doesn't work on WINDOWS as expected. Not testing");
            return;
        }
        try {
            JobConf jobConf = new JobConf();
            // NOTE(review): presumably removes the delay between SIGTERM and SIGKILL - confirm.
            jobConf.setLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", 0L);
            jobConf.setFloat(JTConfig.JT_HEARTBEATS_SCALING_FACTOR, 500.0f);
            mr = new MiniMRCluster(1, "file:///", 1, (String[]) null, (String[]) null, jobConf);
            runTests(mr.createJobConf(), mr.getJobTrackerRunner().getJobTracker());
        } finally {
            // Shut down exactly once, whether the scenarios passed or threw.
            if (mr != null) {
                mr.shutdown();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Makes sure the test root directory exists with permissions open to everyone,
     * then runs the kill, fail and succeed scenarios in that order.
     *
     * @param jobConf configuration handed to every scenario
     * @param jobTracker job tracker handed to every scenario
     * @throws IOException if a file system operation or one of the scenarios fails
     */
    public void runTests(JobConf jobConf, JobTracker jobTracker) throws IOException {
        LocalFileSystem localFs = FileSystem.getLocal(mr.createJobConf());
        Path testRoot = new Path(TEST_ROOT_DIR);
        boolean rootMissing = !localFs.exists(testRoot);
        if (rootMissing) {
            localFs.mkdirs(testRoot);
        }
        FsPermission openToAll = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
        localFs.setPermission(testRoot, openToAll);

        runKillingJobAndValidate(jobTracker, jobConf);
        runFailingJobAndValidate(jobTracker, jobConf);
        runSuccessfulJobAndValidate(jobTracker, jobConf);
    }

    /**
     * Creates the given file on the local file system. A failure is only logged as
     * a warning.
     *
     * @param str path of the file to create
     * @param jobConf configuration used to obtain the local file system
     */
    private static void signalTask(String str, JobConf jobConf) {
        Path signalPath = new Path(str);
        try {
            LocalFileSystem localFs = FileSystem.getLocal(jobConf);
            localFs.createNewFile(signalPath);
        } catch (IOException failure) {
            LOG.warn("Unable to create signal file. " + failure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes a self-recursive shell script into the script directory, starts it and
     * waits until the pid file of the deepest level (level 0) is readable. Does
     * nothing when setsid is not available.
     *
     * <p>Each invocation of the script writes its own pid to
     * {@code childPidFile<level>}, then either calls itself with {@code level - 1}
     * or, at level 0, loops forever sleeping two seconds at a time.
     *
     * @param jobConf configuration used to obtain the local file system
     * @throws IOException if preparing the directory, writing the script or starting it fails
     */
    public static void runChildren(JobConf jobConf) throws IOException {
        if (!ProcessTree.isSetsidAvailable) {
            return;
        }
        LocalFileSystem local = FileSystem.getLocal(jobConf);
        if (local.exists(scriptDir)) {
            local.delete(scriptDir, true);
        }
        local.mkdirs(scriptDir);
        local.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));

        Path scriptPath = new Path(scriptDirName, "_shellScript_" + new Random().nextInt() + ".sh");
        String scriptName = scriptPath.toString();
        String script = "umask 000\necho $$ > " + scriptDirName + "/childPidFile$1\necho hello\ntrap 'echo got SIGTERM' 15 \nif [ $1 != 0 ]\nthen\n sh " + scriptName + " $(($1-1))\nelse\n while true\n do\n  sleep 2\n done\nfi";

        FSDataOutputStream out = local.create(scriptPath);
        try {
            out.writeBytes(script);
        } finally {
            // Close even when the write fails so the stream is not leaked.
            out.close();
        }
        if (!new File(scriptPath.toUri().getPath()).setExecutable(true)) {
            LOG.warn("Unable to mark " + scriptName + " as executable");
        }

        LOG.info("Calling script from map task : " + scriptName);
        Runtime.getRuntime().exec(scriptName + " " + numLevelsOfSubProcesses);

        // Level 0 is the last pid file written, so its presence means the chain is up.
        while (UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile0") == null) {
            LOG.warn(scriptDirName + "/childPidFile0 is null; Sleeping...");
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                LOG.warn("sleep is interrupted:" + e);
                return;
            }
        }
    }

    /**
     * Checks the output of {@code ps -o pid,command -e} for a process with the given pid.
     *
     * <p>A line matches when its first field equals {@code str} and its second field
     * (the first word of the command) contains neither {@code "ps"} nor {@code "grep"}.
     *
     * @param str pid to look for
     * @return {@code true} if a matching line is found; {@code false} if none is
     *         found or the {@code ps} command exits with a non-zero code
     * @throws IOException if running {@code ps} fails for any other reason
     */
    private static boolean isAlive(String str) throws IOException {
        Shell.ShellCommandExecutor psExecutor = new Shell.ShellCommandExecutor(new String[]{"bash", "-c", "ps -o pid,command -e"});
        try {
            psExecutor.execute();
            StringTokenizer lines = new StringTokenizer(psExecutor.getOutput(), "\n");
            while (lines.hasMoreTokens()) {
                StringTokenizer fields = new StringTokenizer(lines.nextToken(), " ");
                // Skip lines without both a pid and a command instead of letting
                // nextToken() throw NoSuchElementException.
                if (fields.countTokens() < 2) {
                    continue;
                }
                String linePid = fields.nextToken();
                String command = fields.nextToken();
                if (str.equals(linePid) && !command.contains("ps") && !command.contains("grep")) {
                    return true;
                }
            }
            return false;
        } catch (Shell.ExitCodeException e) {
            return false;
        } catch (IOException e2) {
            LOG.warn("IOExecption thrown while checking if process is alive" + StringUtils.stringifyException(e2));
            throw e2;
        }
    }
}
