1   /*
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Random;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.HBaseTestCase;
32  import org.apache.hadoop.hbase.HColumnDescriptor;
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.HRegionInfo;
35  import org.apache.hadoop.hbase.HTableDescriptor;
36  import org.apache.hadoop.hbase.KeyValue;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Scan;
39  import org.apache.hadoop.hbase.io.hfile.Compression;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.hdfs.MiniDFSCluster;
42  
43  public class TestWideScanner extends HBaseTestCase {
44    private final Log LOG = LogFactory.getLog(this.getClass());
45  
46    static final byte[] A = Bytes.toBytes("A");
47    static final byte[] B = Bytes.toBytes("B");
48    static final byte[] C = Bytes.toBytes("C");
49    static byte[][] COLUMNS = { A, B, C };
50    static final Random rng = new Random();
51    static final HTableDescriptor TESTTABLEDESC =
52      new HTableDescriptor("testwidescan");
53    static {
54      TESTTABLEDESC.addFamily(new HColumnDescriptor(A,
55        100,  // Keep versions to help debuggging.
56        Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
57        HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
58        HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
59      TESTTABLEDESC.addFamily(new HColumnDescriptor(B,
60        100,  // Keep versions to help debuggging.
61        Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
62        HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
63        HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
64      TESTTABLEDESC.addFamily(new HColumnDescriptor(C,
65        100,  // Keep versions to help debuggging.
66        Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
67        HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
68        HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
69    }
70  
71    /** HRegionInfo for root region */
72    public static final HRegionInfo REGION_INFO =
73      new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
74      HConstants.EMPTY_BYTE_ARRAY);
75  
76    MiniDFSCluster cluster = null;
77    HRegion r;
78  
79    @Override
80    public void setUp() throws Exception {
81      cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
82      // Set the hbase.rootdir to be the home directory in mini dfs.
83      this.conf.set(HConstants.HBASE_DIR,
84        this.cluster.getFileSystem().getHomeDirectory().toString());
85      super.setUp();
86    }
87  
88    private int addWideContent(HRegion region) throws IOException {
89      int count = 0;
90      for (char c = 'a'; c <= 'c'; c++) {
91        byte[] row = Bytes.toBytes("ab" + c);
92        int i, j;
93        long ts = System.currentTimeMillis();
94        for (i = 0; i < 100; i++) {
95          byte[] b = Bytes.toBytes(String.format("%10d", i));
96          for (j = 0; j < 100; j++) {
97            Put put = new Put(row);
98            put.add(COLUMNS[rng.nextInt(COLUMNS.length)], b, ++ts, b);
99            region.put(put);
100           count++;
101         }
102       }
103     }
104     return count;
105   }
106 
107   public void testWideScanBatching() throws IOException {
108     final int batch = 256;
109     try {
110       this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
111       int inserted = addWideContent(this.r);
112       List<KeyValue> results = new ArrayList<KeyValue>();
113       Scan scan = new Scan();
114       scan.addFamily(A);
115       scan.addFamily(B);
116       scan.addFamily(C);
117       scan.setMaxVersions(100);
118       scan.setBatch(batch);
119       InternalScanner s = r.getScanner(scan);
120       int total = 0;
121       int i = 0;
122       boolean more;
123       do {
124         more = s.next(results);
125         i++;
126         LOG.info("iteration #" + i + ", results.size=" + results.size());
127 
128         // assert that the result set is no larger
129         assertTrue(results.size() <= batch);
130 
131         total += results.size();
132 
133         if (results.size() > 0) {
134           // assert that all results are from the same row
135           byte[] row = results.get(0).getRow();
136           for (KeyValue kv: results) {
137             assertTrue(Bytes.equals(row, kv.getRow()));
138           }
139         }
140 
141         results.clear();
142 
143         // trigger ChangedReadersObservers
144         Iterator<KeyValueScanner> scanners =
145           ((HRegion.RegionScanner)s).storeHeap.getHeap().iterator();
146         while (scanners.hasNext()) {
147           StoreScanner ss = (StoreScanner)scanners.next();
148           ss.updateReaders();
149         }
150       } while (more);
151 
152       // assert that the scanner returned all values
153       LOG.info("inserted " + inserted + ", scanned " + total);
154       assertEquals(total, inserted);
155 
156       s.close();
157     } finally {
158       this.r.close();
159       this.r.getLog().closeAndDelete();
160       shutdownDfs(this.cluster);
161     }
162   }
163 }