/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;

/**
 * Class that handles the source of a replication stream.
 * Currently does not handle more than one slave cluster.
 * For each slave cluster it selects a random number of peers
 * using a replication ratio. For example, if the replication ratio is 0.1
 * and the slave cluster has 100 region servers, 10 will be selected.
 * <p/>
 * A stream is considered down when we cannot contact a region server on the
 * peer cluster for more than 55 seconds by default.
 * <p/>
 */
public class ReplicationSource extends Thread
    implements ReplicationSourceInterface {

  private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
  // Queue of logs to process
  private PriorityBlockingQueue<Path> queue;
  // container of entries to replicate
  private HLog.Entry[] entriesArray;
  private HConnection conn;
  // Helper class for zookeeper
  private ReplicationZookeeper zkHelper;
  private Configuration conf;
  // ratio of region servers to choose from a slave cluster
  private float ratio;
  private Random random;
  // should we replicate or not?
  private AtomicBoolean replicating;
  // id of the peer cluster this source replicates to
  private String peerClusterId;
  // The manager of all sources to which we ping back our progress
  private ReplicationSourceManager manager;
  // Should we stop everything?
  private Stoppable stopper;
  // List of chosen sinks (region servers)
  private List<HServerAddress> currentPeers;
  // How long should we sleep for each retry
  private long sleepForRetries;
  // Max size in bytes of entriesArray
  private long replicationQueueSizeCapacity;
  // Max number of entries in entriesArray
  private int replicationQueueNbCapacity;
  // Our reader for the current log
  private HLog.Reader reader;
  // Current position in the log
  private long position = 0;
  // Path of the current log
  private volatile Path currentPath;
  private FileSystem fs;
  // id of this cluster
  private byte clusterId;
  // total number of edits we replicated
  private long totalReplicatedEdits = 0;
  // The znode we currently play with
  private String peerClusterZnode;
  // Indicates if this queue is recovered (and will be deleted when depleted)
  private boolean queueRecovered;
  // List of all the dead region servers that had this queue (if recovered)
  private String[] deadRegionServers;
  // Maximum number of retries before taking bold actions
  private long maxRetriesMultiplier;
  // Current number of entries that we need to replicate
  private int currentNbEntries = 0;
  // Current number of operations (Put/Delete) that we need to replicate
  private int currentNbOperations = 0;
  // Indicates if this particular source is running
  private volatile boolean running = true;
  // Metrics for this source
  private ReplicationSourceMetrics metrics;
  // If source is enabled, replication happens. If disabled, nothing will be
  // replicated but HLogs will still be queued
  private AtomicBoolean sourceEnabled = new AtomicBoolean();

  /**
   * Instantiation method used by region servers
   *
   * @param conf configuration to use
   * @param fs file system to use
   * @param manager replication manager to report progress to
   * @param stopper the stoppable used to know when the region server is stopping
   * @param replicating the atomic boolean that starts/stops replication
   * @param peerClusterZnode the name of our znode
   * @throws IOException
   */
  public void init(final Configuration conf,
                   final FileSystem fs,
                   final ReplicationSourceManager manager,
                   final Stoppable stopper,
                   final AtomicBoolean replicating,
                   final String peerClusterZnode)
      throws IOException {
    this.stopper = stopper;
    this.conf = conf;
    this.replicationQueueSizeCapacity =
        this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
    this.replicationQueueNbCapacity =
        this.conf.getInt("replication.source.nb.capacity", 25000);
    this.entriesArray = new HLog.Entry[this.replicationQueueNbCapacity];
    for (int i = 0; i < this.replicationQueueNbCapacity; i++) {
      this.entriesArray[i] = new HLog.Entry();
    }
    this.maxRetriesMultiplier =
        this.conf.getLong("replication.source.maxretriesmultiplier", 10);
    this.queue =
        new PriorityBlockingQueue<Path>(
            conf.getInt("hbase.regionserver.maxlogs", 32),
            new LogsComparator());
    this.conn = HConnectionManager.getConnection(conf);
    this.zkHelper = manager.getRepZkWrapper();
    this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
    this.currentPeers = new ArrayList<HServerAddress>();
    this.random = new Random();
    this.replicating = replicating;
    this.manager = manager;
    this.sleepForRetries =
        this.conf.getLong("replication.source.sleepforretries", 1000);
    this.fs = fs;
    this.clusterId = Byte.valueOf(zkHelper.getClusterId());
    this.metrics = new ReplicationSourceMetrics(peerClusterZnode);

    // Finally look if this is a recovered queue
    this.checkIfQueueRecovered(peerClusterZnode);
  }

  // The passed znode will be either the id of the peer cluster or, for a
  // recovered queue, the handling history of that queue in the form of
  // id-servername-*
  private void checkIfQueueRecovered(String peerClusterZnode) {
    String[] parts = peerClusterZnode.split("-");
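    // For example, a recovered znode such as "2-deadserver,60020,1280000000000"
    // (hypothetical names) splits into peer cluster id "2" plus one dead region
    // server entry, while a plain "2" is a normal, non-recovered queue.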
    this.queueRecovered = parts.length != 1;
    this.peerClusterId = this.queueRecovered ?
        parts[0] : peerClusterZnode;
    this.peerClusterZnode = peerClusterZnode;
    this.deadRegionServers = new String[parts.length-1];
    // Extract all the places where we could find the hlogs
    for (int i = 1; i < parts.length; i++) {
      this.deadRegionServers[i-1] = parts[i];
    }
  }

  /**
   * Select a number of peers at random using the ratio. Minimum 1.
   */
  private void chooseSinks() throws KeeperException {
    this.currentPeers.clear();
    List<HServerAddress> addresses =
        this.zkHelper.getSlavesAddresses(peerClusterId);
    Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
    LOG.info("Getting " + nbPeers +
        " rs from peer cluster # " + peerClusterId);
    for (int i = 0; i < nbPeers; i++) {
      HServerAddress address;
      // Make sure we get one address that we don't already have
      do {
        address = addresses.get(this.random.nextInt(addresses.size()));
      } while (setOfAddr.contains(address));
      LOG.info("Choosing peer " + address);
      setOfAddr.add(address);
    }
    this.currentPeers.addAll(setOfAddr);
  }

  @Override
  public void enqueueLog(Path log) {
    this.queue.put(log);
    this.metrics.sizeOfLogQueue.set(queue.size());
  }

  @Override
  public void run() {
    connectToPeers();
    // We were stopped while looping to connect to sinks, just abort
    if (this.stopper.isStopped()) {
      return;
    }
    // If this is recovered, the queue is already full and the first log
    // normally has a position (unless the RS failed between 2 logs)
    if (this.queueRecovered) {
      try {
        this.position = this.zkHelper.getHLogRepPosition(
            this.peerClusterZnode, this.queue.peek().getName());
      } catch (KeeperException e) {
        this.terminate("Couldn't get the position of this recovered queue " +
            peerClusterZnode, e);
      }
    }
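    // sleepMultiplier implements a simple linear backoff: an idle pass through
    // the loop sleeps sleepForRetries * sleepMultiplier milliseconds, and the
    // multiplier stops growing once it reaches maxRetriesMultiplier.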
    int sleepMultiplier = 1;
    // Loop until we close down
    while (!stopper.isStopped() && this.running) {
      // Sleep until replication is enabled again
      if (!this.replicating.get() || !this.sourceEnabled.get()) {
        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      // Get a new path
      if (!getNextPath()) {
        if (sleepForRetries("No log to process", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      // Open a reader on it
      if (!openReader(sleepMultiplier)) {
        // Reset the sleep multiplier, else it'd be reused for the next file
        sleepMultiplier = 1;
        continue;
      }

      // If we got a null reader but didn't continue, then sleep and continue
      if (this.reader == null) {
        if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      boolean gotIOE = false;
      currentNbEntries = 0;
      try {
        if (readAllEntriesToReplicateOrNextFile()) {
          continue;
        }
      } catch (IOException ioe) {
        LOG.warn(peerClusterZnode + " Got: ", ioe);
        gotIOE = true;
        if (ioe.getCause() instanceof EOFException) {

          boolean considerDumping = false;
          if (this.queueRecovered) {
            try {
              FileStatus stat = this.fs.getFileStatus(this.currentPath);
              if (stat.getLen() == 0) {
                LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
              }
              considerDumping = true;
            } catch (IOException e) {
              LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
            }
          } else if (currentNbEntries != 0) {
            LOG.warn(peerClusterZnode + " Got EOF while reading, " +
                "looks like this file is broken? " + currentPath);
            considerDumping = true;
            currentNbEntries = 0;
          }

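          // Only give up on the file once we have waited the maximum number of
          // retries; processEndOfFile() then moves on to the next log or, for a
          // recovered queue with nothing left, shuts this source down.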
          if (considerDumping &&
              sleepMultiplier == this.maxRetriesMultiplier &&
              processEndOfFile()) {
            continue;
          }
        }
      } finally {
        try {
          // if currentPath is null it means we already called processEndOfFile,
          // hence there is no position to record
          if (this.currentPath != null && !gotIOE) {
            this.position = this.reader.getPosition();
          }
          if (this.reader != null) {
            this.reader.close();
          }
        } catch (IOException e) {
          gotIOE = true;
          LOG.warn("Unable to finalize the tailing of a file", e);
        }
      }

      // If we didn't get anything to replicate, or if we hit an IOE,
      // wait a bit and retry.
      // But if we need to stop, don't bother sleeping
      if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) {
        this.manager.logPositionAndCleanOldLogs(this.currentPath,
            this.peerClusterZnode, this.position, queueRecovered);
        if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }
      sleepMultiplier = 1;
      shipEdits();

    }
    LOG.debug("Source exiting " + peerClusterId);
  }

  /**
   * Read all the entries from the current log file and retain those
   * that need to be replicated; otherwise, process the end of the current file.
   * @return true if we got nothing and went to the next file, false if we got
   * entries
   * @throws IOException
   */
  protected boolean readAllEntriesToReplicateOrNextFile() throws IOException {
    long seenEntries = 0;
    if (this.position != 0) {
      this.reader.seek(this.position);
    }
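    // Entries are read into pre-allocated slots of entriesArray; currentNbEntries
    // only advances when an entry is kept, so the slot of a filtered entry is
    // simply reused by the next read.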
    HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
    while (entry != null) {
      WALEdit edit = entry.getEdit();
      this.metrics.logEditsReadRate.inc(1);
      seenEntries++;
      // Remove all KVs that should not be replicated
      removeNonReplicableEdits(edit);
      HLogKey logKey = entry.getKey();
      // Don't replicate catalog entries, WALEdits that contain nothing to
      // replicate, or anything while replication is currently disabled
      if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
          Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
          edit.size() != 0 && replicating.get()) {
        logKey.setClusterId(this.clusterId);
        currentNbOperations += countDistinctRowKeys(edit);
        currentNbEntries++;
      } else {
        this.metrics.logEditsFilteredRate.inc(1);
      }
      // Stop if too many entries or too big
      if ((this.reader.getPosition() - this.position)
          >= this.replicationQueueSizeCapacity ||
          currentNbEntries >= this.replicationQueueNbCapacity) {
        break;
      }
      entry = this.reader.next(entriesArray[currentNbEntries]);
    }
    LOG.debug("currentNbOperations:" + currentNbOperations +
        " and seenEntries:" + seenEntries +
        " and size: " + (this.reader.getPosition() - this.position));
    // If we didn't get anything and the queue has an object, it means we
    // hit the end of the file for sure
    return seenEntries == 0 && processEndOfFile();
  }

  private void connectToPeers() {
    // Connect to peer cluster first, unless we have to stop
    while (!this.stopper.isStopped() && this.currentPeers.size() == 0) {
      try {
        chooseSinks();
        Thread.sleep(this.sleepForRetries);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while trying to connect to sinks", e);
      } catch (KeeperException e) {
        LOG.error("Error talking to zookeeper, retrying", e);
      }
    }
  }

  /**
   * Poll for the next path
   * @return true if a path was obtained, false if not
   */
  protected boolean getNextPath() {
    try {
      if (this.currentPath == null) {
        this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
        this.metrics.sizeOfLogQueue.set(queue.size());
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while reading edits", e);
    }
    return this.currentPath != null;
  }

  /**
   * Open a reader on the current path
   *
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return true if we should continue with that file, false if we are done with it
   */
  protected boolean openReader(int sleepMultiplier) {
    try {
      LOG.debug("Opening log for replication " + this.currentPath.getName() +
          " at " + this.position);
      try {
        this.reader = null;
        this.reader = HLog.getReader(this.fs, this.currentPath, this.conf);
      } catch (FileNotFoundException fnfe) {
        if (this.queueRecovered) {
          // We didn't find the log in the archive directory, look if it still
          // exists in the dead RS folder (there could be a chain of failures
          // to look at)
          LOG.info("NB dead servers : " + deadRegionServers.length);
          for (int i = this.deadRegionServers.length - 1; i >= 0; i--) {

            Path deadRsDirectory =
                new Path(manager.getLogDir().getParent(), this.deadRegionServers[i]);
            Path possibleLogLocation =
                new Path(deadRsDirectory, currentPath.getName());
            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
            if (this.manager.getFs().exists(possibleLogLocation)) {
              // We found the right new location
              LOG.info("Log " + this.currentPath + " still exists at " +
                  possibleLogLocation);
              // Returning here will make us sleep in the main loop since the
              // reader is still null
              return true;
            }
          }
          // TODO What happens if the log was missing from every single location?
          // Although we need to check a couple of times as the log could have
          // been moved by the master between the checks
          // It can also happen if a recovered queue wasn't properly cleaned,
          // such that the znode pointing to a log exists but the log was
          // deleted a long time ago.
          // For the moment, we'll throw the IO and processEndOfFile
          throw new IOException("File from recovered queue is " +
              "nowhere to be found", fnfe);
        } else {
          // If the log was archived, continue reading from there
          Path archivedLogLocation =
              new Path(manager.getOldLogDir(), currentPath.getName());
          if (this.manager.getFs().exists(archivedLogLocation)) {
            currentPath = archivedLogLocation;
            LOG.info("Log " + this.currentPath + " was moved to " +
                archivedLogLocation);
            // Open the log at the new location
            this.openReader(sleepMultiplier);

          }
          // TODO What happens if the log is missing in both places?
        }
      }
    } catch (IOException ioe) {
      LOG.warn(peerClusterZnode + " Got: ", ioe);
      // TODO Need a better way to determine if a file is really gone but
      // TODO without scanning all logs dir
      if (sleepMultiplier == this.maxRetriesMultiplier) {
        LOG.warn("Waited too long for this file, considering dumping");
        return !processEndOfFile();
      }
    }
    return true;
  }

  /**
   * Do the sleeping logic
   * @param msg Why we sleep
   * @param sleepMultiplier by how many times the default sleeping time is augmented
   * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
   */
  protected boolean sleepForRetries(String msg, int sleepMultiplier) {
    try {
      LOG.debug(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
      Thread.sleep(this.sleepForRetries * sleepMultiplier);
    } catch (InterruptedException e) {
      LOG.debug("Interrupted while sleeping between retries");
    }
    return sleepMultiplier < maxRetriesMultiplier;
  }

  /**
   * We only want KVs that are scoped other than local
   * @param edit the WALEdit to remove non-replicable KVs from
   */
  protected void removeNonReplicableEdits(WALEdit edit) {
    NavigableMap<byte[], Integer> scopes = edit.getScopes();
    List<KeyValue> kvs = edit.getKeyValues();
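    // Remove in place, stepping the index back after each removal so the
    // element that shifted into position i is not skipped.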
    for (int i = 0; i < edit.size(); i++) {
      KeyValue kv = kvs.get(i);
      // The scope will be null or empty if
      // there's nothing to replicate in that WALEdit
      if (scopes == null || !scopes.containsKey(kv.getFamily())) {
        kvs.remove(i);
        i--;
      }
    }
  }

  /**
   * Count the number of different row keys in the given edit; because of
   * mini-batching, a single WALEdit may carry edits for several rows.
   * We assume that there's at least one KV in the WALEdit.
   * @param edit edit to count row keys from
   * @return number of different row keys
   */
  private int countDistinctRowKeys(WALEdit edit) {
    List<KeyValue> kvs = edit.getKeyValues();
    int distinctRowKeys = 1;
    KeyValue lastKV = kvs.get(0);
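    // Assumes KeyValues for the same row are contiguous within the WALEdit, so
    // counting row transitions yields the number of distinct rows.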
    for (int i = 0; i < edit.size(); i++) {
      if (!kvs.get(i).matchingRow(lastKV)) {
        distinctRowKeys++;
        lastKV = kvs.get(i);
      }
    }
    return distinctRowKeys;
  }

  /**
   * Do the shipping logic
   */
  protected void shipEdits() {
    int sleepMultiplier = 1;
    if (this.currentNbEntries == 0) {
      LOG.warn("Was given 0 edits to ship");
      return;
    }
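    // Keep retrying the same batch until it ships or this source is stopped; on
    // failure we probe the sink with isSlaveDown() and back off, choosing a new
    // set of sinks once the retries are exhausted.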
    while (!this.stopper.isStopped()) {
      try {
        HRegionInterface rrs = getRS();
        LOG.debug("Replicating " + currentNbEntries);
        rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
        this.manager.logPositionAndCleanOldLogs(this.currentPath,
            this.peerClusterZnode, this.position, queueRecovered);
        this.totalReplicatedEdits += currentNbEntries;
        this.metrics.shippedBatchesRate.inc(1);
        this.metrics.shippedOpsRate.inc(
            this.currentNbOperations);
        // Use the write time of the last entry that was actually shipped
        this.metrics.setAgeOfLastShippedOp(
            this.entriesArray[currentNbEntries - 1].getKey().getWriteTime());
        LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
        break;

      } catch (IOException ioe) {
        // Didn't ship anything, but must still age the last time we did
        this.metrics.refreshAgeOfLastShippedOp();
        if (ioe instanceof RemoteException) {
          ioe = ((RemoteException) ioe).unwrapRemoteException();
          LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
        } else {
          LOG.warn("Can't replicate because of a local or network error: ", ioe);
        }
        try {
          boolean down;
          do {
            down = isSlaveDown();
            if (down) {
              if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
                sleepMultiplier++;
              } else {
                chooseSinks();
              }
            }
          } while (!this.stopper.isStopped() && down);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while trying to contact the peer cluster");
        } catch (KeeperException e) {
          LOG.error("Error talking to zookeeper, retrying", e);
        }

      }
    }
  }

  /**
   * If the queue isn't empty, switch to the next one
   * Else if this is a recovered queue, it means we're done!
   * Else we'll just continue to try reading the log file
   * @return true if we're done with the current file, false if we should
   * continue trying to read from it
   */
  protected boolean processEndOfFile() {
    if (this.queue.size() != 0) {
      this.currentPath = null;
      this.position = 0;
      return true;
    } else if (this.queueRecovered) {
      this.manager.closeRecoveredQueue(this);
      LOG.info("Finished recovering the queue");
      this.running = false;
      return true;
    }
    return false;
  }

  public void startup() {
    String n = Thread.currentThread().getName();
    Thread.UncaughtExceptionHandler handler =
        new Thread.UncaughtExceptionHandler() {
          public void uncaughtException(final Thread t, final Throwable e) {
            terminate("Uncaught exception during runtime", new Exception(e));
          }
        };
    Threads.setDaemonThreadRunning(
        this, n + ".replicationSource," + peerClusterZnode, handler);
  }

  public void terminate(String reason) {
    terminate(reason, null);
  }

  public void terminate(String reason, Exception cause) {
    if (cause == null) {
      LOG.info("Closing source "
          + this.peerClusterZnode + " because: " + reason);

    } else {
      LOG.error("Closing source " + this.peerClusterZnode
          + " because an error occurred: " + reason, cause);
    }
    this.running = false;
    Threads.shutdown(this, this.sleepForRetries);
  }

  /**
   * Get a new region server at random from this peer
   * @return connection to a randomly chosen region server in the peer cluster
   * @throws IOException
   */
  private HRegionInterface getRS() throws IOException {
    if (this.currentPeers.size() == 0) {
      throw new IOException(this.peerClusterZnode + " has 0 region servers");
    }
    HServerAddress address =
        currentPeers.get(random.nextInt(this.currentPeers.size()));
    return this.conn.getHRegionConnection(address);
  }

  /**
   * Check if the slave is down by trying to establish a connection
   * @return true if down, false if up
   * @throws InterruptedException
   */
  public boolean isSlaveDown() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
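    // Ping the sink from a separate thread so a hung RPC cannot block this
    // source; if the latch is not counted down within sleepForRetries
    // milliseconds, the slave is considered down.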
    Thread pingThread = new Thread() {
      public void run() {
        try {
          HRegionInterface rrs = getRS();
          // Dummy call which is expected to fail if the slave is really down
          rrs.getHServerInfo();
          latch.countDown();
        } catch (IOException ex) {
          if (ex instanceof RemoteException) {
            ex = ((RemoteException) ex).unwrapRemoteException();
          }
          LOG.info("Slave cluster looks down: " + ex.getMessage());
        }
      }
    };
    pingThread.start();
    // await returns true if the countDown happened before the timeout
    boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
    pingThread.interrupt();
    return down;
  }

  public String getPeerClusterZnode() {
    return this.peerClusterZnode;
  }

  public String getPeerClusterId() {
    return this.peerClusterId;
  }

  public Path getCurrentPath() {
    return this.currentPath;
  }

  public void setSourceEnabled(boolean status) {
    this.sourceEnabled.set(status);
  }

  /**
   * Comparator used to order logs based on their start time
   */
  public static class LogsComparator implements Comparator<Path> {

    @Override
    public int compare(Path o1, Path o2) {
      return Long.valueOf(getTS(o1)).compareTo(getTS(o2));
    }

    @Override
    public boolean equals(Object o) {
      return true;
    }

    /**
     * Split a path to get the start time
     * For example: 10.20.20.171%3A60020.1277499063250
     * @param p path to split
     * @return start time
     */
    private long getTS(Path p) {
      String[] parts = p.getName().split("\\.");
      return Long.parseLong(parts[parts.length-1]);
    }
  }
}