package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.mapred.JobStatusChangeEvent;
import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorDescriptor;
import org.eclipse.jdt.core.JavaCore;

/* loaded from: input_file:jars/hadoop-test-1.1.2.jar:org/apache/hadoop/mapred/TestJobInProgressListener.class */
public class TestJobInProgressListener extends TestCase {
    /** Logger shared by the test methods and the nested helper classes. */
    private static final Log LOG = LogFactory.getLog(TestJobInProgressListener.class);

    /**
     * URI string of the directory named by the {@code test.build.data} system property
     * (falling back to {@code /tmp}), with every space replaced by {@code '+'}.
     * Never reassigned, hence declared {@code final}.
     */
    private static final String TEST_ROOT_DIR =
            new File(System.getProperty("test.build.data", "/tmp")).toURI().toString().replace(' ', '+');

    /** Relative working directory that {@code testJobQueueChanges} deletes and recreates. */
    private final Path testDir = new Path("test-jip-listener-update");

    /* loaded from: input_file:jars/hadoop-test-1.1.2.jar:org/apache/hadoop/mapred/TestJobInProgressListener$MyListener.class */
    /**
     * Listener that records job life-cycle notifications in two lists so the tests can
     * check which events were delivered.
     *
     * <ul>
     *   <li>{@code wjobs} receives every job passed to {@link #jobAdded(JobInProgress)}.</li>
     *   <li>{@code jobs} receives a job when a run-state change arrives while the job is
     *       not yet complete; the job is removed from {@code wjobs} at the same time.</li>
     * </ul>
     *
     * The lists are plain {@link ArrayList}s with no synchronization.
     */
    public static class MyListener extends JobInProgressListener {
        /** Jobs that were added and have not yet been moved or removed by a run-state change. */
        private final List<JobInProgress> wjobs = new ArrayList<JobInProgress>();
        /** Jobs that changed run state while still incomplete. */
        private final List<JobInProgress> jobs = new ArrayList<JobInProgress>();

        /**
         * @param jobID id to look for
         * @return {@code true} if either list holds a job with this id
         */
        public boolean contains(JobID jobID) {
            return contains(jobID, true) || contains(jobID, false);
        }

        /**
         * @param jobID   id to look for
         * @param waiting {@code true} to search {@code wjobs}, {@code false} to search {@code jobs}
         * @return {@code true} if the selected list holds a job with this id
         */
        public boolean contains(JobID jobID, boolean waiting) {
            List<JobInProgress> queue = waiting ? wjobs : jobs;
            for (JobInProgress candidate : queue) {
                if (candidate.getJobID().equals(jobID)) {
                    return true;
                }
            }
            return false;
        }

        /** Logs the event and appends the job to {@code wjobs}. */
        @Override // org.apache.hadoop.mapred.JobInProgressListener
        public void jobAdded(JobInProgress jobInProgress) {
            LOG.info("Job " + jobInProgress.getJobID().toString() + " added");
            wjobs.add(jobInProgress);
        }

        /** Only logs the event; neither list is modified here. */
        @Override // org.apache.hadoop.mapred.JobInProgressListener
        public void jobRemoved(JobInProgress jobInProgress) {
            LOG.info("Job " + jobInProgress.getJobID().toString() + " removed");
        }

        /**
         * Logs the event and, for {@code RUN_STATE_CHANGED} status events only, updates the lists:
         * a completed job is dropped from {@code wjobs} if its previous run state was
         * {@link JobStatus#PREP}, otherwise from {@code jobs}; an incomplete job is moved
         * from {@code wjobs} to {@code jobs}.
         */
        @Override // org.apache.hadoop.mapred.JobInProgressListener
        public void jobUpdated(JobChangeEvent jobChangeEvent) {
            LOG.info("Job " + jobChangeEvent.getJobInProgress().getJobID().toString() + " updated");
            if (!(jobChangeEvent instanceof JobStatusChangeEvent)) {
                return;
            }
            JobStatusChangeEvent statusEvent = (JobStatusChangeEvent) jobChangeEvent;
            if (statusEvent.getEventType() != JobStatusChangeEvent.EventType.RUN_STATE_CHANGED) {
                return;
            }

            JobInProgress job = jobChangeEvent.getJobInProgress();
            String jobId = job.getJobID().toString();
            if (job.isComplete()) {
                LOG.info("Job " + jobId + " deleted from the running queue");
                // A job that finished straight out of PREP was never moved to 'jobs'.
                if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) {
                    wjobs.remove(job);
                } else {
                    jobs.remove(job);
                }
            } else {
                LOG.info("Job " + jobId + " deleted from the waiting queue");
                wjobs.remove(job);
                jobs.add(job);
            }
        }
    }

    /* loaded from: input_file:jars/hadoop-test-1.1.2.jar:org/apache/hadoop/mapred/TestJobInProgressListener$MyScheduler.class */
    /**
     * {@link JobQueueTaskScheduler} variant that, right after start-up, unregisters the
     * eager task initialization listener from the task tracker manager and terminates it.
     * Presumably this keeps submitted jobs from being initialized so they stay queued
     * (confirm against {@code JobQueueTaskScheduler}).
     *
     * <p>Only {@link #start()} is overridden. Every other scheduler method is inherited
     * unchanged; the compiler emits any bridge methods it needs on its own, so no explicit
     * pass-through overrides are declared.
     */
    public static class MyScheduler extends JobQueueTaskScheduler {
        /**
         * Starts the scheduler normally, then removes and terminates the eager task
         * initialization listener.
         *
         * @throws IOException if {@code super.start()} fails
         */
        @Override // org.apache.hadoop.mapred.JobQueueTaskScheduler, org.apache.hadoop.mapred.TaskScheduler
        public synchronized void start() throws IOException {
            super.start();
            // Unregister first so the listener receives no further job events, then stop it.
            taskTrackerManager.removeJobInProgressListener(eagerTaskInitializationListener);
            eagerTaskInitializationListener.terminate();
        }
    }

    /**
     * Configures {@code conf} through {@link UtilsForTests#configureWaitingJobConf} using
     * the fixed job name {@code "job-listener-test"} and hands the same object back.
     *
     * @param conf             job configuration to fill in; also the return value
     * @param firstCount       forwarded as the first int argument of the helper
     *                         (presumably the number of map tasks; verify against UtilsForTests)
     * @param secondCount      forwarded as the second int argument of the helper
     *                         (presumably the number of reduce tasks; verify against UtilsForTests)
     * @param inDir            first path forwarded to the helper
     * @param outDir           second path forwarded to the helper
     * @param mapSignalFile    first signal-file name forwarded to the helper
     * @param reduceSignalFile second signal-file name forwarded to the helper
     * @return {@code conf}, after configuration
     * @throws IOException if the helper fails
     */
    private JobConf configureJob(
            JobConf conf,
            int firstCount,
            int secondCount,
            Path inDir,
            Path outDir,
            String mapSignalFile,
            String reduceSignalFile) throws IOException {
        final String jobName = "job-listener-test";
        UtilsForTests.configureWaitingJobConf(
                conf, inDir, outDir, firstCount, secondCount, jobName, mapSignalFile, reduceSignalFile);
        return conf;
    }

    /**
     * Exercises {@link JobQueueJobInProgressListener} against a mini DFS + MR cluster:
     * submits two waiting jobs, then checks that the listener's queue is reordered when
     * job2's priority is raised and when job2's start time is moved before job1's, and
     * that the queue is empty once both jobs have succeeded.
     *
     * <p>Both clusters are shut down in {@code finally} blocks so a failing assertion
     * does not leave daemons running for the following tests.
     *
     * @throws IOException if cluster set-up, file-system access or job submission fails
     */
    public void testJobQueueChanges() throws IOException {
        LOG.info("Testing job queue changes");
        JobConf conf = new JobConf();
        MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null, null);
        MiniMRCluster mr = null;
        try {
            dfs.waitActive();
            FileSystem fileSys = dfs.getFileSystem();
            dfs.startDataNodes(conf, 1, true, null, null, null, null);
            dfs.waitActive();

            String namenode = dfs.getFileSystem().getUri().getHost() + ":"
                    + dfs.getFileSystem().getUri().getPort();
            mr = new MiniMRCluster(1, namenode, 1);
            JobClient jobClient = new JobClient(mr.createJobConf());

            // Start from a clean working directory.
            fileSys.delete(testDir, true);
            if (!fileSys.mkdirs(testDir)) {
                throw new IOException("Mkdirs failed to create " + testDir.toString());
            }

            Path inDir = new Path(testDir, "input");
            Path shareDir = new Path(testDir, "share");
            String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
            String reduceSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
            UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file"), (short) 1);

            JobQueueJobInProgressListener queueListener = new JobQueueJobInProgressListener();
            JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();
            jobTracker.addJobInProgressListener(queueListener);

            Path outputDir = new Path(testDir, "output");
            JobConf job1Conf = configureJob(mr.createJobConf(), 10, 0, inDir,
                    outputDir.suffix("0"), mapSignalFile, reduceSignalFile);
            JobConf job2Conf = configureJob(mr.createJobConf(), 1, 0, inDir,
                    outputDir.suffix("1"), mapSignalFile, reduceSignalFile);

            RunningJob rJob1 = jobClient.submitJob(job1Conf);
            LOG.info("Running job " + rJob1.getID().toString());
            RunningJob rJob2 = jobClient.submitJob(job2Conf);
            LOG.info("Running job " + rJob2.getID().toString());

            LOG.info("Testing job priority changes");
            LOG.info("Increasing job2's priority to HIGH");
            rJob2.setJobPriority("HIGH");
            assertJob2AheadOfJob1(queueListener, "Priority change", rJob1.getID(), rJob2.getID());

            LOG.info("Testing job start-time changes");
            // Back to NORMAL so that only the start time decides the order below.
            LOG.info("Decreasing job2's priority to NORMAL");
            rJob2.setJobPriority("NORMAL");

            JobInProgress jip2 = jobTracker.getJob(rJob2.getID());
            JobInProgress jip1 = jobTracker.getJob(rJob1.getID());
            JobStatus oldStatus = (JobStatus) jip2.getStatus().clone();
            // Make job2 look as if it started just before job1.
            jip2.startTime = jip1.startTime - 1;
            jip2.status.setStartTime(jip2.startTime);
            JobStatus newStatus = (JobStatus) jip2.getStatus().clone();

            LOG.info("Updating the listener about job2's start-time change");
            queueListener.jobUpdated(new JobStatusChangeEvent(
                    jip2, JobStatusChangeEvent.EventType.START_TIME_CHANGED, oldStatus, newStatus));
            assertJob2AheadOfJob1(queueListener, "Start time change", rJob1.getID(), rJob2.getID());

            // Let the waiting tasks finish and poll until both jobs report success.
            // NOTE(review): these loops have no timeout; a failed or killed job would spin forever.
            UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, reduceSignalFile);
            while (rJob2.getJobState() != JobStatus.SUCCEEDED) {
                UtilsForTests.waitFor(10L);
            }
            while (rJob1.getJobState() != JobStatus.SUCCEEDED) {
                UtilsForTests.waitFor(10L);
            }

            assertEquals("Job completion garbles the queue", 0, queueListener.getJobQueue().size());
        } finally {
            try {
                if (mr != null) {
                    mr.shutdown();
                }
            } finally {
                dfs.shutdown();
            }
        }
    }

    /**
     * Asserts that the listener's queue holds exactly two jobs, job2 first and job1 second.
     *
     * @param listener listener whose queue is inspected
     * @param change   prefix for the failure messages, e.g. {@code "Priority change"}
     * @param job1Id   id expected at index 1
     * @param job2Id   id expected at index 0
     */
    private static void assertJob2AheadOfJob1(JobQueueJobInProgressListener listener, String change,
                                              JobID job1Id, JobID job2Id) {
        assertEquals(change + " garbles the queue", 2, listener.getJobQueue().size());
        JobInProgress[] queued =
                (JobInProgress[]) listener.getJobQueue().toArray(new JobInProgress[0]);
        // Check the snapshot length before indexing into it.
        assertEquals(change + " has garbled the queue", 2, queued.length);
        assertEquals(change + " failed to bump up job2 in the queue", job2Id, queued[0].getJobID());
        assertEquals(change + " failed to bump down job1 in the queue", job1Id, queued[1].getJobID());
    }

    /**
     * Runs a job through {@code UtilsForTests.runJobFail} on a local-file-system mini MR
     * cluster and checks that the listener no longer tracks it in either list afterwards.
     * The cluster is shut down even when the assertion fails.
     *
     * @throws Exception if the cluster or the job cannot be set up
     */
    public void testJobFailure() throws Exception {
        LOG.info("Testing job-failure");
        MyListener listener = new MyListener();
        MiniMRCluster mr = new MiniMRCluster(1, CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT, 1);
        try {
            JobConf job = mr.createJobConf();
            mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener(listener);

            Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input");
            Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output");
            JobID id = UtilsForTests.runJobFail(job, inDir, outDir).getID();

            assertFalse("Missing event notification on failing a running job", listener.contains(id));
        } finally {
            mr.shutdown();
        }
    }

    /**
     * Runs a job through {@code UtilsForTests.runJobKill} on a local-file-system mini MR
     * cluster and checks that the listener no longer tracks it in either list afterwards.
     * The cluster is shut down even when the assertion fails.
     *
     * @throws Exception if the cluster or the job cannot be set up
     */
    public void testJobKill() throws Exception {
        LOG.info("Testing job-kill");
        MyListener listener = new MyListener();
        MiniMRCluster mr = new MiniMRCluster(1, CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT, 1);
        try {
            JobConf job = mr.createJobConf();
            mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener(listener);

            Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input");
            Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output");
            JobID id = UtilsForTests.runJobKill(job, inDir, outDir).getID();

            assertFalse("Missing event notification on killing a running job", listener.contains(id));
        } finally {
            mr.shutdown();
        }
    }

    /**
     * Runs a job to completion on a local-file-system mini MR cluster and checks the
     * listener at two points: once the job reports {@link JobStatus#RUNNING} it must have
     * left the waiting list, and once it reports {@link JobStatus#SUCCEEDED} it must have
     * left the running list. The cluster is shut down even when an assertion fails.
     *
     * @throws Exception if the cluster or the job cannot be set up
     */
    public void testJobSuccess() throws Exception {
        LOG.info("Testing job-success");
        MyListener listener = new MyListener();
        MiniMRCluster mr = new MiniMRCluster(1, CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT, 1);
        try {
            JobConf job = mr.createJobConf();
            mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener(listener);

            Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
            Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
            RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);

            // NOTE(review): neither polling loop has a timeout; a state that is skipped or
            // never reached would make the test spin forever.
            while (rJob.getJobState() != JobStatus.RUNNING) {
                UtilsForTests.waitFor(10L);
            }
            LOG.info("Job " + rJob.getID().toString() + " started running");
            assertFalse("Missing event notification for a running job",
                    listener.contains(rJob.getID(), true));

            while (rJob.getJobState() != JobStatus.SUCCEEDED) {
                UtilsForTests.waitFor(10L);
            }
            assertFalse("Missing event notification for a successful job",
                    listener.contains(rJob.getID(), false));
        } finally {
            mr.shutdown();
        }
    }

    /**
     * Starts a mini MR cluster configured with {@link MyScheduler}, submits a job, checks
     * that the listener holds it in the waiting list, kills it, and then checks that the
     * job state is {@link JobStatus#KILLED} and that the listener dropped it from the
     * waiting list. The cluster is shut down even when an assertion fails.
     *
     * @throws Exception if the cluster or the job cannot be set up
     */
    public void testQueuedJobKill() throws Exception {
        LOG.info("Testing queued-job-kill");
        MyListener listener = new MyListener();

        JobConf clusterConf = new JobConf();
        clusterConf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class, TaskScheduler.class);
        MiniMRCluster mr = new MiniMRCluster(1, CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT, 1,
                (String[]) null, (String[]) null, clusterConf);
        try {
            JobConf job = mr.createJobConf();
            mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener(listener);

            Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
            Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
            RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
            JobID id = rJob.getID();
            LOG.info("Job : " + id.toString() + " submitted");

            assertTrue("Missing event notification on submiting a job", listener.contains(id, true));

            LOG.info("Killing job : " + id.toString());
            rJob.killJob();

            assertEquals("Job status doesnt reflect the kill-job action",
                    JobStatus.KILLED, rJob.getJobState());
            assertFalse("Missing event notification on killing a waiting job",
                    listener.contains(id, true));
        } finally {
            mr.shutdown();
        }
    }
}
