1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import static org.junit.Assert.*;
23  
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicLong;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FSDataInputStream;
37  import org.apache.hadoop.fs.FSDataOutputStream;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.FileUtil;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
43  import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
44  import org.apache.hadoop.hbase.HBaseTestingUtility;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.HRegionInfo;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValue;
49  import org.apache.hadoop.hbase.regionserver.HRegion;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.Threads;
52  import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
53  import org.apache.hadoop.ipc.RemoteException;
54  import org.junit.After;
55  import org.junit.AfterClass;
56  import org.junit.Assert;
57  import org.junit.Before;
58  import org.junit.BeforeClass;
59  import org.junit.Test;
60  import org.mockito.Mockito;
61  import org.mockito.invocation.InvocationOnMock;
62  import org.mockito.stubbing.Answer;
63  
64  import com.google.common.base.Joiner;
65  import com.google.common.collect.ImmutableList;
66  
67  /**
68   * Testing {@link HLog} splitting code.
69   */
70  public class TestHLogSplit {
71  
72    private final static Log LOG = LogFactory.getLog(TestHLogSplit.class);
73  
74    private Configuration conf;
75    private FileSystem fs;
76  
  private final static HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();

81    private static final Path hbaseDir = new Path("/hbase");
82    private static final Path hlogDir = new Path(hbaseDir, "hlog");
83    private static final Path oldLogDir = new Path(hbaseDir, "hlog.old");
84    private static final Path corruptDir = new Path(hbaseDir, ".corrupt");
85  
86    private static final int NUM_WRITERS = 10;
87    private static final int ENTRIES = 10; // entries per writer per region
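  // Each writer interleaves ENTRIES edits per region, so after a clean split
  // every region's recovered-edits file holds NUM_WRITERS * ENTRIES entries.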
88  
89    private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS];
90    private long seq = 0;
91    private static final byte[] TABLE_NAME = "t1".getBytes();
92    private static final byte[] FAMILY = "f1".getBytes();
93    private static final byte[] QUALIFIER = "q1".getBytes();
94    private static final byte[] VALUE = "v1".getBytes();
95    private static final String HLOG_FILE_PREFIX = "hlog.dat.";
96    private static List<String> regions;
97    private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
98    private static final Path tabledir =
99        new Path(hbaseDir, Bytes.toString(TABLE_NAME));
100 
101   static enum Corruptions {
102     INSERT_GARBAGE_ON_FIRST_LINE,
103     INSERT_GARBAGE_IN_THE_MIDDLE,
104     APPEND_GARBAGE,
105     TRUNCATE,
106   }
107 
108   @BeforeClass
109   public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    TEST_UTIL.getConfiguration().setStrings("hbase.rootdir", hbaseDir.toString());
    TEST_UTIL.getConfiguration().setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
117 
118     TEST_UTIL.startMiniDFSCluster(2);
119   }
120 
121   @AfterClass
122   public static void tearDownAfterClass() throws Exception {
123     TEST_UTIL.shutdownMiniDFSCluster();
124   }
125 
126   @Before
127   public void setUp() throws Exception {
128     flushToConsole("Cleaning up cluster for new test\n"
129         + "--------------------------");
130     conf = TEST_UTIL.getConfiguration();
131     fs = TEST_UTIL.getDFSCluster().getFileSystem();
132     FileStatus[] entries = fs.listStatus(new Path("/"));
133     flushToConsole("Num entries in /:" + entries.length);
134     for (FileStatus dir : entries){
135       assertTrue("Deleting " + dir.getPath(),
136           fs.delete(dir.getPath(), true));
137     }
138     seq = 0;
139     regions = new ArrayList<String>();
140     Collections.addAll(regions, "bbb", "ccc");
141     InstrumentedSequenceFileLogWriter.activateFailure = false;
    // Lower the hdfs soft lease period from its default so lease recovery kicks in quickly.
143     TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 50000);
144   }
145 
146   @After
147   public void tearDown() throws Exception {
148   }
149 
150   /**
151    * @throws IOException 
152    * @see https://issues.apache.org/jira/browse/HBASE-3020
153    */
154   @Test public void testRecoveredEditsPathForMeta() throws IOException {
155     FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
156     byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
157     Path tdir = new Path(hbaseDir, Bytes.toString(HConstants.META_TABLE_NAME));
158     Path regiondir = new Path(tdir,
159         HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
160     fs.mkdirs(regiondir);
161     long now = System.currentTimeMillis();
162     HLog.Entry entry =
163       new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now),
164       new WALEdit());
165     Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir);
166     String parentOfParent = p.getParent().getParent().getName();
    assertEquals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName(),
        parentOfParent);
168   }
169 
170   @Test(expected = OrphanHLogAfterSplitException.class)
171   public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
172   throws IOException {
173     AtomicBoolean stop = new AtomicBoolean(false);
174     
175     FileStatus[] stats = fs.listStatus(new Path("/hbase/t1"));
176     assertTrue("Previous test should clean up table dir",
177         stats == null || stats.length == 0);
178 
    generateHLogs(-1);

    try {
      (new ZombieNewLogWriterRegionServer(stop)).start();
      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
          hbaseDir, hlogDir, oldLogDir, fs);
      logSplitter.splitLog();
    } finally {
      stop.set(true);
    }
189   }
190 
191   @Test
192   public void testSplitPreservesEdits() throws IOException{
193     final String REGION = "region__1";
    regions.clear();
195     regions.add(REGION);
196 
197     generateHLogs(1, 10, -1);
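    // re-initialize fs to act as a different client (new DFSClient with a new
    // client ID), so the splitter can recover leases this client holds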
198     fs.initialize(fs.getUri(), conf);
199     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
200       hbaseDir, hlogDir, oldLogDir, fs);
201     logSplitter.splitLog();
202 
203     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
204     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
205 
206     assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog));
207   }
208 
209 
210   @Test
211   public void testEmptyLogFiles() throws IOException {
212 
213     injectEmptyFile(".empty", true);
214     generateHLogs(Integer.MAX_VALUE);
215     injectEmptyFile("empty", true);
216 
217     // make fs act as a different client now
218     // initialize will create a new DFSClient with a new client ID
219     fs.initialize(fs.getUri(), conf);
220 
221     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
222         hbaseDir, hlogDir, oldLogDir, fs);
    logSplitter.splitLog();

    for (String region : regions) {
      Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
    }
  }
232 
233 
234   @Test
235   public void testEmptyOpenLogFiles() throws IOException {
236     injectEmptyFile(".empty", false);
237     generateHLogs(Integer.MAX_VALUE);
238     injectEmptyFile("empty", false);
239 
240     // make fs act as a different client now
241     // initialize will create a new DFSClient with a new client ID
242     fs.initialize(fs.getUri(), conf);
243 
244     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
245         hbaseDir, hlogDir, oldLogDir, fs);
246     logSplitter.splitLog();
247 
248     for (String region : regions) {
249       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
250       assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
251     }
252   }
253 
254   @Test
255   public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
256     // generate logs but leave hlog.dat.5 open.
257     generateHLogs(5);
258 
259     fs.initialize(fs.getUri(), conf);
260 
261     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
262         hbaseDir, hlogDir, oldLogDir, fs);
263     logSplitter.splitLog();
264 
265     for (String region : regions) {
266       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
    }
  }
272 
273 
274   @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
276     conf.setBoolean(HBASE_SKIP_ERRORS, true);
277     generateHLogs(Integer.MAX_VALUE);
278     corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
279             Corruptions.APPEND_GARBAGE, true, fs);
280     fs.initialize(fs.getUri(), conf);
281 
282     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
283         hbaseDir, hlogDir, oldLogDir, fs);
284     logSplitter.splitLog();
285     for (String region : regions) {
286       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
    }
  }
292 
293   @Test
294   public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
295     conf.setBoolean(HBASE_SKIP_ERRORS, true);
296     generateHLogs(Integer.MAX_VALUE);
297     corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
298             Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
299     fs.initialize(fs.getUri(), conf);
300 
301     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
302         hbaseDir, hlogDir, oldLogDir, fs);
303     logSplitter.splitLog();
304     for (String region : regions) {
305       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
    }
  }

313   @Test
314   public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
315     conf.setBoolean(HBASE_SKIP_ERRORS, true);
316     generateHLogs(Integer.MAX_VALUE);
317     corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
318             Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
319     fs.initialize(fs.getUri(), conf);
320     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
321         hbaseDir, hlogDir, oldLogDir, fs);
322     logSplitter.splitLog();
323 
324     for (String region : regions) {
325       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
      // entries in the original logs alternate between regions; allowing for
      // the sequence file header, corruption in the middle should still leave
      // at least the first half of that file's entries readable
      int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
      // use floating-point division so the ceiling is meaningful for odd ENTRIES
      int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
      assertTrue("The file up to the corrupted area hasn't been parsed",
          goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf));
333     }
334   }
335 
336   @Test
337   public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
338     conf.setBoolean(HBASE_SKIP_ERRORS, true);
339     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
340         Reader.class);
341     InstrumentedSequenceFileLogWriter.activateFailure = false;
342     HLog.resetLogReaderClass();
343 
    try {
      Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
346       conf.setClass("hbase.regionserver.hlog.reader.impl",
347           FaultySequenceFileLogReader.class, HLog.Reader.class);
      for (FaultySequenceFileLogReader.FailureType failureType : FaultySequenceFileLogReader.FailureType.values()) {
349         conf.set("faultysequencefilelogreader.failuretype", failureType.name());
350         generateHLogs(1, ENTRIES, -1);
351         fs.initialize(fs.getUri(), conf);
352         HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
353             hbaseDir, hlogDir, oldLogDir, fs);
354         logSplitter.splitLog();
355         FileStatus[] archivedLogs = fs.listStatus(corruptDir);
356         assertEquals("expected a different file", c1.getName(), archivedLogs[0]
357             .getPath().getName());
358         assertEquals(archivedLogs.length, 1);
359         fs.delete(new Path(oldLogDir, HLOG_FILE_PREFIX + "0"), false);
360       }
361     } finally {
362       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
363           Reader.class);
364       HLog.resetLogReaderClass();
365     }
366   }
367 
368   @Test(expected = IOException.class)
369   public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
370       throws IOException {
371     conf.setBoolean(HBASE_SKIP_ERRORS, false);
372     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
373         Reader.class);
374     InstrumentedSequenceFileLogWriter.activateFailure = false;
375     HLog.resetLogReaderClass();
376 
377     try {
378       conf.setClass("hbase.regionserver.hlog.reader.impl",
379           FaultySequenceFileLogReader.class, HLog.Reader.class);
380       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
381       generateHLogs(Integer.MAX_VALUE);
382     fs.initialize(fs.getUri(), conf);
383     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
384         hbaseDir, hlogDir, oldLogDir, fs);
385     logSplitter.splitLog();
386     } finally {
387       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
388           Reader.class);
389       HLog.resetLogReaderClass();
390     }
391 
392   }
393 
394   @Test
395   public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
396       throws IOException {
397     conf.setBoolean(HBASE_SKIP_ERRORS, false);
398     Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
399         Reader.class);
400     InstrumentedSequenceFileLogWriter.activateFailure = false;
401     HLog.resetLogReaderClass();
402 
403     try {
404       conf.setClass("hbase.regionserver.hlog.reader.impl",
405           FaultySequenceFileLogReader.class, HLog.Reader.class);
406       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
407       generateHLogs(-1);
408       fs.initialize(fs.getUri(), conf);
409       HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
410           hbaseDir, hlogDir, oldLogDir, fs);
411       try {
412         logSplitter.splitLog();
413       } catch (IOException e) {
414         assertEquals(
415             "if skip.errors is false all files should remain in place",
416             NUM_WRITERS, fs.listStatus(hlogDir).length);
417       }
418     } finally {
419       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
420           Reader.class);
421       HLog.resetLogReaderClass();
422     }
423 
424   }
425 
426   @Test
427   public void testEOFisIgnored() throws IOException {
428     conf.setBoolean(HBASE_SKIP_ERRORS, false);
429 
430     final String REGION = "region__1";
    regions.clear();
432     regions.add(REGION);
433 
434     int entryCount = 10;
435     Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
436     generateHLogs(1, entryCount, -1);
437     corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
438 
439     fs.initialize(fs.getUri(), conf);
440     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
441         hbaseDir, hlogDir, oldLogDir, fs);
442     logSplitter.splitLog();
443 
444     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
445     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
446 
447     int actualCount = 0;
448     HLog.Reader in = HLog.getReader(fs, splitLog, conf);
449     HLog.Entry entry;
450     while ((entry = in.next()) != null) ++actualCount;
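    // TRUNCATE chopped the last 32 bytes, so the reader recovers one entry
    // fewer than was written; hitting that EOF mid-entry must not raise an error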
    assertEquals(entryCount - 1, actualCount);
452 
    // files that merely hit EOF should not have been archived as corrupt
    FileStatus[] archivedLogs = fs.listStatus(corruptDir);
    assertEquals(0, archivedLogs.length);
456   }
457   
458   @Test
459   public void testLogsGetArchivedAfterSplit() throws IOException {
460     conf.setBoolean(HBASE_SKIP_ERRORS, false);
461 
462     generateHLogs(-1);
463 
464     fs.initialize(fs.getUri(), conf);
465     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
466         hbaseDir, hlogDir, oldLogDir, fs);
467     logSplitter.splitLog();
468 
469     FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
470 
471     assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
472   }
473 
474   @Test
475   public void testSplit() throws IOException {
476     generateHLogs(-1);
477     fs.initialize(fs.getUri(), conf);
478     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
479         hbaseDir, hlogDir, oldLogDir, fs);
480     logSplitter.splitLog();
481 
482     for (String region : regions) {
483       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
484       assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
485 
486     }
487   }
488 
489   @Test
490   public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
491   throws IOException {
492     generateHLogs(-1);
493     fs.initialize(fs.getUri(), conf);
494     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
495         hbaseDir, hlogDir, oldLogDir, fs);
496     logSplitter.splitLog();
497     FileStatus [] statuses = null;
498     try {
499       statuses = fs.listStatus(hlogDir);
500       if (statuses != null) {
501         Assert.fail("Files left in log dir: " +
502             Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
503       }
504     } catch (FileNotFoundException e) {
505       // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
506     }
507   }
508 /* DISABLED for now.  TODO: HBASE-2645 
509   @Test
510   public void testLogCannotBeWrittenOnceParsed() throws IOException {
511     AtomicLong counter = new AtomicLong(0);
512     AtomicBoolean stop = new AtomicBoolean(false);
513     generateHLogs(9);
514     fs.initialize(fs.getUri(), conf);
515 
    Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter, stop);

    try {
521       zombie.start();
522 
523       HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
524 
525       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
526 
      // It's possible that the writer got an error while appending and didn't count it,
      // but the entry will in fact be written to file and split with the rest.
      long numberOfEditsInRegion = countHLog(logfile, fs, conf);
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. "
          + "Zombie could write " + counter.get() + " and logfile had only "
          + numberOfEditsInRegion + " " + logfile,
          counter.get() == numberOfEditsInRegion ||
          counter.get() + 1 == numberOfEditsInRegion);
533     } finally {
534       stop.set(true);
535     }
536   }
537 */
538 
539   @Test
540   public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
541   throws IOException {
542     AtomicBoolean stop = new AtomicBoolean(false);
543     generateHLogs(-1);
544     fs.initialize(fs.getUri(), conf);
545     Thread zombie = new ZombieNewLogWriterRegionServer(stop);
546     
547     try {
548       zombie.start();
549       try {
550         HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
551             hbaseDir, hlogDir, oldLogDir, fs);
552         logSplitter.splitLog();
553       } catch (IOException ex) {/* expected */}
554       int logFilesNumber = fs.listStatus(hlogDir).length;
555 
556       assertEquals("Log files should not be archived if there's an extra file after split",
557               NUM_WRITERS + 1, logFilesNumber);
558     } finally {
559       stop.set(true);
560     }
561 
562   }
563 
564 
565 
566   @Test(expected = IOException.class)
567   public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave the fifth log open so we can append the "trap" entry
569     generateHLogs(4);
570 
571     fs.initialize(fs.getUri(), conf);
572 
573     String region = "break";
574     Path regiondir = new Path(tabledir, region);
575     fs.mkdirs(regiondir);
576 
577     InstrumentedSequenceFileLogWriter.activateFailure = false;
578     appendEntry(writer[4], TABLE_NAME, Bytes.toBytes(region),
579         ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
580     writer[4].close();
581 
582     try {
583       InstrumentedSequenceFileLogWriter.activateFailure = true;
584       HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
585           hbaseDir, hlogDir, oldLogDir, fs);
586       logSplitter.splitLog();
587 
588     } catch (IOException e) {
589       assertEquals("This exception is instrumented and should only be thrown for testing", e.getMessage());
590       throw e;
591     } finally {
592       InstrumentedSequenceFileLogWriter.activateFailure = false;
593     }
594   }
595 
596 
597   // @Test TODO this test has been disabled since it was created!
598   // It currently fails because the second split doesn't output anything
599   // -- because there are no region dirs after we move aside the first
600   // split result
601   public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
602 
    regions.clear();
    for (int i = 0; i < 100; i++) {
      regions.add("region__" + i);
    }
607 
608     generateHLogs(1, 100, -1);
609     fs.initialize(fs.getUri(), conf);
610 
611     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
612         hbaseDir, hlogDir, oldLogDir, fs);
613     logSplitter.splitLog();
614     fs.rename(oldLogDir, hlogDir);
615     Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
616     Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
    fs.rename(splitPath, firstSplitPath);

621     fs.initialize(fs.getUri(), conf);
622     logSplitter = HLogSplitter.createLogSplitter(conf,
623         hbaseDir, hlogDir, oldLogDir, fs);
624     logSplitter.splitLog();
625 
626     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
627   }
628 
629   @Test
630   public void testSplitDeletedRegion() throws IOException {
    regions.clear();
632     String region = "region_that_splits";
633     regions.add(region);
634 
635     generateHLogs(1);
636 
637     fs.initialize(fs.getUri(), conf);
638 
639     Path regiondir = new Path(tabledir, region);
640     fs.delete(regiondir, true);
641 
642     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
643         hbaseDir, hlogDir, oldLogDir, fs);
644     logSplitter.splitLog();
645     
646     assertFalse(fs.exists(regiondir));
647   }
648   
649   @Test
650   public void testIOEOnOutputThread() throws Exception {
651     conf.setBoolean(HBASE_SKIP_ERRORS, false);
652 
653     generateHLogs(-1);
654 
655     fs.initialize(fs.getUri(), conf);
656     // Set up a splitter that will throw an IOE on the output side
657     HLogSplitter logSplitter = new HLogSplitter(
658         conf, hbaseDir, hlogDir, oldLogDir, fs) {
      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
          throws IOException {
        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(mockWriter)
            .append(Mockito.<HLog.Entry>any());
        return mockWriter;
      }
666     };
667     try {
668       logSplitter.splitLog();
669       fail("Didn't throw!");
670     } catch (IOException ioe) {
671       assertTrue(ioe.toString().contains("Injected"));
672     }
673   }
674 
675   // Test for HBASE-3412
676   @Test
677   public void testMovedHLogDuringRecovery() throws Exception {
678     generateHLogs(-1);
679 
680     fs.initialize(fs.getUri(), conf);
681 
682     // This partial mock will throw LEE for every file simulating
683     // files that were moved
684     FileSystem spiedFs = Mockito.spy(fs);
685     // The "File does not exist" part is very important,
686     // that's how it comes out of HDFS
687     Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
688         when(spiedFs).append(Mockito.<Path>any());
689 
690     HLogSplitter logSplitter = new HLogSplitter(
691         conf, hbaseDir, hlogDir, oldLogDir, spiedFs);
692 
693     try {
694       logSplitter.splitLog();
695       assertEquals(NUM_WRITERS, fs.listStatus(oldLogDir).length);
696       assertFalse(fs.exists(hlogDir));
697     } catch (IOException e) {
698       fail("There shouldn't be any exception but: " + e.toString());
699     }
700   }
701   
702   /**
703    * Test log split process with fake data and lots of edits to trigger threading
704    * issues.
705    */
706   @Test
707   public void testThreading() throws Exception {
    doTestThreading(20000, 128 * 1024 * 1024, 0);
709   }
710   
711   /**
712    * Test blocking behavior of the log split process if writers are writing slower
713    * than the reader is reading.
714    */
715   @Test
716   public void testThreadingSlowWriterSmallBuffer() throws Exception {
717     doTestThreading(200, 1024, 50);
718   }
719   
720   /**
721    * Sets up a log splitter with a mock reader and writer. The mock reader generates
722    * a specified number of edits spread across 5 regions. The mock writer optionally
723    * sleeps for each edit it is fed.
724    * *
725    * After the split is complete, verifies that the statistics show the correct number
726    * of edits output into each region.
727    * 
728    * @param numFakeEdits number of fake edits to push through pipeline
729    * @param bufferSize size of in-memory buffer
730    * @param writerSlowness writer threads will sleep this many ms per edit
731    */
732   private void doTestThreading(final int numFakeEdits,
733       final int bufferSize,
734       final int writerSlowness) throws Exception {
735 
736     Configuration localConf = new Configuration(conf);
737     localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
738 
739     // Create a fake log file (we'll override the reader to produce a stream of edits)
740     FSDataOutputStream out = fs.create(new Path(hlogDir, HLOG_FILE_PREFIX + ".fake"));
741     out.close();
742 
743     // Make region dirs for our destination regions so the output doesn't get skipped
744     final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); 
745     makeRegionDirs(fs, regions);
746 
747     // Create a splitter that reads and writes the data without touching disk
748     HLogSplitter logSplitter = new HLogSplitter(
749         localConf, hbaseDir, hlogDir, oldLogDir, fs) {
750       
751       /* Produce a mock writer that doesn't write anywhere */
752       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
753       throws IOException {
754         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
755         Mockito.doAnswer(new Answer<Void>() {
756           int expectedIndex = 0;
757           
758           @Override
759           public Void answer(InvocationOnMock invocation) {
760             if (writerSlowness > 0) {
761               try {
762                 Thread.sleep(writerSlowness);
763               } catch (InterruptedException ie) {
764                 Thread.currentThread().interrupt();
765               }
766             }
767             HLog.Entry entry = (Entry) invocation.getArguments()[0];
768             WALEdit edit = entry.getEdit();
769             List<KeyValue> keyValues = edit.getKeyValues();
770             assertEquals(1, keyValues.size());
771             KeyValue kv = keyValues.get(0);
772             
773             // Check that the edits come in the right order.
774             assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
775             expectedIndex++;
776             return null;
777           }
778         }).when(mockWriter).append(Mockito.<HLog.Entry>any());
779         return mockWriter;        
780       }
781       
782       
783       /* Produce a mock reader that generates fake entries */
784       protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
785       throws IOException {
786         Reader mockReader = Mockito.mock(Reader.class);
787         Mockito.doAnswer(new Answer<HLog.Entry>() {
788           int index = 0;
789 
790           @Override
791           public HLog.Entry answer(InvocationOnMock invocation) throws Throwable {
792             if (index >= numFakeEdits) return null;
793            
794             // Generate r0 through r4 in round robin fashion
795             int regionIdx = index % regions.size();
            byte[] region = new byte[] {(byte) 'r', (byte) (0x30 + regionIdx)};
797             
            HLog.Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes(index / regions.size()),
                FAMILY, QUALIFIER, VALUE, index);
801             index++;
802             return ret;
803           }
804         }).when(mockReader).next();
805         return mockReader;
806       }
807     };
808     
809     logSplitter.splitLog();
810     
811     // Verify number of written edits per region
812 
813     Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
814     for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
815       LOG.info("Got " + entry.getValue() + " output edits for region " + 
816           Bytes.toString(entry.getKey()));
817       
      assertEquals(numFakeEdits / regions.size(), (long) entry.getValue());
819     }
820     assertEquals(regions.size(), outputCounts.size());
821   }
822   
823   
824 
825   /**
826    * This thread will keep writing to the file after the split process has started
827    * It simulates a region server that was considered dead but woke up and wrote
828    * some more to he last log entry
829    */
830   class ZombieLastLogWriterRegionServer extends Thread {
831     AtomicLong editsCount;
832     AtomicBoolean stop;
833     Path log;
834     HLog.Writer lastLogWriter;
835     public ZombieLastLogWriterRegionServer(HLog.Writer writer, AtomicLong counter, AtomicBoolean stop) {
836       this.stop = stop;
837       this.editsCount = counter;
838       this.lastLogWriter = writer;
839     }
840 
841     @Override
842     public void run() {
843       if (stop.get()){
844         return;
845       }
846       flushToConsole("starting");
847       while (true) {
848         try {
849           String region = "juliet";
850           
851           fs.mkdirs(new Path(new Path(hbaseDir, region), region));
852           appendEntry(lastLogWriter, TABLE_NAME, region.getBytes(),
853                   ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
854           lastLogWriter.sync();
855           editsCount.incrementAndGet();
856           try {
857             Thread.sleep(1);
858           } catch (InterruptedException e) {
859             //
860           }
861 
862 
863         } catch (IOException ex) {
864           if (ex instanceof RemoteException) {
865             flushToConsole("Juliet: got RemoteException " +
866                     ex.getMessage() + " while writing " + (editsCount.get() + 1));
867             break;
          } else {
            fail("Failed to write " + editsCount.get());
          }
        }
      }
    }
  }
878 
879   /**
880    * This thread will keep adding new log files
881    * It simulates a region server that was considered dead but woke up and wrote
882    * some more to a new hlog
883    */
884   class ZombieNewLogWriterRegionServer extends Thread {
885     AtomicBoolean stop;
886     public ZombieNewLogWriterRegionServer(AtomicBoolean stop) {
887       super("ZombieNewLogWriterRegionServer");
888       this.stop = stop;
889     }
890 
891     @Override
892     public void run() {
893       if (stop.get()) {
894         return;
895       }
      Path tableDir = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
      Path regionDir = new Path(tableDir, regions.get(0));
898       Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS);
899       String region = "juliet";
900       Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
      try {
903         while (!fs.exists(recoveredEdits) && !stop.get()) {
904           flushToConsole("Juliet: split not started, sleeping a bit...");
905           Threads.sleep(10);
906         }
907 
908         fs.mkdirs(new Path(tableDir, region));
        HLog.Writer writer = HLog.createWriter(fs, julietLog, conf);
        appendEntry(writer, "juliet".getBytes(), "juliet".getBytes(),
            "r".getBytes(), FAMILY, QUALIFIER, VALUE, 0);
913         writer.close();
914         flushToConsole("Juliet file creator: created file " + julietLog);
915       } catch (IOException e1) {
916         assertTrue("Failed to create file " + julietLog, false);
917       }
918     }
919   }
920 
921   private void flushToConsole(String s) {
922     System.out.println(s);
923     System.out.flush();
924   }
925 
926 
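  /**
   * Generates the default set of hlogs (NUM_WRITERS files with ENTRIES edits
   * per region each). Pass -1 as leaveOpen to close every writer.
   */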
927   private void generateHLogs(int leaveOpen) throws IOException {
928     generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
929   }
930 
931   private void makeRegionDirs(FileSystem fs, List<String> regions) throws IOException {
932     for (String region : regions) {
933       flushToConsole("Creating dir for region " + region);
934       fs.mkdirs(new Path(tabledir, region));
935     }
936   }
937   
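  /**
   * Writes {@code entries} edits per region into each of {@code writers} log
   * files, interleaving regions within a file. The writer at index
   * {@code leaveOpen} is left unclosed to simulate a log still being written.
   */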
938   private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
939     makeRegionDirs(fs, regions);
940     for (int i = 0; i < writers; i++) {
941       writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
942       for (int j = 0; j < entries; j++) {
943         int prefix = 0;
944         for (String region : regions) {
          String rowKey = region + prefix++ + i + j;
          appendEntry(writer[i], TABLE_NAME, region.getBytes(),
              rowKey.getBytes(), FAMILY, QUALIFIER, VALUE, seq);
948         }
949       }
950       if (i != leaveOpen) {
951         writer[i].close();
952         flushToConsole("Closing writer " + i);
953       }
954     }
955   }
956 
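  /**
   * Returns the single recovered-edits file produced for the given region,
   * asserting that exactly one such file exists.
   */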
957   private Path getLogForRegion(Path rootdir, byte[] table, String region)
958   throws IOException {
959     Path tdir = HTableDescriptor.getTableDir(rootdir, table);
    Path editsdir = HLog.getRegionDirRecoveredEditsDir(
        HRegion.getRegionDir(tdir, region));
962     FileStatus [] files = this.fs.listStatus(editsdir);
963     assertEquals(1, files.length);
964     return files[0].getPath();
965   }
966 
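  /**
   * Rewrites the log at {@code path} with the requested corruption: garbage
   * appended, garbage at the start or middle, or the last 32 bytes truncated.
   */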
967   private void corruptHLog(Path path, Corruptions corruption, boolean close,
968                            FileSystem fs) throws IOException {
969 
970     FSDataOutputStream out;
971     int fileSize = (int) fs.listStatus(path)[0].getLen();
972 
973     FSDataInputStream in = fs.open(path);
974     byte[] corrupted_bytes = new byte[fileSize];
975     in.readFully(0, corrupted_bytes, 0, fileSize);
976     in.close();
977 
978     switch (corruption) {
979       case APPEND_GARBAGE:
980         out = fs.append(path);
981         out.write("-----".getBytes());
982         closeOrFlush(close, out);
983         break;
984 
985       case INSERT_GARBAGE_ON_FIRST_LINE:
986         fs.delete(path, false);
987         out = fs.create(path);
988         out.write(0);
989         out.write(corrupted_bytes);
990         closeOrFlush(close, out);
991         break;
992 
993       case INSERT_GARBAGE_IN_THE_MIDDLE:
994         fs.delete(path, false);
995         out = fs.create(path);
        int middle = corrupted_bytes.length / 2;
997         out.write(corrupted_bytes, 0, middle);
998         out.write(0);
999         out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
1000         closeOrFlush(close, out);
1001         break;
1002         
      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - 32);
        closeOrFlush(close, out);
        break;
    }
  }
1014 
1015   private void closeOrFlush(boolean close, FSDataOutputStream out)
1016   throws IOException {
1017     if (close) {
1018       out.close();
1019     } else {
1020       out.sync();
      // out.hflush() is not available in hadoop 0.20; sync() is used instead
1022     }
1023   }
1024 
1025   @SuppressWarnings("unused")
1026   private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
1027     HLog.Entry entry;
1028     HLog.Reader in = HLog.getReader(fs, log, conf);
1029     while ((entry = in.next()) != null) {
1030       System.out.println(entry);
1031     }
1032   }
1033 
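  /** Counts the entries that a reader can recover from the given log. */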
1034   private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
1035     int count = 0;
1036     HLog.Reader in = HLog.getReader(fs, log, conf);
1037     while (in.next() != null) {
1038       count++;
1039     }
1040     return count;
1041   }
1042 
1043 
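  /** Appends a single test edit to the writer and syncs it. */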
1044   public long appendEntry(HLog.Writer writer, byte[] table, byte[] region,
1045                           byte[] row, byte[] family, byte[] qualifier,
1046                           byte[] value, long seq)
1047           throws IOException {
1048 
1049     writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
1050     writer.sync();
1051     return seq;
1052   }
1053   
1054   private HLog.Entry createTestEntry(
1055       byte[] table, byte[] region,
1056       byte[] row, byte[] family, byte[] qualifier,
1057       byte[] value, long seq) {
1058     long time = System.nanoTime();
1059     WALEdit edit = new WALEdit();
1060     seq++;
1061     edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
1062     return new HLog.Entry(new HLogKey(region, table, seq, time), edit);
1063   }
1064 
1065 
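  /**
   * Creates a zero-length log file; if closeFile is false the writer is left
   * open so the file looks like it is still being written.
   */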
1066   private void injectEmptyFile(String suffix, boolean closeFile)
1067           throws IOException {
1068     HLog.Writer writer = HLog.createWriter(
1069             fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf);
1070     if (closeFile) writer.close();
1071   }
1072 
1073   @SuppressWarnings("unused")
1074   private void listLogs(FileSystem fs, Path dir) throws IOException {
1075     for (FileStatus file : fs.listStatus(dir)) {
1076       System.out.println(file.getPath());
1077     }
1078 
1079   }
1080 
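  /**
   * Compares two split output directories region by region, returning 0 when
   * every region's single recovered-edits file matches, -1 otherwise.
   */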
1081   private int compareHLogSplitDirs(Path p1, Path p2) throws IOException {
1082     FileStatus[] f1 = fs.listStatus(p1);
1083     FileStatus[] f2 = fs.listStatus(p2);
1084     assertNotNull("Path " + p1 + " doesn't exist", f1);
1085     assertNotNull("Path " + p2 + " doesn't exist", f2);
1086     
1087     System.out.println("Files in " + p1 + ": " +
1088         Joiner.on(",").join(FileUtil.stat2Paths(f1)));
1089     System.out.println("Files in " + p2 + ": " +
1090         Joiner.on(",").join(FileUtil.stat2Paths(f2)));
1091     assertEquals(f1.length, f2.length);
1092 
1093     for (int i = 0; i < f1.length; i++) {
1094       // Regions now have a directory named RECOVERED_EDITS_DIR and in here
1095       // are split edit files. In below presume only 1.
1096       Path rd1 = HLog.getRegionDirRecoveredEditsDir(f1[i].getPath());
1097       FileStatus[] rd1fs = fs.listStatus(rd1);
1098       assertEquals(1, rd1fs.length);
1099       Path rd2 = HLog.getRegionDirRecoveredEditsDir(f2[i].getPath());
1100       FileStatus[] rd2fs = fs.listStatus(rd2);
1101       assertEquals(1, rd2fs.length);
1102       if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
1103         return -1;
1104       }
1105     }
1106     return 0;
1107   }
1108 
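  /** Returns true if every entry in p1 matches the corresponding entry in p2. */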
1109   private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    HLog.Reader in1 = HLog.getReader(fs, p1, conf);
    HLog.Reader in2 = HLog.getReader(fs, p2, conf);
1113     HLog.Entry entry1;
1114     HLog.Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if (entry2 == null ||
          (entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
1122     return true;
1123   }
1124 }