package org.apache.flink.table.store.shaded.connector.kafka.sink;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.RetriableException;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/store/shaded/connector/kafka/sink/KafkaCommitter.class */
class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
    public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE = "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\nTo avoid data loss, the application will restart.";
    private final Properties kafkaProducerConfig;

    @Nullable
    private FlinkKafkaInternalProducer<?, ?> recoveryProducer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaCommitter(Properties properties) {
        this.kafkaProducerConfig = properties;
    }

    public List<KafkaCommittable> commit(List<KafkaCommittable> list) throws IOException {
        ArrayList arrayList = new ArrayList();
        Exception exc = null;
        for (KafkaCommittable kafkaCommittable : list) {
            String transactionalId = kafkaCommittable.getTransactionalId();
            LOG.debug("Committing Kafka transaction {}", transactionalId);
            Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> producer = kafkaCommittable.getProducer();
            try {
                FlinkKafkaInternalProducer flinkKafkaInternalProducer = (FlinkKafkaInternalProducer) producer.map((v0) -> {
                    return v0.getObject();
                }).orElseGet(() -> {
                    return getRecoveryProducer(kafkaCommittable);
                });
                flinkKafkaInternalProducer.commitTransaction();
                flinkKafkaInternalProducer.flush();
            } catch (InvalidTxnStateException e) {
                LOG.error("Unable to commit transaction ({}) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", kafkaCommittable, e);
            } catch (ProducerFencedException e2) {
                LOG.error("Unable to commit transaction ({}) because its producer is already fenced. This means that you either have a different producer with the same '{}' (this is unlikely with the '{}' as all generated ids are unique and shouldn't be reused) or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss, please consult the Flink documentation for more details.", new Object[]{kafkaCommittable, ProducerConfig.TRANSACTIONAL_ID_CONFIG, KafkaSink.class.getSimpleName(), ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, this.kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), e2});
            } catch (RetriableException e3) {
                LOG.warn("Encountered retriable exception while committing {}.", transactionalId, e3);
                arrayList.add(kafkaCommittable);
            } catch (UnknownProducerIdException e4) {
                LOG.error("Unable to commit transaction ({}) because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\nTo avoid data loss, the application will restart.", kafkaCommittable, e4);
            } catch (Exception e5) {
                LOG.error("Transaction ({}) encountered error and data has been potentially lost.", kafkaCommittable, e5);
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e5, exc);
            }
            producer.ifPresent((v0) -> {
                v0.close();
            });
            if (exc != null) {
                throw new FlinkRuntimeException("Some committables were not committed and committing failed with:", exc);
            }
        }
        return arrayList;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.recoveryProducer != null) {
            this.recoveryProducer.close();
        }
    }

    private FlinkKafkaInternalProducer<?, ?> getRecoveryProducer(KafkaCommittable kafkaCommittable) {
        if (this.recoveryProducer == null) {
            this.recoveryProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, kafkaCommittable.getTransactionalId());
        } else {
            this.recoveryProducer.setTransactionId(kafkaCommittable.getTransactionalId());
        }
        this.recoveryProducer.resumeTransaction(kafkaCommittable.getProducerId(), kafkaCommittable.getEpoch());
        return this.recoveryProducer;
    }
}
