/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;

/**
 * Utility for {@link TableMapper} and {@link TableReducer}
 */
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {
  static Log LOG = LogFactory.getLog(TableMapReduceUtil.class);

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }
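
  /*
   * Illustrative usage sketch (not part of the original class): a driver might
   * wire up a scan-backed map phase like this. "MyMapper" and the table name
   * "mytable" are hypothetical placeholders.
   *
   *   Scan scan = new Scan();
   *   Job job = new Job(HBaseConfiguration.create(), "scan mytable");
   *   TableMapReduceUtil.initTableMapperJob("mytable", scan, MyMapper.class,
   *       ImmutableBytesWritable.class, Result.class, job);
   */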

  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<? extends WritableComparable> outputKeyClass,
      Class<? extends Writable> outputValueClass, Job job,
      boolean addDependencyJars)
  throws IOException {
    job.setInputFormatClass(TableInputFormat.class);
    if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
    if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    HBaseConfiguration.addHbaseResources(job.getConfiguration());
    job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
    job.getConfiguration().set(TableInputFormat.SCAN,
      convertScanToString(scan));
    if (addDependencyJars) {
      addDependencyJars(job);
    }
  }

  /**
   * Writes the given scan into a Base64 encoded string.
   *
   * @param scan  The scan to write out.
   * @return The scan saved in a Base64 encoded string.
   * @throws IOException When writing the scan fails.
   */
  static String convertScanToString(Scan scan) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(out);
    scan.write(dos);
    return Base64.encodeBytes(out.toByteArray());
  }

  /**
   * Converts the given Base64 string back into a Scan instance.
   *
   * @param base64  The scan details.
   * @return The newly created Scan instance.
   * @throws IOException When reading the scan instance fails.
   */
  static Scan convertStringToScan(String base64) throws IOException {
    ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
    DataInputStream dis = new DataInputStream(bis);
    Scan scan = new Scan();
    scan.readFields(dis);
    return scan;
  }
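
  /*
   * Illustrative round trip (sketch): the Base64 string produced by
   * convertScanToString is what gets stored in the job configuration and is
   * read back with convertStringToScan when the tasks are configured.
   *
   *   Scan scan = new Scan();
   *   String encoded = convertScanToString(scan);
   *   Scan decoded = convertStringToScan(encoded);  // carries the same settings
   */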

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the Job.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }
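
  /*
   * Illustrative usage sketch (placeholders only): a driver writing reduce
   * output back to an HBase table named "target" with a user-defined
   * MyReducer extending TableReducer.
   *
   *   Job job = new Job(HBaseConfiguration.create(), "write to target");
   *   TableMapReduceUtil.initTableReducerJob("target", MyReducer.class, job);
   */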

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the Job.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, null, null, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the Job.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you want the reduce to write to a cluster other than the default;
   * e.g. when copying tables between clusters, the source is designated by
   * <code>hbase-site.xml</code> and this param carries the ensemble address
   * of the remote cluster.  The format to pass is particular.  Pass
   * <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>,
   * such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl) throws IOException {
    initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
        serverClass, serverImpl, true);
  }
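
  /*
   * Illustrative sketch: directing the reduce output at a remote cluster by
   * passing its cluster key as quorumAddress. The hostnames below are
   * placeholders; the format is <quorum>:<client port>:<znode parent> as
   * described above.
   *
   *   TableMapReduceUtil.initTableReducerJob("target", MyReducer.class, job,
   *       null, "remote1,remote2,remote3:2181:/hbase", null, null);
   */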

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the Job.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * default partitioner.
   * @param quorumAddress Distant cluster to write to; default is null for
   * output to the cluster that is designated in <code>hbase-site.xml</code>.
   * Set this String to the zookeeper ensemble of an alternate remote cluster
   * when you want the reduce to write to a cluster other than the default;
   * e.g. when copying tables between clusters, the source is designated by
   * <code>hbase-site.xml</code> and this param carries the ensemble address
   * of the remote cluster.  The format to pass is particular.  Pass
   * <code>&lt;hbase.zookeeper.quorum&gt;:&lt;hbase.zookeeper.client.port&gt;:&lt;zookeeper.znode.parent&gt;</code>,
   * such as <code>server,server2,server3:2181:/hbase</code>.
   * @param serverClass redefined hbase.regionserver.class
   * @param serverImpl redefined hbase.regionserver.impl
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job,
      Class partitioner, String quorumAddress, String serverClass,
      String serverImpl, boolean addDependencyJars) throws IOException {

    Configuration conf = job.getConfiguration();
    HBaseConfiguration.addHbaseResources(conf);
    job.setOutputFormatClass(TableOutputFormat.class);
    if (reducer != null) job.setReducerClass(reducer);
    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
    // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
    if (quorumAddress != null) {
      // Calling this will validate the format
      ZKUtil.transformClusterKey(quorumAddress);
      conf.set(TableOutputFormat.QUORUM_ADDRESS, quorumAddress);
    }
    if (serverClass != null && serverImpl != null) {
      conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
      conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
    }
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      HTable outputTable = new HTable(conf, table);
      int regions = outputTable.getRegionsInfo().size();
      if (job.getNumReduceTasks() > regions) {
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }

    if (addDependencyJars) {
      addDependencyJars(job);
    }
  }
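
  /*
   * Illustrative sketch: passing HRegionPartitioner routes each reduce task's
   * output to a single region, and the method above then caps the number of
   * reduce tasks at the table's current region count.
   *
   *   TableMapReduceUtil.initTableReducerJob("target", MyReducer.class, job,
   *       HRegionPartitioner.class);
   */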

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, Job job)
  throws IOException {
    HTable outputTable = new HTable(job.getConfiguration(), table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, Job job)
  throws IOException {
    HTable outputTable = new HTable(job.getConfiguration(), table);
    int regions = outputTable.getRegionsInfo().size();
    job.setNumReduceTasks(regions);
  }
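
  /*
   * Illustrative sketch ("target" is a placeholder table name): sizing the
   * reduce phase from the table's region count.
   *
   *   TableMapReduceUtil.setNumReduceTasks("target", job);    // one reducer per region
   *   TableMapReduceUtil.limitNumReduceTasks("target", job);  // or only cap the current value
   */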

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job The current job to adjust.
   * @param batchSize The number of rows to return and cache with each scanner
   * iteration.
   */
  public static void setScannerCaching(Job job, int batchSize) {
    job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
  }
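
  /*
   * Illustrative sketch: fetching 500 rows per scanner round trip. This only
   * sets "hbase.client.scanner.caching" on the job's configuration.
   *
   *   TableMapReduceUtil.setScannerCaching(job, 500);
   */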

  /**
   * Add the HBase dependency jars as well as jars for any of the configured
   * job classes to the job configuration, so that JobClient will ship them
   * to the cluster and add them to the DistributedCache.
   */
  public static void addDependencyJars(Job job) throws IOException {
    try {
      addDependencyJars(job.getConfiguration(),
          org.apache.zookeeper.ZooKeeper.class,
          job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(),
          job.getInputFormatClass(),
          job.getOutputKeyClass(),
          job.getOutputValueClass(),
          job.getOutputFormatClass(),
          job.getPartitionerClass(),
          job.getCombinerClass());
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }
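
  /*
   * Illustrative note: initTableMapperJob/initTableReducerJob already call
   * this when their addDependencyJars flag is true; an explicit call is only
   * needed when that flag was false or the job was configured by hand.
   *
   *   TableMapReduceUtil.addDependencyJars(job);
   */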

  /**
   * Add the jars containing the given classes to the job's configuration
   * such that JobClient will ship them to the cluster and add them to
   * the DistributedCache.
   */
  public static void addDependencyJars(Configuration conf,
      Class... classes) throws IOException {

    FileSystem localFs = FileSystem.getLocal(conf);

    Set<String> jars = new HashSet<String>();

    // Add jars that are already in the tmpjars variable
    jars.addAll(conf.getStringCollection("tmpjars"));

    // Add jars containing the specified classes
    for (Class clazz : classes) {
      if (clazz == null) continue;

      String pathStr = findContainingJar(clazz);
      if (pathStr == null) {
        LOG.warn("Could not find jar for class " + clazz +
                 " in order to ship it to the cluster.");
        continue;
      }
      Path path = new Path(pathStr);
      if (!localFs.exists(path)) {
        LOG.warn("Could not validate jar file " + path + " for class "
                 + clazz);
        continue;
      }
      jars.add(path.makeQualified(localFs).toString());
    }
    if (jars.isEmpty()) return;

    conf.set("tmpjars",
             StringUtils.arrayToString(jars.toArray(new String[0])));
  }
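
  /*
   * Illustrative sketch: shipping additional application classes that the
   * job object itself does not reference. "MyCustomFilter" is a hypothetical
   * class supplied by the application.
   *
   *   TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
   *       MyCustomFilter.class);
   */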

  /**
   * Find a jar that contains a class of the same name, if any.
   * It will return a jar file, even if that is not the first thing
   * on the class path that has a class with the same name.
   *
   * This is shamelessly copied from JobConf.
   *
   * @param my_class the class to find.
   * @return a jar file that contains the class, or null.
   */
  private static String findContainingJar(Class my_class) {
    ClassLoader loader = my_class.getClassLoader();
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
    try {
      for (Enumeration itr = loader.getResources(class_file);
          itr.hasMoreElements();) {
        URL url = (URL) itr.nextElement();
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          // URLDecoder is a misnamed class, since it actually decodes
          // x-www-form-urlencoded MIME type rather than actual
          // URL encoding (which the file path has). Therefore it would
          // decode +s to ' 's which is incorrect (spaces are actually
          // either unencoded or encoded as "%20"). Replace +s first, so
          // that they are kept sacred during the decoding process.
          toReturn = toReturn.replaceAll("\\+", "%2B");
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return null;
  }

}