1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapreduce;
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.util.Map;
25  import java.util.NavigableMap;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.fs.FileUtil;
31  import org.apache.hadoop.fs.Path;
32  import org.apache.hadoop.hbase.HBaseTestingUtility;
33  import org.apache.hadoop.hbase.client.HTable;
34  import org.apache.hadoop.hbase.client.Result;
35  import org.apache.hadoop.hbase.client.Scan;
36  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.io.NullWritable;
39  import org.apache.hadoop.mapreduce.Job;
40  import org.apache.hadoop.mapreduce.Reducer;
41  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
42  import org.junit.After;
43  import org.junit.AfterClass;
44  import org.junit.Before;
45  import org.junit.BeforeClass;
46  import org.junit.Test;
47  
48  import static org.junit.Assert.assertEquals;
49  import static org.junit.Assert.assertTrue;
50  
51  /**
52   * Tests various scan start and stop row scenarios. This is set in a scan and
53   * tested in a MapReduce job to see if that is handed over and done properly
54   * too.
55   */
56  public class TestTableInputFormatScan {
57  
58    static final Log LOG = LogFactory.getLog(TestTableInputFormatScan.class);
59    static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
60  
61    static final byte[] TABLE_NAME = Bytes.toBytes("scantest");
62    static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
63    static final String KEY_STARTROW = "startRow";
64    static final String KEY_LASTROW = "stpRow";
65  
66    private static HTable table = null;
67  
68    @BeforeClass
69    public static void setUpBeforeClass() throws Exception {
70      // switch TIF to log at DEBUG level
71      TEST_UTIL.enableDebug(TableInputFormat.class);
72      TEST_UTIL.enableDebug(TableInputFormatBase.class);
73      // start mini hbase cluster
74      TEST_UTIL.startMiniCluster(3);
75      // create and fill table
76      table = TEST_UTIL.createTable(TABLE_NAME, INPUT_FAMILY);
77      TEST_UTIL.createMultiRegions(table, INPUT_FAMILY);
78      TEST_UTIL.loadTable(table, INPUT_FAMILY);
79      // start MR cluster
80      TEST_UTIL.startMiniMapReduceCluster();
81    }
82  
83    @AfterClass
84    public static void tearDownAfterClass() throws Exception {
85      TEST_UTIL.shutdownMiniMapReduceCluster();
86      TEST_UTIL.shutdownMiniCluster();
87    }
88  
89    @Before
90    public void setUp() throws Exception {
91      // nothing
92    }
93  
94    /**
95     * @throws java.lang.Exception
96     */
97    @After
98    public void tearDown() throws Exception {
99      Configuration c = TEST_UTIL.getConfiguration();
100     FileUtil.fullyDelete(new File(c.get("hadoop.tmp.dir")));
101   }
102 
103   /**
104    * Pass the key and value to reduce.
105    */
106   public static class ScanMapper
107   extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
108 
109     /**
110      * Pass the key and value to reduce.
111      *
112      * @param key  The key, here "aaa", "aab" etc.
113      * @param value  The value is the same as the key.
114      * @param context  The task context.
115      * @throws IOException When reading the rows fails.
116      */
117     @Override
118     public void map(ImmutableBytesWritable key, Result value,
119       Context context)
120     throws IOException, InterruptedException {
121       if (value.size() != 1) {
122         throw new IOException("There should only be one input column");
123       }
124       Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
125         cf = value.getMap();
126       if(!cf.containsKey(INPUT_FAMILY)) {
127         throw new IOException("Wrong input columns. Missing: '" +
128           Bytes.toString(INPUT_FAMILY) + "'.");
129       }
130       String val = Bytes.toStringBinary(value.getValue(INPUT_FAMILY, null));
131       LOG.info("map: key -> " + Bytes.toStringBinary(key.get()) +
132         ", value -> " + val);
133       context.write(key, key);
134     }
135 
136   }
137 
138   /**
139    * Checks the last and first key seen against the scanner boundaries.
140    */
141   public static class ScanReducer
142   extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable,
143                   NullWritable, NullWritable> {
144 
145     private String first = null;
146     private String last = null;
147 
148     protected void reduce(ImmutableBytesWritable key,
149         Iterable<ImmutableBytesWritable> values, Context context)
150     throws IOException ,InterruptedException {
151       int count = 0;
152       for (ImmutableBytesWritable value : values) {
153         String val = Bytes.toStringBinary(value.get());
154         LOG.info("reduce: key[" + count + "] -> " +
155           Bytes.toStringBinary(key.get()) + ", value -> " + val);
156         if (first == null) first = val;
157         last = val;
158         count++;
159       }
160     }
161 
162     protected void cleanup(Context context)
163     throws IOException, InterruptedException {
164       Configuration c = context.getConfiguration();
165       String startRow = c.get(KEY_STARTROW);
166       String lastRow = c.get(KEY_LASTROW);
167       LOG.info("cleanup: first -> \"" + first + "\", start row -> \"" + startRow + "\"");
168       LOG.info("cleanup: last -> \"" + last + "\", last row -> \"" + lastRow + "\"");
169       if (startRow != null && startRow.length() > 0) {
170         assertEquals(startRow, first);
171       }
172       if (lastRow != null && lastRow.length() > 0) {
173         assertEquals(lastRow, last);
174       }
175     }
176 
177   }
178 
179   /**
180    * Tests a MR scan using specific start and stop rows.
181    *
182    * @throws IOException
183    * @throws ClassNotFoundException
184    * @throws InterruptedException
185    */
186   @Test
187   public void testScanEmptyToEmpty()
188   throws IOException, InterruptedException, ClassNotFoundException {
189     testScan(null, null, null);
190   }
191 
192   /**
193    * Tests a MR scan using specific start and stop rows.
194    *
195    * @throws IOException
196    * @throws ClassNotFoundException
197    * @throws InterruptedException
198    */
199   @Test
200   public void testScanEmptyToAPP()
201   throws IOException, InterruptedException, ClassNotFoundException {
202     testScan(null, "app", "apo");
203   }
204 
205   /**
206    * Tests a MR scan using specific start and stop rows.
207    *
208    * @throws IOException
209    * @throws ClassNotFoundException
210    * @throws InterruptedException
211    */
212   @Test
213   public void testScanEmptyToBBA()
214   throws IOException, InterruptedException, ClassNotFoundException {
215     testScan(null, "bba", "baz");
216   }
217 
218   /**
219    * Tests a MR scan using specific start and stop rows.
220    *
221    * @throws IOException
222    * @throws ClassNotFoundException
223    * @throws InterruptedException
224    */
225   @Test
226   public void testScanEmptyToBBB()
227   throws IOException, InterruptedException, ClassNotFoundException {
228     testScan(null, "bbb", "bba");
229   }
230 
231   /**
232    * Tests a MR scan using specific start and stop rows.
233    *
234    * @throws IOException
235    * @throws ClassNotFoundException
236    * @throws InterruptedException
237    */
238   @Test
239   public void testScanEmptyToOPP()
240   throws IOException, InterruptedException, ClassNotFoundException {
241     testScan(null, "opp", "opo");
242   }
243 
244   /**
245    * Tests a MR scan using specific start and stop rows.
246    *
247    * @throws IOException
248    * @throws ClassNotFoundException
249    * @throws InterruptedException
250    */
251   @Test
252   public void testScanOBBToOPP()
253   throws IOException, InterruptedException, ClassNotFoundException {
254     testScan("obb", "opp", "opo");
255   }
256 
257   /**
258    * Tests a MR scan using specific start and stop rows.
259    *
260    * @throws IOException
261    * @throws ClassNotFoundException
262    * @throws InterruptedException
263    */
264   @Test
265   public void testScanOBBToQPP()
266   throws IOException, InterruptedException, ClassNotFoundException {
267     testScan("obb", "qpp", "qpo");
268   }
269 
270   /**
271    * Tests a MR scan using specific start and stop rows.
272    *
273    * @throws IOException
274    * @throws ClassNotFoundException
275    * @throws InterruptedException
276    */
277   @Test
278   public void testScanOPPToEmpty()
279   throws IOException, InterruptedException, ClassNotFoundException {
280     testScan("opp", null, "zzz");
281   }
282 
283   /**
284    * Tests a MR scan using specific start and stop rows.
285    *
286    * @throws IOException
287    * @throws ClassNotFoundException
288    * @throws InterruptedException
289    */
290   @Test
291   public void testScanYYXToEmpty()
292   throws IOException, InterruptedException, ClassNotFoundException {
293     testScan("yyx", null, "zzz");
294   }
295 
296   /**
297    * Tests a MR scan using specific start and stop rows.
298    *
299    * @throws IOException
300    * @throws ClassNotFoundException
301    * @throws InterruptedException
302    */
303   @Test
304   public void testScanYYYToEmpty()
305   throws IOException, InterruptedException, ClassNotFoundException {
306     testScan("yyy", null, "zzz");
307   }
308 
309   /**
310    * Tests a MR scan using specific start and stop rows.
311    *
312    * @throws IOException
313    * @throws ClassNotFoundException
314    * @throws InterruptedException
315    */
316   @Test
317   public void testScanYZYToEmpty()
318   throws IOException, InterruptedException, ClassNotFoundException {
319     testScan("yzy", null, "zzz");
320   }
321 
322   /**
323    * Tests a MR scan using specific start and stop rows.
324    *
325    * @throws IOException
326    * @throws ClassNotFoundException
327    * @throws InterruptedException
328    */
329   private void testScan(String start, String stop, String last)
330   throws IOException, InterruptedException, ClassNotFoundException {
331     String jobName = "Scan" + (start != null ? start.toUpperCase() : "Empty") +
332     "To" + (stop != null ? stop.toUpperCase() : "Empty");
333     LOG.info("Before map/reduce startup - job " + jobName);
334     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
335     Scan scan = new Scan();
336     scan.addFamily(INPUT_FAMILY);
337     if (start != null) {
338       scan.setStartRow(Bytes.toBytes(start));
339     }
340     c.set(KEY_STARTROW, start != null ? start : "");
341     if (stop != null) {
342       scan.setStopRow(Bytes.toBytes(stop));
343     }
344     c.set(KEY_LASTROW, last != null ? last : "");
345     LOG.info("scan before: " + scan);
346     Job job = new Job(c, jobName);
347     TableMapReduceUtil.initTableMapperJob(
348       Bytes.toString(TABLE_NAME), scan, ScanMapper.class,
349       ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
350     job.setReducerClass(ScanReducer.class);
351     job.setNumReduceTasks(1); // one to get final "first" and "last" key
352     FileOutputFormat.setOutputPath(job, new Path(job.getJobName()));
353     LOG.info("Started " + job.getJobName());
354     job.waitForCompletion(true);
355     assertTrue(job.isComplete());
356     LOG.info("After map/reduce completion - job " + jobName);
357   }
358 }