View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import org.apache.hadoop.hbase.KeyValue;
24  import org.apache.hadoop.hbase.KeyValue.KVComparator;
25  
26  import java.io.IOException;
27  import java.util.Comparator;
28  import java.util.List;
29  import java.util.PriorityQueue;
30  
31  /**
32   * Implements a heap merge across any number of KeyValueScanners.
33   * <p>
34   * Implements KeyValueScanner itself.
35   * <p>
36   * This class is used at the Region level to merge across Stores
37   * and at the Store level to merge across the memstore and StoreFiles.
38   * <p>
39   * In the Region case, we also need InternalScanner.next(List), so this class
40   * also implements InternalScanner.  WARNING: As is, if you try to use this
41   * as an InternalScanner at the Store level, you will get runtime exceptions.
42   */
43  public class KeyValueHeap implements KeyValueScanner, InternalScanner {
44    private PriorityQueue<KeyValueScanner> heap = null;
45    private KeyValueScanner current = null;
46    private KVScannerComparator comparator;
47  
48    /**
49     * Constructor.  This KeyValueHeap will handle closing of passed in
50     * KeyValueScanners.
51     * @param scanners
52     * @param comparator
53     */
54    public KeyValueHeap(List<? extends KeyValueScanner> scanners,
55        KVComparator comparator) {
56      this.comparator = new KVScannerComparator(comparator);
57      if (!scanners.isEmpty()) {
58        this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
59            this.comparator);
60        for (KeyValueScanner scanner : scanners) {
61          if (scanner.peek() != null) {
62            this.heap.add(scanner);
63          } else {
64            scanner.close();
65          }
66        }
67        this.current = heap.poll();
68      }
69    }
70  
71    public KeyValue peek() {
72      if (this.current == null) {
73        return null;
74      }
75      return this.current.peek();
76    }
77  
78    public KeyValue next()  throws IOException {
79      if(this.current == null) {
80        return null;
81      }
82      KeyValue kvReturn = this.current.next();
83      KeyValue kvNext = this.current.peek();
84      if (kvNext == null) {
85        this.current.close();
86        this.current = this.heap.poll();
87      } else {
88        KeyValueScanner topScanner = this.heap.peek();
89        if (topScanner == null ||
90            this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
91          this.heap.add(this.current);
92          this.current = this.heap.poll();
93        }
94      }
95      return kvReturn;
96    }
97  
98    /**
99     * Gets the next row of keys from the top-most scanner.
100    * <p>
101    * This method takes care of updating the heap.
102    * <p>
103    * This can ONLY be called when you are using Scanners that implement
104    * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
105    * @param result
106    * @param limit
107    * @return true if there are more keys, false if all scanners are done
108    */
109   public boolean next(List<KeyValue> result, int limit) throws IOException {
110     if (this.current == null) {
111       return false;
112     }
113     InternalScanner currentAsInternal = (InternalScanner)this.current;
114     boolean mayContainsMoreRows = currentAsInternal.next(result, limit);
115     KeyValue pee = this.current.peek();
116     /*
117      * By definition, any InternalScanner must return false only when it has no
118      * further rows to be fetched. So, we can close a scanner if it returns
119      * false. All existing implementations seem to be fine with this. It is much
120      * more efficient to close scanners which are not needed than keep them in
121      * the heap. This is also required for certain optimizations.
122      */
123     if (pee == null || !mayContainsMoreRows) {
124       this.current.close();
125     } else {
126       this.heap.add(this.current);
127     }
128     this.current = this.heap.poll();
129     return (this.current != null);
130   }
131 
132   /**
133    * Gets the next row of keys from the top-most scanner.
134    * <p>
135    * This method takes care of updating the heap.
136    * <p>
137    * This can ONLY be called when you are using Scanners that implement
138    * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
139    * @param result
140    * @return true if there are more keys, false if all scanners are done
141    */
142   public boolean next(List<KeyValue> result) throws IOException {
143     return next(result, -1);
144   }
145 
146   private static class KVScannerComparator implements Comparator<KeyValueScanner> {
147     private KVComparator kvComparator;
148     /**
149      * Constructor
150      * @param kvComparator
151      */
152     public KVScannerComparator(KVComparator kvComparator) {
153       this.kvComparator = kvComparator;
154     }
155     public int compare(KeyValueScanner left, KeyValueScanner right) {
156       int comparison = compare(left.peek(), right.peek());
157       if (comparison != 0) {
158         return comparison;
159       } else {
160         // Since both the keys are exactly the same, we break the tie in favor
161         // of the key which came latest.
162         long leftSequenceID = left.getSequenceID();
163         long rightSequenceID = right.getSequenceID();
164         if (leftSequenceID > rightSequenceID) {
165           return -1;
166         } else if (leftSequenceID < rightSequenceID) {
167           return 1;
168         } else {
169           return 0;
170         }
171       }
172     }
173     /**
174      * Compares two KeyValue
175      * @param left
176      * @param right
177      * @return less than 0 if left is smaller, 0 if equal etc..
178      */
179     public int compare(KeyValue left, KeyValue right) {
180       return this.kvComparator.compare(left, right);
181     }
182     /**
183      * @return KVComparator
184      */
185     public KVComparator getComparator() {
186       return this.kvComparator;
187     }
188   }
189 
190   public void close() {
191     if (this.current != null) {
192       this.current.close();
193     }
194     if (this.heap != null) {
195       KeyValueScanner scanner;
196       while ((scanner = this.heap.poll()) != null) {
197         scanner.close();
198       }
199     }
200   }
201 
202   /**
203    * Seeks all scanners at or below the specified seek key.  If we earlied-out
204    * of a row, we may end up skipping values that were never reached yet.
205    * Rather than iterating down, we want to give the opportunity to re-seek.
206    * <p>
207    * As individual scanners may run past their ends, those scanners are
208    * automatically closed and removed from the heap.
209    * @param seekKey KeyValue to seek at or after
210    * @return true if KeyValues exist at or after specified key, false if not
211    * @throws IOException
212    */
213   public boolean seek(KeyValue seekKey) throws IOException {
214     if (this.current == null) {
215       return false;
216     }
217     this.heap.add(this.current);
218     this.current = null;
219 
220     KeyValueScanner scanner;
221     while((scanner = this.heap.poll()) != null) {
222       KeyValue topKey = scanner.peek();
223       if(comparator.getComparator().compare(seekKey, topKey) <= 0) { // Correct?
224         // Top KeyValue is at-or-after Seek KeyValue
225         this.current = scanner;
226         return true;
227       }
228       if(!scanner.seek(seekKey)) {
229         scanner.close();
230       } else {
231         this.heap.add(scanner);
232       }
233     }
234     // Heap is returning empty, scanner is done
235     return false;
236   }
237 
238   public boolean reseek(KeyValue seekKey) throws IOException {
239     //This function is very identical to the seek(KeyValue) function except that
240     //scanner.seek(seekKey) is changed to scanner.reseek(seekKey)
241     if (this.current == null) {
242       return false;
243     }
244     this.heap.add(this.current);
245     this.current = null;
246 
247     KeyValueScanner scanner;
248     while ((scanner = this.heap.poll()) != null) {
249       KeyValue topKey = scanner.peek();
250       if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
251         // Top KeyValue is at-or-after Seek KeyValue
252         this.current = scanner;
253         return true;
254       }
255       if (!scanner.reseek(seekKey)) {
256         scanner.close();
257       } else {
258         this.heap.add(scanner);
259       }
260     }
261     // Heap is returning empty, scanner is done
262     return false;
263   }
264 
265   /**
266    * @return the current Heap
267    */
268   public PriorityQueue<KeyValueScanner> getHeap() {
269     return this.heap;
270   }
271 
272   @Override
273   public long getSequenceID() {
274     return 0;
275   }
276 }