View Javadoc

1   /**
2   * Copyright 2010 The Apache Software Foundation
3   *
4   * Licensed to the Apache Software Foundation (ASF) under one
5   * or more contributor license agreements.  See the NOTICE file
6   * distributed with this work for additional information
7   * regarding copyright ownership.  The ASF licenses this file
8   * to you under the Apache License, Version 2.0 (the
9   * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.util.Collection;
23  import java.util.Date;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.concurrent.BlockingQueue;
27  import java.util.concurrent.PriorityBlockingQueue;
28  import java.util.concurrent.TimeUnit;
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  
32  /**
33   * This class delegates to the BlockingQueue but wraps all HRegions in
34   * compaction requests that hold the priority and the date requested.
35   *
36   * Implementation Note: With an elevation time of -1 there is the potential for
37   * starvation of the lower priority compaction requests as long as there is a
38   * constant stream of high priority requests.
39   */
40  public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
41    static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
42  
43    /**
44     * This class represents a compaction request and holds the region, priority,
45     * and time submitted.
46     */
47    private class CompactionRequest implements Comparable<CompactionRequest> {
48      private final HRegion r;
49      private final int p;
50      private final Date date;
51  
52      public CompactionRequest(HRegion r, int p) {
53        this(r, p, null);
54      }
55  
56      public CompactionRequest(HRegion r, int p, Date d) {
57        if (r == null) {
58          throw new NullPointerException("HRegion cannot be null");
59        }
60  
61        if (d == null) {
62          d = new Date();
63        }
64  
65        this.r = r;
66        this.p = p;
67        this.date = d;
68      }
69  
70      /**
71       * This function will define where in the priority queue the request will
72       * end up.  Those with the highest priorities will be first.  When the
73       * priorities are the same it will It will first compare priority then date
74       * to maintain a FIFO functionality.
75       *
76       * <p>Note: The date is only accurate to the millisecond which means it is
77       * possible that two requests were inserted into the queue within a
78       * millisecond.  When that is the case this function will break the tie
79       * arbitrarily.
80       */
81      @Override
82      public int compareTo(CompactionRequest request) {
83        //NOTE: The head of the priority queue is the least element
84        if (this.equals(request)) {
85          return 0; //they are the same request
86        }
87        int compareVal;
88  
89        compareVal = p - request.p; //compare priority
90        if (compareVal != 0) {
91          return compareVal;
92        }
93  
94        compareVal = date.compareTo(request.date);
95        if (compareVal != 0) {
96          return compareVal;
97        }
98  
99        //break the tie arbitrarily
100       return -1;
101     }
102 
103     /** Gets the HRegion for the request */
104     HRegion getHRegion() {
105       return r;
106     }
107 
108     /** Gets the priority for the request */
109     int getPriority() {
110       return p;
111     }
112 
113     public String toString() {
114       return "regionName=" + r.getRegionNameAsString() +
115         ", priority=" + p + ", date=" + date;
116     }
117   }
118 
119   /** The actual blocking queue we delegate to */
120   protected final BlockingQueue<CompactionRequest> queue =
121     new PriorityBlockingQueue<CompactionRequest>();
122 
123   /** Hash map of the HRegions contained within the Compaction Queue */
124   private final HashMap<HRegion, CompactionRequest> regionsInQueue =
125     new HashMap<HRegion, CompactionRequest>();
126 
127   /** Creates a new PriorityCompactionQueue with no priority elevation time */
128   public PriorityCompactionQueue() {
129     LOG.debug("Create PriorityCompactionQueue");
130   }
131 
132   /** If the region is not already in the queue it will add it and return a
133    * new compaction request object.  If it is already present in the queue
134    * then it will return null.
135    * @param p If null it will use the default priority
136    * @return returns a compaction request if it isn't already in the queue
137    */
138   protected CompactionRequest addToRegionsInQueue(HRegion r, int p) {
139     CompactionRequest queuedRequest = null;
140     CompactionRequest newRequest = new CompactionRequest(r, p);
141     synchronized (regionsInQueue) {
142       queuedRequest = regionsInQueue.get(r);
143       if (queuedRequest == null ||
144           newRequest.getPriority() < queuedRequest.getPriority()) {
145         LOG.trace("Inserting region in queue. " + newRequest);
146         regionsInQueue.put(r, newRequest);
147       } else {
148         LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
149           ", requested: " + newRequest);
150         newRequest = null; // It is already present so don't add it
151       }
152     }
153 
154     if (newRequest != null && queuedRequest != null) {
155       // Remove the lower priority request
156       queue.remove(queuedRequest);
157     }
158 
159     return newRequest;
160   }
161 
162   /** Removes the request from the regions in queue
163    * @param remove
164    */
165   protected CompactionRequest removeFromRegionsInQueue(CompactionRequest remove) {
166     if (remove == null) return null;
167 
168     synchronized (regionsInQueue) {
169       CompactionRequest cr = null;
170       cr = regionsInQueue.remove(remove.getHRegion());
171       if (cr != null && !cr.equals(remove))
172       {
173         //Because we don't synchronize across both this.regionsInQueue and this.queue
174         //a rare race condition exists where a higher priority compaction request replaces
175         //the lower priority request in this.regionsInQueue but the lower priority request
176         //is taken off this.queue before the higher can be added to this.queue.
177         //So if we didn't remove what we were expecting we put it back on.
178         regionsInQueue.put(cr.getHRegion(), cr);
179       }
180       if (cr == null) {
181         LOG.warn("Removed a region it couldn't find in regionsInQueue: " + remove.getHRegion());
182       }
183       return cr;
184     }
185   }
186 
187   public boolean add(HRegion e, int p) {
188     CompactionRequest request = this.addToRegionsInQueue(e, p);
189     if (request != null) {
190       boolean result = queue.add(request);
191       return result;
192     } else {
193       return false;
194     }
195   }
196 
197   @Override
198   public boolean add(HRegion e) {
199     return add(e, e.getCompactPriority());
200   }
201 
202   public boolean offer(HRegion e, int p) {
203     CompactionRequest request = this.addToRegionsInQueue(e, p);
204     return (request != null)? queue.offer(request): false;
205   }
206 
207   @Override
208   public boolean offer(HRegion e) {
209     return offer(e, e.getCompactPriority());
210   }
211 
212   public void put(HRegion e, int p) throws InterruptedException {
213     CompactionRequest request = this.addToRegionsInQueue(e, p);
214     if (request != null) {
215       queue.put(request);
216     }
217   }
218 
219   @Override
220   public void put(HRegion e) throws InterruptedException {
221     put(e, e.getCompactPriority());
222   }
223 
224   public boolean offer(HRegion e, int p, long timeout, TimeUnit unit)
225   throws InterruptedException {
226     CompactionRequest request = this.addToRegionsInQueue(e, p);
227     return (request != null)? queue.offer(request, timeout, unit): false;
228   }
229 
230   @Override
231   public boolean offer(HRegion e, long timeout, TimeUnit unit)
232   throws InterruptedException {
233     return offer(e, e.getCompactPriority(), timeout, unit);
234   }
235 
236   @Override
237   public HRegion take() throws InterruptedException {
238     CompactionRequest cr = queue.take();
239     if (cr != null) {
240       removeFromRegionsInQueue(cr);
241       return cr.getHRegion();
242     }
243     return null;
244   }
245 
246   @Override
247   public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
248     CompactionRequest cr = queue.poll(timeout, unit);
249     if (cr != null) {
250       removeFromRegionsInQueue(cr);
251       return cr.getHRegion();
252     }
253     return null;
254   }
255 
256   @Override
257   public boolean remove(Object r) {
258     if (r instanceof CompactionRequest) {
259       CompactionRequest cr = removeFromRegionsInQueue((CompactionRequest) r);
260       if (cr != null) {
261         return queue.remove(cr);
262       }
263     }
264 
265     return false;
266   }
267 
268   @Override
269   public HRegion remove() {
270     CompactionRequest cr = queue.remove();
271     if (cr != null) {
272       removeFromRegionsInQueue(cr);
273       return cr.getHRegion();
274     }
275     return null;
276   }
277 
278   @Override
279   public HRegion poll() {
280     CompactionRequest cr = queue.poll();
281     if (cr != null) {
282       removeFromRegionsInQueue(cr);
283       return cr.getHRegion();
284     }
285     return null;
286   }
287 
288   @Override
289   public int remainingCapacity() {
290     return queue.remainingCapacity();
291   }
292 
293   @Override
294   public boolean contains(Object r) {
295     if (r instanceof HRegion) {
296       synchronized (regionsInQueue) {
297         return regionsInQueue.containsKey((HRegion) r);
298       }
299     } else if (r instanceof CompactionRequest) {
300       return queue.contains(r);
301     }
302     return false;
303   }
304 
305   @Override
306   public HRegion element() {
307     CompactionRequest cr = queue.element();
308     return (cr != null)? cr.getHRegion(): null;
309   }
310 
311   @Override
312   public HRegion peek() {
313     CompactionRequest cr = queue.peek();
314     return (cr != null)? cr.getHRegion(): null;
315   }
316 
317   @Override
318   public int size() {
319     return queue.size();
320   }
321 
322   @Override
323   public boolean isEmpty() {
324     return queue.isEmpty();
325   }
326 
327   @Override
328   public void clear() {
329     regionsInQueue.clear();
330     queue.clear();
331   }
332 
333   // Unimplemented methods, collection methods
334 
335   @Override
336   public Iterator<HRegion> iterator() {
337     throw new UnsupportedOperationException("Not supported.");
338   }
339 
340   @Override
341   public Object[] toArray() {
342     throw new UnsupportedOperationException("Not supported.");
343   }
344 
345   @Override
346   public <T> T[] toArray(T[] a) {
347     throw new UnsupportedOperationException("Not supported.");
348   }
349 
350   @Override
351   public boolean containsAll(Collection<?> c) {
352     throw new UnsupportedOperationException("Not supported.");
353   }
354 
355   @Override
356   public boolean addAll(Collection<? extends HRegion> c) {
357     throw new UnsupportedOperationException("Not supported.");
358   }
359 
360   @Override
361   public boolean removeAll(Collection<?> c) {
362     throw new UnsupportedOperationException("Not supported.");
363   }
364 
365   @Override
366   public boolean retainAll(Collection<?> c) {
367     throw new UnsupportedOperationException("Not supported.");
368   }
369 
370   @Override
371   public int drainTo(Collection<? super HRegion> c) {
372     throw new UnsupportedOperationException("Not supported.");
373   }
374 
375   @Override
376   public int drainTo(Collection<? super HRegion> c, int maxElements) {
377     throw new UnsupportedOperationException("Not supported.");
378   }
379 }