/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
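
/**
 * Class that handles the source side of a replication stream.
 * For each slave cluster it selects a random subset of peer region servers
 * using a replication ratio; for example, with a ratio of 0.1 and a slave
 * cluster of 100 region servers, 10 of them are chosen as sinks.
 * <p>
 * This thread reads the region server's HLogs one by one, filters out the
 * edits that should not be replicated, and ships the rest to one of the
 * chosen sinks.
 */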
public class ReplicationSource extends Thread
    implements ReplicationSourceInterface {

  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
  // Queue of logs to process
  private PriorityBlockingQueue<Path> queue;
  // Batch of entries to ship, reused between shipments
  private HLog.Entry[] entriesArray;
  private HConnection conn;
  // Helper class for zookeeper-related replication operations
  private ReplicationZookeeper zkHelper;
  private Configuration conf;
  // Ratio of region servers to choose as sinks in the peer cluster
  private float ratio;
  private Random random;
  // Indicates if replication is enabled cluster-wide
  private AtomicBoolean replicating;
  // Id of the peer cluster this source replicates to
  private String peerClusterId;
  // The manager of all sources, to which we report our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // List of chosen sinks (region servers) in the peer cluster
  private List<HServerAddress> currentPeers;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of the entries read in one shipment
  private long replicationQueueSizeCapacity;
  // Max number of entries read in one shipment
  private int replicationQueueNbCapacity;
  // Our reader for the current log
  private HLog.Reader reader;
  // Current position in the log
  private long position = 0;
  // Path of the current log
  private volatile Path currentPath;
  private FileSystem fs;
  // Id of the cluster we originate from, set on every shipped edit
  private byte clusterId;
  // Total number of edits replicated so far
  private long totalReplicatedEdits = 0;
  // The znode of the queue we are handling
  private String peerClusterZnode;
  // Whether this queue was recovered from a dead region server
  private boolean queueRecovered;
  // List of dead region servers this recovered queue went through
  private String[] deadRegionServers;
  // Maximum number of retries before taking bolder actions
  private long maxRetriesMultiplier;
  // Current number of entries that we need to replicate
  private int currentNbEntries = 0;
  // Current number of operations (Put/Delete) that we need to replicate
  private int currentNbOperations = 0;
  // Indicates if this source is running
  private volatile boolean running = true;
  // Metrics for this source
  private ReplicationSourceMetrics metrics;
  // Indicates if this source is enabled for replication
  private AtomicBoolean sourceEnabled = new AtomicBoolean();
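
  /**
   * Instantiation method used by region servers.
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to report to
   * @param stopper the stoppable used to shut down the region server
   * @param replicating the atomic boolean that starts/stops replication
   * @param peerClusterZnode the name of our znode
   * @throws IOException
   */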
  public void init(final Configuration conf,
                   final FileSystem fs,
                   final ReplicationSourceManager manager,
                   final Stoppable stopper,
                   final AtomicBoolean replicating,
                   final String peerClusterZnode)
      throws IOException {
    this.stopper = stopper;
    this.conf = conf;
    this.replicationQueueSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
    this.replicationQueueNbCapacity =
        this.conf.getInt("replication.source.nb.capacity", 25000);
    this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
    for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
      this.entriesArray[i] = new HLog.Entry();
    }
    this.maxRetriesMultiplier =
        this.conf.getLong("replication.source.maxretriesmultiplier", 10);
    this.queue =
        new PriorityBlockingQueue<Path>(
            conf.getInt("hbase.regionserver.maxlogs", 32),
            new LogsComparator());
    this.conn = HConnectionManager.getConnection(conf);
    this.zkHelper = manager.getRepZkWrapper();
    this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
    this.currentPeers = new ArrayList<HServerAddress>();
    this.random = new Random();
    this.replicating = replicating;
    this.manager = manager;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.fs = fs;
    this.clusterId = Byte.valueOf(zkHelper.getClusterId());
    this.metrics = new ReplicationSourceMetrics(peerClusterZnode);

    this.checkIfQueueRecovered(peerClusterZnode);
  }
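
  /**
   * Determine if this source is handling a queue recovered from a dead
   * region server. The znode of a recovered queue has the form
   * peerId-deadServer1-deadServer2-..., so a "-" in the name tells us the
   * queue was recovered and gives us the servers it went through.
   * @param peerClusterZnode znode of the queue handled by this source
   */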
  private void checkIfQueueRecovered(String peerClusterZnode) {
    String[] parts = peerClusterZnode.split("-");
    this.queueRecovered = parts.length != 1;
    this.peerClusterId = this.queueRecovered ?
        parts[0] : peerClusterZnode;
    this.peerClusterZnode = peerClusterZnode;
    this.deadRegionServers = new String[parts.length-1];
    // Extract the list of dead servers this queue went through
    for (int i = 1; i < parts.length; i++) {
      this.deadRegionServers[i-1] = parts[i];
    }
  }
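
  /**
   * Select a random subset of the peer cluster's region servers to replicate
   * to, sized by the configured "replication.source.ratio".
   * @throws KeeperException if the list of slave addresses cannot be read
   */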
  private void chooseSinks() throws KeeperException {
    this.currentPeers.clear();
    List<HServerAddress> addresses =
        this.zkHelper.getSlavesAddresses(peerClusterId);
    Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
    LOG.info("Getting " + nbPeers +
        " rs from peer cluster # " + peerClusterId);
    for (int i = 0; i < nbPeers; i++) {
      HServerAddress address;
      // Make sure we get one address that we don't already have
      do {
        address = addresses.get(this.random.nextInt(addresses.size()));
      } while (setOfAddr.contains(address));
      LOG.info("Choosing peer " + address);
      setOfAddr.add(address);
    }
    this.currentPeers.addAll(setOfAddr);
  }

  @Override
  public void enqueueLog(Path log) {
    this.queue.put(log);
    this.metrics.sizeOfLogQueue.set(queue.size());
  }

  @Override
  public void run() {
    connectToPeers();
    // We were stopped while looping to connect to sinks, just abort
    if (this.stopper.isStopped()) {
      return;
    }
    // If this is a recovered queue, the queue is already full and the first
    // log normally has a saved position (unless the RS failed between logs)
    if (this.queueRecovered) {
      try {
        this.position = this.zkHelper.getHLogRepPosition(
            this.peerClusterZnode, this.queue.peek().getName());
      } catch (KeeperException e) {
        this.terminate("Couldn't get the position of this recovered queue " +
            peerClusterZnode, e);
      }
    }
    int sleepMultiplier = 1;
    // Loop until we are asked to close down
    while (!stopper.isStopped() && this.running) {
      // Sleep until replication is enabled again
      if (!this.replicating.get() || !this.sourceEnabled.get()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      // Get a new path to read from, keep waiting if there's none
      if (!getNextPath()) {
        if (sleepForRetries("No log to process", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      // Open a reader on it
      if (!openReader(sleepMultiplier)) {
        // Reset the sleep multiplier, else it'd be reused for the next file
        sleepMultiplier = 1;
        continue;
      }

      // If we got a null reader but didn't throw, wait a bit and retry
      if (this.reader == null) {
        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      boolean gotIOE = false;
      currentNbEntries = 0;
      try {
        if (readAllEntriesToReplicateOrNextFile()) {
          continue;
        }
      } catch (IOException ioe) {
        LOG.warn(peerClusterZnode + " Got: ", ioe);
        gotIOE = true;
        if (ioe.getCause() instanceof EOFException) {
          // A recovered queue's log won't grow anymore, so hitting EOF
          // means we may be able to dump the file
          boolean considerDumping = false;
          if (this.queueRecovered) {
            try {
              FileStatus stat = this.fs.getFileStatus(this.currentPath);
              if (stat.getLen() == 0) {
                LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
              }
              considerDumping = true;
            } catch (IOException e) {
              LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
            }
          } else if (currentNbEntries != 0) {
            LOG.warn(peerClusterZnode + " Got EOF while reading, " +
                "looks like this file is broken? " + currentPath);
            considerDumping = true;
            currentNbEntries = 0;
          }
          // Only give up on the file after we waited long enough for it
          if (considerDumping &&
              sleepMultiplier == this.maxRetriesMultiplier &&
              processEndOfFile()) {
            continue;
          }
        }
      } finally {
        try {
          // If we got an IOE, don't trust the position the reader would
          // give us; keep the last known good one
          if (this.currentPath != null && !gotIOE) {
            this.position = this.reader.getPosition();
          }
          if (this.reader != null) {
            this.reader.close();
          }
        } catch (IOException e) {
          gotIOE = true;
          LOG.warn("Unable to finalize the tailing of a file", e);
        }
      }

      // If we didn't get anything to replicate, or if we hit an IOE,
      // record our position, wait a bit and retry.
      // But if we need to stop, don't bother sleeping.
      if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) {
        this.manager.logPositionAndCleanOldLogs(this.currentPath,
            this.peerClusterZnode, this.position, queueRecovered);
        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      shipEdits();

    }
    LOG.debug("Source exiting " + peerClusterId);
  }
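
  /**
   * Read entries from the current log file, keeping only those that should
   * be replicated, until the batch is full or the file is exhausted.
   * @return true if we got nothing and went to the next file, false if we
   * got entries to ship
   * @throws IOException
   */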
  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException {
    long seenEntries = 0;
    if (this.position != 0) {
      this.reader.seek(this.position);
    }
    HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
    while (entry != null) {
      WALEdit edit = entry.getEdit();
      this.metrics.logEditsReadRate.inc(1);
      seenEntries++;
      // Remove all KVs that should not be replicated
      removeNonReplicableEdits(edit);
      HLogKey logKey = entry.getKey();
      // Don't replicate catalog entries, empty edits,
      // or anything while replication is disabled
      if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
          Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
          edit.size() != 0 && replicating.get()) {
        logKey.setClusterId(this.clusterId);
        currentNbOperations += countDistinctRowKeys(edit);
        currentNbEntries++;
      } else {
        this.metrics.logEditsFilteredRate.inc(1);
      }
      // Stop if the batch holds too many entries or too many bytes
      if ((this.reader.getPosition() - this.position)
          >= this.replicationQueueSizeCapacity ||
          currentNbEntries >= this.replicationQueueNbCapacity) {
        break;
      }
      entry = this.reader.next(entriesArray[currentNbEntries]);
    }
    LOG.debug("currentNbOperations:" + currentNbOperations +
        " and seenEntries:" + seenEntries +
        " and size: " + (this.reader.getPosition() - this.position));
    // If we didn't see anything, we may have hit the end of the file
    return seenEntries == 0 && processEndOfFile();
  }

  private void connectToPeers() {
    // Connect to the peer cluster first, unless we have to stop
    while (!this.stopper.isStopped() && this.currentPeers.size() == 0) {
      try {
        chooseSinks();
        Thread.sleep(this.sleepForRetries);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while trying to connect to sinks", e);
      } catch (KeeperException e) {
        LOG.error("Error talking to zookeeper, retrying", e);
      }
    }
  }
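
  /**
   * Poll for the next log to process, waiting at most sleepForRetries
   * milliseconds if the queue is empty.
   * @return true if a path is available, false otherwise
   */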
  protected boolean getNextPath() {
    try {
      if (this.currentPath == null) {
        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
        this.metrics.sizeOfLogQueue.set(queue.size());
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while reading edits", e);
    }
    return this.currentPath != null;
  }
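
  /**
   * Open a reader on the current path, handling the cases where the log was
   * archived or, for a recovered queue, left in a dead region server's
   * directory.
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if we should continue with that file, false if we are done with it
   */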
  protected boolean openReader(int sleepMultiplier) {
    try {
      LOG.debug("Opening log for replication " + this.currentPath.getName() +
          " at " + this.position);
      try {
        this.reader = null;
        this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
      } catch (FileNotFoundException fnfe) {
        if (this.queueRecovered) {
          // The log isn't where we expected it; for a recovered queue it may
          // sit under one of the dead region servers this queue went through
          // (a chain of failures), so check them from most recent to oldest
          LOG.info("NB dead servers : " + deadRegionServers.length);
          for (int i = this.deadRegionServers.length - 1; i >= 0; i--) {
            Path deadRsDirectory =
                new Path(manager.getLogDir().getParent(), this.deadRegionServers[i]);
            Path possibleLogLocation =
                new Path(deadRsDirectory, currentPath.getName());
            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
            if (this.manager.getFs().exists(possibleLogLocation)) {
              // We found the right new location
              LOG.info("Log " + this.currentPath + " still exists at " +
                  possibleLogLocation);
              // Returning here with a null reader will make us sleep and retry
              return true;
            }
          }
          // The log was found nowhere, give up on this recovered queue
          throw new IOException("File from recovered queue is " +
              "nowhere to be found", fnfe);
        } else {
          // If the log was archived, continue reading from there
          Path archivedLogLocation =
              new Path(manager.getOldLogDir(), currentPath.getName());
          if (this.manager.getFs().exists(archivedLogLocation)) {
            currentPath = archivedLogLocation;
            LOG.info("Log " + this.currentPath + " was moved to " +
                archivedLogLocation);
            // Open the log at its new location
            this.openReader(sleepMultiplier);
          }
          // If it isn't in the archive either, the reader stays null and
          // the run loop will sleep and retry
        }
      }
    } catch (IOException ioe) {
      LOG.warn(peerClusterZnode + " Got: ", ioe);
      // If we've waited the maximum number of retries for this file,
      // consider giving up on it
      if (sleepMultiplier == this.maxRetriesMultiplier) {
        LOG.warn("Waited too long for this file, considering dumping");
        return !processEndOfFile();
      }
    }
    return true;
  }
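
  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if sleepMultiplier is &lt; maxRetriesMultiplier
   */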
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }
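
  /**
   * We only want KVs that are scoped other than local, so remove from the
   * edit every KeyValue whose family has no replication scope.
   * @param edit the edit to filter in place
   */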
  protected void removeNonReplicableEdits(WALEdit edit) {
    NavigableMap<byte[], Integer> scopes = edit.getScopes();
    List<KeyValue> kvs = edit.getKeyValues();
    for (int i = 0; i < edit.size(); i++) {
      KeyValue kv = kvs.get(i);
      // The scope will be null or empty if
      // there's nothing to replicate in that WALEdit
      if (scopes == null || !scopes.containsKey(kv.getFamily())) {
        kvs.remove(i);
        i--;
      }
    }
  }
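
  /**
   * Count the number of different row keys in the given edit because of
   * mini-batching. We assume that there's at least one KV in the WALEdit.
   * @param edit edit to count row keys from
   * @return number of different row keys
   */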
  private int countDistinctRowKeys(WALEdit edit) {
    List<KeyValue> kvs = edit.getKeyValues();
    int distinctRowKeys = 1;
    KeyValue lastKV = kvs.get(0);
    for (int i = 0; i < edit.size(); i++) {
      // KVs of the same row are contiguous, so a new row starts whenever
      // the current KV's row differs from the previous one's
      if (!kvs.get(i).matchingRow(lastKV)) {
        distinctRowKeys++;
        lastKV = kvs.get(i);
      }
    }
    return distinctRowKeys;
  }
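
  /**
   * Do the shipping logic: send the current batch of entries to a random
   * sink, retrying (and possibly re-choosing sinks) until it succeeds or
   * the source is stopped.
   */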
  protected void shipEdits() {
    int sleepMultiplier = 1;
    if (this.currentNbEntries == 0) {
      LOG.warn("Was given 0 edits to ship");
      return;
    }
    while (!this.stopper.isStopped()) {
      try {
        HRegionInterface rrs = getRS();
        LOG.debug("Replicating " + currentNbEntries);
        rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
        this.manager.logPositionAndCleanOldLogs(this.currentPath,
            this.peerClusterZnode, this.position, queueRecovered);
        this.totalReplicatedEdits += currentNbEntries;
        this.metrics.shippedBatchesRate.inc(1);
        this.metrics.shippedOpsRate.inc(
            this.currentNbOperations);
        // Report the write time of the last entry we actually shipped
        this.metrics.setAgeOfLastShippedOp(
            this.entriesArray[currentNbEntries-1].getKey().getWriteTime());
        LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
        break;
      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp();
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
        } else {
          LOG.warn("Can't replicate because of a local or network error: ", ioe);
        }
        try {
          boolean down;
          // Spin while the slave is down and we're not asked to shut down
          do {
            down = isSlaveDown();
            if (down) {
              if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
                sleepMultiplier++;
              } else {
                // Try to pick new sinks, the current ones may be gone for good
                chooseSinks();
              }
            }
          } while (!this.stopper.isStopped() && down);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while trying to contact the peer cluster");
        } catch (KeeperException e) {
          LOG.error("Error talking to zookeeper, retrying", e);
        }
      }
    }
  }
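
  /**
   * If the queue isn't empty, switch to the next log; else, if this was a
   * recovered queue, the recovery is done and the source can shut down. In
   * all other cases keep trying with the current file.
   * @return true if we're done with the current file, false if we should
   * continue trying to read from it
   */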
  protected boolean processEndOfFile() {
    if (this.queue.size() != 0) {
      this.currentPath = null;
      this.position = 0;
      return true;
    } else if (this.queueRecovered) {
      this.manager.closeRecoveredQueue(this);
      LOG.info("Finished recovering the queue");
      this.running = false;
      return true;
    }
    return false;
  }

  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler =
        new Thread.UncaughtExceptionHandler() {
          public void uncaughtException(final Thread t, final Throwable e) {
            terminate("Uncaught exception during runtime", new Exception(e));
          }
        };
    Threads.setDaemonThreadRunning(
        this, n + ".replicationSource," + peerClusterZnode, handler);
  }

  public void terminate(String reason) {
    terminate(reason, null);
  }

  public void terminate(String reason, Exception cause) {
    if (cause == null) {
      LOG.info("Closing source "
          + this.peerClusterZnode + " because: " + reason);
    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.running = false;
    Threads.shutdown(this, this.sleepForRetries);
  }
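
  /**
   * Get a connection to a random region server from the current list of peers.
   * @return a connection to one of the peer region servers
   * @throws IOException if no peers are currently known
   */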
  private HRegionInterface getRS() throws IOException {
    if (this.currentPeers.size() == 0) {
      throw new IOException(this.peerClusterZnode + " has 0 region servers");
    }
    HServerAddress address =
        currentPeers.get(random.nextInt(this.currentPeers.size()));
    return this.conn.getHRegionConnection(address);
  }
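
  /**
   * Check if the slave cluster is down by trying a light-weight RPC against
   * one of its region servers, giving up after sleepForRetries milliseconds.
   * @return true if the slave appears down, false if we could reach it
   * @throws InterruptedException
   */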
  public boolean isSlaveDown() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    Thread pingThread = new Thread() {
      public void run() {
        try {
          HRegionInterface rrs = getRS();
          // Dummy call which should fail if the slave is down
          rrs.getHServerInfo();
          latch.countDown();
        } catch (IOException ex) {
          if (ex instanceof RemoteException) {
            ex = ((RemoteException) ex).unwrapRemoteException();
          }
          LOG.info("Slave cluster looks down: " + ex.getMessage());
        }
      }
    };
    pingThread.start();
    // The slave is considered down if the ping doesn't come back in time
    boolean down = !latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
    pingThread.interrupt();
    return down;
  }

  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  public String getPeerClusterId() {
    return this.peerClusterId;
  }

  public Path getCurrentPath() {
    return this.currentPath;
  }

  public void setSourceEnabled(boolean status) {
    this.sourceEnabled.set(status);
  }
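
  /**
   * Comparator used to order logs by their start time, which is the
   * timestamp suffix of the file name.
   */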
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
    }

    @Override
    public boolean equals(Object o) {
      return true;
    }
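
    /**
     * Split a path to get its start time, the last dot-separated part of
     * the file name.
     * @param p path to split
     * @return start time
     */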
    private long getTS(Path p) {
      String[] parts = p.getName().split("\\.");
      return Long.parseLong(parts[parts.length-1]);
    }
  }
}