View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.KeyValue;
26  import org.apache.hadoop.hbase.client.Scan;
27  
28  import java.io.IOException;
29  import java.util.ArrayList;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.NavigableSet;
33  
34  /**
35   * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
36   * into List<KeyValue> for a single row.
37   */
38  class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
39    static final Log LOG = LogFactory.getLog(StoreScanner.class);
40    private Store store;
41    private ScanQueryMatcher matcher;
42    private KeyValueHeap heap;
43    private boolean cacheBlocks;
44  
45    // Used to indicate that the scanner has closed (see HBASE-1107)
46    // Doesnt need to be volatile because it's always accessed via synchronized methods
47    private boolean closing = false;
48    private final boolean isGet;
49  
50    // if heap == null and lastTop != null, you need to reseek given the key below
51    private KeyValue lastTop = null;
52  
53    /**
54     * Opens a scanner across memstore, snapshot, and all StoreFiles.
55     *
56     * @param store who we scan
57     * @param scan the spec
58     * @param columns which columns we are scanning
59     * @throws IOException
60     */
61    StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
62                                throws IOException {
63      this.store = store;
64      this.cacheBlocks = scan.getCacheBlocks();
65      matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
66          columns, store.ttl, store.comparator.getRawComparator(),
67          store.versionsToReturn(scan.getMaxVersions()), 
68          false);
69  
70      this.isGet = scan.isGetScan();
71      // pass columns = try to filter out unnecessary ScanFiles
72      List<KeyValueScanner> scanners = getScanners(scan, columns);
73  
74      // Seek all scanners to the start of the Row (or if the exact maching row key does not
75      // exist, then to the start of the next matching Row).
76      for(KeyValueScanner scanner : scanners) {
77        scanner.seek(matcher.getStartKey());
78      }
79  
80      // Combine all seeked scanners with a heap
81      heap = new KeyValueHeap(scanners, store.comparator);
82  
83      this.store.addChangedReaderObserver(this);
84    }
85  
86    /**
87     * Used for major compactions.<p>
88     *
89     * Opens a scanner across specified StoreFiles.
90     * @param store who we scan
91     * @param scan the spec
92     * @param scanners ancilliary scanners
93     */
94    StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
95        boolean retainDeletesInOutput)
96    throws IOException {
97      this.store = store;
98      this.cacheBlocks = false;
99      this.isGet = false;
100     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
101         null, store.ttl, store.comparator.getRawComparator(),
102         store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
103 
104     // Seek all scanners to the initial key
105     for(KeyValueScanner scanner : scanners) {
106       scanner.seek(matcher.getStartKey());
107     }
108 
109     // Combine all seeked scanners with a heap
110     heap = new KeyValueHeap(scanners, store.comparator);
111   }
112 
113   // Constructor for testing.
114   StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
115       final KeyValue.KVComparator comparator,
116       final NavigableSet<byte[]> columns,
117       final List<KeyValueScanner> scanners)
118         throws IOException {
119     this.store = null;
120     this.isGet = false;
121     this.cacheBlocks = scan.getCacheBlocks();
122     this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
123         comparator.getRawComparator(), scan.getMaxVersions(), false);
124 
125     // Seek all scanners to the initial key
126     for(KeyValueScanner scanner : scanners) {
127       scanner.seek(matcher.getStartKey());
128     }
129     heap = new KeyValueHeap(scanners, comparator);
130   }
131 
132   /*
133    * @return List of scanners ordered properly.
134    */
135   private List<KeyValueScanner> getScanners() throws IOException {
136     // First the store file scanners
137 
138     // TODO this used to get the store files in descending order,
139     // but now we get them in ascending order, which I think is
140     // actually more correct, since memstore get put at the end.
141     List<StoreFileScanner> sfScanners = StoreFileScanner
142       .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
143     List<KeyValueScanner> scanners =
144       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
145     scanners.addAll(sfScanners);
146     // Then the memstore scanners
147     scanners.addAll(this.store.memstore.getScanners());
148     return scanners;
149   }
150 
151   /*
152    * @return List of scanners to seek, possibly filtered by StoreFile.
153    */
154   private List<KeyValueScanner> getScanners(Scan scan,
155       final NavigableSet<byte[]> columns) throws IOException {
156     boolean memOnly;
157     boolean filesOnly;
158     if (scan instanceof InternalScan) {
159       InternalScan iscan = (InternalScan)scan;
160       memOnly = iscan.isCheckOnlyMemStore();
161       filesOnly = iscan.isCheckOnlyStoreFiles();
162     } else {
163       memOnly = false;
164       filesOnly = false;
165     }
166     List<KeyValueScanner> scanners = new LinkedList<KeyValueScanner>();
167     // First the store file scanners
168     if (memOnly == false) {
169       List<StoreFileScanner> sfScanners = StoreFileScanner
170       .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
171 
172       // include only those scan files which pass all filters
173       for (StoreFileScanner sfs : sfScanners) {
174         if (sfs.shouldSeek(scan, columns)) {
175           scanners.add(sfs);
176         }
177       }
178     }
179 
180     // Then the memstore scanners
181     if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) {
182         scanners.addAll(this.store.memstore.getScanners());
183     }
184     return scanners;
185   }
186 
187   public synchronized KeyValue peek() {
188     if (this.heap == null) {
189       return this.lastTop;
190     }
191     return this.heap.peek();
192   }
193 
194   public KeyValue next() {
195     // throw runtime exception perhaps?
196     throw new RuntimeException("Never call StoreScanner.next()");
197   }
198 
199   public synchronized void close() {
200     if (this.closing) return;
201     this.closing = true;
202     // under test, we dont have a this.store
203     if (this.store != null)
204       this.store.deleteChangedReaderObserver(this);
205     if (this.heap != null)
206       this.heap.close();
207     this.heap = null; // CLOSED!
208     this.lastTop = null; // If both are null, we are closed.
209   }
210 
211   public synchronized boolean seek(KeyValue key) throws IOException {
212     if (this.heap == null) {
213 
214       List<KeyValueScanner> scanners = getScanners();
215 
216       heap = new KeyValueHeap(scanners, store.comparator);
217     }
218 
219     return this.heap.seek(key);
220   }
221 
222   /**
223    * Get the next row of values from this Store.
224    * @param outResult
225    * @param limit
226    * @return true if there are more rows, false if scanner is done
227    */
228   public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
229     //DebugPrint.println("SS.next");
230 
231     checkReseek();
232 
233     // if the heap was left null, then the scanners had previously run out anyways, close and
234     // return.
235     if (this.heap == null) {
236       close();
237       return false;
238     }
239 
240     KeyValue peeked = this.heap.peek();
241     if (peeked == null) {
242       close();
243       return false;
244     }
245 
246     // only call setRow if the row changes; avoids confusing the query matcher
247     // if scanning intra-row
248     if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
249       matcher.setRow(peeked.getRow());
250     }
251 
252     KeyValue kv;
253     List<KeyValue> results = new ArrayList<KeyValue>();
254     LOOP: while((kv = this.heap.peek()) != null) {
255       // kv is no longer immutable due to KeyOnlyFilter! use copy for safety
256       KeyValue copyKv = new KeyValue(kv.getBuffer(), kv.getOffset(), kv.getLength());
257       ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);
258       //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
259       switch(qcode) {
260         case INCLUDE:
261           results.add(copyKv);
262           this.heap.next();
263           if (limit > 0 && (results.size() == limit)) {
264             break LOOP;
265           }
266           continue;
267 
268         case DONE:
269           // copy jazz
270           outResult.addAll(results);
271           return true;
272 
273         case DONE_SCAN:
274           close();
275 
276           // copy jazz
277           outResult.addAll(results);
278 
279           return false;
280 
281         case SEEK_NEXT_ROW:
282           // This is just a relatively simple end of scan fix, to short-cut end us if there is a
283           // endKey in the scan.
284           if (!matcher.moreRowsMayExistAfter(kv)) {
285             outResult.addAll(results);
286             return false;
287           }
288 
289           reseek(matcher.getKeyForNextRow(kv));
290           break;
291 
292         case SEEK_NEXT_COL:
293           reseek(matcher.getKeyForNextColumn(kv));
294           break;
295 
296         case SKIP:
297           this.heap.next();
298           break;
299 
300         case SEEK_NEXT_USING_HINT:
301           KeyValue nextKV = matcher.getNextKeyHint(kv);
302           if (nextKV != null) {
303             reseek(nextKV);
304           } else {
305             heap.next();
306           }
307           break;
308 
309         default:
310           throw new RuntimeException("UNEXPECTED");
311       }
312     }
313 
314     if (!results.isEmpty()) {
315       // copy jazz
316       outResult.addAll(results);
317       return true;
318     }
319 
320     // No more keys
321     close();
322     return false;
323   }
324 
325   public synchronized boolean next(List<KeyValue> outResult) throws IOException {
326     return next(outResult, -1);
327   }
328 
329   // Implementation of ChangedReadersObserver
330   public synchronized void updateReaders() throws IOException {
331     if (this.closing) return;
332 
333     // All public synchronized API calls will call 'checkReseek' which will cause
334     // the scanner stack to reseek if this.heap==null && this.lastTop != null.
335     // But if two calls to updateReaders() happen without a 'next' or 'peek' then we
336     // will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
337     // which is NOT what we want, not to mention could cause an NPE. So we early out here.
338     if (this.heap == null) return;
339 
340     // this could be null.
341     this.lastTop = this.peek();
342 
343     //DebugPrint.println("SS updateReaders, topKey = " + lastTop);
344 
345     // close scanners to old obsolete Store files
346     this.heap.close(); // bubble thru and close all scanners.
347     this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
348 
349     // Let the next() call handle re-creating and seeking
350   }
351 
352   private void checkReseek() throws IOException {
353     if (this.heap == null && this.lastTop != null) {
354       resetScannerStack(this.lastTop);
355       this.lastTop = null; // gone!
356     }
357     // else dont need to reseek
358   }
359 
360   private void resetScannerStack(KeyValue lastTopKey) throws IOException {
361     if (heap != null) {
362       throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
363     }
364 
365     /* When we have the scan object, should we not pass it to getScanners()
366      * to get a limited set of scanners? We did so in the constructor and we
367      * could have done it now by storing the scan object from the constructor */
368     List<KeyValueScanner> scanners = getScanners();
369 
370     for(KeyValueScanner scanner : scanners) {
371       scanner.seek(lastTopKey);
372     }
373 
374     // Combine all seeked scanners with a heap
375     heap = new KeyValueHeap(scanners, store.comparator);
376 
377     // Reset the state of the Query Matcher and set to top row.
378     // Only reset and call setRow if the row changes; avoids confusing the
379     // query matcher if scanning intra-row.
380     KeyValue kv = heap.peek();
381     if (kv == null) {
382       kv = lastTopKey;
383     }
384     if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
385       matcher.reset();
386       matcher.setRow(kv.getRow());
387     }
388   }
389 
390   @Override
391   public synchronized boolean reseek(KeyValue kv) throws IOException {
392     //Heap cannot be null, because this is only called from next() which
393     //guarantees that heap will never be null before this call.
394     return this.heap.reseek(kv);
395   }
396 
397   @Override
398   public long getSequenceID() {
399     return 0;
400   }
401 }