package org.apache.sling.distribution.journal.impl.subscriber;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.commons.jackrabbit.SimpleReferenceBinary;
import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.RunnableUtil;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
import org.apache.sling.distribution.journal.impl.shared.AgentState;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker;
import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.serviceusermapping.ServiceUserMapped;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

@ParametersAreNonnullByDefault
@Designate(ocd = SubscriberConfiguration.class, factory = true)
@Component(service = {}, immediate = true, property = {"announceDelay=10000"}, configurationPid = {"org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory"})
/* loaded from: input_file:org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.class */
public class DistributionSubscriber implements DistributionAgent {
    private static final int PRECONDITION_TIMEOUT = 60;

    @Reference(name = "packageBuilder")
    private DistributionPackageBuilder packageBuilder;

    @Reference
    private SlingSettingsService slingSettings;

    @Reference
    private ResourceResolverFactory resolverFactory;

    @Reference
    private MessagingProvider messagingProvider;

    @Reference
    private Topics topics;

    @Reference
    private EventAdmin eventAdmin;

    @Reference
    private JournalAvailable journalAvailable;

    @Reference(name = "precondition")
    private Precondition precondition;

    @Reference
    private DistributionMetricsService distributionMetricsService;

    @Reference
    private ServiceUserMapped mappedUser;

    @Reference
    private Packaging packaging;
    private ServiceRegistration<DistributionAgent> componentReg;
    private DistributionMetricsService.GaugeService<Integer> retriesGauge;
    private Closeable packagePoller;
    private Closeable commandPoller;
    private LocalStore processedOffsets;
    private LocalStore processedStatuses;
    private MessageSender<Messages.PackageStatusMessage> sender;
    private Announcer announcer;
    private String subAgentName;
    private String subSlingId;
    private String pkgType;
    private int maxRetries;
    private boolean errorQueueEnabled;
    private boolean editable;
    private volatile Thread queueProcessor;
    private ContentPackageExtractor extractor;
    static int RETRY_DELAY = 5000;
    private static final int RETRY_SEND_DELAY = 1000;
    static int QUEUE_FETCH_DELAY = RETRY_SEND_DELAY;
    private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class);
    private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = Collections.emptySet();
    private final PackageRetries packageRetries = new PackageRetries();
    private final AtomicLong clearOffset = new AtomicLong(-1);
    private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue(8);
    private Set<String> queueNames = Collections.emptySet();
    private volatile boolean running = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$sling$distribution$journal$messages$Messages$PackageMessage$ReqType = new int[Messages.PackageMessage.ReqType.values().length];

        static {
            try {
                $SwitchMap$org$apache$sling$distribution$journal$messages$Messages$PackageMessage$ReqType[Messages.PackageMessage.ReqType.ADD.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$sling$distribution$journal$messages$Messages$PackageMessage$ReqType[Messages.PackageMessage.ReqType.DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$sling$distribution$journal$messages$Messages$PackageMessage$ReqType[Messages.PackageMessage.ReqType.TEST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Activate
    public void activate(SubscriberConfiguration subscriberConfiguration, BundleContext bundleContext, Map<String, Object> map) {
        this.subSlingId = (String) Objects.requireNonNull(this.slingSettings.getSlingId());
        this.subAgentName = (String) Objects.requireNonNull(subscriberConfiguration.name());
        Objects.requireNonNull(subscriberConfiguration);
        Objects.requireNonNull(bundleContext);
        Objects.requireNonNull(this.packageBuilder);
        Objects.requireNonNull(this.slingSettings);
        Objects.requireNonNull(this.resolverFactory);
        Objects.requireNonNull(this.messagingProvider);
        Objects.requireNonNull(this.topics);
        Objects.requireNonNull(this.eventAdmin);
        Objects.requireNonNull(this.precondition);
        this.queueNames = getNotEmpty(subscriberConfiguration.agentNames());
        this.maxRetries = subscriberConfiguration.maxRetries();
        this.errorQueueEnabled = this.maxRetries >= 0;
        this.editable = subscriberConfiguration.editable();
        this.processedOffsets = new LocalStore(this.resolverFactory, "packages", this.subAgentName);
        long longValue = ((Long) this.processedOffsets.load("offset", -1L)).longValue() + 1;
        this.processedStatuses = new LocalStore(this.resolverFactory, "statuses", this.subAgentName);
        this.packagePoller = this.messagingProvider.createPoller(this.topics.getPackageTopic(), Reset.earliest, this.messagingProvider.assignTo(longValue), new HandlerAdapter[]{HandlerAdapter.create(Messages.PackageMessage.class, this::handlePackageMessage)});
        if (this.editable) {
            this.commandPoller = this.messagingProvider.createPoller(this.topics.getCommandTopic(), Reset.earliest, new HandlerAdapter[]{HandlerAdapter.create(Messages.CommandMessage.class, this::handleCommandMessage)});
        }
        this.queueProcessor = RunnableUtil.startBackgroundThread(this::processQueue, String.format("Queue Processor for Subscriber agent %s", this.subAgentName));
        this.sender = this.messagingProvider.createSender();
        String str = "distribution.journal.subscriber.current_retries;sub_name=" + subscriberConfiguration.name();
        DistributionMetricsService distributionMetricsService = this.distributionMetricsService;
        PackageRetries packageRetries = this.packageRetries;
        packageRetries.getClass();
        this.retriesGauge = distributionMetricsService.createGauge(str, "Retries of current package", packageRetries::getSum);
        this.announcer = new Announcer(this.subSlingId, this.subAgentName, this.topics.getDiscoveryTopic(), this.queueNames, this.messagingProvider.createSender(), this.processedOffsets, this.packageRetries, this.maxRetries, subscriberConfiguration.editable(), PropertiesUtil.toInteger(map.get("announceDelay"), 10000));
        this.pkgType = (String) Objects.requireNonNull(this.packageBuilder.getType());
        LOG.info(String.format("Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s", this.subAgentName, Long.valueOf(longValue), this.queueNames, this.pkgType, Boolean.valueOf(subscriberConfiguration.editable()), Integer.valueOf(this.maxRetries), Boolean.valueOf(this.errorQueueEnabled)));
        this.componentReg = bundleContext.registerService(DistributionAgent.class, this, createServiceProps(subscriberConfiguration));
        this.extractor = new ContentPackageExtractor(this.packaging, subscriberConfiguration.packageHandling());
    }

    private Set<String> getNotEmpty(String[] strArr) {
        return (Set) Arrays.asList(strArr).stream().filter((v0) -> {
            return StringUtils.isNotBlank(v0);
        }).collect(Collectors.toSet());
    }

    private Dictionary<String, Object> createServiceProps(SubscriberConfiguration subscriberConfiguration) {
        Hashtable hashtable = new Hashtable();
        hashtable.put("name", subscriberConfiguration.name());
        hashtable.put("title", subscriberConfiguration.name());
        hashtable.put("details", subscriberConfiguration.name());
        hashtable.put("agentNames", subscriberConfiguration.agentNames());
        hashtable.put("editable", Boolean.valueOf(subscriberConfiguration.editable()));
        hashtable.put("maxRetries", Integer.valueOf(subscriberConfiguration.maxRetries()));
        hashtable.put("packageBuilder.target", subscriberConfiguration.packageBuilder_target());
        hashtable.put("precondition.target", subscriberConfiguration.precondition_target());
        hashtable.put("webconsole.configurationFactory.nameHint", subscriberConfiguration.webconsole_configurationFactory_nameHint());
        return hashtable;
    }

    @Deactivate
    public void deactivate() {
        IOUtils.closeQuietly(this.retriesGauge);
        IOUtils.closeQuietly(this.announcer);
        this.componentReg.unregister();
        IOUtils.closeQuietly(this.packagePoller);
        IOUtils.closeQuietly(this.commandPoller);
        this.running = false;
        Thread thread = this.queueProcessor;
        if (thread != null) {
            thread.interrupt();
        }
        LOG.info(String.format("Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s", this.subAgentName, this.queueNames, this.pkgType));
    }

    @Nonnull
    public Iterable<String> getQueueNames() {
        return this.queueNames;
    }

    public DistributionQueue getQueue(@Nonnull String str) {
        return new SubQueue(str, (DistributionQueueItem) this.queueItemsBuffer.stream().filter(distributionQueueItem -> {
            return isIn(str, distributionQueueItem);
        }).findFirst().orElse(null), this.packageRetries);
    }

    private boolean isIn(String str, DistributionQueueItem distributionQueueItem) {
        return str.equals(((Messages.PackageMessage) distributionQueueItem.get(QueueItemFactory.PACKAGE_MSG, Messages.PackageMessage.class)).getPubAgentName());
    }

    @Nonnull
    public DistributionLog getLog() {
        return this::emptyDistributionLog;
    }

    private List<String> emptyDistributionLog() {
        return Collections.emptyList();
    }

    @Nonnull
    public DistributionAgentState getState() {
        return AgentState.getState(this);
    }

    @Nonnull
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest distributionRequest) throws DistributionException {
        return executeUnsupported(distributionRequest);
    }

    @Nonnull
    private DistributionResponse executeUnsupported(DistributionRequest distributionRequest) {
        String format = String.format("Request type %s is not supported by this agent, expected one of %s", distributionRequest.getRequestType(), SUPPORTED_REQ_TYPES);
        LOG.info(format);
        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, format);
    }

    private void handlePackageMessage(MessageInfo messageInfo, Messages.PackageMessage packageMessage) {
        if (!this.queueNames.contains(packageMessage.getPubAgentName())) {
            LOG.info(String.format("Skipping package for Publisher agent %s (not subscribed)", packageMessage.getPubAgentName()));
        } else {
            if (!this.pkgType.equals(packageMessage.getPkgType())) {
                LOG.warn(String.format("Skipping package with type %s", packageMessage.getPkgType()));
                return;
            }
            try {
                enqueue(QueueItemFactory.fromPackage(messageInfo, packageMessage, true));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException();
            }
        }
    }

    private void enqueue(DistributionQueueItem distributionQueueItem) throws InterruptedException {
        while (this.running) {
            if (this.queueItemsBuffer.offer(distributionQueueItem, 1000L, TimeUnit.MILLISECONDS)) {
                this.distributionMetricsService.getItemsBufferSize().increment();
                return;
            }
        }
        throw new InterruptedException();
    }

    private void processQueue() {
        LOG.info("Started Queue processor");
        while (!Thread.interrupted()) {
            try {
                processQueueItems();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("Stopped Queue processor");
    }

    private void processQueueItems() throws InterruptedException {
        try {
            Timer.Context time = this.distributionMetricsService.getSendStoredStatusDuration().time();
            Throwable th = null;
            try {
                sendStoredStatus();
                if (time != null) {
                    if (0 != 0) {
                        try {
                            time.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        time.close();
                    }
                }
                DistributionQueueItem blockingPeekQueueItem = blockingPeekQueueItem();
                Timer.Context time2 = this.distributionMetricsService.getProcessQueueItemDuration().time();
                Throwable th3 = null;
                try {
                    try {
                        processQueueItem(blockingPeekQueueItem);
                        if (time2 != null) {
                            if (0 != 0) {
                                try {
                                    time2.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                time2.close();
                            }
                        }
                    } catch (Throwable th5) {
                        th3 = th5;
                        throw th5;
                    }
                } finally {
                }
            } finally {
            }
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable th6) {
            LOG.error("Error processing queue item", th6);
            Thread.sleep(RETRY_DELAY);
        }
    }

    private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException {
        while (true) {
            DistributionQueueItem peek = this.queueItemsBuffer.peek();
            if (peek != null) {
                return peek;
            }
            Thread.sleep(QUEUE_FETCH_DELAY);
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:11:0x0059  */
    /* JADX WARN: Removed duplicated region for block: B:15:0x0063  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void processQueueItem(org.apache.sling.distribution.queue.DistributionQueueItem r8) throws java.lang.Exception {
        /*
            r7 = this;
            r0 = r8
            java.lang.String r1 = "recordOffset"
            java.lang.Class<java.lang.Long> r2 = java.lang.Long.class
            java.lang.Object r0 = r0.get(r1, r2)
            java.lang.Long r0 = (java.lang.Long) r0
            long r0 = r0.longValue()
            r9 = r0
            r0 = r7
            r1 = r9
            boolean r0 = r0.isCleared(r1)     // Catch: java.lang.IllegalStateException -> L29
            if (r0 != 0) goto L1f
            r0 = r7
            r1 = r9
            boolean r0 = r0.cannotProcess(r1)     // Catch: java.lang.IllegalStateException -> L29
            if (r0 == 0) goto L23
        L1f:
            r0 = 1
            goto L24
        L23:
            r0 = 0
        L24:
            r11 = r0
            goto L40
        L29:
            r12 = move-exception
            org.slf4j.Logger r0 = org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber.LOG
            r1 = r12
            java.lang.String r1 = r1.getMessage()
            r0.info(r1)
            int r0 = org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber.RETRY_DELAY
            long r0 = (long) r0
            java.lang.Thread.sleep(r0)
            return
        L40:
            r0 = r8
            java.lang.String r1 = "packageMessage"
            java.lang.Class<org.apache.sling.distribution.journal.messages.Messages$PackageMessage> r2 = org.apache.sling.distribution.journal.messages.Messages.PackageMessage.class
            java.lang.Object r0 = r0.get(r1, r2)
            org.apache.sling.distribution.journal.messages.Messages$PackageMessage r0 = (org.apache.sling.distribution.journal.messages.Messages.PackageMessage) r0
            r12 = r0
            r0 = r12
            java.lang.String r0 = r0.getPubAgentName()
            r13 = r0
            r0 = r11
            if (r0 == 0) goto L63
            r0 = r7
            r1 = r12
            r2 = r9
            r0.removePackage(r1, r2)
            goto L7c
        L63:
            r0 = r8
            java.lang.String r1 = "recordTimestamp"
            java.lang.Class<java.lang.Long> r2 = java.lang.Long.class
            java.lang.Object r0 = r0.get(r1, r2)
            java.lang.Long r0 = (java.lang.Long) r0
            long r0 = r0.longValue()
            r14 = r0
            r0 = r7
            r1 = r12
            r2 = r9
            r3 = r14
            r0.importPackage(r1, r2, r3)
        L7c:
            r0 = r7
            r1 = r13
            r0.queueItemProcessed(r1)
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriber.processQueueItem(org.apache.sling.distribution.queue.DistributionQueueItem):void");
    }

    private void removePackage(Messages.PackageMessage packageMessage, long j) throws Exception {
        LOG.info(String.format("Removing distribution package %s of type %s at offset %s", packageMessage.getPkgId(), packageMessage.getReqType(), Long.valueOf(j)));
        Timer.Context time = this.distributionMetricsService.getRemovedPackageDuration().time();
        ResourceResolver serviceResolver = getServiceResolver("bookkeeper");
        Throwable th = null;
        try {
            try {
                if (this.editable) {
                    storeStatus(serviceResolver, Messages.PackageStatusMessage.Status.REMOVED, j, packageMessage.getPubAgentName());
                }
                storeOffset(serviceResolver, j);
                serviceResolver.commit();
                time.stop();
                if (serviceResolver != null) {
                    if (0 == 0) {
                        serviceResolver.close();
                        return;
                    }
                    try {
                        serviceResolver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (serviceResolver != null) {
                if (th != null) {
                    try {
                        serviceResolver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    serviceResolver.close();
                }
            }
            throw th4;
        }
    }

    private void removeFailedPackage(Messages.PackageMessage packageMessage, long j) throws Exception {
        LOG.info(String.format("Removing failed distribution package %s of type %s at offset %s", packageMessage.getPkgId(), packageMessage.getReqType(), Long.valueOf(j)));
        Timer.Context time = this.distributionMetricsService.getRemovedFailedPackageDuration().time();
        ResourceResolver serviceResolver = getServiceResolver("bookkeeper");
        Throwable th = null;
        try {
            try {
                storeStatus(serviceResolver, Messages.PackageStatusMessage.Status.REMOVED_FAILED, j, packageMessage.getPubAgentName());
                storeOffset(serviceResolver, j);
                serviceResolver.commit();
                time.stop();
                if (serviceResolver != null) {
                    if (0 == 0) {
                        serviceResolver.close();
                        return;
                    }
                    try {
                        serviceResolver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (serviceResolver != null) {
                if (th != null) {
                    try {
                        serviceResolver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    serviceResolver.close();
                }
            }
            throw th4;
        }
    }

    private void importPackage(Messages.PackageMessage packageMessage, long j, long j2) throws Exception {
        String pubAgentName = packageMessage.getPubAgentName();
        LOG.info(String.format("Importing distribution package %s of type %s at offset %s", packageMessage.getPkgId(), packageMessage.getReqType(), Long.valueOf(j)));
        addPackageMDC(packageMessage);
        Timer.Context time = this.distributionMetricsService.getImportedPackageDuration().time();
        try {
            ResourceResolver serviceResolver = getServiceResolver("importer");
            Throwable th = null;
            try {
                try {
                    installPackage(serviceResolver, packageMessage);
                    if (this.editable) {
                        storeStatus(serviceResolver, Messages.PackageStatusMessage.Status.IMPORTED, j, pubAgentName);
                    }
                    storeOffset(serviceResolver, j);
                    serviceResolver.commit();
                    time.stop();
                    this.distributionMetricsService.getImportedPackageSize().update(packageMessage.getPkgLength());
                    this.distributionMetricsService.getPackageDistributedDuration().update(System.currentTimeMillis() - j2, TimeUnit.MILLISECONDS);
                    this.eventAdmin.postEvent(DistributionEvent.eventImporterImported(packageMessage, this.subAgentName));
                    if (serviceResolver != null) {
                        if (0 != 0) {
                            try {
                                serviceResolver.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            serviceResolver.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (serviceResolver != null) {
                    if (th != null) {
                        try {
                            serviceResolver.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        serviceResolver.close();
                    }
                }
                throw th4;
            }
        } catch (Throwable th6) {
            this.distributionMetricsService.getFailedPackageImports().mark();
            if (th6 instanceof Error) {
                throw ((Error) th6);
            }
            int i = this.packageRetries.get(pubAgentName);
            if (this.errorQueueEnabled && i >= this.maxRetries) {
                LOG.warn(String.format("Failed to import distribution package %s at offset %s after %s retries, removing the package.", packageMessage.getPkgId(), Long.valueOf(j), Integer.valueOf(i)));
                removeFailedPackage(packageMessage, j);
                return;
            }
            this.packageRetries.increase(pubAgentName);
            Object[] objArr = new Object[3];
            objArr[0] = packageMessage.getPkgId();
            objArr[1] = Integer.valueOf(i);
            objArr[2] = this.errorQueueEnabled ? Integer.toString(this.maxRetries) : "infinite";
            throw new DistributionException(String.format("Error processing distribution package %s. Retry attempts %s/%s.", objArr), th6);
        } finally {
            MDC.clear();
        }
    }

    private void storeOffset(ResourceResolver resourceResolver, long j) throws PersistenceException {
        this.processedOffsets.store(resourceResolver, "offset", Long.valueOf(j));
    }

    private void queueItemProcessed(String str) {
        this.packageRetries.clear(str);
        this.queueItemsBuffer.remove();
        this.distributionMetricsService.getItemsBufferSize().decrement();
    }

    private void addPackageMDC(Messages.PackageMessage packageMessage) {
        MDC.put("module", "distribution");
        MDC.put("package-id", packageMessage.getPkgId());
        MDC.put("paths", (String) packageMessage.getPathsList().stream().collect(Collectors.joining(",")));
        MDC.put("pub-sling-id", packageMessage.getPubSlingId());
        String pubAgentName = packageMessage.getPubAgentName();
        MDC.put("pub-agent-name", pubAgentName);
        MDC.put("distribution-message-type", packageMessage.getReqType().name());
        MDC.put("retries", Integer.toString(this.packageRetries.get(pubAgentName)));
        MDC.put("sub-sling-id", this.subSlingId);
        MDC.put("sub-agent-name", this.subAgentName);
    }

    private void installPackage(ResourceResolver resourceResolver, Messages.PackageMessage packageMessage) throws DistributionException, PersistenceException {
        Messages.PackageMessage.ReqType reqType = packageMessage.getReqType();
        switch (AnonymousClass1.$SwitchMap$org$apache$sling$distribution$journal$messages$Messages$PackageMessage$ReqType[reqType.ordinal()]) {
            case 1:
                installAddPackage(resourceResolver, packageMessage);
                return;
            case JournalAvailableChecker.MIN_ERRORS /* 2 */:
                installDeletePackage(resourceResolver, packageMessage);
                return;
            case 3:
                return;
            default:
                throw new UnsupportedOperationException(String.format("Unable to process messages with type: %s", reqType));
        }
    }

    private void installAddPackage(ResourceResolver resourceResolver, Messages.PackageMessage packageMessage) throws DistributionException {
        LOG.info("Importing paths " + packageMessage.getPathsList());
        InputStream inputStream = null;
        try {
            inputStream = pkgStream(resourceResolver, packageMessage);
            this.packageBuilder.installPackage(resourceResolver, inputStream);
            this.extractor.handle(resourceResolver, packageMessage.getPathsList());
            IOUtils.closeQuietly(inputStream);
        } catch (Throwable th) {
            IOUtils.closeQuietly(inputStream);
            throw th;
        }
    }

    private void installDeletePackage(ResourceResolver resourceResolver, Messages.PackageMessage packageMessage) throws PersistenceException {
        LOG.info("Deleting paths " + packageMessage.getPathsList());
        Iterator it = packageMessage.getPathsList().iterator();
        while (it.hasNext()) {
            Resource resource = resourceResolver.getResource((String) it.next());
            if (resource != null) {
                resourceResolver.delete(resource);
            }
        }
    }

    private void storeStatus(ResourceResolver resourceResolver, Messages.PackageStatusMessage.Status status, long j, String str) throws PersistenceException {
        HashMap hashMap = new HashMap();
        hashMap.put("pubAgentName", str);
        hashMap.put("statusNumber", Integer.valueOf(status.getNumber()));
        hashMap.put("offset", Long.valueOf(j));
        hashMap.put("sent", false);
        this.processedStatuses.store(resourceResolver, hashMap);
        LOG.info("Stored status {}", hashMap);
    }

    private void sendStoredStatus() throws InterruptedException {
        ValueMap load = this.processedStatuses.load();
        boolean booleanValue = ((Boolean) load.get("sent", true)).booleanValue();
        int i = 0;
        while (!booleanValue) {
            try {
                sendStatusMessage(load);
                markStatusSent();
                booleanValue = true;
            } catch (Exception e) {
                LOG.warn("Cannot send status (retry {})", Integer.valueOf(i), e);
                Thread.sleep(1000L);
            }
            i++;
        }
    }

    private void markStatusSent() {
        try {
            ResourceResolver serviceResolver = getServiceResolver("bookkeeper");
            Throwable th = null;
            try {
                this.processedStatuses.store(serviceResolver, "sent", true);
                serviceResolver.commit();
                if (serviceResolver != null) {
                    if (0 != 0) {
                        try {
                            serviceResolver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        serviceResolver.close();
                    }
                }
            } finally {
            }
        } catch (Exception e) {
            LOG.warn("Failed to mark status as sent", e);
        }
    }

    private void sendStatusMessage(ValueMap valueMap) {
        this.sender.send(this.topics.getStatusTopic(), Messages.PackageStatusMessage.newBuilder().setSubSlingId(this.subSlingId).setSubAgentName(this.subAgentName).setPubAgentName((String) valueMap.get("pubAgentName", String.class)).setOffset(((Long) valueMap.get("offset", Long.class)).longValue()).setStatus(Messages.PackageStatusMessage.Status.valueOf(((Integer) valueMap.get("statusNumber", Integer.class)).intValue())).build());
        LOG.info("Sent status message {}", valueMap);
    }

    @Nonnull
    private InputStream pkgStream(ResourceResolver resourceResolver, Messages.PackageMessage packageMessage) throws DistributionException {
        if (packageMessage.hasPkgBinary()) {
            return new ByteArrayInputStream(packageMessage.getPkgBinary().toByteArray());
        }
        try {
            return ((Session) resourceResolver.adaptTo(Session.class)).getValueFactory().createValue(new SimpleReferenceBinary(packageMessage.getPkgBinaryRef())).getBinary().getStream();
        } catch (RepositoryException e) {
            throw new DistributionException(e.getMessage(), e);
        }
    }

    private void handleCommandMessage(MessageInfo messageInfo, Messages.CommandMessage commandMessage) {
        if (!this.subSlingId.equals(commandMessage.getSubSlingId()) || !this.subAgentName.equals(commandMessage.getSubAgentName())) {
            LOG.debug(String.format("Skip command for subSlingId %s", commandMessage.getSubSlingId()));
        } else if (commandMessage.hasClearCommand()) {
            handleClearCommand(commandMessage.getClearCommand().getOffset());
        } else {
            LOG.warn("Unsupported command {}", commandMessage);
        }
    }

    private boolean isCleared(long j) {
        return j <= this.clearOffset.longValue();
    }

    private boolean cannotProcess(long j) {
        return !this.precondition.canProcess(j, PRECONDITION_TIMEOUT);
    }

    private void handleClearCommand(long j) {
        if (this.editable) {
            this.clearOffset.accumulateAndGet(j, Math::max);
        } else {
            LOG.warn("Unexpected ClearCommand for non editable subscriber");
        }
    }

    private ResourceResolver getServiceResolver(String str) throws LoginException {
        return this.resolverFactory.getServiceResourceResolver(Collections.singletonMap("sling.service.subservice", str));
    }
}
