1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Iterator;
26 import java.util.List;
27 import java.util.Random;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.HBaseTestCase;
32 import org.apache.hadoop.hbase.HColumnDescriptor;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.HTableDescriptor;
36 import org.apache.hadoop.hbase.KeyValue;
37 import org.apache.hadoop.hbase.client.Put;
38 import org.apache.hadoop.hbase.client.Scan;
39 import org.apache.hadoop.hbase.io.hfile.Compression;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.apache.hadoop.hdfs.MiniDFSCluster;
42
43 public class TestWideScanner extends HBaseTestCase {
44 private final Log LOG = LogFactory.getLog(this.getClass());
45
46 static final byte[] A = Bytes.toBytes("A");
47 static final byte[] B = Bytes.toBytes("B");
48 static final byte[] C = Bytes.toBytes("C");
49 static byte[][] COLUMNS = { A, B, C };
50 static final Random rng = new Random();
51 static final HTableDescriptor TESTTABLEDESC =
52 new HTableDescriptor("testwidescan");
53 static {
54 TESTTABLEDESC.addFamily(new HColumnDescriptor(A,
55 100,
56 Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
57 HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
58 HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
59 TESTTABLEDESC.addFamily(new HColumnDescriptor(B,
60 100,
61 Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
62 HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
63 HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
64 TESTTABLEDESC.addFamily(new HColumnDescriptor(C,
65 100,
66 Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
67 HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
68 HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
69 }
70
71
72 public static final HRegionInfo REGION_INFO =
73 new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
74 HConstants.EMPTY_BYTE_ARRAY);
75
76 MiniDFSCluster cluster = null;
77 HRegion r;
78
79 @Override
80 public void setUp() throws Exception {
81 cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
82
83 this.conf.set(HConstants.HBASE_DIR,
84 this.cluster.getFileSystem().getHomeDirectory().toString());
85 super.setUp();
86 }
87
88 private int addWideContent(HRegion region) throws IOException {
89 int count = 0;
90 for (char c = 'a'; c <= 'c'; c++) {
91 byte[] row = Bytes.toBytes("ab" + c);
92 int i, j;
93 long ts = System.currentTimeMillis();
94 for (i = 0; i < 100; i++) {
95 byte[] b = Bytes.toBytes(String.format("%10d", i));
96 for (j = 0; j < 100; j++) {
97 Put put = new Put(row);
98 put.add(COLUMNS[rng.nextInt(COLUMNS.length)], b, ++ts, b);
99 region.put(put);
100 count++;
101 }
102 }
103 }
104 return count;
105 }
106
107 public void testWideScanBatching() throws IOException {
108 final int batch = 256;
109 try {
110 this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
111 int inserted = addWideContent(this.r);
112 List<KeyValue> results = new ArrayList<KeyValue>();
113 Scan scan = new Scan();
114 scan.addFamily(A);
115 scan.addFamily(B);
116 scan.addFamily(C);
117 scan.setMaxVersions(100);
118 scan.setBatch(batch);
119 InternalScanner s = r.getScanner(scan);
120 int total = 0;
121 int i = 0;
122 boolean more;
123 do {
124 more = s.next(results);
125 i++;
126 LOG.info("iteration #" + i + ", results.size=" + results.size());
127
128
129 assertTrue(results.size() <= batch);
130
131 total += results.size();
132
133 if (results.size() > 0) {
134
135 byte[] row = results.get(0).getRow();
136 for (KeyValue kv: results) {
137 assertTrue(Bytes.equals(row, kv.getRow()));
138 }
139 }
140
141 results.clear();
142
143
144 Iterator<KeyValueScanner> scanners =
145 ((HRegion.RegionScanner)s).storeHeap.getHeap().iterator();
146 while (scanners.hasNext()) {
147 StoreScanner ss = (StoreScanner)scanners.next();
148 ss.updateReaders();
149 }
150 } while (more);
151
152
153 LOG.info("inserted " + inserted + ", scanned " + total);
154 assertEquals(total, inserted);
155
156 s.close();
157 } finally {
158 this.r.close();
159 this.r.getLog().closeAndDelete();
160 shutdownDfs(this.cluster);
161 }
162 }
163 }