package org.apache.sysml.runtime.instructions.spark.utils;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.sysml.parser.DataExpression;
import org.apache.sysml.parser.Expression;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.parfor.opt.PerfTestTool;
import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
import org.apache.sysml.runtime.instructions.spark.data.SerText;
import org.apache.sysml.runtime.instructions.spark.functions.ConvertFrameBlockToIJVLines;
import org.apache.sysml.runtime.io.IOUtilFunctions;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.Pair;
import org.apache.sysml.runtime.matrix.mapred.FrameReblockBuffer;
import org.apache.sysml.runtime.util.DataConverter;
import org.apache.sysml.runtime.util.FastStringTokenizer;
import org.apache.sysml.runtime.util.UtilFunctions;
import scala.Tuple2;

/* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.class */
public class FrameRDDConverterUtils {

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$BinaryBlockToCSVFunction.class */
    private static class BinaryBlockToCSVFunction implements FlatMapFunction<Tuple2<LongWritable, FrameBlock>, String> {
        private static final long serialVersionUID = 8020608184930291069L;
        private CSVFileFormatProperties _props;

        public BinaryBlockToCSVFunction(CSVFileFormatProperties cSVFileFormatProperties) {
            this._props = null;
            this._props = cSVFileFormatProperties;
        }

        public Iterable<String> call(Tuple2<LongWritable, FrameBlock> tuple2) throws Exception {
            LongWritable longWritable = (LongWritable) tuple2._1();
            FrameBlock frameBlock = (FrameBlock) tuple2._2();
            ArrayList arrayList = new ArrayList();
            if (this._props.hasHeader() && longWritable.get() == 1) {
                StringBuilder sb = new StringBuilder();
                for (int i = 1; i <= frameBlock.getNumColumns(); i++) {
                    if (i != 1) {
                        sb.append(this._props.getDelim());
                    }
                    sb.append("C" + i);
                }
                arrayList.add(sb.toString());
            }
            StringBuilder sb2 = new StringBuilder();
            for (int i2 = 0; i2 < frameBlock.getNumRows(); i2++) {
                for (int i3 = 0; i3 < frameBlock.getNumColumns(); i3++) {
                    if (i3 != 0) {
                        sb2.append(this._props.getDelim());
                    }
                    Object obj = frameBlock.get(i2, i3);
                    if (obj != null) {
                        sb2.append(obj);
                    }
                }
                arrayList.add(sb2.toString());
                sb2.setLength(0);
            }
            return arrayList;
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$BinaryBlockToMatrixBlockFunction.class */
    private static class BinaryBlockToMatrixBlockFunction implements PairFlatMapFunction<Tuple2<LongWritable, FrameBlock>, MatrixIndexes, MatrixBlock> {
        private static final long serialVersionUID = -2654986510471835933L;
        MatrixCharacteristics _mcIn;
        MatrixCharacteristics _mcOut;

        public BinaryBlockToMatrixBlockFunction(MatrixCharacteristics matrixCharacteristics, MatrixCharacteristics matrixCharacteristics2) {
            this._mcIn = matrixCharacteristics;
            this._mcOut = matrixCharacteristics2;
        }

        public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<LongWritable, FrameBlock> tuple2) throws Exception {
            long j = ((LongWritable) tuple2._1()).get();
            FrameBlock frameBlock = (FrameBlock) tuple2._2();
            ArrayList arrayList = new ArrayList();
            int rowsPerBlock = this._mcOut.getRowsPerBlock();
            int colsPerBlock = this._mcOut.getColsPerBlock();
            long rows = this._mcIn.getRows();
            long cols = this._mcIn.getCols();
            long j2 = 0;
            while (true) {
                long j3 = j2;
                if (j3 >= frameBlock.getNumRows()) {
                    return arrayList;
                }
                long j4 = ((((j + j3) - 1) / rowsPerBlock) + 1) * rowsPerBlock;
                long max = Math.max((j4 - rowsPerBlock) + 1, 0L);
                long min = Math.min(j4, rows);
                long computeCellInBlock = UtilFunctions.computeCellInBlock(max, rowsPerBlock);
                long computeCellInBlock2 = UtilFunctions.computeCellInBlock(min, rowsPerBlock);
                long j5 = 0;
                while (true) {
                    long j6 = j5;
                    if (j6 < frameBlock.getNumColumns()) {
                        long min2 = Math.min((j6 + colsPerBlock) - 1, cols - 1);
                        long computeCellInBlock3 = UtilFunctions.computeCellInBlock(j6 + 1, colsPerBlock);
                        long computeCellInBlock4 = UtilFunctions.computeCellInBlock(min2 + 1, colsPerBlock);
                        FrameBlock sliceOperations = frameBlock.sliceOperations((int) j3, (int) ((j3 + computeCellInBlock2) - computeCellInBlock), (int) j6, (int) min2, new FrameBlock());
                        MatrixIndexes matrixIndexes = new MatrixIndexes(UtilFunctions.computeBlockIndex(max + 1, rowsPerBlock), UtilFunctions.computeBlockIndex(j6 + 1, colsPerBlock));
                        MatrixBlock convertToMatrixBlock = DataConverter.convertToMatrixBlock(sliceOperations);
                        arrayList.add(new Tuple2(matrixIndexes, convertToMatrixBlock.leftIndexingOperations(convertToMatrixBlock, (int) computeCellInBlock, (int) computeCellInBlock2, (int) computeCellInBlock3, (int) computeCellInBlock4, new MatrixBlock(), true)));
                        j5 = min2 + 1;
                    }
                }
                j2 = j3 + (min - max) + 1;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$CSVToBinaryBlockFunction.class */
    public static class CSVToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<Text, Long>>, LongWritable, FrameBlock> {
        private static final long serialVersionUID = -1976803898174960086L;
        private long _clen;
        private boolean _hasHeader;
        private String _delim;
        private boolean _fill;
        private int _maxRowsPerBlock;
        protected static final int BUFFER_SIZE = 1000000;

        public CSVToBinaryBlockFunction(MatrixCharacteristics matrixCharacteristics, boolean z, String str, boolean z2) {
            this._clen = -1L;
            this._hasHeader = false;
            this._delim = null;
            this._fill = false;
            this._maxRowsPerBlock = -1;
            this._clen = matrixCharacteristics.getCols();
            this._hasHeader = z;
            this._delim = str;
            this._fill = z2;
            this._maxRowsPerBlock = Math.max((int) (PerfTestTool.MAX_DATASIZE / this._clen), 1);
        }

        public Iterable<Tuple2<LongWritable, FrameBlock>> call(Iterator<Tuple2<Text, Long>> it) throws Exception {
            ArrayList<Tuple2<LongWritable, FrameBlock>> arrayList = new ArrayList<>();
            LongWritable[] longWritableArr = new LongWritable[1];
            FrameBlock[] frameBlockArr = new FrameBlock[1];
            int i = 0;
            while (it.hasNext()) {
                Tuple2<Text, Long> next = it.next();
                String text = ((Text) next._1()).toString();
                long longValue = ((Long) next._2()).longValue();
                if (!this._hasHeader) {
                    longValue++;
                }
                if (!this._hasHeader || longValue != 0) {
                    if (i == 0 || i == this._maxRowsPerBlock) {
                        if (i == this._maxRowsPerBlock) {
                            flushBlocksToList(longWritableArr, frameBlockArr, arrayList);
                        }
                        createBlocks(longValue, longWritableArr, frameBlockArr);
                        i = 0;
                    }
                    frameBlockArr[0].appendRow(IOUtilFunctions.split(text, this._delim));
                    i++;
                    IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(text, this._fill, false);
                }
            }
            flushBlocksToList(longWritableArr, frameBlockArr, arrayList);
            return arrayList;
        }

        private void createBlocks(long j, LongWritable[] longWritableArr, FrameBlock[] frameBlockArr) {
            longWritableArr[0] = new LongWritable(j);
            frameBlockArr[0] = new FrameBlock((int) this._clen, Expression.ValueType.STRING);
        }

        private void flushBlocksToList(LongWritable[] longWritableArr, FrameBlock[] frameBlockArr, ArrayList<Tuple2<LongWritable, FrameBlock>> arrayList) throws DMLRuntimeException {
            int length = longWritableArr.length;
            for (int i = 0; i < length; i++) {
                if (frameBlockArr[i] != null) {
                    arrayList.add(new Tuple2<>(longWritableArr[i], frameBlockArr[i]));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$CellToBinaryBlockFunction.class */
    /**
     * Shared base for cell-oriented converters: keeps the frame dimensions and
     * the reblock buffer length, and offers a helper that drains a reblock
     * buffer into an output list.
     */
    public static abstract class CellToBinaryBlockFunction implements Serializable {
        private static final long serialVersionUID = -729614449626680946L;

        /** Not referenced within this class. */
        protected static final int BUFFER_SIZE = 1000000;

        /** Buffer length: the cell count capped by {@code PerfTestTool.MAX_DATASIZE}. */
        protected int _bufflen;

        /** Number of rows taken from the matrix characteristics. */
        protected long _rlen;

        /** Number of columns taken from the matrix characteristics. */
        protected long _clen;

        protected CellToBinaryBlockFunction(MatrixCharacteristics matrixCharacteristics) {
            this._rlen = matrixCharacteristics.getRows();
            this._clen = matrixCharacteristics.getCols();
            this._bufflen = (int) Math.min(this._rlen * this._clen, PerfTestTool.MAX_DATASIZE);
        }

        /**
         * Flushes the buffer into binary blocks and appends them to the given list.
         *
         * @param frameReblockBuffer buffer to drain
         * @param arrayList          list receiving the converted blocks
         */
        protected void flushBufferToList(FrameReblockBuffer frameReblockBuffer, ArrayList<Tuple2<Long, FrameBlock>> arrayList) throws IOException, DMLRuntimeException {
            ArrayList<Pair<Long, FrameBlock>> flushed = new ArrayList<>();
            frameReblockBuffer.flushBufferToBinaryBlocks(flushed);
            arrayList.addAll(SparkUtils.fromIndexedFrameBlock(flushed));
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$LongFrameToLongWritableFrameFunction.class */
    public static class LongFrameToLongWritableFrameFunction implements PairFunction<Tuple2<Long, FrameBlock>, LongWritable, FrameBlock> {
        private static final long serialVersionUID = -1467314923206783333L;

        public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, FrameBlock> tuple2) throws Exception {
            return new Tuple2<>(new LongWritable(((Long) tuple2._1).longValue()), tuple2._2);
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$LongWritableTextToLongTextFunction.class */
    public static class LongWritableTextToLongTextFunction implements PairFunction<Tuple2<LongWritable, Text>, Long, Text> {
        private static final long serialVersionUID = -5408386071466175348L;

        public Tuple2<Long, Text> call(Tuple2<LongWritable, Text> tuple2) throws Exception {
            return new Tuple2<>(new Long(((LongWritable) tuple2._1).get()), tuple2._2);
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$LongWritableToSerFunction.class */
    public static class LongWritableToSerFunction implements PairFunction<Tuple2<LongWritable, FrameBlock>, LongWritable, FrameBlock> {
        private static final long serialVersionUID = 2286037080400222528L;

        public Tuple2<LongWritable, FrameBlock> call(Tuple2<LongWritable, FrameBlock> tuple2) throws Exception {
            return new Tuple2<>(new SerLongWritable(Long.valueOf(((LongWritable) tuple2._1).get())), tuple2._2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$MatrixToBinaryBlockFunction.class */
    public static class MatrixToBinaryBlockFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, Long, FrameBlock> {
        private static final long serialVersionUID = 6205071301074768437L;
        private int _brlen;
        private int _bclen;
        private long _clen;
        private int _maxRowsPerBlock;
        protected static final int BUFFER_SIZE = 1000000;

        public MatrixToBinaryBlockFunction(MatrixCharacteristics matrixCharacteristics) {
            this._brlen = -1;
            this._bclen = -1;
            this._clen = -1L;
            this._maxRowsPerBlock = -1;
            this._brlen = matrixCharacteristics.getRowsPerBlock();
            this._bclen = matrixCharacteristics.getColsPerBlock();
            this._clen = matrixCharacteristics.getCols();
            this._maxRowsPerBlock = Math.max((int) (PerfTestTool.MAX_DATASIZE / this._clen), 1);
        }

        public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> tuple2) throws Exception {
            ArrayList arrayList = new ArrayList();
            MatrixIndexes matrixIndexes = (MatrixIndexes) tuple2._1();
            MatrixBlock matrixBlock = (MatrixBlock) tuple2._2();
            Long l = new Long(((matrixIndexes.getRowIndex() - 1) * this._brlen) + 1);
            long columnIndex = (int) (((matrixIndexes.getColumnIndex() - 1) * this._bclen) + 1);
            long min = Math.min((columnIndex + matrixBlock.getMaxColumn()) - 1, this._clen);
            int computeCellInBlock = UtilFunctions.computeCellInBlock(columnIndex, this._bclen);
            int computeCellInBlock2 = UtilFunctions.computeCellInBlock(min, this._bclen);
            FrameBlock convertToFrameBlock = DataConverter.convertToFrameBlock(matrixBlock);
            int i = 0;
            while (true) {
                int i2 = i;
                if (i2 >= matrixBlock.getMaxRow()) {
                    return arrayList;
                }
                int min2 = Math.min((i2 + this._maxRowsPerBlock) - 1, matrixBlock.getMaxRow() - 1);
                FrameBlock sliceOperations = (i2 == 0 && min2 == matrixBlock.getMaxRow() - 1) ? convertToFrameBlock : convertToFrameBlock.sliceOperations(i2, min2, computeCellInBlock, computeCellInBlock2, null);
                if (columnIndex == 0 && min == matrixBlock.getMaxColumn() - 1) {
                    arrayList.add(new Tuple2(Long.valueOf(l.longValue() + i2), sliceOperations));
                } else {
                    FrameBlock frameBlock = new FrameBlock((int) this._clen, Expression.ValueType.STRING);
                    frameBlock.ensureAllocatedColumns((min2 - i2) + 1);
                    frameBlock.copy(0, min2 - i2, ((int) columnIndex) - 1, ((int) min) - 1, sliceOperations);
                    arrayList.add(new Tuple2(Long.valueOf(l.longValue() + i2), frameBlock));
                }
                i = min2 + 1;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$MatrixToBinaryBlockOneColumnBlockFunction.class */
    public static class MatrixToBinaryBlockOneColumnBlockFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, Long, FrameBlock> {
        private static final long serialVersionUID = 3716019666116660815L;
        private int _brlen;
        private int _bclen;
        private long _clen;

        public MatrixToBinaryBlockOneColumnBlockFunction(MatrixCharacteristics matrixCharacteristics) {
            this._brlen = -1;
            this._bclen = -1;
            this._clen = -1L;
            this._brlen = matrixCharacteristics.getRowsPerBlock();
            this._bclen = matrixCharacteristics.getColsPerBlock();
            this._clen = matrixCharacteristics.getCols();
        }

        public Tuple2<Long, FrameBlock> call(Tuple2<MatrixIndexes, MatrixBlock> tuple2) throws Exception {
            if (this._clen > this._bclen) {
                throw new DMLRuntimeException("The input matrix has more than one column block, this function supports only one column block.");
            }
            return new Tuple2<>(new Long(((((MatrixIndexes) tuple2._1()).getRowIndex() - 1) * this._brlen) + 1), DataConverter.convertToFrameBlock((MatrixBlock) tuple2._2()));
        }
    }

    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$StringToSerTextFunction.class */
    private static class StringToSerTextFunction implements PairFunction<String, LongWritable, Text> {
        private static final long serialVersionUID = 8683232211035837695L;

        private StringToSerTextFunction() {
        }

        public Tuple2<LongWritable, Text> call(String str) throws Exception {
            return new Tuple2<>(new SerLongWritable(1L), new SerText(str));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$TextToBinaryBlockFunction.class */
    public static class TextToBinaryBlockFunction extends CellToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Text>, Long, FrameBlock> {
        private static final long serialVersionUID = -2042208027876880588L;
        List<Expression.ValueType> _schema;

        protected TextToBinaryBlockFunction(MatrixCharacteristics matrixCharacteristics, List<Expression.ValueType> list) {
            super(matrixCharacteristics);
            this._schema = null;
            this._schema = list;
        }

        public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Text> it) throws Exception {
            ArrayList<Tuple2<Long, FrameBlock>> arrayList = new ArrayList<>();
            FrameReblockBuffer frameReblockBuffer = new FrameReblockBuffer(this._bufflen, this._rlen, this._clen, this._schema);
            FastStringTokenizer fastStringTokenizer = new FastStringTokenizer(' ');
            while (it.hasNext()) {
                String text = it.next().toString();
                if (!text.startsWith("%")) {
                    fastStringTokenizer.reset(text);
                    long nextLong = fastStringTokenizer.nextLong();
                    long nextLong2 = fastStringTokenizer.nextLong();
                    Object stringToObject = UtilFunctions.stringToObject(this._schema.get(((int) nextLong2) - 1), fastStringTokenizer.nextToken());
                    if (frameReblockBuffer.getSize() >= frameReblockBuffer.getCapacity()) {
                        flushBufferToList(frameReblockBuffer, arrayList);
                    }
                    frameReblockBuffer.appendCell(nextLong, nextLong2, stringToObject);
                }
            }
            flushBufferToList(frameReblockBuffer, arrayList);
            return arrayList;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils$TextToStringFunction.class */
    public static class TextToStringFunction implements Function<Text, String> {
        private static final long serialVersionUID = -2744814934501782747L;

        private TextToStringFunction() {
        }

        public String call(Text text) throws Exception {
            return text.toString();
        }
    }

    /**
     * Converts CSV text lines into frame blocks keyed by row index.
     *
     * <p>When the dimensions in {@code matrixCharacteristics} are not yet known,
     * they are derived from the data and written back into it: the row count is
     * the number of lines (minus one if a header is present) and the column
     * count is the number of fields of the first line.
     *
     * @param javaSparkContext      not used by this method
     * @param javaPairRDD           input lines; only the values are read
     * @param matrixCharacteristics dimensions and block sizes; updated if dimensions are unknown
     * @param z                     whether the input has a header line
     * @param str                   field delimiter
     * @param z2                    fill flag forwarded to the CSV parsing function
     * @param d                     not used by this method
     * @return frame blocks keyed by row index
     */
    public static JavaPairRDD<LongWritable, FrameBlock> csvToBinaryBlock(JavaSparkContext javaSparkContext, JavaPairRDD<LongWritable, Text> javaPairRDD, MatrixCharacteristics matrixCharacteristics, boolean z, String str, boolean z2, double d) throws DMLRuntimeException {
        if (!matrixCharacteristics.dimsKnown(true)) {
            // Hold the line RDD in a named local so that count() and first() read the
            // same RDD; the previous code referred to an undeclared temporary here.
            JavaRDD<String> lines = javaPairRDD.values().map(new TextToStringFunction());
            long rlen = lines.count() - (z ? 1 : 0);
            // NOTE(review): String.split treats the delimiter as a regular expression and
            // drops trailing empty fields, whereas rows are later parsed with
            // IOUtilFunctions.split - confirm that both yield the same column count.
            long clen = lines.first().split(str).length;
            matrixCharacteristics.set(rlen, clen, matrixCharacteristics.getRowsPerBlock(), matrixCharacteristics.getColsPerBlock(), -1L);
        }
        return javaPairRDD.values().zipWithIndex().mapPartitionsToPair(new CSVToBinaryBlockFunction(matrixCharacteristics, z, str, z2));
    }

    /**
     * Variant for plain string lines: wraps every line into a keyed text pair
     * and delegates to the pair-RDD overload with all other arguments unchanged.
     *
     * @param javaRDD the CSV lines as strings
     * @return frame blocks keyed by row index
     */
    public static JavaPairRDD<LongWritable, FrameBlock> csvToBinaryBlock(JavaSparkContext javaSparkContext, JavaRDD<String> javaRDD, MatrixCharacteristics matrixCharacteristics, boolean z, String str, boolean z2, double d) throws DMLRuntimeException {
        JavaPairRDD<LongWritable, Text> keyedLines = javaRDD.mapToPair(new StringToSerTextFunction());
        return csvToBinaryBlock(javaSparkContext, keyedLines, matrixCharacteristics, z, str, z2, d);
    }

    /**
     * Converts keyed frame blocks into CSV lines.
     *
     * @param javaPairRDD             frame blocks keyed by row index
     * @param matrixCharacteristics   not used by this method
     * @param cSVFileFormatProperties delimiter and header settings for the output
     * @param z                       when true, blocks are sorted ascending by key before conversion
     * @return the CSV lines of all blocks
     */
    public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<LongWritable, FrameBlock> javaPairRDD, MatrixCharacteristics matrixCharacteristics, CSVFileFormatProperties cSVFileFormatProperties, boolean z) {
        // keys are replaced by SerLongWritable instances first
        JavaPairRDD<LongWritable, FrameBlock> blocks = javaPairRDD.mapToPair(new LongWritableToSerFunction());
        JavaPairRDD<LongWritable, FrameBlock> ordered = z ? blocks.sortByKey(true) : blocks;
        return ordered.flatMap(new BinaryBlockToCSVFunction(cSVFileFormatProperties));
    }

    /**
     * Converts text cell lines into frame blocks keyed by {@code LongWritable}.
     * Keys are mapped to {@code Long}, the Long-indexed conversion is applied,
     * and the resulting keys are mapped back to {@code LongWritable}.
     *
     * @param list column value types used to parse the cell values
     * @return frame blocks keyed by {@code LongWritable}
     */
    public static JavaPairRDD<LongWritable, FrameBlock> textCellToBinaryBlock(JavaSparkContext javaSparkContext, JavaPairRDD<LongWritable, Text> javaPairRDD, MatrixCharacteristics matrixCharacteristics, List<Expression.ValueType> list) throws DMLRuntimeException {
        JavaPairRDD<Long, Text> longKeyed = javaPairRDD.mapToPair(new LongWritableTextToLongTextFunction());
        JavaPairRDD<Long, FrameBlock> blocks = textCellToBinaryBlockLongIndex(javaSparkContext, longKeyed, matrixCharacteristics, list);
        return blocks.mapToPair(new LongFrameToLongWritableFrameFunction());
    }

    /**
     * Converts text cell lines into frame blocks keyed by {@code Long}. Each
     * partition is parsed into partial frame blocks, which are then merged by key.
     *
     * @param javaSparkContext not used by this method
     * @param javaPairRDD      input lines; only the values are read
     * @param list             column value types used to parse the cell values
     * @return merged frame blocks keyed by {@code Long}
     */
    public static JavaPairRDD<Long, FrameBlock> textCellToBinaryBlockLongIndex(JavaSparkContext javaSparkContext, JavaPairRDD<Long, Text> javaPairRDD, MatrixCharacteristics matrixCharacteristics, List<Expression.ValueType> list) throws DMLRuntimeException {
        JavaRDD<Text> cellLines = javaPairRDD.values();
        JavaPairRDD<Long, FrameBlock> partialBlocks = cellLines.mapPartitionsToPair(new TextToBinaryBlockFunction(matrixCharacteristics, list));
        return RDDAggregateUtils.mergeByFrameKey(partialBlocks);
    }

    /**
     * Converts keyed frame blocks into text lines for the requested output format.
     * Only the text format ({@code DataExpression.FORMAT_TYPE_VALUE_TEXT}) is supported.
     *
     * @param matrixCharacteristics supplies the block sizes passed to the line converter
     * @param str                   requested output format
     * @return the text lines of all blocks
     * @throws DMLRuntimeException if the requested format is not the text format
     */
    public static JavaRDD<String> binaryBlockToStringRDD(JavaPairRDD<LongWritable, FrameBlock> javaPairRDD, MatrixCharacteristics matrixCharacteristics, String str) throws DMLRuntimeException {
        if (!str.equals(DataExpression.FORMAT_TYPE_VALUE_TEXT)) {
            throw new DMLRuntimeException("The output format:" + str + " is not implemented yet.");
        }
        int rowsPerBlock = matrixCharacteristics.getRowsPerBlock();
        int colsPerBlock = matrixCharacteristics.getColsPerBlock();
        return javaPairRDD.flatMap(new ConvertFrameBlockToIJVLines(rowsPerBlock, colsPerBlock));
    }

    /**
     * Converts keyed matrix blocks into frame blocks keyed by {@code LongWritable}
     * by running the Long-indexed conversion and then re-keying the result.
     *
     * @return frame blocks keyed by {@code LongWritable}
     */
    public static JavaPairRDD<LongWritable, FrameBlock> matrixBlockToBinaryBlock(JavaSparkContext javaSparkContext, JavaPairRDD<MatrixIndexes, MatrixBlock> javaPairRDD, MatrixCharacteristics matrixCharacteristics) throws DMLRuntimeException {
        JavaPairRDD<Long, FrameBlock> longKeyed = matrixBlockToBinaryBlockLongIndex(javaSparkContext, javaPairRDD, matrixCharacteristics);
        return longKeyed.mapToPair(new LongFrameToLongWritableFrameFunction());
    }

    /**
     * Converts keyed matrix blocks into frame blocks keyed by {@code Long}.
     *
     * <p>When the matrix has more columns than one column block holds, every
     * matrix block yields partial frame blocks that are merged by key. Otherwise
     * each matrix block maps directly to one frame block.
     *
     * @param javaSparkContext      not used by this method
     * @param matrixCharacteristics dimensions and block sizes of the input matrix
     * @return frame blocks keyed by {@code Long}
     */
    public static JavaPairRDD<Long, FrameBlock> matrixBlockToBinaryBlockLongIndex(JavaSparkContext javaSparkContext, JavaPairRDD<MatrixIndexes, MatrixBlock> javaPairRDD, MatrixCharacteristics matrixCharacteristics) throws DMLRuntimeException {
        boolean singleColumnBlock = matrixCharacteristics.getCols() <= matrixCharacteristics.getColsPerBlock();
        if (singleColumnBlock) {
            return javaPairRDD.mapToPair(new MatrixToBinaryBlockOneColumnBlockFunction(matrixCharacteristics));
        }
        // several column blocks: build partial frame blocks, then merge them per key
        JavaPairRDD<Long, FrameBlock> partialBlocks = javaPairRDD.flatMapToPair(new MatrixToBinaryBlockFunction(matrixCharacteristics));
        return RDDAggregateUtils.mergeByFrameKey(partialBlocks);
    }

    /**
     * Converts keyed frame blocks into keyed matrix blocks. Every frame block
     * is cut into matrix-block pieces, and the pieces are then merged by key.
     *
     * @param matrixCharacteristics  characteristics of the input frame
     * @param matrixCharacteristics2 characteristics of the output matrix
     * @return merged matrix blocks keyed by block index
     */
    public static JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockToMatrixBlock(JavaPairRDD<LongWritable, FrameBlock> javaPairRDD, MatrixCharacteristics matrixCharacteristics, MatrixCharacteristics matrixCharacteristics2) {
        JavaPairRDD<MatrixIndexes, MatrixBlock> pieces = javaPairRDD.flatMapToPair(new BinaryBlockToMatrixBlockFunction(matrixCharacteristics, matrixCharacteristics2));
        return RDDAggregateUtils.mergeByKey(pieces);
    }
}
