1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.IOException;
23
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.hbase.HBaseConfiguration;
27 import org.apache.hadoop.hbase.KeyValue;
28 import org.apache.hadoop.hbase.client.Put;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31 import org.apache.hadoop.mapreduce.Job;
32 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
33 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
34 import org.apache.hadoop.util.GenericOptionsParser;
35
36
37
38
39 public class Import {
40 final static String NAME = "import";
41
42
43
44
45 static class Importer
46 extends TableMapper<ImmutableBytesWritable, Put> {
47
48
49
50
51
52
53
54
55 @Override
56 public void map(ImmutableBytesWritable row, Result value,
57 Context context)
58 throws IOException {
59 try {
60 context.write(row, resultToPut(row, value));
61 } catch (InterruptedException e) {
62 e.printStackTrace();
63 }
64 }
65
66 private static Put resultToPut(ImmutableBytesWritable key, Result result)
67 throws IOException {
68 Put put = new Put(key.get());
69 for (KeyValue kv : result.raw()) {
70 put.add(kv);
71 }
72 return put;
73 }
74 }
75
76
77
78
79
80
81
82
83
84 public static Job createSubmittableJob(Configuration conf, String[] args)
85 throws IOException {
86 String tableName = args[0];
87 Path inputDir = new Path(args[1]);
88 Job job = new Job(conf, NAME + "_" + tableName);
89 job.setJarByClass(Importer.class);
90 FileInputFormat.setInputPaths(job, inputDir);
91 job.setInputFormatClass(SequenceFileInputFormat.class);
92 job.setMapperClass(Importer.class);
93
94
95 TableMapReduceUtil.initTableReducerJob(tableName, null, job);
96 job.setNumReduceTasks(0);
97 return job;
98 }
99
100
101
102
103 private static void usage(final String errorMsg) {
104 if (errorMsg != null && errorMsg.length() > 0) {
105 System.err.println("ERROR: " + errorMsg);
106 }
107 System.err.println("Usage: Import <tablename> <inputdir>");
108 }
109
110
111
112
113
114
115
116 public static void main(String[] args) throws Exception {
117 Configuration conf = HBaseConfiguration.create();
118 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
119 if (otherArgs.length < 2) {
120 usage("Wrong number of arguments: " + otherArgs.length);
121 System.exit(-1);
122 }
123 Job job = createSubmittableJob(conf, otherArgs);
124 System.exit(job.waitForCompletion(true) ? 0 : 1);
125 }
126 }