/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.MultipleIOException;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * This class is responsible for splitting up a bunch of regionserver commit log
 * files that are no longer being written to, into new files, one per region,
 * for the region to replay on startup. Deletes the old log files when finished.
 */
public class HLogSplitter {

  /** Configuration key naming the {@link HLogSplitter} implementation to use. */
  private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";

  /**
   * Name of the directory that holds recovered edits written by the wal log
   * splitting code, one per region.
   */
  public static final String RECOVERED_EDITS = "recovered.edits";

  static final Log LOG = LogFactory.getLog(HLogSplitter.class);

  private boolean hasSplit = false;
  private long splitTime = 0;
  private long splitSize = 0;

  // Parameters for the split process
  protected final Path rootDir;
  protected final Path srcDir;
  protected final Path oldLogDir;
  protected final FileSystem fs;
  protected final Configuration conf;

  // Major subcomponents of the split process: grouping of edits by region,
  // and the sink that writes each group out.
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // If an exception is thrown by one of the writer threads, this variable is
  // set so the reading thread and the caller can notice it and bail out.
  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();

  // Wait/notify for when data has been produced by the reading thread,
  // consumed by the writer threads, or an exception occurred.
  Object dataAvailable = new Object();

  /**
   * Create a new HLogSplitter, using the <code>hbase.hlog.splitter.impl</code>
   * configuration property to derive the instance class to use. The class must
   * expose a constructor taking the five arguments below.
   *
   * @param conf configuration to use
   * @param rootDir hbase directory
   * @param srcDir logs directory
   * @param oldLogDir directory where processed logs are archived to
   * @param fs FileSystem
   * @return New HLogSplitter instance
   */
  public static HLogSplitter createLogSplitter(Configuration conf,
      final Path rootDir, final Path srcDir,
      Path oldLogDir, final FileSystem fs) {

    @SuppressWarnings("unchecked")
    Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
        .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
    try {
      Constructor<? extends HLogSplitter> constructor =
          splitterClass.getConstructor(
              Configuration.class, // conf
              Path.class, // rootDir
              Path.class, // srcDir
              Path.class, // oldLogDir
              FileSystem.class); // fs
      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
    } catch (IllegalArgumentException e) {
      throw new RuntimeException(e);
    } catch (InstantiationException e) {
      throw new RuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    } catch (InvocationTargetException e) {
      throw new RuntimeException(e);
    } catch (SecurityException e) {
      throw new RuntimeException(e);
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(e);
    }
  }
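
  // Usage sketch (hypothetical caller; the method and property above are real,
  // but this invocation is illustrative only):
  //
  //   HLogSplitter splitter = HLogSplitter.createLogSplitter(
  //       conf, rootDir, logDir, oldLogDir, fs);
  //   List<Path> recoveredEdits = splitter.splitLog();
  //
  // Each instance may be used for exactly one split; see splitLog() below.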

  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
      Path oldLogDir, FileSystem fs) {
    this.conf = conf;
    this.rootDir = rootDir;
    this.srcDir = srcDir;
    this.oldLogDir = oldLogDir;
    this.fs = fs;

    entryBuffers = new EntryBuffers(
        conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
            128*1024*1024));
    outputSink = new OutputSink();
  }
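
  // Configuration sketch (illustrative value): the buffer threshold read above
  // defaults to 128 MB and can be tuned in hbase-site.xml:
  //
  //   <property>
  //     <name>hbase.regionserver.hlog.splitlog.buffersize</name>
  //     <value>134217728</value>
  //   </property>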

  /**
   * Split up a bunch of regionserver commit log files that are no longer being
   * written to, into new files, one per region, for the region to replay on
   * startup. Deletes the old log files when finished.
   *
   * @throws IOException will throw if corrupted hlogs aren't tolerated
   * @return the list of splits
   */
  public List<Path> splitLog()
      throws IOException {
    Preconditions.checkState(!hasSplit,
        "An HLogSplitter instance may only be used once");
    hasSplit = true;

    long startTime = System.currentTimeMillis();
    List<Path> splits = null;
    if (!fs.exists(srcDir)) {
      // Nothing to do
      return splits;
    }
    FileStatus[] logfiles = fs.listStatus(srcDir);
    if (logfiles == null || logfiles.length == 0) {
      // Nothing to do
      return splits;
    }
    LOG.info("Splitting " + logfiles.length + " hlog(s) in "
        + srcDir.toString());
    splits = splitLog(logfiles);

    splitTime = System.currentTimeMillis() - startTime;
    LOG.info("hlog file splitting completed in " + splitTime +
        " ms for " + srcDir.toString());
    return splits;
  }

  /**
   * @return time that this split took, in milliseconds
   */
  public long getTime() {
    return this.splitTime;
  }

  /**
   * @return aggregate size, in bytes, of the hlogs that were split
   */
  public long getSize() {
    return this.splitSize;
  }

  /**
   * @return a map from encoded region ID to the number of edits written out
   *         for that region
   */
  Map<byte[], Long> getOutputCounts() {
    Preconditions.checkState(hasSplit);
    return outputSink.getOutputCounts();
  }
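
  // Illustrative use of the per-region counts (hypothetical caller):
  //
  //   for (Map.Entry<byte[], Long> e : splitter.getOutputCounts().entrySet()) {
  //     LOG.info(Bytes.toStringBinary(e.getKey()) + ": " + e.getValue() + " edits");
  //   }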

  /**
   * Sorts the HLog edits in the given list of logfiles (that are a mix of edits
   * on multiple regions) by region and then splits them per region directory.
   *
   * The work is divided among threads: this thread reads each log file and
   * pushes its edits into {@link EntryBuffers}, which groups them by region,
   * while a pool of {@link WriterThread}s drains those buffers into one
   * recovered-edits file per region via the {@link OutputSink}.
   *
   * Each log file is lease-recovered before parsing (so no dying process is
   * still writing to it), then archived once processed. A file that cannot be
   * parsed is either skipped or moved aside as corrupt, depending on
   * <code>hbase.hlog.split.skip.errors</code>.
   *
   * @param logfiles the log files to split
   * @return the list of splits
   */
  private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
    List<Path> processedLogs = new ArrayList<Path>();
    List<Path> corruptedLogs = new ArrayList<Path>();
    List<Path> splits = null;

    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);

    splitSize = 0;

    outputSink.startWriterThreads(entryBuffers);

    try {
      int i = 0;
      for (FileStatus log : logfiles) {
        Path logPath = log.getPath();
        long logLength = log.getLen();
        splitSize += logLength;
        LOG.debug("Splitting hlog " + (++i) + " of " + logfiles.length
            + ": " + logPath + ", length=" + logLength);
        try {
          recoverFileLease(fs, logPath, conf);
          parseHLog(log, entryBuffers, fs, conf);
          processedLogs.add(logPath);
        } catch (EOFException eof) {
          // Truncated files are expected if a region server crashes
          LOG.info("EOF from hlog " + logPath + ". Continuing");
          processedLogs.add(logPath);
        } catch (FileNotFoundException fnfe) {
          // A wal file may not exist anymore. Nothing can be recovered so move
          // on.
          LOG.info("A log was missing " + logPath +
              ", probably because it was moved by the" +
              " now dead region server. Continuing");
          processedLogs.add(logPath);
        } catch (IOException e) {
          // If the IOE resulted from bad file format, the problem is
          // idempotent and retrying won't help.
          if (e.getCause() instanceof ParseException) {
            LOG.warn("Parse exception from hlog " + logPath + ". Continuing", e);
            processedLogs.add(logPath);
          } else {
            if (skipErrors) {
              LOG.info("Got an error while parsing hlog " + logPath +
                  ". Marking as corrupted", e);
              corruptedLogs.add(logPath);
            } else {
              throw e;
            }
          }
        }
      }
      if (fs.listStatus(srcDir).length > processedLogs.size()
          + corruptedLogs.size()) {
        throw new OrphanHLogAfterSplitException(
            "Discovered orphan hlog after split. Maybe the "
            + "HRegionServer was not dead when we started");
      }
      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
    } finally {
      splits = outputSink.finishWritingAndClose();
    }
    return splits;
  }

  /**
   * Moves processed logs to the oldLogDir after successful processing. Moves
   * corrupted logs (any log that couldn't be successfully parsed) to the
   * corrupt dir (<code>.corrupt</code> by default) for later investigation,
   * then deletes the now-empty source directory.
   *
   * @param srcDir the directory the logs were split from
   * @param corruptedLogs logs that could not be parsed
   * @param processedLogs logs that were fully split
   * @param oldLogDir archive directory for processed logs
   * @param fs the filesystem
   * @param conf the configuration
   * @throws IOException if the source directory cannot be deleted
   */
  private static void archiveLogs(
      final Path srcDir,
      final List<Path> corruptedLogs,
      final List<Path> processedLogs, final Path oldLogDir,
      final FileSystem fs, final Configuration conf) throws IOException {
    final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
        "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));

    if (!fs.mkdirs(corruptDir)) {
      LOG.info("Unable to mkdir " + corruptDir);
    }
    fs.mkdirs(oldLogDir);

    for (Path corrupted : corruptedLogs) {
      Path p = new Path(corruptDir, corrupted.getName());
      if (!fs.rename(corrupted, p)) {
        LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
      } else {
        LOG.info("Moved corrupted log " + corrupted + " to " + p);
      }
    }

    for (Path p : processedLogs) {
      Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
      if (!fs.rename(p, newPath)) {
        LOG.info("Unable to move " + p + " to " + newPath);
      } else {
        LOG.info("Archived processed log " + p + " to " + newPath);
      }
    }

    if (!fs.delete(srcDir, true)) {
      throw new IOException("Unable to delete src dir: " + srcDir);
    }
  }
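
  // Resulting layout sketch (paths illustrative; the archive dir is whatever
  // oldLogDir the caller passed in, e.g. .oldlogs under the HBase root):
  //
  //   ${hbase.rootdir}/.corrupt/<log-name>   <- unparseable logs
  //   ${hbase.rootdir}/.oldlogs/<log-name>   <- successfully processed logs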

  /**
   * Path to a file under the RECOVERED_EDITS directory of the region found in
   * <code>logEntry</code>, named for the sequence id in the entry, e.g.
   * /hbase/some_table/2323432434/recovered.edits/0000000000000002332. This
   * method also ensures the RECOVERED_EDITS dir exists under the region dir,
   * creating it if necessary.
   *
   * @param fs the filesystem
   * @param logEntry the entry whose region determines the path
   * @param rootDir HBase root dir
   * @return path to the file to dump split log edits into, or null if the
   *         region directory no longer exists (i.e. the region was removed
   *         or already split)
   */
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir) throws IOException {
    Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
        .getTablename());
    Path regiondir = HRegion.getRegionDir(tableDir,
        Bytes.toString(logEntry.getKey().getEncodedRegionName()));
    if (!fs.exists(regiondir)) {
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
          " already split so it's safe to discard those edits.");
      return null;
    }
    Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
    if (!fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }
    return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
        .getLogSeqNum()));
  }

  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);
  }
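
  // Worked example: the file name is the log sequence number zero-padded to
  // 19 digits, so names sort numerically when compared lexically:
  //
  //   formatRecoveredEditsFileName(2332L)  -> "0000000000000002332"
  //   formatRecoveredEditsFileName(12345L) -> "0000000000000012345"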

  /**
   * Parse a single hlog and push its edits into the given EntryBuffers.
   *
   * @param logfile the log file to split
   * @param entryBuffers sink that groups the parsed edits by region
   * @param fs the filesystem
   * @param conf the configuration
   * @throws IOException if the hlog is corrupted, or cannot be opened
   */
  private void parseHLog(final FileStatus logfile,
      EntryBuffers entryBuffers, final FileSystem fs,
      final Configuration conf)
      throws IOException {
    // Check for a possibly empty file. With appends, Hadoop currently reports
    // a zero length even if the file has been sync'd.
    long length = logfile.getLen();
    if (length <= 0) {
      LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
    }
    Path path = logfile.getPath();
    Reader in;
    int editsCount = 0;
    try {
      in = getReader(fs, path, conf);
    } catch (EOFException e) {
      if (length <= 0) {
        // A zero-length file can legitimately throw EOF on open; there is
        // nothing to split, so just return. A non-empty file that hits EOF
        // here is genuinely truncated, so rethrow below.
        LOG.warn("Could not open " + path + " for reading. File is empty: " + e);
        return;
      } else {
        throw e;
      }
    }
    try {
      Entry entry;
      while ((entry = in.next()) != null) {
        entryBuffers.appendEntry(entry);
        editsCount++;
      }
    } catch (InterruptedException ie) {
      throw new RuntimeException(ie);
    } finally {
      LOG.debug("Pushed=" + editsCount + " entries from " + path);
      try {
        if (in != null) {
          in.close();
        }
      } catch (IOException e) {
        LOG.warn("Close log reader in finally threw exception -- continuing",
            e);
      }
    }
  }

  private void writerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the writer threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) return;
    if (thrown instanceof IOException) {
      throw (IOException)thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }

  /**
   * Create a new {@link Writer} for writing log splits.
   */
  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
      throws IOException {
    return HLog.createWriter(fs, logfile, conf);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   */
  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
      throws IOException {
    return HLog.getReader(fs, curLogFile, conf);
  }
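
  // Subclassing sketch (hypothetical, e.g. for tests): the factory methods
  // above are protected so that a subclass registered via the
  // hbase.hlog.splitter.impl property can swap in an instrumented reader or
  // writer. The constructor must match the signature reflected on in
  // createLogSplitter():
  //
  //   class InstrumentedSplitter extends HLogSplitter {
  //     public InstrumentedSplitter(Configuration conf, Path rootDir,
  //         Path srcDir, Path oldLogDir, FileSystem fs) {
  //       super(conf, rootDir, srcDir, oldLogDir, fs);
  //     }
  //     @Override
  //     protected Reader getReader(FileSystem fs, Path p, Configuration conf)
  //         throws IOException {
  //       LOG.info("Opening " + p + " for split");
  //       return super.getReader(fs, p, conf);
  //     }
  //   }
  //
  //   conf.setClass("hbase.hlog.splitter.impl",
  //       InstrumentedSplitter.class, HLogSplitter.class);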

  /**
   * Class which accumulates edits and separates them into a buffer per region
   * while simultaneously accounting for the heap usage of all buffers. When
   * total usage crosses the configured threshold, producers calling
   * appendEntry() block until the writer threads drain some data.
   */
  class EntryBuffers {
    Map<byte[], RegionEntryBuffer> buffers =
        new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    /* Track which regions are currently in the middle of writing. We don't
       allow an IO thread to pick up bytes from a region if we're already
       writing data for that region in a different IO thread. */
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    long totalBuffered = 0;
    long maxHeapUsage;

    EntryBuffers(long maxHeapUsage) {
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Append a log entry into the corresponding region buffer.
     * Blocks if the total heap usage is too high.
     *
     * @throws InterruptedException if interrupted while waiting for IO threads
     * @throws IOException if a writer thread has already failed
     */
    void appendEntry(Entry entry) throws InterruptedException, IOException {
      HLogKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap = buffer.appendEntry(entry);
      }

      // If we crossed the heap threshold, wait for the IO threads to free space
      synchronized (dataAvailable) {
        totalBuffered += incrHeap;
        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          dataAvailable.wait(3000);
        }
        dataAvailable.notifyAll();
      }
      checkForErrors();
    }

    /**
     * @return the buffer with the most pending data whose region is not
     *         already being written, marking that region as in-progress;
     *         null if no such buffer exists
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (dataAvailable) {
        totalBuffered -= size;
        // We may have freed enough space to unblock producers
        dataAvailable.notifyAll();
      }
    }

    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }
  }

  /**
   * A buffer of some number of edits for a given region.
   * This accumulates edits and also provides a memory optimization by sharing
   * a single byte array instance for the table and region name across entries.
   * Also tracks the memory usage of the accumulated edits.
   */
  static class RegionEntryBuffer implements HeapSize {
    long heapInBuffer = 0;
    List<Entry> entryBuffer;
    byte[] tableName;
    byte[] encodedRegionName;

    RegionEntryBuffer(byte[] table, byte[] region) {
      this.tableName = table;
      this.encodedRegionName = region;
      this.entryBuffer = new LinkedList<Entry>();
    }

    long appendEntry(Entry entry) {
      internify(entry);
      entryBuffer.add(entry);
      long incrHeap = entry.getEdit().heapSize() +
          ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
          0; // TODO linkedlist entry
      heapInBuffer += incrHeap;
      return incrHeap;
    }

    private void internify(Entry entry) {
      HLogKey k = entry.getKey();
      k.internTableName(this.tableName);
      k.internEncodedRegionName(this.encodedRegionName);
    }

    public long heapSize() {
      return heapInBuffer;
    }
  }
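
  // Note on internify(): every buffered entry for a region points at the same
  // tableName and encodedRegionName byte[] instances, so N buffered edits cost
  // N references rather than N copies of those arrays. The heap estimate in
  // appendEntry() accounts for exactly those two shared references per entry,
  // plus the heap size of the edit itself.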

  class WriterThread extends Thread {
    private volatile boolean shouldStop = false;

    WriterThread(int i) {
      super("WriterThread-" + i);
    }

    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Error in log splitting write thread", t);
        writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (dataAvailable) {
            if (shouldStop) return;
            try {
              dataAvailable.wait(1000);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn(this.getName() + " got an empty buffer, skipping");
        return;
      }

      WriterAndPath wap = null;

      long startTime = System.nanoTime();
      try {
        int editsCount = 0;

        for (Entry logEntry : entries) {
          if (wap == null) {
            wap = outputSink.getWriterAndPath(logEntry);
            if (wap == null) {
              // getWriterAndPath decided we don't need to write these edits
              // (e.g. the region was already split and its directory is gone)
              return;
            }
          }
          wap.w.append(logEntry);
          editsCount++;
        }

        wap.incrementEdits(editsCount);
        wap.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.fatal(this.getName() + " got an error while writing log entry to log", e);
        throw e;
      }
    }

    void finish() {
      shouldStop = true;
    }
  }

  /**
   * Class that manages the output streams from the log splitting process.
   */
  class OutputSink {
    private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
        new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
    private final List<WriterThread> writerThreads = Lists.newArrayList();

    /* Set of regions which we've decided should not get any output edits */
    private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
        new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));

    private boolean hasClosed = false;

    /**
     * Start the threads that will pump data from the entryBuffers to the
     * output files.
     */
    synchronized void startWriterThreads(EntryBuffers entryBuffers) {
      // More threads could potentially write out faster, but they make the
      // code much harder to debug, so keep the default modest (see the
      // configuration sketch after this method for how to change it).
      int numThreads = conf.getInt(
          "hbase.regionserver.hlog.splitlog.writer.threads", 3);

      for (int i = 0; i < numThreads; i++) {
        WriterThread t = new WriterThread(i);
        t.start();
        writerThreads.add(t);
      }
    }
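
    // Configuration sketch (illustrative value): the writer-thread count is
    // read from the property above and can be raised in hbase-site.xml:
    //
    //   <property>
    //     <name>hbase.regionserver.hlog.splitlog.writer.threads</name>
    //     <value>5</value>
    //   </property>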

    List<Path> finishWritingAndClose() throws IOException {
      LOG.info("Waiting for split writer threads to finish");
      for (WriterThread t : writerThreads) {
        t.finish();
      }
      for (WriterThread t : writerThreads) {
        try {
          t.join();
        } catch (InterruptedException ie) {
          throw new IOException(ie);
        }
        checkForErrors();
      }
      LOG.info("Split writers finished");

      return closeStreams();
    }

    /**
     * Close all of the output streams.
     * @return the list of paths written
     */
    private List<Path> closeStreams() throws IOException {
      Preconditions.checkState(!hasClosed);

      List<Path> paths = new ArrayList<Path>();
      List<IOException> thrown = Lists.newArrayList();

      for (WriterAndPath wap : logWriters.values()) {
        try {
          wap.w.close();
        } catch (IOException ioe) {
          LOG.error("Couldn't close log at " + wap.p, ioe);
          thrown.add(ioe);
          continue;
        }
        paths.add(wap.p);
        LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten + " edits in "
            + (wap.nanosSpent / 1000 / 1000) + "ms)");
      }
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }

      hasClosed = true;
      return paths;
    }

    /**
     * Get a writer and path for a log starting at the given entry. This
     * function is threadsafe so long as multiple threads are always acting on
     * different regions.
     *
     * @return null if this region shouldn't output any logs
     */
    WriterAndPath getWriterAndPath(Entry entry) throws IOException {
      byte[] region = entry.getKey().getEncodedRegionName();
      WriterAndPath ret = logWriters.get(region);
      if (ret != null) {
        return ret;
      }
      // If we already decided that this region doesn't get any output,
      // we don't need to check again.
      if (blacklistedRegions.contains(region)) {
        return null;
      }

      // Need to create a new writer
      Path regionedits = getRegionSplitEditsPath(fs,
          entry, rootDir);
      if (regionedits == null) {
        // The region directory is gone; blacklist it so we skip future lookups
        blacklistedRegions.add(region);
        return null;
      }
      deletePreexistingOldEdits(regionedits);
      Writer w = createWriter(fs, regionedits, conf);
      ret = new WriterAndPath(regionedits, w);
      logWriters.put(region, ret);
      LOG.debug("Creating writer path=" + regionedits + " region="
          + Bytes.toStringBinary(region));

      return ret;
    }

    /**
     * If a recovered-edits file with the target name already exists (a
     * leftover from a previous failed split attempt), delete it.
     */
    private void deletePreexistingOldEdits(Path regionedits) throws IOException {
      if (fs.exists(regionedits)) {
        LOG.warn("Found existing old edits file. It could be the "
            + "result of a previous failed split attempt. Deleting "
            + regionedits + ", length="
            + fs.getFileStatus(regionedits).getLen());
        if (!fs.delete(regionedits, false)) {
          LOG.warn("Failed delete of old " + regionedits);
        }
      }
    }

    /**
     * @return a map from encoded region ID to the number of edits written out
     *         for that region
     */
    private Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
          Bytes.BYTES_COMPARATOR);
      synchronized (logWriters) {
        for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
          ret.put(entry.getKey(), entry.getValue().editsWritten);
        }
      }
      return ret;
    }
  }

  /**
   * Private data structure that wraps a Writer and its Path, also collecting
   * statistics about the data written to this output.
   */
  private final static class WriterAndPath {
    final Path p;
    final Writer w;

    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    WriterAndPath(final Path p, final Writer w) {
      this.p = p;
      this.w = w;
    }

    void incrementEdits(int edits) {
      editsWritten += edits;
    }

    void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }
  }
}