package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.LogActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationZookeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.class */
public class ReplicationSourceManager implements LogActionsListener {
    private static final Log LOG = LogFactory.getLog(ReplicationSourceManager.class);
    private final AtomicBoolean replicating;
    private final ReplicationZookeeperWrapper zkHelper;
    private final AtomicBoolean stopper;
    private final Configuration conf;
    private final FileSystem fs;
    private Path latestPath;
    private final List<String> otherRegionServers;
    private final Path logDir;
    private final Path oldLogDir;
    private final List<ReplicationSourceInterface> sources = new ArrayList();
    private final SortedSet<String> hlogs = new TreeSet();
    private final List<ReplicationSourceInterface> oldsources = new ArrayList();

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager$OtherRegionServerWatcher.class */
    public class OtherRegionServerWatcher implements Watcher {
        public OtherRegionServerWatcher() {
        }

        public void process(WatchedEvent watchedEvent) {
            List<String> registeredRegionServers;
            ReplicationSourceManager.LOG.info(" event " + watchedEvent);
            if (watchedEvent.getType().equals(Watcher.Event.KeeperState.Expired) || watchedEvent.getType().equals(Watcher.Event.KeeperState.Disconnected) || (registeredRegionServers = ReplicationSourceManager.this.zkHelper.getRegisteredRegionServers(this)) == null) {
                return;
            }
            synchronized (ReplicationSourceManager.this.otherRegionServers) {
                ReplicationSourceManager.this.otherRegionServers.clear();
                ReplicationSourceManager.this.otherRegionServers.addAll(registeredRegionServers);
            }
            if (watchedEvent.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
                ReplicationSourceManager.LOG.info(watchedEvent.getPath() + " znode expired, trying to lock it");
                String[] split = watchedEvent.getPath().split("/");
                ReplicationSourceManager.this.transferQueues(split[split.length - 1]);
            }
        }
    }

    public ReplicationSourceManager(ReplicationZookeeperWrapper replicationZookeeperWrapper, Configuration configuration, AtomicBoolean atomicBoolean, FileSystem fileSystem, AtomicBoolean atomicBoolean2, Path path, Path path2) {
        this.replicating = atomicBoolean2;
        this.zkHelper = replicationZookeeperWrapper;
        this.stopper = atomicBoolean;
        this.conf = configuration;
        this.fs = fileSystem;
        this.logDir = path;
        this.oldLogDir = path2;
        List<String> registeredRegionServers = this.zkHelper.getRegisteredRegionServers(new OtherRegionServerWatcher());
        this.otherRegionServers = registeredRegionServers == null ? new ArrayList<>() : registeredRegionServers;
    }

    public void logPositionAndCleanOldLogs(Path path, String str, long j, boolean z) {
        String name = path.getName();
        LOG.info("Going to report log #" + name + " for position " + j + " in " + path);
        this.zkHelper.writeReplicationStatus(name.toString(), str, j);
        synchronized (this.hlogs) {
            if (!z) {
                if (this.hlogs.first() != name) {
                    SortedSet<String> headSet = this.hlogs.headSet(name);
                    LOG.info("Removing " + headSet.size() + " logs in the list: " + headSet);
                    Iterator<String> it = headSet.iterator();
                    while (it.hasNext()) {
                        this.zkHelper.removeLogFromList(it.next().toString(), str);
                    }
                    headSet.clear();
                }
            }
        }
    }

    public void init() throws IOException {
        Iterator<String> it = this.zkHelper.getPeerClusters().keySet().iterator();
        while (it.hasNext()) {
            addSource(it.next()).startup();
        }
        List<String> listOfReplicators = this.zkHelper.getListOfReplicators(null);
        synchronized (this.otherRegionServers) {
            LOG.info("Current list of replicators: " + listOfReplicators + " other RSs: " + this.otherRegionServers);
        }
        for (String str : listOfReplicators) {
            synchronized (this.otherRegionServers) {
                if (!this.otherRegionServers.contains(str)) {
                    transferQueues(str);
                }
            }
        }
    }

    public ReplicationSourceInterface addSource(String str) throws IOException {
        ReplicationSourceInterface replicationSource = getReplicationSource(this.conf, this.fs, this, this.stopper, this.replicating, str);
        this.sources.add(replicationSource);
        synchronized (this.hlogs) {
            if (this.hlogs.size() > 0) {
                this.zkHelper.addLogToList(this.hlogs.first(), this.sources.get(0).getPeerClusterZnode());
                replicationSource.enqueueLog(this.latestPath);
            }
        }
        return replicationSource;
    }

    public void join() {
        if (this.sources.size() == 0) {
            this.zkHelper.deleteOwnRSZNode();
        }
        Iterator<ReplicationSourceInterface> it = this.sources.iterator();
        while (it.hasNext()) {
            it.next().terminate();
        }
    }

    protected SortedSet<String> getHLogs() {
        return new TreeSet((SortedSet) this.hlogs);
    }

    public List<ReplicationSourceInterface> getSources() {
        return this.sources;
    }

    @Override // org.apache.hadoop.hbase.regionserver.wal.LogActionsListener
    public void logRolled(Path path) {
        if (this.sources.size() > 0) {
            this.zkHelper.addLogToList(path.getName(), this.sources.get(0).getPeerClusterZnode());
        }
        synchronized (this.hlogs) {
            this.hlogs.add(path.getName());
        }
        this.latestPath = path;
        Iterator<ReplicationSourceInterface> it = this.sources.iterator();
        while (it.hasNext()) {
            it.next().enqueueLog(path);
        }
    }

    public ReplicationZookeeperWrapper getRepZkWrapper() {
        return this.zkHelper;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v9, types: [org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface] */
    public ReplicationSourceInterface getReplicationSource(Configuration configuration, FileSystem fileSystem, ReplicationSourceManager replicationSourceManager, AtomicBoolean atomicBoolean, AtomicBoolean atomicBoolean2, String str) throws IOException {
        ReplicationSource replicationSource;
        try {
            replicationSource = (ReplicationSourceInterface) Class.forName(configuration.get("replication.replicationsource.implementation", ReplicationSource.class.getCanonicalName())).newInstance();
        } catch (Exception e) {
            LOG.warn("Passed replication source implemention throws errors, defaulting to ReplicationSource", e);
            replicationSource = new ReplicationSource();
        }
        replicationSource.init(configuration, fileSystem, replicationSourceManager, atomicBoolean, atomicBoolean2, str);
        return replicationSource;
    }

    public void transferQueues(String str) {
        if (this.stopper.get()) {
            LOG.info("Not transferring queue since we are shutting down");
            return;
        }
        if (this.zkHelper.lockOtherRS(str)) {
            LOG.info("Moving " + str + "'s hlogs to my queue");
            SortedMap<String, SortedSet<String>> copyQueuesFromRS = this.zkHelper.copyQueuesFromRS(str);
            this.zkHelper.deleteRsQueues(str);
            if (copyQueuesFromRS == null || copyQueuesFromRS.size() == 0) {
                return;
            }
            for (Map.Entry<String, SortedSet<String>> entry : copyQueuesFromRS.entrySet()) {
                try {
                    ReplicationSourceInterface replicationSource = getReplicationSource(this.conf, this.fs, this, this.stopper, this.replicating, entry.getKey());
                    this.oldsources.add(replicationSource);
                    Iterator<String> it = entry.getValue().iterator();
                    while (it.hasNext()) {
                        replicationSource.enqueueLog(new Path(this.oldLogDir, it.next()));
                    }
                    replicationSource.startup();
                } catch (IOException e) {
                    LOG.error("Failed creating a source", e);
                }
            }
        }
    }

    public void closeRecoveredQueue(ReplicationSourceInterface replicationSourceInterface) {
        LOG.info("Done with the recovered queue " + replicationSourceInterface.getPeerClusterZnode());
        this.oldsources.remove(replicationSourceInterface);
        this.zkHelper.deleteSource(replicationSourceInterface.getPeerClusterZnode());
    }

    public Path getOldLogDir() {
        return this.oldLogDir;
    }

    public Path getLogDir() {
        return this.logDir;
    }

    public FileSystem getFs() {
        return this.fs;
    }
}
