1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.util.Map;
25 import java.util.NavigableMap;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileUtil;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.HBaseTestingUtility;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.io.NullWritable;
39 import org.apache.hadoop.mapreduce.Job;
40 import org.apache.hadoop.mapreduce.Reducer;
41 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
42 import org.junit.After;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47
48 import static org.junit.Assert.assertEquals;
49 import static org.junit.Assert.assertTrue;
50
51
52
53
54
55
56 public class TestTableInputFormatScan {
57
58 static final Log LOG = LogFactory.getLog(TestTableInputFormatScan.class);
59 static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
60
61 static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
62 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
63 static final String KEY_STARTROW = "startRow";
64 static final String KEY_LASTROW = "stpRow";
65
66 private static HTable table = null;
67
68 @BeforeClass
69 public static void setUpBeforeClass() throws Exception {
70
71 TEST_UTIL.enableDebug(TableInputFormat.class);
72 TEST_UTIL.enableDebug(TableInputFormatBase.class);
73
74 TEST_UTIL.startMiniCluster(3);
75
76 table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
77 TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
78 TEST_UTIL.loadTable(table, INPUT_FAMILY);
79
80 TEST_UTIL.startMiniMapReduceCluster();
81 }
82
83 @AfterClass
84 public static void tearDownAfterClass() throws Exception {
85 TEST_UTIL.shutdownMiniMapReduceCluster();
86 TEST_UTIL.shutdownMiniCluster();
87 }
88
89 @Before
90 public void setUp() throws Exception {
91
92 }
93
94
95
96
97 @After
98 public void tearDown() throws Exception {
99 Configuration c = TEST_UTIL.getConfiguration();
100 FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
101 }
102
103
104
105
106 public static class ScanMapper
107 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
108
109
110
111
112
113
114
115
116
117 @Override
118 public void map(ImmutableBytesWritable key, Result value,
119 Context context)
120 throws IOException, InterruptedException {
121 if (value.size() != 1) {
122 throw new IOException("There should only be one input column");
123 }
124 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
125 cf = value.getMap();
126 if(!cf.containsKey(INPUT_FAMILY)) {
127 throw new IOException("Wrong input columns. Missing: '" +
128 Bytes.toString(INPUT_FAMILY) + "'.");
129 }
130 String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
131 LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
132 ", value -> " + val);
133 context.write(key, key);
134 }
135
136 }
137
138
139
140
141 public static class ScanReducer
142 extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
143 NullWritable, NullWritable> {
144
145 private String first = null;
146 private String last = null;
147
148 protected void reduce(ImmutableBytesWritable key,
149 Iterable<ImmutableBytesWritable> values, Context context)
150 throws IOException ,InterruptedException {
151 int count = 0;
152 for (ImmutableBytesWritable value : values) {
153 String val = Bytes.toStringBinary(value.get());
154 LOG.info("reduce: key[" + count + "] -> " +
155 Bytes.toStringBinary(key.get()) + ", value -> " + val);
156 if (first == null) first = val;
157 last = val;
158 count++;
159 }
160 }
161
162 protected void cleanup(Context context)
163 throws IOException, InterruptedException {
164 Configuration c = context.getConfiguration();
165 String startRow = c.get(KEY_STARTROW);
166 String lastRow = c.get(KEY_LASTROW);
167 LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
168 LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
169 if (startRow != null && startRow.length() > 0) {
170 assertEquals(startRow, first);
171 }
172 if (lastRow != null && lastRow.length() > 0) {
173 assertEquals(lastRow, last);
174 }
175 }
176
177 }
178
179
180
181
182
183
184
185
186 @Test
187 public void testScanEmptyToEmpty()
188 throws IOException, InterruptedException, ClassNotFoundException {
189 testScan(null, null, null);
190 }
191
192
193
194
195
196
197
198
199 @Test
200 public void testScanEmptyToAPP()
201 throws IOException, InterruptedException, ClassNotFoundException {
202 testScan(null, "app", "apo");
203 }
204
205
206
207
208
209
210
211
212 @Test
213 public void testScanEmptyToBBA()
214 throws IOException, InterruptedException, ClassNotFoundException {
215 testScan(null, "bba", "baz");
216 }
217
218
219
220
221
222
223
224
225 @Test
226 public void testScanEmptyToBBB()
227 throws IOException, InterruptedException, ClassNotFoundException {
228 testScan(null, "bbb", "bba");
229 }
230
231
232
233
234
235
236
237
238 @Test
239 public void testScanEmptyToOPP()
240 throws IOException, InterruptedException, ClassNotFoundException {
241 testScan(null, "opp", "opo");
242 }
243
244
245
246
247
248
249
250
251 @Test
252 public void testScanOBBToOPP()
253 throws IOException, InterruptedException, ClassNotFoundException {
254 testScan("obb", "opp", "opo");
255 }
256
257
258
259
260
261
262
263
264 @Test
265 public void testScanOBBToQPP()
266 throws IOException, InterruptedException, ClassNotFoundException {
267 testScan("obb", "qpp", "qpo");
268 }
269
270
271
272
273
274
275
276
277 @Test
278 public void testScanOPPToEmpty()
279 throws IOException, InterruptedException, ClassNotFoundException {
280 testScan("opp", null, "zzz");
281 }
282
283
284
285
286
287
288
289
290 @Test
291 public void testScanYYXToEmpty()
292 throws IOException, InterruptedException, ClassNotFoundException {
293 testScan("yyx", null, "zzz");
294 }
295
296
297
298
299
300
301
302
303 @Test
304 public void testScanYYYToEmpty()
305 throws IOException, InterruptedException, ClassNotFoundException {
306 testScan("yyy", null, "zzz");
307 }
308
309
310
311
312
313
314
315
316 @Test
317 public void testScanYZYToEmpty()
318 throws IOException, InterruptedException, ClassNotFoundException {
319 testScan("yzy", null, "zzz");
320 }
321
322
323
324
325
326
327
328
329 private void testScan(String start, String stop, String last)
330 throws IOException, InterruptedException, ClassNotFoundException {
331 String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
332 "To" + (stop != null ? stop.toUpperCase() : "Empty");
333 LOG.info("Before map/reduce startup - job " + jobName);
334 Configuration c = new Configuration(TEST_UTIL.getConfiguration());
335 Scan scan = new Scan();
336 scan.addFamily(INPUT_FAMILY);
337 if (start != null) {
338 scan.setStartRow(Bytes.toBytes(start));
339 }
340 c.set(KEY_STARTROW, start != null ? start : "");
341 if (stop != null) {
342 scan.setStopRow(Bytes.toBytes(stop));
343 }
344 c.set(KEY_LASTROW, last != null ? last : "");
345 LOG.info("scan before: " + scan);
346 Job job = new Job(c, jobName);
347 TableMapReduceUtil.initTableMapperJob(
348 Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
349 ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
350 job.setReducerClass(ScanReducer.class);
351 job.setNumReduceTasks(1);
352 FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
353 LOG.info("Started " + job.getJobName());
354 job.waitForCompletion(true);
355 assertTrue(job.isComplete());
356 LOG.info("After map/reduce completion - job " + jobName);
357 }
358 }