1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver.wal;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.io.OutputStream;
27  import java.io.UnsupportedEncodingException;
28  import java.lang.reflect.InvocationTargetException;
29  import java.lang.reflect.Method;
30  import java.net.URLEncoder;
31  import java.util.ArrayList;
32  import java.util.Collections;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.NavigableSet;
36  import java.util.Arrays;
37  import java.util.SortedMap;
38  import java.util.TreeMap;
39  import java.util.TreeSet;
40  import java.util.concurrent.ConcurrentSkipListMap;
41  import java.util.concurrent.CopyOnWriteArrayList;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.atomic.AtomicLong;
44  import java.util.concurrent.locks.Condition;
45  import java.util.concurrent.locks.Lock;
46  import java.util.concurrent.locks.ReentrantLock;
47  import java.util.regex.Matcher;
48  import java.util.regex.Pattern;
49  
50  import org.apache.commons.logging.Log;
51  import org.apache.commons.logging.LogFactory;
52  import org.apache.hadoop.conf.Configuration;
53  import org.apache.hadoop.fs.FileStatus;
54  import org.apache.hadoop.fs.FileSystem;
55  import org.apache.hadoop.fs.Path;
56  import org.apache.hadoop.fs.PathFilter;
57  import org.apache.hadoop.fs.Syncable;
58  import org.apache.hadoop.hbase.HBaseConfiguration;
59  import org.apache.hadoop.hbase.HConstants;
60  import org.apache.hadoop.hbase.HRegionInfo;
61  import org.apache.hadoop.hbase.HServerInfo;
62  import org.apache.hadoop.hbase.KeyValue;
63  import org.apache.hadoop.hbase.util.Bytes;
64  import org.apache.hadoop.hbase.util.ClassSize;
65  import org.apache.hadoop.hbase.util.FSUtils;
66  import org.apache.hadoop.hbase.util.Threads;
67  import org.apache.hadoop.io.Writable;
68  import org.apache.hadoop.util.StringUtils;
69  
70  /**
71   * HLog stores all the edits to the HStore.  It is the HBase write-ahead-log
72   * implementation.
73   *
74   * It performs logfile-rolling, so external callers are not aware that the
75   * underlying file is being rolled.
76   *
77   * <p>
78   * There is one HLog per RegionServer.  All edits for all Regions carried by
79   * a particular RegionServer are entered first in the HLog.
80   *
81   * <p>
82   * Each HRegion is identified by a unique long integer. HRegions do
83   * not need to declare themselves before using the HLog; they simply include
84   * their HRegion-id in the <code>append</code> or
85   * <code>completeCacheFlush</code> calls.
86   *
87   * <p>
88   * An HLog consists of multiple on-disk files, which have a chronological order.
89   * As data is flushed to other (better) on-disk structures, the log becomes
90   * obsolete. We can destroy all the log messages for a given HRegion-id up to
91   * the most-recent CACHEFLUSH message from that HRegion.
92   *
93   * <p>
94   * It's only practical to delete entire files. Thus, we delete an entire on-disk
95   * file F when all of the messages in F have a log-sequence-id that's older
96   * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
97   * a message in F.
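 * For example, if F holds edits with sequence ids up to 80 for regions A and B,
 * and the most-recent CACHEFLUSH messages for A and B carry sequence ids 90 and
 * 95, then every edit in F is obsolete and F can be deleted.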
98   *
99   * <p>
100  * Synchronized methods can never execute in parallel. However, between the
101  * start of a cache flush and the completion point, appends are allowed but log
102  * rolling is not. To prevent log rolling taking place during this period, a
103  * separate reentrant lock is used.
104  *
105  * <p>To read an HLog, call {@link #getReader(org.apache.hadoop.fs.FileSystem,
106  * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
107  *
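 * <p>A minimal read-loop sketch (the filesystem, path and configuration are
 * placeholders for whatever the caller has at hand):
 *
 * <pre>
 *   HLog.Reader reader = HLog.getReader(fs, path, conf);
 *   try {
 *     HLog.Entry entry;
 *     while ((entry = reader.next()) != null) {
 *       // entry.getKey() and entry.getEdit() carry one WAL record.
 *     }
 *   } finally {
 *     reader.close();
 *   }
 * </pre>
 *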
108  */
109 public class HLog implements Syncable {
110   static final Log LOG = LogFactory.getLog(HLog.class);
111   public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
112   static final byte [] METAROW = Bytes.toBytes("METAROW");
113 
114   /*
115    * Name of the directory that holds recovered edits written by the WAL
116    * splitting code, one such directory per region
117    */
118   private static final String RECOVERED_EDITS_DIR = "recovered.edits";
119   private static final Pattern EDITFILES_NAME_PATTERN =
120     Pattern.compile("-?[0-9]+");
121   
122   private final FileSystem fs;
123   private final Path dir;
124   private final Configuration conf;
125   // Listeners that are called on WAL events.
126   private List<WALObserver> listeners =
127     new CopyOnWriteArrayList<WALObserver>();
128   private final long optionalFlushInterval;
129   private final long blocksize;
130   private final String prefix;
131   private final Path oldLogDir;
132   private boolean logRollRequested;
133 
134 
135   private static Class<? extends Writer> logWriterClass;
136   private static Class<? extends Reader> logReaderClass;
137 
138   static void resetLogReaderClass() {
139     HLog.logReaderClass = null;
140   }
141 
142   private OutputStream hdfs_out;     // OutputStream associated with the current SequenceFile.writer
143   private int initialReplication;    // initial replication factor of SequenceFile.writer
144   private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
145   final static Object [] NO_ARGS = new Object []{};
146 
147   // used to indirectly tell syncFs to force the sync
148   private boolean forceSync = false;
149 
150   public interface Reader {
151     void init(FileSystem fs, Path path, Configuration c) throws IOException;
152     void close() throws IOException;
153     Entry next() throws IOException;
154     Entry next(Entry reuse) throws IOException;
155     void seek(long pos) throws IOException;
156     long getPosition() throws IOException;
157   }
158 
159   public interface Writer {
160     void init(FileSystem fs, Path path, Configuration c) throws IOException;
161     void close() throws IOException;
162     void sync() throws IOException;
163     void append(Entry entry) throws IOException;
164     long getLength() throws IOException;
165   }
166 
167   /*
168    * Current log file.
169    */
170   Writer writer;
171 
172   /*
173    * Map of all log files but the current one.
174    */
175   final SortedMap<Long, Path> outputfiles =
176     Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
177 
178   /*
179    * Map of regions to most recent sequence/edit id in their memstore.
180    * Key is encoded region name.
181    */
182   private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
183     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
184 
185   private volatile boolean closed = false;
186 
187   private final AtomicLong logSeqNum = new AtomicLong(0);
188 
189   // The timestamp (in ms) when the log file was created.
190   private volatile long filenum = -1;
191 
192   // Number of transactions in the current HLog.
193   private final AtomicInteger numEntries = new AtomicInteger(0);
194 
195   // If the log grows beyond this size, roll it. This is typically 0.95 times
196   // the default HDFS block size.
197   private final long logrollsize;
198 
199   // This lock prevents starting a log roll during a cache flush.
200   // synchronized is insufficient because a cache flush spans two method calls.
201   private final Lock cacheFlushLock = new ReentrantLock();
202 
203   // We synchronize on updateLock to prevent updates and to prevent a log roll
204   // during an update
205   // locked during appends
206   private final Object updateLock = new Object();
207 
208   private final boolean enabled;
209 
210   /*
211    * If there are more than this many logs, force a flush of the region with
212    * the oldest edits so those edits reach disk; otherwise, if we crash with
213    * too many logs outstanding, replay will take forever. Keeps the number of
214    * logs tidy.
214    */
215   private final int maxLogs;
216 
217   /**
218    * Thread that handles optional sync'ing
219    */
220   private final LogSyncer logSyncerThread;
221 
222   /**
223    * Pattern used to validate an HLog file name
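   * (i.e. anything ending in a dot followed by digits, e.g.
   * {@code hlog.1281999904820}; note that {@code \d*} also accepts a bare
   * trailing dot)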
224    */
225   private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
226 
227   static byte [] COMPLETE_CACHE_FLUSH;
228   static {
229     try {
230       COMPLETE_CACHE_FLUSH =
231         "HBASE::CACHEFLUSH".getBytes(HConstants.UTF8_ENCODING);
232     } catch (UnsupportedEncodingException e) {
233       throw new AssertionError(e); // UTF-8 is always supported; cannot happen
234     }
235   }
236 
237   // For measuring latency of writes
238   private static volatile long writeOps;
239   private static volatile long writeTime;
240   // For measuring latency of syncs
241   private static volatile long syncOps;
242   private static volatile long syncTime;
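  // Note: these counters are incremented and read-and-reset without
  // synchronization, so the reported values are best-effort approximations.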
243   
244   public static long getWriteOps() {
245     long ret = writeOps;
246     writeOps = 0;
247     return ret;
248   }
249 
250   public static long getWriteTime() {
251     long ret = writeTime;
252     writeTime = 0;
253     return ret;
254   }
255 
256   public static long getSyncOps() {
257     long ret = syncOps;
258     syncOps = 0;
259     return ret;
260   }
261 
262   public static long getSyncTime() {
263     long ret = syncTime;
264     syncTime = 0;
265     return ret;
266   }
267 
268   /**
269    * Constructor.
270    *
271    * @param fs filesystem handle
272    * @param dir path to where hlogs are stored
273    * @param oldLogDir path to where hlogs are archived
274    * @param conf configuration to use
275    * @throws IOException
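   *
   * A construction sketch (the paths below are illustrative only):
   * <pre>
   *   Configuration conf = HBaseConfiguration.create();
   *   FileSystem fs = FileSystem.get(conf);
   *   HLog wal = new HLog(fs, new Path("/hbase/.logs/server1,60020,1"),
   *     new Path("/hbase/.oldlogs"), conf);
   * </pre>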
276    */
277   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
278               final Configuration conf)
279   throws IOException {
280     this(fs, dir, oldLogDir, conf, null, true, null);
281   }
282 
283   /**
284    * Create an edit log at the given <code>dir</code> location.
285    *
286    * You should never have to load an existing log. If there is a log at
287    * startup, it should have already been processed and deleted by the time the
288    * HLog object is started up.
289    *
290    * @param fs filesystem handle
291    * @param dir path to where hlogs are stored
292    * @param oldLogDir path to where hlogs are archived
293    * @param conf configuration to use
294    * @param listeners Listeners on WAL events. Listeners passed here will
295    * be registered before we do anything else; e.g. before the constructor
296    * calls {@link #rollWriter()}.
297    * @param prefix should always be the hostname and port in a distributed
298    *        env, and it will be URL encoded before being used.
299    *        If prefix is null, "hlog" will be used
300    * @throws IOException
301    */
302   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
303       final Configuration conf, final List<WALObserver> listeners,
304       final String prefix) throws IOException {
305     this(fs, dir, oldLogDir, conf, listeners, true, prefix);
306   }
307 
308   /**
309    * Create an edit log at the given <code>dir</code> location.
310    *
311    * You should never have to load an existing log. If there is a log at
312    * startup, it should have already been processed and deleted by the time the
313    * HLog object is started up.
314    *
315    * @param fs filesystem handle
316    * @param dir path to where hlogs are stored
317    * @param oldLogDir path to where hlogs are archived
318    * @param conf configuration to use
319    * @param listeners Listeners on WAL events. Listeners passed here will
320    * be registered before we do anything else; e.g. before the constructor
321    * calls {@link #rollWriter()}.
322    * @param failIfLogDirExists If true, an IOException is thrown if <code>dir</code> already exists.
323    * @param prefix should always be the hostname and port in a distributed
324    *        env, and it will be URL encoded before being used.
325    *        If prefix is null, "hlog" will be used
326    * @throws IOException
327    */
328   public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
329       final Configuration conf, final List<WALObserver> listeners,
330       final boolean failIfLogDirExists, final String prefix)
331   throws IOException {
332     super();
333     this.fs = fs;
334     this.dir = dir;
335     this.conf = conf;
336     if (listeners != null) {
337       for (WALObserver i: listeners) {
338         registerWALActionsListener(i);
339       }
340     }
341     this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
342       this.fs.getDefaultBlockSize());
343     // Roll at 95% of block size.
344     float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
345     this.logrollsize = (long)(this.blocksize * multi);
346     this.optionalFlushInterval =
347       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
348     if (failIfLogDirExists && fs.exists(dir)) {
349       throw new IOException("Target HLog directory already exists: " + dir);
350     }
351     if (!fs.mkdirs(dir)) {
352       throw new IOException("Unable to mkdir " + dir);
353     }
354     this.oldLogDir = oldLogDir;
355     if (!fs.exists(oldLogDir)) {
356       if (!fs.mkdirs(this.oldLogDir)) {
357         throw new IOException("Unable to mkdir " + this.oldLogDir);
358       }
359     }
360     this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
361     this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
362     LOG.info("HLog configuration: blocksize=" +
363       StringUtils.byteDesc(this.blocksize) +
364       ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
365       ", enabled=" + this.enabled +
366       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
367     // If prefix is null||empty then just name it hlog
368     this.prefix = prefix == null || prefix.isEmpty() ?
369         "hlog" : URLEncoder.encode(prefix, "UTF8");
370     // rollWriter sets this.hdfs_out if it can.
371     rollWriter();
372 
373     // handle the reflection necessary to call getNumCurrentReplicas()
374     this.getNumCurrentReplicas = null;
375     Exception exception = null;
376     if (this.hdfs_out != null) {
377       try {
378         this.getNumCurrentReplicas = this.hdfs_out.getClass().
379           getMethod("getNumCurrentReplicas", new Class<?> []{});
380         this.getNumCurrentReplicas.setAccessible(true);
381       } catch (NoSuchMethodException e) {
382         // Thrown if getNumCurrentReplicas() function isn't available
383         exception = e;
384       } catch (SecurityException e) {
385         // Thrown if we can't get access to getNumCurrentReplicas()
386         exception = e;
387         this.getNumCurrentReplicas = null; // could happen on setAccessible()
388       }
389     }
390     if (this.getNumCurrentReplicas != null) {
391       LOG.info("Using getNumCurrentReplicas--HDFS-826");
392     } else {
393       LOG.info("getNumCurrentReplicas--HDFS-826 not available; hdfs_out=" +
394         this.hdfs_out + ", exception=" + (exception == null? "none": exception.getMessage()));
395     }
396 
397     logSyncerThread = new LogSyncer(this.optionalFlushInterval);
398     Threads.setDaemonThreadRunning(logSyncerThread,
399         Thread.currentThread().getName() + ".logSyncer");
400   }
401 
402   public void registerWALActionsListener(final WALObserver listener) {
403     this.listeners.add(listener);
404   }
405 
406   public boolean unregisterWALActionsListener(final WALObserver listener) {
407     return this.listeners.remove(listener);
408   }
409 
410   /**
411    * @return Current state of the monotonically increasing file id.
412    */
413   public long getFilenum() {
414     return this.filenum;
415   }
416 
417   /**
418    * Called by HRegionServer when it opens a new region to ensure that log
419    * sequence numbers are always greater than the latest sequence number of the
420    * region being brought on-line.
421    *
422    * @param newvalue We'll set log edit/sequence number to this value if it
423    * is greater than the current value.
424    */
425   public void setSequenceNumber(final long newvalue) {
426     for (long id = this.logSeqNum.get(); id < newvalue &&
427         !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
428       // This could spin on occasion but better the occasional spin than locking
429       // every increment of sequence number.
430       LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
431     }
432   }
433 
434   /**
435    * @return log sequence number
436    */
437   public long getSequenceNumber() {
438     return logSeqNum.get();
439   }
440 
441   // usage: see TestLogRolling.java
442   OutputStream getOutputStream() {
443     return this.hdfs_out;
444   }
445 
446   /**
447    * Roll the log writer. That is, start writing log messages to a new file.
448    *
449    * Because a log cannot be rolled during a cache flush, and a cache flush
450    * spans two method calls, a special lock needs to be obtained so that a cache
451    * flush cannot start when the log is being rolled and the log cannot be
452    * rolled during a cache flush.
453    *
454    * <p>Note that this method cannot be synchronized: startCacheFlush could
455    * run and obtain the cacheFlushLock, then this method could start, taking
456    * the lock on this object but blocking on the cacheFlushLock; then
457    * completeCacheFlush could be called, waiting for the lock on this object
458    * and consequently never releasing the cacheFlushLock.
459    *
460    * @return If lots of logs, flush the returned regions so next time through
461    * we can clean logs. Returns null if nothing to flush.  Names are encoded
462    * region names as returned by {@link HRegionInfo#getEncodedName()}
463    * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException
464    * @throws IOException
465    */
466   public byte [][] rollWriter() throws FailedLogCloseException, IOException {
467     // Return if nothing to flush.
468     if (this.writer != null && this.numEntries.get() <= 0) {
469       return null;
470     }
471     byte [][] regionsToFlush = null;
472     this.cacheFlushLock.lock();
473     try {
474       if (closed) {
475         return regionsToFlush;
476       }
477       // Do all the preparation outside of the updateLock to block
478       // incoming writes as little as possible
479       long currentFilenum = this.filenum;
480       this.filenum = System.currentTimeMillis();
481       Path newPath = computeFilename();
482       HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
483       int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
484       // Can we get at the dfsclient outputstream?  If an instance of
485       // SFLW, it'll have done the necessary reflection to get at the
486       // protected field name.
487       OutputStream nextHdfsOut = null;
488       if (nextWriter instanceof SequenceFileLogWriter) {
489         nextHdfsOut =
490           ((SequenceFileLogWriter)nextWriter).getDFSCOutputStream();
491       }
492       // Tell our listeners that a new log was created
493       if (!this.listeners.isEmpty()) {
494         for (WALObserver i : this.listeners) {
495           i.logRolled(newPath);
496         }
497       }
498 
499       synchronized (updateLock) {
500         // Clean up current writer.
501         Path oldFile = cleanupCurrentWriter(currentFilenum);
502         this.writer = nextWriter;
503         this.initialReplication = nextInitialReplication;
504         this.hdfs_out = nextHdfsOut;
505 
506         LOG.info((oldFile != null?
507             "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
508             this.numEntries.get() +
509             ", filesize=" +
510             this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
511           "New hlog " + FSUtils.getPath(newPath));
512         this.numEntries.set(0);
513         this.logRollRequested = false;
514       }
515       // Can we delete any of the old log files?
516       if (this.outputfiles.size() > 0) {
517         if (this.lastSeqWritten.isEmpty()) {
518           LOG.debug("Last-sequenceid-written map is empty; deleting all old hlogs");
519           // If so, then no new writes have come in since all regions were
520           // flushed (and removed from the lastSeqWritten map). Means can
521           // remove all but currently open log file.
522           for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
523             archiveLogFile(e.getValue(), e.getKey());
524           }
525           this.outputfiles.clear();
526         } else {
527           regionsToFlush = cleanOldLogs();
528         }
529       }
530     } finally {
531       this.cacheFlushLock.unlock();
532     }
533     return regionsToFlush;
534   }
535 
536   /**
537    * This method allows subclasses to inject different writers without having to
538    * override other methods like rollWriter().
539    * 
540    * @param fs
541    * @param path
542    * @param conf
543    * @return Writer instance
544    * @throws IOException
545    */
546   protected Writer createWriterInstance(final FileSystem fs, final Path path,
547       final Configuration conf) throws IOException {
548     return createWriter(fs, path, conf);
549   }
550 
551   /**
552    * Get a reader for the WAL.
553    * @param fs
554    * @param path
555    * @param conf
556    * @return A WAL reader.  Close when done with it.
557    * @throws IOException
558    */
559   public static Reader getReader(final FileSystem fs,
560     final Path path, Configuration conf)
561   throws IOException {
562     try {
563       if (logReaderClass == null) {
564         logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
565             SequenceFileLogReader.class, Reader.class);
566       }
567       HLog.Reader reader = logReaderClass.newInstance();
572       reader.init(fs, path, conf);
573       return reader;
574     } catch (IOException e) {
575       throw e;
576     } catch (Exception e) {
578       throw new IOException("Cannot get log reader", e);
579     }
580   }
581 
582   /**
583    * Get a writer for the WAL.
584    * @param fs
585    * @param path
586    * @param conf
586    * @return A WAL writer.  Close when done with it.
587    * @throws IOException
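   *
   * A minimal write sketch ({@code key} and {@code edit} are an HLogKey and
   * WALEdit assumed to be supplied by the caller):
   * <pre>
   *   HLog.Writer w = HLog.createWriter(fs, path, conf);
   *   try {
   *     w.append(new HLog.Entry(key, edit));
   *     w.sync();
   *   } finally {
   *     w.close();
   *   }
   * </pre>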
588    */
589   public static Writer createWriter(final FileSystem fs,
590       final Path path, Configuration conf)
591   throws IOException {
592     try {
593       if (logWriterClass == null) {
594         logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
595             SequenceFileLogWriter.class, Writer.class);
596       }
597       HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
598       writer.init(fs, path, conf);
599       return writer;
600     } catch (Exception e) {
601       IOException ie = new IOException("cannot get log writer");
602       ie.initCause(e);
603       throw ie;
604     }
605   }
606 
607   /*
608    * Clean up old commit logs.
609    * @return If lots of logs, flush the returned region so next time through
610    * we can clean logs. Returns null if nothing to flush.  Returns array of
611    * encoded region names to flush.
612    * @throws IOException
613    */
614   private byte [][] cleanOldLogs() throws IOException {
615     Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
616     // Get the set of all log files whose last sequence number is smaller than
617     // the oldest edit's sequence number.
618     TreeSet<Long> sequenceNumbers =
619       new TreeSet<Long>(this.outputfiles.headMap(oldestOutstandingSeqNum).keySet());
621     // Now remove old log files (if any)
622     int logsToRemove = sequenceNumbers.size();
623     if (logsToRemove > 0) {
624       if (LOG.isDebugEnabled()) {
625         // Find associated region; helps debugging.
626         byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
627         LOG.debug("Found " + logsToRemove + " hlogs to remove" +
628           " out of total " + this.outputfiles.size() + ";" +
629           " oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
630           " from region " + Bytes.toStringBinary(oldestRegion));
631       }
632       for (Long seq : sequenceNumbers) {
633         archiveLogFile(this.outputfiles.remove(seq), seq);
634       }
635     }
636 
637     // If too many log files, figure which regions we need to flush.
638     // Array is an array of encoded region names.
639     byte [][] regions = null;
640     int logCount = this.outputfiles.size();
641     if (logCount > this.maxLogs) {
643       // This is an array of encoded region names.
644       regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
645         this.lastSeqWritten);
646       if (regions != null) {
647         StringBuilder sb = new StringBuilder();
648         for (int i = 0; i < regions.length; i++) {
649           if (i > 0) sb.append(", ");
650           sb.append(Bytes.toStringBinary(regions[i]));
651         }
652         LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
653            this.maxLogs + "; forcing flush of " + regions.length + " region(s): " +
654            sb.toString());
655       }
656     }
657     return regions;
658   }
659 
660   /**
661    * Return regions (memstores) that have edits that are equal or less than
662    * the passed <code>oldestWALseqid</code>.
663    * @param oldestWALseqid
664    * @param regionsToSeqids
665    * @return All regions whose seqid is <= <code>oldestWALseqid</code> (not
666    * necessarily in order).  Null if no regions found.
667    */
668   static byte [][] findMemstoresWithEditsEqualOrOlderThan(final long oldestWALseqid,
669       final Map<byte [], Long> regionsToSeqids) {
670     //  This method is static so it can be unit tested more easily.
671     List<byte []> regions = null;
672     for (Map.Entry<byte [], Long> e: regionsToSeqids.entrySet()) {
673       if (e.getValue().longValue() <= oldestWALseqid) {
674         if (regions == null) regions = new ArrayList<byte []>();
675         regions.add(e.getKey());
676       }
677     }
678     return regions == null?
679       null: regions.toArray(new byte [regions.size()][]);
680   }
681 
682   /*
683    * @return Logs older than this id are safe to remove.
684    */
685   private Long getOldestOutstandingSeqNum() {
686     return Collections.min(this.lastSeqWritten.values());
687   }
688 
689   /**
690    * @param oldestOutstandingSeqNum
691    * @return (Encoded) name of oldest outstanding region.
692    */
693   private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
694     byte [] oldestRegion = null;
695     for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
696       if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
697         oldestRegion = e.getKey();
698         break;
699       }
700     }
701     return oldestRegion;
702   }
703 
704   /*
705    * Cleans up the current writer, closing it and adding it to outputfiles.
706    * Presumes we're operating inside an updateLock scope.
707    * @return Path to current writer or null if none.
708    * @throws IOException
709    */
710   private Path cleanupCurrentWriter(final long currentfilenum)
711   throws IOException {
712     Path oldFile = null;
713     if (this.writer != null) {
714       // Close the current writer, get a new one.
715       try {
716         this.writer.close();
717       } catch (IOException e) {
718         // Failed close of log file.  Means we're losing edits.  For now,
719         // shut ourselves down to minimize loss.  Alternative is to try and
720         // keep going.  See HBASE-930.
721         FailedLogCloseException flce =
722           new FailedLogCloseException("#" + currentfilenum);
723         flce.initCause(e);
724         throw flce;
725       }
726       if (currentfilenum >= 0) {
727         oldFile = computeFilename(currentfilenum);
728         this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
729       }
730     }
731     return oldFile;
732   }
733 
734   private void archiveLogFile(final Path p, final Long seqno) throws IOException {
735     Path newPath = getHLogArchivePath(this.oldLogDir, p);
736     LOG.info("moving old hlog file " + FSUtils.getPath(p) +
737       " whose highest sequenceid is " + seqno + " to " +
738       FSUtils.getPath(newPath));
739     if (!this.fs.rename(p, newPath)) {
740       throw new IOException("Unable to rename " + p + " to " + newPath);
741     }
742   }
743 
744   /**
745    * This is a convenience method that computes a new filename
746    * using the current HLog file-number.
747    * @return Path
748    */
749   protected Path computeFilename() {
750     return computeFilename(this.filenum);
751   }
752 
753   /**
754    * This is a convenience method that computes a new filename with a given
755    * file-number.
756    * @param filenum to use
757    * @return Path
758    */
759   protected Path computeFilename(long filenum) {
760     if (filenum < 0) {
761       throw new RuntimeException("hlog file number can't be < 0");
762     }
763     return new Path(dir, prefix + "." + filenum);
764   }
765 
766   /**
767    * Shut down the log and delete the log directory
768    *
769    * @throws IOException
770    */
771   public void closeAndDelete() throws IOException {
772     close();
773     FileStatus[] files = fs.listStatus(this.dir);
774     for(FileStatus file : files) {
775       Path p = getHLogArchivePath(this.oldLogDir, file.getPath());
776       if (!fs.rename(file.getPath(),p)) {
777         throw new IOException("Unable to rename " + file.getPath() + " to " + p);
778       }
779     }
780     LOG.debug("Moved " + files.length + " log files to " +
781         FSUtils.getPath(this.oldLogDir));
782     if (!fs.delete(dir, true)) {
783       LOG.info("Unable to delete " + dir);
784     }
785   }
786 
787   /**
788    * Shut down the log.
789    *
790    * @throws IOException
791    */
792   public void close() throws IOException {
793     try {
794       logSyncerThread.interrupt();
795       // Make sure we synced everything
796       logSyncerThread.join(this.optionalFlushInterval*2);
797     } catch (InterruptedException e) {
798       LOG.error("Exception while waiting for syncer thread to die", e);
799     }
800 
801     cacheFlushLock.lock();
802     try {
803       // Tell our listeners that the log is closing
804       if (!this.listeners.isEmpty()) {
805         for (WALObserver i : this.listeners) {
806           i.logCloseRequested();
807         }
808       }
809       synchronized (updateLock) {
810         this.closed = true;
811         if (LOG.isDebugEnabled()) {
812           LOG.debug("closing hlog writer in " + this.dir.toString());
813         }
814         this.writer.close();
815       }
816     } finally {
817       cacheFlushLock.unlock();
818     }
819   }
820 
821   /** Append an entry to the log.
822    * @param regionInfo
823    * @param logEdit
824    * @param now Time of this edit write.
825    * @param isMetaRegion
826    * @throws IOException
827    */
828   public void append(HRegionInfo regionInfo, WALEdit logEdit,
829     final long now,
830     final boolean isMetaRegion)
831   throws IOException {
832     byte [] regionName = regionInfo.getEncodedNameAsBytes();
833     byte [] tableName = regionInfo.getTableDesc().getName();
834     this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
835   }
836 
837   /**
838    * @param regionName
839    * @param tableName
840    * @param seqnum
841    * @param now
842    * @return New log key.
843    */
843   protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, long now) {
844     return new HLogKey(regionName, tableName, seqnum, now);
845   }
846 
849   /** Append an entry to the log.
850    *
851    * @param regionInfo
852    * @param logKey
853    * @param logEdit
854    * @throws IOException
855    */
856   public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit)
857   throws IOException {
858     if (this.closed) {
859       throw new IOException("Cannot append; log is closed");
860     }
861     synchronized (updateLock) {
862       long seqNum = obtainSeqNum();
863       logKey.setLogSeqNum(seqNum);
864       // The 'lastSeqWritten' map holds the sequence number of the oldest
865       // write for each region (i.e. the first edit added to the particular
866       // memstore). When the cache is flushed, the entry for the
867       // region being flushed is removed if the sequence number of the flush
868       // is greater than or equal to the value in lastSeqWritten.
869       this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
870         Long.valueOf(seqNum));
871       doWrite(regionInfo, logKey, logEdit);
872       this.numEntries.incrementAndGet();
873     }
874 
875     // Sync if catalog region, and if not then check if that table supports
876     // deferred log flushing
877     if (regionInfo.isMetaRegion() ||
878         !regionInfo.getTableDesc().isDeferredLogFlush()) {
879       // sync txn to file system
880       this.sync();
881     }
882   }
883 
884   /**
885    * Append a set of edits to the log. Log edits are keyed by (encoded)
886    * regionName, rowname, and log-sequence-id.
887    *
888    * Later, if we sort by these keys, we obtain all the relevant edits for a
889    * given key-range of the HRegion (TODO). Any edits that do not have a
890    * matching COMPLETE_CACHEFLUSH message can be discarded.
891    *
892    * <p>
893    * Logs cannot be restarted once closed, or once the HLog process dies. Each
894    * time the HLog starts, it must create a new log. This means that other
895    * systems should process the log appropriately upon each startup (and prior
896    * to initializing HLog).
897    *
898    * synchronized prevents appends during the completion of a cache flush or for
899    * the duration of a log roll.
900    *
901    * @param info
902    * @param tableName
903    * @param edits
904    * @param now
905    * @throws IOException
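   *
   * A minimal call sketch ({@code kv} stands for any KeyValue destined for
   * this region):
   * <pre>
   *   WALEdit edits = new WALEdit();
   *   edits.add(kv);
   *   wal.append(regionInfo, tableName, edits, System.currentTimeMillis());
   * </pre>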
906    */
907   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
908     final long now)
909   throws IOException {
910     if (edits.isEmpty()) return;
911     if (this.closed) {
912       throw new IOException("Cannot append; log is closed");
913     }
914     synchronized (this.updateLock) {
915       long seqNum = obtainSeqNum();
916       // The 'lastSeqWritten' map holds the sequence number of the oldest
917       // write for each region (i.e. the first edit added to the particular
918       // memstore). When the cache is flushed, the entry for the
919       // region being flushed is removed if the sequence number of the flush
920       // is greater than or equal to the value in lastSeqWritten.
921       // Use encoded name.  It is shorter, guaranteed unique and a subset of
922       // the actual name.
923       byte [] hriKey = info.getEncodedNameAsBytes();
924       this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
925       HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
926       doWrite(info, logKey, edits);
927       this.numEntries.incrementAndGet();
928     }
929     // Sync if catalog region, and if not then check if that table supports
930     // deferred log flushing
931     if (info.isMetaRegion() ||
932         !info.getTableDesc().isDeferredLogFlush()) {
933       // sync txn to file system
934       this.sync();
935     }
936   }
937 
938   /**
939    * This thread is responsible for periodically calling sync() so that any
940    * unsynced edits eventually reach the filesystem.
941    */
942    class LogSyncer extends Thread {
943 
944     private final long optionalFlushInterval;
945 
946     LogSyncer(long optionalFlushInterval) {
947       this.optionalFlushInterval = optionalFlushInterval;
948     }
949 
950     @Override
951     public void run() {
952       try {
953         // Sleep between syncs; an interrupt during the sleep wakes us up
954         // so we can exit when the log is closed.
955         while (!this.isInterrupted()) {
956 
957           Thread.sleep(this.optionalFlushInterval);
958           sync();
959         }
960       } catch (IOException e) {
961         LOG.error("Error while syncing, requesting close of hlog ", e);
962         requestLogRoll();
963       } catch (InterruptedException e) {
964         LOG.debug(getName() + " interrupted while waiting for sync requests");
965       } finally {
966         LOG.info(getName() + " exiting");
967       }
968     }
969   }
970 
971   @Override
972   public void sync() throws IOException {
973     synchronized (this.updateLock) {
974       if (this.closed) {
975         return;
976       }
977     }
978     try {
979       long now = System.currentTimeMillis();
980       // Done in parallel for all writer threads, thanks to HDFS-895
981       this.writer.sync();
982       synchronized (this.updateLock) {
983         syncTime += System.currentTimeMillis() - now;
984         syncOps++;
985         if (!logRollRequested) {
986           checkLowReplication();
987           if (this.writer.getLength() > this.logrollsize) {
988             requestLogRoll();
989           }
990         }
991       }
992 
993     } catch (IOException e) {
994       LOG.fatal("Could not append. Requesting close of hlog", e);
995       requestLogRoll();
996       throw e;
997     }
998   }
999 
1000   private void checkLowReplication() {
1001     // if the number of replicas in HDFS has fallen below the initial
1002     // value, then roll logs.
1003     try {
1004       int numCurrentReplicas = getLogReplication();
1005       if (numCurrentReplicas != 0 &&
1006           numCurrentReplicas < this.initialReplication) {
1007         LOG.warn("HDFS pipeline error detected. " +
1008             "Found " + numCurrentReplicas + " replicas but expecting " +
1009             this.initialReplication + " replicas. " +
1010             " Requesting close of hlog.");
1011         requestLogRoll();
1012         logRollRequested = true;
1013       }
1014     } catch (Exception e) {
1015       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas: " + e +
1016           "; still proceeding ahead...");
1017     }
1018   }
1019 
1020   /**
1021    * This method gets the datanode replication count for the current HLog.
1022    *
1023    * If the pipeline isn't started yet or is empty, you will get the default
1024    * replication factor.  Therefore, if this function returns 0, it means you
1025    * are not properly running with the HDFS-826 patch.
1026    * @throws InvocationTargetException
1027    * @throws IllegalAccessException
1028    * @throws IllegalArgumentException
1029    *
1030    * @throws Exception
1031    */
1032   int getLogReplication() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
1033     if (this.getNumCurrentReplicas != null && this.hdfs_out != null) {
1034       Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
1035       if (repl instanceof Integer) {
1036         return ((Integer)repl).intValue();
1037       }
1038     }
1039     return 0;
1040   }
1041 
1042   boolean canGetCurReplicas() {
1043     return this.getNumCurrentReplicas != null;
1044   }
1045 
1046   public void hsync() throws IOException {
1047     // Not yet implemented up in hdfs so just call hflush.
1048     sync();
1049   }
1050 
1051   private void requestLogRoll() {
1052     if (!this.listeners.isEmpty()) {
1053       for (WALObserver i: this.listeners) {
1054         i.logRollRequested();
1055       }
1056     }
1057   }
1058 
1059   protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
1060   throws IOException {
1061     if (!this.enabled) {
1062       return;
1063     }
1064     if (!this.listeners.isEmpty()) {
1065       for (WALObserver i: this.listeners) {
1066         i.visitLogEntryBeforeWrite(info, logKey, logEdit);
1067       }
1068     }
1069     try {
1070       long now = System.currentTimeMillis();
1071       this.writer.append(new HLog.Entry(logKey, logEdit));
1072       long took = System.currentTimeMillis() - now;
1073       writeTime += took;
1074       writeOps++;
1075       if (took > 1000) {
1076         long len = 0;
1077         for (KeyValue kv : logEdit.getKeyValues()) {
1078           len += kv.getLength();
1079         }
1080         LOG.warn(String.format(
1081           "%s took %d ms appending an edit to hlog; editcount=%d, len~=%s",
1082           Thread.currentThread().getName(), took, this.numEntries.get(), 
1083           StringUtils.humanReadableInt(len)));
1084       }
1085     } catch (IOException e) {
1086       LOG.fatal("Could not append. Requesting close of hlog", e);
1087       requestLogRoll();
1088       throw e;
1089     }
1090   }
1091 
1092   /** @return How many items have been added to the log */
1093   int getNumEntries() {
1094     return numEntries.get();
1095   }
1096 
1097   /**
1098    * Obtain a log sequence number.
1099    */
1100   private long obtainSeqNum() {
1101     return this.logSeqNum.incrementAndGet();
1102   }
1103 
1104   /** @return the number of log files in use */
1105   int getNumLogFiles() {
1106     return outputfiles.size();
1107   }
1108 
1109   /**
1110    * By acquiring a log sequence ID, we can allow log messages to continue while
1111    * we flush the cache.
1112    *
1113    * Acquire a lock so that we do not roll the log between the start and
1114    * completion of a cache-flush. Otherwise the log-seq-id for the flush will
1115    * not appear in the correct logfile.
1116    *
1117    * @return sequence ID to pass to
1118    * {@link #completeCacheFlush(byte[], byte[], long, boolean)}
1119    * @see #completeCacheFlush(byte[], byte[], long, boolean)
1120    * @see #abortCacheFlush()
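   *
   * A sketch of the expected protocol (names are illustrative; error handling
   * elided):
   * <pre>
   *   long seqid = log.startCacheFlush();
   *   boolean flushed = false;
   *   try {
   *     // ... write the memstore snapshot out to store files ...
   *     flushed = true;
   *   } finally {
   *     if (flushed) {
   *       log.completeCacheFlush(encodedRegionName, tableName, seqid, false);
   *     } else {
   *       log.abortCacheFlush(); // releases the lock taken by startCacheFlush
   *     }
   *   }
   * </pre>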
1121    */
1122   public long startCacheFlush() {
1123     this.cacheFlushLock.lock();
1124     return obtainSeqNum();
1125   }
1126 
1127   /**
1128    * Complete the cache flush
1129    *
1130    * Protected by cacheFlushLock
1131    *
1132    * @param encodedRegionName
1133    * @param tableName
1134    * @param logSeqId
1135    * @param isMetaRegion
1136    * @throws IOException
1136    */
1137   public void completeCacheFlush(final byte [] encodedRegionName,
1138       final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
1139   throws IOException {
1140     try {
1141       if (this.closed) {
1142         return;
1143       }
1144       synchronized (updateLock) {
1145         long now = System.currentTimeMillis();
1146         WALEdit edit = completeCacheFlushLogEdit();
1147         HLogKey key = makeKey(encodedRegionName, tableName, logSeqId, now);
1149         this.writer.append(new Entry(key, edit));
1150         writeTime += System.currentTimeMillis() - now;
1151         writeOps++;
1152         this.numEntries.incrementAndGet();
1153         Long seq = this.lastSeqWritten.get(encodedRegionName);
1154         if (seq != null && logSeqId >= seq.longValue()) {
1155           this.lastSeqWritten.remove(encodedRegionName);
1156         }
1157       }
1158       // sync txn to file system
1159       this.sync();
1160 
1161     } finally {
1162       this.cacheFlushLock.unlock();
1163     }
1164   }
1165 
1166   private WALEdit completeCacheFlushLogEdit() {
1167     KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
1168       System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
1169     WALEdit e = new WALEdit();
1170     e.add(kv);
1171     return e;
1172   }
1173 
1174   /**
1175    * Abort a cache flush.
1176    * Call if the flush fails. Note that the only recovery for an aborted flush
1177    * currently is a restart of the regionserver so the snapshot content dropped
1178    * by the failure gets restored to the memstore.
1179    */
1180   public void abortCacheFlush() {
1181     this.cacheFlushLock.unlock();
1182   }
1183 
1184   /**
1185    * @param family
1186    * @return true if the family is a meta family
1187    */
1188   public static boolean isMetaFamily(byte [] family) {
1189     return Bytes.equals(METAFAMILY, family);
1190   }
1191 
1192   @SuppressWarnings("unchecked")
1193   public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
1194      return (Class<? extends HLogKey>)
1195        conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
1196   }
1197 
1198   public static HLogKey newKey(Configuration conf) throws IOException {
1199     Class<? extends HLogKey> keyClass = getKeyClass(conf);
1200     try {
1201       return keyClass.newInstance();
1202     } catch (InstantiationException e) {
1203       throw new IOException("cannot create hlog key", e);
1204     } catch (IllegalAccessException e) {
1205       throw new IOException("cannot create hlog key", e);
1206     }
1207   }
1208 
1209   /**
1210    * Utility class that lets us keep track of an edit together with its key.
1211    * Only used when splitting logs.
1212    */
1213   public static class Entry implements Writable {
1214     private WALEdit edit;
1215     private HLogKey key;
1216 
1217     public Entry() {
1218       edit = new WALEdit();
1219       key = new HLogKey();
1220     }
1221 
1222     /**
1223      * Constructor for both params
1224      * @param key log's key
1225      * @param edit log's edit
1226      */
1227     public Entry(HLogKey key, WALEdit edit) {
1228       super();
1229       this.key = key;
1230       this.edit = edit;
1231     }
1232     /**
1233      * Gets the edit
1234      * @return edit
1235      */
1236     public WALEdit getEdit() {
1237       return edit;
1238     }
1239     /**
1240      * Gets the key
1241      * @return key
1242      */
1243     public HLogKey getKey() {
1244       return key;
1245     }
1246 
1247     @Override
1248     public String toString() {
1249       return this.key + "=" + this.edit;
1250     }
1251 
1252     @Override
1253     public void write(DataOutput dataOutput) throws IOException {
1254       this.key.write(dataOutput);
1255       this.edit.write(dataOutput);
1256     }
1257 
1258     @Override
1259     public void readFields(DataInput dataInput) throws IOException {
1260       this.key.readFields(dataInput);
1261       this.edit.readFields(dataInput);
1262     }
1263   }
1264 
1265   /**
1266    * Construct the HLog directory name
1267    *
1268    * @param info HServerInfo for server
1269    * @return the HLog directory name
1270    */
1271   public static String getHLogDirectoryName(HServerInfo info) {
1272     return getHLogDirectoryName(info.getServerName());
1273   }
1274 
1275   /**
1276    * Construct the HLog directory name
1277    *
1278    * @param serverAddress
1279    * @param startCode
1280    * @return the HLog directory name
1281    */
1282   public static String getHLogDirectoryName(String serverAddress,
1283       long startCode) {
1284     if (serverAddress == null || serverAddress.length() == 0) {
1285       return null;
1286     }
1287     return getHLogDirectoryName(
1288         HServerInfo.getServerName(serverAddress, startCode));
1289   }
1290 
1291   /**
1292    * Construct the HLog directory name
1293    *
1294    * @param serverName
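   *        (e.g. {@code host1,60020,1262343412}; the result would then be
   *        {@code .logs/host1,60020,1262343412}, assuming the default log
   *        directory name)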
1295    * @return the HLog directory name
1296    */
1297   public static String getHLogDirectoryName(String serverName) {
1298     StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
1299     dirName.append("/");
1300     dirName.append(serverName);
1301     return dirName.toString();
1302   }
1303 
1304   /**
1305    * Get the directory we are making logs in.
1306    * 
1307    * @return dir
1308    */
1309   protected Path getDir() {
1310     return dir;
1311   }
1312   
1313   public static boolean validateHLogFilename(String filename) {
1314     return pattern.matcher(filename).matches();
1315   }
1316 
1317   static Path getHLogArchivePath(Path oldLogDir, Path p) {
1318     return new Path(oldLogDir, p.getName());
1319   }
1320 
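  // Zero-pads the sequence id to 19 digits so that lexical ordering of the
  // recovered-edits file names matches numeric ordering of the sequence ids;
  // e.g. seqid 124 becomes "0000000000000000124".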
1321   static String formatRecoveredEditsFileName(final long seqid) {
1322     return String.format("%019d", seqid);
1323   }
1324 
1325   /**
1326    * Returns sorted set of edit files made by the WAL splitter.
1327    * @param fs
1328    * @param regiondir
1329    * @return Files in passed <code>regiondir</code> as a sorted set.
1330    * @throws IOException
1331    */
1332   public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
1333       final Path regiondir)
1334   throws IOException {
1335     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
1336     FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
1337       @Override
1338       public boolean accept(Path p) {
1339         boolean result = false;
1340         try {
1341           // Return files and only files that match the editfile names pattern.
1342           // There can be other files in this directory other than edit files.
1343           // In particular, on error, we'll move aside the bad edit file giving
1344           // it a timestamp suffix.  See moveAsideBadEditsFile.
1345           Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
1346           result = fs.isFile(p) && m.matches();
1347         } catch (IOException e) {
1348           LOG.warn("Failed isFile check on " + p);
1349         }
1350         return result;
1351       }
1352     });
1353     NavigableSet<Path> filesSorted = new TreeSet<Path>();
1354     if (files == null) return filesSorted;
1355     for (FileStatus status: files) {
1356       filesSorted.add(status.getPath());
1357     }
1358     return filesSorted;
1359   }
1360 
1361   /**
1362    * Move aside a bad edits file.
1363    * @param fs
1364    * @param edits Edits file to move aside.
1365    * @return The name of the moved aside file.
1366    * @throws IOException
1367    */
1368   public static Path moveAsideBadEditsFile(final FileSystem fs,
1369       final Path edits)
1370   throws IOException {
1371     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." +
1372       System.currentTimeMillis());
1373     if (!fs.rename(edits, moveAsideName)) {
1374       LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
1375     }
1376     return moveAsideName;
1377   }
1378 
1379   /**
1380    * @param regiondir This regions directory in the filesystem.
1381    * @return The directory that holds recovered edits files for the region
1382    * <code>regiondir</code>
1383    */
1384   public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
1385     return new Path(regiondir, RECOVERED_EDITS_DIR);
1386   }
1387 
1388   public static final long FIXED_OVERHEAD = ClassSize.align(
1389     ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
1390     ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
1391 
1392   private static void usage() {
1393     System.err.println("Usage: HLog <ARGS>");
1394     System.err.println("Arguments:");
1395     System.err.println(" --dump  Dump textual representation of passed one or more files");
1396     System.err.println("         For example: HLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
1397     System.err.println(" --split Split the passed directory of WAL logs");
1398     System.err.println("         For example: HLog --split hdfs://example.com:9000/hbase/.logs/DIR");
1399   }
1400 
1401   private static void dump(final Configuration conf, final Path p)
1402   throws IOException {
1403     FileSystem fs = FileSystem.get(conf);
1404     if (!fs.exists(p)) {
1405       throw new FileNotFoundException(p.toString());
1406     }
1407     if (!fs.isFile(p)) {
1408       throw new IOException(p + " is not a file");
1409     }
1410     Reader log = getReader(fs, p, conf);
1411     try {
1412       int count = 0;
1413       HLog.Entry entry;
1414       while ((entry = log.next()) != null) {
1415         System.out.println("#" + count + ", pos=" + log.getPosition() + " " +
1416           entry.toString());
1417         count++;
1418       }
1419     } finally {
1420       log.close();
1421     }
1422   }
1423 
1424   private static void split(final Configuration conf, final Path p)
1425   throws IOException {
1426     FileSystem fs = FileSystem.get(conf);
1427     if (!fs.exists(p)) {
1428       throw new FileNotFoundException(p.toString());
1429     }
1430     final Path baseDir = new Path(conf.get(HConstants.HBASE_DIR));
1431     final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
1432     if (!fs.getFileStatus(p).isDir()) {
1433       throw new IOException(p + " is not a directory");
1434     }
1435 
1436     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
1437         conf, baseDir, p, oldLogDir, fs);
1438     logSplitter.splitLog();
1439   }
1440 
1441   /**
1442    * Pass one or more log file names and it will either dump out a text version
1443    * on <code>stdout</code> or split the specified log files.
1444    *
1445    * @param args
1446    * @throws IOException
1447    */
1448   public static void main(String[] args) throws IOException {
1449     if (args.length < 2) {
1450       usage();
1451       System.exit(-1);
1452     }
1453     // either dump using the HLogPrettyPrinter or split, depending on args
1454     if (args[0].compareTo("--dump") == 0) {
1455       HLogPrettyPrinter.run(Arrays.copyOfRange(args, 1, args.length));
1456     } else if (args[0].compareTo("--split") == 0) {
1457       Configuration conf = HBaseConfiguration.create();
1458       for (int i = 1; i < args.length; i++) {
1459         try {
1460           conf.set("fs.default.name", args[i]);
1461           conf.set("fs.defaultFS", args[i]);
1462           Path logPath = new Path(args[i]);
1463           split(conf, logPath);
1464         } catch (Throwable t) {
1465           t.printStackTrace(System.err);
1466           System.exit(-1);
1467         }
1468       }
1469     } else {
1470       usage();
1471       System.exit(-1);
1472     }
1473   }
1474 }