package org.apache.sling.distribution.journal.kafka;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/kafka/KafkaPoller.class */
public class KafkaPoller<T> implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaPoller.class);
    private static final long ERROR_SLEEP_MS = 10000;
    private final KafkaConsumer<String, T> consumer;
    private final Consumer<ConsumerRecord<String, T>> handler;
    private final ExceptionEventSender eventSender;
    private volatile boolean running = true;
    long errorSleepMs = ERROR_SLEEP_MS;

    public KafkaPoller(KafkaConsumer<String, T> kafkaConsumer, ExceptionEventSender exceptionEventSender, Consumer<ConsumerRecord<String, T>> consumer) {
        this.handler = consumer;
        this.consumer = (KafkaConsumer) Objects.requireNonNull(kafkaConsumer);
        this.eventSender = (ExceptionEventSender) Objects.requireNonNull(exceptionEventSender);
        RunnableUtil.startBackgroundThread(this::run, "Message Poller");
    }

    public static Closeable createProtobufPoller(KafkaConsumer<String, byte[]> kafkaConsumer, ExceptionEventSender exceptionEventSender, HandlerAdapter<?>... handlerAdapterArr) {
        return new KafkaPoller(kafkaConsumer, exceptionEventSender, new ProtobufRecordHandler(handlerAdapterArr));
    }

    public static <T> Closeable createJsonPoller(KafkaConsumer<String, String> kafkaConsumer, ExceptionEventSender exceptionEventSender, MessageHandler<T> messageHandler, Class<T> cls) {
        return new KafkaPoller(kafkaConsumer, exceptionEventSender, new JsonRecordHandler(messageHandler, cls));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        LOG.info("Shutdown poller");
        this.running = false;
        this.consumer.wakeup();
    }

    public void run() {
        LOG.info("Start poller");
        while (this.running) {
            try {
                this.consumer.poll(Duration.ofHours(1L)).forEach(this::handle);
            } catch (Exception e) {
                this.eventSender.send(e);
                LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
                sleepAfterError();
            } catch (WakeupException e2) {
                LOG.debug("Waked up {}", e2.getMessage(), e2);
                this.running = false;
            }
        }
        this.consumer.close();
        LOG.info("Stopped poller");
    }

    public void handle(ConsumerRecord<String, T> consumerRecord) {
        try {
            this.handler.accept(consumerRecord);
        } catch (Exception e) {
            LOG.warn("Error consuming message {}", consumerRecord.headers());
        }
    }

    private void sleepAfterError() {
        try {
            Thread.sleep(this.errorSleepMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
