package org.apache.carbondata.stream;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.exceptions.NoSuchStreamException;
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.spark.StreamingOption;
import org.apache.carbondata.streaming.CarbonStreamException;
import org.apache.carbondata.streaming.parser.CarbonStreamParser;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import scala.Option$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.ObjectRef;

/* compiled from: StreamJobManager.scala */
/* loaded from: input_file:org/apache/carbondata/stream/StreamJobManager$.class */
public final class StreamJobManager$ {
    /**
     * Singleton instance of this class. The initializer shown here is {@code null};
     * the private constructor assigns {@code this} to the field when it runs.
     */
    public static final StreamJobManager$ MODULE$ = null;
    /** Logger for this class, obtained from {@link LogServiceFactory} in the constructor. */
    private final LogService org$apache$carbondata$stream$StreamJobManager$$LOGGER;
    /** Registered stream jobs, keyed by the stream name passed to {@code startStream}. */
    private final ConcurrentHashMap<String, StreamJobDesc> jobs;

    // Instantiates the class once at class-load time; the constructor stores the
    // new instance in MODULE$, so the reference is not kept here.
    static {
        new StreamJobManager$();
    }

    /**
     * Exposes the class logger under its name-mangled accessor so that the
     * anonymous stream-runner class can reach it through {@code MODULE$}.
     *
     * @return the logger created in the constructor
     */
    public LogService org$apache$carbondata$stream$StreamJobManager$$LOGGER() {
        return org$apache$carbondata$stream$StreamJobManager$$LOGGER;
    }

    /**
     * Accessor for the registry of stream jobs.
     *
     * @return the map from stream name to its job descriptor
     */
    private ConcurrentHashMap<String, StreamJobDesc> jobs() {
        return jobs;
    }

    /**
     * Verifies that the given table may be used as the source of a stream.
     *
     * @param carbonTable table to check
     * @throws MalformedCarbonCommandException if {@code carbonTable.isStreamingSource()} is false
     */
    private void validateSourceTable(CarbonTable carbonTable) {
        if (carbonTable.isStreamingSource()) {
            return;
        }
        String message = "Table " + carbonTable.getTableName() + " is not streaming source table "
                + "('streaming' tblproperty is not 'source')";
        throw new MalformedCarbonCommandException(message);
    }

    /**
     * Verifies that the given table may be used as the sink of a stream and,
     * optionally, that its schema equals the schema produced by the query.
     *
     * @param z           when true, the sink schema is compared against {@code structType}
     * @param structType  schema of the streaming query output
     * @param carbonTable sink table to check
     * @throws MalformedCarbonCommandException if the table is not a streaming sink, or if
     *         the schema check is requested and the schemas are not equal
     */
    private void validateSinkTable(boolean z, StructType structType, CarbonTable carbonTable) {
        if (!carbonTable.isStreamingSink()) {
            String notSink = "Table " + carbonTable.getTableName() + " is not streaming sink table "
                    + "('streaming' tblproperty is not 'sink' or 'true')";
            throw new MalformedCarbonCommandException(notSink);
        }
        if (!z) {
            // Schema comparison was not requested.
            return;
        }
        // Build the sink schema from the table's columns in creation order.
        Object createOrderColumns = JavaConverters$.MODULE$.asScalaBufferConverter(carbonTable.getCreateOrderColumn(carbonTable.getTableName())).asScala();
        Buffer sinkFields = (Buffer) ((TraversableLike) createOrderColumns).map(new StreamJobManager$$anonfun$1(), Buffer$.MODULE$.canBuildFrom());
        StructType sinkSchema = StructType$.MODULE$.apply(sinkFields);
        if (structType.equals(sinkSchema)) {
            return;
        }
        throw new MalformedCarbonCommandException("Schema of table " + carbonTable.getTableName() + " does not match query output");
    }

    /**
     * Starts a streaming job that writes {@code dataset} into the sink table and
     * registers it under the given stream name.
     *
     * <p>The query is started on a dedicated thread, which then blocks in
     * {@code awaitTermination()}. The calling thread waits up to 10 seconds for the
     * query to start (or fail to start) before returning.
     *
     * @param sparkSession    Spark session (not used by this method)
     * @param z               "if not exists" flag: when a stream with this name is already
     *                        registered, return its query id instead of failing
     * @param str             stream name, used as the registry key
     * @param carbonTable     source table; must be a streaming source
     * @param carbonTable2    sink table; must be a streaming sink
     * @param str2            passed through to the job descriptor
     * @param dataset         streaming dataset to write
     * @param streamingOption options applied to the stream writer
     * @return the id of the started (or already registered) streaming query
     * @throws MalformedCarbonCommandException if the name is taken and {@code z} is false,
     *         or if source/sink validation fails
     * @throws CarbonStreamException if the query did not start within 10 seconds
     */
    public String startStream(SparkSession sparkSession, boolean z, String str, CarbonTable carbonTable, final CarbonTable carbonTable2, String str2, final Dataset<Row> dataset, final StreamingOption streamingOption) {
        // Look the name up exactly once. A containsKey() followed by get() could see
        // the entry removed in between by a concurrent stopStream and dereference null.
        StreamJobDesc existing = jobs().get(str);
        if (existing != null) {
            if (z) {
                return existing.streamingQuery().id().toString();
            }
            throw new MalformedCarbonCommandException("Stream Name " + str + " already exists");
        }

        validateSourceTable(carbonTable);
        // The sink schema is not compared with the query output for kafka/socket sources.
        String sourceFormat = carbonTable.getFormat();
        boolean skipSchemaCheck = "kafka".equals(sourceFormat) || "socket".equals(sourceFormat);
        validateSinkTable(!skipSchemaCheck, dataset.schema(), carbonTable2);

        // The latch is released by the runner thread either after the query has started
        // or after starting it failed; the two refs carry the outcome back to this thread.
        final CountDownLatch started = new CountDownLatch(1);
        final ObjectRef failure = ObjectRef.create((Object) null);
        final ObjectRef query = ObjectRef.create((Object) null);

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    query.elem = dataset.writeStream().format("carbondata").trigger(streamingOption.trigger()).option("checkpointLocation", streamingOption.checkpointLocation(carbonTable2.getTablePath())).option("dateformat", streamingOption.dateFormat()).option("timestampformat", streamingOption.timeStampFormat()).option(CarbonStreamParser.CARBON_STREAM_PARSER, streamingOption.rowParser()).option("dbName", carbonTable2.getDatabaseName()).option("tableName", carbonTable2.getTableName()).option("bad_record_path", streamingOption.badRecordsPath()).option("bad_records_action", streamingOption.badRecordsAction()).option("bad_records_logger_enable", streamingOption.badRecordsLogger()).option("is_empty_bad_record", streamingOption.isEmptyBadRecord()).options(streamingOption.remainingOption()).start();
                    started.countDown();
                    ((StreamingQuery) query.elem).awaitTermination();
                } catch (Throwable th) {
                    StreamJobManager$.MODULE$.org$apache$carbondata$stream$StreamJobManager$$LOGGER().error(th);
                    failure.elem = th;
                    // Also release the waiter when start() itself failed.
                    started.countDown();
                }
            }
        });
        thread.start();

        if (!started.await(10L, TimeUnit.SECONDS)) {
            // NOTE(review): if the query finishes starting after this point it is not
            // registered in the job map; confirm whether it should be stopped here.
            thread.interrupt();
            throw new CarbonStreamException("Streaming job takes too long to start");
        }
        if (((Throwable) failure.elem) != null) {
            throw ((Throwable) failure.elem);
        }

        StreamingQuery startedQuery = (StreamingQuery) query.elem;
        String queryId = startedQuery.id().toString();
        jobs().put(str, new StreamJobDesc(startedQuery, str, carbonTable.getDatabaseName(), carbonTable.getTableName(), carbonTable2.getDatabaseName(), carbonTable2.getTableName(), str2, thread, StreamJobDesc$.MODULE$.apply$default$9()));
        org$apache$carbondata$stream$StreamJobManager$$LOGGER().audit(
                "STREAM " + str + " started with job id '" + queryId + "', "
                + "from " + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + " "
                + "to " + carbonTable2.getDatabaseName() + "." + carbonTable2.getTableName());
        return queryId;
    }

    /**
     * Stops the streaming job registered under the given name and removes it
     * from the registry.
     *
     * @param str stream name
     * @param z   "if exists" flag: when true, an unknown stream name is silently ignored
     * @throws NoSuchStreamException if no stream with this name is registered and
     *         {@code z} is false
     */
    public void stopStream(String str, boolean z) {
        // Look the name up exactly once. A containsKey() followed by get() could see
        // the entry removed in between by a concurrent stopStream and dereference null.
        StreamJobDesc streamJobDesc = jobs().get(str);
        if (streamJobDesc == null) {
            if (!z) {
                throw new NoSuchStreamException(str);
            }
            return;
        }
        // Stop the query first, then interrupt the runner thread, and only then
        // drop the registry entry, so a failing stop() leaves the job registered.
        streamJobDesc.streamingQuery().stop();
        streamJobDesc.thread().interrupt();
        jobs().remove(str);
        org$apache$carbondata$stream$StreamJobManager$$LOGGER().audit(
                "STREAM " + str + " stopped, job id '" + streamJobDesc.streamingQuery().id().toString() + "', "
                + "from " + streamJobDesc.sourceDb() + "." + streamJobDesc.sourceTable() + " "
                + "to " + streamJobDesc.sinkDb() + "." + streamJobDesc.sinkTable());
    }

    /**
     * Returns the currently registered stream jobs as an immutable Scala set.
     *
     * @return a set built from the values of the job registry at call time
     */
    public Set<StreamJobDesc> getAllJobs() {
        TraversableOnce registered = (TraversableOnce) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(jobs().values()).asScala();
        return registered.toSet();
    }

    /**
     * Private constructor, invoked once from the static initializer. Publishes the
     * new instance as {@code MODULE$}, then creates the logger and the empty job registry.
     */
    private StreamJobManager$() {
        // Publish the singleton before initializing the remaining fields.
        MODULE$ = this;
        // The logger is named after this class's canonical name.
        this.org$apache$carbondata$stream$StreamJobManager$$LOGGER = LogServiceFactory.getLogService(getClass().getCanonicalName());
        this.jobs = new ConcurrentHashMap<>();
    }
}
