package org.apache.sling.distribution.journal.kafka;

import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.messages.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.class */
public class ProtobufRecordHandler implements Consumer<ConsumerRecord<String, byte[]>> {
    private static final Logger LOG = LoggerFactory.getLogger(ProtobufRecordHandler.class);
    private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap();

    public ProtobufRecordHandler(HandlerAdapter<?>... handlerAdapterArr) {
        for (HandlerAdapter<?> handlerAdapter : handlerAdapterArr) {
            this.handlers.put(handlerAdapter.getType(), handlerAdapter);
        }
    }

    @Override // java.util.function.Consumer
    public void accept(ConsumerRecord<String, byte[]> consumerRecord) {
        getHandler(consumerRecord).ifPresent(handlerAdapter -> {
            handleRecord(handlerAdapter, consumerRecord);
        });
    }

    private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> consumerRecord) {
        Class type = Types.getType(Integer.parseInt(getHeaderValue(consumerRecord.headers(), "type")), Integer.parseInt(getHeaderValue(consumerRecord.headers(), "version")));
        Optional<HandlerAdapter<?>> ofNullable = Optional.ofNullable(this.handlers.get(type));
        if (!ofNullable.isPresent()) {
            LOG.debug("No handler registered for type {}", type.getName());
        }
        return ofNullable;
    }

    private void handleRecord(HandlerAdapter<?> handlerAdapter, ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            handlerAdapter.handle(new KafkaMessageInfo(consumerRecord), ByteString.copyFrom((byte[]) consumerRecord.value()));
        } catch (Exception e) {
            throw new MessagingException(e.getMessage(), e);
        }
    }

    private String getHeaderValue(Headers headers, String str) {
        return new String(((Header) Optional.ofNullable(headers.lastHeader(str)).orElseThrow(() -> {
            return new IllegalArgumentException(String.format("Header with key %s not found", str));
        })).value(), StandardCharsets.UTF_8);
    }
}
