1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.ByteArrayInputStream;
23 import java.io.ByteArrayOutputStream;
24 import java.io.DataInputStream;
25 import java.io.DataOutputStream;
26 import java.io.IOException;
27 import java.net.URL;
28 import java.net.URLDecoder;
29 import java.util.Enumeration;
30 import java.util.HashSet;
31 import java.util.Set;
32
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.client.HTable;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43 import org.apache.hadoop.hbase.util.Base64;
44 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
45 import org.apache.hadoop.io.Writable;
46 import org.apache.hadoop.io.WritableComparable;
47 import org.apache.hadoop.mapreduce.Job;
48 import org.apache.hadoop.util.StringUtils;
49
50
51
52
53 @SuppressWarnings("unchecked")
54 public class TableMapReduceUtil {
55 static Log LOG = LogFactory.getLog(TableMapReduceUtil.class);
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public static void initTableMapperJob(String table, Scan scan,
71 Class<? extends TableMapper> mapper,
72 Class<? extends WritableComparable> outputKeyClass,
73 Class<? extends Writable> outputValueClass, Job job)
74 throws IOException {
75 initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
76 job, true);
77 }
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94 public static void initTableMapperJob(String table, Scan scan,
95 Class<? extends TableMapper> mapper,
96 Class<? extends WritableComparable> outputKeyClass,
97 Class<? extends Writable> outputValueClass, Job job,
98 boolean addDependencyJars)
99 throws IOException {
100 job.setInputFormatClass(TableInputFormat.class);
101 if (outputValueClass != null) job.setMapOutputValueClass(outputValueClass);
102 if (outputKeyClass != null) job.setMapOutputKeyClass(outputKeyClass);
103 job.setMapperClass(mapper);
104 HBaseConfiguration.addHbaseResources(job.getConfiguration());
105 job.getConfiguration().set(TableInputFormat.INPUT_TABLE, table);
106 job.getConfiguration().set(TableInputFormat.SCAN,
107 convertScanToString(scan));
108 if (addDependencyJars) {
109 addDependencyJars(job);
110 }
111 }
112
113
114
115
116
117
118
119
120 static String convertScanToString(Scan scan) throws IOException {
121 ByteArrayOutputStream out = new ByteArrayOutputStream();
122 DataOutputStream dos = new DataOutputStream(out);
123 scan.write(dos);
124 return Base64.encodeBytes(out.toByteArray());
125 }
126
127
128
129
130
131
132
133
134 static Scan convertStringToScan(String base64) throws IOException {
135 ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64));
136 DataInputStream dis = new DataInputStream(bis);
137 Scan scan = new Scan();
138 scan.readFields(dis);
139 return scan;
140 }
141
142
143
144
145
146
147
148
149
150
151 public static void initTableReducerJob(String table,
152 Class<? extends TableReducer> reducer, Job job)
153 throws IOException {
154 initTableReducerJob(table, reducer, job, null);
155 }
156
157
158
159
160
161
162
163
164
165
166
167
168 public static void initTableReducerJob(String table,
169 Class<? extends TableReducer> reducer, Job job,
170 Class partitioner) throws IOException {
171 initTableReducerJob(table, reducer, job, partitioner, null, null, null);
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197 public static void initTableReducerJob(String table,
198 Class<? extends TableReducer> reducer, Job job,
199 Class partitioner, String quorumAddress, String serverClass,
200 String serverImpl) throws IOException {
201 initTableReducerJob(table, reducer, job, partitioner, quorumAddress,
202 serverClass, serverImpl, true);
203 }
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230 public static void initTableReducerJob(String table,
231 Class<? extends TableReducer> reducer, Job job,
232 Class partitioner, String quorumAddress, String serverClass,
233 String serverImpl, boolean addDependencyJars) throws IOException {
234
235 Configuration conf = job.getConfiguration();
236 HBaseConfiguration.addHbaseResources(conf);
237 job.setOutputFormatClass(TableOutputFormat.class);
238 if (reducer != null) job.setReducerClass(reducer);
239 conf.set(TableOutputFormat.OUTPUT_TABLE, table);
240
241 if (quorumAddress != null) {
242
243 ZKUtil.transformClusterKey(quorumAddress);
244 conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
245 }
246 if (serverClass != null && serverImpl != null) {
247 conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
248 conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
249 }
250 job.setOutputKeyClass(ImmutableBytesWritable.class);
251 job.setOutputValueClass(Writable.class);
252 if (partitioner == HRegionPartitioner.class) {
253 job.setPartitionerClass(HRegionPartitioner.class);
254 HTable outputTable = new HTable(conf, table);
255 int regions = outputTable.getRegionsInfo().size();
256 if (job.getNumReduceTasks() > regions) {
257 job.setNumReduceTasks(outputTable.getRegionsInfo().size());
258 }
259 } else if (partitioner != null) {
260 job.setPartitionerClass(partitioner);
261 }
262
263 if (addDependencyJars) {
264 addDependencyJars(job);
265 }
266 }
267
268
269
270
271
272
273
274
275
276 public static void limitNumReduceTasks(String table, Job job)
277 throws IOException {
278 HTable outputTable = new HTable(job.getConfiguration(), table);
279 int regions = outputTable.getRegionsInfo().size();
280 if (job.getNumReduceTasks() > regions)
281 job.setNumReduceTasks(regions);
282 }
283
284
285
286
287
288
289
290
291
292 public static void setNumReduceTasks(String table, Job job)
293 throws IOException {
294 HTable outputTable = new HTable(job.getConfiguration(), table);
295 int regions = outputTable.getRegionsInfo().size();
296 job.setNumReduceTasks(regions);
297 }
298
299
300
301
302
303
304
305
306
307
308 public static void setScannerCaching(Job job, int batchSize) {
309 job.getConfiguration().setInt("hbase.client.scanner.caching", batchSize);
310 }
311
312
313
314
315
316
317 public static void addDependencyJars(Job job) throws IOException {
318 try {
319 addDependencyJars(job.getConfiguration(),
320 org.apache.zookeeper.ZooKeeper.class,
321 job.getMapOutputKeyClass(),
322 job.getMapOutputValueClass(),
323 job.getInputFormatClass(),
324 job.getOutputKeyClass(),
325 job.getOutputValueClass(),
326 job.getOutputFormatClass(),
327 job.getPartitionerClass(),
328 job.getCombinerClass());
329 } catch (ClassNotFoundException e) {
330 throw new IOException(e);
331 }
332 }
333
334
335
336
337
338
339 public static void addDependencyJars(Configuration conf,
340 Class... classes) throws IOException {
341
342 FileSystem localFs = FileSystem.getLocal(conf);
343
344 Set<String> jars = new HashSet<String>();
345
346
347 jars.addAll( conf.getStringCollection("tmpjars") );
348
349
350 for (Class clazz : classes) {
351 if (clazz == null) continue;
352
353 String pathStr = findContainingJar(clazz);
354 if (pathStr == null) {
355 LOG.warn("Could not find jar for class " + clazz +
356 " in order to ship it to the cluster.");
357 continue;
358 }
359 Path path = new Path(pathStr);
360 if (!localFs.exists(path)) {
361 LOG.warn("Could not validate jar file " + path + " for class "
362 + clazz);
363 continue;
364 }
365 jars.add(path.makeQualified(localFs).toString());
366 }
367 if (jars.isEmpty()) return;
368
369 conf.set("tmpjars",
370 StringUtils.arrayToString(jars.toArray(new String[0])));
371 }
372
373
374
375
376
377
378
379
380
381
382
383
384 private static String findContainingJar(Class my_class) {
385 ClassLoader loader = my_class.getClassLoader();
386 String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
387 try {
388 for(Enumeration itr = loader.getResources(class_file);
389 itr.hasMoreElements();) {
390 URL url = (URL) itr.nextElement();
391 if ("jar".equals(url.getProtocol())) {
392 String toReturn = url.getPath();
393 if (toReturn.startsWith("file:")) {
394 toReturn = toReturn.substring("file:".length());
395 }
396
397
398
399
400
401
402 toReturn = toReturn.replaceAll("\\+", "%2B");
403 toReturn = URLDecoder.decode(toReturn, "UTF-8");
404 return toReturn.replaceAll("!.*$", "");
405 }
406 }
407 } catch (IOException e) {
408 throw new RuntimeException(e);
409 }
410 return null;
411 }
412
413
414 }