package org.apache.sling.distribution.journal.impl.subscriber;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.impl.PackageRetries;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.messages.Messages;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.class */
public class BookKeeper implements Closeable {
    /** Property key for the package offset, used both in the offset store and in a {@link PackageStatus} map. */
    private static final String KEY_OFFSET = "offset";
    /** Sub service name used to obtain the resolver that imports packages. */
    private static final String SUBSERVICE_IMPORTER = "importer";
    /** Sub service name used to obtain the resolver for status and offset book keeping. */
    private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
    // NOTE(review): not referenced in the visible code; retryDelay() sleeps a literal 1000 ms instead.
    private static final int RETRY_SEND_DELAY = 1000;
    /** Threshold that the skip counter must exceed before a skipped offset is committed. */
    private static final int COMMIT_AFTER_NUM_SKIPPED = 10;
    private final ResourceResolverFactory resolverFactory;
    private final DistributionMetricsService distributionMetricsService;
    private final PackageHandler packageHandler;
    private final EventAdmin eventAdmin;
    /** Receives the status messages built by sendStatusMessage. */
    private final Consumer<Messages.PackageStatusMessage> sender;
    /** When true, a package status is stored alongside the offset on import and removal. */
    private final boolean editable;
    /** Retry count at which a failing package is removed, provided the error queue is enabled. */
    private final int maxRetries;
    /** Set by the constructor to {@code maxRetries >= 0}. */
    private final boolean errorQueueEnabled;
    /** Store created with the name "statuses"; holds the last package status. */
    private final LocalStore statusStore;
    /** Store created with the name "packages"; holds the last processed offset. */
    private final LocalStore processedOffsets;
    private final String subAgentName;
    private final String subSlingId;
    /** Gauge reporting the sum of current retries; closed in close(). */
    private final DistributionMetricsService.GaugeService<Integer> retriesGauge;
    private final Logger log = LoggerFactory.getLogger(getClass());
    /** Retry counters keyed by publisher agent name. */
    private final PackageRetries packageRetries = new PackageRetries();
    /** Number of skipped packages counted by shouldCommitSkipped(). */
    private int skippedCounter = 0;

    /**
     * Immutable value holder for the status of a single package, convertible
     * to and from the property map kept in the status store.
     */
    public static class PackageStatus {
        final Messages.PackageStatusMessage.Status status;
        final Long offset;
        final String pubAgentName;
        final Boolean sent;

        /**
         * Creates a fresh status that has not been sent yet.
         *
         * @param status       outcome of processing the package
         * @param offset       offset of the package
         * @param pubAgentName name of the publisher agent the package came from
         */
        PackageStatus(Messages.PackageStatusMessage.Status status, long offset, String pubAgentName) {
            this.status = status;
            this.offset = offset;
            this.pubAgentName = pubAgentName;
            this.sent = false;
        }

        /**
         * Restores a status from stored properties.
         *
         * @param valueMap properties as written by {@link #asMap()}; a missing
         *                 "statusNumber" yields a {@code null} status and a missing
         *                 "sent" flag is treated as {@code true}
         */
        PackageStatus(ValueMap valueMap) {
            Integer statusNumber = valueMap.get("statusNumber", Integer.class);
            this.status = statusNumber != null
                    ? Messages.PackageStatusMessage.Status.valueOf(statusNumber.intValue())
                    : null;
            this.offset = valueMap.get(BookKeeper.KEY_OFFSET, Long.class);
            this.pubAgentName = valueMap.get("pubAgentName", String.class);
            this.sent = valueMap.get("sent", true);
        }

        /**
         * Converts this status into the property map that gets persisted.
         *
         * @return a new mutable map with the keys "pubAgentName", "statusNumber",
         *         "offset" and "sent"
         */
        Map<String, Object> asMap() {
            // Typed map instead of a raw HashMap so the compiler checks keys and values.
            Map<String, Object> properties = new HashMap<>();
            properties.put("pubAgentName", this.pubAgentName);
            // Dereferences status, so this must not be called on a status restored without "statusNumber".
            properties.put("statusNumber", this.status.getNumber());
            properties.put(BookKeeper.KEY_OFFSET, this.offset);
            properties.put("sent", this.sent);
            return properties;
        }
    }

    /**
     * Creates a book keeper for one subscriber agent.
     *
     * @param resourceResolverFactory    factory used to obtain service resolvers
     * @param distributionMetricsService source of the timers, meters and the retries gauge
     * @param packageHandler             applies a package message to the repository
     * @param eventAdmin                 receives the "imported" distribution events
     * @param consumer                   sender for package status messages
     * @param str                        subscriber agent name; also used as the name of both local stores
     * @param str2                       sling id of the subscriber
     * @param z                          whether package statuses are stored on import and removal
     * @param i                          maximum retries; the error queue is enabled when this is {@code >= 0}
     */
    public BookKeeper(ResourceResolverFactory resourceResolverFactory, DistributionMetricsService distributionMetricsService, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<Messages.PackageStatusMessage> consumer, String str, String str2, boolean z, int i) {
        // Plain collaborators and configuration first.
        this.resolverFactory = resourceResolverFactory;
        this.distributionMetricsService = distributionMetricsService;
        this.packageHandler = packageHandler;
        this.eventAdmin = eventAdmin;
        this.sender = consumer;
        this.subAgentName = str;
        this.subSlingId = str2;
        this.editable = z;
        this.maxRetries = i;
        this.errorQueueEnabled = i >= 0;

        // Gauge first, then the two stores.
        String gaugeName = "distribution.journal.subscriber.current_retries;sub_name=" + str;
        this.retriesGauge = distributionMetricsService.createGauge(gaugeName, "Retries of current package", this.packageRetries::getSum);
        this.statusStore = new LocalStore(resourceResolverFactory, "statuses", str);
        this.processedOffsets = new LocalStore(resourceResolverFactory, "packages", str);
    }

    /**
     * Imports a package and, in the same commit, records its offset (and its
     * status when this book keeper is editable).
     * <p>
     * On success the import metrics are updated, the retry counter of the
     * publisher agent is cleared and an "imported" event is posted. Any
     * failure is delegated to {@code failure}, which either removes the
     * package or throws. The logging MDC is populated for the duration of the
     * call and cleared afterwards.
     *
     * @param packageMessage the package to import
     * @param j              offset of the package
     * @param j2             creation time of the package in epoch milliseconds, used to
     *                       compute the distribution duration
     * @throws DistributionException if the import failed and the package is to be retried,
     *                               or if removing the failed package did not succeed
     */
    public void importPackage(Messages.PackageMessage packageMessage, long j, long j2) throws DistributionException {
        this.log.info("Importing distribution package {} of type {} at offset {}", packageMessage.getPkgId(), packageMessage.getReqType(), j);
        addPackageMDC(packageMessage);
        // Resources are closed in reverse order; a failure while closing is attached
        // as a suppressed exception instead of hiding the original error.
        try (Timer.Context importTimer = this.distributionMetricsService.getImportedPackageDuration().time();
                ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) {
            this.packageHandler.apply(importerResolver, packageMessage);
            if (this.editable) {
                storeStatus(importerResolver, new PackageStatus(Messages.PackageStatusMessage.Status.IMPORTED, j, packageMessage.getPubAgentName()));
            }
            storeOffset(importerResolver, j);
            // Content, status and offset are persisted by this single commit.
            importerResolver.commit();
            this.distributionMetricsService.getImportedPackageSize().update(packageMessage.getPkgLength());
            this.distributionMetricsService.getPackageDistributedDuration().update(System.currentTimeMillis() - j2, TimeUnit.MILLISECONDS);
            this.packageRetries.clear(packageMessage.getPubAgentName());
            this.eventAdmin.postEvent(DistributionEvent.eventImporterImported(packageMessage, this.subAgentName));
        } catch (DistributionException | LoginException | IOException | RuntimeException e) {
            failure(packageMessage, j, e);
        } finally {
            MDC.clear();
        }
    }

    /**
     * Populates the logging MDC with the identifying data of the package and
     * of this subscriber.
     *
     * @param pkgMsg the package about to be processed
     */
    private void addPackageMDC(Messages.PackageMessage pkgMsg) {
        final String publisherAgent = pkgMsg.getPubAgentName();

        MDC.put("module", "distribution");
        MDC.put("package-id", pkgMsg.getPkgId());
        MDC.put("paths", String.join(",", pkgMsg.getPathsList()));
        MDC.put("pub-sling-id", pkgMsg.getPubSlingId());
        MDC.put("pub-agent-name", publisherAgent);
        MDC.put("distribution-message-type", pkgMsg.getReqType().name());
        // Retry count recorded so far for this publisher agent.
        MDC.put("retries", Integer.toString(packageRetries.get(publisherAgent)));
        MDC.put("sub-sling-id", subSlingId);
        MDC.put("sub-agent-name", subAgentName);
    }

    /**
     * Handles a failed import attempt.
     * <p>
     * Marks the failed-import meter. If the error queue is enabled and the
     * retries recorded for the publisher agent have reached the maximum, the
     * package is removed as failed and the method returns normally. Otherwise
     * the retry counter is increased and the failure is rethrown.
     *
     * @param packageMessage the package whose import failed
     * @param offset         offset of the package
     * @param cause          the exception raised by the import
     * @throws DistributionException wrapping {@code cause} when the package should be
     *                               retried, or when removing the failed package fails
     */
    private void failure(Messages.PackageMessage packageMessage, long offset, Exception cause) throws DistributionException {
        this.distributionMetricsService.getFailedPackageImports().mark();
        String pubAgentName = packageMessage.getPubAgentName();
        int retries = this.packageRetries.get(pubAgentName);
        if (this.errorQueueEnabled && retries >= this.maxRetries) {
            // The trailing throwable makes SLF4J log the stack trace; without it the
            // reason for dropping the package would not appear anywhere.
            this.log.warn("Failed to import distribution package {} at offset {} after {} retries, removing the package.", packageMessage.getPkgId(), offset, retries, cause);
            removeFailedPackage(packageMessage, offset);
            return;
        }
        this.packageRetries.increase(pubAgentName);
        String maxRetriesText = this.errorQueueEnabled ? Integer.toString(this.maxRetries) : "infinite";
        String message = String.format("Error processing distribution package %s. Retry attempts %s/%s.", packageMessage.getPkgId(), retries, maxRetriesText);
        throw new DistributionException(message, cause);
    }

    /**
     * Records a package as removed without importing it: stores the REMOVED
     * status (when editable) and the offset in one commit, then clears the
     * retry counter of the publisher agent.
     * <p>
     * The removal timer is stopped only when the removal succeeded.
     *
     * @param packageMessage the package to mark as removed
     * @param j              offset of the package
     * @throws LoginException       if the book keeper service resolver cannot be obtained
     * @throws PersistenceException if storing or committing the status/offset fails
     */
    public void removePackage(Messages.PackageMessage packageMessage, long j) throws LoginException, PersistenceException {
        this.log.info("Removing distribution package {} of type {} at offset {}", packageMessage.getPkgId(), packageMessage.getReqType(), j);
        Timer.Context removalTimer = this.distributionMetricsService.getRemovedPackageDuration().time();
        // try-with-resources closes the resolver on every path and keeps the primary
        // exception if closing fails as well.
        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            if (this.editable) {
                storeStatus(resolver, new PackageStatus(Messages.PackageStatusMessage.Status.REMOVED, j, packageMessage.getPubAgentName()));
            }
            storeOffset(resolver, j);
            resolver.commit();
        }
        this.packageRetries.clear(packageMessage.getPubAgentName());
        removalTimer.stop();
    }

    /**
     * Notes that the package at the given offset was skipped. The offset is
     * only persisted when {@link #shouldCommitSkipped()} says so, which avoids
     * a commit for every skipped package.
     *
     * @param j offset of the skipped package
     * @throws LoginException       if the book keeper service resolver cannot be obtained
     * @throws PersistenceException if storing or committing the offset fails
     */
    public void skipPackage(long j) throws LoginException, PersistenceException {
        this.log.info("Skipping package at offset {}", j);
        if (!shouldCommitSkipped()) {
            return;
        }
        // The resolver is closed on every path; a close failure no longer hides
        // an earlier exception.
        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            storeOffset(resolver, j);
            resolver.commit();
        }
    }

    /**
     * Counts one more skipped package and tells whether the skipped offset
     * should be committed now.
     *
     * @return {@code true} once the counter exceeds {@code COMMIT_AFTER_NUM_SKIPPED};
     *         in that case the counter restarts at 1
     */
    public synchronized boolean shouldCommitSkipped() {
        skippedCounter += 1;
        boolean thresholdExceeded = skippedCounter > COMMIT_AFTER_NUM_SKIPPED;
        if (thresholdExceeded) {
            skippedCounter = 1;
        }
        return thresholdExceeded;
    }

    /**
     * Sends the status held in the status store unless it is already flagged
     * as sent.
     *
     * @param i current retry number, used for logging only
     * @return {@code true} if the stored status was already sent or has been sent now,
     *         {@code false} if sending failed
     */
    public boolean sendStoredStatus(int i) {
        PackageStatus stored = new PackageStatus(statusStore.load());
        if (stored.sent.booleanValue()) {
            return true;
        }
        return sendStoredStatus(stored, i);
    }

    /**
     * Attempts to send the given status once and flags it as sent on success.
     * A failure is logged and followed by the retry delay before returning.
     *
     * @param status the status to send
     * @param retry  current retry number, used for logging only
     * @return {@code true} if the status was sent, {@code false} otherwise
     */
    private boolean sendStoredStatus(PackageStatus status, int retry) {
        boolean delivered;
        try {
            sendStatusMessage(status);
            markStatusSent();
            delivered = true;
        } catch (Exception e) {
            log.warn("Cannot send status (retry {})", retry, e);
            retryDelay();
            delivered = false;
        }
        return delivered;
    }

    /**
     * Builds a status message from the given status plus this subscriber's
     * identity, hands it to the sender and logs it.
     *
     * @param status the status to publish
     */
    private void sendStatusMessage(PackageStatus status) {
        Messages.PackageStatusMessage.Builder builder = Messages.PackageStatusMessage.newBuilder();
        builder.setSubSlingId(subSlingId);
        builder.setSubAgentName(subAgentName);
        builder.setPubAgentName(status.pubAgentName);
        builder.setOffset(status.offset.longValue());
        builder.setStatus(status.status);
        Messages.PackageStatusMessage message = builder.build();

        sender.accept(message);
        log.info("Sent status message {}", message);
    }

    /**
     * Flags the stored status as sent. This is best effort: any failure is
     * logged as a warning and not propagated.
     */
    public void markStatusSent() {
        // try-with-resources closes the resolver even when store or commit throws;
        // previously it was closed on the success path only.
        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            this.statusStore.store(resolver, "sent", true);
            resolver.commit();
        } catch (Exception e) {
            this.log.warn("Failed to mark status as sent", e);
        }
    }

    /**
     * Loads the last processed offset from the offset store.
     *
     * @return the stored offset, or {@code -1} when none has been stored yet
     */
    public long loadOffset() {
        // The default must be a Long; the former (String) cast of a long literal does not compile.
        Long storedOffset = (Long) this.processedOffsets.load(KEY_OFFSET, Long.valueOf(-1L));
        return storedOffset.longValue();
    }

    /**
     * Looks up the retry counter kept for a publisher agent.
     *
     * @param str name of the publisher agent
     * @return the value reported by the retry bookkeeping for that agent
     */
    public int getRetries(String str) {
        final int retries = packageRetries.get(str);
        return retries;
    }

    /**
     * Exposes the retry bookkeeping of this instance.
     *
     * @return the live {@link PackageRetries} object (not a copy)
     */
    public PackageRetries getPackageRetries() {
        return packageRetries;
    }

    /**
     * Releases the retries gauge created in the constructor. The gauge is
     * closed via {@code IOUtils.closeQuietly}.
     */
    @Override
    public void close() throws IOException {
        IOUtils.closeQuietly(retriesGauge);
    }

    /**
     * Gives up on a package that keeps failing: stores the REMOVED_FAILED
     * status together with the offset in one commit so that processing can
     * continue with the next package.
     * <p>
     * The removal timer is stopped only when the removal succeeded.
     *
     * @param pkgMsg the package that could not be imported
     * @param offset offset of the package
     * @throws DistributionException if the status or offset could not be persisted
     */
    private void removeFailedPackage(Messages.PackageMessage pkgMsg, long offset) throws DistributionException {
        this.log.info("Removing failed distribution package {} of type {} at offset {}", pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
        Timer.Context removalTimer = this.distributionMetricsService.getRemovedFailedPackageDuration().time();
        // try-with-resources closes the resolver on the failure path too; it used to
        // be closed only after a successful commit.
        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
            storeStatus(resolver, new PackageStatus(Messages.PackageStatusMessage.Status.REMOVED_FAILED, offset, pkgMsg.getPubAgentName()));
            storeOffset(resolver, offset);
            resolver.commit();
        } catch (Exception e) {
            throw new DistributionException("Error removing failed package", e);
        }
        removalTimer.stop();
    }

    /**
     * Writes the given status into the status store using the supplied
     * resolver and logs the stored properties. No commit happens here.
     *
     * @param resolver resolver to write through
     * @param status   status to store
     * @throws PersistenceException if the store rejects the write
     */
    private void storeStatus(ResourceResolver resolver, PackageStatus status) throws PersistenceException {
        final Map<String, Object> properties = status.asMap();
        statusStore.store(resolver, properties);
        log.info("Stored status {}", properties);
    }

    /**
     * Writes the given offset into the offset store using the supplied
     * resolver. No commit happens here.
     *
     * @param resolver resolver to write through
     * @param offset   offset to record as processed
     * @throws PersistenceException if the store rejects the write
     */
    private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
        final Long boxedOffset = Long.valueOf(offset);
        processedOffsets.store(resolver, KEY_OFFSET, boxedOffset);
    }

    /**
     * Obtains a service resource resolver for the given sub service.
     *
     * @param subServiceName value passed under the "sling.service.subservice" key
     * @return a new resolver that the caller has to close
     * @throws LoginException if the resolver cannot be obtained
     */
    private ResourceResolver getServiceResolver(String subServiceName) throws LoginException {
        Map<String, Object> authenticationInfo = Collections.singletonMap("sling.service.subservice", subServiceName);
        return resolverFactory.getServiceResourceResolver(authenticationInfo);
    }

    /**
     * Pauses the calling thread for one second before a retry. If the thread
     * is interrupted the pause ends early and the interrupt flag is set again
     * so callers can still observe it.
     */
    static void retryDelay() {
        try {
            TimeUnit.MILLISECONDS.sleep(1000L);
        } catch (InterruptedException interrupted) {
            // Restore the flag that was cleared when the exception was thrown.
            Thread.currentThread().interrupt();
        }
    }
}
