package org.apache.sling.event.impl.jobs.queues;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.sling.event.EventPropertiesMap;
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.EnvironmentComponent;
import org.apache.sling.event.impl.jobs.JobEvent;
import org.apache.sling.event.impl.jobs.JobStatusNotifier;
import org.apache.sling.event.impl.jobs.StatisticsImpl;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.Statistics;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.class */
public abstract class AbstractJobQueue extends StatisticsImpl implements JobStatusNotifier, Queue {
    /** Age in milliseconds after which a started but unacknowledged job is treated as unprocessed. */
    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60000;

    /** Maximum time in milliseconds the queue stays suspended before it resumes on its own. */
    private static final long MAX_SUSPEND_TIME = 3600000;

    /** Per-queue logger; its name is the concrete class name plus the initial queue name. */
    protected final Logger logger;

    /** Configuration of this queue (retries, priority, ...). */
    protected final InternalQueueConfiguration configuration;

    /** Provides access to the event admin and is passed along when sending notifications. */
    private final EnvironmentComponent environment;

    /** Current queue name; may be replaced through {@link #rename(String)}. */
    protected volatile String queueName;

    /** Set once the queue has been marked for removal and reset as soon as it is in use again. */
    private volatile boolean markedForRemoval = false;

    /** Only read by this class; presumably maintained by subclasses while they wait for a job to finish. */
    protected volatile boolean isWaiting = false;

    /**
     * Jobs that were posted but not yet acknowledged, keyed by job id.
     * Every access must synchronize on the map itself.
     */
    private final Map<String, JobEvent> startedJobsLists = new HashMap<String, JobEvent>();

    /**
     * Jobs that were acknowledged and are being processed, keyed by job id.
     * Every access must synchronize on the map itself.
     */
    private final Map<String, JobEvent> processsingJobsLists = new HashMap<String, JobEvent>();

    /** Time stamp of the suspend request, or -1 if the queue is not suspended. */
    private final AtomicLong suspendedSince = new AtomicLong(-1);

    /** Monitor the queue thread waits on while the queue is suspended. */
    private final Object suspendLock = new Object();

    /** Cleared by {@link #close()} to stop the queue thread. */
    protected volatile boolean running = true;

    public AbstractJobQueue(String str, InternalQueueConfiguration internalQueueConfiguration, EnvironmentComponent environmentComponent) {
        this.queueName = str;
        this.configuration = internalQueueConfiguration;
        this.logger = LoggerFactory.getLogger(getClass().getName() + '.' + str);
        this.environment = environmentComponent;
    }

    /**
     * Builds a short diagnostic description of the queue state.
     *
     * @return a string listing the waiting flag, the removal mark and the suspend time stamp
     */
    @Override
    public String getStateInfo() {
        final StringBuilder info = new StringBuilder("isWaiting=");
        info.append(this.isWaiting);
        info.append(", markedForRemoval=").append(this.markedForRemoval);
        info.append(", suspendedSince=").append(this.suspendedSince.longValue());
        return info.toString();
    }

    /**
     * Starts the daemon thread that runs this queue.
     * The thread keeps invoking {@link #runJobQueue()} for as long as the queue is running;
     * anything thrown by the queue loop is logged and the loop is entered again.
     */
    public void start() {
        final Runnable queueLoop = new Runnable() {
            @Override
            public void run() {
                while (running) {
                    logger.info("Starting job queue {}", queueName);
                    logger.debug("Configuration for job queue={}", configuration);
                    try {
                        runJobQueue();
                    } catch (Throwable failure) {
                        // the queue must survive any failure, so log and restart the loop
                        logger.error("Job queue " + queueName + " stopped with exception: " + failure.getMessage() + ". Restarting.", failure);
                    }
                }
            }
        };
        final Thread queueThread = new Thread(queueLoop, "Apache Sling Job Queue " + this.queueName);
        queueThread.setDaemon(true);
        queueThread.start();
    }

    /**
     * Returns the configuration this queue was created with.
     *
     * @return the queue configuration
     */
    @Override
    public InternalQueueConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Stops this queue.
     * <p>
     * Clears the running flag, resumes a suspended queue, wakes up a waiting queue, puts a
     * no-op job into the queue and finally forgets all started and processing jobs.
     */
    public void close() {
        this.running = false;
        this.logger.debug("Shutting down job queue {}", this.queueName);
        this.logger.debug("Waking up sleeping queue {}", this.queueName);
        resume();
        if (this.isWaiting) {
            this.logger.debug("Waking up waiting queue {}", this.queueName);
            notifyFinished(null);
        }
        // No-op job; presumably it unblocks a queue thread that is waiting in take()
        // so that the thread can observe the cleared running flag.
        put(new JobEvent(null, null) {
            @Override
            public boolean lock() {
                return false;
            }

            @Override
            public void unlock() {
            }

            @Override
            public void finished() {
            }

            @Override
            public void restart() {
            }

            @Override
            public boolean remove() {
                return true;
            }

            @Override
            public boolean reschedule() {
                return false;
            }
        });
        // Both maps are plain HashMaps guarded by their own monitor everywhere else;
        // clear them under the same monitors. The locks are never nested.
        synchronized (this.processsingJobsLists) {
            this.processsingJobsLists.clear();
        }
        synchronized (this.startedJobsLists) {
            this.startedJobsLists.clear();
        }
        this.logger.info("Stopped job queue {}", this.queueName);
    }

    /**
     * Looks for jobs that were started but not acknowledged within
     * {@code DEFAULT_WAIT_FOR_ACK_IN_MS} and tries to reschedule them.
     * <p>
     * Does nothing if the queue is not running. If overdue jobs are found, the method sleeps
     * for half a second before handling them; a job acknowledged in the meantime is no longer
     * in the started map and is skipped.
     */
    public void checkForUnprocessedJobs() {
        if (!this.running) {
            return;
        }
        final long ackDeadline = System.currentTimeMillis() - DEFAULT_WAIT_FOR_ACK_IN_MS;

        // collect the overdue jobs first, the lock is not held while rescheduling
        final Collection<JobEvent> overdueJobs = new ArrayList<JobEvent>();
        synchronized (this.startedJobsLists) {
            for (final JobEvent candidate : this.startedJobsLists.values()) {
                if (candidate.started <= ackDeadline) {
                    overdueJobs.add(candidate);
                }
            }
        }

        if (!overdueJobs.isEmpty()) {
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                ignoreException(e);
            }
        }

        for (final JobEvent overdue : overdueJobs) {
            final boolean stillUnacknowledged;
            synchronized (this.startedJobsLists) {
                stillUnacknowledged = this.startedJobsLists.remove(overdue.uniqueId) != null;
            }
            if (!stillUnacknowledged) {
                continue;
            }
            if (overdue.reschedule()) {
                this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", EventUtil.toString(overdue.event), overdue.uniqueId);
                checkForNotify(overdue);
            } else {
                decQueued();
                checkForNotify(null);
            }
        }
    }

    /**
     * Handles the acknowledgement for a started job.
     * <p>
     * If the job is known as started, the time between queueing and start is recorded, a
     * START notification is sent and the job is moved to the processing map. Otherwise the
     * queued counter is decremented.
     *
     * @param event the job event carrying the {@code slingevent:eventId} property
     * @return {@code true} if the job was waiting for an acknowledgement in this queue
     */
    @Override
    public boolean sendAcknowledge(Event event) {
        final String jobId = (String) event.getProperty("slingevent:eventId");
        final JobEvent acknowledged;
        synchronized (this.startedJobsLists) {
            acknowledged = this.startedJobsLists.remove(jobId);
        }
        if (acknowledged == null) {
            decQueued();
            return false;
        }

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Received ack for job {}", EventUtil.toString(event));
        }
        final long queueTime = acknowledged.started - acknowledged.queued;
        addActive(queueTime);
        Utility.sendNotification(this.environment, "org/apache/sling/event/notification/job/START", event, Long.valueOf(queueTime));
        synchronized (this.processsingJobsLists) {
            this.processsingJobsLists.put(jobId, acknowledged);
        }
        return true;
    }

    /**
     * Updates statistics, job properties and notifications for a job that has ended.
     * <p>
     * Without a reschedule request the job counts as finished. With a reschedule request the
     * retry counter is incremented and compared with the retry limit: the limit is taken from
     * the {@code event.job.retries} property of the job, falling back to the queue
     * configuration, and {@code -1} means unlimited. A job within the limit gets a new event
     * with updated retry properties and counts as failed; a job over the limit counts as
     * cancelled.
     *
     * @param jobEvent the job that has ended; its event and queued time are replaced on retry
     * @param z whether the job should be rescheduled
     * @return {@code true} if the job should actually be rescheduled
     */
    private boolean handleReschedule(JobEvent jobEvent, boolean z) {
        if (!z) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Finished job {}", EventUtil.toString(jobEvent.event));
            }
            final long processingTime = System.currentTimeMillis() - jobEvent.started;
            finishedJob(processingTime);
            Utility.sendNotification(this.environment, "org/apache/sling/event/notification/job/FINISHED", jobEvent.event, Long.valueOf(processingTime));
            return false;
        }

        // a retry limit on the job itself overrides the queue configuration
        int maxRetries = this.configuration.getMaxRetries();
        final Object jobRetries = jobEvent.event.getProperty("event.job.retries");
        if (jobRetries != null) {
            maxRetries = ((Integer) jobRetries).intValue();
        }
        int retryCount = 0;
        final Object previousRetryCount = jobEvent.event.getProperty("event.job.retrycount");
        if (previousRetryCount != null) {
            retryCount = ((Integer) previousRetryCount).intValue();
        }
        retryCount++;

        final boolean retry = maxRetries == -1 || retryCount <= maxRetries;
        if (retry) {
            // events are immutable, so create a new one carrying the updated retry information
            final EventPropertiesMap properties = new EventPropertiesMap(jobEvent.event);
            properties.put("event.job.retrycount", Integer.valueOf(retryCount));
            properties.put("event.job.retries", Integer.valueOf(maxRetries));
            jobEvent.event = new Event(jobEvent.event.getTopic(), properties);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Failed job {}", EventUtil.toString(jobEvent.event));
            }
            failedJob();
            jobEvent.queued = System.currentTimeMillis();
            Utility.sendNotification(this.environment, "org/apache/sling/event/notification/job/FAILED", jobEvent.event, null);
        } else {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Cancelled job {}", EventUtil.toString(jobEvent.event));
            }
            cancelledJob();
            Utility.sendNotification(this.environment, "org/apache/sling/event/notification/job/CANCELLED", jobEvent.event, null);
        }
        return retry;
    }

    /**
     * Handles the end of a job.
     * <p>
     * The call is discarded if the queue is not running or if the job is not known as being
     * processed by this queue. Otherwise the job is either finished or rescheduled and the
     * queue is notified.
     *
     * @param event the job event carrying the {@code slingevent:eventId} property
     * @param z whether the job should be rescheduled
     * @return {@code true} if the job has been rescheduled, {@code false} in all other cases
     */
    @Override
    public boolean finishedJob(Event event, boolean z) {
        final boolean debug = this.logger.isDebugEnabled();
        if (debug) {
            this.logger.debug("Received finish for job {}, shouldReschedule={}", EventUtil.toString(event), Boolean.valueOf(z));
        }
        if (!this.running) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Queue is not running anymore. Discarding finish for {}", EventUtil.toString(event));
            }
            return false;
        }

        final String jobId = (String) event.getProperty("slingevent:eventId");
        // the job might finish without ever having sent an acknowledgement
        synchronized (this.startedJobsLists) {
            this.startedJobsLists.remove(jobId);
        }
        final JobEvent job;
        synchronized (this.processsingJobsLists) {
            job = this.processsingJobsLists.remove(jobId);
        }
        if (job == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("This job has never been started by this queue: {}", EventUtil.toString(event));
            }
            return false;
        }

        if (!handleReschedule(job, z)) {
            job.finished();
            checkForNotify(null);
            return false;
        }
        if (job.reschedule()) {
            checkForNotify(job);
            return true;
        }
        checkForNotify(null);
        return false;
    }

    /**
     * Reschedules the given job, if there is one, and notifies the queue with the result.
     *
     * @param jobEvent the job to reschedule, or {@code null} to only notify the queue
     */
    private void checkForNotify(JobEvent jobEvent) {
        final JobEvent rescheduled = (jobEvent == null) ? null : reschedule(jobEvent);
        notifyFinished(rescheduled);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks whether the queue is idle.
     *
     * @return {@code true} if the queue is empty and not waiting
     */
    public boolean canBeMarkedForRemoval() {
        if (!isEmpty()) {
            return false;
        }
        return !this.isWaiting;
    }

    /**
     * Marks this queue for removal, but only if it is currently idle.
     */
    public void markForRemoval() {
        if (!canBeMarkedForRemoval()) {
            return;
        }
        this.markedForRemoval = true;
    }

    /**
     * Checks whether this queue is still marked for removal.
     * A mark on a queue that is no longer idle is reset as a side effect.
     *
     * @return {@code true} if the queue is marked for removal and still idle
     */
    public boolean isMarkedForRemoval() {
        boolean marked = this.markedForRemoval;
        if (marked && !canBeMarkedForRemoval()) {
            // the queue has been used in the meantime, so the mark is void
            this.markedForRemoval = false;
            marked = false;
        }
        return marked;
    }

    /**
     * Returns the current name of this queue.
     *
     * @return the queue name
     */
    @Override
    public String getName() {
        return queueName;
    }

    /**
     * Adds a new job to this queue.
     * <p>
     * The queued time stamp and the queued counter are updated before the job is handed to
     * {@link #put(JobEvent)}: once the job has been put, another thread may pick it up, read
     * its {@code queued} time and decrement the counter.
     *
     * @param jobEvent the job to queue
     */
    public void process(JobEvent jobEvent) {
        jobEvent.queued = System.currentTimeMillis();
        incQueued();
        put(jobEvent);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Main loop of the queue thread; returns once the queue is no longer running.
     * <p>
     * While the queue is suspended the thread waits on the suspend lock. A suspension ends
     * when the suspend time stamp is cleared or after {@code MAX_SUSPEND_TIME}, whichever
     * comes first. Afterwards the next job is taken, unless the previous start already
     * returned one, and started.
     */
    public void runJobQueue() {
        JobEvent nextJob = null;
        while (this.running) {
            while (this.suspendedSince.longValue() != -1) {
                synchronized (this.suspendLock) {
                    // Check again while holding the lock: a wake-up that is delivered under
                    // the lock between the check above and wait() must not be missed.
                    if (this.suspendedSince.longValue() != -1) {
                        try {
                            this.suspendLock.wait(MAX_SUSPEND_TIME);
                        } catch (InterruptedException e) {
                            ignoreException(e);
                        }
                    }
                    final long since = this.suspendedSince.longValue();
                    if (since != -1 && System.currentTimeMillis() > since + MAX_SUSPEND_TIME) {
                        // only end the suspension that timed out, not a newer one
                        this.suspendedSince.compareAndSet(since, -1L);
                    }
                }
            }
            if (nextJob == null) {
                nextJob = take();
            }
            if (nextJob != null && this.running) {
                nextJob = start(nextJob);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Locks the given job and posts its job event through the event admin.
     * <p>
     * On success the job stays locked and is remembered as started until it is acknowledged.
     * If the job cannot be locked or posting fails, the queued counter is decremented; a lock
     * that was obtained is released again.
     *
     * @param jobEvent the job to execute
     * @return {@code true} if the job event has been posted
     */
    public boolean executeJob(JobEvent jobEvent) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Executing job {}.", EventUtil.toString(jobEvent.event));
        }
        if (jobEvent.lock()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Starting job {}", EventUtil.toString(jobEvent.event));
            }
            // the lock is only kept if the event has been posted
            boolean unlock = true;
            try {
                final Event event = getJobEvent(jobEvent);
                final EventAdmin eventAdmin = this.environment.getEventAdmin();
                jobEvent.started = System.currentTimeMillis();
                synchronized (this.startedJobsLists) {
                    this.startedJobsLists.put(jobEvent.uniqueId, jobEvent);
                }
                // NOTE(review): if posting throws, the job stays in the started map and is
                // picked up later by checkForUnprocessedJobs() - confirm this is intended.
                eventAdmin.postEvent(event);
                unlock = false;
                return true;
            } catch (Exception e) {
                this.logger.error("Exception during job processing.", e);
            } finally {
                if (unlock) {
                    jobEvent.unlock();
                }
            }
        }
        decQueued();
        return false;
    }

    /**
     * Creates the event that is posted to execute the given job.
     * <p>
     * The event uses the topic stored in the {@code event.job.topic} property and a copy of
     * the job properties in which the notifier context is added, the distribute and
     * application properties are removed, a missing priority is filled in from the queue
     * configuration and the queue name is set.
     *
     * @param jobEvent the job to create the event for
     * @return the event to post
     */
    private Event getJobEvent(JobEvent jobEvent) {
        final String topic = (String) jobEvent.event.getProperty("event.job.topic");
        final EventPropertiesMap properties = new EventPropertiesMap(jobEvent.event);
        // lets the receiver report acknowledge and finish back to this queue
        properties.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, new JobStatusNotifier.NotifierContext(this));
        properties.remove(EventUtil.PROPERTY_DISTRIBUTE);
        properties.remove(EventUtil.PROPERTY_APPLICATION);
        if (properties.get("event.job.priority") == null) {
            properties.put("event.job.priority", this.configuration.getPriority());
        }
        properties.put("event.job.queuename", jobEvent.queueName);
        return new Event(topic, properties);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records an exception that is deliberately not handled; it is logged at debug level only.
     *
     * @param exc the ignored exception
     */
    public void ignoreException(Exception exc) {
        if (!this.logger.isDebugEnabled()) {
            return;
        }
        final String message = "Ignored exception " + exc.getMessage();
        this.logger.debug(message, exc);
    }

    /**
     * Gives this queue a new name and logs the change.
     *
     * @param str the new queue name
     */
    public void rename(String str) {
        final String previousName = this.queueName;
        this.logger.info("Queue reconfiguration: old queue {} is renamed to {}.", previousName, str);
        this.queueName = str;
    }

    /**
     * Puts a job back into the queue. Called with a non-null job whenever a job has to be
     * retried or was never acknowledged.
     *
     * @param jobEvent the job to reschedule
     * @return the value handed on to {@link #notifyFinished(JobEvent)}; presumably the job
     *         to start next, if any - confirm against the implementations
     */
    protected abstract JobEvent reschedule(JobEvent jobEvent);

    /**
     * Returns the statistics of this queue. The queue keeps its own statistics, so this is
     * the queue object itself.
     *
     * @return this queue
     */
    @Override
    public Statistics getStatistics() {
        final Statistics self = this;
        return self;
    }

    /**
     * Resumes a suspended queue; has no visible effect if the queue is not suspended.
     * <p>
     * The suspend time stamp is cleared before the waiting queue thread is notified and both
     * happen while holding the suspend lock. A thread woken up by the notification therefore
     * always sees the queue as resumed instead of possibly going back to waiting.
     */
    @Override
    public void resume() {
        synchronized (this.suspendLock) {
            this.suspendedSince.set(-1L);
            this.suspendLock.notifyAll();
        }
    }

    /**
     * Suspends this queue by recording the current time as the start of the suspension.
     * An already suspended queue keeps its original suspend time.
     */
    @Override
    public void suspend() {
        final long now = System.currentTimeMillis();
        this.suspendedSince.compareAndSet(-1L, now);
    }

    /**
     * Checks whether this queue is currently suspended.
     *
     * @return {@code true} if a suspend time stamp is set
     */
    @Override
    public boolean isSuspended() {
        return -1L != suspendedSince.get();
    }

    /**
     * Removes all jobs from this queue.
     * <p>
     * The queue is suspended while the jobs are taken out and the queued counter is reset.
     * The jobs themselves are removed in a separate daemon thread. A queue that was not
     * suspended before the call is resumed afterwards.
     */
    @Override
    public synchronized void removeAll() {
        final boolean wasSuspended = isSuspended();
        suspend();
        final Collection<JobEvent> removedJobs = removeAllJobs();
        clearQueued();

        final Runnable remover = new Runnable() {
            @Override
            public void run() {
                for (final JobEvent job : removedJobs) {
                    job.remove();
                }
            }
        };
        final Thread removerThread = new Thread(remover, "Queue RemoveAll Thread for " + this.queueName);
        removerThread.setDaemon(true);
        removerThread.start();

        // keep the queue suspended if it was suspended by someone else
        if (!wasSuspended) {
            resume();
        }
    }

    /**
     * Resets the queued counter of this queue. Jobs are not touched by this class;
     * subclasses may do more.
     */
    @Override
    public void clear() {
        this.clearQueued();
    }

    /**
     * Returns queue specific state. This base implementation has no state to offer.
     *
     * @param str the key of the requested state
     * @return always {@code null}
     */
    @Override
    public Object getState(String str) {
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds a job to the queue. Used for new jobs and, when the queue is closed, for a
     * no-op job.
     *
     * @param jobEvent the job to add
     */
    public abstract void put(JobEvent jobEvent);

    /**
     * Returns the next job to start. Called by the queue thread when it has no job at hand.
     * Presumably blocks until a job is available - confirm against the implementations.
     *
     * @return the next job; a {@code null} result is skipped by the queue loop
     */
    protected abstract JobEvent take();

    /**
     * Checks whether the queue holds no jobs; used to decide if the queue may be removed.
     *
     * @return {@code true} if the queue is empty
     */
    protected abstract boolean isEmpty();

    /**
     * Takes all jobs out of the queue. The returned jobs are removed by the caller.
     *
     * @return the jobs that were in the queue
     */
    protected abstract Collection<JobEvent> removeAllJobs();

    /**
     * Starts the given job. Called by the queue thread while the queue is running.
     *
     * @param jobEvent the job to start
     * @return the job the queue loop should start next, or {@code null} to take a new one
     */
    protected abstract JobEvent start(JobEvent jobEvent);

    /**
     * Notifies the queue that a job has ended or that the queue is being closed.
     *
     * @param jobEvent the rescheduled job, or {@code null} if there is none
     */
    protected abstract void notifyFinished(JobEvent jobEvent);
}
