1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.ByteArrayOutputStream;
23 import java.io.DataOutputStream;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.HBaseTestCase;
31 import org.apache.hadoop.hbase.HColumnDescriptor;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HRegionInfo;
34 import org.apache.hadoop.hbase.HServerAddress;
35 import org.apache.hadoop.hbase.HTableDescriptor;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.UnknownScannerException;
38 import org.apache.hadoop.hbase.client.Get;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.filter.Filter;
43 import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
44 import org.apache.hadoop.hbase.filter.PrefixFilter;
45 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
46 import org.apache.hadoop.hbase.io.hfile.Compression;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.Writables;
49 import org.apache.hadoop.hdfs.MiniDFSCluster;
50
51
52
53
54 public class TestScanner extends HBaseTestCase {
/** Logger shared by every instance of this test class. */
private static final Log LOG = LogFactory.getLog(TestScanner.class);

/** Row the {@code scan} helper starts from: the empty start row. */
private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;

/** Column families used by the scans in this test; only the catalog family. */
private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };

/** Qualifiers inside the catalog family that are requested by name. */
private static final byte [][] EXPLICIT_COLS = {
  HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER
};

/** Descriptor of the table ("testscanner") backing the region under test. */
static final HTableDescriptor TESTTABLEDESC =
  new HTableDescriptor("testscanner");
static {
  // Register the catalog family with compression NONE, bloom type NONE and
  // local replication scope. The remaining positional arguments
  // (10, false, true, 8 * 1024, FOREVER) are presumably max versions,
  // in-memory, block cache, block size and TTL -- TODO confirm against the
  // HColumnDescriptor constructor.
  TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
    10,
    Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
    HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
    HConstants.REPLICATION_SCOPE_LOCAL));
}

/** Region info for {@link #TESTTABLEDESC} built with empty start and end keys. */
public static final HRegionInfo REGION_INFO =
  new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
    HConstants.EMPTY_BYTE_ARRAY);

/** Row key used for the puts and gets in this test: the region's own name. */
private static final byte [] ROW_KEY = REGION_INFO.getRegionName();

/** Value the {@code scan} helper compares a decoded start code against. */
private static final long START_CODE = Long.MAX_VALUE;

/** DFS mini cluster created in {@link #setUp()} and shut down by each test. */
private MiniDFSCluster cluster = null;

/** Region under test; (re)created by the individual test methods. */
private HRegion r;

/** Wrapper around {@link #r} used for puts, gets and flushes. */
private HRegionIncommon region;
86
87 @Override
88 public void setUp() throws Exception {
89 cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
90
91 this.conf.set(HConstants.HBASE_DIR,
92 this.cluster.getFileSystem().getHomeDirectory().toString());
93 super.setUp();
94
95 }
96
97
98
99
100
101 public void testStopRow() throws Exception {
102 byte [] startrow = Bytes.toBytes("bbb");
103 byte [] stoprow = Bytes.toBytes("ccc");
104 try {
105 this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
106 addContent(this.r, HConstants.CATALOG_FAMILY);
107 List<KeyValue> results = new ArrayList<KeyValue>();
108
109 Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
110 scan.addFamily(HConstants.CATALOG_FAMILY);
111
112 InternalScanner s = r.getScanner(scan);
113 int count = 0;
114 while (s.next(results)) {
115 count++;
116 }
117 s.close();
118 assertEquals(0, count);
119
120 scan = new Scan(startrow, stoprow);
121 scan.addFamily(HConstants.CATALOG_FAMILY);
122
123 s = r.getScanner(scan);
124 count = 0;
125 KeyValue kv = null;
126 results = new ArrayList<KeyValue>();
127 for (boolean first = true; s.next(results);) {
128 kv = results.get(0);
129 if (first) {
130 assertTrue(Bytes.BYTES_COMPARATOR.compare(startrow, kv.getRow()) == 0);
131 first = false;
132 }
133 count++;
134 }
135 assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, kv.getRow()) > 0);
136
137 assertTrue(count > 10);
138 s.close();
139 } finally {
140 this.r.close();
141 this.r.getLog().closeAndDelete();
142 shutdownDfs(this.cluster);
143 }
144 }
145
146 void rowPrefixFilter(Scan scan) throws IOException {
147 List<KeyValue> results = new ArrayList<KeyValue>();
148 scan.addFamily(HConstants.CATALOG_FAMILY);
149 InternalScanner s = r.getScanner(scan);
150 boolean hasMore = true;
151 while (hasMore) {
152 hasMore = s.next(results);
153 for (KeyValue kv : results) {
154 assertEquals((byte)'a', kv.getRow()[0]);
155 assertEquals((byte)'b', kv.getRow()[1]);
156 }
157 results.clear();
158 }
159 s.close();
160 }
161
162 void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
163 List<KeyValue> results = new ArrayList<KeyValue>();
164 scan.addFamily(HConstants.CATALOG_FAMILY);
165 InternalScanner s = r.getScanner(scan);
166 boolean hasMore = true;
167 while (hasMore) {
168 hasMore = s.next(results);
169 for (KeyValue kv : results) {
170 assertTrue(Bytes.compareTo(kv.getRow(), stopRow) <= 0);
171 }
172 results.clear();
173 }
174 s.close();
175 }
176
177 public void testFilters() throws IOException {
178 try {
179 this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
180 addContent(this.r, HConstants.CATALOG_FAMILY);
181 byte [] prefix = Bytes.toBytes("ab");
182 Filter newFilter = new PrefixFilter(prefix);
183 Scan scan = new Scan();
184 scan.setFilter(newFilter);
185 rowPrefixFilter(scan);
186
187 byte[] stopRow = Bytes.toBytes("bbc");
188 newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
189 scan = new Scan();
190 scan.setFilter(newFilter);
191 rowInclusiveStopFilter(scan, stopRow);
192
193 } finally {
194 this.r.close();
195 this.r.getLog().closeAndDelete();
196 shutdownDfs(this.cluster);
197 }
198 }
199
200
201
202
203
204
205 public void testRaceBetweenClientAndTimeout() throws Exception {
206 try {
207 this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
208 addContent(this.r, HConstants.CATALOG_FAMILY);
209 Scan scan = new Scan();
210 InternalScanner s = r.getScanner(scan);
211 List<KeyValue> results = new ArrayList<KeyValue>();
212 try {
213 s.next(results);
214 s.close();
215 s.next(results);
216 fail("We don't want anything more, we should be failing");
217 } catch (UnknownScannerException ex) {
218
219 return;
220 }
221 } finally {
222 this.r.close();
223 this.r.getLog().closeAndDelete();
224 shutdownDfs(this.cluster);
225 }
226 }
227
228
229
230
231 public void testScanner() throws IOException {
232 try {
233 r = createNewHRegion(TESTTABLEDESC, null, null);
234 region = new HRegionIncommon(r);
235
236
237
238 Put put = new Put(ROW_KEY, System.currentTimeMillis(), null);
239
240 ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
241 DataOutputStream s = new DataOutputStream(byteStream);
242 REGION_INFO.write(s);
243 put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
244 byteStream.toByteArray());
245 region.put(put);
246
247
248
249
250 scan(false, null);
251 getRegionInfo();
252
253
254
255 r.close();
256 r = openClosedRegion(r);
257 region = new HRegionIncommon(r);
258
259
260
261 scan(false, null);
262 getRegionInfo();
263
264
265
266 HServerAddress address = new HServerAddress("foo.bar.com:1234");
267
268 put = new Put(ROW_KEY, System.currentTimeMillis(), null);
269 put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
270 Bytes.toBytes(address.toString()));
271
272
273
274 region.put(put);
275
276
277
278
279 scan(true, address.toString());
280 getRegionInfo();
281
282
283
284 region.flushcache();
285
286
287
288 scan(true, address.toString());
289 getRegionInfo();
290
291
292
293 r.close();
294 r = openClosedRegion(r);
295 region = new HRegionIncommon(r);
296
297
298
299 scan(true, address.toString());
300 getRegionInfo();
301
302
303
304 address = new HServerAddress("bar.foo.com:4321");
305
306 put = new Put(ROW_KEY, System.currentTimeMillis(), null);
307
308 put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
309 Bytes.toBytes(address.toString()));
310 region.put(put);
311
312
313
314 scan(true, address.toString());
315 getRegionInfo();
316
317
318
319 region.flushcache();
320
321
322
323 scan(true, address.toString());
324 getRegionInfo();
325
326
327
328 r.close();
329 r = openClosedRegion(r);
330 region = new HRegionIncommon(r);
331
332
333
334 scan(true, address.toString());
335 getRegionInfo();
336
337
338
339 r.close();
340 r.getLog().closeAndDelete();
341
342 } finally {
343 shutdownDfs(cluster);
344 }
345 }
346
347
348 private void validateRegionInfo(byte [] regionBytes) throws IOException {
349 HRegionInfo info =
350 (HRegionInfo) Writables.getWritable(regionBytes, new HRegionInfo());
351
352 assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
353 assertEquals(0, info.getStartKey().length);
354 assertEquals(0, info.getEndKey().length);
355 assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
356 assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
357 }
358
359
360 private void scan(boolean validateStartcode, String serverName)
361 throws IOException {
362 InternalScanner scanner = null;
363 Scan scan = null;
364 List<KeyValue> results = new ArrayList<KeyValue>();
365 byte [][][] scanColumns = {
366 COLS,
367 EXPLICIT_COLS
368 };
369
370 for(int i = 0; i < scanColumns.length; i++) {
371 try {
372 scan = new Scan(FIRST_ROW);
373 for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
374 scan.addColumn(COLS[0], EXPLICIT_COLS[ii]);
375 }
376 scanner = r.getScanner(scan);
377 while (scanner.next(results)) {
378 assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
379 HConstants.REGIONINFO_QUALIFIER));
380 byte [] val = getColumn(results, HConstants.CATALOG_FAMILY,
381 HConstants.REGIONINFO_QUALIFIER).getValue();
382 validateRegionInfo(val);
383 if(validateStartcode) {
384
385
386
387
388 assertNotNull(val);
389 assertFalse(val.length == 0);
390 long startCode = Bytes.toLong(val);
391 assertEquals(START_CODE, startCode);
392 }
393
394 if(serverName != null) {
395 assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
396 HConstants.SERVER_QUALIFIER));
397 val = getColumn(results, HConstants.CATALOG_FAMILY,
398 HConstants.SERVER_QUALIFIER).getValue();
399 assertNotNull(val);
400 assertFalse(val.length == 0);
401 String server = Bytes.toString(val);
402 assertEquals(0, server.compareTo(serverName));
403 }
404 }
405 } finally {
406 InternalScanner s = scanner;
407 scanner = null;
408 if(s != null) {
409 s.close();
410 }
411 }
412 }
413 }
414
415 private boolean hasColumn(final List<KeyValue> kvs, final byte [] family,
416 final byte [] qualifier) {
417 for (KeyValue kv: kvs) {
418 if (kv.matchingFamily(family) && kv.matchingQualifier(qualifier)) {
419 return true;
420 }
421 }
422 return false;
423 }
424
425 private KeyValue getColumn(final List<KeyValue> kvs, final byte [] family,
426 final byte [] qualifier) {
427 for (KeyValue kv: kvs) {
428 if (kv.matchingFamily(family) && kv.matchingQualifier(qualifier)) {
429 return kv;
430 }
431 }
432 return null;
433 }
434
435
436
437 private void getRegionInfo() throws IOException {
438 Get get = new Get(ROW_KEY);
439 get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
440 Result result = region.get(get, null);
441 byte [] bytes = result.value();
442 validateRegionInfo(bytes);
443 }
444
445
446
447
448
449
450
451 public void testScanAndSyncFlush() throws Exception {
452 this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
453 HRegionIncommon hri = new HRegionIncommon(r);
454 try {
455 LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
456 Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
457 int count = count(hri, -1, false);
458 assertEquals(count, count(hri, 100, false));
459 } catch (Exception e) {
460 LOG.error("Failed", e);
461 throw e;
462 } finally {
463 this.r.close();
464 this.r.getLog().closeAndDelete();
465 shutdownDfs(cluster);
466 }
467 }
468
469
470
471
472
473
474
475 public void testScanAndRealConcurrentFlush() throws Exception {
476 this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
477 HRegionIncommon hri = new HRegionIncommon(r);
478 try {
479 LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
480 Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
481 int count = count(hri, -1, false);
482 assertEquals(count, count(hri, 100, true));
483 } catch (Exception e) {
484 LOG.error("Failed", e);
485 throw e;
486 } finally {
487 this.r.close();
488 this.r.getLog().closeAndDelete();
489 shutdownDfs(cluster);
490 }
491 }
492
493
494
495
496
497
498
499
500
501 private int count(final HRegionIncommon hri, final int flushIndex,
502 boolean concurrent)
503 throws IOException {
504 LOG.info("Taking out counting scan");
505 ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
506 HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
507 List<KeyValue> values = new ArrayList<KeyValue>();
508 int count = 0;
509 boolean justFlushed = false;
510 while (s.next(values)) {
511 if (justFlushed) {
512 LOG.info("after next() just after next flush");
513 justFlushed=false;
514 }
515 count++;
516 if (flushIndex == count) {
517 LOG.info("Starting flush at flush index " + flushIndex);
518 Thread t = new Thread() {
519 public void run() {
520 try {
521 hri.flushcache();
522 LOG.info("Finishing flush");
523 } catch (IOException e) {
524 LOG.info("Failed flush cache");
525 }
526 }
527 };
528 if (concurrent) {
529 t.start();
530 } else {
531 t.run();
532 }
533 LOG.info("Continuing on after kicking off background flush");
534 justFlushed = true;
535 }
536 }
537 s.close();
538 LOG.info("Found " + count + " items");
539 return count;
540 }
541 }