package org.apache.sysml.runtime.instructions.spark;

import java.util.LinkedList;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.functionobjects.Multiply;
import org.apache.sysml.runtime.functionobjects.Plus;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.SPInstruction;
import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.MatrixValue;
import org.apache.sysml.runtime.matrix.data.TripleIndexes;
import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
import org.apache.sysml.runtime.matrix.operators.Operator;
import scala.Tuple2;

/* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.class */
public class RmmSPInstruction extends BinarySPInstruction {

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/RmmSPInstruction$RmmMultiplyFunction.class */
    private static class RmmMultiplyFunction implements PairFunction<Tuple2<TripleIndexes, Tuple2<MatrixBlock, MatrixBlock>>, MatrixIndexes, MatrixBlock> {
        private static final long serialVersionUID = -5772410117511730911L;
        private AggregateBinaryOperator _op;

        public RmmMultiplyFunction() {
            this._op = null;
            this._op = new AggregateBinaryOperator(Multiply.getMultiplyFnObject(), new AggregateOperator(0.0d, Plus.getPlusFnObject()));
        }

        public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<TripleIndexes, Tuple2<MatrixBlock, MatrixBlock>> tuple2) throws Exception {
            TripleIndexes tripleIndexes = (TripleIndexes) tuple2._1();
            MatrixIndexes matrixIndexes = new MatrixIndexes(tripleIndexes.getFirstIndex(), tripleIndexes.getSecondIndex());
            MatrixValue matrixValue = (MatrixBlock) ((Tuple2) tuple2._2())._1();
            MatrixBlock matrixBlock = (MatrixBlock) ((Tuple2) tuple2._2())._2();
            MatrixBlock matrixBlock2 = new MatrixBlock();
            matrixValue.aggregateBinaryOperations(matrixValue, matrixBlock, matrixBlock2, this._op);
            return new Tuple2<>(matrixIndexes, matrixBlock2);
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/RmmSPInstruction$RmmReplicateFunction.class */
    private static class RmmReplicateFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, TripleIndexes, MatrixBlock> {
        private static final long serialVersionUID = 3577072668341033932L;
        private long _len;
        private long _blen;
        private boolean _left;

        public RmmReplicateFunction(long j, long j2, boolean z) {
            this._len = -1L;
            this._blen = -1L;
            this._left = false;
            this._len = j;
            this._blen = j2;
            this._left = z;
        }

        public Iterable<Tuple2<TripleIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> tuple2) throws Exception {
            LinkedList linkedList = new LinkedList();
            MatrixIndexes matrixIndexes = (MatrixIndexes) tuple2._1();
            MatrixBlock matrixBlock = (MatrixBlock) tuple2._2();
            long ceil = (long) Math.ceil(this._len / this._blen);
            if (!this._left) {
                long rowIndex = matrixIndexes.getRowIndex();
                long columnIndex = matrixIndexes.getColumnIndex();
                long j = 1;
                while (true) {
                    long j2 = j;
                    if (j2 > ceil) {
                        break;
                    }
                    linkedList.add(new Tuple2(new TripleIndexes(j2, columnIndex, rowIndex), new MatrixBlock(matrixBlock)));
                    j = j2 + 1;
                }
            } else {
                long rowIndex2 = matrixIndexes.getRowIndex();
                long columnIndex2 = matrixIndexes.getColumnIndex();
                long j3 = 1;
                while (true) {
                    long j4 = j3;
                    if (j4 > ceil) {
                        break;
                    }
                    linkedList.add(new Tuple2(new TripleIndexes(rowIndex2, j4, columnIndex2), new MatrixBlock(matrixBlock)));
                    j3 = j4 + 1;
                }
            }
            return linkedList;
        }
    }

    public RmmSPInstruction(Operator operator, CPOperand cPOperand, CPOperand cPOperand2, CPOperand cPOperand3, String str, String str2) {
        super(operator, cPOperand, cPOperand2, cPOperand3, str, str2);
        this._sptype = SPInstruction.SPINSTRUCTION_TYPE.RMM;
    }

    public static RmmSPInstruction parseInstruction(String str) throws DMLRuntimeException {
        String[] instructionPartsWithValueType = InstructionUtils.getInstructionPartsWithValueType(str);
        String str2 = instructionPartsWithValueType[0];
        if ("rmm".equals(str2)) {
            return new RmmSPInstruction(null, new CPOperand(instructionPartsWithValueType[1]), new CPOperand(instructionPartsWithValueType[2]), new CPOperand(instructionPartsWithValueType[3]), str2, str);
        }
        throw new DMLRuntimeException("RmmSPInstruction.parseInstruction():: Unknown opcode " + str2);
    }

    @Override // org.apache.sysml.runtime.instructions.spark.SPInstruction, org.apache.sysml.runtime.instructions.Instruction
    public void processInstruction(ExecutionContext executionContext) throws DMLRuntimeException, DMLUnsupportedOperationException {
        SparkExecutionContext sparkExecutionContext = (SparkExecutionContext) executionContext;
        MatrixCharacteristics matrixCharacteristics = sparkExecutionContext.getMatrixCharacteristics(this.input1.getName());
        JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable = RDDAggregateUtils.sumByKeyStable(sparkExecutionContext.getBinaryBlockRDDHandleForVariable(this.input1.getName()).flatMapToPair(new RmmReplicateFunction(sparkExecutionContext.getMatrixCharacteristics(this.input2.getName()).getCols(), r0.getColsPerBlock(), true)).join(sparkExecutionContext.getBinaryBlockRDDHandleForVariable(this.input2.getName()).flatMapToPair(new RmmReplicateFunction(matrixCharacteristics.getRows(), matrixCharacteristics.getRowsPerBlock(), false))).mapToPair(new RmmMultiplyFunction()));
        updateBinaryMMOutputMatrixCharacteristics(sparkExecutionContext, true);
        sparkExecutionContext.setRDDHandleForVariable(this.output.getName(), sumByKeyStable);
        sparkExecutionContext.addLineageRDD(this.output.getName(), this.input1.getName());
        sparkExecutionContext.addLineageRDD(this.output.getName(), this.input2.getName());
    }
}
