1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.ByteArrayOutputStream;
23  import java.io.DataOutputStream;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.List;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.HBaseTestCase;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.HServerAddress;
35  import org.apache.hadoop.hbase.HTableDescriptor;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.UnknownScannerException;
38  import org.apache.hadoop.hbase.client.Get;
39  import org.apache.hadoop.hbase.client.Put;
40  import org.apache.hadoop.hbase.client.Result;
41  import org.apache.hadoop.hbase.client.Scan;
42  import org.apache.hadoop.hbase.filter.Filter;
43  import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
44  import org.apache.hadoop.hbase.filter.PrefixFilter;
45  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
46  import org.apache.hadoop.hbase.io.hfile.Compression;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.Writables;
49  import org.apache.hadoop.hdfs.MiniDFSCluster;
50  
51  /**
52   * Test of a long-lived scanner validating as we go.
53   */
public class TestScanner extends HBaseTestCase {
  private final Log LOG = LogFactory.getLog(this.getClass());

  // Scan from the very beginning of the table.
  private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
  // Family used for family-wide scans.
  private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
  // Explicit qualifiers requested within the catalog family.
  private static final byte [][] EXPLICIT_COLS = {
    HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
      // TODO ryan
      //HConstants.STARTCODE_QUALIFIER
  };

  // Single-family table descriptor shared by every test in this class.
  static final HTableDescriptor TESTTABLEDESC =
    new HTableDescriptor("testscanner");
  static {
    TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
      10,  // Ten is arbitrary number.  Keep versions to help debugging.
      Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
      HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
      HConstants.REPLICATION_SCOPE_LOCAL));
  }
  /** HRegionInfo for the test table's single region (empty start/end keys,
   * so it spans the entire key space). */
  public static final HRegionInfo REGION_INFO =
    new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
    HConstants.EMPTY_BYTE_ARRAY);

  // Row key under which the serialized HRegionInfo is written in testScanner().
  private static final byte [] ROW_KEY = REGION_INFO.getRegionName();

  private static final long START_CODE = Long.MAX_VALUE;

  private MiniDFSCluster cluster = null;  // backing mini HDFS, torn down per test
  private HRegion r;                      // region under test
  private HRegionIncommon region;         // convenience wrapper around r

  @Override
  public void setUp() throws Exception {
    // Start a 2-datanode mini DFS BEFORE super.setUp() so the hbase.rootdir
    // configured below exists by the time the superclass initializes.
    cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
    // Set the hbase.rootdir to be the home directory in mini dfs.
    this.conf.set(HConstants.HBASE_DIR,
      this.cluster.getFileSystem().getHomeDirectory().toString());
    super.setUp();

  }
96  
97    /**
98     * Test basic stop row filter works.
99     * @throws Exception
100    */
101   public void testStopRow() throws Exception {
102     byte [] startrow = Bytes.toBytes("bbb");
103     byte [] stoprow = Bytes.toBytes("ccc");
104     try {
105       this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
106       addContent(this.r, HConstants.CATALOG_FAMILY);
107       List<KeyValue> results = new ArrayList<KeyValue>();
108       // Do simple test of getting one row only first.
109       Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
110       scan.addFamily(HConstants.CATALOG_FAMILY);
111 
112       InternalScanner s = r.getScanner(scan);
113       int count = 0;
114       while (s.next(results)) {
115         count++;
116       }
117       s.close();
118       assertEquals(0, count);
119       // Now do something a bit more imvolved.
120       scan = new Scan(startrow, stoprow);
121       scan.addFamily(HConstants.CATALOG_FAMILY);
122 
123       s = r.getScanner(scan);
124       count = 0;
125       KeyValue kv = null;
126       results = new ArrayList<KeyValue>();
127       for (boolean first = true; s.next(results);) {
128         kv = results.get(0);
129         if (first) {
130           assertTrue(Bytes.BYTES_COMPARATOR.compare(startrow, kv.getRow()) == 0);
131           first = false;
132         }
133         count++;
134       }
135       assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, kv.getRow()) > 0);
136       // We got something back.
137       assertTrue(count > 10);
138       s.close();
139     } finally {
140       this.r.close();
141       this.r.getLog().closeAndDelete();
142       shutdownDfs(this.cluster);
143     }
144   }
145 
146   void rowPrefixFilter(Scan scan) throws IOException {
147     List<KeyValue> results = new ArrayList<KeyValue>();
148     scan.addFamily(HConstants.CATALOG_FAMILY);
149     InternalScanner s = r.getScanner(scan);
150     boolean hasMore = true;
151     while (hasMore) {
152       hasMore = s.next(results);
153       for (KeyValue kv : results) {
154         assertEquals((byte)'a', kv.getRow()[0]);
155         assertEquals((byte)'b', kv.getRow()[1]);
156       }
157       results.clear();
158     }
159     s.close();
160   }
161 
162   void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
163     List<KeyValue> results = new ArrayList<KeyValue>();
164     scan.addFamily(HConstants.CATALOG_FAMILY);
165     InternalScanner s = r.getScanner(scan);
166     boolean hasMore = true;
167     while (hasMore) {
168       hasMore = s.next(results);
169       for (KeyValue kv : results) {
170         assertTrue(Bytes.compareTo(kv.getRow(), stopRow) <= 0);
171       }
172       results.clear();
173     }
174     s.close();
175   }
176 
177   public void testFilters() throws IOException {
178     try {
179       this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
180       addContent(this.r, HConstants.CATALOG_FAMILY);
181       byte [] prefix = Bytes.toBytes("ab");
182       Filter newFilter = new PrefixFilter(prefix);
183       Scan scan = new Scan();
184       scan.setFilter(newFilter);
185       rowPrefixFilter(scan);
186 
187       byte[] stopRow = Bytes.toBytes("bbc");
188       newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
189       scan = new Scan();
190       scan.setFilter(newFilter);
191       rowInclusiveStopFilter(scan, stopRow);
192 
193     } finally {
194       this.r.close();
195       this.r.getLog().closeAndDelete();
196       shutdownDfs(this.cluster);
197     }
198   }
199 
200   /**
201    * Test that closing a scanner while a client is using it doesn't throw
202    * NPEs but instead a UnknownScannerException. HBASE-2503
203    * @throws Exception
204    */
205   public void testRaceBetweenClientAndTimeout() throws Exception {
206     try {
207       this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
208       addContent(this.r, HConstants.CATALOG_FAMILY);
209       Scan scan = new Scan();
210       InternalScanner s = r.getScanner(scan);
211       List<KeyValue> results = new ArrayList<KeyValue>();
212       try {
213         s.next(results);
214         s.close();
215         s.next(results);
216         fail("We don't want anything more, we should be failing");
217       } catch (UnknownScannerException ex) {
218         // ok!
219         return;
220       }
221     } finally {
222       this.r.close();
223       this.r.getLog().closeAndDelete();
224       shutdownDfs(this.cluster);
225     }
226   }
227 
  /** The test!  Writes a serialized HRegionInfo into the region, then
   * verifies it is readable via both scan and get at every stage of the
   * store lifecycle: memstore, after close/reopen (on disk), after updates
   * layered in memstore over disk, and after explicit flushes.
   * @throws IOException
   */
  public void testScanner() throws IOException {
    try {
      r = createNewHRegion(TESTTABLEDESC, null, null);
      region = new HRegionIncommon(r);

      // Write information to the meta table

      Put put = new Put(ROW_KEY, System.currentTimeMillis(), null);

      // Serialize REGION_INFO via Writable and store it under the
      // regioninfo qualifier.
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
      DataOutputStream s = new DataOutputStream(byteStream);
      REGION_INFO.write(s);
      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
          byteStream.toByteArray());
      region.put(put);

      // What we just committed is in the memstore. Verify that we can get
      // it back both with scanning and get

      scan(false, null);
      getRegionInfo();

      // Close and re-open

      r.close();
      r = openClosedRegion(r);
      region = new HRegionIncommon(r);

      // Verify we can get the data back now that it is on disk.

      scan(false, null);
      getRegionInfo();

      // Store some new information

      HServerAddress address = new HServerAddress("foo.bar.com:1234");

      put = new Put(ROW_KEY, System.currentTimeMillis(), null);
      put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
          Bytes.toBytes(address.toString()));

//      put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));

      region.put(put);

      // Validate that we can still get the HRegionInfo, even though it is in
      // an older row on disk and there is a newer row in the memstore

      scan(true, address.toString());
      getRegionInfo();

      // flush cache

      region.flushcache();

      // Validate again

      scan(true, address.toString());
      getRegionInfo();

      // Close and reopen

      r.close();
      r = openClosedRegion(r);
      region = new HRegionIncommon(r);

      // Validate again

      scan(true, address.toString());
      getRegionInfo();

      // Now update the information again

      address = new HServerAddress("bar.foo.com:4321");

      put = new Put(ROW_KEY, System.currentTimeMillis(), null);

      put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
          Bytes.toBytes(address.toString()));
      region.put(put);

      // Validate again

      scan(true, address.toString());
      getRegionInfo();

      // flush cache

      region.flushcache();

      // Validate again

      scan(true, address.toString());
      getRegionInfo();

      // Close and reopen

      r.close();
      r = openClosedRegion(r);
      region = new HRegionIncommon(r);

      // Validate again

      scan(true, address.toString());
      getRegionInfo();

      // clean up

      r.close();
      r.getLog().closeAndDelete();

    } finally {
      shutdownDfs(cluster);
    }
  }
346 
347   /** Compare the HRegionInfo we read from HBase to what we stored */
348   private void validateRegionInfo(byte [] regionBytes) throws IOException {
349     HRegionInfo info =
350       (HRegionInfo) Writables.getWritable(regionBytes, new HRegionInfo());
351 
352     assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
353     assertEquals(0, info.getStartKey().length);
354     assertEquals(0, info.getEndKey().length);
355     assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
356     assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
357   }
358 
  /** Use a scanner to get the region info and then validate the results.
   * Makes two passes over scanColumns.  NOTE(review): the loop index
   * <code>i</code> is never used in the body, so both passes issue the same
   * explicit-column scan; presumably pass 0 was meant to scan family-wide
   * via COLS -- confirm before relying on that path being covered.
   * @param validateStartcode if true, also validate the start code value
   * @param serverName expected server column value, or null to skip that check
   * @throws IOException
   */
  private void scan(boolean validateStartcode, String serverName)
  throws IOException {
    InternalScanner scanner = null;
    Scan scan = null;
    // NOTE(review): results is never cleared between next() calls, so
    // KeyValues accumulate across iterations; appears harmless only because
    // the table written by testScanner() holds a single row (ROW_KEY).
    List<KeyValue> results = new ArrayList<KeyValue>();
    byte [][][] scanColumns = {
        COLS,
        EXPLICIT_COLS
    };

    for(int i = 0; i < scanColumns.length; i++) {
      try {
        scan = new Scan(FIRST_ROW);
        for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
          scan.addColumn(COLS[0],  EXPLICIT_COLS[ii]);
        }
        scanner = r.getScanner(scan);
        while (scanner.next(results)) {
          assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
              HConstants.REGIONINFO_QUALIFIER));
          byte [] val = getColumn(results, HConstants.CATALOG_FAMILY,
              HConstants.REGIONINFO_QUALIFIER).getValue();
          validateRegionInfo(val);
          if(validateStartcode) {
            // NOTE(review): with the STARTCODE lines commented out below,
            // val still holds the REGIONINFO value here, so these asserts
            // do not actually check a start code -- verify intent.
//            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
//                HConstants.STARTCODE_QUALIFIER));
//            val = getColumn(results, HConstants.CATALOG_FAMILY,
//                HConstants.STARTCODE_QUALIFIER).getValue();
            assertNotNull(val);
            assertFalse(val.length == 0);
            long startCode = Bytes.toLong(val);
            assertEquals(START_CODE, startCode);
          }

          if(serverName != null) {
            assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
                HConstants.SERVER_QUALIFIER));
            val = getColumn(results, HConstants.CATALOG_FAMILY,
                HConstants.SERVER_QUALIFIER).getValue();
            assertNotNull(val);
            assertFalse(val.length == 0);
            String server = Bytes.toString(val);
            assertEquals(0, server.compareTo(serverName));
          }
        }
      } finally {
        // Null the shared local before closing so a close() failure cannot
        // cause a double-close on the next loop pass.
        InternalScanner s = scanner;
        scanner = null;
        if(s != null) {
          s.close();
        }
      }
    }
  }
414 
415   private boolean hasColumn(final List<KeyValue> kvs, final byte [] family,
416       final byte [] qualifier) {
417     for (KeyValue kv: kvs) {
418       if (kv.matchingFamily(family) && kv.matchingQualifier(qualifier)) {
419         return true;
420       }
421     }
422     return false;
423   }
424 
425   private KeyValue getColumn(final List<KeyValue> kvs, final byte [] family,
426       final byte [] qualifier) {
427     for (KeyValue kv: kvs) {
428       if (kv.matchingFamily(family) && kv.matchingQualifier(qualifier)) {
429         return kv;
430       }
431     }
432     return null;
433   }
434 
435 
436   /** Use get to retrieve the HRegionInfo and validate it */
437   private void getRegionInfo() throws IOException {
438     Get get = new Get(ROW_KEY);
439     get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
440     Result result = region.get(get, null);
441     byte [] bytes = result.value();
442     validateRegionInfo(bytes);
443   }
444 
445   /**
446    * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
447    * update readers code essentially.  This is not highly concurrent, since its all 1 thread.
448    * HBase-910.
449    * @throws Exception
450    */
451   public void testScanAndSyncFlush() throws Exception {
452     this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
453     HRegionIncommon hri = new HRegionIncommon(r);
454     try {
455         LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
456             Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
457       int count = count(hri, -1, false);
458       assertEquals(count, count(hri, 100, false)); // do a sync flush.
459     } catch (Exception e) {
460       LOG.error("Failed", e);
461       throw e;
462     } finally {
463       this.r.close();
464       this.r.getLog().closeAndDelete();
465       shutdownDfs(cluster);
466     }
467   }
468 
469   /**
470    * Tests to do a concurrent flush (using a 2nd thread) while scanning.  This tests both
471    * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
472    *
473    * @throws Exception
474    */
475   public void testScanAndRealConcurrentFlush() throws Exception {
476     this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
477     HRegionIncommon hri = new HRegionIncommon(r);
478     try {
479         LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
480             Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
481       int count = count(hri, -1, false);
482       assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
483     } catch (Exception e) {
484       LOG.error("Failed", e);
485       throw e;
486     } finally {
487       this.r.close();
488       this.r.getLog().closeAndDelete();
489       shutdownDfs(cluster);
490     }
491   }
492 
493 
494   /*
495    * @param hri Region
496    * @param flushIndex At what row we start the flush.
497    * @param concurrent if the flush should be concurrent or sync.
498    * @return Count of rows found.
499    * @throws IOException
500    */
501   private int count(final HRegionIncommon hri, final int flushIndex,
502                     boolean concurrent)
503   throws IOException {
504     LOG.info("Taking out counting scan");
505     ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
506         HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
507     List<KeyValue> values = new ArrayList<KeyValue>();
508     int count = 0;
509     boolean justFlushed = false;
510     while (s.next(values)) {
511       if (justFlushed) {
512         LOG.info("after next() just after next flush");
513         justFlushed=false;
514       }
515       count++;
516       if (flushIndex == count) {
517         LOG.info("Starting flush at flush index " + flushIndex);
518         Thread t = new Thread() {
519           public void run() {
520             try {
521               hri.flushcache();
522               LOG.info("Finishing flush");
523             } catch (IOException e) {
524               LOG.info("Failed flush cache");
525             }
526           }
527         };
528         if (concurrent) {
529           t.start(); // concurrently flush.
530         } else {
531           t.run(); // sync flush
532         }
533         LOG.info("Continuing on after kicking off background flush");
534         justFlushed = true;
535       }
536     }
537     s.close();
538     LOG.info("Found " + count + " items");
539     return count;
540   }
541 }