package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Arrays;
import junit.extensions.TestSetup;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TestMapCollection;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.util.TestPureJavaCrc32;

/* loaded from: input_file:org/apache/hadoop/mapred/TestReduceFetch.class */
public class TestReduceFetch extends TestCase {
    private static MiniMRCluster mrCluster = null;
    private static MiniDFSCluster dfsCluster = null;

    /* loaded from: input_file:org/apache/hadoop/mapred/TestReduceFetch$MapMB.class */
    public static class MapMB implements Mapper<NullWritable, NullWritable, Text, Text> {
        @Override // org.apache.hadoop.mapred.Mapper
        public void map(NullWritable nullWritable, NullWritable nullWritable2, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
            Text text = new Text();
            Text text2 = new Text();
            text.set("KEYKEYKEYKEYKEYKEYKEYKEY");
            byte[] bArr = new byte[1000];
            Arrays.fill(bArr, (byte) 86);
            text2.set(bArr);
            for (int i = 0; i < 4096; i++) {
                outputCollector.collect(text, text2);
            }
        }

        @Override // org.apache.hadoop.mapred.JobConfigurable
        public void configure(JobConf jobConf) {
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }
    }

    public static Test suite() {
        return new TestSetup(new TestSuite(TestReduceFetch.class)) { // from class: org.apache.hadoop.mapred.TestReduceFetch.1
            protected void setUp() throws Exception {
                MiniDFSCluster unused = TestReduceFetch.dfsCluster = new MiniDFSCluster(new Configuration(), 2, true, null);
                MiniMRCluster unused2 = TestReduceFetch.mrCluster = new MiniMRCluster(2, TestReduceFetch.dfsCluster.getFileSystem().getUri().toString(), 1);
            }

            protected void tearDown() throws Exception {
                if (TestReduceFetch.dfsCluster != null) {
                    TestReduceFetch.dfsCluster.shutdown();
                }
                if (TestReduceFetch.mrCluster != null) {
                    TestReduceFetch.mrCluster.shutdown();
                }
            }
        };
    }

    public static Counters runJob(JobConf jobConf) throws Exception {
        jobConf.setMapperClass(MapMB.class);
        jobConf.setReducerClass(IdentityReducer.class);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setNumReduceTasks(1);
        jobConf.setInputFormat(TestMapCollection.FakeIF.class);
        FileInputFormat.setInputPaths(jobConf, new Path("/in"));
        Path path = new Path("/out");
        FileOutputFormat.setOutputPath(jobConf, path);
        try {
            RunningJob runJob = JobClient.runJob(jobConf);
            assertTrue(runJob.isSuccessful());
            FileSystem fileSystem = dfsCluster.getFileSystem();
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
            return runJob.getCounters();
        } catch (Throwable th) {
            FileSystem fileSystem2 = dfsCluster.getFileSystem();
            if (fileSystem2.exists(path)) {
                fileSystem2.delete(path, true);
            }
            throw th;
        }
    }

    public void testReduceFromDisk() throws Exception {
        JobConf createJobConf = mrCluster.createJobConf();
        createJobConf.set("mapred.job.reduce.input.buffer.percent", "0.0");
        createJobConf.setNumMapTasks(8);
        createJobConf.setInt("mapred.job.reduce.total.mem.bytes", TestPureJavaCrc32.PerformanceTest.BYTES_PER_SIZE);
        createJobConf.set("mapred.job.shuffle.input.buffer.percent", "0.05");
        createJobConf.setInt(CommonConfigurationKeysPublic.IO_SORT_FACTOR_KEY, 2);
        createJobConf.setInt("mapred.inmem.merge.threshold", 4);
        Counters runJob = runJob(createJobConf);
        long counter = runJob.findCounter((Enum<?>) Task.Counter.SPILLED_RECORDS).getCounter();
        long counter2 = runJob.findCounter((Enum<?>) Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
        assertTrue("Expected all records spilled during reduce (" + counter + DefaultExpressionEngine.DEFAULT_INDEX_END, counter >= 2 * counter2);
        assertTrue("Expected intermediate merges (" + counter + DefaultExpressionEngine.DEFAULT_INDEX_END, counter >= (2 * counter2) + (counter2 / 8));
    }

    public void testReduceFromPartialMem() throws Exception {
        JobConf createJobConf = mrCluster.createJobConf();
        createJobConf.setNumMapTasks(7);
        createJobConf.setInt("mapred.inmem.merge.threshold", 0);
        createJobConf.set("mapred.job.reduce.input.buffer.percent", "1.0");
        createJobConf.setInt("mapred.reduce.parallel.copies", 1);
        createJobConf.setInt(CommonConfigurationKeysPublic.IO_SORT_MB_KEY, 10);
        createJobConf.setInt("mapred.job.reduce.total.mem.bytes", TestPureJavaCrc32.PerformanceTest.BYTES_PER_SIZE);
        createJobConf.set("mapred.job.shuffle.input.buffer.percent", "0.14");
        createJobConf.setNumTasksToExecutePerJvm(1);
        createJobConf.set("mapred.job.shuffle.merge.percent", "1.0");
        Counters runJob = runJob(createJobConf);
        long counter = runJob.findCounter((Enum<?>) Task.Counter.MAP_OUTPUT_RECORDS).getCounter();
        long counter2 = runJob.findCounter((Enum<?>) Task.Counter.SPILLED_RECORDS).getCounter();
        assertTrue("Expected some records not spilled during reduce" + counter2 + DefaultExpressionEngine.DEFAULT_INDEX_END, counter2 < 2 * counter);
    }

    public void testReduceFromMem() throws Exception {
        JobConf createJobConf = mrCluster.createJobConf();
        createJobConf.set("mapred.job.reduce.input.buffer.percent", "1.0");
        createJobConf.set("mapred.job.shuffle.input.buffer.percent", "1.0");
        createJobConf.setInt("mapred.job.reduce.total.mem.bytes", TestPureJavaCrc32.PerformanceTest.BYTES_PER_SIZE);
        createJobConf.setNumMapTasks(3);
        Counters runJob = runJob(createJobConf);
        long counter = runJob.findCounter((Enum<?>) Task.Counter.SPILLED_RECORDS).getCounter();
        assertEquals("Spilled records: " + counter, runJob.findCounter((Enum<?>) Task.Counter.MAP_OUTPUT_RECORDS).getCounter(), counter);
    }
}
