package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
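
/**
 * Export an HBase table.
 * Writes the table content as {@link Result} values to SequenceFiles in
 * HDFS, from where the companion Import tool can read it back into a table.
 */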
public class Export {
  private static final Log LOG = LogFactory.getLog(Export.class);
  final static String NAME = "export";
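
  /**
   * Mapper that passes every scanned row and its {@link Result} through to
   * the job output unchanged.
   */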
  static class Exporter
      extends TableMapper<ImmutableBytesWritable, Result> {
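
    /**
     * Writes the current row key and its columns straight to the output.
     *
     * @param row  The current table row key.
     * @param value  The columns of the current row.
     * @param context  The current task context.
     * @throws IOException When writing the output fails.
     */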
    @Override
    public void map(ImmutableBytesWritable row, Result value,
        Context context)
        throws IOException {
      try {
        // Emit the row key and the full Result unchanged.
        context.write(row, value);
      } catch (InterruptedException e) {
        LOG.error("Interrupted while writing row to the output", e);
        Thread.currentThread().interrupt();
      }
    }
  }
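
  /**
   * Sets up the export job: a map-only scan over the given table whose
   * results are written as SequenceFiles to the given output directory.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters: tablename and outputdir,
   *   optionally followed by versions, starttime and endtime.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */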
  public static Job createSubmittableJob(Configuration conf, String[] args)
      throws IOException {
    String tableName = args[0];
    Path outputDir = new Path(args[1]);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJobName(NAME + "_" + tableName);
    job.setJarByClass(Exporter.class);
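
    // Configure the scan: maximum versions, time range and, when the
    // TableInputFormat.SCAN_COLUMN_FAMILY property is set, a single column
    // family. Block caching is disabled for the full-table scan.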
    Scan s = new Scan();
    int versions = args.length > 2 ? Integer.parseInt(args[2]) : 1;
    s.setMaxVersions(versions);
    long startTime = args.length > 3 ? Long.parseLong(args[3]) : 0L;
    long endTime = args.length > 4 ? Long.parseLong(args[4]) : Long.MAX_VALUE;
    s.setTimeRange(startTime, endTime);
    s.setCacheBlocks(false);
    if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
      s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
    }
    LOG.info("versions=" + versions + ", starttime=" + startTime +
      ", endtime=" + endTime);
    TableMapReduceUtil.initTableMapperJob(tableName, s, Exporter.class, null,
      null, job);
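
    // Map-only job: the Exporter output is written directly as SequenceFiles
    // of ImmutableBytesWritable keys and Result values.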
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Result.class);
    FileOutputFormat.setOutputPath(job, outputDir);
    return job;
  }
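
  /**
   * Prints usage information, preceded by an error message if one is given.
   *
   * @param errorMsg  Error message. Can be null.
   */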
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    System.err.println("Usage: Export [-D <property=value>]* <tablename> <outputdir> " +
      "[<versions> [<starttime> [<endtime>]]]\n");
    System.err.println("  Note: -D properties will be applied to the conf used.");
    System.err.println("  For example:");
    System.err.println("   -D mapred.output.compress=true");
    System.err.println("   -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec");
    System.err.println("   -D mapred.output.compression.type=BLOCK");
    System.err.println("  Additionally, the following SCAN properties can be specified");
    System.err.println("  to control/limit what is exported.");
    System.err.println("   -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
  }
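
  /**
   * Main entry point: parses the command line, creates the export job and
   * waits for it to complete.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */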
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      usage("Wrong number of arguments: " + otherArgs.length);
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}