package org.apache.sling.event.impl.jobs.queues;

import com.mongodb.util.TimeConstants;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.EventingThreadPool;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.config.ConfigurationChangeListener;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.NotificationConstants;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Service({Runnable.class, QueueManager.class, EventHandler.class})
@Component(immediate = true)
@Properties({@Property(name = Scheduler.PROPERTY_SCHEDULER_PERIOD, longValue = {TimeConstants.S_MINUTE}), @Property(name = Scheduler.PROPERTY_SCHEDULER_CONCURRENT, boolValue = {false}), @Property(name = EventConstants.EVENT_TOPIC, value = {NotificationConstants.TOPIC_JOB_ADDED})})
/* loaded from: input_file:resources/install/0/org.apache.sling.event-3.7.4.jar:org/apache/sling/event/impl/jobs/queues/QueueManager.class */
public class QueueManager implements Runnable, EventHandler, ConfigurationChangeListener {

    // Injected service references. Each one is set/cleared by the matching
    // bind*/unbind* method further down in this class.

    // Copied into the QueueServices holder on activation.
    @Reference
    private EventAdmin eventAdmin;

    // Copied into the QueueServices holder on activation.
    @Reference
    private Scheduler scheduler;

    // Copied into the QueueServices holder on activation.
    @Reference
    private JobConsumerManager jobConsumerManager;

    // Receives QueueStatusEvent notifications; this class casts it to QueuesMBeanImpl before use.
    @Reference
    private QueuesMBean queuesMBean;

    // Copied into the QueueServices holder on activation.
    @Reference
    private ThreadPoolManager threadPoolManager;

    // Bound as EventingThreadPool but stored under the ThreadPool type.
    @Reference(referenceInterface = EventingThreadPool.class)
    private ThreadPool threadPool;

    // Source of the local jobs path, the queue configuration manager and resource resolvers;
    // this manager also registers itself on it as a ConfigurationChangeListener.
    @Reference
    private JobManagerConfiguration configuration;

    // Copied into the QueueServices holder on activation.
    @Reference
    private StatisticsManager statisticsManager;
    // Number of maintenance passes performed so far; incremented at the start of maintain().
    private volatile long schedulerRuns;
    // Service holder handed to every queue created by start(); null after deactivation.
    private volatile QueueServices queueServices;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    // Monitor held while queues are created, outdated or removed as idle.
    private final Object queuesLock = new Object();
    // Currently known queues, keyed by name.
    private final Map<String, JobQueueImpl> queues = new ConcurrentHashMap();
    // Last value passed to configurationChanged(); gates maintenance, topic scans and event handling.
    private final AtomicBoolean isActive = new AtomicBoolean(false);

    /**
     * Activates the component: assembles the {@link QueueServices} holder from the
     * currently bound services and registers this manager as a listener on the
     * job manager configuration.
     *
     * @param map the component properties supplied on activation; not read by this method
     */
    @Activate
    protected void activate(Map<String, Object> map) {
        this.logger.info("Apache Sling Queue Manager started on instance {}", Environment.APPLICATION_ID);
        // Populate a local instance first and publish it through the volatile field
        // only once it is complete, so that no other thread can ever observe a
        // partially initialised QueueServices object.
        final QueueServices services = new QueueServices();
        services.configuration = this.configuration;
        services.eventAdmin = this.eventAdmin;
        services.jobConsumerManager = this.jobConsumerManager;
        services.scheduler = this.scheduler;
        services.threadPoolManager = this.threadPoolManager;
        services.statisticsManager = this.statisticsManager;
        services.eventingThreadPool = this.threadPool;
        this.queueServices = services;
        // Register last: configuration callbacks may arrive as soon as the listener is added.
        this.configuration.addListener(this);
    }

    /**
     * Deactivates the component: unregisters the configuration listener, closes
     * every known queue (notifying the queues MBean for each one), empties the
     * queue map and drops the service holder.
     */
    @Deactivate
    protected void deactivate() {
        this.logger.debug("Apache Sling Queue Manager stopping on instance {}", Environment.APPLICATION_ID);
        this.configuration.removeListener(this);

        final Iterator<JobQueueImpl> openQueues = this.queues.values().iterator();
        while (openQueues.hasNext()) {
            final JobQueueImpl queue = openQueues.next();
            queue.close();
            // Report the closed queue to the MBean.
            final QueuesMBeanImpl mbean = (QueuesMBeanImpl) this.queuesMBean;
            mbean.sendEvent(new QueueStatusEvent(null, queue));
        }

        this.queues.clear();
        this.queueServices = null;
        this.logger.info("Apache Sling Queue Manager stopped on instance {}", Environment.APPLICATION_ID);
    }

    /**
     * Performs one maintenance pass.
     * <ul>
     *   <li>Every pass: while active, calls {@code maintain()} on each known queue.</li>
     *   <li>Every third pass: while active, rescans the repository for topics.</li>
     *   <li>Every fifth pass: removes every queue that reports it could be closed.</li>
     * </ul>
     */
    private void maintain() {
        this.schedulerRuns++;
        this.logger.debug("Queue manager maintenance: Starting #{}", Long.valueOf(this.schedulerRuns));

        if (this.isActive.get()) {
            for (final JobQueueImpl queue : this.queues.values()) {
                queue.maintain();
            }
        }

        final boolean scanDue = this.schedulerRuns % 3 == 0;
        if (scanDue && this.isActive.get()) {
            fullTopicScan();
        }

        final boolean idleCheckDue = this.schedulerRuns % 5 == 0;
        if (idleCheckDue) {
            this.logger.debug("Checking for idle queues...");
            synchronized (this.queuesLock) {
                final Iterator<JobQueueImpl> candidates = this.queues.values().iterator();
                while (candidates.hasNext()) {
                    final JobQueueImpl candidate = candidates.next();
                    if (!candidate.tryToClose()) {
                        // Queue could not be closed; keep it registered.
                        continue;
                    }
                    this.logger.debug("Removing idle job queue {}", candidate);
                    candidates.remove();
                    final QueuesMBeanImpl mbean = (QueuesMBeanImpl) this.queuesMBean;
                    mbean.sendEvent(new QueueStatusEvent(null, candidate));
                }
            }
        }

        this.logger.debug("Queue manager maintenance: Finished #{}", Long.valueOf(this.schedulerRuns));
    }

    /**
     * Ensures a queue exists for the given queue info and starts its jobs.
     * <p>
     * Under the queues lock, an existing queue whose configuration object is not
     * the one in {@code queueInfo} is outdated and replaced by a newly created
     * queue. A queue that already existed (and was kept) is woken up with the
     * given topics before its jobs are started; a freshly created queue is only
     * started. Nothing happens if no queue could be created.
     *
     * @param queueInfo queue name and configuration to use
     * @param set       topics handed to the queue on creation or wake-up
     */
    private void start(QueueConfigurationManager.QueueInfo queueInfo, Set<String> set) {
        final InternalQueueConfiguration config = queueInfo.queueConfiguration;
        JobQueueImpl queue;
        boolean created = false;

        synchronized (this.queuesLock) {
            final String queueName = queueInfo.queueName;
            queue = this.queues.get(queueName);

            // Identity comparison: a different configuration object means the queue is stale.
            if (queue != null && queue.getConfiguration() != config) {
                outdateQueue(queue);
                queue = null;
            }

            if (queue == null) {
                queue = JobQueueImpl.createQueue(queueName, config, this.queueServices, set);
                created = queue != null;
                if (created) {
                    this.queues.put(queueName, queue);
                    final QueuesMBeanImpl mbean = (QueuesMBeanImpl) this.queuesMBean;
                    mbean.sendEvent(new QueueStatusEvent(queue, null));
                }
            }
        }

        if (queue == null) {
            return;
        }
        if (!created) {
            queue.wakeUpQueue(set);
        }
        queue.startJobs();
    }

    /**
     * {@link Runnable} entry point of this component; performs one maintenance
     * pass by delegating to {@link #maintain()}.
     */
    @Override // java.lang.Runnable
    public void run() {
        maintain();
    }

    /**
     * Takes a queue out of regular service.
     * <p>
     * The queue is removed from the map under its filtered queue name. If it can
     * be closed right away, the MBean is notified and nothing else happens.
     * Otherwise the queue is marked as outdated and re-registered under a key
     * derived from its filtered name, with {@code $<n>} appended when that key
     * is already taken, and the MBean is notified.
     *
     * @param jobQueueImpl the queue to outdate
     */
    private void outdateQueue(JobQueueImpl jobQueueImpl) {
        final String registeredName = ResourceHelper.filterQueueName(jobQueueImpl.getName());
        this.queues.remove(registeredName);

        if (!jobQueueImpl.tryToClose()) {
            jobQueueImpl.outdate();

            // Look for a free key: the plain filtered name first, then name$0, name$1, ...
            String key = ResourceHelper.filterName(jobQueueImpl.getName());
            for (int suffix = 0; this.queues.containsKey(key); suffix++) {
                key = ResourceHelper.filterName(jobQueueImpl.getName()) + '$' + suffix;
            }
            this.queues.put(key, jobQueueImpl);

            final QueuesMBeanImpl mbean = (QueuesMBeanImpl) this.queuesMBean;
            mbean.sendEvent(new QueueStatusEvent(jobQueueImpl, jobQueueImpl));
            return;
        }

        // The queue closed immediately; just report it.
        final QueuesMBeanImpl mbean = (QueuesMBeanImpl) this.queuesMBean;
        mbean.sendEvent(new QueueStatusEvent(null, jobQueueImpl));
    }

    /**
     * Outdates every currently registered queue and then reschedules all jobs
     * returned by clearing the configuration's job retry list.
     */
    private void restart() {
        synchronized (this.queuesLock) {
            // Iterate over a copy: outdateQueue() removes and re-adds map entries.
            for (final JobQueueImpl queue : new ArrayList<JobQueueImpl>(this.queues.values())) {
                outdateQueue(queue);
            }
        }

        if (this.configuration == null) {
            return;
        }
        for (final Job job : this.configuration.clearJobRetryList()) {
            final JobHandler handler = new JobHandler((JobImpl) job, null, this.configuration);
            handler.reschedule();
        }
    }

    /**
     * Looks up a queue by the key it is registered under.
     *
     * @param str the queue name; may be {@code null}
     * @return the matching queue, or {@code null} if no queue is registered under
     *         that name or the name itself is {@code null}
     */
    public Queue getQueue(String str) {
        // The backing ConcurrentHashMap rejects null keys with a NullPointerException;
        // treat a null name as "no such queue" instead.
        if (str == null) {
            return null;
        }
        return this.queues.get(str);
    }

    /**
     * Returns a read-only view over all currently registered queues.
     * <p>
     * Each call to {@link Iterable#iterator()} on the result starts a new
     * traversal of the queue map, so the returned {@code Iterable} can be
     * iterated any number of times. The iterators do not support removal.
     *
     * @return an iterable over the registered queues; never {@code null}
     */
    public Iterable<Queue> getQueues() {
        return new Iterable<Queue>() {
            @Override
            public Iterator<Queue> iterator() {
                // Create the backing iterator per call; sharing a single one across
                // calls would leave every traversal after the first one empty.
                final Iterator<JobQueueImpl> delegate = QueueManager.this.queues.values().iterator();
                return new Iterator<Queue>() {
                    @Override
                    public boolean hasNext() {
                        return delegate.hasNext();
                    }

                    @Override
                    public Queue next() {
                        return delegate.next();
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    /**
     * Reacts to a configuration change notification.
     * <p>
     * Ignored while no configuration is bound. Otherwise the active flag is set
     * to the given value; when it is {@code true} the repository is scanned for
     * topics, when it is {@code false} all queues are restarted.
     *
     * @param z the new active state
     */
    @Override
    public void configurationChanged(boolean z) {
        if (this.configuration == null) {
            return;
        }
        this.logger.debug("Topology changed {}", Boolean.valueOf(z));
        this.isActive.set(z);
        if (!z) {
            restart();
            return;
        }
        fullTopicScan();
    }

    /**
     * Scans the repository for topics, groups them by the queue responsible for
     * them and starts each of those queues with its topics.
     */
    private void fullTopicScan() {
        this.logger.debug("Scanning repository for existing topics...");
        final Set<String> topics = scanTopics();
        final Map<QueueConfigurationManager.QueueInfo, Set<String>> topicsByQueue = updateTopicMapping(topics);
        final Iterator<Map.Entry<QueueConfigurationManager.QueueInfo, Set<String>>> entries =
                topicsByQueue.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<QueueConfigurationManager.QueueInfo, Set<String>> entry = entries.next();
            start(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Collects the topics found below the local jobs path.
     * <p>
     * Every child resource of the local jobs path contributes one topic: its
     * name with each {@code '.'} replaced by {@code '/'}. The resource resolver
     * obtained for the scan is always closed again.
     *
     * @return the topics found; empty if the local jobs path does not resolve
     */
    private Set<String> scanTopics() {
        final Set<String> topics = new HashSet<String>();
        final ResourceResolver resolver = this.configuration.createResourceResolver();
        try {
            final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath());
            if (jobsRoot == null) {
                return topics;
            }
            for (final Iterator<Resource> children = jobsRoot.listChildren(); children.hasNext();) {
                final String topic = children.next().getName().replace('.', '/');
                this.logger.debug("Found topic {}", topic);
                topics.add(topic);
            }
            return topics;
        } finally {
            resolver.close();
        }
    }

    /**
     * Handles an event by starting the queue responsible for the topic named in
     * the event's {@code event.job.topic} property.
     * <p>
     * The event is ignored while this manager is not active or when the property
     * is absent.
     *
     * @param event the received event
     */
    @Override
    public void handleEvent(Event event) {
        final String topic = (String) event.getProperty("event.job.topic");
        if (this.isActive.get() && topic != null) {
            final QueueConfigurationManager.QueueInfo info =
                    this.configuration.getQueueConfigurationManager().getQueueInfo(topic);
            start(info, Collections.singleton(topic));
        }
    }

    /**
     * Groups the given topics by the queue info the queue configuration manager
     * reports for each of them.
     *
     * @param set the topics to group
     * @return a new map from queue info to the topics assigned to it
     */
    private Map<QueueConfigurationManager.QueueInfo, Set<String>> updateTopicMapping(Set<String> set) {
        final Map<QueueConfigurationManager.QueueInfo, Set<String>> mapping =
                new HashMap<QueueConfigurationManager.QueueInfo, Set<String>>();
        for (final String topic : set) {
            final QueueConfigurationManager.QueueInfo info =
                    this.configuration.getQueueConfigurationManager().getQueueInfo(topic);
            Set<String> topicsOfQueue = mapping.get(info);
            if (topicsOfQueue == null) {
                // First topic seen for this queue: create its bucket.
                topicsOfQueue = new HashSet<String>();
                mapping.put(info, topicsOfQueue);
            }
            topicsOfQueue.add(topic);
        }
        this.logger.debug("Established new topic mapping: {}", mapping);
        return mapping;
    }

    /**
     * Stores the given eventing thread pool as the thread pool reference.
     *
     * @param eventingThreadPool the thread pool to bind
     */
    protected void bindThreadPool(EventingThreadPool eventingThreadPool) {
        this.threadPool = eventingThreadPool;
    }

    /**
     * Clears the thread pool reference, but only if it still refers to the given
     * instance.
     *
     * @param eventingThreadPool the thread pool being unbound
     */
    protected void unbindThreadPool(EventingThreadPool eventingThreadPool) {
        if (this.threadPool != eventingThreadPool) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.threadPool = null;
    }

    /**
     * Stores the given event admin service reference.
     *
     * @param eventAdmin the event admin to bind
     */
    protected void bindEventAdmin(EventAdmin eventAdmin) {
        this.eventAdmin = eventAdmin;
    }

    /**
     * Clears the event admin reference, but only if it still refers to the given
     * instance.
     *
     * @param eventAdmin the event admin being unbound
     */
    protected void unbindEventAdmin(EventAdmin eventAdmin) {
        if (this.eventAdmin != eventAdmin) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.eventAdmin = null;
    }

    /**
     * Stores the given scheduler service reference.
     *
     * @param scheduler the scheduler to bind
     */
    protected void bindScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Clears the scheduler reference, but only if it still refers to the given
     * instance.
     *
     * @param scheduler the scheduler being unbound
     */
    protected void unbindScheduler(Scheduler scheduler) {
        if (this.scheduler != scheduler) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.scheduler = null;
    }

    /**
     * Stores the given job consumer manager reference.
     *
     * @param jobConsumerManager the job consumer manager to bind
     */
    protected void bindJobConsumerManager(JobConsumerManager jobConsumerManager) {
        this.jobConsumerManager = jobConsumerManager;
    }

    /**
     * Clears the job consumer manager reference, but only if it still refers to
     * the given instance.
     *
     * @param jobConsumerManager the job consumer manager being unbound
     */
    protected void unbindJobConsumerManager(JobConsumerManager jobConsumerManager) {
        if (this.jobConsumerManager != jobConsumerManager) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.jobConsumerManager = null;
    }

    /**
     * Stores the given queues MBean reference.
     *
     * @param queuesMBean the queues MBean to bind
     */
    protected void bindQueuesMBean(QueuesMBean queuesMBean) {
        this.queuesMBean = queuesMBean;
    }

    /**
     * Clears the queues MBean reference, but only if it still refers to the given
     * instance.
     *
     * @param queuesMBean the queues MBean being unbound
     */
    protected void unbindQueuesMBean(QueuesMBean queuesMBean) {
        if (this.queuesMBean != queuesMBean) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.queuesMBean = null;
    }

    /**
     * Stores the given thread pool manager reference.
     *
     * @param threadPoolManager the thread pool manager to bind
     */
    protected void bindThreadPoolManager(ThreadPoolManager threadPoolManager) {
        this.threadPoolManager = threadPoolManager;
    }

    /**
     * Clears the thread pool manager reference, but only if it still refers to
     * the given instance.
     *
     * @param threadPoolManager the thread pool manager being unbound
     */
    protected void unbindThreadPoolManager(ThreadPoolManager threadPoolManager) {
        if (this.threadPoolManager != threadPoolManager) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.threadPoolManager = null;
    }

    /**
     * Stores the given job manager configuration reference.
     *
     * @param jobManagerConfiguration the configuration to bind
     */
    protected void bindConfiguration(JobManagerConfiguration jobManagerConfiguration) {
        this.configuration = jobManagerConfiguration;
    }

    /**
     * Clears the job manager configuration reference, but only if it still refers
     * to the given instance.
     *
     * @param jobManagerConfiguration the configuration being unbound
     */
    protected void unbindConfiguration(JobManagerConfiguration jobManagerConfiguration) {
        if (this.configuration != jobManagerConfiguration) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.configuration = null;
    }

    /**
     * Stores the given statistics manager reference.
     *
     * @param statisticsManager the statistics manager to bind
     */
    protected void bindStatisticsManager(StatisticsManager statisticsManager) {
        this.statisticsManager = statisticsManager;
    }

    /**
     * Clears the statistics manager reference, but only if it still refers to the
     * given instance.
     *
     * @param statisticsManager the statistics manager being unbound
     */
    protected void unbindStatisticsManager(StatisticsManager statisticsManager) {
        if (this.statisticsManager != statisticsManager) {
            // A different instance is bound; leave it in place.
            return;
        }
        this.statisticsManager = null;
    }
}
