1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.client.Scan;
27
28 import java.io.IOException;
29 import java.util.ArrayList;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.NavigableSet;
33
34
35
36
37
38 class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
39 static final Log LOG = LogFactory.getLog(StoreScanner.class);
40 private Store store;
41 private ScanQueryMatcher matcher;
42 private KeyValueHeap heap;
43 private boolean cacheBlocks;
44
45
46
47 private boolean closing = false;
48 private final boolean isGet;
49
50
51 private KeyValue lastTop = null;
52
53
54
55
56
57
58
59
60
61 StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
62 throws IOException {
63 this.store = store;
64 this.cacheBlocks = scan.getCacheBlocks();
65 matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
66 columns, store.ttl, store.comparator.getRawComparator(),
67 store.versionsToReturn(scan.getMaxVersions()),
68 false);
69
70 this.isGet = scan.isGetScan();
71
72 List<KeyValueScanner> scanners = getScanners(scan, columns);
73
74
75
76 for(KeyValueScanner scanner : scanners) {
77 scanner.seek(matcher.getStartKey());
78 }
79
80
81 heap = new KeyValueHeap(scanners, store.comparator);
82
83 this.store.addChangedReaderObserver(this);
84 }
85
86
87
88
89
90
91
92
93
94 StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
95 boolean retainDeletesInOutput)
96 throws IOException {
97 this.store = store;
98 this.cacheBlocks = false;
99 this.isGet = false;
100 matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
101 null, store.ttl, store.comparator.getRawComparator(),
102 store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
103
104
105 for(KeyValueScanner scanner : scanners) {
106 scanner.seek(matcher.getStartKey());
107 }
108
109
110 heap = new KeyValueHeap(scanners, store.comparator);
111 }
112
113
/**
 * Opens a scanner that is not backed by a {@link Store}: family, ttl, comparator and scanners
 * are all supplied by the caller.
 *
 * <p>{@code store} stays {@code null}, so close() skips observer de-registration. The paths
 * that rebuild the heap (getScanners(), seek() on a missing heap, resetScannerStack())
 * dereference {@code store} and therefore cannot be used on such an instance.
 *
 * @param scan scan specification; supplies the cache-blocks flag and the max version count
 * @param colFamily column family name handed to the matcher
 * @param ttl time-to-live handed to the matcher
 * @param comparator comparator for the heap; its raw comparator is handed to the matcher
 * @param columns columns handed to the matcher
 * @param scanners scanners to merge; each one is seeked to the matcher's start key here
 * @throws IOException if seeking one of the scanners fails
 */
StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
    final KeyValue.KVComparator comparator,
    final NavigableSet<byte[]> columns,
    final List<KeyValueScanner> scanners)
    throws IOException {
  this.store = null;
  this.isGet = false;
  this.cacheBlocks = scan.getCacheBlocks();
  this.matcher = new ScanQueryMatcher(
      scan,
      colFamily,
      columns,
      ttl,
      comparator.getRawComparator(),
      scan.getMaxVersions(),
      false);

  // Position every scanner at the start key before handing them to the heap.
  for (KeyValueScanner each : scanners) {
    each.seek(this.matcher.getStartKey());
  }
  this.heap = new KeyValueHeap(scanners, comparator);
}
131
132
133
134
135 private List<KeyValueScanner> getScanners() throws IOException {
136
137
138
139
140
141 List<StoreFileScanner> sfScanners = StoreFileScanner
142 .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
143 List<KeyValueScanner> scanners =
144 new ArrayList<KeyValueScanner>(sfScanners.size()+1);
145 scanners.addAll(sfScanners);
146
147 scanners.addAll(this.store.memstore.getScanners());
148 return scanners;
149 }
150
151
152
153
154 private List<KeyValueScanner> getScanners(Scan scan,
155 final NavigableSet<byte[]> columns) throws IOException {
156 boolean memOnly;
157 boolean filesOnly;
158 if (scan instanceof InternalScan) {
159 InternalScan iscan = (InternalScan)scan;
160 memOnly = iscan.isCheckOnlyMemStore();
161 filesOnly = iscan.isCheckOnlyStoreFiles();
162 } else {
163 memOnly = false;
164 filesOnly = false;
165 }
166 List<KeyValueScanner> scanners = new LinkedList<KeyValueScanner>();
167
168 if (memOnly == false) {
169 List<StoreFileScanner> sfScanners = StoreFileScanner
170 .getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
171
172
173 for (StoreFileScanner sfs : sfScanners) {
174 if (sfs.shouldSeek(scan, columns)) {
175 scanners.add(sfs);
176 }
177 }
178 }
179
180
181 if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) {
182 scanners.addAll(this.store.memstore.getScanners());
183 }
184 return scanners;
185 }
186
187 public synchronized KeyValue peek() {
188 if (this.heap == null) {
189 return this.lastTop;
190 }
191 return this.heap.peek();
192 }
193
194 public KeyValue next() {
195
196 throw new RuntimeException("Never call StoreScanner.next()");
197 }
198
199 public synchronized void close() {
200 if (this.closing) return;
201 this.closing = true;
202
203 if (this.store != null)
204 this.store.deleteChangedReaderObserver(this);
205 if (this.heap != null)
206 this.heap.close();
207 this.heap = null;
208 this.lastTop = null;
209 }
210
211 public synchronized boolean seek(KeyValue key) throws IOException {
212 if (this.heap == null) {
213
214 List<KeyValueScanner> scanners = getScanners();
215
216 heap = new KeyValueHeap(scanners, store.comparator);
217 }
218
219 return this.heap.seek(key);
220 }
221
222
223
224
225
226
227
228 public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
229
230
231 checkReseek();
232
233
234
235 if (this.heap == null) {
236 close();
237 return false;
238 }
239
240 KeyValue peeked = this.heap.peek();
241 if (peeked == null) {
242 close();
243 return false;
244 }
245
246
247
248 if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
249 matcher.setRow(peeked.getRow());
250 }
251
252 KeyValue kv;
253 List<KeyValue> results = new ArrayList<KeyValue>();
254 LOOP: while((kv = this.heap.peek()) != null) {
255
256 KeyValue copyKv = new KeyValue(kv.getBuffer(), kv.getOffset(), kv.getLength());
257 ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);
258
259 switch(qcode) {
260 case INCLUDE:
261 results.add(copyKv);
262 this.heap.next();
263 if (limit > 0 && (results.size() == limit)) {
264 break LOOP;
265 }
266 continue;
267
268 case DONE:
269
270 outResult.addAll(results);
271 return true;
272
273 case DONE_SCAN:
274 close();
275
276
277 outResult.addAll(results);
278
279 return false;
280
281 case SEEK_NEXT_ROW:
282
283
284 if (!matcher.moreRowsMayExistAfter(kv)) {
285 outResult.addAll(results);
286 return false;
287 }
288
289 reseek(matcher.getKeyForNextRow(kv));
290 break;
291
292 case SEEK_NEXT_COL:
293 reseek(matcher.getKeyForNextColumn(kv));
294 break;
295
296 case SKIP:
297 this.heap.next();
298 break;
299
300 case SEEK_NEXT_USING_HINT:
301 KeyValue nextKV = matcher.getNextKeyHint(kv);
302 if (nextKV != null) {
303 reseek(nextKV);
304 } else {
305 heap.next();
306 }
307 break;
308
309 default:
310 throw new RuntimeException("UNEXPECTED");
311 }
312 }
313
314 if (!results.isEmpty()) {
315
316 outResult.addAll(results);
317 return true;
318 }
319
320
321 close();
322 return false;
323 }
324
325 public synchronized boolean next(List<KeyValue> outResult) throws IOException {
326 return next(outResult, -1);
327 }
328
329
330 public synchronized void updateReaders() throws IOException {
331 if (this.closing) return;
332
333
334
335
336
337
338 if (this.heap == null) return;
339
340
341 this.lastTop = this.peek();
342
343
344
345
346 this.heap.close();
347 this.heap = null;
348
349
350 }
351
352 private void checkReseek() throws IOException {
353 if (this.heap == null && this.lastTop != null) {
354 resetScannerStack(this.lastTop);
355 this.lastTop = null;
356 }
357
358 }
359
360 private void resetScannerStack(KeyValue lastTopKey) throws IOException {
361 if (heap != null) {
362 throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
363 }
364
365
366
367
368 List<KeyValueScanner> scanners = getScanners();
369
370 for(KeyValueScanner scanner : scanners) {
371 scanner.seek(lastTopKey);
372 }
373
374
375 heap = new KeyValueHeap(scanners, store.comparator);
376
377
378
379
380 KeyValue kv = heap.peek();
381 if (kv == null) {
382 kv = lastTopKey;
383 }
384 if ((matcher.row == null) || !kv.matchingRow(matcher.row)) {
385 matcher.reset();
386 matcher.setRow(kv.getRow());
387 }
388 }
389
390 @Override
391 public synchronized boolean reseek(KeyValue kv) throws IOException {
392
393
394 return this.heap.reseek(kv);
395 }
396
397 @Override
398 public long getSequenceID() {
399 return 0;
400 }
401 }