package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobStatusChangeEvent;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;

/* loaded from: input_file:org/apache/hadoop/mapred/TestJobInProgressListener.class */
public class TestJobInProgressListener extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestJobInProgressListener.class);
    private static String TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
    private final Path testDir = new Path(TEST_ROOT_DIR, "test-jip-listener-update");
    private static MiniMRCluster mr;
    private static JobTracker jobtracker;
    private static JobConf conf;
    private static MyScheduler myScheduler;

    /* loaded from: input_file:org/apache/hadoop/mapred/TestJobInProgressListener$MyListener.class */
    public static class MyListener extends JobInProgressListener {
        private List<JobInProgress> wjobs = new ArrayList();
        private List<JobInProgress> rjobs = new ArrayList();
        private List<JobID> wjobsAdded = new ArrayList();
        private List<JobID> rjobsAdded = new ArrayList();

        public boolean contains(JobID jobID) {
            return contains(jobID, true) || contains(jobID, false);
        }

        public boolean contains(JobID jobID, boolean z) {
            if (!this.wjobsAdded.contains(jobID)) {
                throw new RuntimeException("Job " + jobID + " not seen in waiting queue");
            }
            if (!z && !this.rjobsAdded.contains(jobID)) {
                throw new RuntimeException("Job " + jobID + " not seen in run queue");
            }
            Iterator<JobInProgress> it = (z ? this.wjobs : this.rjobs).iterator();
            while (it.hasNext()) {
                if (it.next().getJobID().equals(jobID)) {
                    return true;
                }
            }
            return false;
        }

        public void jobAdded(JobInProgress jobInProgress) {
            TestJobInProgressListener.LOG.info("Job " + jobInProgress.getJobID().toString() + " added");
            this.wjobs.add(jobInProgress);
            this.wjobsAdded.add(jobInProgress.getJobID());
        }

        public void jobRemoved(JobInProgress jobInProgress) {
            TestJobInProgressListener.LOG.info("Job " + jobInProgress.getJobID().toString() + " removed");
            this.wjobs.remove(jobInProgress);
            this.rjobs.remove(jobInProgress);
        }

        public void jobUpdated(JobChangeEvent jobChangeEvent) {
            TestJobInProgressListener.LOG.info("Job " + jobChangeEvent.getJobInProgress().getJobID().toString() + " updated");
            if (jobChangeEvent instanceof JobStatusChangeEvent) {
                JobStatusChangeEvent jobStatusChangeEvent = (JobStatusChangeEvent) jobChangeEvent;
                if (jobStatusChangeEvent.getEventType() == JobStatusChangeEvent.EventType.RUN_STATE_CHANGED) {
                    JobInProgress jobInProgress = jobChangeEvent.getJobInProgress();
                    String jobID = jobInProgress.getJobID().toString();
                    if (!jobInProgress.isComplete()) {
                        TestJobInProgressListener.LOG.info("Job " + jobID + " deleted from the waiting queue");
                        this.wjobs.remove(jobInProgress);
                        this.rjobs.add(jobInProgress);
                        this.rjobsAdded.add(jobInProgress.getJobID());
                        return;
                    }
                    TestJobInProgressListener.LOG.info("Job " + jobID + " deleted from the running queue");
                    if (jobStatusChangeEvent.getOldStatus().getRunState() == JobStatus.PREP) {
                        this.wjobs.remove(jobInProgress);
                    } else {
                        this.rjobs.remove(jobInProgress);
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/TestJobInProgressListener$MyScheduler.class */
    public static class MyScheduler extends JobQueueTaskScheduler {
        public synchronized void start() throws IOException {
            super.start();
        }

        void stopInitializer() throws IOException {
            this.taskTrackerManager.removeJobInProgressListener(this.eagerTaskInitializationListener);
            this.eagerTaskInitializationListener.terminate();
        }

        void startInitializer() throws IOException {
            this.eagerTaskInitializationListener = new EagerTaskInitializationListener(getConf());
            this.eagerTaskInitializationListener.setTaskTrackerManager(this.taskTrackerManager);
            this.eagerTaskInitializationListener.start();
            this.taskTrackerManager.addJobInProgressListener(this.eagerTaskInitializationListener);
        }

        public /* bridge */ /* synthetic */ Collection getJobs(String str) {
            return super.getJobs(str);
        }

        public /* bridge */ /* synthetic */ List assignTasks(TaskTracker taskTracker) throws IOException {
            return super.assignTasks(taskTracker);
        }

        public /* bridge */ /* synthetic */ void setConf(Configuration configuration) {
            super.setConf(configuration);
        }

        public /* bridge */ /* synthetic */ void terminate() throws IOException {
            super.terminate();
        }

        public /* bridge */ /* synthetic */ void setTaskTrackerManager(TaskTrackerManager taskTrackerManager) {
            super.setTaskTrackerManager(taskTrackerManager);
        }

        public /* bridge */ /* synthetic */ Configuration getConf() {
            return super.getConf();
        }
    }

    public static Test suite() {
        return new TestSetup(new TestSuite(TestJobInProgressListener.class)) { // from class: org.apache.hadoop.mapred.TestJobInProgressListener.1
            protected void setUp() throws Exception {
                JobConf unused = TestJobInProgressListener.conf = new JobConf();
                TestJobInProgressListener.conf.setClass("mapreduce.jobtracker.taskscheduler", MyScheduler.class, TaskScheduler.class);
                MiniMRCluster unused2 = TestJobInProgressListener.mr = new MiniMRCluster(1, "file:///", 1, (String[]) null, (String[]) null, TestJobInProgressListener.conf);
                JobTracker unused3 = TestJobInProgressListener.jobtracker = TestJobInProgressListener.mr.getJobTrackerRunner().getJobTracker();
                MyScheduler unused4 = TestJobInProgressListener.myScheduler = TestJobInProgressListener.jobtracker.getScheduler();
                JobConf unused5 = TestJobInProgressListener.conf = TestJobInProgressListener.mr.createJobConf();
            }

            protected void tearDown() throws Exception {
                JobConf unused = TestJobInProgressListener.conf = null;
                try {
                    TestJobInProgressListener.mr.shutdown();
                } catch (Exception e) {
                    TestJobInProgressListener.LOG.info("Error in shutting down the MR cluster", e);
                }
                JobTracker unused2 = TestJobInProgressListener.jobtracker = null;
                TestJobInProgressListener.myScheduler.terminate();
            }
        };
    }

    public void testJobQueueChanges() throws IOException {
        LOG.info("Testing job queue changes");
        myScheduler.stopInitializer();
        JobQueueJobInProgressListener jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
        jobtracker.addJobInProgressListener(jobQueueJobInProgressListener);
        Path path = new Path(this.testDir, "input");
        Path path2 = new Path(this.testDir, "output1");
        Path path3 = new Path(this.testDir, "output2");
        RunningJob runJob = UtilsForTests.runJob(conf, path, path2, 1, 0);
        LOG.info("Running job " + runJob.getID().toString());
        RunningJob runJob2 = UtilsForTests.runJob(conf, path, path3, 1, 0);
        LOG.info("Running job " + runJob2.getID().toString());
        LOG.info("Testing job priority changes");
        LOG.info("Increasing job2's priority to HIGH");
        runJob2.setJobPriority("HIGH");
        assertTrue("Priority change garbles the queue", jobQueueJobInProgressListener.getJobQueue().size() == 2);
        JobInProgress[] jobInProgressArr = (JobInProgress[]) jobQueueJobInProgressListener.getJobQueue().toArray(new JobInProgress[0]);
        assertTrue("Priority change failed to bump up job2 in the queue", jobInProgressArr[0].getJobID().equals(runJob2.getID()));
        assertTrue("Priority change failed to bump down job1 in the queue", jobInProgressArr[1].getJobID().equals(runJob.getID()));
        assertEquals("Priority change has garbled the queue", 2, jobInProgressArr.length);
        LOG.info("Testing job start-time changes");
        LOG.info("Increasing job2's priority to NORMAL");
        runJob2.setJobPriority("NORMAL");
        JobInProgress job = jobtracker.getJob(runJob2.getID());
        JobInProgress job2 = jobtracker.getJob(runJob.getID());
        JobStatus jobStatus = (JobStatus) job.getStatus().clone();
        job.startTime = job2.startTime - 1;
        job.status.setStartTime(job.startTime);
        JobStatus jobStatus2 = (JobStatus) job.getStatus().clone();
        LOG.info("Updating the listener about job2's start-time change");
        jobQueueJobInProgressListener.jobUpdated(new JobStatusChangeEvent(job, JobStatusChangeEvent.EventType.START_TIME_CHANGED, jobStatus, jobStatus2));
        assertTrue("Start time change garbles the queue", jobQueueJobInProgressListener.getJobQueue().size() == 2);
        JobInProgress[] jobInProgressArr2 = (JobInProgress[]) jobQueueJobInProgressListener.getJobQueue().toArray(new JobInProgress[0]);
        assertTrue("Start time change failed to bump up job2 in the queue", jobInProgressArr2[0].getJobID().equals(runJob2.getID()));
        assertTrue("Start time change failed to bump down job1 in the queue", jobInProgressArr2[1].getJobID().equals(runJob.getID()));
        assertEquals("Start time change has garbled the queue", 2, jobInProgressArr2.length);
    }

    public void testJobCompletion() throws Exception {
        MyListener myListener = new MyListener();
        jobtracker.addJobInProgressListener(myListener);
        myScheduler.stopInitializer();
        testQueuedJobKill(conf, myListener);
        myScheduler.startInitializer();
        testFailedJob(conf, myListener);
        testKilledJob(conf, myListener);
        testSuccessfulJob(conf, myListener);
    }

    private void testFailedJob(JobConf jobConf, MyListener myListener) throws IOException {
        LOG.info("Testing job-fail");
        Path path = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input");
        Path path2 = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output");
        jobConf.setNumMapTasks(1);
        jobConf.setNumReduceTasks(0);
        jobConf.setMaxMapAttempts(1);
        RunningJob runJobFail = UtilsForTests.runJobFail(jobConf, path, path2);
        assertFalse("Missing event notification on failing a running job", myListener.contains(runJobFail.getID()));
        assertEquals("Job failed!", JobStatus.FAILED, runJobFail.getJobState());
    }

    private void testKilledJob(JobConf jobConf, MyListener myListener) throws IOException {
        LOG.info("Testing job-kill");
        Path path = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input");
        Path path2 = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output");
        jobConf.setNumMapTasks(1);
        jobConf.setNumReduceTasks(0);
        RunningJob runJobKill = UtilsForTests.runJobKill(jobConf, path, path2);
        assertFalse("Missing event notification on killing a running job", myListener.contains(runJobKill.getID()));
        assertEquals("Job failed!", JobStatus.KILLED, runJobKill.getJobState());
    }

    private void testSuccessfulJob(JobConf jobConf, MyListener myListener) throws Exception {
        LOG.info("Testing job-success");
        Path path = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
        Path path2 = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
        jobConf.setNumMapTasks(1);
        jobConf.setNumReduceTasks(0);
        RunningJob runJobSucceed = UtilsForTests.runJobSucceed(jobConf, path, path2);
        runJobSucceed.waitForCompletion();
        assertFalse("Missing event notification for a successful job", myListener.contains(runJobSucceed.getID()));
        assertEquals("Job failed!", JobStatus.SUCCEEDED, runJobSucceed.getJobState());
        LOG.info("Testing job with no task job with setup and cleanup");
        jobConf.setNumMapTasks(0);
        jobConf.setNumReduceTasks(0);
        RunningJob runJobSucceed2 = UtilsForTests.runJobSucceed(jobConf, path, new Path(TEST_ROOT_DIR + "/jiplistenerjob/output-no-tasks"));
        runJobSucceed2.waitForCompletion();
        assertFalse("Missing event notification for a successful job with no tasks", myListener.contains(runJobSucceed2.getID(), true));
        assertEquals("Job failed!", JobStatus.SUCCEEDED, runJobSucceed2.getJobState());
        LOG.info("Testing job with no-set-cleanup no task");
        Job createJob = MapReduceTestUtil.createJob(mr.createJobConf(), path, new Path(TEST_ROOT_DIR + "/jiplistenerjob/output-no-tasks-no-set"), 0, 0);
        createJob.setJobSetupCleanupNeeded(false);
        createJob.setOutputFormatClass(TestNoJobSetupCleanup.MyOutputFormat.class);
        createJob.submit();
        createJob.waitForCompletion(true);
        assertFalse("Missing event notification on no-set-cleanup no task job", myListener.contains(JobID.downgrade(createJob.getJobID()), true));
        assertEquals("Job status doesnt reflect success", JobStatus.SUCCEEDED, runJobSucceed2.getJobState());
    }

    private void testQueuedJobKill(JobConf jobConf, MyListener myListener) throws IOException {
        LOG.info("Testing queued-job-kill");
        Path path = new Path(TEST_ROOT_DIR + "/jiplistenerqueuedjob/input");
        Path path2 = new Path(TEST_ROOT_DIR + "/jiplistener1ueuedjob/output");
        jobConf.setMapperClass(IdentityMapper.class);
        jobConf.setReducerClass(IdentityReducer.class);
        jobConf.setNumMapTasks(1);
        jobConf.setNumReduceTasks(0);
        RunningJob runJob = UtilsForTests.runJob(jobConf, path, path2);
        JobID id = runJob.getID();
        LOG.info("Job : " + id.toString() + " submitted");
        assertTrue("Missing event notification on submiting a job", myListener.contains(id, true));
        LOG.info("Killing job : " + id.toString());
        runJob.killJob();
        assertEquals("Job status doesnt reflect the kill-job action", JobStatus.KILLED, runJob.getJobState());
        assertFalse("Missing event notification on killing a waiting job", myListener.contains(id, true));
    }
}
