1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import org.apache.hadoop.hbase.util.Base64;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.KeyValue;
32 import org.apache.hadoop.hbase.client.HTable;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
35 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
36 import org.apache.hadoop.hbase.util.Bytes;
37 import org.apache.hadoop.io.LongWritable;
38 import org.apache.hadoop.io.Text;
39 import org.apache.hadoop.mapreduce.Counter;
40 import org.apache.hadoop.mapreduce.Job;
41 import org.apache.hadoop.mapreduce.Mapper;
42 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
43 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
44 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
45 import org.apache.hadoop.util.GenericOptionsParser;
46
47 import com.google.common.base.Preconditions;
48 import com.google.common.base.Splitter;
49 import com.google.common.collect.Lists;
50
51
52
53
54
55
56
57
58
59 public class ImportTsv {
60 final static String NAME = "importtsv";
61
62 final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
63 final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
64 final static String COLUMNS_CONF_KEY = "importtsv.columns";
65 final static String SEPARATOR_CONF_KEY = "importtsv.separator";
66 final static String DEFAULT_SEPARATOR = "\t";
67
68 static class TsvParser {
69
70
71
72 private final byte[][] families;
73 private final byte[][] qualifiers;
74
75 private final byte separatorByte;
76
77 private int rowKeyColumnIndex;
78
79 public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
80
81
82
83
84
85 public TsvParser(String columnsSpecification, String separatorStr) {
86
87 byte[] separator = Bytes.toBytes(separatorStr);
88 Preconditions.checkArgument(separator.length == 1,
89 "TsvParser only supports single-byte separators");
90 separatorByte = separator[0];
91
92
93 ArrayList<String> columnStrings = Lists.newArrayList(
94 Splitter.on(',').trimResults().split(columnsSpecification));
95
96 families = new byte[columnStrings.size()][];
97 qualifiers = new byte[columnStrings.size()][];
98
99 for (int i = 0; i < columnStrings.size(); i++) {
100 String str = columnStrings.get(i);
101 if (ROWKEY_COLUMN_SPEC.equals(str)) {
102 rowKeyColumnIndex = i;
103 continue;
104 }
105 String[] parts = str.split(":", 2);
106 if (parts.length == 1) {
107 families[i] = str.getBytes();
108 qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
109 } else {
110 families[i] = parts[0].getBytes();
111 qualifiers[i] = parts[1].getBytes();
112 }
113 }
114 }
115
116 public int getRowKeyColumnIndex() {
117 return rowKeyColumnIndex;
118 }
119 public byte[] getFamily(int idx) {
120 return families[idx];
121 }
122 public byte[] getQualifier(int idx) {
123 return qualifiers[idx];
124 }
125
126 public ParsedLine parse(byte[] lineBytes, int length)
127 throws BadTsvLineException {
128
129 ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
130 for (int i = 0; i < length; i++) {
131 if (lineBytes[i] == separatorByte) {
132 tabOffsets.add(i);
133 }
134 }
135 if (tabOffsets.isEmpty()) {
136 throw new BadTsvLineException("No delimiter");
137 }
138
139 tabOffsets.add(length);
140
141 if (tabOffsets.size() > families.length) {
142 throw new BadTsvLineException("Excessive columns");
143 } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
144 throw new BadTsvLineException("No row key");
145 }
146 return new ParsedLine(tabOffsets, lineBytes);
147 }
148
149 class ParsedLine {
150 private final ArrayList<Integer> tabOffsets;
151 private byte[] lineBytes;
152
153 ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
154 this.tabOffsets = tabOffsets;
155 this.lineBytes = lineBytes;
156 }
157
158 public int getRowKeyOffset() {
159 return getColumnOffset(rowKeyColumnIndex);
160 }
161 public int getRowKeyLength() {
162 return getColumnLength(rowKeyColumnIndex);
163 }
164 public int getColumnOffset(int idx) {
165 if (idx > 0)
166 return tabOffsets.get(idx - 1) + 1;
167 else
168 return 0;
169 }
170 public int getColumnLength(int idx) {
171 return tabOffsets.get(idx) - getColumnOffset(idx);
172 }
173 public int getColumnCount() {
174 return tabOffsets.size();
175 }
176 public byte[] getLineBytes() {
177 return lineBytes;
178 }
179 }
180
181 public static class BadTsvLineException extends Exception {
182 public BadTsvLineException(String err) {
183 super(err);
184 }
185 private static final long serialVersionUID = 1L;
186 }
187 }
188
189
190
191
192 static class TsvImporter
193 extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
194 {
195
196
197 private long ts;
198
199
200 private boolean skipBadLines;
201 private Counter badLineCount;
202
203 private TsvParser parser;
204
205 @Override
206 protected void setup(Context context) {
207 Configuration conf = context.getConfiguration();
208
209
210
211 String separator = conf.get(SEPARATOR_CONF_KEY);
212 if (separator == null) {
213 separator = DEFAULT_SEPARATOR;
214 } else {
215 separator = new String(Base64.decode(separator));
216 }
217
218 parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
219 separator);
220 if (parser.getRowKeyColumnIndex() == -1) {
221 throw new RuntimeException("No row key column specified");
222 }
223 ts = System.currentTimeMillis();
224
225 skipBadLines = context.getConfiguration().getBoolean(
226 SKIP_LINES_CONF_KEY, true);
227 badLineCount = context.getCounter("ImportTsv", "Bad Lines");
228 }
229
230
231
232
233 @Override
234 public void map(LongWritable offset, Text value,
235 Context context)
236 throws IOException {
237 byte[] lineBytes = value.getBytes();
238
239 try {
240 TsvParser.ParsedLine parsed = parser.parse(
241 lineBytes, value.getLength());
242 ImmutableBytesWritable rowKey =
243 new ImmutableBytesWritable(lineBytes,
244 parsed.getRowKeyOffset(),
245 parsed.getRowKeyLength());
246
247 Put put = new Put(rowKey.copyBytes());
248 for (int i = 0; i < parsed.getColumnCount(); i++) {
249 if (i == parser.getRowKeyColumnIndex()) continue;
250 KeyValue kv = new KeyValue(
251 lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
252 parser.getFamily(i), 0, parser.getFamily(i).length,
253 parser.getQualifier(i), 0, parser.getQualifier(i).length,
254 ts,
255 KeyValue.Type.Put,
256 lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
257 put.add(kv);
258 }
259 context.write(rowKey, put);
260 } catch (BadTsvLineException badLine) {
261 if (skipBadLines) {
262 System.err.println(
263 "Bad line at offset: " + offset.get() + ":\n" +
264 badLine.getMessage());
265 badLineCount.increment(1);
266 return;
267 } else {
268 throw new IOException(badLine);
269 }
270 } catch (InterruptedException e) {
271 e.printStackTrace();
272 }
273 }
274 }
275
276
277
278
279
280
281
282
283
284 public static Job createSubmittableJob(Configuration conf, String[] args)
285 throws IOException {
286
287
288
289 String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
290 if (actualSeparator != null) {
291 conf.set(SEPARATOR_CONF_KEY, new String(
292 Base64.encodeBytes(actualSeparator.getBytes())));
293 }
294
295 String tableName = args[0];
296 Path inputDir = new Path(args[1]);
297 Job job = new Job(conf, NAME + "_" + tableName);
298 job.setJarByClass(TsvImporter.class);
299 FileInputFormat.setInputPaths(job, inputDir);
300 job.setInputFormatClass(TextInputFormat.class);
301 job.setMapperClass(TsvImporter.class);
302
303 String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
304 if (hfileOutPath != null) {
305 HTable table = new HTable(conf, tableName);
306 job.setReducerClass(PutSortReducer.class);
307 Path outputDir = new Path(hfileOutPath);
308 FileOutputFormat.setOutputPath(job, outputDir);
309 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
310 job.setMapOutputValueClass(Put.class);
311 HFileOutputFormat.configureIncrementalLoad(job, table);
312 } else {
313
314
315 TableMapReduceUtil.initTableReducerJob(tableName, null, job);
316 job.setNumReduceTasks(0);
317 }
318
319 TableMapReduceUtil.addDependencyJars(job);
320 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
321 com.google.common.base.Function.class
322 return job;
323 }
324
325
326
327
328 private static void usage(final String errorMsg) {
329 if (errorMsg != null && errorMsg.length() > 0) {
330 System.err.println("ERROR: " + errorMsg);
331 }
332 String usage =
333 "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
334 "\n" +
335 "Imports the given input directory of TSV data into the specified table.\n" +
336 "\n" +
337 "The column names of the TSV data must be specified using the -Dimporttsv.columns\n" +
338 "option. This option takes the form of comma-separated column names, where each\n" +
339 "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
340 "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
341 "as the row key for each imported record. You must specify exactly one column\n" +
342 "to be the row key.\n" +
343 "\n" +
344 "In order to prepare data for a bulk data load, pass the option:\n" +
345 " -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
346 "\n" +
347 "Other options that may be specified with -D include:\n" +
348 " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
349 " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs";
350 System.err.println(usage);
351 }
352
353
354
355
356
357
358
359 public static void main(String[] args) throws Exception {
360 Configuration conf = HBaseConfiguration.create();
361 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
362 if (otherArgs.length < 2) {
363 usage("Wrong number of arguments: " + otherArgs.length);
364 System.exit(-1);
365 }
366
367
368 String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
369 if (columns == null) {
370 usage("No columns specified. Please specify with -D" +
371 COLUMNS_CONF_KEY+"=...");
372 System.exit(-1);
373 }
374
375
376 int rowkeysFound=0;
377 for (String col : columns) {
378 if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
379 }
380 if (rowkeysFound != 1) {
381 usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
382 System.exit(-1);
383 }
384
385
386 if (columns.length < 2) {
387 usage("One or more columns in addition to the row key are required");
388 System.exit(-1);
389 }
390
391 Job job = createSubmittableJob(conf, otherArgs);
392 System.exit(job.waitForCompletion(true) ? 0 : 1);
393 }
394
395 }