View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.zookeeper;
21  
22  import java.io.IOException;
23  import java.util.HashSet;
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.CopyOnWriteArrayList;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.hbase.Abortable;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.ZooKeeperConnectionException;
34  import org.apache.hadoop.hbase.util.Threads;
35  import org.apache.zookeeper.KeeperException;
36  import org.apache.zookeeper.WatchedEvent;
37  import org.apache.zookeeper.Watcher;
38  import org.apache.zookeeper.ZooKeeper;
39  
40  /**
41   * Acts as the single ZooKeeper Watcher.  One instance of this is instantiated
42   * for each Master, RegionServer, and client process.
43   *
44   * <p>This is the only class that implements {@link Watcher}.  Other internal
45   * classes which need to be notified of ZooKeeper events must register with
46   * the local instance of this watcher via {@link #registerListener}.
47   *
48   * <p>This class also holds and manages the connection to ZooKeeper.
49   * Connection-related events and exceptions are handled here.
50   */
51  public class ZooKeeperWatcher implements Watcher, Abortable {
52    private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
53  
54    // Identifiier for this watcher (for logging only).  Its made of the prefix
55    // passed on construction and the zookeeper sessionid.
56    private String identifier;
57  
58    // zookeeper quorum
59    private String quorum;
60  
61    // zookeeper connection
62    private ZooKeeper zooKeeper;
63  
64    // abortable in case of zk failure
65    private Abortable abortable;
66  
67    // listeners to be notified
68    private final List<ZooKeeperListener> listeners =
69      new CopyOnWriteArrayList<ZooKeeperListener>();
70  
71    // set of unassigned nodes watched
72    private Set<String> unassignedNodes = new HashSet<String>();
73  
74    // node names
75  
76    // base znode for this cluster
77    public String baseZNode;
78    // znode containing location of server hosting root region
79    public String rootServerZNode;
80    // znode containing ephemeral nodes of the regionservers
81    public String rsZNode;
82    // znode of currently active master
83    public String masterAddressZNode;
84    // znode containing the current cluster state
85    public String clusterStateZNode;
86    // znode used for region transitioning and assignment
87    public String assignmentZNode;
88    // znode used for table disabling/enabling
89    public String tableZNode;
90  
91    private final Configuration conf;
92  
93    private final Exception constructorCaller;
94  
95    /**
96     * Instantiate a ZooKeeper connection and watcher.
97     * @param descriptor Descriptive string that is added to zookeeper sessionid
98     * and used as identifier for this instance.
99     * @throws IOException
100    * @throws ZooKeeperConnectionException
101    */
102   public ZooKeeperWatcher(Configuration conf, String descriptor,
103       Abortable abortable)
104   throws IOException, ZooKeeperConnectionException {
105     this.conf = conf;
106     // Capture a stack trace now.  Will print it out later if problem so we can
107     // distingush amongst the myriad ZKWs.
108     try {
109       throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
110     } catch (Exception e) {
111       this.constructorCaller = e;
112     }
113     this.quorum = ZKConfig.getZKQuorumServersString(conf);
114     // Identifier will get the sessionid appended later below down when we
115     // handle the syncconnect event.
116     this.identifier = descriptor;
117     this.abortable = abortable;
118     setNodeNames(conf);
119     this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
120     try {
121       // Create all the necessary "directories" of znodes
122       // TODO: Move this to an init method somewhere so not everyone calls it?
123 
124       // The first call against zk can fail with connection loss.  Seems common.
125       // Apparently this is recoverable.  Retry a while.
126       // See http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling
127       // TODO: Generalize out in ZKUtil.
128       long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000);
129       long finished = System.currentTimeMillis() + wait;
130       KeeperException ke = null;
131       do {
132         try {
133           ZKUtil.createAndFailSilent(this, baseZNode);
134           ke = null;
135           break;
136         } catch (KeeperException.ConnectionLossException e) {
137           if (LOG.isDebugEnabled() && (isFinishedRetryingRecoverable(finished))) {
138             LOG.debug("Retrying zk create for another " +
139               (finished - System.currentTimeMillis()) +
140               "ms; set 'hbase.zookeeper.recoverable.waittime' to change " +
141               "wait time); " + e.getMessage());
142           }
143           ke = e;
144         }
145       } while (isFinishedRetryingRecoverable(finished));
146       // Convert connectionloss exception to ZKCE.
147       if (ke != null) {
148         try {
149           // If we don't close it, the zk connection managers won't be killed
150           this.zooKeeper.close();
151         } catch (InterruptedException e) {
152           Thread.currentThread().interrupt();
153           LOG.warn("Interrupted while closing", e);
154         }
155         throw new ZooKeeperConnectionException("HBase is able to connect to" +
156             " ZooKeeper but the connection closes immediately. This could be" +
157             " a sign that the server has too many connections (30 is the" +
158             " default). Consider inspecting your ZK server logs for that" +
159             " error and then make sure you are reusing HBaseConfiguration" +
160             " as often as you can. See HTable's javadoc for more information.",
161             ke);
162       }
163       ZKUtil.createAndFailSilent(this, assignmentZNode);
164       ZKUtil.createAndFailSilent(this, rsZNode);
165       ZKUtil.createAndFailSilent(this, tableZNode);
166     } catch (KeeperException e) {
167       throw new ZooKeeperConnectionException(
168           prefix("Unexpected KeeperException creating base node"), e);
169     }
170   }
171 
172   private boolean isFinishedRetryingRecoverable(final long finished) {
173     return System.currentTimeMillis() < finished;
174   }
175 
176   @Override
177   public String toString() {
178     return this.identifier;
179   }
180 
181   /**
182    * Adds this instance's identifier as a prefix to the passed <code>str</code>
183    * @param str String to amend.
184    * @return A new string with this instance's identifier as prefix: e.g.
185    * if passed 'hello world', the returned string could be
186    */
187   public String prefix(final String str) {
188     return this.toString() + " " + str;
189   }
190 
191   /**
192    * Set the local variable node names using the specified configuration.
193    */
194   private void setNodeNames(Configuration conf) {
195     baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
196         HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
197     rootServerZNode = ZKUtil.joinZNode(baseZNode,
198         conf.get("zookeeper.znode.rootserver", "root-region-server"));
199     rsZNode = ZKUtil.joinZNode(baseZNode,
200         conf.get("zookeeper.znode.rs", "rs"));
201     masterAddressZNode = ZKUtil.joinZNode(baseZNode,
202         conf.get("zookeeper.znode.master", "master"));
203     clusterStateZNode = ZKUtil.joinZNode(baseZNode,
204         conf.get("zookeeper.znode.state", "shutdown"));
205     assignmentZNode = ZKUtil.joinZNode(baseZNode,
206         conf.get("zookeeper.znode.unassigned", "unassigned"));
207     tableZNode = ZKUtil.joinZNode(baseZNode,
208         conf.get("zookeeper.znode.tableEnableDisable", "table"));
209   }
210 
211   /**
212    * Register the specified listener to receive ZooKeeper events.
213    * @param listener
214    */
215   public void registerListener(ZooKeeperListener listener) {
216     listeners.add(listener);
217   }
218 
219   /**
220    * Register the specified listener to receive ZooKeeper events and add it as
221    * the first in the list of current listeners.
222    * @param listener
223    */
224   public void registerListenerFirst(ZooKeeperListener listener) {
225     listeners.add(0, listener);
226   }
227 
228   /**
229    * Get the connection to ZooKeeper.
230    * @return connection reference to zookeeper
231    */
232   public ZooKeeper getZooKeeper() {
233     return zooKeeper;
234   }
235 
236   /**
237    * Get the quorum address of this instance.
238    * @return quorum string of this zookeeper connection instance
239    */
240   public String getQuorum() {
241     return quorum;
242   }
243 
244   /**
245    * Method called from ZooKeeper for events and connection status.
246    *
247    * Valid events are passed along to listeners.  Connection status changes
248    * are dealt with locally.
249    */
250   @Override
251   public void process(WatchedEvent event) {
252     LOG.debug(prefix("Received ZooKeeper Event, " +
253         "type=" + event.getType() + ", " +
254         "state=" + event.getState() + ", " +
255         "path=" + event.getPath()));
256 
257     switch(event.getType()) {
258 
259       // If event type is NONE, this is a connection status change
260       case None: {
261         connectionEvent(event);
262         break;
263       }
264 
265       // Otherwise pass along to the listeners
266 
267       case NodeCreated: {
268         for(ZooKeeperListener listener : listeners) {
269           listener.nodeCreated(event.getPath());
270         }
271         break;
272       }
273 
274       case NodeDeleted: {
275         for(ZooKeeperListener listener : listeners) {
276           listener.nodeDeleted(event.getPath());
277         }
278         break;
279       }
280 
281       case NodeDataChanged: {
282         for(ZooKeeperListener listener : listeners) {
283           listener.nodeDataChanged(event.getPath());
284         }
285         break;
286       }
287 
288       case NodeChildrenChanged: {
289         for(ZooKeeperListener listener : listeners) {
290           listener.nodeChildrenChanged(event.getPath());
291         }
292         break;
293       }
294     }
295   }
296 
297   // Connection management
298 
299   /**
300    * Called when there is a connection-related event via the Watcher callback.
301    *
302    * If Disconnected or Expired, this should shutdown the cluster. But, since
303    * we send a KeeperException.SessionExpiredException along with the abort
304    * call, it's possible for the Abortable to catch it and try to create a new
305    * session with ZooKeeper. This is what the client does in HCM.
306    *
307    * @param event
308    */
309   private void connectionEvent(WatchedEvent event) {
310     switch(event.getState()) {
311       case SyncConnected:
312         // Now, this callback can be invoked before the this.zookeeper is set.
313         // Wait a little while.
314         long finished = System.currentTimeMillis() +
315           this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
316         while (System.currentTimeMillis() < finished) {
317           Threads.sleep(1);
318           if (this.zooKeeper != null) break;
319         }
320         if (this.zooKeeper == null) {
321           LOG.error("ZK is null on connection event -- see stack trace " +
322             "for the stack trace when constructor was called on this zkw",
323             this.constructorCaller);
324           throw new NullPointerException("ZK is null");
325         }
326         this.identifier = this.identifier + "-0x" +
327           Long.toHexString(this.zooKeeper.getSessionId());
328         // Update our identifier.  Otherwise ignore.
329         LOG.debug(this.identifier + " connected");
330         break;
331 
332       // Abort the server if Disconnected or Expired
333       // TODO: Åny reason to handle these two differently?
334       case Disconnected:
335         LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
336         break;
337 
338       case Expired:
339         String msg = prefix(this.identifier + " received expired from " +
340           "ZooKeeper, aborting");
341         // TODO: One thought is to add call to ZooKeeperListener so say,
342         // ZooKeperNodeTracker can zero out its data values.
343         if (this.abortable != null) this.abortable.abort(msg,
344             new KeeperException.SessionExpiredException());
345         break;
346     }
347   }
348 
349   /**
350    * Forces a synchronization of this ZooKeeper client connection.
351    * <p>
352    * Executing this method before running other methods will ensure that the
353    * subsequent operations are up-to-date and consistent as of the time that
354    * the sync is complete.
355    * <p>
356    * This is used for compareAndSwap type operations where we need to read the
357    * data of an existing node and delete or transition that node, utilizing the
358    * previously read version and data.  We want to ensure that the version read
359    * is up-to-date from when we begin the operation.
360    */
361   public void sync(String path) {
362     this.zooKeeper.sync(path, null, null);
363   }
364 
365   /**
366    * Get the set of already watched unassigned nodes.
367    * @return Set of Nodes.
368    */
369   public Set<String> getNodes() {
370     return unassignedNodes;
371   }
372 
373   /**
374    * Handles KeeperExceptions in client calls.
375    *
376    * This may be temporary but for now this gives one place to deal with these.
377    *
378    * TODO: Currently this method rethrows the exception to let the caller handle
379    *
380    * @param ke
381    * @throws KeeperException
382    */
383   public void keeperException(KeeperException ke)
384   throws KeeperException {
385     LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
386     throw ke;
387   }
388 
389   /**
390    * Handles InterruptedExceptions in client calls.
391    *
392    * This may be temporary but for now this gives one place to deal with these.
393    *
394    * TODO: Currently, this method does nothing.
395    *       Is this ever expected to happen?  Do we abort or can we let it run?
396    *       Maybe this should be logged as WARN?  It shouldn't happen?
397    *
398    * @param ie
399    */
400   public void interruptedException(InterruptedException ie) {
401     LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
402     // At least preserver interrupt.
403     Thread.currentThread().interrupt();
404     // no-op
405   }
406 
407   /**
408    * Close the connection to ZooKeeper.
409    * @throws InterruptedException
410    */
411   public void close() {
412     try {
413       if (zooKeeper != null) {
414         zooKeeper.close();
415 //        super.close();
416       }
417     } catch (InterruptedException e) {
418     }
419   }
420 
421   @Override
422   public void abort(String why, Throwable e) {
423     this.abortable.abort(why, e);
424   }
425 }