/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
 * on our tables is simple: take every row in the table, reverse the value of
 * a particular cell, and write it back to the table.
 */
public class TestTableMapReduce extends MultiRegionTable {
  private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
  static final String MULTI_REGION_TABLE_NAME = "mrtest";
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");

  /** Constructor: registers the input family with the superclass and builds a
   * table descriptor holding both the input and output column families. */
  public TestTableMapReduce() {
    super(Bytes.toString(INPUT_FAMILY));
    desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
  }

  /**
   * Mapper that passes the given key and the processed (reversed) record on
   * to reduce.
   */
  public static class ProcessContentsMapper
  extends TableMapper<ImmutableBytesWritable, Put> {

    /**
     * Pass the key, and reversed value to reduce.
     *
     * @param key the row key
     * @param value the cells for the row; must hold exactly one input-family cell
     * @param context the task context used to emit output
     * @throws IOException if the row does not hold exactly one input-family cell
     */
    @Override
    public void map(ImmutableBytesWritable key, Result value,
      Context context)
    throws IOException, InterruptedException {
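      // Sanity-check the input: each row should carry exactly one cell,
      // and that cell must belong to the input family.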
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
        cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILY) + "'.");
      }

      // Get the original value and reverse it
      String originalValue = new String(value.getValue(INPUT_FAMILY, null),
        HConstants.UTF8_ENCODING);
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();
      // Now set the value to be collected
      Put outval = new Put(key.get());
      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      context.write(key, outval);
    }
  }

  /**
   * Test a map/reduce against a multi-region table.
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  public void testMultiRegionTable()
  throws IOException, InterruptedException, ClassNotFoundException {
    runTestOnTable(new HTable(new Configuration(conf), MULTI_REGION_TABLE_NAME));
  }

  private void runTestOnTable(HTable table)
  throws IOException, InterruptedException, ClassNotFoundException {
    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
    Job job = null;
    try {
      LOG.info("Before map/reduce startup");
      job = new Job(table.getConfiguration(), "process column contents");
      job.setNumReduceTasks(1);
      Scan scan = new Scan();
      scan.addFamily(INPUT_FAMILY);
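      // initTableMapperJob wires the scan and mapper into the job: it sets
      // TableInputFormat as the input format, serializes the scan into the
      // job configuration, and registers the mapper's output key/value types.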
      TableMapReduceUtil.initTableMapperJob(
        Bytes.toString(table.getTableName()), scan,
        ProcessContentsMapper.class, ImmutableBytesWritable.class,
        Put.class, job);
      TableMapReduceUtil.initTableReducerJob(
        Bytes.toString(table.getTableName()),
        IdentityTableReducer.class, job);
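      // IdentityTableReducer writes each Put it receives back to the output
      // table unchanged, so the reduce phase performs the actual table update.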
      FileOutputFormat.setOutputPath(job, new Path("test"));
      LOG.info("Started " + Bytes.toString(table.getTableName()));
      job.waitForCompletion(true);
      LOG.info("After map/reduce completion");

      // verify map-reduce results
      verify(Bytes.toString(table.getTableName()));
    } finally {
      mrCluster.shutdown();
      if (job != null) {
        FileUtil.fullyDelete(
          new File(job.getConfiguration().get("hadoop.tmp.dir")));
      }
    }
  }

  private void verify(String tableName) throws IOException {
    HTable table = new HTable(new Configuration(conf), tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt("hbase.client.retries.number", 5);
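    // The map/reduce output may not be visible to a fresh scanner right away,
    // so retry verification a bounded number of times, pausing in between.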
    for (int i = 0; i < numRetries; i++) {
      try {
        LOG.info("Verification attempt #" + i);
        verifyAttempt(table);
        verified = true;
        break;
      } catch (NullPointerException e) {
        // If here, a cell was empty. Presume it's because updates came in
        // after the scanner had been opened. Wait a while and retry.
        LOG.debug("Verification attempt failed: " + e.getMessage());
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
        // continue
      }
    }
    assertTrue(verified);
  }

  /**
   * Looks at every value of the map/reduce output and verifies that indeed
   * the values have been reversed.
   *
   * @param table Table to scan.
   * @throws IOException
   * @throws NullPointerException if we failed to find a cell value
   */
  private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
    Scan scan = new Scan();
    scan.addFamily(INPUT_FAMILY);
    scan.addFamily(OUTPUT_FAMILY);
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        // Enforce the row-size invariant unconditionally; previously this
        // check only ran when debug logging was enabled.
        if (r.size() > 2) {
          throw new IOException("Too many results, expected 2 got " +
            r.size());
        }
        byte[] firstValue = null;
        byte[] secondValue = null;
        int count = 0;
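        // KeyValues come back sorted, so the input-family ("contents") cell
        // precedes the output-family ("text") cell within each row.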
        for (KeyValue kv : r.list()) {
          if (count == 0) {
            firstValue = kv.getValue();
          }
          if (count == 1) {
            secondValue = kv.getValue();
          }
          count++;
          if (count == 2) {
            break;
          }
        }

        String first;
        if (firstValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": first value is null");
        }
        first = new String(firstValue, HConstants.UTF8_ENCODING);

        String second;
        if (secondValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": second value is null");
        }
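        // Reverse the output-family value; after reversing, it should match
        // the original input-family value.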
        byte[] secondReversed = new byte[secondValue.length];
        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
          secondReversed[i] = secondValue[j];
        }
        second = new String(secondReversed, HConstants.UTF8_ENCODING);

        if (!first.equals(second)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("second value is not the reverse of first. row=" +
                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
                ", second value=" + second);
          }
          fail();
        }
      }
    } finally {
      scanner.close();
    }
  }

  /**
   * Test that we add dependency jars to tmpjars correctly, including the
   * ZooKeeper jar.
   */
  public void testAddDependencyJars() throws Exception {
    Job job = new Job();
    TableMapReduceUtil.addDependencyJars(job);
    String tmpjars = job.getConfiguration().get("tmpjars");

    System.err.println("tmpjars: " + tmpjars);
    assertTrue(tmpjars.contains("zookeeper"));
    assertFalse(tmpjars.contains("guava"));

    System.err.println("appending guava jar");
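    // Passing an explicit class pulls that class's containing jar (here the
    // Guava jar, via com.google.common.base.Function) into tmpjars as well.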
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        com.google.common.base.Function.class);
    tmpjars = job.getConfiguration().get("tmpjars");
    assertTrue(tmpjars.contains("guava"));
  }
}