package org.apache.sling.distribution.journal.impl.queue.impl;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.messages.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.class */
public class RangePoller {
    private static final Logger LOG = LoggerFactory.getLogger(RangePoller.class);
    private final long maxOffset;
    private final long minOffset;
    private final Closeable headPoller;
    private final CountDownLatch fetched = new CountDownLatch(1);
    private final List<FullMessage<Messages.PackageMessage>> messages = new ArrayList();

    public RangePoller(MessagingProvider messagingProvider, String str, long j, long j2) {
        this.maxOffset = j2;
        this.minOffset = j;
        String assignTo = messagingProvider.assignTo(j);
        LOG.info("Fetching offsets [{},{}[", Long.valueOf(j), Long.valueOf(j2));
        this.headPoller = messagingProvider.createPoller(str, Reset.earliest, assignTo, new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackage)});
    }

    public List<FullMessage<Messages.PackageMessage>> fetchRange() throws InterruptedException {
        try {
            this.fetched.await();
            LOG.info("Fetched offsets [{},{}[", Long.valueOf(this.minOffset), Long.valueOf(this.maxOffset));
            return this.messages;
        } finally {
            IOUtils.closeQuietly(this.headPoller);
        }
    }

    private void handlePackage(MessageInfo messageInfo, Messages.PackageMessage packageMessage) {
        long offset = messageInfo.getOffset();
        LOG.debug(String.format("Reading offset %s", Long.valueOf(offset)));
        if (offset < this.maxOffset) {
            this.messages.add(new FullMessage<>(messageInfo, packageMessage));
        } else {
            this.fetched.countDown();
        }
    }
}
