1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.zookeeper;
21
22 import org.apache.hadoop.hbase.Abortable;
23 import org.apache.zookeeper.KeeperException;
24
25
26
27
28
29
30
31
32
33
34 public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
35
36 protected final String node;
37
38
39 private byte [] data;
40
41
42 protected final Abortable abortable;
43
44 private boolean stopped = false;
45
46
47
48
49
50
51
52
53
54
55 public ZooKeeperNodeTracker(ZooKeeperWatcher watcher, String node,
56 Abortable abortable) {
57 super(watcher);
58 this.node = node;
59 this.abortable = abortable;
60 this.data = null;
61 }
62
63
64
65
66
67
68
69 public synchronized void start() {
70 this.watcher.registerListener(this);
71 try {
72 if(ZKUtil.watchAndCheckExists(watcher, node)) {
73 byte [] data = ZKUtil.getDataAndWatch(watcher, node);
74 if(data != null) {
75 this.data = data;
76 } else {
77
78 start();
79 }
80 }
81 } catch (KeeperException e) {
82 abortable.abort("Unexpected exception during initialization, aborting", e);
83 }
84 }
85
86 public synchronized void stop() {
87 this.stopped = true;
88 notifyAll();
89 }
90
91
92
93
94
95
96
97 public synchronized byte [] blockUntilAvailable()
98 throws InterruptedException {
99 return blockUntilAvailable(0);
100 }
101
102
103
104
105
106
107
108
109
110
111 public synchronized byte [] blockUntilAvailable(long timeout)
112 throws InterruptedException {
113 if (timeout < 0) throw new IllegalArgumentException();
114 boolean notimeout = timeout == 0;
115 long startTime = System.currentTimeMillis();
116 long remaining = timeout;
117 while (!this.stopped && (notimeout || remaining > 0) && this.data == null) {
118 if (notimeout) {
119 wait();
120 continue;
121 }
122 wait(remaining);
123 remaining = timeout - (System.currentTimeMillis() - startTime);
124 }
125 return data;
126 }
127
128
129
130
131
132
133
134
135
136
137 public synchronized byte [] getData() {
138 return data;
139 }
140
141 public String getNode() {
142 return this.node;
143 }
144
145 @Override
146 public synchronized void nodeCreated(String path) {
147 if (!path.equals(node)) return;
148 try {
149 byte [] data = ZKUtil.getDataAndWatch(watcher, node);
150 if (data != null) {
151 this.data = data;
152 notifyAll();
153 } else {
154 nodeDeleted(path);
155 }
156 } catch(KeeperException e) {
157 abortable.abort("Unexpected exception handling nodeCreated event", e);
158 }
159 }
160
161 @Override
162 public synchronized void nodeDeleted(String path) {
163 if(path.equals(node)) {
164 try {
165 if(ZKUtil.watchAndCheckExists(watcher, node)) {
166 nodeCreated(path);
167 } else {
168 this.data = null;
169 }
170 } catch(KeeperException e) {
171 abortable.abort("Unexpected exception handling nodeDeleted event", e);
172 }
173 }
174 }
175
176 @Override
177 public synchronized void nodeDataChanged(String path) {
178 if(path.equals(node)) {
179 nodeCreated(path);
180 }
181 }
182 }