package org.apache.sling.distribution.journal.impl.queue.impl;

import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
@Component(service = {PubQueueCacheService.class, Runnable.class}, property = {"scheduler.concurrent:Boolean=false", "scheduler.period:Long=43200"})
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.class */
public class PubQueueCacheService implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueCacheService.class);
    private static final int CLEANUP_THRESHOLD = 10000;
    private static final long CACHE_SEEDING_DELAY_MS = 10000;

    @Reference
    private JournalAvailable journalAvailable;

    @Reference
    private MessagingProvider messagingProvider;

    @Reference
    private Topics topics;

    @Reference
    private EventAdmin eventAdmin;

    @Reference
    private DistributionMetricsService distributionMetricsService;
    private volatile PubQueueCache cache;

    public PubQueueCacheService() {
    }

    public PubQueueCacheService(MessagingProvider messagingProvider, Topics topics, EventAdmin eventAdmin) {
        this.messagingProvider = messagingProvider;
        this.topics = topics;
        this.eventAdmin = eventAdmin;
    }

    @Activate
    public void activate() {
        this.cache = newCache();
        LOG.info("Started Publisher queue cache service");
    }

    @Deactivate
    public void deactivate() {
        if (this.cache != null) {
            this.cache.close();
        }
        LOG.info("Stopped Publisher queue cache service");
    }

    @Nonnull
    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String str, long j) {
        try {
            return this.cache.getOffsetQueue(str, j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void cleanup() {
        if (this.cache != null) {
            int size = this.cache.size();
            if (size <= 10000) {
                LOG.info("No cleanup required for package cache (size={}/{})", Integer.valueOf(size), 10000);
                return;
            }
            LOG.info("Cleanup package cache (size={}/{})", Integer.valueOf(size), 10000);
            this.cache.close();
            this.cache = newCache();
        }
    }

    private PubQueueCache newCache() {
        return new PubQueueCache(this.messagingProvider, this.eventAdmin, this.distributionMetricsService, this.topics.getPackageTopic(), CACHE_SEEDING_DELAY_MS);
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting package cache cleanup task");
        cleanup();
        LOG.info("Stopping package cache cleanup task");
    }
}
