/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Utility for {@link TableMap} and {@link TableReduce}.
 *
 * @deprecated Use the package {@code org.apache.hadoop.hbase.mapreduce} instead.
 */
@Deprecated
@SuppressWarnings("unchecked")
public class TableMapReduceUtil {

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<? extends WritableComparable> outputKeyClass,
    Class<? extends Writable> outputValueClass, JobConf job) {
    initTableMapJob(table, columns, mapper, outputKeyClass, outputValueClass,
      job, true);
  }

  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The table name to read from.
   * @param columns  The columns to scan.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job configuration to adjust.
   * @param addDependencyJars  Upload HBase jars and jars for any of the
   *   configured job classes via the distributed cache (tmpjars).
   */
  public static void initTableMapJob(String table, String columns,
    Class<? extends TableMap> mapper,
    Class<? extends WritableComparable> outputKeyClass,
    Class<? extends Writable> outputValueClass, JobConf job,
    boolean addDependencyJars) {

    job.setInputFormat(TableInputFormat.class);
    job.setMapOutputValueClass(outputValueClass);
    job.setMapOutputKeyClass(outputKeyClass);
    job.setMapperClass(mapper);
    // TableInputFormat reads the table name from the job's input path.
    FileInputFormat.addInputPaths(job, table);
    job.set(TableInputFormat.COLUMN_LIST, columns);
    if (addDependencyJars) {
      try {
        addDependencyJars(job);
      } catch (IOException e) {
        // Fail job setup instead of silently swallowing the error; a missing
        // dependency jar would otherwise only surface as a confusing
        // ClassNotFoundException at task runtime.
        throw new RuntimeException(e);
      }
    }
  }
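
  /*
   * Illustrative driver sketch for a table map job. The driver, mapper, and
   * table/column names below (MyDriver, MyTableMap, "mytable", "info:a",
   * "info:b") are hypothetical; COLUMN_LIST expects a space-delimited list
   * of columns:
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   *   job.setJobName("scan mytable");
   *   TableMapReduceUtil.initTableMapJob("mytable", "info:a info:b",
   *     MyTableMap.class, ImmutableBytesWritable.class, Put.class, job);
   *   JobClient.runJob(job);
   */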

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job)
  throws IOException {
    initTableReduceJob(table, reducer, job, null);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *   default partitioner.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner)
  throws IOException {
    initTableReduceJob(table, reducer, job, partitioner, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job configuration to adjust.
   * @param partitioner  Partitioner to use. Pass <code>null</code> to use
   *   default partitioner.
   * @param addDependencyJars  Upload HBase jars and jars for any of the
   *   configured job classes via the distributed cache (tmpjars).
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReduceJob(String table,
    Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
    boolean addDependencyJars) throws IOException {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    if (partitioner == HRegionPartitioner.class) {
      job.setPartitionerClass(HRegionPartitioner.class);
      HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
      int regions = outputTable.getRegionsInfo().size();
      if (job.getNumReduceTasks() > regions) {
        // Never run more reducers than there are regions in the output table.
        job.setNumReduceTasks(regions);
      }
    } else if (partitioner != null) {
      job.setPartitionerClass(partitioner);
    }
    if (addDependencyJars) {
      addDependencyJars(job);
    }
  }
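
  /*
   * Illustrative driver sketch for a table reduce job (MyDriver,
   * MyTableReduce, and "mytable" are hypothetical). Passing
   * HRegionPartitioner routes each Put to the reducer that covers its target
   * region and caps the reducer count at the table's region count:
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   *   TableMapReduceUtil.initTableReduceJob("mytable", MyTableReduce.class,
   *     job, HRegionPartitioner.class);
   *   JobClient.runJob(job);
   */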

  /**
   * Ensures that the given number of reduce tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumReduceTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumReduceTasks() > regions) {
      job.setNumReduceTasks(regions);
    }
  }

  /**
   * Ensures that the given number of map tasks for the given job
   * configuration does not exceed the number of regions for the given table.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void limitNumMapTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
    int regions = outputTable.getRegionsInfo().size();
    if (job.getNumMapTasks() > regions) {
      job.setNumMapTasks(regions);
    }
  }

  /**
   * Sets the number of reduce tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumReduceTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
    int regions = outputTable.getRegionsInfo().size();
    job.setNumReduceTasks(regions);
  }

  /**
   * Sets the number of map tasks for the given job configuration to the
   * number of regions the given table has.
   *
   * @param table  The table to get the region count for.
   * @param job  The current job configuration to adjust.
   * @throws IOException When retrieving the table details fails.
   */
  public static void setNumMapTasks(String table, JobConf job)
  throws IOException {
    HTable outputTable = new HTable(HBaseConfiguration.create(job), table);
    int regions = outputTable.getRegionsInfo().size();
    job.setNumMapTasks(regions);
  }
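
  /*
   * Illustrative use of the task-count helpers ("mytable" is hypothetical).
   * The limit* variants only shrink an oversized task count, while the set*
   * variants pin it to exactly one task per region:
   *
   *   TableMapReduceUtil.limitNumReduceTasks("mytable", job);
   *   TableMapReduceUtil.setNumMapTasks("mytable", job);
   */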

  /**
   * Sets the number of rows to return and cache with each scanner iteration.
   * Higher caching values will enable faster mapreduce jobs at the expense of
   * requiring more heap to contain the cached rows.
   *
   * @param job  The current job configuration to adjust.
   * @param batchSize  The number of rows to return in batch with each scanner
   *   iteration.
   */
  public static void setScannerCaching(JobConf job, int batchSize) {
    job.setInt("hbase.client.scanner.caching", batchSize);
  }
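
  /*
   * Illustrative sketch; the value 500 is an arbitrary example, not a tuned
   * recommendation. Each scanner RPC then returns 500 rows, trading map-task
   * heap for fewer round trips to the region servers:
   *
   *   TableMapReduceUtil.setScannerCaching(job, 500);
   */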

  /**
   * Adds the HBase dependency jars (including ZooKeeper and Guava), as well
   * as jars for any of the configured job classes, to the job configuration
   * so that JobClient ships them to the cluster via the DistributedCache.
   *
   * @param job  The current job configuration to adjust.
   * @throws IOException When adding the jars fails.
   */
  public static void addDependencyJars(JobConf job) throws IOException {
    org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.addDependencyJars(
      job,
      org.apache.zookeeper.ZooKeeper.class,
      com.google.common.base.Function.class,
      job.getMapOutputKeyClass(),
      job.getMapOutputValueClass(),
      job.getOutputKeyClass(),
      job.getOutputValueClass(),
      job.getPartitionerClass(),
      job.getClass("mapred.input.format.class", TextInputFormat.class,
        InputFormat.class),
      job.getClass("mapred.output.format.class", TextOutputFormat.class,
        OutputFormat.class),
      job.getCombinerClass());
  }
}