package org.apache.sling.distribution.journal.impl.shared;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.messages.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/shared/LimitPoller.class */
public class LimitPoller {
    private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(30);
    private final long minOffset;
    private final long maxMessages;
    private final Closeable headPoller;
    private final Logger log = LoggerFactory.getLogger(LimitPoller.class);
    private final Queue<FullMessage<Messages.PackageMessage>> messages = new ConcurrentLinkedQueue();
    private final Semaphore nextMessage = new Semaphore(0);

    public LimitPoller(MessagingProvider messagingProvider, String str, long j, long j2) {
        this.minOffset = j;
        this.maxMessages = j2;
        String assignTo = messagingProvider.assignTo(j);
        this.log.info("Fetching {} messages starting from {}", Long.valueOf(j2), Long.valueOf(j));
        this.headPoller = messagingProvider.createPoller(str, Reset.earliest, assignTo, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackage)});
    }

    public List<FullMessage<Messages.PackageMessage>> fetch(Duration duration) {
        try {
            try {
                boolean tryAcquire = this.nextMessage.tryAcquire(CONNECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                while (!tryAcquire && this.messages.size() < this.maxMessages) {
                    tryAcquire = !this.nextMessage.tryAcquire(duration.toMillis(), TimeUnit.MILLISECONDS);
                }
                ArrayList arrayList = new ArrayList(this.messages);
                this.log.info("Fetched {} messages", Integer.valueOf(arrayList.size()));
                IOUtils.closeQuietly(this.headPoller);
                return arrayList;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            }
        } catch (Throwable th) {
            IOUtils.closeQuietly(this.headPoller);
            throw th;
        }
    }

    private void handlePackage(MessageInfo messageInfo, Messages.PackageMessage packageMessage) {
        this.log.debug("Reading offset {}", Long.valueOf(messageInfo.getOffset()));
        if (this.messages.size() < this.maxMessages && messageInfo.getOffset() >= this.minOffset) {
            this.messages.add(new FullMessage<>(messageInfo, packageMessage));
        }
        this.nextMessage.release();
    }
}
