package org.apache.sling.distribution.journal.impl.publisher;

import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.management.NotCompliantMBeanException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.impl.shared.AgentState;
import org.apache.sling.distribution.journal.impl.shared.DefaultDistributionLog;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;

/**
 * Distribution agent that turns distribution requests into package messages and
 * sends them to the package topic of the journal.
 *
 * <p>The component registers itself as a {@link DistributionAgent} service on
 * activation (rather than declaratively) and exposes one queue per subscribed
 * agent, plus an error queue for subscribers whose state reports a non-negative
 * maximum retry count.
 */
@ParametersAreNonnullByDefault
@Designate(ocd = PublisherConfiguration.class, factory = true)
@Component(service = {}, immediate = true, configurationPid = {"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"})
public class DistributionPublisher implements DistributionAgent {

    /** Suffix that distinguishes a subscriber's error queue from its regular queue. */
    private static final String ERROR_QUEUE_SUFFIX = "-error";

    @Reference
    private MessagingProvider messagingProvider;

    @Reference(name = "packageBuilder")
    private DistributionPackageBuilder packageBuilder;

    @Reference
    private PackageQueuedNotifier queuedNotifier;

    @Reference
    private PubQueueProvider pubQueueProvider;

    @Reference
    private DiscoveryService discoveryService;

    @Reference
    private PackageMessageFactory factory;

    @Reference
    private EventAdmin eventAdmin;

    @Reference
    private Topics topics;

    // Not read anywhere in this class.
    // NOTE(review): presumably referenced only so the component is not activated
    // while the journal is unavailable - confirm.
    @Reference
    JournalAvailable journalAvailable;

    @Reference
    private DistributionMetricsService distributionMetricsService;

    /** Type reported by the package builder; captured at activation for log messages. */
    private String pkgType;

    /** Maximum time in milliseconds to wait for a sent package to be reported as queued. */
    private long queuedTimeout;

    /** Registration of this instance as a {@link DistributionAgent} service. */
    private ServiceRegistration<DistributionAgent> componentReg;

    private MessageSender<Messages.PackageMessage> sender;

    /** JMX registration of the publisher MBean. */
    private JMXRegistration reg;

    private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;

    /** Supported request types mapped to the strategy used to publish the resulting package. */
    private final Map<DistributionRequestType, Consumer<Messages.PackageMessage>> REQ_TYPES = new HashMap<>();

    /** Agent name taken from the configuration; {@code null} until {@link #activate} has run. */
    private String pubAgentName;

    /**
     * Agent log. Created once in the constructor so it is never {@code null}, and
     * re-created in {@link #activate} once the agent name is known.
     */
    private DefaultDistributionLog log;

    /**
     * Creates the publisher and wires the supported request types: ADD and DELETE
     * wait for the package to be queued, TEST is sent without waiting.
     */
    public DistributionPublisher() {
        this.REQ_TYPES.put(DistributionRequestType.ADD, this::sendAndWait);
        this.REQ_TYPES.put(DistributionRequestType.DELETE, this::sendAndWait);
        this.REQ_TYPES.put(DistributionRequestType.TEST, this::send);
        // The agent name is not known yet; activate() replaces this instance.
        this.log = createLog();
    }

    /**
     * Reads the configuration, creates the message sender, registers this instance
     * as a {@link DistributionAgent} service, and then registers the JMX bean and
     * the subscriber count gauge.
     *
     * @param publisherConfiguration configuration of this agent; its name must not be {@code null}
     * @param bundleContext context used to register the agent service
     * @throws RuntimeException if the JMX bean is not a compliant MBean; the agent
     *         service registration is rolled back in that case
     */
    @Activate
    public void activate(PublisherConfiguration publisherConfiguration, BundleContext bundleContext) {
        Objects.requireNonNull(this.factory);
        Objects.requireNonNull(this.distributionMetricsService);
        this.pubAgentName = Objects.requireNonNull(publisherConfiguration.name());
        // Rebuild the log now that the agent name is available; the instance created
        // by the constructor was built with a null name.
        this.log = createLog();
        this.queuedTimeout = publisherConfiguration.queuedTimeout();
        this.pkgType = this.packageBuilder.getType();
        this.sender = this.messagingProvider.createSender();
        this.componentReg = Objects.requireNonNull(
                bundleContext.registerService(DistributionAgent.class, this, createServiceProps(publisherConfiguration)));
        try {
            this.reg = new JMXRegistration(
                    new DistPublisherJMX(this.pubAgentName, this.discoveryService, this), "agent", this.pubAgentName);
            String startedMsg = String.format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                    this.pubAgentName, this.pkgType, this.queuedTimeout);
            this.subscriberCountGauge = this.distributionMetricsService.createGauge(
                    "distribution.journal.publisher.subscriber_count;pub_name=" + this.pubAgentName,
                    "Current number of publish subscribers",
                    () -> this.discoveryService.getTopologyView().getSubscribedAgentIds().size());
            this.log.info(startedMsg);
        } catch (NotCompliantMBeanException e) {
            // Do not leave a half-initialised agent published in the service registry.
            this.componentReg.unregister();
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes the JMX registration, unregisters the agent service and closes the
     * subscriber count gauge.
     */
    @Deactivate
    public void deactivate() {
        this.reg.close();
        this.componentReg.unregister();
        String stoppedMsg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                this.pubAgentName, this.pkgType, this.queuedTimeout);
        IOUtils.closeQuietly(this.subscriberCountGauge);
        this.log.info(stoppedMsg);
    }

    /**
     * Builds the log for this agent from the current value of {@link #pubAgentName}.
     *
     * @return a new log instance at INFO level
     */
    private DefaultDistributionLog createLog() {
        return new DefaultDistributionLog(this.pubAgentName, getClass(), DefaultDistributionLog.LogLevel.INFO);
    }

    /**
     * Builds the properties the agent service is registered with.
     *
     * @param publisherConfiguration configuration the property values are read from
     * @return the service properties; name, title and details all carry the configured name
     */
    private Dictionary<String, Object> createServiceProps(PublisherConfiguration publisherConfiguration) {
        Hashtable<String, Object> props = new Hashtable<>();
        props.put("name", publisherConfiguration.name());
        props.put("title", publisherConfiguration.name());
        props.put("details", publisherConfiguration.name());
        props.put("packageBuilder.target", publisherConfiguration.packageBuilder_target());
        props.put("webconsole.configurationFactory.nameHint", publisherConfiguration.webconsole_configurationFactory_nameHint());
        return props;
    }

    /**
     * Lists the queues of this agent: one per agent subscribed to it, plus an
     * {@code <id>-error} queue for each subscriber whose state reports a
     * non-negative maximum retry count.
     *
     * @return an unmodifiable view of the queue names
     */
    @Nonnull
    @Override
    public Iterable<String> getQueueNames() {
        HashSet<String> queueNames = new HashSet<>();
        TopologyView view = this.discoveryService.getTopologyView();
        for (String subAgentId : view.getSubscribedAgentIds(this.pubAgentName)) {
            queueNames.add(subAgentId);
            State subState = view.getState(subAgentId, this.pubAgentName);
            if (subState != null && subState.getMaxRetries() >= 0) {
                queueNames.add(subAgentId + ERROR_QUEUE_SUFFIX);
            }
        }
        return Collections.unmodifiableCollection(queueNames);
    }

    /**
     * Looks up a queue by name.
     *
     * @param str name of the queue; must be one of {@link #getQueueNames()}
     * @return the matching queue, or {@code null} if the name is unknown or the
     *         subscriber state is no longer available
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @CheckForNull
    @Override
    public DistributionQueue getQueue(String str) {
        // The name set is small, so a sequential scan is used instead of a parallel stream.
        boolean unknown = StreamSupport.stream(getQueueNames().spliterator(), false).noneMatch(str::equals);
        if (unknown) {
            return null;
        }
        return str.endsWith(ERROR_QUEUE_SUFFIX) ? getErrorQueue(str) : getPubQueue(str);
    }

    /**
     * Resolves the error queue of a subscriber.
     *
     * @param queueName error queue name, i.e. the subscriber agent id followed by {@code -error}
     * @return the error queue obtained from the queue provider
     */
    @Nonnull
    private DistributionQueue getErrorQueue(String queueName) {
        AgentId subAgentId = new AgentId(StringUtils.substringBeforeLast(queueName, ERROR_QUEUE_SUFFIX));
        return this.pubQueueProvider.getErrorQueue(
                this.pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName);
    }

    /**
     * Resolves the regular queue of a subscriber from its current state in the topology view.
     *
     * @param queueName queue name, which is the subscriber agent id
     * @return the queue, or {@code null} if the topology view holds no state for the subscriber
     */
    @CheckForNull
    private DistributionQueue getPubQueue(String queueName) {
        TopologyView view = this.discoveryService.getTopologyView();
        AgentId subAgentId = new AgentId(queueName);
        State subState = view.getState(subAgentId.getAgentId(), this.pubAgentName);
        if (subState == null) {
            return null;
        }
        // NOTE(review): offset + 1 is presumably the first offset the subscriber has
        // not processed yet - confirm against PubQueueProvider.
        return this.pubQueueProvider.getQueue(this.pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(),
                queueName, subState.getOffset() + 1, subState.getRetries(), subState.isEditable());
    }

    /**
     * @return the log of this agent
     */
    @Nonnull
    @Override
    public DistributionLog getLog() {
        return this.log;
    }

    /**
     * @return the agent state as computed by {@link AgentState#getState}
     */
    @Nonnull
    @Override
    public DistributionAgentState getState() {
        return AgentState.getState(this);
    }

    /**
     * Executes a distribution request. Requests of a type without a registered
     * handler are answered with a DROPPED response instead of an exception.
     *
     * @param resourceResolver resolver used to build the package
     * @param distributionRequest request to distribute
     * @return an ACCEPTED response once the package was handed over, or a DROPPED
     *         response for unsupported request types
     * @throws DistributionException if building or sending the package fails
     */
    @Nonnull
    @Override
    public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest distributionRequest) throws DistributionException {
        Consumer<Messages.PackageMessage> handler = this.REQ_TYPES.get(distributionRequest.getRequestType());
        if (handler == null) {
            return executeUnsupported(distributionRequest);
        }
        return execute(resourceResolver, distributionRequest, handler);
    }

    /**
     * Builds the package message for the request, passes it to the given handler
     * and updates the metrics.
     *
     * @param resourceResolver resolver used to build the package
     * @param distributionRequest request to distribute
     * @param consumer strategy that publishes the package message
     * @return an ACCEPTED response describing the request
     * @throws DistributionException if anything other than an {@link Error} is
     *         thrown while building or publishing; {@link Error}s are rethrown as is
     */
    private DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest distributionRequest, Consumer<Messages.PackageMessage> consumer) throws DistributionException {
        try {
            Messages.PackageMessage pkg = DistributionMetricsService.timed(
                    this.distributionMetricsService.getBuildPackageDuration(),
                    () -> this.factory.create(this.packageBuilder, resourceResolver, this.pubAgentName, distributionRequest));
            DistributionMetricsService.timed(
                    this.distributionMetricsService.getEnqueuePackageDuration(),
                    () -> consumer.accept(pkg));
            this.distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
            this.distributionMetricsService.getAcceptedRequests().mark();
            String acceptedMsg = String.format("Distribution request accepted with type %s paths %s ",
                    distributionRequest.getRequestType(), Arrays.toString(distributionRequest.getPaths()));
            this.log.info(acceptedMsg);
            return new SimpleDistributionResponse(DistributionRequestState.ACCEPTED, acceptedMsg);
        } catch (Throwable th) {
            this.distributionMetricsService.getDroppedRequests().mark();
            String failedMsg = String.format("Failed to queue distribution request %s", th.getMessage());
            this.log.error(failedMsg, th);
            // Errors are counted and logged, but never disguised as a DistributionException.
            if (th instanceof Error) {
                throw (Error) th;
            }
            throw new DistributionException(failedMsg, th);
        }
    }

    /**
     * Sends the package and blocks until the notifier reports it as queued or the
     * configured timeout elapses. A package-created event is posted before sending.
     *
     * @param packageMessage package to send
     * @throws RuntimeException wrapping any failure, including timeout and
     *         interruption; the wait registration is removed in both cases
     */
    private void sendAndWait(Messages.PackageMessage packageMessage) {
        try {
            // Register before sending so the queued notification cannot be missed.
            CompletableFuture<Void> queued = this.queuedNotifier.registerWait(packageMessage.getPkgId());
            this.eventAdmin.postEvent(DistributionEvent.eventPackageCreated(packageMessage, this.pubAgentName));
            send(packageMessage);
            queued.get(this.queuedTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            this.queuedNotifier.unRegisterWait(packageMessage.getPkgId());
            throw new RuntimeException(e);
        } catch (Exception e) {
            this.queuedNotifier.unRegisterWait(packageMessage.getPkgId());
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends the package to the package topic without waiting for it to be queued.
     *
     * @param packageMessage package to send
     */
    private void send(Messages.PackageMessage packageMessage) {
        this.sender.send(this.topics.getPackageTopic(), packageMessage);
    }

    /**
     * Builds the response for a request type this agent has no handler for.
     *
     * @param distributionRequest the rejected request
     * @return a DROPPED response naming the rejected type and the supported ones
     */
    @Nonnull
    private DistributionResponse executeUnsupported(DistributionRequest distributionRequest) {
        String unsupportedMsg = String.format("Request type %s is not supported by this agent, expected one of %s",
                distributionRequest.getRequestType(), this.REQ_TYPES.keySet());
        this.log.info(unsupportedMsg);
        return new SimpleDistributionResponse(DistributionRequestState.DROPPED, unsupportedMsg);
    }
}
