package org.apache.sling.distribution.journal.impl.queue.impl;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
@Component
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.class */
public class PubQueueProviderImpl implements PubQueueProvider {
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueProviderImpl.class);
    private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap();

    @Reference
    private MessagingProvider messagingProvider;

    @Reference
    private Topics topics;

    @Reference
    private PubQueueCacheService pubQueueCacheService;
    private Closeable statusPoller;
    private MessageSender<Messages.CommandMessage> sender;

    public PubQueueProviderImpl() {
    }

    public PubQueueProviderImpl(PubQueueCacheService pubQueueCacheService, MessagingProvider messagingProvider, Topics topics) {
        this.pubQueueCacheService = pubQueueCacheService;
        this.messagingProvider = messagingProvider;
        this.topics = topics;
    }

    @Activate
    public void activate() {
        this.statusPoller = this.messagingProvider.createPoller(this.topics.getStatusTopic(), Reset.earliest, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageStatusMessage.class, this::handleStatus)});
        this.sender = this.messagingProvider.createSender();
        LOG.info("Started Publisher queue provider service");
    }

    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly(this.statusPoller);
        LOG.info("Stopped Publisher queue provider service");
    }

    @Override // org.apache.sling.distribution.journal.impl.queue.PubQueueProvider
    @Nonnull
    public DistributionQueue getQueue(String str, String str2, String str3, String str4, long j, int i, boolean z) {
        OffsetQueue<DistributionQueueItem> offsetQueue = this.pubQueueCacheService.getOffsetQueue(str, j);
        return new PubQueue(str4, offsetQueue.getMinOffsetQueue(j), i, z ? j2 -> {
            sendClearCommand(str2, str3, j2);
        } : null);
    }

    @Override // org.apache.sling.distribution.journal.impl.queue.PubQueueProvider
    @Nonnull
    public DistributionQueue getErrorQueue(String str, String str2, String str3, String str4) {
        OffsetQueue<DistributionQueueItem> offsetQueue;
        OffsetQueue<Long> orDefault = this.errorQueues.getOrDefault(errorQueueKey(str, str2, str3), new OffsetQueueImpl());
        long headOffset = orDefault.getHeadOffset();
        if (headOffset < 0) {
            offsetQueue = new OffsetQueueImpl();
        } else {
            offsetQueue = this.pubQueueCacheService.getOffsetQueue(str, orDefault.getItem(headOffset).longValue());
        }
        return new PubErrQueue(str4, offsetQueue, orDefault);
    }

    public void handleStatus(MessageInfo messageInfo, Messages.PackageStatusMessage packageStatusMessage) {
        if (packageStatusMessage.getStatus() == Messages.PackageStatusMessage.Status.REMOVED_FAILED) {
            this.errorQueues.computeIfAbsent(errorQueueKey(packageStatusMessage.getPubAgentName(), packageStatusMessage.getSubSlingId(), packageStatusMessage.getSubAgentName()), str -> {
                return new OffsetQueueImpl();
            }).putItem(messageInfo.getOffset(), Long.valueOf(packageStatusMessage.getOffset()));
        }
    }

    @Nonnull
    private String errorQueueKey(String str, String str2, String str3) {
        return String.format("%s#%s#%s", str, str2, str3);
    }

    private void sendClearCommand(String str, String str2, long j) {
        Messages.CommandMessage build = Messages.CommandMessage.newBuilder().setSubSlingId(str).setSubAgentName(str2).setClearCommand(Messages.ClearCommand.newBuilder().setOffset(j).build()).build();
        LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", new Object[]{str, str2, Long.valueOf(j)});
        this.sender.send(this.topics.getCommandTopic(), build);
    }
}
