package org.apache.sling.distribution.journal.kafka;

import com.google.protobuf.GeneratedMessage;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MessagingProvider} implementation that creates Kafka producers and consumers.
 *
 * <p>Every consumer created by this class is manually assigned to partition {@link #PARTITION}
 * of the requested topic, uses a freshly generated random {@code group.id} and has
 * {@code enable.auto.commit} switched off. The two producers (byte-array payloads and string
 * payloads) are created lazily on first use and are closed by {@link #close()}.
 */
@Designate(ocd = KafkaEndpoint.class)
@Component(service = {MessagingProvider.class}, configurationPolicy = ConfigurationPolicy.REQUIRE)
public class KafkaClientProvider implements MessagingProvider, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaClientProvider.class);

    /** The only partition this provider ever reads from or refers to. */
    public static final int PARTITION = 0;

    /** Lazily created producer for byte-array payloads; see {@link #buildKafkaProducer()}. */
    private volatile KafkaProducer<String, byte[]> rawProducer = null;

    /** Lazily created producer for string payloads; see {@link #buildJsonKafkaProducer()}. */
    private volatile KafkaProducer<String, String> jsonProducer = null;

    private String kafkaBootstrapServers;
    private int requestTimeout;
    private int defaultApiTimeout;

    /**
     * Copies the connection settings out of the component configuration.
     *
     * @param kafkaEndpoint the component configuration
     * @throws NullPointerException if no bootstrap servers are configured
     */
    @Activate
    public void activate(KafkaEndpoint kafkaEndpoint) {
        this.kafkaBootstrapServers = Objects.requireNonNull(kafkaEndpoint.kafkaBootstrapServers(), "kafkaBootstrapServers");
        this.requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
        this.defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
    }

    /**
     * Closes both producers, if they were created. Consumers handed out through the
     * {@code create*Poller} methods are not tracked here and are not closed by this method.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    @Deactivate
    public void close() {
        IOUtils.closeQuietly(this.rawProducer);
        IOUtils.closeQuietly(this.jsonProducer);
    }

    /**
     * Creates a sender on top of the shared byte-array producer.
     *
     * @return a new sender wrapping the shared producer
     */
    public <T extends GeneratedMessage> MessageSender<T> createSender() {
        return new KafkaMessageSender(buildKafkaProducer());
    }

    /**
     * Creates a poller without an explicit start position; equivalent to calling
     * {@link #createPoller(String, Reset, String, HandlerAdapter...)} with a {@code null} assign.
     */
    public <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters) {
        return createPoller(topicName, reset, null, adapters);
    }

    /**
     * Creates a poller reading byte-array payloads from partition {@link #PARTITION} of a topic.
     *
     * <p>The start position is the offset encoded in {@code assign} when it is non-null;
     * otherwise the beginning of the partition for {@link Reset#earliest} and the end of the
     * partition for any other reset value.
     *
     * @param topicName the topic to read from
     * @param reset where to start when {@code assign} is {@code null}
     * @param assign a position string in the format produced by {@link #assignTo(long)}, or {@code null}
     * @param adapters handlers passed on to the poller
     * @return the poller; closing it is the caller's responsibility
     * @throws IllegalArgumentException if {@code assign} is not of the form {@code <partition>:<offset>}
     */
    public Closeable createPoller(String topicName, Reset reset, @Nullable String assign, HandlerAdapter<?>... adapters) {
        String consumerGroupId = UUID.randomUUID().toString();
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig(ByteArrayDeserializer.class, consumerGroupId, reset));
        try {
            TopicPartition topicPartition = new TopicPartition(topicName, PARTITION);
            Set<TopicPartition> assignment = Collections.singleton(topicPartition);
            consumer.assign(assignment);
            if (assign != null) {
                consumer.seek(topicPartition, offset(assign));
            } else if (reset == Reset.earliest) {
                consumer.seekToBeginning(assignment);
            } else {
                consumer.seekToEnd(assignment);
            }
        } catch (RuntimeException e) {
            // The consumer has not been handed to a poller yet, so nobody else would close it.
            throw closeAfterFailure(consumer, e);
        }
        Closeable poller = new KafkaMessagePoller(consumer, adapters);
        LOG.info(String.format("Created poller for consumerGroupId %s, reset %s, topicName %s, assign %s", consumerGroupId, reset, topicName, assign));
        return poller;
    }

    /**
     * Creates a sender on top of the shared string producer.
     *
     * @return a new sender wrapping the shared producer
     */
    public <T> JsonMessageSender<T> createJsonSender() {
        return new KafkaJsonMessageSender(buildJsonKafkaProducer());
    }

    /**
     * Creates a poller reading string payloads from partition {@link #PARTITION} of a topic.
     *
     * <p>Reading starts at the beginning of the partition for {@link Reset#earliest} and at the
     * end of the partition for any other reset value.
     *
     * @param topicName the topic to read from
     * @param reset where to start reading
     * @param messageHandler handler passed on to the poller
     * @param cls message class passed on to the poller
     * @return the poller; closing it is the caller's responsibility
     */
    public <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> messageHandler, Class<T> cls) {
        String consumerGroupId = UUID.randomUUID().toString();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig(StringDeserializer.class, consumerGroupId, reset));
        try {
            Set<TopicPartition> assignment = Collections.singleton(new TopicPartition(topicName, PARTITION));
            consumer.assign(assignment);
            if (reset == Reset.earliest) {
                consumer.seekToBeginning(assignment);
            } else {
                consumer.seekToEnd(assignment);
            }
        } catch (RuntimeException e) {
            // The consumer has not been handed to a poller yet, so nobody else would close it.
            throw closeAfterFailure(consumer, e);
        }
        return new KafkaJsonMessagePoller(consumer, messageHandler, cls);
    }

    /**
     * Verifies that a topic is known to the cluster.
     *
     * @param topic the topic name to look up
     * @throws MessagingException if the topic is missing or the lookup fails. Every failure,
     *         including the "does not exist" case, is reported as
     *         "Unable to load topic stats for ..." with the underlying exception as its cause.
     */
    public void assertTopic(String topic) throws MessagingException {
        String consumerGroupId = UUID.randomUUID().toString();
        // try-with-resources guarantees the temporary consumer is closed on every path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig(StringDeserializer.class, consumerGroupId, Reset.latest))) {
            if (!consumer.listTopics().containsKey(topic)) {
                throw new MessagingException(String.format("Topic %s does not exist", topic));
            }
        } catch (Exception e) {
            throw new MessagingException(String.format("Unable to load topic stats for %s", topic), e);
        }
    }

    /**
     * Looks up the first or last offset of partition {@link #PARTITION} of a topic using a
     * short-lived consumer.
     *
     * @param topicName the topic to inspect
     * @param reset {@link Reset#earliest} for the beginning offset, anything else for the end offset
     * @return the requested offset
     */
    public long retrieveOffset(String topicName, Reset reset) {
        String consumerGroupId = UUID.randomUUID().toString();
        // try-with-resources guarantees the temporary consumer is closed even if the lookup throws.
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig(ByteArrayDeserializer.class, consumerGroupId, reset))) {
            TopicPartition topicPartition = new TopicPartition(topicName, PARTITION);
            Set<TopicPartition> partitions = Collections.singleton(topicPartition);
            Map<TopicPartition, Long> offsets = reset == Reset.earliest
                    ? consumer.beginningOffsets(partitions)
                    : consumer.endOffsets(partitions);
            return offsets.get(topicPartition);
        }
    }

    /**
     * Encodes an offset as a position string of the form {@code <partition>:<offset>}, the
     * format accepted by the {@code assign} argument of
     * {@link #createPoller(String, Reset, String, HandlerAdapter...)}.
     *
     * @param offset the offset to encode
     * @return the position string, always using {@link #PARTITION} as the partition
     */
    public String assignTo(long offset) {
        return String.format("%s:%s", PARTITION, offset);
    }

    /** Returns the shared byte-array producer, creating it on first use. */
    @Nonnull
    private synchronized KafkaProducer<String, byte[]> buildKafkaProducer() {
        if (this.rawProducer == null) {
            this.rawProducer = new KafkaProducer<>(producerConfig(ByteArraySerializer.class));
        }
        return this.rawProducer;
    }

    /** Returns the shared string producer, creating it on first use. */
    @Nonnull
    private synchronized KafkaProducer<String, String> buildJsonKafkaProducer() {
        if (this.jsonProducer == null) {
            this.jsonProducer = new KafkaProducer<>(producerConfig(StringSerializer.class));
        }
        return this.jsonProducer;
    }

    /**
     * Builds the consumer configuration.
     *
     * @param valueDeserializer the deserializer class for record values
     * @param consumerGroupId the {@code group.id} to use
     * @param reset its name is used as the {@code auto.offset.reset} value
     * @return a new, mutable configuration map
     */
    private Map<String, Object> consumerConfig(Class<?> valueDeserializer, String consumerGroupId, Reset reset) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", this.kafkaBootstrapServers);
        config.put("group.id", consumerGroupId);
        config.put("enable.auto.commit", false);
        config.put("default.api.timeout.ms", this.defaultApiTimeout);
        config.put("key.deserializer", StringDeserializer.class);
        config.put("value.deserializer", valueDeserializer);
        config.put("auto.offset.reset", reset.name());
        return config;
    }

    /**
     * Builds the producer configuration; producers always use {@code acks=all}.
     *
     * @param valueSerializer the serializer class for record values
     * @return an unmodifiable configuration map
     */
    private Map<String, Object> producerConfig(Class<?> valueSerializer) {
        Map<String, Object> config = new HashMap<>();
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", valueSerializer);
        config.put("bootstrap.servers", this.kafkaBootstrapServers);
        config.put("request.timeout.ms", this.requestTimeout);
        config.put("acks", "all");
        return Collections.unmodifiableMap(config);
    }

    /**
     * Extracts the offset from a position string of the form {@code <partition>:<offset>}.
     * The partition part is not interpreted.
     *
     * @param assign the position string
     * @return the parsed offset
     * @throws IllegalArgumentException if the string does not consist of exactly two
     *         colon-separated parts, or the second part is not a valid {@code long}
     */
    private long offset(String assign) {
        String[] parts = assign.split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException(String.format("Illegal assign %s", assign));
        }
        return Long.parseLong(parts[1]);
    }

    /**
     * Closes a consumer whose setup failed and returns the original failure for rethrowing.
     * A secondary failure while closing is attached as a suppressed exception so that it
     * cannot mask the original one.
     */
    private static RuntimeException closeAfterFailure(KafkaConsumer<?, ?> consumer, RuntimeException failure) {
        try {
            consumer.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
        return failure;
    }
}
