1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapreduce;
21  
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
25  
26  import org.apache.hadoop.mapreduce.Job;
27  import org.apache.hadoop.fs.FSDataOutputStream;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.fs.FileSystem;
30  import org.apache.hadoop.conf.Configuration;
31  import org.apache.hadoop.util.GenericOptionsParser;
32  
33  import org.apache.hadoop.hbase.HConstants;
34  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
35  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
36  import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
37  import org.apache.hadoop.hbase.util.Bytes;
38  import org.apache.hadoop.hbase.HBaseTestingUtility;
39  import org.apache.hadoop.hbase.client.HTable;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.client.ResultScanner;
42  import org.apache.hadoop.hbase.client.Scan;
43  import org.apache.hadoop.hbase.client.HBaseAdmin;
44  import org.apache.hadoop.hbase.HColumnDescriptor;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.MiniHBaseCluster;
47  import org.apache.hadoop.hbase.client.Result;
48  
49  import org.junit.Test;
50  
51  import com.google.common.base.Joiner;
52  import com.google.common.base.Splitter;
53  import com.google.common.collect.Iterables;
54  
55  import static org.junit.Assert.*;
56  
57  public class TestImportTsv {
58  
59    @Test
60    public void testTsvParserSpecParsing() {
61      TsvParser parser;
62  
63      parser = new TsvParser("HBASE_ROW_KEY", "\t");
64      assertNull(parser.getFamily(0));
65      assertNull(parser.getQualifier(0));
66      assertEquals(0, parser.getRowKeyColumnIndex());
67  
68      parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
69      assertNull(parser.getFamily(0));
70      assertNull(parser.getQualifier(0));
71      assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
72      assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
73      assertEquals(0, parser.getRowKeyColumnIndex());
74  
75      parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
76      assertNull(parser.getFamily(0));
77      assertNull(parser.getQualifier(0));
78      assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
79      assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
80      assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
81      assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
82      assertEquals(0, parser.getRowKeyColumnIndex());
83    }
84  
85    @Test
86    public void testTsvParser() throws BadTsvLineException {
87      TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
88      assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
89      assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
90      assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
91      assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
92      assertNull(parser.getFamily(2));
93      assertNull(parser.getQualifier(2));
94      assertEquals(2, parser.getRowKeyColumnIndex());
95      
96      byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
97      ParsedLine parsed = parser.parse(line, line.length);
98      checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
99    }
100 
101   private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
102     ArrayList<String> parsedCols = new ArrayList<String>();
103     for (int i = 0; i < parsed.getColumnCount(); i++) {
104       parsedCols.add(Bytes.toString(
105           parsed.getLineBytes(),
106           parsed.getColumnOffset(i),
107           parsed.getColumnLength(i)));
108     }
109     if (!Iterables.elementsEqual(parsedCols, expected)) {
110       fail("Expected: " + Joiner.on(",").join(expected) + "\n" + 
111           "Got:" + Joiner.on(",").join(parsedCols));
112     }
113   }
114   
115   private void assertBytesEquals(byte[] a, byte[] b) {
116     assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
117   }
118 
119   /**
120    * Test cases that throw BadTsvLineException
121    */
122   @Test(expected=BadTsvLineException.class)
123   public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
124     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
125     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
126     ParsedLine parsed = parser.parse(line, line.length);
127   }
128 
129   @Test(expected=BadTsvLineException.class)
130   public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
131     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
132     byte[] line = Bytes.toBytes("");
133     ParsedLine parsed = parser.parse(line, line.length);
134   }
135 
136   @Test(expected=BadTsvLineException.class)
137   public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
138     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
139     byte[] line = Bytes.toBytes("key_only");
140     ParsedLine parsed = parser.parse(line, line.length);
141   }
142 
143   @Test(expected=BadTsvLineException.class)
144   public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
145     TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
146     byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
147     ParsedLine parsed = parser.parse(line, line.length);
148   }
149 
150   @Test
151   public void testMROnTable()
152   throws Exception {
153     String TABLE_NAME = "TestTable";
154     String FAMILY = "FAM";
155     String INPUT_FILE = "InputFile.esv";
156     
157     // Prepare the arguments required for the test.
158     String[] args = new String[] {
159         "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
160         "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
161         TABLE_NAME,
162         INPUT_FILE
163     };
164 
165     // Cluster
166     HBaseTestingUtility htu1 = new HBaseTestingUtility();
167 
168     MiniHBaseCluster cluster = htu1.startMiniCluster();
169 
170     GenericOptionsParser opts = new GenericOptionsParser(cluster.getConfiguration(), args);
171     Configuration conf = opts.getConfiguration();
172     args = opts.getRemainingArgs();
173 
174     try {
175       
176       FileSystem fs = FileSystem.get(conf);
177       FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
178       String line = "KEY\u001bVALUE1\u001bVALUE2\n";
179       op.write(line.getBytes(HConstants.UTF8_ENCODING));
180       op.close();
181 
182       final byte[] FAM = Bytes.toBytes(FAMILY);
183       final byte[] TAB = Bytes.toBytes(TABLE_NAME);
184       final byte[] QA = Bytes.toBytes("A");
185       final byte[] QB = Bytes.toBytes("B");
186 
187       HTableDescriptor desc = new HTableDescriptor(TAB);
188       desc.addFamily(new HColumnDescriptor(FAM));
189       new HBaseAdmin(conf).createTable(desc);
190 
191       Job job = ImportTsv.createSubmittableJob(conf, args);
192       job.waitForCompletion(false);
193       assertTrue(job.isSuccessful());
194       
195       HTable table = new HTable(new Configuration(conf), TAB);
196       boolean verified = false;
197       long pause = conf.getLong("hbase.client.pause", 5 * 1000);
198       int numRetries = conf.getInt("hbase.client.retries.number", 5);
199       for (int i = 0; i < numRetries; i++) {
200         try {
201           Scan scan = new Scan();
202           // Scan entire family.
203           scan.addFamily(FAM);
204           ResultScanner resScanner = table.getScanner(scan);
205           for (Result res : resScanner) {
206             assertTrue(res.size() == 2);
207             List<KeyValue> kvs = res.list();
208             assertEquals(toU8Str(kvs.get(0).getRow()),
209                 toU8Str(Bytes.toBytes("KEY")));
210             assertEquals(toU8Str(kvs.get(1).getRow()),
211                 toU8Str(Bytes.toBytes("KEY")));
212             assertEquals(toU8Str(kvs.get(0).getValue()),
213                 toU8Str(Bytes.toBytes("VALUE1")));
214             assertEquals(toU8Str(kvs.get(1).getValue()),
215                 toU8Str(Bytes.toBytes("VALUE2")));
216             // Only one result set is expected, so let it loop.
217           }
218           verified = true;
219           break;
220         } catch (NullPointerException e) {
221           // If here, a cell was empty.  Presume its because updates came in
222           // after the scanner had been opened.  Wait a while and retry.
223         }
224         try {
225           Thread.sleep(pause);
226         } catch (InterruptedException e) {
227           // continue
228         }
229       }
230       assertTrue(verified);
231     } finally {
232       cluster.shutdown();
233     }
234   }
235   
236   public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
237     return new String(bytes, HConstants.UTF8_ENCODING);
238   }
239 }