package org.apache.sling.distribution.journal.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.class */
/**
 * Polls a {@link KafkaConsumer} on a background thread, deserializes every record value from
 * JSON into the configured message type and passes it to a {@link MessageHandler}.
 *
 * <p>The polling loop is started from the constructor and keeps running until {@link #close()}
 * is called or an error escapes from polling or handling.
 *
 * @param <T> type the JSON payloads are deserialized into
 */
public class KafkaJsonMessagePoller<T> implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonMessagePoller.class);

    /** Cleared by {@link #close()}; volatile because it is read by the polling thread. */
    private volatile boolean running = true;
    private final KafkaConsumer<String, String> consumer;
    private final MessageHandler<T> handler;
    private final ObjectReader reader;

    /**
     * Creates the poller and immediately hands {@link #run()} to
     * {@code RunnableUtil.startBackgroundThread}.
     *
     * @param kafkaConsumer consumer to poll; it is closed by this poller when the loop ends
     * @param messageHandler receives every successfully parsed message
     * @param cls class the JSON payloads are deserialized into
     */
    public KafkaJsonMessagePoller(KafkaConsumer<String, String> kafkaConsumer, MessageHandler<T> messageHandler, Class<T> cls) {
        this.consumer = kafkaConsumer;
        this.handler = messageHandler;
        this.reader = new ObjectMapper().readerFor(cls);
        // Started last so that all fields are assigned before the polling thread reads them.
        RunnableUtil.startBackgroundThread(this::run, String.format("Message Json Poller for handler %s", messageHandler));
    }

    /**
     * Requests shutdown of the polling loop. The consumer itself is closed by {@link #run()},
     * not here.
     */
    @Override
    public void close() throws IOException {
        LOG.info("Shutdown JSON poller for handler {}", this.handler);
        // Order matters: the flag must be cleared before the wakeup so that run() can tell a
        // requested shutdown apart from an unexpected WakeupException.
        this.running = false;
        this.consumer.wakeup();
    }

    /**
     * Polling loop. Consumes records until {@link #close()} clears the running flag and always
     * closes the consumer exactly once on the way out.
     *
     * <p>A {@link WakeupException} received after {@link #close()} ends the loop normally. One
     * received while still running, or any other {@link Throwable}, is logged once and rethrown.
     */
    public void run() {
        LOG.info("Start JSON poller for handler {}", this.handler);
        try {
            while (this.running) {
                consume();
            }
        } catch (WakeupException e) {
            if (this.running) {
                LOG.error("Waked up while running {}", e.getMessage(), e);
                throw e;
            }
            // Expected path: close() cleared the flag and woke up the blocking poll.
            LOG.debug("Waked up while stopping {}", e.getMessage(), e);
        } catch (Throwable t) {
            LOG.error(String.format("Catch Throwable %s closing consumer", t.getMessage()), t);
            throw t;
        } finally {
            // Single close for every exit path: normal stop, wakeup and failure.
            this.consumer.close();
        }
        LOG.info("Stop JSON poller for handler {}", this.handler);
    }

    /** Performs one poll and dispatches each returned record to {@link #handleRecord}. */
    private void consume() {
        // Long timeout: shutdown interrupts the poll through consumer.wakeup() in close().
        this.consumer.poll(Duration.ofHours(1L)).forEach(this::handleRecord);
    }

    /**
     * Parses the record value as JSON and passes it to the handler together with the record's
     * topic, partition, offset and timestamp. A payload that cannot be parsed is logged and
     * skipped so that it does not stop the poller.
     *
     * @param consumerRecord record received from Kafka
     */
    private void handleRecord(ConsumerRecord<String, String> consumerRecord) {
        KafkaMessageInfo kafkaMessageInfo = new KafkaMessageInfo(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
        String payload = consumerRecord.value();
        try {
            this.handler.handle(kafkaMessageInfo, this.reader.readValue(payload));
        } catch (IOException e) {
            // Pass the exception as last argument so the cause and stack trace are logged.
            LOG.error("Failed to parse payload {}", payload, e);
        }
    }
}
