package org.apache.sling.distribution.journal.kafka;

import com.google.protobuf.ByteString;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.messages.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.class */
public class KafkaMessagePoller implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessagePoller.class);
    private final KafkaConsumer<String, byte[]> consumer;
    private final String types;
    private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap();
    private volatile boolean running = true;

    public KafkaMessagePoller(KafkaConsumer<String, byte[]> kafkaConsumer, HandlerAdapter<?>... handlerAdapterArr) {
        this.consumer = kafkaConsumer;
        for (HandlerAdapter<?> handlerAdapter : handlerAdapterArr) {
            this.handlers.put(handlerAdapter.getType(), handlerAdapter);
        }
        this.types = this.handlers.keySet().toString();
        RunnableUtil.startBackgroundThread(this::run, String.format("Message Poller %s", this.types));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        LOG.info("Shutdown poller for types {}", this.types);
        this.running = false;
        this.consumer.wakeup();
    }

    public void run() {
        LOG.info("Start poller for types {}", this.types);
        while (this.running) {
            try {
                try {
                    try {
                        consume();
                    } catch (WakeupException e) {
                        if (this.running) {
                            LOG.error("Waked up while running {}", e.getMessage(), e);
                            throw e;
                        }
                        LOG.debug("Waked up while stopping {}", e.getMessage(), e);
                        this.consumer.close();
                    }
                } catch (Throwable th) {
                    LOG.error(String.format("Catch Throwable %s closing consumer", th.getMessage()), th);
                    throw th;
                }
            } catch (Throwable th2) {
                this.consumer.close();
                throw th2;
            }
        }
        this.consumer.close();
        LOG.info("Stop poller for types {}", this.types);
    }

    private void consume() {
        this.consumer.poll(Duration.ofHours(1L)).forEach(this::handleRecord);
    }

    private void handleRecord(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            Class type = Types.getType(Integer.parseInt(getHeaderValue(consumerRecord.headers(), "type")), Integer.parseInt(getHeaderValue(consumerRecord.headers(), "version")));
            HandlerAdapter<?> handlerAdapter = this.handlers.get(type);
            if (handlerAdapter == null) {
                LOG.debug("No handler registered for type {}", type.getName());
                return;
            }
            try {
                handleRecord(handlerAdapter, consumerRecord);
            } catch (Exception e) {
                LOG.warn(String.format("Error consuming message for types %s", this.types));
            }
        } catch (RuntimeException e2) {
            LOG.info("Ignoring unknown message");
        }
    }

    private void handleRecord(HandlerAdapter<?> handlerAdapter, ConsumerRecord<String, byte[]> consumerRecord) throws Exception {
        handlerAdapter.handle(new KafkaMessageInfo(consumerRecord), ByteString.copyFrom((byte[]) consumerRecord.value()));
    }

    private String getHeaderValue(Headers headers, String str) {
        Header lastHeader = headers.lastHeader(str);
        if (lastHeader == null) {
            throw new IllegalArgumentException(String.format("Header with key %s not found", str));
        }
        return new String(lastHeader.value(), StandardCharsets.UTF_8);
    }
}
