1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.zookeeper;
21
22 import java.io.IOException;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.CopyOnWriteArrayList;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.Abortable;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
34 import org.apache.hadoop.hbase.util.Threads;
35 import org.apache.zookeeper.KeeperException;
36 import org.apache.zookeeper.WatchedEvent;
37 import org.apache.zookeeper.Watcher;
38 import org.apache.zookeeper.ZooKeeper;
39
40
41
42
43
44
45
46
47
48
49
50
51 public class ZooKeeperWatcher implements Watcher, Abortable {
52 private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
53
54
55
56 private String identifier;
57
58
59 private String quorum;
60
61
62 private ZooKeeper zooKeeper;
63
64
65 private Abortable abortable;
66
67
68 private final List<ZooKeeperListener> listeners =
69 new CopyOnWriteArrayList<ZooKeeperListener>();
70
71
72 private Set<String> unassignedNodes = new HashSet<String>();
73
74
75
76
77 public String baseZNode;
78
79 public String rootServerZNode;
80
81 public String rsZNode;
82
83 public String masterAddressZNode;
84
85 public String clusterStateZNode;
86
87 public String assignmentZNode;
88
89 public String tableZNode;
90
91 private final Configuration conf;
92
93 private final Exception constructorCaller;
94
95
96
97
98
99
100
101
102 public ZooKeeperWatcher(Configuration conf, String descriptor,
103 Abortable abortable)
104 throws IOException, ZooKeeperConnectionException {
105 this.conf = conf;
106
107
108 try {
109 throw new Exception("ZKW CONSTRUCTOR STACK TRACE FOR DEBUGGING");
110 } catch (Exception e) {
111 this.constructorCaller = e;
112 }
113 this.quorum = ZKConfig.getZKQuorumServersString(conf);
114
115
116 this.identifier = descriptor;
117 this.abortable = abortable;
118 setNodeNames(conf);
119 this.zooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
120 try {
121
122
123
124
125
126
127
128 long wait = conf.getLong("hbase.zookeeper.recoverable.waittime", 10000);
129 long finished = System.currentTimeMillis() + wait;
130 KeeperException ke = null;
131 do {
132 try {
133 ZKUtil.createAndFailSilent(this, baseZNode);
134 ke = null;
135 break;
136 } catch (KeeperException.ConnectionLossException e) {
137 if (LOG.isDebugEnabled() && (isFinishedRetryingRecoverable(finished))) {
138 LOG.debug("Retrying zk create for another " +
139 (finished - System.currentTimeMillis()) +
140 "ms; set 'hbase.zookeeper.recoverable.waittime' to change " +
141 "wait time); " + e.getMessage());
142 }
143 ke = e;
144 }
145 } while (isFinishedRetryingRecoverable(finished));
146
147 if (ke != null) {
148 try {
149
150 this.zooKeeper.close();
151 } catch (InterruptedException e) {
152 Thread.currentThread().interrupt();
153 LOG.warn("Interrupted while closing", e);
154 }
155 throw new ZooKeeperConnectionException("HBase is able to connect to" +
156 " ZooKeeper but the connection closes immediately. This could be" +
157 " a sign that the server has too many connections (30 is the" +
158 " default). Consider inspecting your ZK server logs for that" +
159 " error and then make sure you are reusing HBaseConfiguration" +
160 " as often as you can. See HTable's javadoc for more information.",
161 ke);
162 }
163 ZKUtil.createAndFailSilent(this, assignmentZNode);
164 ZKUtil.createAndFailSilent(this, rsZNode);
165 ZKUtil.createAndFailSilent(this, tableZNode);
166 } catch (KeeperException e) {
167 throw new ZooKeeperConnectionException(
168 prefix("Unexpected KeeperException creating base node"), e);
169 }
170 }
171
172 private boolean isFinishedRetryingRecoverable(final long finished) {
173 return System.currentTimeMillis() < finished;
174 }
175
176 @Override
177 public String toString() {
178 return this.identifier;
179 }
180
181
182
183
184
185
186
187 public String prefix(final String str) {
188 return this.toString() + " " + str;
189 }
190
191
192
193
194 private void setNodeNames(Configuration conf) {
195 baseZNode = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
196 HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
197 rootServerZNode = ZKUtil.joinZNode(baseZNode,
198 conf.get("zookeeper.znode.rootserver", "root-region-server"));
199 rsZNode = ZKUtil.joinZNode(baseZNode,
200 conf.get("zookeeper.znode.rs", "rs"));
201 masterAddressZNode = ZKUtil.joinZNode(baseZNode,
202 conf.get("zookeeper.znode.master", "master"));
203 clusterStateZNode = ZKUtil.joinZNode(baseZNode,
204 conf.get("zookeeper.znode.state", "shutdown"));
205 assignmentZNode = ZKUtil.joinZNode(baseZNode,
206 conf.get("zookeeper.znode.unassigned", "unassigned"));
207 tableZNode = ZKUtil.joinZNode(baseZNode,
208 conf.get("zookeeper.znode.tableEnableDisable", "table"));
209 }
210
211
212
213
214
215 public void registerListener(ZooKeeperListener listener) {
216 listeners.add(listener);
217 }
218
219
220
221
222
223
224 public void registerListenerFirst(ZooKeeperListener listener) {
225 listeners.add(0, listener);
226 }
227
228
229
230
231
232 public ZooKeeper getZooKeeper() {
233 return zooKeeper;
234 }
235
236
237
238
239
240 public String getQuorum() {
241 return quorum;
242 }
243
244
245
246
247
248
249
250 @Override
251 public void process(WatchedEvent event) {
252 LOG.debug(prefix("Received ZooKeeper Event, " +
253 "type=" + event.getType() + ", " +
254 "state=" + event.getState() + ", " +
255 "path=" + event.getPath()));
256
257 switch(event.getType()) {
258
259
260 case None: {
261 connectionEvent(event);
262 break;
263 }
264
265
266
267 case NodeCreated: {
268 for(ZooKeeperListener listener : listeners) {
269 listener.nodeCreated(event.getPath());
270 }
271 break;
272 }
273
274 case NodeDeleted: {
275 for(ZooKeeperListener listener : listeners) {
276 listener.nodeDeleted(event.getPath());
277 }
278 break;
279 }
280
281 case NodeDataChanged: {
282 for(ZooKeeperListener listener : listeners) {
283 listener.nodeDataChanged(event.getPath());
284 }
285 break;
286 }
287
288 case NodeChildrenChanged: {
289 for(ZooKeeperListener listener : listeners) {
290 listener.nodeChildrenChanged(event.getPath());
291 }
292 break;
293 }
294 }
295 }
296
297
298
299
300
301
302
303
304
305
306
307
308
309 private void connectionEvent(WatchedEvent event) {
310 switch(event.getState()) {
311 case SyncConnected:
312
313
314 long finished = System.currentTimeMillis() +
315 this.conf.getLong("hbase.zookeeper.watcher.sync.connected.wait", 2000);
316 while (System.currentTimeMillis() < finished) {
317 Threads.sleep(1);
318 if (this.zooKeeper != null) break;
319 }
320 if (this.zooKeeper == null) {
321 LOG.error("ZK is null on connection event -- see stack trace " +
322 "for the stack trace when constructor was called on this zkw",
323 this.constructorCaller);
324 throw new NullPointerException("ZK is null");
325 }
326 this.identifier = this.identifier + "-0x" +
327 Long.toHexString(this.zooKeeper.getSessionId());
328
329 LOG.debug(this.identifier + " connected");
330 break;
331
332
333
334 case Disconnected:
335 LOG.debug(prefix("Received Disconnected from ZooKeeper, ignoring"));
336 break;
337
338 case Expired:
339 String msg = prefix(this.identifier + " received expired from " +
340 "ZooKeeper, aborting");
341
342
343 if (this.abortable != null) this.abortable.abort(msg,
344 new KeeperException.SessionExpiredException());
345 break;
346 }
347 }
348
349
350
351
352
353
354
355
356
357
358
359
360
361 public void sync(String path) {
362 this.zooKeeper.sync(path, null, null);
363 }
364
365
366
367
368
369 public Set<String> getNodes() {
370 return unassignedNodes;
371 }
372
373
374
375
376
377
378
379
380
381
382
383 public void keeperException(KeeperException ke)
384 throws KeeperException {
385 LOG.error(prefix("Received unexpected KeeperException, re-throwing exception"), ke);
386 throw ke;
387 }
388
389
390
391
392
393
394
395
396
397
398
399
400 public void interruptedException(InterruptedException ie) {
401 LOG.debug(prefix("Received InterruptedException, doing nothing here"), ie);
402
403 Thread.currentThread().interrupt();
404
405 }
406
407
408
409
410
411 public void close() {
412 try {
413 if (zooKeeper != null) {
414 zooKeeper.close();
415
416 }
417 } catch (InterruptedException e) {
418 }
419 }
420
421 @Override
422 public void abort(String why, Throwable e) {
423 this.abortable.abort(why, e);
424 }
425 }