package org.apache.sysml.runtime.controlprogram.parfor;

import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.NLineInputFormat;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
import org.apache.sysml.runtime.controlprogram.caching.CacheStatistics;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.io.MatrixReader;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.utils.Statistics;
import org.apache.sysml.yarn.DMLAppMasterUtils;

/* loaded from: input_file:org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.class */
/**
 * Configures and runs a ParFor body as a map-only MapReduce job (zero reduce tasks, mapper
 * {@link RemoteParWorkerMapper}) and reads the variables exported by the workers back from
 * the job's sequence-file output.
 */
public class RemoteParForMR {
    protected static final Log LOG = LogFactory.getLog(RemoteParForMR.class.getName());

    /**
     * Configures, submits and waits for the remote ParFor MapReduce job, then collects its
     * counters and results. The task file and the result file are removed from HDFS before
     * this method returns or throws.
     *
     * @param j            identifier appended to the job name ({@code "ParFor-EMR" + j})
     * @param str          program-block payload handed to {@code MRJobConfiguration.setProgramBlocks}
     * @param str2         path of the job input (task file); deleted afterwards
     * @param str3         path of the job output (result file); deleted before the job starts
     *                     and again afterwards
     * @param matrixObject if non-null, the colocated NLine input format is used and this
     *                     matrix's partition format, block sizes and file name are written
     *                     into the job configuration; if null, plain {@code NLineInputFormat}
     * @param z            flag passed to {@code MRJobConfiguration.setParforCachingConfig}
     * @param i            number of map tasks requested
     * @param i2           value written to {@code dfs.replication}
     * @param i3           not used by this method
     * @param j2           memory budget in bytes; if positive and larger than the max memory
     *                     currently configured in {@code mapred.child.java.opts}, that option
     *                     is raised to this value
     * @param z2           if true, {@code setNumTasksToExecutePerJvm(-1)} is applied
     * @return job success flag, task/iteration counters and the result variables
     * @throws DMLRuntimeException if configuration, execution, result reading or the HDFS
     *                             cleanup fails
     */
    public static RemoteParForJobReturn runJob(long j, String str, String str2, String str3, MatrixObject matrixObject, boolean z, int i, int i2, int i3, long j2, boolean z2) throws DMLRuntimeException {
        long startTime = DMLScript.STATISTICS ? System.nanoTime() : 0L;
        JobConf jobConf = new JobConf(RemoteParForMR.class);
        jobConf.setJobName("ParFor-EMR" + j);
        Statistics.incrementNoOfCompiledMRJobs();

        RemoteParForJobReturn result;
        try {
            // program and mapper
            MRJobConfiguration.setProgramBlocks(jobConf, str);
            MRJobConfiguration.setParforCachingConfig(jobConf, z);
            jobConf.setMapperClass(RemoteParWorkerMapper.class);

            // input: colocated variant needs the partitioning metadata of the matrix
            if (matrixObject != null) {
                jobConf.setInputFormat(RemoteParForColocatedNLineInputFormat.class);
                MRJobConfiguration.setPartitioningFormat(jobConf, matrixObject.getPartitionFormat());
                MatrixCharacteristics matrixCharacteristics = matrixObject.getMatrixCharacteristics();
                MRJobConfiguration.setPartitioningBlockNumRows(jobConf, matrixCharacteristics.getRowsPerBlock());
                MRJobConfiguration.setPartitioningBlockNumCols(jobConf, matrixCharacteristics.getColsPerBlock());
                MRJobConfiguration.setPartitioningFilename(jobConf, matrixObject.getFileName());
            } else {
                jobConf.setInputFormat(NLineInputFormat.class);
            }
            FileInputFormat.setInputPaths(jobConf, new Path[]{new Path(str2)});

            // output: (LongWritable, Text) records in sequence files
            jobConf.setOutputFormat(SequenceFileOutputFormat.class);
            MapReduceTool.deleteFileIfExistOnHDFS(str3);
            FileOutputFormat.setOutputPath(jobConf, new Path(str3));
            jobConf.setMapOutputKeyClass(LongWritable.class);
            jobConf.setMapOutputValueClass(Text.class);
            jobConf.setOutputKeyClass(LongWritable.class);
            jobConf.setOutputValueClass(Text.class);

            // parallelism: map-only job
            jobConf.setNumMapTasks(i);
            jobConf.setNumReduceTasks(0);

            // raise the child JVM heap only if the requested budget exceeds the configured one
            if (j2 > 0 && j2 > InfrastructureAnalyzer.extractMaxMemoryOpt(jobConf.get("mapred.child.java.opts"))) {
                InfrastructureAnalyzer.setMaxMemoryOpt(jobConf, "mapred.child.java.opts", j2);
                LOG.warn("Forcing 'mapred.child.java.opts' to -Xmx" + (j2 / 1048576) + "M.");
            }

            jobConf.setInt("mapred.task.timeout", 0);
            jobConf.setMapSpeculativeExecution(false);
            DMLAppMasterUtils.setupMRJobRemoteMaxMemory(jobConf, ConfigurationManager.getConfig());
            if (z2) {
                jobConf.setNumTasksToExecutePerJvm(-1);
            }
            jobConf.setInt("io.sort.mb", 8);
            jobConf.setInt("dfs.replication", i2);
            MRJobConfiguration.setUniqueWorkingDir(jobConf);

            // submit and block until the job finishes
            RunningJob runningJob = JobClient.runJob(jobConf);
            Statistics.incrementNoOfExecutedMRJobs();

            Counters.Group parforGroup = runningJob.getCounters().getGroup(ParForProgramBlock.PARFOR_COUNTER_GROUP_NAME);
            int numTasks = (int) parforGroup.getCounter(Stat.PARFOR_NUMTASKS.toString());
            int numIters = (int) parforGroup.getCounter(Stat.PARFOR_NUMITERS.toString());

            // fold remote JVM and cache statistics into the local ones (skipped in local mode)
            if (DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode()) {
                Statistics.incrementJITCompileTime(parforGroup.getCounter(Stat.PARFOR_JITCOMPILE.toString()));
                Statistics.incrementJVMgcCount(parforGroup.getCounter(Stat.PARFOR_JVMGC_COUNT.toString()));
                Statistics.incrementJVMgcTime(parforGroup.getCounter(Stat.PARFOR_JVMGC_TIME.toString()));
                Counters.Group cacheGroup = runningJob.getCounters().getGroup(CacheableData.CACHING_COUNTER_GROUP_NAME.toString());
                CacheStatistics.incrementMemHits((int) cacheGroup.getCounter(CacheStatistics.Stat.CACHE_HITS_MEM.toString()));
                CacheStatistics.incrementFSBuffHits((int) cacheGroup.getCounter(CacheStatistics.Stat.CACHE_HITS_FSBUFF.toString()));
                CacheStatistics.incrementFSHits((int) cacheGroup.getCounter(CacheStatistics.Stat.CACHE_HITS_FS.toString()));
                CacheStatistics.incrementHDFSHits((int) cacheGroup.getCounter(CacheStatistics.Stat.CACHE_HITS_HDFS.toString()));
                CacheStatistics.incrementFSBuffWrites((int) cacheGroup.getCounter(CacheStatistics.Stat.CACHE_WRITES_FSBUFF.toString()));
                CacheStatistics.incrementFSWrites((int) cacheGroup.getCounter(CacheStatistics.Stat.CACHE_WRITES_FS.toString()));
                CacheStatistics.incrementHDFSWrites((int) cacheGroup.getCounter(CacheStatistics.Stat.CACHE_WRITES_HDFS.toString()));
                CacheStatistics.incrementAcquireRTime(cacheGroup.getCounter(CacheStatistics.Stat.CACHE_TIME_ACQR.toString()));
                CacheStatistics.incrementAcquireMTime(cacheGroup.getCounter(CacheStatistics.Stat.CACHE_TIME_ACQM.toString()));
                CacheStatistics.incrementReleaseTime(cacheGroup.getCounter(CacheStatistics.Stat.CACHE_TIME_RLS.toString()));
                CacheStatistics.incrementExportTime(cacheGroup.getCounter(CacheStatistics.Stat.CACHE_TIME_EXP.toString()));
            }

            // results must be read before the finally block deletes the result file
            result = new RemoteParForJobReturn(runningJob.isSuccessful(), numTasks, numIters, readResultFile(jobConf, str3));
        } catch (Exception e) {
            throw new DMLRuntimeException(e);
        } finally {
            // Always remove task and result files. A failure here is reported as
            // DMLRuntimeException and supersedes any exception already in flight.
            try {
                MapReduceTool.deleteFileIfExistOnHDFS(new Path(str2), jobConf);
                MapReduceTool.deleteFileIfExistOnHDFS(new Path(str3), jobConf);
            } catch (IOException e) {
                throw new DMLRuntimeException(e);
            }
        }

        if (DMLScript.STATISTICS) {
            Statistics.maintainCPHeavyHitters("MR-Job_ParFor-EMR", System.nanoTime() - startTime);
        }
        return result;
    }

    /**
     * Reads all (key, value) records from the sequence files under the given result path and
     * groups the parsed variables by record key, one {@link LocalVariableMap} per distinct key.
     * Each value is parsed with {@code ProgramConverter.parseDataObject} into a
     * (name, data) pair; a later record with the same key and name replaces the earlier one.
     *
     * @param jobConf job configuration used to obtain the file system and open the readers
     * @param str     path of the job output to read
     * @return one variable map per distinct record key, in no particular order
     * @throws DMLRuntimeException declared for failures of the project helpers called here
     * @throws IOException         if listing, opening, reading or closing a file fails
     */
    public static LocalVariableMap[] readResultFile(JobConf jobConf, String str) throws DMLRuntimeException, IOException {
        HashMap<Long, LocalVariableMap> resultsByKey = new HashMap<Long, LocalVariableMap>();
        FileSystem fileSystem = FileSystem.get(jobConf);
        Path resultPath = new Path(str);
        LongWritable key = new LongWritable();
        Text value = new Text();
        int numRecords = 0;

        for (Path partFile : MatrixReader.getSequenceFilePaths(fileSystem, resultPath)) {
            SequenceFile.Reader reader = new SequenceFile.Reader(fileSystem, partFile, jobConf);
            // The reader must stay open for the whole file and be closed exactly once,
            // including when the file is empty or parsing fails midway.
            try {
                while (reader.next(key, value)) {
                    Long id = Long.valueOf(key.get());
                    LocalVariableMap vars = resultsByKey.get(id);
                    if (vars == null) {
                        vars = new LocalVariableMap();
                        resultsByKey.put(id, vars);
                    }
                    Object[] parsed = ProgramConverter.parseDataObject(value.toString());
                    vars.put((String) parsed[0], (Data) parsed[1]);
                    numRecords++;
                }
            } finally {
                reader.close();
            }
        }

        LOG.debug("Num remote worker results (before deduplication): " + numRecords);
        LOG.debug("Num remote worker results: " + resultsByKey.size());
        return resultsByKey.values().toArray(new LocalVariableMap[0]);
    }
}
