package org.apache.sling.distribution.journal.impl.subscriber;

import java.io.Closeable;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/impl/subscriber/PackageStatusWatcher.class */
/**
 * Keeps an in-memory cache of the package status messages received on the
 * status topic for a single sub agent.
 *
 * <p>The cache maps the package offset carried by a status message
 * ({@code PackageStatusMessage#getOffset()}) to the full message (payload plus
 * {@link MessageInfo}). Only messages whose sub agent name equals the name given
 * at construction time are stored.
 *
 * <p>The cache is a {@link ConcurrentSkipListMap}, so lookups, {@link #clear(long)}
 * and {@link #handle} may be invoked from different threads.
 */
public class PackageStatusWatcher implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PackageStatusWatcher.class);

    /** Handle of the poller created in the constructor; closed by {@link #close()}. */
    private final Closeable poller;

    /** Name of the status topic, as returned by {@code Topics#getStatusTopic()}. */
    private final String topicName;

    /** Only status messages carrying this sub agent name are cached. */
    private final String subAgentName;

    /** Package offset -> status message received for that package, sorted by offset. */
    private final NavigableMap<Long, FullMessage<Messages.PackageStatusMessage>> cache = new ConcurrentSkipListMap<>();

    /**
     * Creates the watcher and registers a poller on the status topic with
     * {@code Reset.earliest}, dispatching every {@code PackageStatusMessage}
     * to {@link #handle}.
     *
     * @param messagingProvider provider used to create the poller
     * @param topics            source of the status topic name
     * @param str               name of the sub agent whose status messages are cached
     */
    public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics, String str) {
        this.topicName = topics.getStatusTopic();
        this.subAgentName = str;
        // All fields read by handle() are assigned before the poller can start delivering messages.
        this.poller = messagingProvider.createPoller(this.topicName, Reset.earliest, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageStatusMessage.class, this::handle)});
    }

    /**
     * Returns the status cached for a package.
     *
     * @param j package offset
     * @return the status of the cached message, or {@code null} when no status
     *         message is cached for that offset
     */
    public Messages.PackageStatusMessage.Status getStatus(long j) {
        FullMessage<Messages.PackageStatusMessage> cached = this.cache.get(j);
        return cached != null ? cached.getMessage().getStatus() : null;
    }

    /**
     * Returns the offset of the status message itself (taken from its
     * {@link MessageInfo}) for a package.
     *
     * @param j package offset
     * @return the offset of the cached status message, or {@code null} when no
     *         status message is cached for that package offset
     */
    public Long getStatusOffset(long j) {
        FullMessage<Messages.PackageStatusMessage> cached = this.cache.get(j);
        return cached != null ? Long.valueOf(cached.getInfo().getOffset()) : null;
    }

    /**
     * Removes all cache entries whose package offset is strictly smaller than
     * the given offset. The entry for the given offset itself is kept.
     *
     * @param j exclusive upper bound of the package offsets to remove
     */
    public void clear(long j) {
        // headMap is a live view: clearing it removes the entries from the cache.
        NavigableMap<Long, FullMessage<Messages.PackageStatusMessage>> staleEntries = this.cache.headMap(j, false);
        if (!staleEntries.isEmpty()) {
            LOG.info("Remove package offsets {} from status cache", staleEntries.keySet());
        }
        staleEntries.clear();
    }

    /**
     * Closes the poller created in the constructor.
     *
     * @throws IOException if closing the poller fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.poller.close();
    }

    /**
     * Caches a status message when it belongs to this watcher's sub agent;
     * messages for other sub agents are ignored. A message for a package offset
     * that is already cached replaces the previous entry and logs a warning.
     *
     * @param messageInfo          transport information of the status message
     * @param packageStatusMessage the status message payload
     */
    public void handle(MessageInfo messageInfo, Messages.PackageStatusMessage packageStatusMessage) {
        if (!this.subAgentName.equals(packageStatusMessage.getSubAgentName())) {
            return;
        }
        Long packageOffset = packageStatusMessage.getOffset();
        // The map never holds null values, so a non-null result of put() means the
        // key was already present; using it makes "check and replace" a single operation.
        FullMessage<Messages.PackageStatusMessage> previous =
                this.cache.put(packageOffset, new FullMessage<>(messageInfo, packageStatusMessage));
        if (previous != null) {
            LOG.warn("Package offset {} already exists", packageOffset);
        }
    }
}
