1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.replication.regionserver;
22
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
47
48
49
50
51
52
53
54
55
56
57
58
59 public class ReplicationSourceManager {
60 private static final Log LOG =
61 LogFactory.getLog(ReplicationSourceManager.class);
62
63 private final List<ReplicationSourceInterface> sources;
64
65 private final List<ReplicationSourceInterface> oldsources;
66
67 private final AtomicBoolean replicating;
68
69 private final ReplicationZookeeper zkHelper;
70
71 private final Stoppable stopper;
72
73 private final SortedSet<String> hlogs;
74 private final Configuration conf;
75 private final FileSystem fs;
76
77 private Path latestPath;
78
79 private final List<String> otherRegionServers;
80
81 private final Path logDir;
82
83 private final Path oldLogDir;
84
85 private final long sleepBeforeFailover;
86
87 private final ThreadPoolExecutor executor;
88
89
90
91
92
93
94
95
96
97
98
99
100 public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
101 final Configuration conf,
102 final Stoppable stopper,
103 final FileSystem fs,
104 final AtomicBoolean replicating,
105 final Path logDir,
106 final Path oldLogDir) {
107 this.sources = new ArrayList<ReplicationSourceInterface>();
108 this.replicating = replicating;
109 this.zkHelper = zkHelper;
110 this.stopper = stopper;
111 this.hlogs = new TreeSet<String>();
112 this.oldsources = new ArrayList<ReplicationSourceInterface>();
113 this.conf = conf;
114 this.fs = fs;
115 this.logDir = logDir;
116 this.oldLogDir = oldLogDir;
117 this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
118 this.zkHelper.registerRegionServerListener(
119 new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
120 List<String> otherRSs =
121 this.zkHelper.getRegisteredRegionServers();
122 this.zkHelper.registerRegionServerListener(
123 new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
124 this.zkHelper.listPeersIdsAndWatch();
125 this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
126
127
128 int nbWorkers = conf.getInt("replication.executor.workers", 1);
129
130
131 this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
132 100, TimeUnit.MILLISECONDS,
133 new LinkedBlockingQueue<Runnable>());
134 ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
135 tfb.setNameFormat("ReplicationExecutor-%d");
136 this.executor.setThreadFactory(tfb.build());
137 }
138
139
140
141
142
143
144
145
146
147
148
149 public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
150 String key = log.getName();
151 LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
152 this.zkHelper.writeReplicationStatus(key.toString(), id, position);
153 synchronized (this.hlogs) {
154 if (!queueRecovered && this.hlogs.first() != key) {
155 SortedSet<String> hlogSet = this.hlogs.headSet(key);
156 LOG.info("Removing " + hlogSet.size() +
157 " logs in the list: " + hlogSet);
158 for (String hlog : hlogSet) {
159 this.zkHelper.removeLogFromList(hlog.toString(), id);
160 }
161 hlogSet.clear();
162 }
163 }
164 }
165
166
167
168
169
170 public void init() throws IOException {
171 for (String id : this.zkHelper.getPeerClusters().keySet()) {
172 addSource(id);
173 }
174 List<String> currentReplicators = this.zkHelper.getListOfReplicators();
175 if (currentReplicators == null || currentReplicators.size() == 0) {
176 return;
177 }
178 synchronized (otherRegionServers) {
179 LOG.info("Current list of replicators: " + currentReplicators
180 + " other RSs: " + otherRegionServers);
181 }
182
183 for (String rs : currentReplicators) {
184 synchronized (otherRegionServers) {
185 if (!this.otherRegionServers.contains(rs)) {
186 transferQueues(rs);
187 }
188 }
189 }
190 }
191
192
193
194
195
196
197
198 public ReplicationSourceInterface addSource(String id) throws IOException {
199 ReplicationSourceInterface src =
200 getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
201
202 src.setSourceEnabled(true);
203 synchronized (this.hlogs) {
204 this.sources.add(src);
205 if (this.hlogs.size() > 0) {
206
207 this.zkHelper.addLogToList(this.hlogs.last(),
208 this.sources.get(0).getPeerClusterZnode());
209 src.enqueueLog(this.latestPath);
210 }
211 }
212 src.startup();
213 return src;
214 }
215
216
217
218
219 public void join() {
220 this.executor.shutdown();
221 if (this.sources.size() == 0) {
222 this.zkHelper.deleteOwnRSZNode();
223 }
224 for (ReplicationSourceInterface source : this.sources) {
225 source.terminate("Region server is closing");
226 }
227 }
228
229
230
231
232
233 protected SortedSet<String> getHLogs() {
234 return new TreeSet<String>(this.hlogs);
235 }
236
237
238
239
240
241 public List<ReplicationSourceInterface> getSources() {
242 return this.sources;
243 }
244
245 void logRolled(Path newLog) {
246 if (!this.replicating.get()) {
247 LOG.warn("Replication stopped, won't add new log");
248 return;
249 }
250
251 synchronized (this.hlogs) {
252 if (this.sources.size() > 0) {
253 this.zkHelper.addLogToList(newLog.getName(),
254 this.sources.get(0).getPeerClusterZnode());
255 } else {
256
257
258 this.hlogs.clear();
259 }
260 this.hlogs.add(newLog.getName());
261 }
262 this.latestPath = newLog;
263
264 for (ReplicationSourceInterface source : this.sources) {
265 source.enqueueLog(newLog);
266 }
267 }
268
269
270
271
272
273 public ReplicationZookeeper getRepZkWrapper() {
274 return zkHelper;
275 }
276
277
278
279
280
281
282
283
284
285
286
287
288 public ReplicationSourceInterface getReplicationSource(
289 final Configuration conf,
290 final FileSystem fs,
291 final ReplicationSourceManager manager,
292 final Stoppable stopper,
293 final AtomicBoolean replicating,
294 final String peerClusterId) throws IOException {
295 ReplicationSourceInterface src;
296 try {
297 @SuppressWarnings("rawtypes")
298 Class c = Class.forName(conf.get("replication.replicationsource.implementation",
299 ReplicationSource.class.getCanonicalName()));
300 src = (ReplicationSourceInterface) c.newInstance();
301 } catch (Exception e) {
302 LOG.warn("Passed replication source implemention throws errors, " +
303 "defaulting to ReplicationSource", e);
304 src = new ReplicationSource();
305
306 }
307 src.init(conf, fs, manager, stopper, replicating, peerClusterId);
308 return src;
309 }
310
311
312
313
314
315
316
317
318
319 public void transferQueues(String rsZnode) {
320 NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
321 try {
322 this.executor.execute(transfer);
323 } catch (RejectedExecutionException ex) {
324 LOG.info("Cancelling the transfer of " + rsZnode +
325 " because of " + ex.getMessage());
326 }
327 }
328
329
330
331
332
333 public void closeRecoveredQueue(ReplicationSourceInterface src) {
334 LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
335 this.oldsources.remove(src);
336 this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
337 }
338
339
340
341
342
343
344 public void removePeer(String id) {
345 LOG.info("Closing the following queue " + id + ", currently have "
346 + sources.size() + " and another "
347 + oldsources.size() + " that were recovered");
348 ReplicationSourceInterface srcToRemove = null;
349 List<ReplicationSourceInterface> oldSourcesToDelete =
350 new ArrayList<ReplicationSourceInterface>();
351
352 for (ReplicationSourceInterface src : oldsources) {
353 if (id.equals(src.getPeerClusterId())) {
354 oldSourcesToDelete.add(src);
355 }
356 }
357 for (ReplicationSourceInterface src : oldSourcesToDelete) {
358 closeRecoveredQueue((src));
359 }
360 LOG.info("Number of deleted recovered sources for " + id + ": "
361 + oldSourcesToDelete.size());
362
363 for (ReplicationSourceInterface src : this.sources) {
364 if (id.equals(src.getPeerClusterId())) {
365 srcToRemove = src;
366 break;
367 }
368 }
369 if (srcToRemove == null) {
370 LOG.error("The queue we wanted to close is missing " + id);
371 return;
372 }
373 srcToRemove.terminate("Replication stream was removed by a user");
374 this.sources.remove(srcToRemove);
375 this.zkHelper.deleteSource(id, true);
376 }
377
378
379
380
381
382
383 public class OtherRegionServerWatcher extends ZooKeeperListener {
384
385
386
387
388 public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
389 super(watcher);
390 }
391
392
393
394
395
396 public void nodeCreated(String path) {
397 refreshRegionServersList(path);
398 }
399
400
401
402
403
404 public void nodeDeleted(String path) {
405 if (stopper.isStopped()) {
406 return;
407 }
408 boolean cont = refreshRegionServersList(path);
409 if (!cont) {
410 return;
411 }
412 LOG.info(path + " znode expired, trying to lock it");
413 transferQueues(zkHelper.getZNodeName(path));
414 }
415
416
417
418
419
420 public void nodeChildrenChanged(String path) {
421 if (stopper.isStopped()) {
422 return;
423 }
424 refreshRegionServersList(path);
425 }
426
427 private boolean refreshRegionServersList(String path) {
428 if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
429 return false;
430 }
431 List<String> newRsList = (zkHelper.getRegisteredRegionServers());
432 if (newRsList == null) {
433 return false;
434 } else {
435 synchronized (otherRegionServers) {
436 otherRegionServers.clear();
437 otherRegionServers.addAll(newRsList);
438 }
439 }
440 return true;
441 }
442 }
443
444
445
446
447 public class PeersWatcher extends ZooKeeperListener {
448
449
450
451
452 public PeersWatcher(ZooKeeperWatcher watcher) {
453 super(watcher);
454 }
455
456
457
458
459
460 public void nodeDeleted(String path) {
461 List<String> peers = refreshPeersList(path);
462 if (peers == null) {
463 return;
464 }
465 String id = zkHelper.getZNodeName(path);
466 removePeer(id);
467 }
468
469
470
471
472
473 public void nodeChildrenChanged(String path) {
474 List<String> peers = refreshPeersList(path);
475 if (peers == null) {
476 return;
477 }
478 for (String id : peers) {
479 try {
480 boolean added = zkHelper.connectToPeer(id);
481 if (added) {
482 addSource(id);
483 }
484 } catch (IOException e) {
485
486 LOG.error("Error while adding a new peer", e);
487 } catch (KeeperException e) {
488 LOG.error("Error while adding a new peer", e);
489 }
490 }
491 }
492
493
494
495
496
497
498
499
500 private List<String> refreshPeersList(String path) {
501 if (!path.startsWith(zkHelper.getPeersZNode())) {
502 return null;
503 }
504 return zkHelper.listPeersIdsAndWatch();
505 }
506 }
507
508
509
510
511
512 class NodeFailoverWorker extends Thread {
513
514 private String rsZnode;
515
516
517
518
519
520 public NodeFailoverWorker(String rsZnode) {
521 super("Failover-for-"+rsZnode);
522 this.rsZnode = rsZnode;
523 }
524
525 @Override
526 public void run() {
527
528
529 try {
530 Thread.sleep(sleepBeforeFailover);
531 } catch (InterruptedException e) {
532 LOG.warn("Interrupted while waiting before transferring a queue.");
533 Thread.currentThread().interrupt();
534 }
535
536 if (stopper.isStopped()) {
537 LOG.info("Not transferring queue since we are shutting down");
538 return;
539 }
540 if (!zkHelper.lockOtherRS(rsZnode)) {
541 return;
542 }
543 LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
544 SortedMap<String, SortedSet<String>> newQueues =
545 zkHelper.copyQueuesFromRS(rsZnode);
546 zkHelper.deleteRsQueues(rsZnode);
547 if (newQueues == null || newQueues.size() == 0) {
548 return;
549 }
550
551 for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
552 String peerId = entry.getKey();
553 try {
554 ReplicationSourceInterface src = getReplicationSource(conf,
555 fs, ReplicationSourceManager.this, stopper, replicating, peerId);
556 if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
557 src.terminate("Recovered queue doesn't belong to any current peer");
558 break;
559 }
560 oldsources.add(src);
561 for (String hlog : entry.getValue()) {
562 src.enqueueLog(new Path(oldLogDir, hlog));
563 }
564
565 src.setSourceEnabled(true);
566 src.startup();
567 } catch (IOException e) {
568
569 LOG.error("Failed creating a source", e);
570 }
571 }
572 }
573 }
574
575
576
577
578
579 public Path getOldLogDir() {
580 return this.oldLogDir;
581 }
582
583
584
585
586
587 public Path getLogDir() {
588 return this.logDir;
589 }
590
591
592
593
594
595 public FileSystem getFs() {
596 return this.fs;
597 }
598 }