1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.util.Collection;
23 import java.util.Date;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.PriorityBlockingQueue;
28 import java.util.concurrent.TimeUnit;
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31
32
33
34
35
36
37
38
39
40 public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
41 static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
42
43
44
45
46
47 private class CompactionRequest implements Comparable<CompactionRequest> {
48 private final HRegion r;
49 private final int p;
50 private final Date date;
51
52 public CompactionRequest(HRegion r, int p) {
53 this(r, p, null);
54 }
55
56 public CompactionRequest(HRegion r, int p, Date d) {
57 if (r == null) {
58 throw new NullPointerException("HRegion cannot be null");
59 }
60
61 if (d == null) {
62 d = new Date();
63 }
64
65 this.r = r;
66 this.p = p;
67 this.date = d;
68 }
69
70
71
72
73
74
75
76
77
78
79
80
81 @Override
82 public int compareTo(CompactionRequest request) {
83
84 if (this.equals(request)) {
85 return 0;
86 }
87 int compareVal;
88
89 compareVal = p - request.p;
90 if (compareVal != 0) {
91 return compareVal;
92 }
93
94 compareVal = date.compareTo(request.date);
95 if (compareVal != 0) {
96 return compareVal;
97 }
98
99
100 return -1;
101 }
102
103
104 HRegion getHRegion() {
105 return r;
106 }
107
108
109 int getPriority() {
110 return p;
111 }
112
113 public String toString() {
114 return "regionName=" + r.getRegionNameAsString() +
115 ", priority=" + p + ", date=" + date;
116 }
117 }
118
119
120 protected final BlockingQueue<CompactionRequest> queue =
121 new PriorityBlockingQueue<CompactionRequest>();
122
123
124 private final HashMap<HRegion, CompactionRequest> regionsInQueue =
125 new HashMap<HRegion, CompactionRequest>();
126
127
128 public PriorityCompactionQueue() {
129 LOG.debug("Create PriorityCompactionQueue");
130 }
131
132
133
134
135
136
137
138 protected CompactionRequest addToRegionsInQueue(HRegion r, int p) {
139 CompactionRequest queuedRequest = null;
140 CompactionRequest newRequest = new CompactionRequest(r, p);
141 synchronized (regionsInQueue) {
142 queuedRequest = regionsInQueue.get(r);
143 if (queuedRequest == null ||
144 newRequest.getPriority() < queuedRequest.getPriority()) {
145 LOG.trace("Inserting region in queue. " + newRequest);
146 regionsInQueue.put(r, newRequest);
147 } else {
148 LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
149 ", requested: " + newRequest);
150 newRequest = null;
151 }
152 }
153
154 if (newRequest != null && queuedRequest != null) {
155
156 queue.remove(queuedRequest);
157 }
158
159 return newRequest;
160 }
161
162
163
164
165 protected CompactionRequest removeFromRegionsInQueue(CompactionRequest remove) {
166 if (remove == null) return null;
167
168 synchronized (regionsInQueue) {
169 CompactionRequest cr = null;
170 cr = regionsInQueue.remove(remove.getHRegion());
171 if (cr != null && !cr.equals(remove))
172 {
173
174
175
176
177
178 regionsInQueue.put(cr.getHRegion(), cr);
179 }
180 if (cr == null) {
181 LOG.warn("Removed a region it couldn't find in regionsInQueue: " + remove.getHRegion());
182 }
183 return cr;
184 }
185 }
186
187 public boolean add(HRegion e, int p) {
188 CompactionRequest request = this.addToRegionsInQueue(e, p);
189 if (request != null) {
190 boolean result = queue.add(request);
191 return result;
192 } else {
193 return false;
194 }
195 }
196
197 @Override
198 public boolean add(HRegion e) {
199 return add(e, e.getCompactPriority());
200 }
201
202 public boolean offer(HRegion e, int p) {
203 CompactionRequest request = this.addToRegionsInQueue(e, p);
204 return (request != null)? queue.offer(request): false;
205 }
206
207 @Override
208 public boolean offer(HRegion e) {
209 return offer(e, e.getCompactPriority());
210 }
211
212 public void put(HRegion e, int p) throws InterruptedException {
213 CompactionRequest request = this.addToRegionsInQueue(e, p);
214 if (request != null) {
215 queue.put(request);
216 }
217 }
218
219 @Override
220 public void put(HRegion e) throws InterruptedException {
221 put(e, e.getCompactPriority());
222 }
223
224 public boolean offer(HRegion e, int p, long timeout, TimeUnit unit)
225 throws InterruptedException {
226 CompactionRequest request = this.addToRegionsInQueue(e, p);
227 return (request != null)? queue.offer(request, timeout, unit): false;
228 }
229
230 @Override
231 public boolean offer(HRegion e, long timeout, TimeUnit unit)
232 throws InterruptedException {
233 return offer(e, e.getCompactPriority(), timeout, unit);
234 }
235
236 @Override
237 public HRegion take() throws InterruptedException {
238 CompactionRequest cr = queue.take();
239 if (cr != null) {
240 removeFromRegionsInQueue(cr);
241 return cr.getHRegion();
242 }
243 return null;
244 }
245
246 @Override
247 public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
248 CompactionRequest cr = queue.poll(timeout, unit);
249 if (cr != null) {
250 removeFromRegionsInQueue(cr);
251 return cr.getHRegion();
252 }
253 return null;
254 }
255
256 @Override
257 public boolean remove(Object r) {
258 if (r instanceof CompactionRequest) {
259 CompactionRequest cr = removeFromRegionsInQueue((CompactionRequest) r);
260 if (cr != null) {
261 return queue.remove(cr);
262 }
263 }
264
265 return false;
266 }
267
268 @Override
269 public HRegion remove() {
270 CompactionRequest cr = queue.remove();
271 if (cr != null) {
272 removeFromRegionsInQueue(cr);
273 return cr.getHRegion();
274 }
275 return null;
276 }
277
278 @Override
279 public HRegion poll() {
280 CompactionRequest cr = queue.poll();
281 if (cr != null) {
282 removeFromRegionsInQueue(cr);
283 return cr.getHRegion();
284 }
285 return null;
286 }
287
288 @Override
289 public int remainingCapacity() {
290 return queue.remainingCapacity();
291 }
292
293 @Override
294 public boolean contains(Object r) {
295 if (r instanceof HRegion) {
296 synchronized (regionsInQueue) {
297 return regionsInQueue.containsKey((HRegion) r);
298 }
299 } else if (r instanceof CompactionRequest) {
300 return queue.contains(r);
301 }
302 return false;
303 }
304
305 @Override
306 public HRegion element() {
307 CompactionRequest cr = queue.element();
308 return (cr != null)? cr.getHRegion(): null;
309 }
310
311 @Override
312 public HRegion peek() {
313 CompactionRequest cr = queue.peek();
314 return (cr != null)? cr.getHRegion(): null;
315 }
316
317 @Override
318 public int size() {
319 return queue.size();
320 }
321
322 @Override
323 public boolean isEmpty() {
324 return queue.isEmpty();
325 }
326
327 @Override
328 public void clear() {
329 regionsInQueue.clear();
330 queue.clear();
331 }
332
333
334
335 @Override
336 public Iterator<HRegion> iterator() {
337 throw new UnsupportedOperationException("Not supported.");
338 }
339
340 @Override
341 public Object[] toArray() {
342 throw new UnsupportedOperationException("Not supported.");
343 }
344
345 @Override
346 public <T> T[] toArray(T[] a) {
347 throw new UnsupportedOperationException("Not supported.");
348 }
349
350 @Override
351 public boolean containsAll(Collection<?> c) {
352 throw new UnsupportedOperationException("Not supported.");
353 }
354
355 @Override
356 public boolean addAll(Collection<? extends HRegion> c) {
357 throw new UnsupportedOperationException("Not supported.");
358 }
359
360 @Override
361 public boolean removeAll(Collection<?> c) {
362 throw new UnsupportedOperationException("Not supported.");
363 }
364
365 @Override
366 public boolean retainAll(Collection<?> c) {
367 throw new UnsupportedOperationException("Not supported.");
368 }
369
370 @Override
371 public int drainTo(Collection<? super HRegion> c) {
372 throw new UnsupportedOperationException("Not supported.");
373 }
374
375 @Override
376 public int drainTo(Collection<? super HRegion> c, int maxElements) {
377 throw new UnsupportedOperationException("Not supported.");
378 }
379 }