package org.apache.sling.distribution.journal.impl.queue.impl;

import java.io.Closeable;
import java.util.UUID;
import java.util.function.LongConsumer;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.messages.Messages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.class */
/**
 * Publishes synthetic {@code TEST} package messages ("seeding messages") to a topic.
 *
 * <p>Two modes are offered:
 * <ul>
 *   <li>{@link #seedOne()} sends a single seeding message from a background thread.</li>
 *   <li>{@link #seed(LongConsumer)} creates a poller on the topic and keeps sending seeding
 *       messages at a fixed interval until this seeder is closed. The seeder closes itself as
 *       soon as the poller hands it the first package message, and then reports that message's
 *       offset to the supplied callback.</li>
 * </ul>
 *
 * <p>Once {@link #close()} has been called the seeder stays closed; a later call to
 * {@link #seed(LongConsumer)} will not send messages.
 */
public class QueueCacheSeeder implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeeder.class);

    /** Pause in milliseconds between two consecutive seeding messages. */
    private static final long CACHE_SEEDING_DELAY_MS = 10000;

    /** Back-off in milliseconds applied after a failed send (ten times the regular interval). */
    private static final long SEND_FAILURE_DELAY_MS = CACHE_SEEDING_DELAY_MS * 10;

    private final String topic;
    private final MessagingProvider provider;

    /** Poller created by {@link #seed(LongConsumer)}; {@code null} until then. */
    private volatile Closeable poller;

    /** Set by {@link #close()}; read by the seeding loop and by {@link #seed(LongConsumer)}. */
    private volatile boolean closed;

    /**
     * @param provider used to create the message senders and the poller
     * @param topic    topic the seeding messages are sent to and polled from
     */
    public QueueCacheSeeder(MessagingProvider provider, String topic) {
        this.provider = provider;
        this.topic = topic;
    }

    /**
     * Sends one seeding message from a newly started background thread.
     */
    public void seedOne() {
        RunnableUtil.startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one seed");
    }

    /**
     * Creates a poller on the topic and starts a background thread that sends seeding messages
     * until this seeder is closed.
     *
     * <p>When the poller delivers a package message, the seeder is closed first and then
     * {@code callback} receives the offset of that message.
     *
     * @param callback receives the offset of the first package message delivered by the poller
     */
    public void seed(LongConsumer callback) {
        Closeable newPoller = this.provider.createPoller(this.topic, Reset.latest, new HandlerAdapter[]{
            HandlerAdapter.create(Messages.PackageMessage.class, (messageInfo, packageMessage) -> {
                close();
                callback.accept(messageInfo.getOffset());
            })
        });
        this.poller = newPoller;
        // close() may have run before the poller was published above (the handler can fire on
        // another thread, or the owner may close concurrently). In that case close() saw a null
        // poller, so the poller must be released here. Both fields are volatile, so at least one
        // of the two threads is guaranteed to observe the other's write.
        if (this.closed) {
            IOUtils.closeQuietly(newPoller);
            return;
        }
        RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
    }

    /**
     * Stops the seeding loop and closes the poller, if one was created. Safe to call repeatedly.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
        IOUtils.closeQuietly(this.poller);
    }

    /**
     * Seeding loop: sends a seeding message, waits, and repeats until the seeder is closed or the
     * running thread is interrupted.
     */
    private void sendSeedingMessages() {
        LOG.info("Start message seeder");
        try {
            MessageSender<Messages.PackageMessage> sender = this.provider.createSender();
            // delay() restores the interrupt flag instead of throwing. Without the interrupt check
            // an interrupted thread would never sleep again and would send messages in a tight loop.
            while (!this.closed && !Thread.currentThread().isInterrupted()) {
                sendSeedingMessage(sender);
                delay(CACHE_SEEDING_DELAY_MS);
            }
        } finally {
            LOG.info("Stop message seeder");
        }
    }

    /** Sends a single seeding message through a freshly created sender. */
    private void sendSeedingMessage() {
        sendSeedingMessage(this.provider.createSender());
    }

    /**
     * Sends a single seeding message. A {@link MessagingException} is logged and followed by a
     * back-off pause rather than propagated.
     *
     * @param sender sender used to publish the message
     */
    private void sendSeedingMessage(MessageSender<Messages.PackageMessage> sender) {
        Messages.PackageMessage seedingMessage = createTestMessage();
        LOG.info("Send seeding message");
        try {
            sender.send(this.topic, seedingMessage);
        } catch (MessagingException e) {
            LOG.warn(e.getMessage(), e);
            delay(SEND_FAILURE_DELAY_MS);
        }
    }

    /**
     * Sleeps for the given time. If the thread is interrupted, the sleep ends early and the
     * interrupt flag is restored so callers can observe it.
     *
     * @param sleepMs time to sleep in milliseconds
     */
    private static void delay(long sleepMs) {
        try {
            Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the seeding message: a package message of request type {@code TEST} with a random
     * package id and {@code "seeder"} as both publisher sling id and package type.
     *
     * @return a new seeding message
     */
    protected Messages.PackageMessage createTestMessage() {
        return Messages.PackageMessage.newBuilder()
            .setPubSlingId("seeder")
            .setPkgId(UUID.randomUUID().toString())
            .setPkgType("seeder")
            .setReqType(Messages.PackageMessage.ReqType.TEST)
            .build();
    }
}
