/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

/**
 * This class is responsible for managing all the replication
 * sources. There are two classes of sources:
 * <ul>
 * <li>Normal sources are persistent and there is one per peer cluster</li>
 * <li>Old sources are recovered from a failed region server and their
 * only goal is to finish replicating the HLog queue it had up in ZK</li>
 * </ul>
 * When a region server dies, this class uses a watcher to get notified and
 * tries to grab a lock in order to transfer all the queues to a local
 * old source.
 */
public class ReplicationSourceManager {
  private static final Log LOG =
      LogFactory.getLog(ReplicationSourceManager.class);
  // List of all the sources that read this RS's logs
  private final List<ReplicationSourceInterface> sources;
  // List of all the sources we got from dead RSs
  private final List<ReplicationSourceInterface> oldsources;
  // Indicates if we are currently replicating
  private final AtomicBoolean replicating;
  // Helper for zookeeper
  private final ReplicationZookeeper zkHelper;
  // All about stopping
  private final Stoppable stopper;
  // All logs we are currently tracking
  private final SortedSet<String> hlogs;
  private final Configuration conf;
  private final FileSystem fs;
  // The path to the latest log we saw, for newly added sources
  private Path latestPath;
  // List of all the other region servers in this cluster
  private final List<String> otherRegionServers;
  // Path to the hlog directories
  private final Path logDir;
  // Path to the hlog archive
  private final Path oldLogDir;
  // The number of ms that we wait before moving znodes, HBASE-3596
  private final long sleepBeforeFailover;
  // Homemade executor service for replication
  private final ThreadPoolExecutor executor;

  /**
   * Creates a replication manager and sets the watch on all the other
   * registered region servers
   * @param zkHelper the zk helper for replication
   * @param conf the configuration to use
   * @param stopper the stopper object for this region server
   * @param fs the file system to use
   * @param replicating the status of the replication on this cluster
   * @param logDir the directory that contains all hlog directories of live RSs
   * @param oldLogDir the directory where old logs are archived
   */
  public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
                                  final Configuration conf,
                                  final Stoppable stopper,
                                  final FileSystem fs,
                                  final AtomicBoolean replicating,
                                  final Path logDir,
                                  final Path oldLogDir) {
    this.sources = new ArrayList<ReplicationSourceInterface>();
    this.replicating = replicating;
    this.zkHelper = zkHelper;
    this.stopper = stopper;
    this.hlogs = new TreeSet<String>();
    this.oldsources = new ArrayList<ReplicationSourceInterface>();
    this.conf = conf;
    this.fs = fs;
    this.logDir = logDir;
    this.oldLogDir = oldLogDir;
    this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000);
    this.zkHelper.registerRegionServerListener(
        new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
    List<String> otherRSs =
        this.zkHelper.getRegisteredRegionServers();
    this.zkHelper.registerRegionServerListener(
        new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
    this.zkHelper.listPeersIdsAndWatch();
    this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
    // It's preferable to fail over one RS at a time, but with good zk servers
    // more could be processed at the same time.
    int nbWorkers = conf.getInt("replication.executor.workers", 1);
    // Use a short 100ms keep-alive since this could be done inline with a RS startup;
    // even if we fail, other region servers can take care of it.
    this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers,
        100, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
    tfb.setNameFormat("ReplicationExecutor-%d");
    this.executor.setThreadFactory(tfb.build());
  }

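  // The constructor above exposes two tuning knobs. A minimal, illustrative sketch of
  // setting them (values shown are the defaults read above; HBaseConfiguration.create()
  // is just a convenient way to obtain a Configuration, any Configuration works):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   // ms to wait before taking over a dead RS's queues (HBASE-3596)
  //   conf.setLong("replication.sleep.before.failover", 2000);
  //   // number of threads used to transfer queues from dead RSs
  //   conf.setInt("replication.executor.workers", 1);
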
  /**
   * Provide the id of the peer and a log key and this method will figure out which
   * hlog it belongs to and will log, for this region server, the current
   * position. It will also clean old logs from the queue.
   * @param log Path to the log that is currently being replicated
   * @param id id of the peer cluster
   * @param position current location in the log
   * @param queueRecovered indicates if this queue comes from another region server
   */
  public void logPositionAndCleanOldLogs(Path log, String id, long position,
      boolean queueRecovered) {
    String key = log.getName();
    LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
    this.zkHelper.writeReplicationStatus(key, id, position);
    synchronized (this.hlogs) {
      if (!queueRecovered && !this.hlogs.first().equals(key)) {
        SortedSet<String> hlogSet = this.hlogs.headSet(key);
        LOG.info("Removing " + hlogSet.size() +
            " logs in the list: " + hlogSet);
        for (String hlog : hlogSet) {
          this.zkHelper.removeLogFromList(hlog, id);
        }
        hlogSet.clear();
      }
    }
  }

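  // A minimal standalone sketch (illustrative only) of the pruning above:
  // TreeSet.headSet(key) is a view of the strictly-older entries, so clearing it
  // also drops them from the backing set.
  //
  //   SortedSet<String> tracked = new TreeSet<String>(
  //       java.util.Arrays.asList("hlog.100", "hlog.200", "hlog.300"));
  //   SortedSet<String> older = tracked.headSet("hlog.300"); // [hlog.100, hlog.200]
  //   older.clear();                           // tracked now only contains hlog.300
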
  /**
   * Adds a normal source per registered peer cluster and tries to process all
   * old region server hlog queues
   */
  public void init() throws IOException {
    for (String id : this.zkHelper.getPeerClusters().keySet()) {
      addSource(id);
    }
    List<String> currentReplicators = this.zkHelper.getListOfReplicators();
    if (currentReplicators == null || currentReplicators.size() == 0) {
      return;
    }
    synchronized (otherRegionServers) {
      LOG.info("Current list of replicators: " + currentReplicators
          + " other RSs: " + otherRegionServers);
    }
    // Look if there's anything to process after a restart
    for (String rs : currentReplicators) {
      synchronized (otherRegionServers) {
        if (!this.otherRegionServers.contains(rs)) {
          transferQueues(rs);
        }
      }
    }
  }

  /**
   * Add a new normal source to this region server
   * @param id the id of the peer cluster
   * @return the source that was created
   * @throws IOException
   */
  public ReplicationSourceInterface addSource(String id) throws IOException {
    ReplicationSourceInterface src =
        getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
    // TODO set it to what's in ZK
    src.setSourceEnabled(true);
    synchronized (this.hlogs) {
      this.sources.add(src);
      if (this.hlogs.size() > 0) {
        // Add the latest hlog to that source's queue
        this.zkHelper.addLogToList(this.hlogs.last(),
            src.getPeerClusterZnode());
        src.enqueueLog(this.latestPath);
      }
    }
    src.startup();
    return src;
  }

  /**
   * Terminate the replication on this region server
   */
  public void join() {
    this.executor.shutdown();
    if (this.sources.size() == 0) {
      this.zkHelper.deleteOwnRSZNode();
    }
    for (ReplicationSourceInterface source : this.sources) {
      source.terminate("Region server is closing");
    }
  }

  /**
   * Get a copy of the hlogs currently tracked on this rs
   * @return a sorted set of hlog names
   */
  protected SortedSet<String> getHLogs() {
    return new TreeSet<String>(this.hlogs);
  }

  /**
   * Get a list of all the normal sources of this rs
   * @return list of all sources
   */
  public List<ReplicationSourceInterface> getSources() {
    return this.sources;
  }

  void logRolled(Path newLog) {
    if (!this.replicating.get()) {
      LOG.warn("Replication stopped, won't add new log");
      return;
    }

    synchronized (this.hlogs) {
      if (this.sources.size() > 0) {
        this.zkHelper.addLogToList(newLog.getName(),
            this.sources.get(0).getPeerClusterZnode());
      } else {
        // If there are no slaves, we don't need to keep the old hlogs since
        // we only consider the last one when a new slave comes in
        this.hlogs.clear();
      }
      this.hlogs.add(newLog.getName());
    }
    this.latestPath = newLog;
    // This only updates the sources we own, not the recovered ones
    for (ReplicationSourceInterface source : this.sources) {
      source.enqueueLog(newLog);
    }
  }

  /**
   * Get the ZK helper of this manager
   * @return the helper
   */
  public ReplicationZookeeper getRepZkWrapper() {
    return zkHelper;
  }

  /**
   * Factory method to create a replication source
   * @param conf the configuration to use
   * @param fs the file system to use
   * @param manager the manager to use
   * @param stopper the stopper object for this region server
   * @param replicating the status of the replication on this cluster
   * @param peerClusterId the id of the peer cluster
   * @return the created source
   * @throws IOException
   */
  public ReplicationSourceInterface getReplicationSource(
      final Configuration conf,
      final FileSystem fs,
      final ReplicationSourceManager manager,
      final Stoppable stopper,
      final AtomicBoolean replicating,
      final String peerClusterId) throws IOException {
    ReplicationSourceInterface src;
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(conf.get("replication.replicationsource.implementation",
          ReplicationSource.class.getCanonicalName()));
      src = (ReplicationSourceInterface) c.newInstance();
    } catch (Exception e) {
      LOG.warn("Passed replication source implementation throws errors, " +
          "defaulting to ReplicationSource", e);
      src = new ReplicationSource();
    }
    src.init(conf, fs, manager, stopper, replicating, peerClusterId);
    return src;
  }

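  // A hedged usage sketch: the factory above is driven by the
  // "replication.replicationsource.implementation" property, so a test or a custom
  // deployment can swap in its own source. MyReplicationSource below is hypothetical;
  // it must implement ReplicationSourceInterface and have a no-arg constructor.
  //
  //   conf.set("replication.replicationsource.implementation",
  //       MyReplicationSource.class.getCanonicalName());
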
  /**
   * Transfer all the queues of the specified region server to this region server.
   * First it tries to grab a lock and if it works it will move the
   * znodes and finally will delete the old znodes.
   *
   * It creates one old source for any type of source of the old rs.
   * @param rsZnode znode of the dead region server
   */
  public void transferQueues(String rsZnode) {
    NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode);
    try {
      this.executor.execute(transfer);
    } catch (RejectedExecutionException ex) {
      LOG.info("Cancelling the transfer of " + rsZnode +
          " because of " + ex.getMessage());
    }
  }

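  // Why the catch above exists: once join() has called executor.shutdown(), any
  // further execute() is rejected with a RejectedExecutionException. A minimal
  // standalone sketch of that behavior:
  //
  //   ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
  //       100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
  //   pool.shutdown();
  //   pool.execute(new Runnable() { public void run() { } }); // throws RejectedExecutionException
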
  /**
   * Clear the references to the specified old source
   * @param src source to clear
   */
  public void closeRecoveredQueue(ReplicationSourceInterface src) {
    LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
    this.oldsources.remove(src);
    this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
  }

  /**
   * This method first deletes all the recovered sources for the specified
   * id, then deletes the normal source (deleting all related data in ZK).
   * @param id The id of the peer cluster
   */
  public void removePeer(String id) {
    LOG.info("Closing the following queue " + id + ", currently have "
        + sources.size() + " and another "
        + oldsources.size() + " that were recovered");
    ReplicationSourceInterface srcToRemove = null;
    List<ReplicationSourceInterface> oldSourcesToDelete =
        new ArrayList<ReplicationSourceInterface>();
    // First close all the recovered sources for this peer
    for (ReplicationSourceInterface src : oldsources) {
      if (id.equals(src.getPeerClusterId())) {
        oldSourcesToDelete.add(src);
      }
    }
    for (ReplicationSourceInterface src : oldSourcesToDelete) {
      closeRecoveredQueue(src);
    }
    LOG.info("Number of deleted recovered sources for " + id + ": "
        + oldSourcesToDelete.size());
    // Now look for the one on this cluster
    for (ReplicationSourceInterface src : this.sources) {
      if (id.equals(src.getPeerClusterId())) {
        srcToRemove = src;
        break;
      }
    }
    if (srcToRemove == null) {
      LOG.error("The queue we wanted to close is missing " + id);
      return;
    }
    srcToRemove.terminate("Replication stream was removed by a user");
    this.sources.remove(srcToRemove);
    this.zkHelper.deleteSource(id, true);
  }

  /**
   * Watcher used to be notified of other region servers' deaths
   * in the local cluster. It initiates the process to transfer the queues
   * if it is able to grab the lock.
   */
  public class OtherRegionServerWatcher extends ZooKeeperListener {

    /**
     * Construct a ZooKeeper event listener.
     */
    public OtherRegionServerWatcher(ZooKeeperWatcher watcher) {
      super(watcher);
    }

    /**
     * Called when a new node has been created.
     * @param path full path of the new node
     */
    public void nodeCreated(String path) {
      refreshRegionServersList(path);
    }

    /**
     * Called when a node has been deleted
     * @param path full path of the deleted node
     */
    public void nodeDeleted(String path) {
      if (stopper.isStopped()) {
        return;
      }
      boolean cont = refreshRegionServersList(path);
      if (!cont) {
        return;
      }
      LOG.info(path + " znode expired, trying to lock it");
      transferQueues(zkHelper.getZNodeName(path));
    }

    /**
     * Called when an existing node has a child node added or removed.
     * @param path full path of the node whose children have changed
     */
    public void nodeChildrenChanged(String path) {
      if (stopper.isStopped()) {
        return;
      }
      refreshRegionServersList(path);
    }

    private boolean refreshRegionServersList(String path) {
      if (!path.startsWith(zkHelper.getZookeeperWatcher().rsZNode)) {
        return false;
      }
      List<String> newRsList = zkHelper.getRegisteredRegionServers();
      if (newRsList == null) {
        return false;
      } else {
        synchronized (otherRegionServers) {
          otherRegionServers.clear();
          otherRegionServers.addAll(newRsList);
        }
      }
      return true;
    }
  }

  /**
   * Watcher used to follow the creation and deletion of peer clusters.
   */
  public class PeersWatcher extends ZooKeeperListener {

    /**
     * Construct a ZooKeeper event listener.
     */
    public PeersWatcher(ZooKeeperWatcher watcher) {
      super(watcher);
    }

    /**
     * Called when a node has been deleted
     * @param path full path of the deleted node
     */
    public void nodeDeleted(String path) {
      List<String> peers = refreshPeersList(path);
      if (peers == null) {
        return;
      }
      String id = zkHelper.getZNodeName(path);
      removePeer(id);
    }

    /**
     * Called when an existing node has a child node added or removed.
     * @param path full path of the node whose children have changed
     */
    public void nodeChildrenChanged(String path) {
      List<String> peers = refreshPeersList(path);
      if (peers == null) {
        return;
      }
      for (String id : peers) {
        try {
          boolean added = zkHelper.connectToPeer(id);
          if (added) {
            addSource(id);
          }
        } catch (IOException e) {
          // TODO handle this better?
          LOG.error("Error while adding a new peer", e);
        } catch (KeeperException e) {
          LOG.error("Error while adding a new peer", e);
        }
      }
    }

    /**
     * Verify if this event is meant for us, and if so then get the latest
     * peers' list from ZK. Also reset the watches.
     * @param path path to check against
     * @return A list of peers' identifiers if the event concerns this watcher,
     * else null.
     */
    private List<String> refreshPeersList(String path) {
      if (!path.startsWith(zkHelper.getPeersZNode())) {
        return null;
      }
      return zkHelper.listPeersIdsAndWatch();
    }
  }

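  // Where the two watchers above look, as a hedged sketch (znode names assumed from
  // the default ZooKeeperWatcher/ReplicationZookeeper settings; verify against your
  // deployment's zookeeper.znode.* properties):
  //
  //   OtherRegionServerWatcher : the live region server list, typically
  //                              ${zookeeper.znode.parent}/rs, e.g. /hbase/rs
  //   PeersWatcher             : the replication peers znode, typically
  //                              ${zookeeper.znode.parent}/replication/peers,
  //                              e.g. /hbase/replication/peers
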
  /**
   * Class responsible for setting up new ReplicationSources to take care of the
   * queues from dead region servers.
   */
  class NodeFailoverWorker extends Thread {

    private String rsZnode;

    /**
     * @param rsZnode znode of the dead region server whose queues will be transferred
     */
    public NodeFailoverWorker(String rsZnode) {
      super("Failover-for-" + rsZnode);
      this.rsZnode = rsZnode;
    }

    @Override
    public void run() {
      // Wait a bit before transferring the queues, we may be shutting down.
      // This sleep may not be enough in some cases.
      try {
        Thread.sleep(sleepBeforeFailover);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting before transferring a queue.");
        Thread.currentThread().interrupt();
      }
      // We try to lock that rs' queue directory
      if (stopper.isStopped()) {
        LOG.info("Not transferring queue since we are shutting down");
        return;
      }
      if (!zkHelper.lockOtherRS(rsZnode)) {
        return;
      }
      LOG.info("Moving " + rsZnode + "'s hlogs to my queue");
      SortedMap<String, SortedSet<String>> newQueues =
          zkHelper.copyQueuesFromRS(rsZnode);
      zkHelper.deleteRsQueues(rsZnode);
      if (newQueues == null || newQueues.size() == 0) {
        return;
      }

      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
        String peerId = entry.getKey();
        try {
          ReplicationSourceInterface src = getReplicationSource(conf,
              fs, ReplicationSourceManager.this, stopper, replicating, peerId);
          if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
            src.terminate("Recovered queue doesn't belong to any current peer");
            break;
          }
          oldsources.add(src);
          for (String hlog : entry.getValue()) {
            src.enqueueLog(new Path(oldLogDir, hlog));
          }
          // TODO set it to what's in ZK
          src.setSourceEnabled(true);
          src.startup();
        } catch (IOException e) {
          // TODO manage it
          LOG.error("Failed creating a source", e);
        }
      }
    }
  }

  /**
   * Get the directory where hlogs are archived
   * @return the directory where hlogs are archived
   */
  public Path getOldLogDir() {
    return this.oldLogDir;
  }

  /**
   * Get the directory where hlogs are stored by their RSs
   * @return the directory where hlogs are stored by their RSs
   */
  public Path getLogDir() {
    return this.logDir;
  }

  /**
   * Get the handle on the file system used by this manager
   * @return Handle on the file system
   */
  public FileSystem getFs() {
    return this.fs;
  }
}