1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapred;
21  
22  import java.io.File;
23  import java.io.IOException;
24  import java.util.Map;
25  import java.util.NavigableMap;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.fs.FileUtil;
30  import org.apache.hadoop.hbase.*;
31  import org.apache.hadoop.hbase.client.HTable;
32  import org.apache.hadoop.hbase.client.Result;
33  import org.apache.hadoop.hbase.client.Scan;
34  import org.apache.hadoop.hbase.client.ResultScanner;
35  import org.apache.hadoop.hbase.client.Put;
36  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37  import org.apache.hadoop.hbase.client.Result;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.mapred.JobClient;
40  import org.apache.hadoop.mapred.JobConf;
41  import org.apache.hadoop.mapred.MapReduceBase;
42  import org.apache.hadoop.mapred.MiniMRCluster;
43  import org.apache.hadoop.mapred.OutputCollector;
44  import org.apache.hadoop.mapred.Reporter;
45  
46  /**
47   * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
48   * on our tables is simple - take every row in the table, reverse the value of
49   * a particular cell, and write it back to the table.
50   */
51  public class TestTableMapReduce extends MultiRegionTable {
52    private static final Log LOG =
53      LogFactory.getLog(TestTableMapReduce.class.getName());
54  
55    static final String MULTI_REGION_TABLE_NAME = "mrtest";
56    static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
57    static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
58  
59    private static final byte [][] columns = new byte [][] {
60      INPUT_FAMILY,
61      OUTPUT_FAMILY
62    };
63  
64    /** constructor */
65    public TestTableMapReduce() {
66      super(Bytes.toString(INPUT_FAMILY));
67      desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
68      desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
69      desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
70    }
71  
72    /**
73     * Pass the given key and processed record reduce
74     */
75    public static class ProcessContentsMapper
76    extends MapReduceBase
77    implements TableMap<ImmutableBytesWritable, Put> {
78      /**
79       * Pass the key, and reversed value to reduce
80       * @param key
81       * @param value
82       * @param output
83       * @param reporter
84       * @throws IOException
85       */
86      public void map(ImmutableBytesWritable key, Result value,
87        OutputCollector<ImmutableBytesWritable, Put> output,
88        Reporter reporter)
89      throws IOException {
90        if (value.size() != 1) {
91          throw new IOException("There should only be one input column");
92        }
93        Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
94          cf = value.getMap();
95        if(!cf.containsKey(INPUT_FAMILY)) {
96          throw new IOException("Wrong input columns. Missing: '" +
97            Bytes.toString(INPUT_FAMILY) + "'.");
98        }
99  
100       // Get the original value and reverse it
101 
102       String originalValue = new String(value.getValue(INPUT_FAMILY, null),
103         HConstants.UTF8_ENCODING);
104       StringBuilder newValue = new StringBuilder(originalValue);
105       newValue.reverse();
106 
107       // Now set the value to be collected
108 
109       Put outval = new Put(key.get());
110       outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
111       output.collect(key, outval);
112     }
113   }
114 
115   /**
116    * Test a map/reduce against a multi-region table
117    * @throws IOException
118    */
119   public void testMultiRegionTable() throws IOException {
120     runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
121   }
122 
123   private void runTestOnTable(HTable table) throws IOException {
124     MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
125 
126     JobConf jobConf = null;
127     try {
128       LOG.info("Before map/reduce startup");
129       jobConf = new JobConf(conf, TestTableMapReduce.class);
130       jobConf.setJobName("process column contents");
131       jobConf.setNumReduceTasks(1);
132       TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
133         Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
134         ImmutableBytesWritable.class, Put.class, jobConf);
135       TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
136         IdentityTableReduce.class, jobConf);
137 
138       LOG.info("Started " + Bytes.toString(table.getTableName()));
139       JobClient.runJob(jobConf);
140       LOG.info("After map/reduce completion");
141 
142       // verify map-reduce results
143       verify(Bytes.toString(table.getTableName()));
144     } finally {
145       mrCluster.shutdown();
146       if (jobConf != null) {
147         FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
148       }
149     }
150   }
151 
152   private void verify(String tableName) throws IOException {
153     HTable table = new HTable(conf, tableName);
154     boolean verified = false;
155     long pause = conf.getLong("hbase.client.pause", 5 * 1000);
156     int numRetries = conf.getInt("hbase.client.retries.number", 5);
157     for (int i = 0; i < numRetries; i++) {
158       try {
159         LOG.info("Verification attempt #" + i);
160         verifyAttempt(table);
161         verified = true;
162         break;
163       } catch (NullPointerException e) {
164         // If here, a cell was empty.  Presume its because updates came in
165         // after the scanner had been opened.  Wait a while and retry.
166         LOG.debug("Verification attempt failed: " + e.getMessage());
167       }
168       try {
169         Thread.sleep(pause);
170       } catch (InterruptedException e) {
171         // continue
172       }
173     }
174     assertTrue(verified);
175   }
176 
177   /**
178    * Looks at every value of the mapreduce output and verifies that indeed
179    * the values have been reversed.
180    * @param table Table to scan.
181    * @throws IOException
182    * @throws NullPointerException if we failed to find a cell value
183    */
184   private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
185     Scan scan = new Scan();
186     scan.addColumns(columns);
187     ResultScanner scanner = table.getScanner(scan);
188     try {
189       for (Result r : scanner) {
190         if (LOG.isDebugEnabled()) {
191           if (r.size() > 2 ) {
192             throw new IOException("Too many results, expected 2 got " +
193               r.size());
194           }
195         }
196         byte[] firstValue = null;
197         byte[] secondValue = null;
198         int count = 0;
199          for(KeyValue kv : r.list()) {
200           if (count == 0) {
201             firstValue = kv.getValue();
202           }
203           if (count == 1) {
204             secondValue = kv.getValue();
205           }
206           count++;
207           if (count == 2) {
208             break;
209           }
210         }
211 
212 
213         String first = "";
214         if (firstValue == null) {
215           throw new NullPointerException(Bytes.toString(r.getRow()) +
216             ": first value is null");
217         }
218         first = new String(firstValue, HConstants.UTF8_ENCODING);
219 
220         String second = "";
221         if (secondValue == null) {
222           throw new NullPointerException(Bytes.toString(r.getRow()) +
223             ": second value is null");
224         }
225         byte[] secondReversed = new byte[secondValue.length];
226         for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
227           secondReversed[i] = secondValue[j];
228         }
229         second = new String(secondReversed, HConstants.UTF8_ENCODING);
230 
231         if (first.compareTo(second) != 0) {
232           if (LOG.isDebugEnabled()) {
233             LOG.debug("second key is not the reverse of first. row=" +
234                 r.getRow() + ", first value=" + first + ", second value=" +
235                 second);
236           }
237           fail();
238         }
239       }
240     } finally {
241       scanner.close();
242     }
243   }
244 }