1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.SortedMap;
28 import java.util.SortedSet;
29 import java.util.TreeMap;
30 import java.util.TreeSet;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.Abortable;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.HServerAddress;
39 import org.apache.hadoop.hbase.Server;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
42 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
43 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
44 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
45 import org.apache.hadoop.util.StringUtils;
46 import org.apache.zookeeper.KeeperException;
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 public class ReplicationZookeeper {
76 private static final Log LOG =
77 LogFactory.getLog(ReplicationZookeeper.class);
78
79 private final static String RS_LOCK_ZNODE = "lock";
80
81 private final ZooKeeperWatcher zookeeper;
82
83 private Map<String, ReplicationPeer> peerClusters;
84
85 private String replicationZNode;
86
87 private String peersZNode;
88
89 private String rsZNode;
90
91 private String rsServerNameZnode;
92
93 private String replicationStateNodeName;
94 private final Configuration conf;
95
96 private AtomicBoolean replicating;
97
98 private String clusterId;
99
100 private String ourClusterKey;
101
102 private Abortable abortable;
103 private ReplicationStatusTracker statusTracker;
104
105
106
107
108
109
110
/**
 * Builds an instance from an explicit configuration and ZooKeeper watcher.
 * This constructor leaves {@code peerClusters} and {@code rsServerNameZnode}
 * unset, so {@link #connectToPeer(String)} returns {@code false} on instances
 * built this way.
 *
 * @param abortable notified when a ZooKeeper operation fails
 * @param conf configuration used to resolve the replication znode names
 * @param zk watcher for the local ZooKeeper ensemble
 * @throws KeeperException if the replication znodes cannot be created or read
 */
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
    final ZooKeeperWatcher zk)
    throws KeeperException {
  // Must be stored before setZNodes(): readReplicationStateZnode() and most
  // public methods report failures through this.abortable and would otherwise
  // hit a NullPointerException instead of aborting.
  this.abortable = abortable;
  this.conf = conf;
  this.zookeeper = zk;
  this.replicating = new AtomicBoolean();
  setZNodes(abortable);
}
120
121
122
123
124
125
126
127
128
129 public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
130 throws IOException, KeeperException {
131 this.abortable = server;
132 this.zookeeper = server.getZooKeeper();
133 this.conf = server.getConfiguration();
134 this.replicating = replicating;
135 setZNodes(server);
136
137 this.peerClusters = new HashMap<String, ReplicationPeer>();
138 ZKUtil.createWithParents(this.zookeeper,
139 ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
140 this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
141 ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
142 connectExistingPeers();
143 }
144
145 private void setZNodes(Abortable abortable) throws KeeperException {
146 String replicationZNodeName =
147 conf.get("zookeeper.znode.replication", "replication");
148 String peersZNodeName =
149 conf.get("zookeeper.znode.replication.peers", "peers");
150 String repMasterZNodeName =
151 conf.get("zookeeper.znode.replication.master", "master");
152 this.replicationStateNodeName =
153 conf.get("zookeeper.znode.replication.state", "state");
154 String clusterIdZNodeName =
155 conf.get("zookeeper.znode.replication.clusterId", "clusterId");
156 String rsZNodeName =
157 conf.get("zookeeper.znode.replication.rs", "rs");
158 this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
159 this.replicationZNode =
160 ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
161 this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
162 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
163 this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
164 ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
165
166 String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
167 byte [] data = ZKUtil.getData(this.zookeeper, znode);
168 String idResult = Bytes.toString(data);
169 this.clusterId = idResult == null?
170 Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
171
172 this.statusTracker =
173 new ReplicationStatusTracker(this.zookeeper, abortable);
174 statusTracker.start();
175 readReplicationStateZnode();
176 }
177
178 private void connectExistingPeers() throws IOException, KeeperException {
179 List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
180 if (znodes != null) {
181 for (String z : znodes) {
182 connectToPeer(z);
183 }
184 }
185 }
186
187
188
189
190
191 public List<String> listPeersIdsAndWatch() {
192 List<String> ids = null;
193 try {
194 ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
195 } catch (KeeperException e) {
196 this.abortable.abort("Cannot get the list of peers ", e);
197 }
198 return ids;
199 }
200
201
202
203
204
205
206
207 public List<HServerAddress> getSlavesAddresses(String peerClusterId)
208 throws KeeperException {
209 if (this.peerClusters.size() == 0) {
210 return new ArrayList<HServerAddress>(0);
211 }
212 ReplicationPeer peer = this.peerClusters.get(peerClusterId);
213 if (peer == null) {
214 return new ArrayList<HServerAddress>(0);
215 }
216 peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
217 return peer.getRegionServers();
218 }
219
220
221
222
223
224
225
226 private List<HServerAddress> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
227 try {
228 return ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
229 } catch (KeeperException e) {
230 LOG.warn("Cannot get peer's region server addresses", e);
231 return new ArrayList<HServerAddress>(0);
232 }
233 }
234
235
236
237
238
239
240
241 public boolean connectToPeer(String peerId)
242 throws IOException, KeeperException {
243 if (peerClusters == null) {
244 return false;
245 }
246 if (this.peerClusters.containsKey(peerId)) {
247 return false;
248
249 } else if (this.peerClusters.size() > 0) {
250 LOG.warn("Multiple slaves feature not supported");
251 return false;
252 }
253 ReplicationPeer peer = getPeer(peerId);
254 if (peer == null) {
255 return false;
256 }
257 this.peerClusters.put(peerId, peer);
258 ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
259 this.rsServerNameZnode, peerId));
260 LOG.info("Added new peer cluster " + peer.getClusterKey());
261 return true;
262 }
263
264
265
266
267
268
269
270
271 public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
272 String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
273 byte [] data = ZKUtil.getData(this.zookeeper, znode);
274 String otherClusterKey = Bytes.toString(data);
275 if (this.ourClusterKey.equals(otherClusterKey)) {
276 LOG.debug("Not connecting to " + peerId + " because it's us");
277 return null;
278 }
279
280 Configuration otherConf = new Configuration(this.conf);
281 try {
282 ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
283 } catch (IOException e) {
284 LOG.error("Can't get peer because:", e);
285 return null;
286 }
287
288 ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
289 "connection to cluster: " + peerId, this.abortable);
290 return new ReplicationPeer(otherConf, peerId,
291 otherClusterKey, zkw);
292 }
293
294
295
296
297
298 public void setReplicating(boolean newState) throws KeeperException {
299 ZKUtil.createWithParents(this.zookeeper,
300 ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
301 ZKUtil.setData(this.zookeeper,
302 ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
303 Bytes.toBytes(Boolean.toString(newState)));
304 }
305
306
307
308
309
310
311
312 public void removePeer(String id) throws IOException {
313 try {
314 if (!peerExists(id)) {
315 throw new IllegalArgumentException("Cannot remove inexisting peer");
316 }
317 ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
318 } catch (KeeperException e) {
319 throw new IOException("Unable to remove a peer", e);
320 }
321 }
322
323
324
325
326
327
328
329
330
331 public void addPeer(String id, String clusterKey) throws IOException {
332 try {
333 if (peerExists(id)) {
334 throw new IllegalArgumentException("Cannot add existing peer");
335 } else if (countPeers() > 0) {
336 throw new IllegalStateException("Multi-slave isn't supported yet");
337 }
338 ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
339 ZKUtil.createAndWatch(this.zookeeper,
340 ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
341 } catch (KeeperException e) {
342 throw new IOException("Unable to add peer", e);
343 }
344 }
345
346 private boolean peerExists(String id) throws KeeperException {
347 return ZKUtil.checkExists(this.zookeeper,
348 ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
349 }
350
351 private int countPeers() throws KeeperException {
352 List<String> peers =
353 ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
354 return peers == null ? 0 : peers.size();
355 }
356
357
358
359
360 private void readReplicationStateZnode() {
361 try {
362 this.replicating.set(getReplication());
363 LOG.info("Replication is now " + (this.replicating.get()?
364 "started" : "stopped"));
365 } catch (KeeperException e) {
366 this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
367 }
368 }
369
370
371
372
373
374
375
376 public boolean getReplication() throws KeeperException {
377 byte [] data = this.statusTracker.getData();
378 if (data == null || data.length == 0) {
379 setReplicating(true);
380 return true;
381 }
382 return Boolean.parseBoolean(Bytes.toString(data));
383 }
384
385 private String getRepStateNode() {
386 return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
387 }
388
389
390
391
392
393
394 public void addLogToList(String filename, String clusterId) {
395 try {
396 String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
397 znode = ZKUtil.joinZNode(znode, filename);
398 ZKUtil.createWithParents(this.zookeeper, znode);
399 } catch (KeeperException e) {
400 this.abortable.abort("Failed add log to list", e);
401 }
402 }
403
404
405
406
407
408
409 public void removeLogFromList(String filename, String clusterId) {
410 try {
411 String znode = ZKUtil.joinZNode(rsServerNameZnode, clusterId);
412 znode = ZKUtil.joinZNode(znode, filename);
413 ZKUtil.deleteNode(this.zookeeper, znode);
414 } catch (KeeperException e) {
415 this.abortable.abort("Failed remove from list", e);
416 }
417 }
418
419
420
421
422
423
424
425
426 public void writeReplicationStatus(String filename, String clusterId,
427 long position) {
428 try {
429 String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
430 znode = ZKUtil.joinZNode(znode, filename);
431
432 ZKUtil.setData(this.zookeeper, znode,
433 Bytes.toBytes(Long.toString(position)));
434 } catch (KeeperException e) {
435 this.abortable.abort("Writing replication status", e);
436 }
437 }
438
439
440
441
442
443
444 public List<String> getRegisteredRegionServers() {
445 List<String> result = null;
446 try {
447 result = ZKUtil.listChildrenAndWatchThem(
448 this.zookeeper, this.zookeeper.rsZNode);
449 } catch (KeeperException e) {
450 this.abortable.abort("Get list of registered region servers", e);
451 }
452 return result;
453 }
454
455
456
457
458
459
460 public List<String> getListOfReplicators() {
461 List<String> result = null;
462 try {
463 result = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZNode);
464 } catch (KeeperException e) {
465 this.abortable.abort("Get list of replicators", e);
466 }
467 return result;
468 }
469
470
471
472
473
474
475 public List<String> getListPeersForRS(String rs) {
476 String znode = ZKUtil.joinZNode(rsZNode, rs);
477 List<String> result = null;
478 try {
479 result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
480 } catch (KeeperException e) {
481 this.abortable.abort("Get list of peers for rs", e);
482 }
483 return result;
484 }
485
486
487
488
489
490
491
492 public List<String> getListHLogsForPeerForRS(String rs, String id) {
493 String znode = ZKUtil.joinZNode(rsZNode, rs);
494 znode = ZKUtil.joinZNode(znode, id);
495 List<String> result = null;
496 try {
497 result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
498 } catch (KeeperException e) {
499 this.abortable.abort("Get list of hlogs for peer", e);
500 }
501 return result;
502 }
503
504
505
506
507
508
509 public boolean lockOtherRS(String znode) {
510 try {
511 String parent = ZKUtil.joinZNode(this.rsZNode, znode);
512 if (parent.equals(rsServerNameZnode)) {
513 LOG.warn("Won't lock because this is us, we're dead!");
514 return false;
515 }
516 String p = ZKUtil.joinZNode(parent, RS_LOCK_ZNODE);
517 ZKUtil.createAndWatch(this.zookeeper, p, Bytes.toBytes(rsServerNameZnode));
518 } catch (KeeperException e) {
519
520
521
522
523
524 if (e instanceof KeeperException.NoNodeException ||
525 e instanceof KeeperException.NodeExistsException) {
526 LOG.info("Won't transfer the queue," +
527 " another RS took care of it because of: " + e.getMessage());
528 } else {
529 LOG.info("Failed lock other rs", e);
530 }
531 return false;
532 }
533 return true;
534 }
535
536
537
538
539
540
541
542
543 public SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
544
545
546 SortedMap<String,SortedSet<String>> queues =
547 new TreeMap<String,SortedSet<String>>();
548 try {
549 String nodePath = ZKUtil.joinZNode(rsZNode, znode);
550 List<String> clusters =
551 ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
552
553 if (clusters == null || clusters.size() <= 1) {
554 return queues;
555 }
556
557 clusters.remove(RS_LOCK_ZNODE);
558 for (String cluster : clusters) {
559
560
561
562 String newCluster = cluster+"-"+znode;
563 String newClusterZnode = ZKUtil.joinZNode(rsServerNameZnode, newCluster);
564 ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
565 HConstants.EMPTY_BYTE_ARRAY);
566 String clusterPath = ZKUtil.joinZNode(nodePath, cluster);
567 List<String> hlogs = ZKUtil.listChildrenNoWatch(this.zookeeper, clusterPath);
568
569 if (hlogs == null || hlogs.size() == 0) {
570 continue;
571 }
572 SortedSet<String> logQueue = new TreeSet<String>();
573 queues.put(newCluster, logQueue);
574 for (String hlog : hlogs) {
575 String z = ZKUtil.joinZNode(clusterPath, hlog);
576 byte [] position = ZKUtil.getData(this.zookeeper, z);
577 LOG.debug("Creating " + hlog + " with data " + Bytes.toString(position));
578 String child = ZKUtil.joinZNode(newClusterZnode, hlog);
579 ZKUtil.createAndWatch(this.zookeeper, child, position);
580 logQueue.add(hlog);
581 }
582 }
583 } catch (KeeperException e) {
584 this.abortable.abort("Copy queues from rs", e);
585 }
586 return queues;
587 }
588
589
590
591
592
593 public void deleteSource(String peerZnode, boolean closeConnection) {
594 try {
595 ZKUtil.deleteNodeRecursively(this.zookeeper,
596 ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
597 if (closeConnection) {
598 this.peerClusters.get(peerZnode).getZkw().close();
599 this.peerClusters.remove(peerZnode);
600 }
601 } catch (KeeperException e) {
602 this.abortable.abort("Failed delete of " + peerZnode, e);
603 }
604 }
605
606
607
608
609
610 public void deleteRsQueues(String znode) {
611 String fullpath = ZKUtil.joinZNode(rsZNode, znode);
612 try {
613 List<String> clusters =
614 ZKUtil.listChildrenNoWatch(this.zookeeper, fullpath);
615 for (String cluster : clusters) {
616
617 if (cluster.equals(RS_LOCK_ZNODE)) {
618 continue;
619 }
620 String fullClusterPath = ZKUtil.joinZNode(fullpath, cluster);
621 ZKUtil.deleteNodeRecursively(this.zookeeper, fullClusterPath);
622 }
623
624 ZKUtil.deleteNodeRecursively(this.zookeeper, fullpath);
625 } catch (KeeperException e) {
626 if (e instanceof KeeperException.NoNodeException ||
627 e instanceof KeeperException.NotEmptyException) {
628
629
630
631 if (e.getPath().equals(fullpath)) {
632 return;
633 }
634 }
635 this.abortable.abort("Failed delete of " + znode, e);
636 }
637 }
638
639
640
641
642 public void deleteOwnRSZNode() {
643 try {
644 ZKUtil.deleteNodeRecursively(this.zookeeper,
645 this.rsServerNameZnode);
646 } catch (KeeperException e) {
647
648 if (e instanceof KeeperException.SessionExpiredException) {
649 return;
650 }
651 this.abortable.abort("Failed delete of " + this.rsServerNameZnode, e);
652 }
653 }
654
655
656
657
658
659
660
661
662 public long getHLogRepPosition(String peerId, String hlog)
663 throws KeeperException {
664 String clusterZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
665 String znode = ZKUtil.joinZNode(clusterZnode, hlog);
666 String data = Bytes.toString(ZKUtil.getData(this.zookeeper, znode));
667 return data == null || data.length() == 0 ? 0 : Long.parseLong(data);
668 }
669
670 public void registerRegionServerListener(ZooKeeperListener listener) {
671 this.zookeeper.registerListener(listener);
672 }
673
674
675
676
677
678
679 public String getClusterId() {
680 return this.clusterId;
681 }
682
683
684
685
686
687 public Map<String, ReplicationPeer> getPeerClusters() {
688 return this.peerClusters;
689 }
690
691
692
693
694
695
696 public static String getZNodeName(String fullPath) {
697 String[] parts = fullPath.split("/");
698 return parts.length > 0 ? parts[parts.length-1] : "";
699 }
700
701
702
703
704
705 public ZooKeeperWatcher getZookeeperWatcher() {
706 return this.zookeeper;
707 }
708
709
710
711
712
713
714 public String getPeersZNode() {
715 return peersZNode;
716 }
717
718
719
720
721 public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
722 public ReplicationStatusTracker(ZooKeeperWatcher watcher,
723 Abortable abortable) {
724 super(watcher, getRepStateNode(), abortable);
725 }
726
727 @Override
728 public synchronized void nodeDataChanged(String path) {
729 if (path.equals(node)) {
730 super.nodeDataChanged(path);
731 readReplicationStateZnode();
732 }
733 }
734 }
735 }