package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/hadoop/mapred/TestJobInProgress.class */
public class TestJobInProgress {
    /** MapReduce mini cluster shared by the tests; assigned in {@link #setUp()}. */
    private static MiniMRCluster mrCluster;
    /** DFS mini cluster whose file system URI is handed to {@link #mrCluster}; assigned in {@link #setUp()}. */
    private static MiniDFSCluster dfsCluster;
    /** Job tracker taken from {@link #mrCluster} in {@link #setUp()}. */
    private static JobTracker jt;
    static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
    /** Tracker names for which first contact with the job tracker is established in {@link #setUp()}. */
    static final String[] trackers = {"tracker_tracker1.r1.com:1000", "tracker_tracker2.r1.com:1000", "tracker_tracker3.r2.com:1000", "tracker_tracker4.r3.com:1000"};
    /** Host names; {@code hosts[i]} is mapped to {@code racks[i]} in {@link #setUp()}. */
    static final String[] hosts = {"tracker1.r1.com", "tracker2.r1.com", "tracker3.r2.com", "tracker4.r3.com"};
    /** Rack paths, index-aligned with {@link #hosts}. */
    static final String[] racks = {"/r1", "/r1", "/r2", "/r3"};
    /** Directory holding the map/reduce signal files; it is deleted through the DFS cluster's file system. */
    private static final Path TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"), "jip-testing");
    /** Count passed to both mini-cluster constructors and to the task-signalling helper. */
    private static final int numSlaves = 4;

    /* loaded from: input_file:org/apache/hadoop/mapred/TestJobInProgress$FailMapTaskJob.class */
    /**
     * Mapper that never succeeds: every {@code map} call pauses for one second
     * and then throws, so a job configured with it only ever produces failing
     * map attempts.
     */
    public static class FailMapTaskJob extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * Sleeps for 1000 ms and then fails unconditionally. None of the
         * arguments are read.
         *
         * @throws IllegalArgumentException always; with message
         *         {@code "Failing MAP task"} after a normal sleep, or
         *         {@code "Interrupted MAP task"} (carrying the
         *         {@link InterruptedException} as cause) if the sleep was interrupted
         */
        @Override // org.apache.hadoop.mapred.Mapper
        public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Keep the interruption as the cause so it shows up in the failure trace.
                // NOTE(review): the interrupt flag is intentionally left as the original
                // left it (cleared); confirm before restoring it in task code.
                throw new IllegalArgumentException("Interrupted MAP task", e);
            }
            throw new IllegalArgumentException("Failing MAP task");
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/TestJobInProgress$FailReduceTaskJob.class */
    /**
     * Reducer that never succeeds: every {@code reduce} call pauses for one
     * second and then throws, so a job configured with it only ever produces
     * failing reduce attempts.
     */
    public static class FailReduceTaskJob extends MapReduceBase implements Reducer {
        /**
         * Sleeps for 1000 ms and then fails unconditionally. None of the
         * arguments are read.
         *
         * @throws IllegalArgumentException always, with message
         *         {@code "Failing Reduce task"}; if the sleep was interrupted the
         *         {@link InterruptedException} is attached as the cause
         */
        @Override // org.apache.hadoop.mapred.Reducer
        public void reduce(Object obj, Iterator it, OutputCollector outputCollector, Reporter reporter) throws IOException {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Same message as the normal failure path; the cause distinguishes the two.
                throw new IllegalArgumentException("Failing Reduce task", e);
            }
            throw new IllegalArgumentException("Failing Reduce task");
        }
    }

    /**
     * Starts the shared DFS and MapReduce mini clusters, records the job
     * tracker, maps every entry of {@code hosts} to the rack at the same index
     * in {@code racks}, and establishes first contact with the job tracker for
     * every entry of {@code trackers}.
     *
     * @throws Exception if either cluster cannot be started
     */
    @BeforeClass
    public static void setUp() throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapreduce.jobtracker.address", "localhost:0");
        conf.set(JTConfig.JT_HTTP_ADDRESS, YarnConfiguration.DEFAULT_NM_ADDRESS);
        // Use the static host-to-rack mapping that is populated below.
        conf.setClass("topology.node.switch.mapping.impl", StaticMapping.class, DNSToSwitchMapping.class);

        dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
        String fsUri = dfsCluster.getFileSystem().getUri().toString();
        mrCluster = new MiniMRCluster(numSlaves, fsUri, 1);
        jt = mrCluster.getJobTrackerRunner().getJobTracker();

        int rackIndex = 0;
        for (String host : hosts) {
            StaticMapping.addNodeToRack(host, racks[rackIndex]);
            rackIndex++;
        }

        for (int t = 0; t < trackers.length; t++) {
            FakeObjectUtilities.establishFirstContact(jt, trackers[t]);
        }
    }

    /**
     * Runs a job whose mapper is {@link FailMapTaskJob} (always throws) with an
     * identity reducer, then checks the task counters via
     * {@link #checkTaskCounts()}.
     */
    @Test
    public void testPendingMapTaskCount() throws Exception {
        launchTask(FailMapTaskJob.class, IdentityReducer.class);
        checkTaskCounts();
    }

    /**
     * Runs the always-failing mapper job through
     * {@link #launchTaskSlowStart(Class, Class)}, which additionally sets the
     * reduce slow-start fraction and the allowed map-failure percentage, then
     * checks the task counters via {@link #checkTaskCounts()}.
     */
    @Test
    public void testSlowStartAndFailurePercent() throws Exception {
        launchTaskSlowStart(FailMapTaskJob.class, IdentityReducer.class);
        checkTaskCounts();
    }

    /**
     * Runs a job with an identity mapper and {@link FailReduceTaskJob} (always
     * throws) as reducer, then checks the task counters via
     * {@link #checkTaskCounts()}.
     */
    @Test
    public void testPendingReduceTaskCount() throws Exception {
        launchTask(IdentityMapper.class, FailReduceTaskJob.class);
        checkTaskCounts();
    }

    /**
     * Submits a one-map/one-reduce job whose tasks block on signal files,
     * waits until the job reports them as running, and then compares the
     * job's running counters against the sizes of its running-task
     * collections. Afterwards the tasks are signalled, the job is awaited and
     * the signal directory is removed.
     *
     * @param z  value passed to {@code JobConf.setSpeculativeExecution}
     * @param z2 passed to {@code configure} as its input-format switch
     *           (the log messages call it "locality")
     * @throws Exception on any file-system or job-submission failure
     */
    private void testRunningTaskCount(boolean z, boolean z2) throws Exception {
        LOG.info("Testing running jobs with speculation : " + z + ", locality : " + z2);

        // Start from a clean signal directory.
        dfsCluster.getFileSystem().delete(TEST_DIR, true);
        Path mapSignal = new Path(TEST_DIR, "map-signal");
        Path reduceSignal = new Path(TEST_DIR, "reduce-signal");

        JobConf jobConf = configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1, z2);
        jobConf.set(UtilsForTests.getTaskSignalParameter(true), mapSignal.toString());
        jobConf.set(UtilsForTests.getTaskSignalParameter(false), reduceSignal.toString());
        jobConf.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
        jobConf.setSpeculativeExecution(z);

        JobClient client = new JobClient(jobConf);
        JobInProgress job = mrCluster.getJobTrackerRunner().getJobTracker().getJob(client.submitJob(jobConf).getID());
        LOG.info("Running job " + job.getJobID());
        LOG.info("Waiting for job " + job.getJobID() + " to be ready");
        waitTillReady(job, jobConf);

        // Union of every task held in the running-map cache plus the non-local
        // running maps; the set collapses tasks that appear more than once.
        Collection<Object> runningMapTasks = new HashSet<Object>();
        for (Object cached : job.getRunningMapCache().values()) {
            runningMapTasks.addAll((Collection<?>) cached);
        }
        runningMapTasks.addAll(job.getNonLocalRunningMaps());

        Assert.assertEquals("Running map count doesnt match for jobs with speculation " + z + ", and locality " + z2, job.runningMaps(), runningMapTasks.size());
        Assert.assertEquals("Running reducer count doesnt match for jobs with speculation " + z + ", and locality " + z2, job.runningReduces(), job.getRunningReduces().size());

        LOG.info("Signaling the tasks");
        UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(), mapSignal.toString(), reduceSignal.toString(), numSlaves);
        LOG.info("Waiting for job " + job.getJobID() + " to be complete");
        UtilsForTests.waitTillDone(client);

        dfsCluster.getFileSystem().delete(TEST_DIR, true);
    }

    /**
     * Blocks until the job reports at least as many running maps as the
     * configuration requests, and then at least as many running reduces.
     * Polls by calling {@code UtilsForTests.waitFor(10L)} between checks
     * (presumably milliseconds — confirm against that helper). There is no
     * timeout.
     *
     * @param jobInProgress job whose running counters are polled
     * @param jobConf       supplies the requested map and reduce task counts
     */
    private void waitTillReady(JobInProgress jobInProgress, JobConf jobConf) {
        // Phase 1: maps.
        while (true) {
            if (jobInProgress.runningMaps() >= jobConf.getNumMapTasks()) {
                break;
            }
            UtilsForTests.waitFor(10L);
        }
        // Phase 2: reduces.
        while (true) {
            if (jobInProgress.runningReduces() >= jobConf.getNumReduceTasks()) {
                break;
            }
            UtilsForTests.waitFor(10L);
        }
    }

    /**
     * Exercises the running-task-count check for all four combinations of the
     * two switches, in the order (speculation, locality) = (false, true),
     * (true, true), (false, false), (true, false).
     */
    @Test
    public void testRunningTaskCount() throws Exception {
        boolean[] localityModes = {true, false};
        boolean[] speculationModes = {false, true};
        for (boolean locality : localityModes) {
            for (boolean speculation : speculationModes) {
                testRunningTaskCount(speculation, locality);
            }
        }
    }

    /**
     * Builds a three-node topology (two nodes under {@code /default/rack1},
     * one under {@code /default/rack2}) and checks that
     * {@code JobInProgress.getMatchingLevelForNodes} with a maximum level of 3
     * returns 0 for a node against itself, 1 for two nodes sharing a rack and
     * 2 for nodes on different racks.
     */
    @Test
    public void testLocality() throws Exception {
        NodeBase rack1Node1 = new NodeBase("/default/rack1/node1");
        NodeBase rack1Node2 = new NodeBase("/default/rack1/node2");
        NodeBase rack2Node3 = new NodeBase("/default/rack2/node3");

        NetworkTopology topology = new NetworkTopology();
        topology.add(rack1Node1);
        topology.add(rack1Node2);
        topology.add(rack2Node3);

        LOG.debug("r1n1 parent: " + rack1Node1.getParent() + "\nr1n2 parent: " + rack1Node2.getParent() + "\nr2n3 parent: " + rack2Node3.getParent());

        int sameNode = JobInProgress.getMatchingLevelForNodes(rack1Node1, rack1Node1, 3);
        Assert.assertEquals(0L, sameNode);
        int sameRack = JobInProgress.getMatchingLevelForNodes(rack1Node1, rack1Node2, 3);
        Assert.assertEquals(1L, sameRack);
        int otherRack = JobInProgress.getMatchingLevelForNodes(rack1Node1, rack2Node3, 3);
        Assert.assertEquals(2L, otherRack);
    }

    /**
     * Shuts down the MapReduce mini cluster and then the DFS mini cluster.
     * Either cluster may be {@code null} if {@code setUp} failed part-way, and
     * the DFS cluster is shut down even when stopping the MapReduce cluster
     * throws.
     *
     * @throws Exception whatever either shutdown call throws
     */
    @AfterClass
    public static void tearDown() throws Exception {
        try {
            if (mrCluster != null) {
                mrCluster.shutdown();
            }
        } finally {
            if (dfsCluster != null) {
                dfsCluster.shutdown();
            }
        }
    }

    /**
     * Runs a 5-map/10-reduce job with the given task classes, with the reduce
     * slow-start fraction set to 1.0 and the allowed map-failure percentage
     * set to 100, and blocks until {@code JobClient.runJob} returns or throws.
     *
     * @param cls  mapper class
     * @param cls2 class used as both combiner and reducer
     * @throws Exception if the job configuration cannot be prepared
     */
    void launchTaskSlowStart(Class cls, Class cls2) throws Exception {
        JobConf jobConf = configure(cls, cls2, 5, 10, true);
        jobConf.setFloat("mapred.reduce.slowstart.completed.maps", 1.0f);
        jobConf.setInt("mapred.max.map.failures.percent", 100);
        try {
            JobClient.runJob(jobConf);
        } catch (IOException e) {
            // Callers in this class pass task classes that always throw, so a failed
            // run is tolerated here; record it instead of dropping it silently.
            LOG.info("Job run ended with an IOException (tolerated by this test)", e);
        }
    }

    /**
     * Runs a 5-map/10-reduce job with the given task classes and blocks until
     * {@code JobClient.runJob} returns or throws.
     *
     * @param cls  mapper class
     * @param cls2 class used as both combiner and reducer
     * @throws Exception if the job configuration cannot be prepared
     */
    void launchTask(Class cls, Class cls2) throws Exception {
        JobConf jobConf = configure(cls, cls2, 5, 10, true);
        try {
            JobClient.runJob(jobConf);
        } catch (IOException e) {
            // Callers in this class pass task classes that always throw, so a failed
            // run is tolerated here; record it instead of dropping it silently.
            LOG.info("Job run ended with an IOException (tolerated by this test)", e);
        }
    }

    /**
     * Prepares a job configuration from the mini MR cluster: clears
     * {@code ./failjob/output}, (re)creates {@code ./failjob/input} with a
     * single two-line {@code part-0} file, and wires up the task classes,
     * input/output paths and task counts.
     *
     * @param cls  mapper class
     * @param cls2 class installed as both combiner and reducer
     * @param i    number of map tasks to request
     * @param i2   number of reduce tasks to request
     * @param z    {@code true} to read the input with {@code TextInputFormat},
     *             {@code false} to use {@code UtilsForTests.RandomInputFormat}
     * @return the populated job configuration
     * @throws IOException if the input directory cannot be created
     * @throws Exception   on any other file-system failure
     */
    JobConf configure(Class cls, Class cls2, int i, int i2, boolean z) throws Exception {
        JobConf jobConf = mrCluster.createJobConf();
        Path inputDir = new Path("./failjob/input");
        Path outputDir = new Path("./failjob/output");
        FileSystem inputFs = inputDir.getFileSystem(jobConf);

        // Remove output left by a previous job.
        outputDir.getFileSystem(jobConf).delete(outputDir, true);
        if (!inputFs.mkdirs(inputDir)) {
            throw new IOException("create directory failed" + inputDir.toString());
        }

        FSDataOutputStream out = inputFs.create(new Path(inputDir, "part-0"));
        try {
            out.writeBytes("Test failing job.\n One more line");
        } finally {
            // Release the stream even when the write fails.
            out.close();
        }

        jobConf.setJobName("failmaptask");
        if (z) {
            jobConf.setInputFormat(TextInputFormat.class);
        } else {
            jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
        }
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setMapperClass(cls);
        jobConf.setCombinerClass(cls2);
        jobConf.setReducerClass(cls2);
        FileInputFormat.setInputPaths(jobConf, inputDir);
        FileOutputFormat.setOutputPath(jobConf, outputDir);
        jobConf.setNumMapTasks(i);
        jobConf.setNumReduceTasks(i2);
        return jobConf;
    }

    /**
     * For every job known to the job tracker, repeatedly asserts that the
     * running and pending map/reduce counts are non-negative for as long as
     * the job has recorded fewer task-completion events than the sum of its
     * launched-map and launched-reduce counters (read once per job).
     *
     * <p>The loop polls without pausing, exactly as before; only the
     * assertions changed, so that a failure names the job and the counter
     * that went negative.
     */
    void checkTaskCounts() {
        for (JobStatus jobStatus : jt.getAllJobs()) {
            JobInProgress job = jt.getJob(jobStatus.getJobID());
            Counters jobCounters = job.getJobCounters();
            long launchedTasks = jobCounters.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS)
                    + jobCounters.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES);
            String label = "Job " + job.getJobID() + ": ";
            while (job.getNumTaskCompletionEvents() < launchedTasks) {
                Assert.assertTrue(label + "running map count is negative", job.runningMaps() >= 0);
                Assert.assertTrue(label + "pending map count is negative", job.pendingMaps() >= 0);
                Assert.assertTrue(label + "running reduce count is negative", job.runningReduces() >= 0);
                Assert.assertTrue(label + "pending reduce count is negative", job.pendingReduces() >= 0);
            }
        }
    }

    /**
     * Calls the real {@code scheduleReduces()} on a mocked
     * {@code JobInProgress} whose fields are set to 10 failed map TIPs,
     * 50 finished map tasks and a slow-start value of 60, and expects it to
     * return {@code true}. As the test name suggests, this presumably checks
     * that failed map TIPs count toward the slow-start threshold — confirm
     * against {@code JobInProgress.scheduleReduces}.
     */
    @Test
    public void testScheduleReducesConsiderFailedMapTips() throws Exception {
        JobInProgress mockedJob = Mockito.mock(JobInProgress.class);
        // Only scheduleReduces() runs real code; everything else stays stubbed.
        Mockito.when(mockedJob.scheduleReduces()).thenCallRealMethod();

        mockedJob.failedMapTIPs = 10;
        mockedJob.finishedMapTasks = 50;
        mockedJob.completedMapsForReduceSlowstart = 60;

        boolean reducesScheduled = mockedJob.scheduleReduces();
        Assert.assertTrue("The Reduce is not scheduled", reducesScheduled);
    }
}
