1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.UnsupportedEncodingException;
23 import java.util.List;
24 import java.util.ArrayList;
25
26 import org.apache.hadoop.mapreduce.Job;
27 import org.apache.hadoop.fs.FSDataOutputStream;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.util.GenericOptionsParser;
32
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
35 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
36 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
37 import org.apache.hadoop.hbase.util.Bytes;
38 import org.apache.hadoop.hbase.HBaseTestingUtility;
39 import org.apache.hadoop.hbase.client.HTable;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.client.ResultScanner;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.client.HBaseAdmin;
44 import org.apache.hadoop.hbase.HColumnDescriptor;
45 import org.apache.hadoop.hbase.HTableDescriptor;
46 import org.apache.hadoop.hbase.MiniHBaseCluster;
47 import org.apache.hadoop.hbase.client.Result;
48
49 import org.junit.Test;
50
51 import com.google.common.base.Joiner;
52 import com.google.common.base.Splitter;
53 import com.google.common.collect.Iterables;
54
55 import static org.junit.Assert.*;
56
57 public class TestImportTsv {
58
59 @Test
60 public void testTsvParserSpecParsing() {
61 TsvParser parser;
62
63 parser = new TsvParser("HBASE_ROW_KEY", "\t");
64 assertNull(parser.getFamily(0));
65 assertNull(parser.getQualifier(0));
66 assertEquals(0, parser.getRowKeyColumnIndex());
67
68 parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
69 assertNull(parser.getFamily(0));
70 assertNull(parser.getQualifier(0));
71 assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
72 assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
73 assertEquals(0, parser.getRowKeyColumnIndex());
74
75 parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
76 assertNull(parser.getFamily(0));
77 assertNull(parser.getQualifier(0));
78 assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
79 assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
80 assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
81 assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
82 assertEquals(0, parser.getRowKeyColumnIndex());
83 }
84
85 @Test
86 public void testTsvParser() throws BadTsvLineException {
87 TsvParser parser = new TsvParser("col_a,col_b:qual,HBASE_ROW_KEY,col_d", "\t");
88 assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(0));
89 assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(0));
90 assertBytesEquals(Bytes.toBytes("col_b"), parser.getFamily(1));
91 assertBytesEquals(Bytes.toBytes("qual"), parser.getQualifier(1));
92 assertNull(parser.getFamily(2));
93 assertNull(parser.getQualifier(2));
94 assertEquals(2, parser.getRowKeyColumnIndex());
95
96 byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
97 ParsedLine parsed = parser.parse(line, line.length);
98 checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
99 }
100
101 private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
102 ArrayList<String> parsedCols = new ArrayList<String>();
103 for (int i = 0; i < parsed.getColumnCount(); i++) {
104 parsedCols.add(Bytes.toString(
105 parsed.getLineBytes(),
106 parsed.getColumnOffset(i),
107 parsed.getColumnLength(i)));
108 }
109 if (!Iterables.elementsEqual(parsedCols, expected)) {
110 fail("Expected: " + Joiner.on(",").join(expected) + "\n" +
111 "Got:" + Joiner.on(",").join(parsedCols));
112 }
113 }
114
115 private void assertBytesEquals(byte[] a, byte[] b) {
116 assertEquals(Bytes.toStringBinary(a), Bytes.toStringBinary(b));
117 }
118
119
120
121
122 @Test(expected=BadTsvLineException.class)
123 public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
124 TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
125 byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
126 ParsedLine parsed = parser.parse(line, line.length);
127 }
128
129 @Test(expected=BadTsvLineException.class)
130 public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
131 TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
132 byte[] line = Bytes.toBytes("");
133 ParsedLine parsed = parser.parse(line, line.length);
134 }
135
136 @Test(expected=BadTsvLineException.class)
137 public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
138 TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
139 byte[] line = Bytes.toBytes("key_only");
140 ParsedLine parsed = parser.parse(line, line.length);
141 }
142
143 @Test(expected=BadTsvLineException.class)
144 public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
145 TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
146 byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
147 ParsedLine parsed = parser.parse(line, line.length);
148 }
149
150 @Test
151 public void testMROnTable()
152 throws Exception {
153 String TABLE_NAME = "TestTable";
154 String FAMILY = "FAM";
155 String INPUT_FILE = "InputFile.esv";
156
157
158 String[] args = new String[] {
159 "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
160 "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
161 TABLE_NAME,
162 INPUT_FILE
163 };
164
165
166 HBaseTestingUtility htu1 = new HBaseTestingUtility();
167
168 MiniHBaseCluster cluster = htu1.startMiniCluster();
169
170 GenericOptionsParser opts = new GenericOptionsParser(cluster.getConfiguration(), args);
171 Configuration conf = opts.getConfiguration();
172 args = opts.getRemainingArgs();
173
174 try {
175
176 FileSystem fs = FileSystem.get(conf);
177 FSDataOutputStream op = fs.create(new Path(INPUT_FILE), true);
178 String line = "KEY\u001bVALUE1\u001bVALUE2\n";
179 op.write(line.getBytes(HConstants.UTF8_ENCODING));
180 op.close();
181
182 final byte[] FAM = Bytes.toBytes(FAMILY);
183 final byte[] TAB = Bytes.toBytes(TABLE_NAME);
184 final byte[] QA = Bytes.toBytes("A");
185 final byte[] QB = Bytes.toBytes("B");
186
187 HTableDescriptor desc = new HTableDescriptor(TAB);
188 desc.addFamily(new HColumnDescriptor(FAM));
189 new HBaseAdmin(conf).createTable(desc);
190
191 Job job = ImportTsv.createSubmittableJob(conf, args);
192 job.waitForCompletion(false);
193 assertTrue(job.isSuccessful());
194
195 HTable table = new HTable(new Configuration(conf), TAB);
196 boolean verified = false;
197 long pause = conf.getLong("hbase.client.pause", 5 * 1000);
198 int numRetries = conf.getInt("hbase.client.retries.number", 5);
199 for (int i = 0; i < numRetries; i++) {
200 try {
201 Scan scan = new Scan();
202
203 scan.addFamily(FAM);
204 ResultScanner resScanner = table.getScanner(scan);
205 for (Result res : resScanner) {
206 assertTrue(res.size() == 2);
207 List<KeyValue> kvs = res.list();
208 assertEquals(toU8Str(kvs.get(0).getRow()),
209 toU8Str(Bytes.toBytes("KEY")));
210 assertEquals(toU8Str(kvs.get(1).getRow()),
211 toU8Str(Bytes.toBytes("KEY")));
212 assertEquals(toU8Str(kvs.get(0).getValue()),
213 toU8Str(Bytes.toBytes("VALUE1")));
214 assertEquals(toU8Str(kvs.get(1).getValue()),
215 toU8Str(Bytes.toBytes("VALUE2")));
216
217 }
218 verified = true;
219 break;
220 } catch (NullPointerException e) {
221
222
223 }
224 try {
225 Thread.sleep(pause);
226 } catch (InterruptedException e) {
227
228 }
229 }
230 assertTrue(verified);
231 } finally {
232 cluster.shutdown();
233 }
234 }
235
236 public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
237 return new String(bytes, HConstants.UTF8_ENCODING);
238 }
239 }