/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

/**
 * Tool to import data from a TSV file.
 *
 * This tool is rather simplistic - it doesn't do any quoting or
 * escaping, but is useful for many data loads.
 *
 * @see ImportTsv#usage(String)
 */
public class ImportTsv {
  final static String NAME = "importtsv";

  final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
  final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
  final static String COLUMNS_CONF_KEY = "importtsv.columns";
  final static String SEPARATOR_CONF_KEY = "importtsv.separator";
  final static String DEFAULT_SEPARATOR = "\t";

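  // A minimal invocation sketch (table name and paths are hypothetical);
  // each -D option corresponds to one of the *_CONF_KEY constants above,
  // and usage(String) below documents them in full:
  //
  //   hadoop jar <hbase jar> importtsv \
  //     -Dimporttsv.columns=HBASE_ROW_KEY,d:c1 \
  //     mytable /path/to/input
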
  static class TsvParser {
    /**
     * Column families and qualifiers mapped to the TSV columns
     */
    private final byte[][] families;
    private final byte[][] qualifiers;

    private final byte separatorByte;

    // -1 until the row key column is seen, so a specification without
    // HBASE_ROW_KEY is detectable via getRowKeyColumnIndex()
    private int rowKeyColumnIndex = -1;

    public static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";

    /**
     * @param columnsSpecification the list of columns to parse out, comma separated.
     * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
     */
    public TsvParser(String columnsSpecification, String separatorStr) {
      // Configure separator
      byte[] separator = Bytes.toBytes(separatorStr);
      Preconditions.checkArgument(separator.length == 1,
        "TsvParser only supports single-byte separators");
      separatorByte = separator[0];

      // Configure columns
      ArrayList<String> columnStrings = Lists.newArrayList(
        Splitter.on(',').trimResults().split(columnsSpecification));

      families = new byte[columnStrings.size()][];
      qualifiers = new byte[columnStrings.size()][];

      for (int i = 0; i < columnStrings.size(); i++) {
        String str = columnStrings.get(i);
        if (ROWKEY_COLUMN_SPEC.equals(str)) {
          rowKeyColumnIndex = i;
          continue;
        }
        String[] parts = str.split(":", 2);
        if (parts.length == 1) {
          // family-only column, e.g. "cf": store an empty qualifier
          families[i] = Bytes.toBytes(str);
          qualifiers[i] = HConstants.EMPTY_BYTE_ARRAY;
        } else {
          // "family:qualifier" form
          families[i] = Bytes.toBytes(parts[0]);
          qualifiers[i] = Bytes.toBytes(parts[1]);
        }
      }
    }

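    // A hypothetical example of the mapping performed above:
    //
    //   new TsvParser("HBASE_ROW_KEY,info:name,info", "\t")
    //
    // yields rowKeyColumnIndex == 0; getFamily(1)/getQualifier(1) return
    // "info"/"name"; and getFamily(2) returns "info" with an empty qualifier.
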
    public int getRowKeyColumnIndex() {
      return rowKeyColumnIndex;
    }
    public byte[] getFamily(int idx) {
      return families[idx];
    }
    public byte[] getQualifier(int idx) {
      return qualifiers[idx];
    }

    public ParsedLine parse(byte[] lineBytes, int length)
    throws BadTsvLineException {
      // Enumerate separator offsets
      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
      for (int i = 0; i < length; i++) {
        if (lineBytes[i] == separatorByte) {
          tabOffsets.add(i);
        }
      }
      if (tabOffsets.isEmpty()) {
        throw new BadTsvLineException("No delimiter");
      }

      tabOffsets.add(length);

      if (tabOffsets.size() > families.length) {
        throw new BadTsvLineException("Excessive columns");
      } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
        throw new BadTsvLineException("No row key");
      }
      return new ParsedLine(tabOffsets, lineBytes);
    }

    class ParsedLine {
      private final ArrayList<Integer> tabOffsets;
      private final byte[] lineBytes;

      ParsedLine(ArrayList<Integer> tabOffsets, byte[] lineBytes) {
        this.tabOffsets = tabOffsets;
        this.lineBytes = lineBytes;
      }

      public int getRowKeyOffset() {
        return getColumnOffset(rowKeyColumnIndex);
      }
      public int getRowKeyLength() {
        return getColumnLength(rowKeyColumnIndex);
      }
      public int getColumnOffset(int idx) {
        if (idx > 0)
          return tabOffsets.get(idx - 1) + 1;
        else
          return 0;
      }
      public int getColumnLength(int idx) {
        return tabOffsets.get(idx) - getColumnOffset(idx);
      }
      public int getColumnCount() {
        return tabOffsets.size();
      }
      public byte[] getLineBytes() {
        return lineBytes;
      }
    }

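    // A worked example of the offset arithmetic (hypothetical input): for
    // lineBytes "r1\tv1" (length 5) with a tab separator, parse() records
    // tabOffsets = [2, 5], so column 0 spans offset 0, length 2 ("r1") and
    // column 1 spans offset 3, length 2 ("v1").
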
    public static class BadTsvLineException extends Exception {
      public BadTsvLineException(String err) {
        super(err);
      }
      private static final long serialVersionUID = 1L;
    }
  }

  /**
   * Mapper that converts each line of TSV text into a Put keyed by the
   * row, for writing into an HBase table (directly, or via HFiles when
   * bulk output is configured).
   */
  static class TsvImporter
  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
  {

    /** Timestamp for all inserted rows */
    private long ts;

    /** Should skip bad lines */
    private boolean skipBadLines;
    private Counter badLineCount;

    private TsvParser parser;

    @Override
    protected void setup(Context context) {
      Configuration conf = context.getConfiguration();

      // If a custom separator has been used,
      // decode it back from Base64 encoding.
      String separator = conf.get(SEPARATOR_CONF_KEY);
      if (separator == null) {
        separator = DEFAULT_SEPARATOR;
      } else {
        separator = new String(Base64.decode(separator));
      }

      parser = new TsvParser(conf.get(COLUMNS_CONF_KEY),
                             separator);
      if (parser.getRowKeyColumnIndex() == -1) {
        throw new RuntimeException("No row key column specified");
      }
      ts = System.currentTimeMillis();

      skipBadLines = conf.getBoolean(SKIP_LINES_CONF_KEY, true);
      badLineCount = context.getCounter("ImportTsv", "Bad Lines");
    }

    /**
     * Convert a line of TSV text into an HBase table row.
     */
    @Override
    public void map(LongWritable offset, Text value,
      Context context)
    throws IOException {
      byte[] lineBytes = value.getBytes();

      try {
        TsvParser.ParsedLine parsed = parser.parse(
            lineBytes, value.getLength());
        ImmutableBytesWritable rowKey =
          new ImmutableBytesWritable(lineBytes,
              parsed.getRowKeyOffset(),
              parsed.getRowKeyLength());

        Put put = new Put(rowKey.copyBytes());
        for (int i = 0; i < parsed.getColumnCount(); i++) {
          if (i == parser.getRowKeyColumnIndex()) continue;
          KeyValue kv = new KeyValue(
              lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
              parser.getFamily(i), 0, parser.getFamily(i).length,
              parser.getQualifier(i), 0, parser.getQualifier(i).length,
              ts,
              KeyValue.Type.Put,
              lineBytes, parsed.getColumnOffset(i), parsed.getColumnLength(i));
          put.add(kv);
        }
        context.write(rowKey, put);
      } catch (BadTsvLineException badLine) {
        if (skipBadLines) {
          System.err.println(
              "Bad line at offset: " + offset.get() + ":\n" +
              badLine.getMessage());
          badLineCount.increment(1);
          return;
        } else {
          throw new IOException(badLine);
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
        // Restore the interrupt status instead of silently swallowing it.
        Thread.currentThread().interrupt();
      }
    }
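
    // Putting it together (hypothetical values): with
    // -Dimporttsv.columns=HBASE_ROW_KEY,d:c1 and the input line
    // "row1\tvalue1", map() emits the key "row1" and a Put containing a
    // single KeyValue d:c1="value1" stamped with the timestamp taken in
    // setup().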
  }

  /**
   * Sets up the actual job.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException {

    // Re-encode the passed separator as a Base64 string so that characters
    // unsupported in XML configuration values survive the round trip.
    String actualSeparator = conf.get(SEPARATOR_CONF_KEY);
    if (actualSeparator != null) {
      // encodeBytes already returns a String, so no extra copy is needed
      conf.set(SEPARATOR_CONF_KEY,
          Base64.encodeBytes(actualSeparator.getBytes()));
    }

    String tableName = args[0];
    Path inputDir = new Path(args[1]);
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(TsvImporter.class);
    FileInputFormat.setInputPaths(job, inputDir);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(TsvImporter.class);

    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
    if (hfileOutPath != null) {
      // Bulk-load mode: sort the Puts and write HFiles for later loading.
      HTable table = new HTable(conf, tableName);
      job.setReducerClass(PutSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
      HFileOutputFormat.configureIncrementalLoad(job, table);
    } else {
      // No reducers.  Just write straight to table.  Call initTableReducerJob
      // to set up the TableOutputFormat.
      TableMapReduceUtil.initTableReducerJob(tableName, null, job);
      job.setNumReduceTasks(0);
    }

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
        com.google.common.base.Function.class /* Guava used by TsvParser */);
    return job;
  }
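
  // The two modes configured above, sketched (paths and table name are
  // hypothetical):
  //
  //   direct write: importtsv -Dimporttsv.columns=... mytable /input
  //   bulk output:  importtsv -Dimporttsv.columns=... \
  //                   -Dimporttsv.bulk.output=/tmp/hfiles mytable /input
  //
  // In the second mode the generated HFiles must still be loaded into the
  // table afterwards with HBase's incremental bulk-load tooling.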

  /*
   * @param errorMsg Error message.  Can be null.
   */
  private static void usage(final String errorMsg) {
    if (errorMsg != null && errorMsg.length() > 0) {
      System.err.println("ERROR: " + errorMsg);
    }
    String usage =
      "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
      "\n" +
      "Imports the given input directory of TSV data into the specified table.\n" +
      "\n" +
      "The column names of the TSV data must be specified using the -Dimporttsv.columns\n" +
      "option. This option takes the form of comma-separated column names, where each\n" +
      "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
      "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
      "as the row key for each imported record. You must specify exactly one column\n" +
      "to be the row key.\n" +
      "\n" +
      "In order to prepare data for a bulk data load, pass the option:\n" +
      "  -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output\n" +
      "\n" +
      "Other options that may be specified with -D include:\n" +
      "  -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
      "  '-D" + SEPARATOR_CONF_KEY + "=|' - e.g. separate on pipes instead of tabs";
    System.err.println(usage);
  }


  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      usage("Wrong number of arguments: " + otherArgs.length);
      System.exit(-1);
    }

    // Make sure columns are specified
    String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
    if (columns == null) {
      usage("No columns specified. Please specify with -D" +
            COLUMNS_CONF_KEY + "=...");
      System.exit(-1);
    }

    // Make sure they specify exactly one column as the row key
    int rowkeysFound = 0;
    for (String col : columns) {
      if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
    }
    if (rowkeysFound != 1) {
      usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
      System.exit(-1);
    }

    // Make sure one or more columns are specified in addition to the row key
    if (columns.length < 2) {
      usage("One or more columns in addition to the row key are required");
      System.exit(-1);
    }

    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}