/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Utility for {@link TableMap} and {@link TableReduce}
 */
@Deprecated
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
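   *
   * <p>For example, a minimal driver sketch. The table name, column list,
   * and the <code>MyDriver</code>/<code>MyTableMap</code> classes are
   * hypothetical stand-ins, not part of this API:
   *
   * <pre>
   * JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   * TableMapReduceUtil.initTableMapJob("mytable", "info:name info:age",
   *   MyTableMap.class, ImmutableBytesWritable.class, Result.class, job);
   * JobClient.runJob(job);
   * </pre>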
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<? extends WritableComparable> outputKeyClass,
    Class<? extends Writable> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass, job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
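   *
   * <p>Pass <code>addDependencyJars = false</code> when the HBase and
   * dependency jars are already available on every task's classpath.
   * A sketch, with the same hypothetical names as above:
   *
   * <pre>
   * TableMapReduceUtil.initTableMapJob("mytable", "info:name info:age",
   *   MyTableMap.class, ImmutableBytesWritable.class, Result.class, job, false);
   * </pre>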
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<? extends WritableComparable> outputKeyClass,
    Class<? extends Writable> outputValueClass, JobConf job, boolean addDependencyJars) {

    job.setInputFormat(TableInputFormat.class);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
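   *
   * <p>A minimal sketch; the table name and the
   * <code>MyDriver</code>/<code>MyTableReduce</code> classes are
   * hypothetical:
   *
   * <pre>
   * JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   * TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class, job);
   * JobClient.runJob(job);
   * </pre>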
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
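   *
   * <p>For example, to align reduce partitions with the output table's
   * regions (sketch, hypothetical reducer class):
   *
   * <pre>
   * TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class,
   *   job, HRegionPartitioner.class);
   * </pre>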
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   * the default partitioner.
   * @param addDependencyJars upload HBase jars and jars for any of the configured
   *           job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
      int regions = outputTable.getRegionsInfo().size();
      if (job.getNumReduceTasks() > regions) {
        // Reuse the region count computed above rather than re-fetching it.
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
  }

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
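   *
   * <p>Typical use right before job submission (sketch; the table name is
   * hypothetical):
   *
   * <pre>
   * job.setNumReduceTasks(16);
   * TableMapReduceUtil.limitNumReduceTasks("mytable", job); // caps at region count
   * </pre>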
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumReduceTasks() > regions)
      job.setNumReduceTasks(regions);
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumMapTasks() > regions)
      job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
    int regions = outputTable.getRegionsInfo().size();
    job.setNumReduceTasks(regions);
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
    int regions = outputTable.getRegionsInfo().size();
    job.setNumMapTasks(regions);
  }

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
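   *
   * <p>For example (sketch; a suitable value depends on row size and the
   * heap available to each task):
   *
   * <pre>
   * TableMapReduceUtil.setScannerCaching(job, 500);
   * </pre>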
   *
   * @param job The current job configuration to adjust.
   * @param batchSize The number of rows to return in batch with each scanner
   * iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }

  /**
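   * Adds the HBase and dependency jars needed by the configured job classes
   * to the distributed cache (tmpjars). Call this yourself if you used the
   * <code>initTable*Job</code> overloads with
   * <code>addDependencyJars = false</code>; a sketch:
   *
   * <pre>
   * TableMapReduceUtil.addDependencyJars(job);
   * </pre>
   *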
   * @see org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      org.apache.zookeeper.ZooKeeper.class,
      com.google.common.base.Function.class,
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class, InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class, OutputFormat.class),
      job.getCombinerClass());
  }
}