1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver.wal;
21
22 import java.io.DataInput;
23 import java.io.DataOutput;
24 import java.io.FileNotFoundException;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.io.UnsupportedEncodingException;
28 import java.lang.reflect.InvocationTargetException;
29 import java.lang.reflect.Method;
30 import java.net.URLEncoder;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.NavigableSet;
36 import java.util.Arrays;
37 import java.util.SortedMap;
38 import java.util.TreeMap;
39 import java.util.TreeSet;
40 import java.util.concurrent.ConcurrentSkipListMap;
41 import java.util.concurrent.CopyOnWriteArrayList;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.atomic.AtomicLong;
44 import java.util.concurrent.locks.Condition;
45 import java.util.concurrent.locks.Lock;
46 import java.util.concurrent.locks.ReentrantLock;
47 import java.util.regex.Matcher;
48 import java.util.regex.Pattern;
49
50 import org.apache.commons.logging.Log;
51 import org.apache.commons.logging.LogFactory;
52 import org.apache.hadoop.conf.Configuration;
53 import org.apache.hadoop.fs.FileStatus;
54 import org.apache.hadoop.fs.FileSystem;
55 import org.apache.hadoop.fs.Path;
56 import org.apache.hadoop.fs.PathFilter;
57 import org.apache.hadoop.fs.Syncable;
58 import org.apache.hadoop.hbase.HBaseConfiguration;
59 import org.apache.hadoop.hbase.HConstants;
60 import org.apache.hadoop.hbase.HRegionInfo;
61 import org.apache.hadoop.hbase.HServerInfo;
62 import org.apache.hadoop.hbase.KeyValue;
63 import org.apache.hadoop.hbase.util.Bytes;
64 import org.apache.hadoop.hbase.util.ClassSize;
65 import org.apache.hadoop.hbase.util.FSUtils;
66 import org.apache.hadoop.hbase.util.Threads;
67 import org.apache.hadoop.io.Writable;
68 import org.apache.hadoop.util.StringUtils;
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 public class HLog implements Syncable {
110 static final Log LOG = LogFactory.getLog(HLog.class);
111 public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
112 static final byte [] METAROW = Bytes.toBytes("METAROW");
113
114
115
116
117
118 private static final String RECOVERED_EDITS_DIR = "recovered.edits";
119 private static final Pattern EDITFILES_NAME_PATTERN =
120 Pattern.compile("-?[0-9]+");
121
122 private final FileSystem fs;
123 private final Path dir;
124 private final Configuration conf;
125
126 private List<WALObserver> listeners =
127 new CopyOnWriteArrayList<WALObserver>();
128 private final long optionalFlushInterval;
129 private final long blocksize;
130 private final String prefix;
131 private final Path oldLogDir;
132 private boolean logRollRequested;
133
134
135 private static Class<? extends Writer> logWriterClass;
136 private static Class<? extends Reader> logReaderClass;
137
138 static void resetLogReaderClass() {
139 HLog.logReaderClass = null;
140 }
141
142 private OutputStream hdfs_out;
143 private int initialReplication;
144 private Method getNumCurrentReplicas;
145 final static Object [] NO_ARGS = new Object []{};
146
147
148 private boolean forceSync = false;
149
150 public interface Reader {
151 void init(FileSystem fs, Path path, Configuration c) throws IOException;
152 void close() throws IOException;
153 Entry next() throws IOException;
154 Entry next(Entry reuse) throws IOException;
155 void seek(long pos) throws IOException;
156 long getPosition() throws IOException;
157 }
158
159 public interface Writer {
160 void init(FileSystem fs, Path path, Configuration c) throws IOException;
161 void close() throws IOException;
162 void sync() throws IOException;
163 void append(Entry entry) throws IOException;
164 long getLength() throws IOException;
165 }
166
167
168
169
170 Writer writer;
171
172
173
174
175 final SortedMap<Long, Path> outputfiles =
176 Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
177
178
179
180
181
182 private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
183 new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
184
185 private volatile boolean closed = false;
186
187 private final AtomicLong logSeqNum = new AtomicLong(0);
188
189
190 private volatile long filenum = -1;
191
192
193 private final AtomicInteger numEntries = new AtomicInteger(0);
194
195
196
197 private final long logrollsize;
198
199
200
201 private final Lock cacheFlushLock = new ReentrantLock();
202
203
204
205
206 private final Object updateLock = new Object();
207
208 private final boolean enabled;
209
210
211
212
213
214
215 private final int maxLogs;
216
217
218
219
220 private final LogSyncer logSyncerThread;
221
222
223
224
225 private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
226
227 static byte [] COMPLETE_CACHE_FLUSH;
228 static {
229 try {
230 COMPLETE_CACHE_FLUSH =
231 "HBASE::CACHEFLUSH".getBytes(HConstants.UTF8_ENCODING);
232 } catch (UnsupportedEncodingException e) {
233 assert(false);
234 }
235 }
236
237
238 private static volatile long writeOps;
239 private static volatile long writeTime;
240
241 private static volatile long syncOps;
242 private static volatile long syncTime;
243
244 public static long getWriteOps() {
245 long ret = writeOps;
246 writeOps = 0;
247 return ret;
248 }
249
250 public static long getWriteTime() {
251 long ret = writeTime;
252 writeTime = 0;
253 return ret;
254 }
255
256 public static long getSyncOps() {
257 long ret = syncOps;
258 syncOps = 0;
259 return ret;
260 }
261
262 public static long getSyncTime() {
263 long ret = syncTime;
264 syncTime = 0;
265 return ret;
266 }
267
268
269
270
271
272
273
274
275
276
/**
 * Creates a log with no listeners and the default file-name prefix, failing
 * if {@code dir} already exists.
 *
 * @param fs file system to write to
 * @param dir directory for the new log files
 * @param oldLogDir directory that retired log files are moved into
 * @param conf configuration to read settings from
 * @throws IOException as thrown by the full constructor
 */
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
    final Configuration conf) throws IOException {
  this(fs, dir, oldLogDir, conf, null, true, null);
}
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/**
 * Creates a log with the given listeners and prefix, failing if {@code dir}
 * already exists.
 *
 * @param fs file system to write to
 * @param dir directory for the new log files
 * @param oldLogDir directory that retired log files are moved into
 * @param conf configuration to read settings from
 * @param listeners observers to register before the first roll; may be null
 * @param prefix log file name prefix; null or empty selects "hlog"
 * @throws IOException as thrown by the full constructor
 */
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
    final Configuration conf, final List<WALObserver> listeners,
    final String prefix) throws IOException {
  this(fs, dir, oldLogDir, conf, listeners, true, prefix);
}
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328 public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
329 final Configuration conf, final List<WALObserver> listeners,
330 final boolean failIfLogDirExists, final String prefix)
331 throws IOException {
332 super();
333 this.fs = fs;
334 this.dir = dir;
335 this.conf = conf;
336 if (listeners != null) {
337 for (WALObserver i: listeners) {
338 registerWALActionsListener(i);
339 }
340 }
341 this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
342 this.fs.getDefaultBlockSize());
343
344 float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
345 this.logrollsize = (long)(this.blocksize * multi);
346 this.optionalFlushInterval =
347 conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
348 if (failIfLogDirExists && fs.exists(dir)) {
349 throw new IOException("Target HLog directory already exists: " + dir);
350 }
351 if (!fs.mkdirs(dir)) {
352 throw new IOException("Unable to mkdir " + dir);
353 }
354 this.oldLogDir = oldLogDir;
355 if (!fs.exists(oldLogDir)) {
356 if (!fs.mkdirs(this.oldLogDir)) {
357 throw new IOException("Unable to mkdir " + this.oldLogDir);
358 }
359 }
360 this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
361 this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
362 LOG.info("HLog configuration: blocksize=" +
363 StringUtils.byteDesc(this.blocksize) +
364 ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
365 ", enabled=" + this.enabled +
366 ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
367
368 this.prefix = prefix == null || prefix.isEmpty() ?
369 "hlog" : URLEncoder.encode(prefix, "UTF8");
370
371 rollWriter();
372
373
374 this.getNumCurrentReplicas = null;
375 Exception exception = null;
376 if (this.hdfs_out != null) {
377 try {
378 this.getNumCurrentReplicas = this.hdfs_out.getClass().
379 getMethod("getNumCurrentReplicas", new Class<?> []{});
380 this.getNumCurrentReplicas.setAccessible(true);
381 } catch (NoSuchMethodException e) {
382
383 exception = e;
384 } catch (SecurityException e) {
385
386 exception = e;
387 this.getNumCurrentReplicas = null;
388 }
389 }
390 if (this.getNumCurrentReplicas != null) {
391 LOG.info("Using getNumCurrentReplicas--HDFS-826");
392 } else {
393 LOG.info("getNumCurrentReplicas--HDFS-826 not available; hdfs_out=" +
394 this.hdfs_out + ", exception=" + exception.getMessage());
395 }
396
397 logSyncerThread = new LogSyncer(this.optionalFlushInterval);
398 Threads.setDaemonThreadRunning(logSyncerThread,
399 Thread.currentThread().getName() + ".logSyncer");
400 }
401
402 public void registerWALActionsListener (final WALObserver listener) {
403 this.listeners.add(listener);
404 }
405
406 public boolean unregisterWALActionsListener(final WALObserver listener) {
407 return this.listeners.remove(listener);
408 }
409
410
411
412
413 public long getFilenum() {
414 return this.filenum;
415 }
416
417
418
419
420
421
422
423
424
425 public void setSequenceNumber(final long newvalue) {
426 for (long id = this.logSeqNum.get(); id < newvalue &&
427 !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
428
429
430 LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
431 }
432 }
433
434
435
436
437 public long getSequenceNumber() {
438 return logSeqNum.get();
439 }
440
441
442 OutputStream getOutputStream() {
443 return this.hdfs_out;
444 }
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466 public byte [][] rollWriter() throws FailedLogCloseException, IOException {
467
468 if (this.writer != null && this.numEntries.get() <= 0) {
469 return null;
470 }
471 byte [][] regionsToFlush = null;
472 this.cacheFlushLock.lock();
473 try {
474 if (closed) {
475 return regionsToFlush;
476 }
477
478
479 long currentFilenum = this.filenum;
480 this.filenum = System.currentTimeMillis();
481 Path newPath = computeFilename();
482 HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
483 int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
484
485
486
487 OutputStream nextHdfsOut = null;
488 if (nextWriter instanceof SequenceFileLogWriter) {
489 nextHdfsOut =
490 ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream();
491 }
492
493 if (!this.listeners.isEmpty()) {
494 for (WALObserver i : this.listeners) {
495 i.logRolled(newPath);
496 }
497 }
498
499 synchronized (updateLock) {
500
501 Path oldFile = cleanupCurrentWriter(currentFilenum);
502 this.writer = nextWriter;
503 this.initialReplication = nextInitialReplication;
504 this.hdfs_out = nextHdfsOut;
505
506 LOG.info((oldFile != null?
507 "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
508 this.numEntries.get() +
509 ", filesize=" +
510 this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
511 "New hlog " + FSUtils.getPath(newPath));
512 this.numEntries.set(0);
513 this.logRollRequested = false;
514 }
515
516 if (this.outputfiles.size() > 0) {
517 if (this.lastSeqWritten.isEmpty()) {
518 LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
519
520
521
522 for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
523 archiveLogFile(e.getValue(), e.getKey());
524 }
525 this.outputfiles.clear();
526 } else {
527 regionsToFlush = cleanOldLogs();
528 }
529 }
530 } finally {
531 this.cacheFlushLock.unlock();
532 }
533 return regionsToFlush;
534 }
535
536
537
538
539
540
541
542
543
544
545
546 protected Writer createWriterInstance(final FileSystem fs, final Path path,
547 final Configuration conf) throws IOException {
548 return createWriter(fs, path, conf);
549 }
550
551
552
553
554
555
556
557
558
559 public static Reader getReader(final FileSystem fs,
560 final Path path, Configuration conf)
561 throws IOException {
562 try {
563
564 if (logReaderClass == null) {
565
566 logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
567 SequenceFileLogReader.class, Reader.class);
568 }
569
570
571 HLog.Reader reader = logReaderClass.newInstance();
572 reader.init(fs, path, conf);
573 return reader;
574 } catch (IOException e) {
575 throw e;
576 }
577 catch (Exception e) {
578 throw new IOException("Cannot get log reader", e);
579 }
580 }
581
582
583
584
585
586
587
588
589 public static Writer createWriter(final FileSystem fs,
590 final Path path, Configuration conf)
591 throws IOException {
592 try {
593 if (logWriterClass == null) {
594 logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
595 SequenceFileLogWriter.class, Writer.class);
596 }
597 HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
598 writer.init(fs, path, conf);
599 return writer;
600 } catch (Exception e) {
601 IOException ie = new IOException("cannot get log writer");
602 ie.initCause(e);
603 throw ie;
604 }
605 }
606
607
608
609
610
611
612
613
614 private byte [][] cleanOldLogs() throws IOException {
615 Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
616
617
618 TreeSet<Long> sequenceNumbers =
619 new TreeSet<Long>(this.outputfiles.headMap(
620 (Long.valueOf(oldestOutstandingSeqNum.longValue()))).keySet());
621
622 int logsToRemove = sequenceNumbers.size();
623 if (logsToRemove > 0) {
624 if (LOG.isDebugEnabled()) {
625
626 byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
627 LOG.debug("Found " + logsToRemove + " hlogs to remove" +
628 " out of total " + this.outputfiles.size() + ";" +
629 " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
630 " from region " + Bytes.toStringBinary(oldestRegion));
631 }
632 for (Long seq : sequenceNumbers) {
633 archiveLogFile(this.outputfiles.remove(seq), seq);
634 }
635 }
636
637
638
639 byte [][] regions = null;
640 int logCount = this.outputfiles.size();
641 if (logCount > this.maxLogs && this.outputfiles != null &&
642 this.outputfiles.size() > 0) {
643
644 regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
645 this.lastSeqWritten);
646 if (regions != null) {
647 StringBuilder sb = new StringBuilder();
648 for (int i = 0; i < regions.length; i++) {
649 if (i > 0) sb.append(", ");
650 sb.append(Bytes.toStringBinary(regions[i]));
651 }
652 LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
653 this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
654 sb.toString());
655 }
656 }
657 return regions;
658 }
659
660
661
662
663
664
665
666
667
668 static byte [][] findMemstoresWithEditsEqualOrOlderThan(final long oldestWALseqid,
669 final Map<byte [], Long> regionsToSeqids) {
670
671 List<byte []> regions = null;
672 for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
673 if (e.getValue().longValue() <= oldestWALseqid) {
674 if (regions == null) regions = new ArrayList<byte []>();
675 regions.add(e.getKey());
676 }
677 }
678 return regions == null?
679 null: regions.toArray(new byte [][] {HConstants.EMPTY_BYTE_ARRAY});
680 }
681
682
683
684
685 private Long getOldestOutstandingSeqNum() {
686 return Collections.min(this.lastSeqWritten.values());
687 }
688
689
690
691
692
693 private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
694 byte [] oldestRegion = null;
695 for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
696 if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
697 oldestRegion = e.getKey();
698 break;
699 }
700 }
701 return oldestRegion;
702 }
703
704
705
706
707
708
709
710 private Path cleanupCurrentWriter(final long currentfilenum)
711 throws IOException {
712 Path oldFile = null;
713 if (this.writer != null) {
714
715 try {
716 this.writer.close();
717 } catch (IOException e) {
718
719
720
721 FailedLogCloseException flce =
722 new FailedLogCloseException("#" + currentfilenum);
723 flce.initCause(e);
724 throw e;
725 }
726 if (currentfilenum >= 0) {
727 oldFile = computeFilename(currentfilenum);
728 this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
729 }
730 }
731 return oldFile;
732 }
733
734 private void archiveLogFile(final Path p, final Long seqno) throws IOException {
735 Path newPath = getHLogArchivePath(this.oldLogDir, p);
736 LOG.info("moving old hlog file " + FSUtils.getPath(p) +
737 " whose highest sequenceid is " + seqno + " to " +
738 FSUtils.getPath(newPath));
739 if (!this.fs.rename(p, newPath)) {
740 throw new IOException("Unable to rename " + p + " to " + newPath);
741 }
742 }
743
744
745
746
747
748
749 protected Path computeFilename() {
750 return computeFilename(this.filenum);
751 }
752
753
754
755
756
757
758
759 protected Path computeFilename(long filenum) {
760 if (filenum < 0) {
761 throw new RuntimeException("hlog file number can't be < 0");
762 }
763 return new Path(dir, prefix + "." + filenum);
764 }
765
766
767
768
769
770
771 public void closeAndDelete() throws IOException {
772 close();
773 FileStatus[] files = fs.listStatus(this.dir);
774 for(FileStatus file : files) {
775 Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
776 if (!fs.rename(file.getPath(),p)) {
777 throw new IOException("Unable to rename " + file.getPath() + " to " + p);
778 }
779 }
780 LOG.debug("Moved " + files.length + " log files to " +
781 FSUtils.getPath(this.oldLogDir));
782 if (!fs.delete(dir, true)) {
783 LOG.info("Unable to delete " + dir);
784 }
785 }
786
787
788
789
790
791
792 public void close() throws IOException {
793 try {
794 logSyncerThread.interrupt();
795
796 logSyncerThread.join(this.optionalFlushInterval*2);
797 } catch (InterruptedException e) {
798 LOG.error("Exception while waiting for syncer thread to die", e);
799 }
800
801 cacheFlushLock.lock();
802 try {
803
804 if (!this.listeners.isEmpty()) {
805 for (WALObserver i : this.listeners) {
806 i.logCloseRequested();
807 }
808 }
809 synchronized (updateLock) {
810 this.closed = true;
811 if (LOG.isDebugEnabled()) {
812 LOG.debug("closing hlog writer in " + this.dir.toString());
813 }
814 this.writer.close();
815 }
816 } finally {
817 cacheFlushLock.unlock();
818 }
819 }
820
821
822
823
824
825
826
827
828 public void append(HRegionInfo regionInfo, WALEdit logEdit,
829 final long now,
830 final boolean isMetaRegion)
831 throws IOException {
832 byte [] regionName = regionInfo.getEncodedNameAsBytes();
833 byte [] tableName = regionInfo.getTableDesc().getName();
834 this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
835 }
836
837
838
839
840
841
842
843 protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, long now) {
844 return new HLogKey(regionName, tableName, seqnum, now);
845 }
846
847
848
849
850
851
852
853
854
855
856 public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
857 throws IOException {
858 if (this.closed) {
859 throw new IOException("Cannot append; log is closed");
860 }
861 synchronized (updateLock) {
862 long seqNum = obtainSeqNum();
863 logKey.setLogSeqNum(seqNum);
864
865
866
867
868
869 this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
870 Long.valueOf(seqNum));
871 doWrite(regionInfo, logKey, logEdit);
872 this.numEntries.incrementAndGet();
873 }
874
875
876
877 if (regionInfo.isMetaRegion() ||
878 !regionInfo.getTableDesc().isDeferredLogFlush()) {
879
880 this.sync();
881 }
882 }
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907 public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
908 final long now)
909 throws IOException {
910 if (edits.isEmpty()) return;
911 if (this.closed) {
912 throw new IOException("Cannot append; log is closed");
913 }
914 synchronized (this.updateLock) {
915 long seqNum = obtainSeqNum();
916
917
918
919
920
921
922
923 byte [] hriKey = info.getEncodedNameAsBytes();
924 this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
925 HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
926 doWrite(info, logKey, edits);
927 this.numEntries.incrementAndGet();
928 }
929
930
931 if (info.isMetaRegion() ||
932 !info.getTableDesc().isDeferredLogFlush()) {
933
934 this.sync();
935 }
936 }
937
938
939
940
941
942 class LogSyncer extends Thread {
943
944 private final long optionalFlushInterval;
945
946 LogSyncer(long optionalFlushInterval) {
947 this.optionalFlushInterval = optionalFlushInterval;
948 }
949
950 @Override
951 public void run() {
952 try {
953
954
955 while(!this.isInterrupted()) {
956
957 Thread.sleep(this.optionalFlushInterval);
958 sync();
959 }
960 } catch (IOException e) {
961 LOG.error("Error while syncing, requesting close of hlog ", e);
962 requestLogRoll();
963 } catch (InterruptedException e) {
964 LOG.debug(getName() + " interrupted while waiting for sync requests");
965 } finally {
966 LOG.info(getName() + " exiting");
967 }
968 }
969 }
970
971 @Override
972 public void sync() throws IOException {
973 synchronized (this.updateLock) {
974 if (this.closed) {
975 return;
976 }
977 }
978 try {
979 long now = System.currentTimeMillis();
980
981 this.writer.sync();
982 synchronized (this.updateLock) {
983 syncTime += System.currentTimeMillis() - now;
984 syncOps++;
985 if (!logRollRequested) {
986 checkLowReplication();
987 if (this.writer.getLength() > this.logrollsize) {
988 requestLogRoll();
989 }
990 }
991 }
992
993 } catch (IOException e) {
994 LOG.fatal("Could not append. Requesting close of hlog", e);
995 requestLogRoll();
996 throw e;
997 }
998 }
999
1000 private void checkLowReplication() {
1001
1002
1003 try {
1004 int numCurrentReplicas = getLogReplication();
1005 if (numCurrentReplicas != 0 &&
1006 numCurrentReplicas < this.initialReplication) {
1007 LOG.warn("HDFS pipeline error detected. " +
1008 "Found " + numCurrentReplicas + " replicas but expecting " +
1009 this.initialReplication + " replicas. " +
1010 " Requesting close of hlog.");
1011 requestLogRoll();
1012 logRollRequested = true;
1013 }
1014 } catch (Exception e) {
1015 LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
1016 " still proceeding ahead...");
1017 }
1018 }
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032 int getLogReplication() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1033 if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
1034 Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
1035 if (repl instanceof Integer) {
1036 return ((Integer)repl).intValue();
1037 }
1038 }
1039 return 0;
1040 }
1041
1042 boolean canGetCurReplicas() {
1043 return this.getNumCurrentReplicas != null;
1044 }
1045
1046 public void hsync() throws IOException {
1047
1048 sync();
1049 }
1050
1051 private void requestLogRoll() {
1052 if (!this.listeners.isEmpty()) {
1053 for (WALObserver i: this.listeners) {
1054 i.logRollRequested();
1055 }
1056 }
1057 }
1058
1059 protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
1060 throws IOException {
1061 if (!this.enabled) {
1062 return;
1063 }
1064 if (!this.listeners.isEmpty()) {
1065 for (WALObserver i: this.listeners) {
1066 i.visitLogEntryBeforeWrite(info, logKey, logEdit);
1067 }
1068 }
1069 try {
1070 long now = System.currentTimeMillis();
1071 this.writer.append(new HLog.Entry(logKey, logEdit));
1072 long took = System.currentTimeMillis() - now;
1073 writeTime += took;
1074 writeOps++;
1075 if (took > 1000) {
1076 long len = 0;
1077 for(KeyValue kv : logEdit.getKeyValues()) {
1078 len += kv.getLength();
1079 }
1080 LOG.warn(String.format(
1081 "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
1082 Thread.currentThread().getName(), took, this.numEntries.get(),
1083 StringUtils.humanReadableInt(len)));
1084 }
1085 } catch (IOException e) {
1086 LOG.fatal("Could not append. Requesting close of hlog", e);
1087 requestLogRoll();
1088 throw e;
1089 }
1090 }
1091
1092
1093 int getNumEntries() {
1094 return numEntries.get();
1095 }
1096
1097
1098
1099
1100 private long obtainSeqNum() {
1101 return this.logSeqNum.incrementAndGet();
1102 }
1103
1104
1105 int getNumLogFiles() {
1106 return outputfiles.size();
1107 }
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122 public long startCacheFlush() {
1123 this.cacheFlushLock.lock();
1124 return obtainSeqNum();
1125 }
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137 public void completeCacheFlush(final byte [] encodedRegionName,
1138 final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
1139 throws IOException {
1140 try {
1141 if (this.closed) {
1142 return;
1143 }
1144 synchronized (updateLock) {
1145 long now = System.currentTimeMillis();
1146 WALEdit edit = completeCacheFlushLogEdit();
1147 HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
1148 System.currentTimeMillis());
1149 this.writer.append(new Entry(key, edit));
1150 writeTime += System.currentTimeMillis() - now;
1151 writeOps++;
1152 this.numEntries.incrementAndGet();
1153 Long seq = this.lastSeqWritten.get(encodedRegionName);
1154 if (seq != null && logSeqId >= seq.longValue()) {
1155 this.lastSeqWritten.remove(encodedRegionName);
1156 }
1157 }
1158
1159 this.sync();
1160
1161 } finally {
1162 this.cacheFlushLock.unlock();
1163 }
1164 }
1165
1166 private WALEdit completeCacheFlushLogEdit() {
1167 KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
1168 System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
1169 WALEdit e = new WALEdit();
1170 e.add(kv);
1171 return e;
1172 }
1173
1174
1175
1176
1177
1178
1179
1180 public void abortCacheFlush() {
1181 this.cacheFlushLock.unlock();
1182 }
1183
1184
1185
1186
1187
1188 public static boolean isMetaFamily(byte [] family) {
1189 return Bytes.equals(METAFAMILY, family);
1190 }
1191
1192 @SuppressWarnings("unchecked")
1193 public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
1194 return (Class<? extends HLogKey>)
1195 conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
1196 }
1197
1198 public static HLogKey newKey(Configuration conf) throws IOException {
1199 Class<? extends HLogKey> keyClass = getKeyClass(conf);
1200 try {
1201 return keyClass.newInstance();
1202 } catch (InstantiationException e) {
1203 throw new IOException("cannot create hlog key");
1204 } catch (IllegalAccessException e) {
1205 throw new IOException("cannot create hlog key");
1206 }
1207 }
1208
1209
1210
1211
1212
1213 public static class Entry implements Writable {
1214 private WALEdit edit;
1215 private HLogKey key;
1216
1217 public Entry() {
1218 edit = new WALEdit();
1219 key = new HLogKey();
1220 }
1221
1222
1223
1224
1225
1226
1227 public Entry(HLogKey key, WALEdit edit) {
1228 super();
1229 this.key = key;
1230 this.edit = edit;
1231 }
1232
1233
1234
1235
1236 public WALEdit getEdit() {
1237 return edit;
1238 }
1239
1240
1241
1242
1243 public HLogKey getKey() {
1244 return key;
1245 }
1246
1247 @Override
1248 public String toString() {
1249 return this.key + "=" + this.edit;
1250 }
1251
1252 @Override
1253 public void write(DataOutput dataOutput) throws IOException {
1254 this.key.write(dataOutput);
1255 this.edit.write(dataOutput);
1256 }
1257
1258 @Override
1259 public void readFields(DataInput dataInput) throws IOException {
1260 this.key.readFields(dataInput);
1261 this.edit.readFields(dataInput);
1262 }
1263 }
1264
1265
1266
1267
1268
1269
1270
1271 public static String getHLogDirectoryName(HServerInfo info) {
1272 return getHLogDirectoryName(info.getServerName());
1273 }
1274
1275
1276
1277
1278
1279
1280
1281
1282 public static String getHLogDirectoryName(String serverAddress,
1283 long startCode) {
1284 if (serverAddress == null || serverAddress.length() == 0) {
1285 return null;
1286 }
1287 return getHLogDirectoryName(
1288 HServerInfo.getServerName(serverAddress, startCode));
1289 }
1290
1291
1292
1293
1294
1295
1296
1297 public static String getHLogDirectoryName(String serverName) {
1298 StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
1299 dirName.append("/");
1300 dirName.append(serverName);
1301 return dirName.toString();
1302 }
1303
1304
1305
1306
1307
1308
1309 protected Path getDir() {
1310 return dir;
1311 }
1312
1313 public static boolean validateHLogFilename(String filename) {
1314 return pattern.matcher(filename).matches();
1315 }
1316
1317 static Path getHLogArchivePath(Path oldLogDir, Path p) {
1318 return new Path(oldLogDir, p.getName());
1319 }
1320
1321 static String formatRecoveredEditsFileName(final long seqid) {
1322 return String.format("%019d", seqid);
1323 }
1324
1325
1326
1327
1328
1329
1330
1331
1332 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
1333 final Path regiondir)
1334 throws IOException {
1335 Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
1336 FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1337 @Override
1338 public boolean accept(Path p) {
1339 boolean result = false;
1340 try {
1341
1342
1343
1344
1345 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
1346 result = fs.isFile(p) && m.matches();
1347 } catch (IOException e) {
1348 LOG.warn("Failed isFile check on " + p);
1349 }
1350 return result;
1351 }
1352 });
1353 NavigableSet<Path> filesSorted = new TreeSet<Path>();
1354 if (files == null) return filesSorted;
1355 for (FileStatus status: files) {
1356 filesSorted.add(status.getPath());
1357 }
1358 return filesSorted;
1359 }
1360
1361
1362
1363
1364
1365
1366
1367
1368 public static Path moveAsideBadEditsFile(final FileSystem fs,
1369 final Path edits)
1370 throws IOException {
1371 Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
1372 System.currentTimeMillis());
1373 if (!fs.rename(edits, moveAsideName)) {
1374 LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
1375 }
1376 return moveAsideName;
1377 }
1378
1379
1380
1381
1382
1383
1384 public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
1385 return new Path(regiondir, RECOVERED_EDITS_DIR);
1386 }
1387
1388 public static final long FIXED_OVERHEAD = ClassSize.align(
1389 ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1390 ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1391
1392 private static void usage() {
1393 System.err.println("Usage: HLog <ARGS>");
1394 System.err.println("Arguments:");
1395 System.err.println(" --dump Dump textual representation of passed one or more files");
1396 System.err.println(" For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1397 System.err.println(" --split Split the passed directory of WAL logs");
1398 System.err.println(" For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1399 }
1400
1401 private static void dump(final Configuration conf, final Path p)
1402 throws IOException {
1403 FileSystem fs = FileSystem.get(conf);
1404 if (!fs.exists(p)) {
1405 throw new FileNotFoundException(p.toString());
1406 }
1407 if (!fs.isFile(p)) {
1408 throw new IOException(p + " is not a file");
1409 }
1410 Reader log = getReader(fs, p, conf);
1411 try {
1412 int count = 0;
1413 HLog.Entry entry;
1414 while ((entry = log.next()) != null) {
1415 System.out.println("#" + count + ", pos=" + log.getPosition() + " " +
1416 entry.toString());
1417 count++;
1418 }
1419 } finally {
1420 log.close();
1421 }
1422 }
1423
1424 private static void split(final Configuration conf, final Path p)
1425 throws IOException {
1426 FileSystem fs = FileSystem.get(conf);
1427 if (!fs.exists(p)) {
1428 throw new FileNotFoundException(p.toString());
1429 }
1430 final Path baseDir = new Path(conf.get(HConstants.HBASE_DIR));
1431 final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1432 if (!fs.getFileStatus(p).isDir()) {
1433 throw new IOException(p + " is not a directory");
1434 }
1435
1436 HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
1437 conf, baseDir, p, oldLogDir, fs);
1438 logSplitter.splitLog();
1439 }
1440
1441
1442
1443
1444
1445
1446
1447
1448 public static void main(String[] args) throws IOException {
1449 if (args.length < 2) {
1450 usage();
1451 System.exit(-1);
1452 }
1453
1454 if (args[0].compareTo("--dump") == 0) {
1455 HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1456 } else if (args[0].compareTo("--split") == 0) {
1457 Configuration conf = HBaseConfiguration.create();
1458 for (int i = 1; i < args.length; i++) {
1459 try {
1460 conf.set("fs.default.name", args[i]);
1461 conf.set("fs.defaultFS", args[i]);
1462 Path logPath = new Path(args[i]);
1463 split(conf, logPath);
1464 } catch (Throwable t) {
1465 t.printStackTrace(System.err);
1466 System.exit(-1);
1467 }
1468 }
1469 } else {
1470 usage();
1471 System.exit(-1);
1472 }
1473 }
1474 }