View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.zookeeper;
21  
22  import org.apache.hadoop.hbase.Abortable;
23  import org.apache.zookeeper.KeeperException;
24  
25  /**
26   * Tracks the availability and value of a single ZooKeeper node.
27   *
28   * <p>Utilizes the {@link ZooKeeperListener} interface to get the necessary
29   * ZooKeeper events related to the node.
30   *
31   * <p>This is the base class used by trackers in both the Master and
32   * RegionServers.
33   */
34  public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
35    /** Path of node being tracked */
36    protected final String node;
37  
38    /** Data of the node being tracked */
39    private byte [] data;
40  
41    /** Used to abort if a fatal error occurs */
42    protected final Abortable abortable;
43  
44    private boolean stopped = false;
45  
46    /**
47     * Constructs a new ZK node tracker.
48     *
49     * <p>After construction, use {@link #start} to kick off tracking.
50     *
51     * @param watcher
52     * @param node
53     * @param abortable
54     */
55    public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
56        Abortable abortable) {
57      super(watcher);
58      this.node = node;
59      this.abortable = abortable;
60      this.data = null;
61    }
62  
63    /**
64     * Starts the tracking of the node in ZooKeeper.
65     *
66     * <p>Use {@link #blockUntilAvailable()} to block until the node is available
67     * or {@link #getData()} to get the data of the node if it is available.
68     */
69    public synchronized void start() {
70      this.watcher.registerListener(this);
71      try {
72        if(ZKUtil.watchAndCheckExists(watcher, node)) {
73          byte [] data = ZKUtil.getDataAndWatch(watcher, node);
74          if(data != null) {
75            this.data = data;
76          } else {
77            // It existed but now does not, try again to ensure a watch is set
78            start();
79          }
80        }
81      } catch (KeeperException e) {
82        abortable.abort("Unexpected exception during initialization, aborting", e);
83      }
84    }
85  
86    public synchronized void stop() {
87      this.stopped = true;
88      notifyAll();
89    }
90  
91    /**
92     * Gets the data of the node, blocking until the node is available.
93     *
94     * @return data of the node
95     * @throws InterruptedException if the waiting thread is interrupted
96     */
97    public synchronized byte [] blockUntilAvailable()
98    throws InterruptedException {
99      return blockUntilAvailable(0);
100   }
101 
102   /**
103    * Gets the data of the node, blocking until the node is available or the
104    * specified timeout has elapsed.
105    *
106    * @param timeout maximum time to wait for the node data to be available,
107    * n milliseconds.  Pass 0 for no timeout.
108    * @return data of the node
109    * @throws InterruptedException if the waiting thread is interrupted
110    */
111   public synchronized byte [] blockUntilAvailable(long timeout)
112   throws InterruptedException {
113     if (timeout < 0) throw new IllegalArgumentException();
114     boolean notimeout = timeout == 0;
115     long startTime = System.currentTimeMillis();
116     long remaining = timeout;
117     while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
118       if (notimeout) {
119         wait();
120         continue;
121       }
122       wait(remaining);
123       remaining = timeout - (System.currentTimeMillis() - startTime);
124     }
125     return data;
126   }
127 
128   /**
129    * Gets the data of the node.
130    *
131    * <p>If the node is currently available, the most up-to-date known version of
132    * the data is returned.  If the node is not currently available, null is
133    * returned.
134    *
135    * @return data of the node, null if unavailable
136    */
137   public synchronized byte [] getData() {
138     return data;
139   }
140 
141   public String getNode() {
142     return this.node;
143   }
144 
145   @Override
146   public synchronized void nodeCreated(String path) {
147     if (!path.equals(node)) return;
148     try {
149       byte [] data = ZKUtil.getDataAndWatch(watcher, node);
150       if (data != null) {
151         this.data = data;
152         notifyAll();
153       } else {
154         nodeDeleted(path);
155       }
156     } catch(KeeperException e) {
157       abortable.abort("Unexpected exception handling nodeCreated event", e);
158     }
159   }
160 
161   @Override
162   public synchronized void nodeDeleted(String path) {
163     if(path.equals(node)) {
164       try {
165         if(ZKUtil.watchAndCheckExists(watcher, node)) {
166           nodeCreated(path);
167         } else {
168           this.data = null;
169         }
170       } catch(KeeperException e) {
171         abortable.abort("Unexpected exception handling nodeDeleted event", e);
172       }
173     }
174   }
175 
176   @Override
177   public synchronized void nodeDataChanged(String path) {
178     if(path.equals(node)) {
179       nodeCreated(path);
180     }
181   }
182 }