package org.apache.sling.distribution.queue.impl.simple;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.json.JsonException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.class */
/**
 * A {@link DistributionQueueProvider} that keeps one {@link SimpleDistributionQueue} per queue
 * name in a map owned by this provider and drives queue processing through a Sling
 * {@link Scheduler}.
 *
 * <p>When checkpointing is enabled, a directory named {@code <name>-simple-queues-checkpoints}
 * is prepared at construction time. On {@link #enableQueueProcessing} each queue is first
 * refilled from its {@code <queueName>-checkpoint} file (one serialized item per line) and a
 * recurring {@link SimpleDistributionQueueCheckpoint} job is scheduled for it.
 */
public class SimpleDistributionQueueProvider implements DistributionQueueProvider {
    public static final String TYPE = "simple";
    public static final String TYPE_CHECKPOINT = "simple-checkpoint";

    /** Suffix shared by checkpoint file names and checkpoint scheduler job names. */
    private static final String CHECKPOINT_SUFFIX = "-checkpoint";

    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<>();
    private final String name;
    private final Scheduler scheduler;
    private final boolean checkpoint;
    /** Directory holding the checkpoint files; {@code null} when checkpointing is disabled. */
    private final File checkpointDirectory;

    /**
     * Creates a provider.
     *
     * @param scheduler  scheduler used to run the processing and checkpoint jobs; required
     * @param name       provider name, used as prefix for queue keys, job names and the
     *                   checkpoint directory; required
     * @param checkpoint whether queues are checkpointed to (and recovered from) disk
     * @throws IllegalArgumentException if {@code scheduler} or {@code name} is {@code null}
     */
    public SimpleDistributionQueueProvider(Scheduler scheduler, String name, boolean checkpoint) {
        if (name == null || scheduler == null) {
            throw new IllegalArgumentException("all arguments are required");
        }
        this.scheduler = scheduler;
        this.name = name;
        this.checkpoint = checkpoint;
        this.checkpointDirectory = checkpoint ? prepareCheckpointDirectory(name) : null;
    }

    /**
     * Returns the queue registered under the given name, creating it on first access.
     *
     * @param queueName the queue name
     * @return the existing or newly created queue, never {@code null}
     */
    @Override // org.apache.sling.distribution.queue.impl.DistributionQueueProvider
    @NotNull
    public DistributionQueue getQueue(@NotNull String queueName) {
        // computeIfAbsent makes lookup-or-create atomic, so concurrent callers can never end up
        // holding two different queue instances for the same key.
        return queueMap.computeIfAbsent(getKey(queueName), key -> {
            log.debug("creating a queue with key {}", key);
            SimpleDistributionQueue created = new SimpleDistributionQueue(name, queueName);
            log.debug("queue created {}", created);
            return created;
        });
    }

    /**
     * Returns the queue registered under the given name; the requested type is ignored by this
     * provider.
     *
     * @param queueName the queue name
     * @param type      the requested queue type (unused)
     * @return the existing or newly created queue
     */
    @Override // org.apache.sling.distribution.queue.impl.DistributionQueueProvider
    public DistributionQueue getQueue(@NotNull String queueName, @NotNull DistributionQueueType type) {
        return getQueue(queueName);
    }

    /**
     * @return a live view of all queues created so far by this provider
     */
    Collection<SimpleDistributionQueue> getQueues() {
        return queueMap.values();
    }

    /**
     * Schedules a recurring processing job for every given queue. When checkpointing is enabled
     * the queues are first refilled from their checkpoint files and a recurring checkpoint job
     * is scheduled for each of them as well.
     *
     * @param queueProcessor processor invoked by the scheduled processing jobs
     * @param queueNames     names of the queues to process
     */
    @Override // org.apache.sling.distribution.queue.impl.DistributionQueueProvider
    public void enableQueueProcessing(@NotNull DistributionQueueProcessor queueProcessor, String... queueNames) {
        if (checkpoint) {
            QueueItemMapper mapper = new QueueItemMapper();
            log.debug("recovering from checkpoints if needed");
            for (String queueName : queueNames) {
                recoverFromCheckpoint(queueName, mapper);
            }
            for (String queueName : queueNames) {
                // NOW(-1, 15): start immediately, repeat without limit with a period of 15
                // (seconds per the Sling Scheduler API - confirm against the deployed version).
                ScheduleOptions options = scheduler.NOW(-1, 15L)
                        .canRunConcurrently(false)
                        .name(getJobName(queueName + CHECKPOINT_SUFFIX));
                scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
            }
        }
        for (String queueName : queueNames) {
            ScheduleOptions options = scheduler.NOW(-1, 1L)
                    .canRunConcurrently(false)
                    .name(getJobName(queueName));
            DistributionQueue queue = getQueue(queueName);
            // getQueue only ever hands out SimpleDistributionQueue instances (see queueMap).
            SimpleDistributionQueue simpleQueue = (SimpleDistributionQueue) queue;
            scheduler.schedule(
                    new SimpleDistributionQueueProcessor(queue, queueProcessor, simpleQueue::recordProcessingAttempt),
                    options);
        }
    }

    /**
     * Unschedules the processing job (and, when checkpointing is enabled, the checkpoint job) of
     * every queue created by this provider.
     */
    @Override // org.apache.sling.distribution.queue.impl.DistributionQueueProvider
    public void disableQueueProcessing() {
        for (SimpleDistributionQueue queue : getQueues()) {
            String queueName = queue.getName();
            if (scheduler.unschedule(getJobName(queueName))) {
                log.debug("queue processing on {} stopped", queue);
            } else {
                log.warn("could not disable queue processing on {}", queue);
            }
            if (checkpoint) {
                // Same job name as the one built in enableQueueProcessing.
                if (scheduler.unschedule(getJobName(queueName + CHECKPOINT_SUFFIX))) {
                    log.debug("checkpoint on {} stopped", queue);
                } else {
                    log.warn("could not disable checkpoint on {}", queue);
                }
            }
        }
    }

    /**
     * Makes sure the checkpoint directory for the given provider name exists, replacing a plain
     * file that occupies its path.
     *
     * @param providerName the provider name the directory name is derived from
     * @return the checkpoint directory (which may still be unusable if it could not be created)
     */
    private File prepareCheckpointDirectory(String providerName) {
        File directory = new File(providerName + "-simple-queues-checkpoints");
        log.info("creating checkpoint directory {}", directory.getAbsoluteFile());
        // The removal must not live inside an assert: with assertions disabled (the default at
        // runtime) a blocking plain file would never be deleted.
        if (directory.exists() && !directory.isDirectory() && !directory.delete()) {
            log.warn("could not delete file {} occupying the checkpoint directory path", directory.getAbsolutePath());
        }
        boolean created = !directory.exists() && directory.mkdir();
        log.info("checkpoint directory created: {}, exists {}", created, directory.isDirectory());
        return directory;
    }

    /**
     * Re-adds to the named queue every item found in its checkpoint file, if such a file exists.
     *
     * @param queueName name of the queue to recover
     * @param mapper    mapper used to parse each checkpoint line into a queue item
     */
    private void recoverFromCheckpoint(String queueName, QueueItemMapper mapper) {
        log.debug("recovering for queue {}", queueName);
        DistributionQueue queue = getQueue(queueName);
        String checkpointFileName = queueName + CHECKPOINT_SUFFIX;
        File[] checkpointFiles = checkpointDirectory.listFiles((dir, fileName) -> fileName.equals(checkpointFileName));
        if (checkpointFiles == null) {
            // listFiles returns null when the path is not a directory or an I/O error occurs.
            log.warn("could not list checkpoint directory {}", checkpointDirectory.getAbsolutePath());
            return;
        }
        for (File file : checkpointFiles) {
            log.info("recovering from checkpoint {}", file);
            // try-with-resources guarantees the file handle is released even if parsing fails.
            try (FileReader reader = new FileReader(file)) {
                LineIterator lines = IOUtils.lineIterator(reader);
                while (lines.hasNext()) {
                    queue.add(mapper.readQueueItem(lines.nextLine()));
                }
                log.info("recovered {} items from queue {}", queue.getStatus().getItemsCount(), queueName);
            } catch (FileNotFoundException e) {
                log.warn("could not read checkpoint file {}", file.getAbsolutePath(), e);
            } catch (IOException e) {
                log.warn("could not close checkpoint file {}", file.getAbsolutePath(), e);
            } catch (JsonException e) {
                log.warn("could not parse info from checkpoint file {}", file.getAbsolutePath(), e);
            }
        }
    }

    /** Builds the {@link #queueMap} key for a queue name. */
    private String getKey(String queueName) {
        return name + "#" + queueName;
    }

    /** Builds the scheduler job name for the given job suffix. */
    private String getJobName(String jobSuffix) {
        return "simple-queueProcessor-" + name + "-" + jobSuffix;
    }
}
