package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
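/**
 * Test cases for {@link LoadIncrementalHFiles}: HFiles are written
 * straight to the filesystem and then bulk loaded into a live table.
 */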
public class TestLoadIncrementalHFiles {

  private static final byte[] TABLE = Bytes.toBytes("mytable");
  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");

  private static final byte[][] SPLIT_KEYS = new byte[][] {
    Bytes.toBytes("ddd"),
    Bytes.toBytes("ppp")
  };

  public static int BLOCKSIZE = 64 * 1024;
  public static String COMPRESSION =
    Compression.Algorithm.NONE.getName();

  private HBaseTestingUtility util = new HBaseTestingUtility();
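  /**
   * Loads HFiles whose key ranges fit entirely within the pre-split
   * regions, so no file needs to be split during the load.
   */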
  @Test
  public void testSimpleLoad() throws Exception {
    runTest("testSimpleLoad",
        new byte[][][] {
          new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
        });
  }
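  /**
   * Loads HFiles whose key ranges cross the region boundaries, so the
   * loader must split them before they can be bulk loaded.
   */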
  @Test
  public void testRegionCrossingLoad() throws Exception {
    runTest("testRegionCrossingLoad",
        new byte[][][] {
          new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
        });
  }
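  /**
   * Writes one HFile per key range, creates a pre-split table on a
   * mini cluster, bulk loads the files, and verifies the row count.
   */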
  private void runTest(String testName, byte[][][] hfileRanges)
      throws Exception {
    Path dir = HBaseTestingUtility.getTestDir(testName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));

    // Write one HFile of 1000 rows per requested [from, to] key range.
    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      createHFile(fs, new Path(familyDir, "hfile_" + hfileIdx++),
          FAMILY, QUALIFIER, from, to, 1000);
    }
    int expectedRows = hfileIdx * 1000;

    util.startMiniCluster();
    try {
      // Create the target table, pre-split at SPLIT_KEYS.
      HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
      HTableDescriptor htd = new HTableDescriptor(TABLE);
      htd.addFamily(new HColumnDescriptor(FAMILY));
      admin.createTable(htd, SPLIT_KEYS);

      HTable table = new HTable(util.getConfiguration(), TABLE);
      util.waitTableAvailable(TABLE, 30000);
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
          util.getConfiguration());
      loader.doBulkLoad(dir, table);

      assertEquals(expectedRows, util.countRows(table));
    } finally {
      util.shutdownMiniCluster();
    }
  }
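  /**
   * Splits a single HFile at row key "ggg" and verifies that the two
   * halves together still contain all 1000 rows.
   */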
  @Test
  public void testSplitStoreFile() throws IOException {
    Path dir = HBaseTestingUtility.getTestDir("testSplitHFile");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
    createHFile(fs, testIn, FAMILY, QUALIFIER,
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    // Split at "ggg": rows sorting below it land in bottom.out,
    // the rest in top.out.
    LoadIncrementalHFiles.splitStoreFile(
        util.getConfiguration(), testIn,
        familyDesc, Bytes.toBytes("ggg"),
        bottomOut, topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }
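  /**
   * Opens the given HFile, asserts it is non-empty, and returns its row count.
   */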
  private int verifyHFile(Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = new HFile.Reader(
        p.getFileSystem(conf), p, null, false);
    reader.loadFileInfo();
    HFileScanner scanner = reader.getScanner(false, false);
    // seekTo() returns false on an empty file; assert it up front so an
    // empty split half cannot be miscounted as one row by the do/while.
    assertTrue(scanner.seekTo());
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    assertTrue(count > 0);
    return count;
  }
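  /**
   * Creates an HFile with {@code numRows} rows spanning the given start
   * and end keys; each cell's value is a copy of its row key.
   */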
  static void createHFile(
      FileSystem fs, Path path,
      byte[] family, byte[] qualifier,
      byte[] startKey, byte[] endKey, int numRows) throws IOException {
    HFile.Writer writer = new HFile.Writer(fs, path, BLOCKSIZE, COMPRESSION,
        KeyValue.KEY_COMPARATOR);
    long now = System.currentTimeMillis();
    try {
      // Ask for numRows - 2 intermediate splits: iterateOnSplits also
      // yields the two boundary keys, giving numRows keys in total.
      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows - 2)) {
        KeyValue kv = new KeyValue(key, family, qualifier, now, key);
        writer.append(kv);
      }
    } finally {
      writer.close();
    }
  }
}