package org.apache.sling.distribution.journal.impl.queue.impl;

import java.io.Closeable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.class */
public class PubQueueCache {
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueCache.class);
    private static final long SEEDING_DELAY_MS = 1000;
    private final MessagingProvider messagingProvider;
    private final EventAdmin eventAdmin;
    private final Closeable tailPoller;
    private final String topic;
    private DistributionMetricsService distributionMetricsService;
    private volatile boolean closed;
    private final Map<String, OffsetQueue<DistributionQueueItem>> agentQueues = new ConcurrentHashMap();
    private final CountDownLatch seeded = new CountDownLatch(1);
    private final Lock headPollerLock = new ReentrantLock();
    private final AtomicLong minOffset = new AtomicLong(Long.MAX_VALUE);
    private final Set<JMXRegistration> jmxRegs = new HashSet();

    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String str) {
        this.messagingProvider = messagingProvider;
        this.eventAdmin = eventAdmin;
        this.distributionMetricsService = distributionMetricsService;
        this.topic = str;
        this.tailPoller = messagingProvider.createPoller(str, Reset.latest, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackage)});
        RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "queue seeding");
    }

    @Nonnull
    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String str, long j) {
        fetchIfNeeded(j);
        return this.agentQueues.getOrDefault(str, new OffsetQueueImpl());
    }

    public int size() {
        return this.agentQueues.values().stream().mapToInt((v0) -> {
            return v0.getSize();
        }).sum();
    }

    public void close() {
        this.closed = true;
        IOUtils.closeQuietly(this.tailPoller);
        this.jmxRegs.stream().forEach((v0) -> {
            IOUtils.closeQuietly(v0);
        });
    }

    private void sendSeedingMessages() {
        LOG.info("Send seeding messages");
        MessageSender createSender = this.messagingProvider.createSender();
        while (!this.closed) {
            Messages.PackageMessage createTestMessage = createTestMessage();
            try {
                LOG.debug("Send seeding message");
                createSender.send(this.topic, createTestMessage);
            } catch (MessagingException e) {
                LOG.info(e.getMessage());
                sleep(SEEDING_DELAY_MS);
            } catch (InterruptedException e2) {
            }
            if (this.seeded.await(SEEDING_DELAY_MS, TimeUnit.MILLISECONDS)) {
                LOG.info("Cache has been seeded");
                return;
            }
            continue;
        }
    }

    private Messages.PackageMessage createTestMessage() {
        return Messages.PackageMessage.newBuilder().setPubSlingId("seeder").setPkgId(UUID.randomUUID().toString()).setPkgType("seeder").setReqType(Messages.PackageMessage.ReqType.TEST).build();
    }

    private void fetchIfNeeded(long j) {
        waitSeeded();
        long minOffset = getMinOffset();
        if (j < minOffset) {
            LOG.debug(String.format("Requested min offset %s smaller than cached min offset %s", Long.valueOf(j), Long.valueOf(minOffset)));
            this.headPollerLock.lock();
            try {
                try {
                    minOffset = getMinOffset();
                    if (j < minOffset) {
                        fetch(j, minOffset);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(String.format("Failed to fetch offsets [%s,%s[, %s", Long.valueOf(j), Long.valueOf(minOffset), e.getMessage()), e);
                }
            } finally {
                this.headPollerLock.unlock();
            }
        }
    }

    private void fetch(long j, long j2) throws InterruptedException {
        this.distributionMetricsService.getQueueCacheFetchCount().increment();
        merge(new RangePoller(this.messagingProvider, this.topic, j, j2).fetchRange());
        updateMinOffset(j);
    }

    private void waitSeeded() {
        while (!this.closed) {
            if (this.seeded.await(SEEDING_DELAY_MS, TimeUnit.MILLISECONDS)) {
                return;
            } else {
                LOG.debug("Waiting for seeded cache");
            }
        }
        throw new RuntimeException();
    }

    protected long getMinOffset() {
        return this.minOffset.longValue();
    }

    private void updateMinOffset(long j) {
        this.minOffset.accumulateAndGet(j, Math::min);
    }

    private void merge(List<FullMessage<Messages.PackageMessage>> list) {
        LOG.debug("Merging fetched offsets");
        ((Map) list.stream().filter(this::isNotTestMessage).collect(Collectors.groupingBy(fullMessage -> {
            return fullMessage.getMessage().getPubAgentName();
        }))).forEach(this::mergeByAgent);
        list.stream().findFirst().ifPresent(fullMessage2 -> {
            updateMinOffset(fullMessage2.getInfo().getOffset());
        });
    }

    private void mergeByAgent(String str, List<FullMessage<Messages.PackageMessage>> list) {
        OffsetQueueImpl offsetQueueImpl = new OffsetQueueImpl();
        list.stream().forEach(fullMessage -> {
            offsetQueueImpl.putItem(fullMessage.getInfo().getOffset(), QueueItemFactory.fromPackage(fullMessage));
        });
        getOrCreateQueue(str).putItems(offsetQueueImpl);
        list.stream().forEach(this::sendQueuedEvent);
    }

    private void sendQueuedEvent(FullMessage<Messages.PackageMessage> fullMessage) {
        Messages.PackageMessage message = fullMessage.getMessage();
        this.eventAdmin.postEvent(DistributionEvent.eventPackageQueued(message, message.getPubAgentName()));
    }

    private OffsetQueue<DistributionQueueItem> getOrCreateQueue(String str) {
        return this.agentQueues.computeIfAbsent(str, this::createQueue);
    }

    private boolean isNotTestMessage(FullMessage<Messages.PackageMessage> fullMessage) {
        return fullMessage.getMessage().getReqType() != Messages.PackageMessage.ReqType.TEST;
    }

    private OffsetQueue<DistributionQueueItem> createQueue(String str) {
        OffsetQueueImpl offsetQueueImpl = new OffsetQueueImpl();
        this.jmxRegs.add(new JMXRegistration(offsetQueueImpl, OffsetQueue.class.getSimpleName(), str));
        return offsetQueueImpl;
    }

    private void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    private void handlePackage(MessageInfo messageInfo, Messages.PackageMessage packageMessage) {
        merge(Collections.singletonList(new FullMessage(messageInfo, packageMessage)));
        this.seeded.countDown();
    }
}
