1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24
25 import java.util.ConcurrentModificationException;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.concurrent.Delayed;
29 import java.util.concurrent.DelayQueue;
30 import java.util.concurrent.TimeUnit;
31
32 import java.io.IOException;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public class Leases extends Thread {
54 private static final Log LOG = LogFactory.getLog(Leases.class.getName());
55 private final int leasePeriod;
56 private final int leaseCheckFrequency;
57 private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
58 protected final Map<String, Lease> leases = new HashMap<String, Lease>();
59 private volatile boolean stopRequested = false;
60
61
62
63
64
65
66
67
68 public Leases(final int leasePeriod, final int leaseCheckFrequency) {
69 this.leasePeriod = leasePeriod;
70 this.leaseCheckFrequency = leaseCheckFrequency;
71 setDaemon(true);
72 }
73
74
75
76
77 @Override
78 public void run() {
79 while (!stopRequested || (stopRequested && leaseQueue.size() > 0) ) {
80 Lease lease = null;
81 try {
82 lease = leaseQueue.poll(leaseCheckFrequency, TimeUnit.MILLISECONDS);
83 } catch (InterruptedException e) {
84 continue;
85 } catch (ConcurrentModificationException e) {
86 continue;
87 } catch (Throwable e) {
88 LOG.fatal("Unexpected exception killed leases thread", e);
89 break;
90 }
91 if (lease == null) {
92 continue;
93 }
94
95
96 if (lease.getListener() == null) {
97 LOG.error("lease listener is null for lease " + lease.getLeaseName());
98 } else {
99 lease.getListener().leaseExpired();
100 }
101 synchronized (leaseQueue) {
102 leases.remove(lease.getLeaseName());
103 }
104 }
105 close();
106 }
107
108
109
110
111
112
113
114
115 public void closeAfterLeasesExpire() {
116 this.stopRequested = true;
117 }
118
119
120
121
122
123 public void close() {
124 LOG.info(Thread.currentThread().getName() + " closing leases");
125 this.stopRequested = true;
126 synchronized (leaseQueue) {
127 leaseQueue.clear();
128 leases.clear();
129 leaseQueue.notifyAll();
130 }
131 LOG.info(Thread.currentThread().getName() + " closed leases");
132 }
133
134
135
136
137
138
139
140
141 public void createLease(String leaseName, final LeaseListener listener)
142 throws LeaseStillHeldException {
143 addLease(new Lease(leaseName, listener));
144 }
145
146
147
148
149
150
151 public void addLease(final Lease lease) throws LeaseStillHeldException {
152 if (this.stopRequested) {
153 return;
154 }
155 lease.setExpirationTime(System.currentTimeMillis() + this.leasePeriod);
156 synchronized (leaseQueue) {
157 if (leases.containsKey(lease.getLeaseName())) {
158 throw new LeaseStillHeldException(lease.getLeaseName());
159 }
160 leases.put(lease.getLeaseName(), lease);
161 leaseQueue.add(lease);
162 }
163 }
164
165
166
167
168
169 @SuppressWarnings("serial")
170 public static class LeaseStillHeldException extends IOException {
171 private final String leaseName;
172
173
174
175
176 public LeaseStillHeldException(final String name) {
177 this.leaseName = name;
178 }
179
180
181 public String getName() {
182 return this.leaseName;
183 }
184 }
185
186
187
188
189
190
191
192 public void renewLease(final String leaseName) throws LeaseException {
193 synchronized (leaseQueue) {
194 Lease lease = leases.get(leaseName);
195
196
197
198 if (lease == null || !leaseQueue.remove(lease)) {
199 throw new LeaseException("lease '" + leaseName +
200 "' does not exist or has already expired");
201 }
202 lease.setExpirationTime(System.currentTimeMillis() + leasePeriod);
203 leaseQueue.add(lease);
204 }
205 }
206
207
208
209
210
211
212 public void cancelLease(final String leaseName) throws LeaseException {
213 removeLease(leaseName);
214 }
215
216
217
218
219
220
221
222
223
224
225 Lease removeLease(final String leaseName) throws LeaseException {
226 Lease lease = null;
227 synchronized (leaseQueue) {
228 lease = leases.remove(leaseName);
229 if (lease == null) {
230 throw new LeaseException("lease '" + leaseName + "' does not exist");
231 }
232 leaseQueue.remove(lease);
233 }
234 return lease;
235 }
236
237
238 static class Lease implements Delayed {
239 private final String leaseName;
240 private final LeaseListener listener;
241 private long expirationTime;
242
243 Lease(final String leaseName, LeaseListener listener) {
244 this(leaseName, listener, 0);
245 }
246
247 Lease(final String leaseName, LeaseListener listener, long expirationTime) {
248 this.leaseName = leaseName;
249 this.listener = listener;
250 this.expirationTime = expirationTime;
251 }
252
253
254 public String getLeaseName() {
255 return leaseName;
256 }
257
258
259 public LeaseListener getListener() {
260 return this.listener;
261 }
262
263 @Override
264 public boolean equals(Object obj) {
265 if (this == obj) {
266 return true;
267 }
268 if (obj == null) {
269 return false;
270 }
271 if (getClass() != obj.getClass()) {
272 return false;
273 }
274 return this.hashCode() == ((Lease) obj).hashCode();
275 }
276
277 @Override
278 public int hashCode() {
279 return this.leaseName.hashCode();
280 }
281
282 public long getDelay(TimeUnit unit) {
283 return unit.convert(this.expirationTime - System.currentTimeMillis(),
284 TimeUnit.MILLISECONDS);
285 }
286
287 public int compareTo(Delayed o) {
288 long delta = this.getDelay(TimeUnit.MILLISECONDS) -
289 o.getDelay(TimeUnit.MILLISECONDS);
290
291 return this.equals(o) ? 0 : (delta > 0 ? 1 : -1);
292 }
293
294
295 public void setExpirationTime(long expirationTime) {
296 this.expirationTime = expirationTime;
297 }
298 }
299 }