/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
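/**
 * Simple test for {@link HFileOutputFormat}.  Sets up and runs a mapreduce
 * job that writes hfile output, then loads the hfiles back and verifies the
 * table contents.
 */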
public class TestHFileOutputFormat {
  private final static int ROWSPERSPLIT = 1024;

  private static final byte[] FAMILY_NAME = PerformanceEvaluation.FAMILY_NAME;
  private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static Log LOG = LogFactory.getLog(TestHFileOutputFormat.class);

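  /**
   * Simple mapper that makes KeyValue output.
   */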
  static class RandomKVGeneratingMapper
      extends Mapper<NullWritable, NullWritable,
                     ImmutableBytesWritable, KeyValue> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
    }

    @Override
    protected void map(
        NullWritable n1, NullWritable n2,
        Context context)
        throws IOException, InterruptedException {
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      for (int i = 0; i < ROWSPERSPLIT; i++) {
        random.nextBytes(keyBytes);
        // Ensure that different tasks generate different keys by stamping
        // the task id into the last byte of every key.
        keyBytes[keyLength - 1] = (byte)(taskId & 0xFF);
        random.nextBytes(valBytes);
        ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

        KeyValue kv = new KeyValue(keyBytes, PerformanceEvaluation.FAMILY_NAME,
            PerformanceEvaluation.QUALIFIER_NAME, valBytes);
        context.write(key, kv);
      }
    }
  }

  @Before
  public void cleanupDir() throws IOException {
    util.cleanupTestDir();
  }

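  /**
   * Wire the random KV generating mapper into the given job.
   */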
  private void setupRandomGeneratorMapper(Job job) {
    job.setInputFormatClass(NMapInputFormat.class);
    job.setMapperClass(RandomKVGeneratingMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
  }

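  /**
   * Test that the {@link HFileOutputFormat} RecordWriter amends timestamps if
   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   */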
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced()
      throws IOException, InterruptedException {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, KeyValue> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        HBaseTestingUtility.getTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = new TaskAttemptContext(job.getConfiguration(),
          new TaskAttemptID());
      HFileOutputFormat hof = new HFileOutputFormat();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

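      // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  The write
      // should replace it with an actual timestamp.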
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(original.getRow(), kv.getRow()));
      assertTrue(original.matchingColumn(kv.getFamily(), kv.getQualifier()));
      assertTrue(original.getTimestamp() != kv.getTimestamp());
      assertTrue(kv.getTimestamp() != HConstants.LATEST_TIMESTAMP);

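      // Test 2.  Pass a KV that carries an explicit timestamp.  It should
      // come back from the write unchanged.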
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

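  /**
   * Run a small MR job that writes PerformanceEvaluation-style data into
   * hfiles and check that output files were created.
   */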
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = HBaseTestingUtility.getTestDir("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

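    // Keep the MR sort buffer small so the test fits in a modest heap.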
    conf.setInt("io.sort.mb", 20);

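    // Use a small max filesize so the record writer rolls over and the job
    // produces several hfiles instead of one big one.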
    conf.setLong("hbase.hregion.max.filesize", 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job);

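    // Partition the whole key space (0x00... through 0xff...) evenly
    // across the reducers.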
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    Arrays.fill(startKey, (byte)0);
    Arrays.fill(endKey, (byte)0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat.class);
    job.setNumReduceTasks(4);

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

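  /**
   * Check that {@link HFileOutputFormat#configureIncrementalLoad} sets up
   * one reduce task per region of the target table.
   */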
  @Test
  public void testJobConfiguration() throws Exception {
    Job job = new Job();
    HTable table = Mockito.mock(HTable.class);
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY,
        Bytes.toBytes("aaa"),
        Bytes.toBytes("ggg"),
        Bytes.toBytes("zzz")
    };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();

    HFileOutputFormat.configureIncrementalLoad(job, table);
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // The first region always starts with the empty byte array.
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] = PerformanceEvaluation.generateValue(random);
    }
    return ret;
  }

  @Test
  public void testMRIncrementalLoad() throws Exception {
    doIncrementalLoadTest(false);
  }

  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    doIncrementalLoadTest(true);
  }

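  /**
   * Write hfiles with an MR job against a live mini cluster, bulk load them
   * with {@link LoadIncrementalHFiles}, and verify the table contents.  If
   * shouldChangeRegions is true, the table is re-split between the MR job
   * and the bulk load so the load has to cope with a changed region layout.
   */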
  private void doIncrementalLoadTest(
      boolean shouldChangeRegions) throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = HBaseTestingUtility.getTestDir("testLocalMRIncrementalLoad");
    byte[][] startKeys = generateRandomStartKeys(5);

    try {
      util.startMiniCluster();
      HBaseAdmin admin = new HBaseAdmin(conf);
      HTable table = util.createTable(TABLE_NAME, FAMILY_NAME);
      int numRegions = util.createMultiRegions(
          util.getConfiguration(), table, FAMILY_NAME,
          startKeys);
      assertEquals("Should make 5 regions", 5, numRegions);
      assertEquals("Should start with empty table",
          0, util.countRows(table));

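      // Generate the bulk load files with the MR job.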
      util.startMiniMapReduceCluster();
      runIncrementalPELoad(conf, table, testDir);

      // Writing hfiles should not have touched the live table.
      assertEquals("HFOF should not touch actual table",
          0, util.countRows(table));

      if (shouldChangeRegions) {
        LOG.info("Changing regions in table");
        admin.disableTable(table.getTableName());
        while (util.getMiniHBaseCluster().getMaster().getAssignmentManager()
            .isRegionsInTransition()) {
          Threads.sleep(1000);
          LOG.info("Waiting on table to finish disabling");
        }
        byte[][] newStartKeys = generateRandomStartKeys(15);
        util.createMultiRegions(util.getConfiguration(),
            table, FAMILY_NAME, newStartKeys);
        admin.enableTable(table.getTableName());
        while (table.getRegionsInfo().size() != 15 ||
            !admin.isTableAvailable(table.getTableName())) {
          Thread.sleep(1000);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

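      // Perform the actual load.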
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, table);

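      // Ensure the data shows up in the table.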
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));
      String tableDigestBefore = util.checksumRows(table);

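      // Cause regions to reopen by disabling and re-enabling the table, then
      // check the data survived the reopen.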
      admin.disableTable(TABLE_NAME);
      while (!admin.isTableDisabled(TABLE_NAME)) {
        Thread.sleep(1000);
        LOG.info("Waiting for table to disable");
      }
      admin.enableTable(TABLE_NAME);
      util.waitTableAvailable(TABLE_NAME, 30000);
      assertEquals("Data should remain after reopening of regions",
          tableDigestBefore, util.checksumRows(table));
    } finally {
      util.shutdownMiniMapReduceCluster();
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(
      Configuration conf, HTable table, Path outDir)
      throws Exception {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    setupRandomGeneratorMapper(job);
    HFileOutputFormat.configureIncrementalLoad(job, table);
    FileOutputFormat.setOutputPath(job, outDir);

    // configureIncrementalLoad should have set up one reducer per region.
    assertEquals(table.getRegionsInfo().size(),
        job.getNumReduceTasks());

    assertTrue(job.waitForCompletion(true));
  }

  public static void main(String[] args) throws Exception {
    new TestHFileOutputFormat().manualTest(args);
  }

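  /**
   * Manual driver.  "newtable tablename" creates and pre-splits a new table;
   * "incremental tablename" runs the incremental PE load against an existing
   * table.
   */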
  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      byte[] tname = Bytes.toBytes(args[1]);
      HTable table = util.createTable(tname, FAMILY_NAME);
      HBaseAdmin admin = new HBaseAdmin(conf);
      admin.disableTable(tname);
      util.createMultiRegions(conf, table, FAMILY_NAME,
          generateRandomStartKeys(5));
      admin.enableTable(tname);
    } else if ("incremental".equals(args[0])) {
      byte[] tname = Bytes.toBytes(args[1]);
      HTable table = new HTable(conf, tname);
      Path outDir = new Path("incremental-out");
      runIncrementalPELoad(conf, table, outDir);
    } else {
      throw new RuntimeException(
          "usage: TestHFileOutputFormat newtable | incremental");
    }
  }
}