/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.*;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

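/**
 * Testing {@link HLog} splitting code.
 */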
public class TestHLogSplit {

  private final static Log LOG = LogFactory.getLog(TestHLogSplit.class);

  private Configuration conf;
  private FileSystem fs;

  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final Path hbaseDir = new Path("/hbase");
  private static final Path hlogDir = new Path(hbaseDir, "hlog");
  private static final Path oldLogDir = new Path(hbaseDir, "hlog.old");
  private static final Path corruptDir = new Path(hbaseDir, ".corrupt");

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10;

  private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS];
  private long seq = 0;
  private static final byte[] TABLE_NAME = "t1".getBytes();
  private static final byte[] FAMILY = "f1".getBytes();
  private static final byte[] QUALIFIER = "q1".getBytes();
  private static final byte[] VALUE = "v1".getBytes();
  private static final String HLOG_FILE_PREFIX = "hlog.dat.";
  private static List<String> regions;
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static final Path tabledir =
      new Path(hbaseDir, Bytes.toString(TABLE_NAME));

  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    TEST_UTIL.getConfiguration().setStrings("hbase.rootdir", hbaseDir.toString());
    TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);

    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Before
  public void setUp() throws Exception {
    flushToConsole("Cleaning up cluster for new test\n"
        + "--------------------------");
    conf = TEST_UTIL.getConfiguration();
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    FileStatus[] entries = fs.listStatus(new Path("/"));
    flushToConsole("Num entries in /: " + entries.length);
    for (FileStatus dir : entries) {
      assertTrue("Deleting " + dir.getPath(),
          fs.delete(dir.getPath(), true));
    }
    seq = 0;
    regions = new ArrayList<String>();
    Collections.addAll(regions, "bbb", "ccc");
    InstrumentedSequenceFileLogWriter.activateFailure = false;

    TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 50000);
  }

  @After
  public void tearDown() throws Exception {
  }

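  /**
   * Verifies that {@link HLogSplitter#getRegionSplitEditsPath} places the
   * recovered edits for the META region under the META region's directory.
   */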
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
    byte[] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    Path tdir = new Path(hbaseDir, Bytes.toString(HConstants.META_TABLE_NAME));
    Path regiondir = new Path(tdir,
        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    long now = System.currentTimeMillis();
    HLog.Entry entry =
        new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now),
            new WALEdit());
    Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir);
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(), parentOfParent);
  }

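  /**
   * The split should fail with {@link OrphanHLogAfterSplitException} when a
   * "zombie" region server writes a brand-new log into the log directory
   * after the split has already started.
   */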
  @Test(expected = OrphanHLogAfterSplitException.class)
  public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
      throws IOException {
    AtomicBoolean stop = new AtomicBoolean(false);

    FileStatus[] stats = fs.listStatus(new Path("/hbase/t1"));
    assertTrue("Previous test should clean up table dir",
        stats == null || stats.length == 0);

    generateHLogs(-1);

    try {
      (new ZombieNewLogWriterRegionServer(stop)).start();
      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
          hbaseDir, hlogDir, oldLogDir, fs);
      logSplitter.splitLog();
    } finally {
      stop.set(true);
    }
  }

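  /**
   * Splits a single log written for one region and checks that the recovered
   * edits file contains exactly the same entries as the original log.
   */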
  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    regions.removeAll(regions);
    regions.add(REGION);

    generateHLogs(1, 10, -1);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
    Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog));
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    injectEmptyFile(".empty", true);
    generateHLogs(Integer.MAX_VALUE);
    injectEmptyFile("empty", true);

    fs.initialize(fs.getUri(), conf);

    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    for (String region : regions) {
      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
    }
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    injectEmptyFile(".empty", false);
    generateHLogs(Integer.MAX_VALUE);
    injectEmptyFile("empty", false);

    fs.initialize(fs.getUri(), conf);

    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    for (String region : regions) {
      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
    }
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave hlog.dat.5 open.
    generateHLogs(5);

    fs.initialize(fs.getUri(), conf);

    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    for (String region : regions) {
      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
    }
  }

  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateHLogs(Integer.MAX_VALUE);
    corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true, fs);
    fs.initialize(fs.getUri(), conf);

    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();
    for (String region : regions) {
      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
    }
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateHLogs(Integer.MAX_VALUE);
    corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
    fs.initialize(fs.getUri(), conf);

    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();
    for (String region : regions) {
      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
    }
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateHLogs(Integer.MAX_VALUE);
    corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    for (String region : regions) {
      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      // the entries in the original logs are alternating regions;
      // considering the sequence file header, the middle corruption should
      // affect at least half of the entries
      int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
      int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
      assertTrue("The file up to the corrupted area hasn't been parsed",
          goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf));
    }
  }

  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedSequenceFileLogWriter.activateFailure = false;
    HLog.resetLogReaderClass();

    try {
      Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
      conf.setClass("hbase.regionserver.hlog.reader.impl",
          FaultySequenceFileLogReader.class, HLog.Reader.class);
      for (FaultySequenceFileLogReader.FailureType failureType :
          FaultySequenceFileLogReader.FailureType.values()) {
        conf.set("faultysequencefilelogreader.failuretype", failureType.name());
        generateHLogs(1, ENTRIES, -1);
        fs.initialize(fs.getUri(), conf);
        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
            hbaseDir, hlogDir, oldLogDir, fs);
        logSplitter.splitLog();
        FileStatus[] archivedLogs = fs.listStatus(corruptDir);
        assertEquals("expected a different file", c1.getName(),
            archivedLogs[0].getPath().getName());
        assertEquals(1, archivedLogs.length);
        fs.delete(new Path(oldLogDir, HLOG_FILE_PREFIX + "0"), false);
      }
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
      HLog.resetLogReaderClass();
    }
  }

  @Test(expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedSequenceFileLogWriter.activateFailure = false;
    HLog.resetLogReaderClass();

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl",
          FaultySequenceFileLogReader.class, HLog.Reader.class);
      conf.set("faultysequencefilelogreader.failuretype",
          FaultySequenceFileLogReader.FailureType.BEGINNING.name());
      generateHLogs(Integer.MAX_VALUE);
      fs.initialize(fs.getUri(), conf);
      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
          hbaseDir, hlogDir, oldLogDir, fs);
      logSplitter.splitLog();
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
      HLog.resetLogReaderClass();
    }
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedSequenceFileLogWriter.activateFailure = false;
    HLog.resetLogReaderClass();

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl",
          FaultySequenceFileLogReader.class, HLog.Reader.class);
      conf.set("faultysequencefilelogreader.failuretype",
          FaultySequenceFileLogReader.FailureType.BEGINNING.name());
      generateHLogs(-1);
      fs.initialize(fs.getUri(), conf);
      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
          hbaseDir, hlogDir, oldLogDir, fs);
      try {
        logSplitter.splitLog();
      } catch (IOException e) {
        assertEquals(
            "if skip.errors is false all files should remain in place",
            NUM_WRITERS, fs.listStatus(hlogDir).length);
      }
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
      HLog.resetLogReaderClass();
    }
  }

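  /**
   * A log whose tail was truncated mid-entry should split cleanly: the
   * resulting EOF is treated as the end of the file, not as corruption.
   */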
  @Test
  public void testEOFisIgnored() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    regions.removeAll(regions);
    regions.add(REGION);

    int entryCount = 10;
    Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
    generateHLogs(1, entryCount, -1);
    corruptHLog(c1, Corruptions.TRUNCATE, true, fs);

    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
    Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);

    int actualCount = 0;
    HLog.Reader in = HLog.getReader(fs, splitLog, conf);
    HLog.Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(entryCount - 1, actualCount);

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.listStatus(corruptDir);
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateHLogs(-1);

    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    FileStatus[] archivedLogs = fs.listStatus(oldLogDir);

    assertEquals("wrong number of files in the archive log",
        NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    for (String region : regions) {
      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
    }
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);
    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(hlogDir);
      if (statuses != null) {
        Assert.fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

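  /**
   * If a new log shows up after the split has started, the split should fail
   * and none of the original logs may be archived or deleted.
   */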
  @Test
  public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
      throws IOException {
    AtomicBoolean stop = new AtomicBoolean(false);
    generateHLogs(-1);
    fs.initialize(fs.getUri(), conf);
    Thread zombie = new ZombieNewLogWriterRegionServer(stop);

    try {
      zombie.start();
      try {
        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
            hbaseDir, hlogDir, oldLogDir, fs);
        logSplitter.splitLog();
      } catch (IOException ex) {
        // expected: the zombie wrote a new log after the split started
      }
      int logFilesNumber = fs.listStatus(hlogDir).length;

      assertEquals("Log files should not be archived if there's an extra file after split",
          NUM_WRITERS + 1, logFilesNumber);
    } finally {
      stop.set(true);
    }
  }

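  /**
   * An injected failure in the output writer must abort the whole split with
   * an IOException rather than silently dropping edits.
   */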
  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave the fifth log open so one more edit can be appended below
    generateHLogs(4);

    fs.initialize(fs.getUri(), conf);

    String region = "break";
    Path regiondir = new Path(tabledir, region);
    fs.mkdirs(regiondir);

    InstrumentedSequenceFileLogWriter.activateFailure = false;
    appendEntry(writer[4], TABLE_NAME, Bytes.toBytes(region),
        ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
    writer[4].close();

    try {
      InstrumentedSequenceFileLogWriter.activateFailure = true;
      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
          hbaseDir, hlogDir, oldLogDir, fs);
      logSplitter.splitLog();
    } catch (IOException e) {
      assertEquals("This exception is instrumented and should only be thrown for testing",
          e.getMessage());
      throw e;
    } finally {
      InstrumentedSequenceFileLogWriter.activateFailure = false;
    }
  }

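  /**
   * Splits the same set of logs twice and checks that both runs produce
   * identical recovered-edits output for every region.
   */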
  @Test
  public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
    regions.removeAll(regions);
    for (int i = 0; i < 100; i++) {
      regions.add("region__" + i);
    }

    generateHLogs(1, 100, -1);
    fs.initialize(fs.getUri(), conf);

    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();
    fs.rename(oldLogDir, hlogDir);
    Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
    Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
    fs.rename(splitPath, firstSplitPath);

    // run the split a second time over the same logs and compare the outputs
    fs.initialize(fs.getUri(), conf);
    logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    regions.removeAll(regions);
    String region = "region_that_splits";
    regions.add(region);

    generateHLogs(1);

    fs.initialize(fs.getUri(), conf);

    Path regiondir = new Path(tabledir, region);
    fs.delete(regiondir, true);

    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
        hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    assertFalse(fs.exists(regiondir));
  }

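  /**
   * An IOException thrown on the output (writer) side of the split, simulated
   * here with a mock writer whose append() always throws, must surface from
   * splitLog().
   */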
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateHLogs(-1);

    fs.initialize(fs.getUri(), conf);

    HLogSplitter logSplitter = new HLogSplitter(
        conf, hbaseDir, hlogDir, oldLogDir, fs) {
      protected HLog.Writer createWriter(FileSystem fs, Path logfile,
          Configuration conf) throws IOException {
        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(mockWriter)
            .append(Mockito.<HLog.Entry>any());
        return mockWriter;
      }
    };
    try {
      logSplitter.splitLog();
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    }
  }

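  /**
   * A log file that disappears (e.g. gets moved or archived) while the split
   * is in progress should be skipped gracefully instead of failing the split.
   */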
  @Test
  public void testMovedHLogDuringRecovery() throws Exception {
    generateHLogs(-1);

    fs.initialize(fs.getUri(), conf);

    // This partial mock will throw LEE for every file, simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part of the message is important:
    // that's how the exception comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());

    HLogSplitter logSplitter = new HLogSplitter(
        conf, hbaseDir, hlogDir, oldLogDir, spiedFs);

    try {
      logSplitter.splitLog();
      assertEquals(NUM_WRITERS, fs.listStatus(oldLogDir).length);
      assertFalse(fs.exists(hlogDir));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

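  /**
   * Pushes a large number of fake edits through the split pipeline with a big
   * buffer to exercise the threaded reader/writer handoff.
   */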
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128 * 1024 * 1024, 0);
  }

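  /**
   * Same as above, but with a slow writer and a tiny buffer so the reader
   * repeatedly blocks waiting for the writer to drain.
   */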
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }

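  /**
   * Sets up a log splitter with a mocked reader and writer. The mock reader
   * generates {@code numFakeEdits} edits spread round-robin across five
   * regions; the mock writer optionally sleeps for each edit it is fed. After
   * the split completes, verifies the per-region output counts.
   *
   * @param numFakeEdits number of fake edits to push through the pipeline
   * @param bufferSize size of the in-memory edit buffer, in bytes
   * @param writerSlowness milliseconds the writer sleeps per appended edit
   */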
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file; the mocked reader below supplies the entries
    FSDataOutputStream out = fs.create(new Path(hlogDir, HLOG_FILE_PREFIX + ".fake"));
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(fs, regions);

    // Create a splitter that reads and writes the data without touching disk
    HLogSplitter logSplitter = new HLogSplitter(
        localConf, hbaseDir, hlogDir, oldLogDir, fs) {

      /* Produce a mock writer that optionally slows down and verifies the
       * edits for its region arrive in order. */
      protected HLog.Writer createWriter(FileSystem fs, Path logfile,
          Configuration conf) throws IOException {
        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            HLog.Entry entry = (Entry) invocation.getArguments()[0];
            WALEdit edit = entry.getEdit();
            List<KeyValue> keyValues = edit.getKeyValues();
            assertEquals(1, keyValues.size());
            KeyValue kv = keyValues.get(0);

            // Check that the edits come in the right order
            assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<HLog.Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      protected Reader getReader(FileSystem fs, Path curLogFile,
          Configuration conf) throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<HLog.Entry>() {
          int index = 0;

          @Override
          public HLog.Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // round-robin the edits across the regions
            int regionIdx = index % regions.size();
            byte[] region = new byte[] {(byte) 'r', (byte) (0x30 + regionIdx)};

            HLog.Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes((int) (index / regions.size())),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLog();

    // Verify the number of written edits per region
    Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
          Bytes.toString(entry.getKey()));
      assertEquals(numFakeEdits / regions.size(), (long) entry.getValue());
    }
    assertEquals(regions.size(), outputCounts.size());
  }

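  /**
   * Simulates a region server that keeps writing to the last, still-open log
   * file while the split is running.
   */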
  class ZombieLastLogWriterRegionServer extends Thread {
    AtomicLong editsCount;
    AtomicBoolean stop;
    Path log;
    HLog.Writer lastLogWriter;

    public ZombieLastLogWriterRegionServer(HLog.Writer writer, AtomicLong counter,
        AtomicBoolean stop) {
      this.stop = stop;
      this.editsCount = counter;
      this.lastLogWriter = writer;
    }

    @Override
    public void run() {
      if (stop.get()) {
        return;
      }
      flushToConsole("starting");
      while (true) {
        try {
          String region = "juliet";

          fs.mkdirs(new Path(new Path(hbaseDir, region), region));
          appendEntry(lastLogWriter, TABLE_NAME, region.getBytes(),
              ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
          lastLogWriter.sync();
          editsCount.incrementAndGet();
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // ignore and keep writing
          }
        } catch (IOException ex) {
          if (ex instanceof RemoteException) {
            flushToConsole("Juliet: got RemoteException " +
                ex.getMessage() + " while writing " + (editsCount.get() + 1));
            break;
          } else {
            fail("Failed to write " + editsCount.get());
          }
        }
      }
    }
  }

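  /**
   * This thread will keep adding new log files. It simulates a region server
   * that was considered dead but woke up and wrote some more to a new hlog.
   */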
  class ZombieNewLogWriterRegionServer extends Thread {
    AtomicBoolean stop;

    public ZombieNewLogWriterRegionServer(AtomicBoolean stop) {
      super("ZombieNewLogWriterRegionServer");
      this.stop = stop;
    }

    @Override
    public void run() {
      if (stop.get()) {
        return;
      }
      Path tableDir = new Path(hbaseDir, new String(TABLE_NAME));
      Path regionDir = new Path(tableDir, regions.get(0));
      Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS);
      String region = "juliet";
      Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
      try {
        // wait until the split has actually started before writing a new log
        while (!fs.exists(recoveredEdits) && !stop.get()) {
          flushToConsole("Juliet: split not started, sleeping a bit...");
          Threads.sleep(10);
        }

        fs.mkdirs(new Path(tableDir, region));
        HLog.Writer writer = HLog.createWriter(fs, julietLog, conf);
        appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
            ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
        writer.close();
        flushToConsole("Juliet file creator: created file " + julietLog);
      } catch (IOException e1) {
        fail("Failed to create file " + julietLog);
      }
    }
  }

  private void flushToConsole(String s) {
    System.out.println(s);
    System.out.flush();
  }

  private void generateHLogs(int leaveOpen) throws IOException {
    generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
  }

  private void makeRegionDirs(FileSystem fs, List<String> regions)
      throws IOException {
    for (String region : regions) {
      flushToConsole("Creating dir for region " + region);
      fs.mkdirs(new Path(tabledir, region));
    }
  }

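  /**
   * Writes {@code entries} edits per region into each of {@code writers} log
   * files. The writer whose index equals {@code leaveOpen} is left open;
   * pass -1 to close them all.
   */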
  private void generateHLogs(int writers, int entries, int leaveOpen)
      throws IOException {
    makeRegionDirs(fs, regions);
    for (int i = 0; i < writers; i++) {
      writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : regions) {
          String row_key = region + prefix++ + i + j;
          appendEntry(writer[i], TABLE_NAME, region.getBytes(),
              row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq);
        }
      }
      if (i != leaveOpen) {
        writer[i].close();
        flushToConsole("Closing writer " + i);
      }
    }
  }

  private Path getLogForRegion(Path rootdir, byte[] table, String region)
      throws IOException {
    Path tdir = HTableDescriptor.getTableDir(rootdir, table);
    Path editsdir = HLog.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(region.getBytes())));
    FileStatus[] files = this.fs.listStatus(editsdir);
    assertEquals(1, files.length);
    return files[0].getPath();
  }

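  /**
   * Corrupts the given log file in the requested way: appends garbage,
   * prepends a garbage byte, inserts a garbage byte in the middle, or
   * truncates the tail. If {@code close} is false the file is synced but
   * left open.
   */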
  private void corruptHLog(Path path, Corruptions corruption, boolean close,
      FileSystem fs) throws IOException {

    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        out = fs.append(path);
        out.write("-----".getBytes());
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = corrupted_bytes.length / 2;
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - 32);
        closeOrFlush(close, out);
        break;
    }
  }

  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      // push the data to the datanodes but leave the file open
      out.sync();
    }
  }

  @SuppressWarnings("unused")
  private void dumpHLog(Path log, FileSystem fs, Configuration conf)
      throws IOException {
    HLog.Entry entry;
    HLog.Reader in = HLog.getReader(fs, log, conf);
    while ((entry = in.next()) != null) {
      System.out.println(entry);
    }
  }

  private int countHLog(Path log, FileSystem fs, Configuration conf)
      throws IOException {
    int count = 0;
    HLog.Reader in = HLog.getReader(fs, log, conf);
    while (in.next() != null) {
      count++;
    }
    return count;
  }

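  /**
   * Appends a single test entry to the given writer and syncs it.
   */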
  public long appendEntry(HLog.Writer writer, byte[] table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    writer.sync();
    return seq;
  }

  private HLog.Entry createTestEntry(
      byte[] table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();
    WALEdit edit = new WALEdit();
    seq++;
    edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
    return new HLog.Entry(new HLogKey(region, table, seq, time), edit);
  }

  private void injectEmptyFile(String suffix, boolean closeFile)
      throws IOException {
    HLog.Writer writer = HLog.createWriter(
        fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf);
    if (closeFile) writer.close();
  }

  @SuppressWarnings("unused")
  private void listLogs(FileSystem fs, Path dir) throws IOException {
    for (FileStatus file : fs.listStatus(dir)) {
      System.out.println(file.getPath());
    }
  }

  private int compareHLogSplitDirs(Path p1, Path p2) throws IOException {
    FileStatus[] f1 = fs.listStatus(p1);
    FileStatus[] f2 = fs.listStatus(p2);
    assertNotNull("Path " + p1 + " doesn't exist", f1);
    assertNotNull("Path " + p2 + " doesn't exist", f2);

    System.out.println("Files in " + p1 + ": " +
        Joiner.on(",").join(FileUtil.stat2Paths(f1)));
    System.out.println("Files in " + p2 + ": " +
        Joiner.on(",").join(FileUtil.stat2Paths(f2)));
    assertEquals(f1.length, f2.length);

    for (int i = 0; i < f1.length; i++) {
      // Regions now have a directory named RECOVERED_EDITS_DIR and in here
      // are split edit files. In below presume only 1.
      Path rd1 = HLog.getRegionDirRecoveredEditsDir(f1[i].getPath());
      FileStatus[] rd1fs = fs.listStatus(rd1);
      assertEquals(1, rd1fs.length);
      Path rd2 = HLog.getRegionDirRecoveredEditsDir(f2[i].getPath());
      FileStatus[] rd2fs = fs.listStatus(rd2);
      assertEquals(1, rd2fs.length);
      if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
        return -1;
      }
    }
    return 0;
  }

  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    HLog.Reader in1, in2;
    in1 = HLog.getReader(fs, p1, conf);
    in2 = HLog.getReader(fs, p2, conf);
    HLog.Entry entry1;
    HLog.Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    return true;
  }
}