View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.replication;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.HashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.SortedMap;
28  import java.util.SortedSet;
29  import java.util.TreeMap;
30  import java.util.TreeSet;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.HServerAddress;
39  import org.apache.hadoop.hbase.Server;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
42  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
43  import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
44  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
45  import org.apache.hadoop.util.StringUtils;
46  import org.apache.zookeeper.KeeperException;
47  
48  /**
49   * This class serves as a helper for all things related to zookeeper
50   * in replication.
51   * <p/>
52   * The layout looks something like this under zookeeper.znode.parent
53   * for the master cluster:
54   * <p/>
55   * <pre>
56   * replication/
57   *  state      {contains true or false}
58   *  clusterId  {contains a byte}
59   *  peers/
60   *    1/   {contains a full cluster address}
61   *    2/
62   *    ...
63   *  rs/ {lists all RS that replicate}
64   *    startcode1/ {lists all peer clusters}
65   *      1/ {lists hlogs to process}
66   *        10.10.1.76%3A53488.123456789 {contains nothing or a position}
67   *        10.10.1.76%3A53488.123456790
68   *        ...
69   *      2/
70   *      ...
71   *    startcode2/
72   *    ...
73   * </pre>
74   */
75  public class ReplicationZookeeper {
76    private static final Log LOG =
77      LogFactory.getLog(ReplicationZookeeper.class);
78    // Name of znode we use to lock when failover
79    private final static String RS_LOCK_ZNODE = "lock";
80    // Our handle on zookeeper
81    private final ZooKeeperWatcher zookeeper;
82    // Map of peer clusters keyed by their id
83    private Map<String, ReplicationPeer> peerClusters;
84    // Path to the root replication znode
85    private String replicationZNode;
86    // Path to the peer clusters znode
87    private String peersZNode;
88    // Path to the znode that contains all RS that replicates
89    private String rsZNode;
90    // Path to this region server's name under rsZNode
91    private String rsServerNameZnode;
92    // Name node if the replicationState znode
93    private String replicationStateNodeName;
94    private final Configuration conf;
95    // Is this cluster replicating at the moment?
96    private AtomicBoolean replicating;
97    // Byte (stored as string here) that identifies this cluster
98    private String clusterId;
99    // The key to our own cluster
100   private String ourClusterKey;
101   // Abortable
102   private Abortable abortable;
103   private ReplicationStatusTracker statusTracker;
104 
105   /**
106    * Constructor used by clients of replication (like master and HBase clients)
107    * @param conf  conf to use
108    * @param zk    zk connection to use
109    * @throws IOException
110    */
111   public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
112                               final ZooKeeperWatcher zk)
113     throws KeeperException {
114 
115     this.conf = conf;
116     this.zookeeper = zk;
117     this.replicating = new AtomicBoolean();
118     setZNodes(abortable);
119   }
120 
121   /**
122    * Constructor used by region servers, connects to the peer cluster right away.
123    *
124    * @param server
125    * @param replicating    atomic boolean to start/stop replication
126    * @throws IOException
127    * @throws KeeperException 
128    */
129   public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
130   throws IOException, KeeperException {
131     this.abortable = server;
132     this.zookeeper = server.getZooKeeper();
133     this.conf = server.getConfiguration();
134     this.replicating = replicating;
135     setZNodes(server);
136 
137     this.peerClusters = new HashMap<String, ReplicationPeer>();
138     ZKUtil.createWithParents(this.zookeeper,
139         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
140     this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
141     ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
142     connectExistingPeers();
143   }
144 
145   private void setZNodes(Abortable abortable) throws KeeperException {
146     String replicationZNodeName =
147         conf.get("zookeeper.znode.replication", "replication");
148     String peersZNodeName =
149         conf.get("zookeeper.znode.replication.peers", "peers");
150     String repMasterZNodeName =
151         conf.get("zookeeper.znode.replication.master", "master");
152     this.replicationStateNodeName =
153         conf.get("zookeeper.znode.replication.state", "state");
154     String clusterIdZNodeName =
155         conf.get("zookeeper.znode.replication.clusterId", "clusterId");
156     String rsZNodeName =
157         conf.get("zookeeper.znode.replication.rs", "rs");
158     this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
159     this.replicationZNode =
160       ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
161     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
162     ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
163     this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
164     ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
165 
166     String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
167     byte [] data = ZKUtil.getData(this.zookeeper, znode);
168     String idResult = Bytes.toString(data);
169     this.clusterId = idResult == null?
170       Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
171     // Set a tracker on replicationStateNodeNode
172     this.statusTracker =
173         new ReplicationStatusTracker(this.zookeeper, abortable);
174     statusTracker.start();
175     readReplicationStateZnode();
176   }
177 
178   private void connectExistingPeers() throws IOException, KeeperException {
179     List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
180     if (znodes != null) {
181       for (String z : znodes) {
182         connectToPeer(z);
183       }
184     }
185   }
186 
187   /**
188    * List this cluster's peers' IDs
189    * @return list of all peers' identifiers
190    */
191   public List<String> listPeersIdsAndWatch() {
192     List<String> ids = null;
193     try {
194       ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
195     } catch (KeeperException e) {
196       this.abortable.abort("Cannot get the list of peers ", e);
197     }
198     return ids;
199   }
200 
201   /**
202    * Returns all region servers from given peer
203    *
204    * @param peerClusterId (byte) the cluster to interrogate
205    * @return addresses of all region servers
206    */
207   public List<HServerAddress> getSlavesAddresses(String peerClusterId)
208       throws KeeperException {
209     if (this.peerClusters.size() == 0) {
210       return new ArrayList<HServerAddress>(0);
211     }
212     ReplicationPeer peer = this.peerClusters.get(peerClusterId);
213     if (peer == null) {
214       return new ArrayList<HServerAddress>(0);
215     }
216     peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
217     return peer.getRegionServers();
218   }
219 
220   /**
221    * Get the list of all the region servers from the specified peer
222    * @param zkw zk connection to use
223    * @return list of region server addresses or an empty list if the slave
224    * is unavailable
225    */
226   private List<HServerAddress> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
227     try {
228       return ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
229     } catch (KeeperException e) {
230       LOG.warn("Cannot get peer's region server addresses", e);
231       return new ArrayList<HServerAddress>(0);
232     }
233   }
234 
235   /**
236    * This method connects this cluster to another one and registers it
237    * in this region server's replication znode
238    * @param peerId id of the peer cluster
239    * @throws KeeperException 
240    */
241   public boolean connectToPeer(String peerId)
242       throws IOException, KeeperException {
243     if (peerClusters == null) {
244       return false;
245     }
246     if (this.peerClusters.containsKey(peerId)) {
247       return false;
248       // TODO remove when we support it
249     } else if (this.peerClusters.size() > 0) {
250       LOG.warn("Multiple slaves feature not supported");
251       return false;
252     }
253     ReplicationPeer peer = getPeer(peerId);
254     if (peer == null) {
255       return false;
256     }
257     this.peerClusters.put(peerId, peer);
258     ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
259         this.rsServerNameZnode, peerId));
260     LOG.info("Added new peer cluster " + peer.getClusterKey());
261     return true;
262   }
263 
264   /**
265    * Helper method to connect to a peer
266    * @param peerId peer's identifier
267    * @return object representing the peer
268    * @throws IOException
269    * @throws KeeperException
270    */
271   public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
272     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
273     byte [] data = ZKUtil.getData(this.zookeeper, znode);
274     String otherClusterKey = Bytes.toString(data);
275     if (this.ourClusterKey.equals(otherClusterKey)) {
276       LOG.debug("Not connecting to " + peerId + " because it's us");
277       return null;
278     }
279     // Construct the connection to the new peer
280     Configuration otherConf = new Configuration(this.conf);
281     try {
282       ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
283     } catch (IOException e) {
284       LOG.error("Can't get peer because:", e);
285       return null;
286     }
287 
288     ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
289         "connection to cluster: " + peerId, this.abortable);
290     return new ReplicationPeer(otherConf, peerId,
291         otherClusterKey, zkw);
292   }
293 
294   /**
295    * Set the new replication state for this cluster
296    * @param newState
297    */
298   public void setReplicating(boolean newState) throws KeeperException {
299     ZKUtil.createWithParents(this.zookeeper,
300         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
301     ZKUtil.setData(this.zookeeper,
302         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
303         Bytes.toBytes(Boolean.toString(newState)));
304   }
305 
306   /**
307    * Remove the peer from zookeeper. which will trigger the watchers on every
308    * region server and close their sources
309    * @param id
310    * @throws IllegalArgumentException Thrown when the peer doesn't exist
311    */
312   public void removePeer(String id) throws IOException {
313     try {
314       if (!peerExists(id)) {
315         throw new IllegalArgumentException("Cannot remove inexisting peer");
316       }
317       ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
318     } catch (KeeperException e) {
319       throw new IOException("Unable to remove a peer", e);
320     }
321   }
322 
323   /**
324    * Add a new peer to this cluster
325    * @param id peer's identifier
326    * @param clusterKey ZK ensemble's addresses, client port and root znode
327    * @throws IllegalArgumentException Thrown when the peer doesn't exist
328    * @throws IllegalStateException Thrown when a peer already exists, since
329    *         multi-slave isn't supported yet.
330    */
331   public void addPeer(String id, String clusterKey) throws IOException {
332     try {
333       if (peerExists(id)) {
334         throw new IllegalArgumentException("Cannot add existing peer");
335       } else if (countPeers() > 0) {
336         throw new IllegalStateException("Multi-slave isn't supported yet");
337       }
338       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
339       ZKUtil.createAndWatch(this.zookeeper,
340           ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
341     } catch (KeeperException e) {
342       throw new IOException("Unable to add peer", e);
343     }
344   }
345 
346   private boolean peerExists(String id) throws KeeperException {
347     return ZKUtil.checkExists(this.zookeeper,
348           ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
349   }
350 
351   private int countPeers() throws KeeperException {
352     List<String> peers =
353         ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
354     return peers == null ? 0 : peers.size();
355   }
356 
357   /**
358    * This reads the state znode for replication and sets the atomic boolean
359    */
360   private void readReplicationStateZnode() {
361     try {
362       this.replicating.set(getReplication());
363       LOG.info("Replication is now " + (this.replicating.get()?
364         "started" : "stopped"));
365     } catch (KeeperException e) {
366       this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
367     }
368   }
369 
370   /**
371    * Get the replication status of this cluster. If the state znode doesn't
372    * exist it will also create it and set it true.
373    * @return returns true when it's enabled, else false
374    * @throws KeeperException
375    */
376   public boolean getReplication() throws KeeperException {
377     byte [] data = this.statusTracker.getData();
378     if (data == null || data.length == 0) {
379       setReplicating(true);
380       return true;
381     }
382     return Boolean.parseBoolean(Bytes.toString(data));
383   }
384 
385   private String getRepStateNode() {
386     return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
387   }
388 
389   /**
390    * Add a new log to the list of hlogs in zookeeper
391    * @param filename name of the hlog's znode
392    * @param clusterId name of the cluster's znode
393    */
394   public void addLogToList(String filename, String clusterId) {
395     try {
396       String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
397       znode = ZKUtil.joinZNode(znode, filename);
398       ZKUtil.createWithParents(this.zookeeper, znode);
399     } catch (KeeperException e) {
400       this.abortable.abort("Failed add log to list", e);
401     }
402   }
403 
404   /**
405    * Remove a log from the list of hlogs in zookeeper
406    * @param filename name of the hlog's znode
407    * @param clusterId name of the cluster's znode
408    */
409   public void removeLogFromList(String filename, String clusterId) {
410     try {
411       String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
412       znode = ZKUtil.joinZNode(znode, filename);
413       ZKUtil.deleteNode(this.zookeeper, znode);
414     } catch (KeeperException e) {
415       this.abortable.abort("Failed remove from list", e);
416     }
417   }
418 
419   /**
420    * Set the current position of the specified cluster in the current hlog
421    * @param filename filename name of the hlog's znode
422    * @param clusterId clusterId name of the cluster's znode
423    * @param position the position in the file
424    * @throws IOException
425    */
426   public void writeReplicationStatus(String filename, String clusterId,
427       long position) {
428     try {
429       String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
430       znode = ZKUtil.joinZNode(znode, filename);
431       // Why serialize String of Long and note Long as bytes?
432       ZKUtil.setData(this.zookeeper, znode,
433         Bytes.toBytes(Long.toString(position)));
434     } catch (KeeperException e) {
435       this.abortable.abort("Writing replication status", e);
436     }
437   }
438 
439   /**
440    * Get a list of all the other region servers in this cluster
441    * and set a watch
442    * @return a list of server nanes
443    */
444   public List<String> getRegisteredRegionServers() {
445     List<String> result = null;
446     try {
447       result = ZKUtil.listChildrenAndWatchThem(
448           this.zookeeper, this.zookeeper.rsZNode);
449     } catch (KeeperException e) {
450       this.abortable.abort("Get list of registered region servers", e);
451     }
452     return result;
453   }
454 
455   /**
456    * Get the list of the replicators that have queues, they can be alive, dead
457    * or simply from a previous run
458    * @return a list of server names
459    */
460   public List<String> getListOfReplicators() {
461     List<String> result = null;
462     try {
463       result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
464     } catch (KeeperException e) {
465       this.abortable.abort("Get list of replicators", e);
466     }
467     return result;
468   }
469 
470   /**
471    * Get the list of peer clusters for the specified server names
472    * @param rs server names of the rs
473    * @return a list of peer cluster
474    */
475   public List<String> getListPeersForRS(String rs) {
476     String znode = ZKUtil.joinZNode(rsZNode, rs);
477     List<String> result = null;
478     try {
479       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
480     } catch (KeeperException e) {
481       this.abortable.abort("Get list of peers for rs", e);
482     }
483     return result;
484   }
485 
486   /**
487    * Get the list of hlogs for the specified region server and peer cluster
488    * @param rs server names of the rs
489    * @param id peer cluster
490    * @return a list of hlogs
491    */
492   public List<String> getListHLogsForPeerForRS(String rs, String id) {
493     String znode = ZKUtil.joinZNode(rsZNode, rs);
494     znode = ZKUtil.joinZNode(znode, id);
495     List<String> result = null;
496     try {
497       result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
498     } catch (KeeperException e) {
499       this.abortable.abort("Get list of hlogs for peer", e);
500     }
501     return result;
502   }
503 
504   /**
505    * Try to set a lock in another server's znode.
506    * @param znode the server names of the other server
507    * @return true if the lock was acquired, false in every other cases
508    */
509   public boolean lockOtherRS(String znode) {
510     try {
511       String parent = ZKUtil.joinZNode(this.rsZNode, znode);
512       if (parent.equals(rsServerNameZnode)) {
513         LOG.warn("Won't lock because this is us, we're dead!");
514         return false;
515       }
516       String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
517       ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
518     } catch (KeeperException e) {
519       // This exception will pop up if the znode under which we're trying to
520       // create the lock is already deleted by another region server, meaning
521       // that the transfer already occurred.
522       // NoNode => transfer is done and znodes are already deleted
523       // NodeExists => lock znode already created by another RS
524       if (e instanceof KeeperException.NoNodeException ||
525           e instanceof KeeperException.NodeExistsException) {
526         LOG.info("Won't transfer the queue," +
527             " another RS took care of it because of: " + e.getMessage());
528       } else {
529         LOG.info("Failed lock other rs", e);
530       }
531       return false;
532     }
533     return true;
534   }
535 
536   /**
537    * This methods copies all the hlogs queues from another region server
538    * and returns them all sorted per peer cluster (appended with the dead
539    * server's znode)
540    * @param znode server names to copy
541    * @return all hlogs for all peers of that cluster, null if an error occurred
542    */
543   public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
544     // TODO this method isn't atomic enough, we could start copying and then
545     // TODO fail for some reason and we would end up with znodes we don't want.
546     SortedMap<String,SortedSet<String>> queues =
547         new TreeMap<String,SortedSet<String>>();
548     try {
549       String nodePath = ZKUtil.joinZNode(rsZNode, znode);
550       List<String> clusters =
551         ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
552       // We have a lock znode in there, it will count as one.
553       if (clusters == null || clusters.size() <= 1) {
554         return queues;
555       }
556       // The lock isn't a peer cluster, remove it
557       clusters.remove(RS_LOCK_ZNODE);
558       for (String cluster : clusters) {
559         // We add the name of the recovered RS to the new znode, we can even
560         // do that for queues that were recovered 10 times giving a znode like
561         // number-startcode-number-otherstartcode-number-anotherstartcode-etc
562         String newCluster = cluster+"-"+znode;
563         String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
564         ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
565           HConstants.EMPTY_BYTE_ARRAY);
566         String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
567         List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
568         // That region server didn't have anything to replicate for this cluster
569         if (hlogs == null || hlogs.size() == 0) {
570           continue;
571         }
572         SortedSet<String> logQueue = new TreeSet<String>();
573         queues.put(newCluster, logQueue);
574         for (String hlog : hlogs) {
575           String z = ZKUtil.joinZNode(clusterPath, hlog);
576           byte [] position = ZKUtil.getData(this.zookeeper, z);
577           LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
578           String child = ZKUtil.joinZNode(newClusterZnode, hlog);
579           ZKUtil.createAndWatch(this.zookeeper, child, position);
580           logQueue.add(hlog);
581         }
582       }
583     } catch (KeeperException e) {
584       this.abortable.abort("Copy queues from rs", e);
585     }
586     return queues;
587   }
588 
589   /**
590    * Delete a complete queue of hlogs
591    * @param peerZnode znode of the peer cluster queue of hlogs to delete
592    */
593   public void deleteSource(String peerZnode, boolean closeConnection) {
594     try {
595       ZKUtil.deleteNodeRecursively(this.zookeeper,
596           ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
597       if (closeConnection) {
598         this.peerClusters.get(peerZnode).getZkw().close();
599         this.peerClusters.remove(peerZnode);
600       }
601     } catch (KeeperException e) {
602       this.abortable.abort("Failed delete of " + peerZnode, e);
603     }
604   }
605 
606   /**
607    * Recursive deletion of all znodes in specified rs' znode
608    * @param znode
609    */
610   public void deleteRsQueues(String znode) {
611     String fullpath = ZKUtil.joinZNode(rsZNode, znode);
612     try {
613       List<String> clusters =
614         ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
615       for (String cluster : clusters) {
616         // We'll delete it later
617         if (cluster.equals(RS_LOCK_ZNODE)) {
618           continue;
619         }
620         String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
621         ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
622       }
623       // Finish cleaning up
624       ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
625     } catch (KeeperException e) {
626       if (e instanceof KeeperException.NoNodeException ||
627           e instanceof KeeperException.NotEmptyException) {
628         // Testing a special case where another region server was able to
629         // create a lock just after we deleted it, but then was also able to
630         // delete the RS znode before us or its lock znode is still there.
631         if (e.getPath().equals(fullpath)) {
632           return;
633         }
634       }
635       this.abortable.abort("Failed delete of " + znode, e);
636     }
637   }
638 
639   /**
640    * Delete this cluster's queues
641    */
642   public void deleteOwnRSZNode() {
643     try {
644       ZKUtil.deleteNodeRecursively(this.zookeeper,
645           this.rsServerNameZnode);
646     } catch (KeeperException e) {
647       // if the znode is already expired, don't bother going further
648       if (e instanceof KeeperException.SessionExpiredException) {
649         return;
650       }
651       this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
652     }
653   }
654 
655   /**
656    * Get the position of the specified hlog in the specified peer znode
657    * @param peerId znode of the peer cluster
658    * @param hlog name of the hlog
659    * @return the position in that hlog
660    * @throws KeeperException 
661    */
662   public long getHLogRepPosition(String peerId, String hlog)
663   throws KeeperException {
664     String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
665     String znode = ZKUtil.joinZNode(clusterZnode, hlog);
666     String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
667     return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
668   }
669 
670   public void registerRegionServerListener(ZooKeeperListener listener) {
671     this.zookeeper.registerListener(listener);
672   }
673 
674   /**
675    * Get the identification of the cluster
676    *
677    * @return the id for the cluster
678    */
679   public String getClusterId() {
680     return this.clusterId;
681   }
682 
683   /**
684    * Get a map of all peer clusters
685    * @return map of peer cluster keyed by id
686    */
687   public Map<String, ReplicationPeer> getPeerClusters() {
688     return this.peerClusters;
689   }
690 
691   /**
692    * Extracts the znode name of a peer cluster from a ZK path
693    * @param fullPath Path to extract the id from
694    * @return the id or an empty string if path is invalid
695    */
696   public static String getZNodeName(String fullPath) {
697     String[] parts = fullPath.split("/");
698     return parts.length > 0 ? parts[parts.length-1] : "";
699   }
700 
701   /**
702    * Get this cluster's zk connection
703    * @return zk connection
704    */
705   public ZooKeeperWatcher getZookeeperWatcher() {
706     return this.zookeeper;
707   }
708 
709 
710   /**
711    * Get the full path to the peers' znode
712    * @return path to peers in zk
713    */
714   public String getPeersZNode() {
715     return peersZNode;
716   }
717 
718   /**
719    * Tracker for status of the replication
720    */
721   public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
722     public ReplicationStatusTracker(ZooKeeperWatcher watcher,
723         Abortable abortable) {
724       super(watcher, getRepStateNode(), abortable);
725     }
726 
727     @Override
728     public synchronized void nodeDataChanged(String path) {
729       if (path.equals(node)) {
730         super.nodeDataChanged(path);
731         readReplicationStateZnode();
732       }
733     }
734   }
735 }