/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.MultipleIOException;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
 * This class is responsible for splitting up a bunch of regionserver commit log
 * files that are no longer being written to, into new files, one per region,
 * for the region to replay on startup. Old log files are deleted when finished.
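 *
 * <p>A minimal usage sketch (the conf, Path, and FileSystem variables are
 * assumed to be set up by the caller; in practice the master drives this
 * during log splitting):
 * <pre>
 *   HLogSplitter splitter = HLogSplitter.createLogSplitter(
 *       conf, rootDir, srcDir, oldLogDir, fs);
 *   List&lt;Path&gt; splits = splitter.splitLog();
 *   LOG.info("Split " + splitter.getSize() + " bytes of hlogs in "
 *       + splitter.getTime() + "ms");
 * </pre>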
 */
public class HLogSplitter {

  private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";

  /**
   * Name of the directory that holds recovered edits written by the wal log
   * splitting code, one per region
   */
  public static final String RECOVERED_EDITS = "recovered.edits";

  static final Log LOG = LogFactory.getLog(HLogSplitter.class);

  private boolean hasSplit = false;
  private long splitTime = 0;
  private long splitSize = 0;

  // Parameters for split process
  protected final Path rootDir;
  protected final Path srcDir;
  protected final Path oldLogDir;
  protected final FileSystem fs;
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // If an exception is thrown by one of the other threads, it will be
  // stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();

  // Wait/notify for when data has been produced by the reader thread,
  // consumed by the writer threads, or an exception occurred
  Object dataAvailable = new Object();

  /**
   * Create a new HLogSplitter, using the given {@link Configuration} and the
   * <code>hbase.hlog.splitter.impl</code> property to derive the instance
   * class to use.
   *
   * @param conf configuration to use
   * @param rootDir hbase directory
   * @param srcDir logs directory
   * @param oldLogDir directory where processed logs are archived to
   * @param fs FileSystem
   */
  public static HLogSplitter createLogSplitter(Configuration conf,
      final Path rootDir, final Path srcDir,
      Path oldLogDir, final FileSystem fs) {

    @SuppressWarnings("unchecked")
    Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
        .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
    try {
      Constructor<? extends HLogSplitter> constructor =
        splitterClass.getConstructor(
          Configuration.class, // conf
          Path.class, // rootDir
          Path.class, // srcDir
          Path.class, // oldLogDir
          FileSystem.class); // fs
      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
    } catch (IllegalArgumentException e) {
      throw new RuntimeException(e);
    } catch (InstantiationException e) {
      throw new RuntimeException(e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(e);
    } catch (InvocationTargetException e) {
      throw new RuntimeException(e);
    } catch (SecurityException e) {
      throw new RuntimeException(e);
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(e);
    }
  }
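
  /*
   * A hedged sketch of plugging in a custom splitter via the property above
   * (MyHLogSplitter is a hypothetical subclass; it must expose the
   * five-argument constructor that createLogSplitter looks up reflectively):
   *
   *   conf.setClass("hbase.hlog.splitter.impl", MyHLogSplitter.class,
   *       HLogSplitter.class);
   *   HLogSplitter splitter =
   *       HLogSplitter.createLogSplitter(conf, rootDir, srcDir, oldLogDir, fs);
   */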

  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
      Path oldLogDir, FileSystem fs) {
    this.conf = conf;
    this.rootDir = rootDir;
    this.srcDir = srcDir;
    this.oldLogDir = oldLogDir;
    this.fs = fs;

    entryBuffers = new EntryBuffers(
        conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
            128*1024*1024));
    outputSink = new OutputSink();
  }

  /**
   * Split up a bunch of regionserver commit log files that are no longer being
   * written to, into new files, one per region, for the region to replay on
   * startup. Delete the old log files when finished.
   *
   * @throws IOException if corrupted hlogs aren't tolerated
   * @return the list of splits
   */
  public List<Path> splitLog()
      throws IOException {
    Preconditions.checkState(!hasSplit,
        "An HLogSplitter instance may only be used once");
    hasSplit = true;

    long startTime = System.currentTimeMillis();
    List<Path> splits = null;
    if (!fs.exists(srcDir)) {
      // Nothing to do
      return splits;
    }
    FileStatus[] logfiles = fs.listStatus(srcDir);
    if (logfiles == null || logfiles.length == 0) {
      // Nothing to do
      return splits;
    }
    LOG.info("Splitting " + logfiles.length + " hlog(s) in "
        + srcDir.toString());
    splits = splitLog(logfiles);

    splitTime = System.currentTimeMillis() - startTime;
    LOG.info("hlog file splitting completed in " + splitTime +
        " ms for " + srcDir.toString());
    return splits;
  }

  /**
   * @return time that this split took, in milliseconds
   */
  public long getTime() {
    return this.splitTime;
  }

  /**
   * @return aggregate size, in bytes, of the hlogs that were split
   */
  public long getSize() {
    return this.splitSize;
  }

  /**
   * @return a map from encoded region ID to the number of edits written out
   * for that region.
   */
  Map<byte[], Long> getOutputCounts() {
    Preconditions.checkState(hasSplit);
    return outputSink.getOutputCounts();
  }
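
  /*
   * A hedged sketch of consuming these counts after a split (getOutputCounts
   * and Bytes.toStringBinary are real; the log line is illustrative only):
   *
   *   for (Map.Entry<byte[], Long> e : splitter.getOutputCounts().entrySet()) {
   *     LOG.info("Region " + Bytes.toStringBinary(e.getKey()) + ": "
   *         + e.getValue() + " edits written");
   *   }
   */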

  /**
   * Splits the HLog edits in the given list of logfiles (that are a mix of
   * edits on multiple regions) by region, writing them out under per-region
   * directories.
   *
   * This process is split into multiple threads. In the main thread, we loop
   * through the logs to be split. For each log, we:
   * <ul>
   *   <li> Recover it (take and drop HDFS lease) to ensure no other process can write</li>
   *   <li> Read each edit (see {@link #parseHLog})</li>
   *   <li> Mark as "processed" or "corrupt" depending on outcome</li>
   * </ul>
   *
   * Each edit is passed into the EntryBuffers instance, which takes care of
   * memory accounting and splitting the edits by region.
   *
   * The OutputSink object then manages N other WriterThreads which pull chunks
   * of edits from EntryBuffers and write them to the output region directories.
   *
   * After the process is complete, the log files are archived to a separate
   * directory.
   */
  private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
    List<Path> processedLogs = new ArrayList<Path>();
    List<Path> corruptedLogs = new ArrayList<Path>();
    List<Path> splits = null;

    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);

    splitSize = 0;

    outputSink.startWriterThreads(entryBuffers);

    try {
      int i = 0;
      for (FileStatus log : logfiles) {
        Path logPath = log.getPath();
        long logLength = log.getLen();
        splitSize += logLength;
        LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
            + ": " + logPath + ", length=" + logLength);
        try {
          recoverFileLease(fs, logPath, conf);
          parseHLog(log, entryBuffers, fs, conf);
          processedLogs.add(logPath);
        } catch (EOFException eof) {
          // truncated files are expected if a RS crashes (see HBASE-2643)
          LOG.info("EOF from hlog " + logPath + ". Continuing");
          processedLogs.add(logPath);
        } catch (FileNotFoundException fnfe) {
          // A file may be missing if the region server was able to archive it
          // before shutting down. This means the edits were persisted already
          LOG.info("A log was missing " + logPath +
              ", probably because it was moved by the" +
              " now dead region server. Continuing");
          processedLogs.add(logPath);
        } catch (IOException e) {
          // If the IOE resulted from bad file format,
          // then this problem is idempotent and retrying won't help
          if (e.getCause() instanceof ParseException) {
            LOG.warn("Parse exception from hlog " + logPath + ". Continuing", e);
            processedLogs.add(logPath);
          } else {
            if (skipErrors) {
              LOG.info("Got an error while parsing hlog " + logPath +
                ". Marking as corrupted", e);
              corruptedLogs.add(logPath);
            } else {
              throw e;
            }
          }
        }
      }
      if (fs.listStatus(srcDir).length > processedLogs.size()
          + corruptedLogs.size()) {
        throw new OrphanHLogAfterSplitException(
            "Discovered orphan hlog after split. Maybe the "
            + "HRegionServer was not dead when we started");
      }
      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
    } finally {
      splits = outputSink.finishWritingAndClose();
    }
    return splits;
  }

  /**
   * Moves processed logs to the oldLogDir after successful processing. Moves
   * corrupted logs (any log that couldn't be successfully parsed) to the
   * corrupt dir (.corrupt) for later investigation.
   *
   * @param srcDir the directory the logs were read from
   * @param corruptedLogs logs that could not be parsed
   * @param processedLogs logs that were successfully parsed
   * @param oldLogDir archive directory for processed logs
   * @param fs the filesystem
   * @param conf the configuration
   * @throws IOException
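   *
   * <p>As a worked illustration (the paths are hypothetical), a corrupted log
   * such as /hbase/.logs/rs1,60020,1280000000000/hlog.1234 is renamed to
   * /hbase/.corrupt/hlog.1234, while each processed log is renamed into
   * oldLogDir via {@link HLog#getHLogArchivePath}.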
   */
  private static void archiveLogs(
      final Path srcDir,
      final List<Path> corruptedLogs,
      final List<Path> processedLogs, final Path oldLogDir,
      final FileSystem fs, final Configuration conf) throws IOException {
    final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
        "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));

    if (!fs.mkdirs(corruptDir)) {
      LOG.info("Unable to mkdir " + corruptDir);
    }
    fs.mkdirs(oldLogDir);

    for (Path corrupted : corruptedLogs) {
      Path p = new Path(corruptDir, corrupted.getName());
      if (!fs.rename(corrupted, p)) {
        LOG.info("Unable to move corrupted log " + corrupted + " to " + p);
      } else {
        LOG.info("Moving corrupted log " + corrupted + " to " + p);
      }
    }

    for (Path p : processedLogs) {
      Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
      if (!fs.rename(p, newPath)) {
        LOG.info("Unable to move " + p + " to " + newPath);
      } else {
        LOG.info("Archived processed log " + p + " to " + newPath);
      }
    }

    if (!fs.delete(srcDir, true)) {
      throw new IOException("Unable to delete src dir: " + srcDir);
    }
  }

  /**
   * Path to a file under RECOVERED_EDITS_DIR directory of the region found in
   * <code>logEntry</code> named for the sequenceid in the passed
   * <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
   * This method also ensures the existence of RECOVERED_EDITS_DIR under the
   * region directory, creating it if necessary.
   * @param fs the filesystem
   * @param logEntry the entry whose region determines the output path
   * @param rootDir HBase root dir.
   * @return Path to file into which to dump split log edits.
   * @throws IOException
   */
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir) throws IOException {
    Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
        .getTablename());
    Path regiondir = HRegion.getRegionDir(tableDir,
        Bytes.toString(logEntry.getKey().getEncodedRegionName()));
    if (!fs.exists(regiondir)) {
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
          " already split so it's safe to discard those edits.");
      return null;
    }
    Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
    if (!fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }
    return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
        .getLogSeqNum()));
  }

  static String formatRecoveredEditsFileName(final long seqid) {
    return String.format("%019d", seqid);
  }
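
  /*
   * A worked instance of the format string above:
   *
   *   formatRecoveredEditsFileName(2332) returns "0000000000000002332"
   *
   * Zero-padding to 19 digits makes recovered edits files sort
   * lexicographically in sequence-id order.
   */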

  /*
   * Parse a single hlog and push each edit into the given EntryBuffers
   * instance, which splits the edits up by region.
   *
   * @param logfile to split
   * @param entryBuffers the buffers to push each parsed edit into
   * @param fs the filesystem
   * @param conf the configuration
   * @throws IOException if hlog is corrupted, or can't be opened
   */
  private void parseHLog(final FileStatus logfile,
      EntryBuffers entryBuffers, final FileSystem fs,
      final Configuration conf)
      throws IOException {
    // Check for possibly empty file. With appends, currently Hadoop reports a
    // zero length even if the file has been sync'd. Revisit if HDFS-376 or
    // HDFS-878 is committed.
    long length = logfile.getLen();
    if (length <= 0) {
      LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
    }
    Path path = logfile.getPath();
    Reader in;
    int editsCount = 0;
    try {
      in = getReader(fs, path, conf);
    } catch (EOFException e) {
      if (length <= 0) {
        //TODO should we ignore an empty, not-last log file if skip.errors is false?
        //Either way, the caller should decide what to do. E.g. ignore if this is the last
        //log in sequence.
        //TODO is this scenario still possible if the log has been recovered (i.e. closed)
        LOG.warn("Could not open " + path + " for reading. File is empty", e);
        return;
      } else {
        throw e;
      }
    }
    try {
      Entry entry;
      while ((entry = in.next()) != null) {
        entryBuffers.appendEntry(entry);
        editsCount++;
      }
    } catch (InterruptedException ie) {
      throw new RuntimeException(ie);
    } finally {
      LOG.debug("Pushed=" + editsCount + " entries from " + path);
      try {
        if (in != null) {
          in.close();
        }
      } catch (IOException e) {
        LOG.warn("Close log reader in finally threw exception -- continuing",
                 e);
      }
    }
  }

  private void writerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the writer threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) return;
    if (thrown instanceof IOException) {
      throw (IOException)thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }

  /**
   * Create a new {@link Writer} for writing log splits.
   */
  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
      throws IOException {
    return HLog.createWriter(fs, logfile, conf);
  }

  /**
   * Create a new {@link Reader} for reading logs to split.
   */
  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
      throws IOException {
    return HLog.getReader(fs, curLogFile, conf);
  }

  /**
   * Class which accumulates edits and separates them into a buffer per region
   * while simultaneously accounting for RAM usage. Blocks if the RAM usage
   * crosses a predefined threshold.
   *
   * Writer threads then pull region-specific buffers from this class.
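   *
   * <p>A minimal sketch of the consumer contract (this mirrors what
   * WriterThread.doRun() below actually does):
   * <pre>
   *   RegionEntryBuffer buf = entryBuffers.getChunkToWrite();
   *   if (buf != null) {
   *     try {
   *       // write out buf.entryBuffer
   *     } finally {
   *       entryBuffers.doneWriting(buf); // frees heap quota, unblocks producer
   *     }
   *   }
   * </pre>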
   */
  class EntryBuffers {
    Map<byte[], RegionEntryBuffer> buffers =
      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    /* Track which regions are currently in the middle of writing. We don't allow
       an IO thread to pick up bytes from a region if we're already writing
       data for that region in a different IO thread. */
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    long totalBuffered = 0;
    long maxHeapUsage;

    EntryBuffers(long maxHeapUsage) {
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Append a log entry into the corresponding region buffer.
     * Blocks if the total heap usage has crossed the specified threshold.
     *
     * @throws InterruptedException
     * @throws IOException
     */
    void appendEntry(Entry entry) throws InterruptedException, IOException {
      HLogKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap = buffer.appendEntry(entry);
      }

      // If we crossed the chunk threshold, wait for more space to be available
      synchronized (dataAvailable) {
        totalBuffered += incrHeap;
        while (totalBuffered > maxHeapUsage && thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          dataAvailable.wait(3000);
        }
        dataAvailable.notifyAll();
      }
      checkForErrors();
    }

    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (dataAvailable) {
        totalBuffered -= size;
        // We may unblock writers
        dataAvailable.notifyAll();
      }
    }

    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }
  }

  /**
   * A buffer of some number of edits for a given region.
   * This accumulates edits and also provides a memory optimization in order to
   * share a single byte array instance for the table and region name.
   * Also tracks memory usage of the accumulated edits.
   */
  static class RegionEntryBuffer implements HeapSize {
    long heapInBuffer = 0;
    List<Entry> entryBuffer;
    byte[] tableName;
    byte[] encodedRegionName;

    RegionEntryBuffer(byte[] table, byte[] region) {
      this.tableName = table;
      this.encodedRegionName = region;
      this.entryBuffer = new LinkedList<Entry>();
    }

    long appendEntry(Entry entry) {
      internify(entry);
      entryBuffer.add(entry);
      long incrHeap = entry.getEdit().heapSize() +
        ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
        0; // TODO linkedlist entry
      heapInBuffer += incrHeap;
      return incrHeap;
    }

    private void internify(Entry entry) {
      HLogKey k = entry.getKey();
      k.internTableName(this.tableName);
      k.internEncodedRegionName(this.encodedRegionName);
    }

    public long heapSize() {
      return heapInBuffer;
    }
  }

  class WriterThread extends Thread {
    private volatile boolean shouldStop = false;

    WriterThread(int i) {
      super("WriterThread-" + i);
    }

    public void run() {
      try {
        doRun();
      } catch (Throwable t) {
        LOG.error("Error in log splitting write thread", t);
        writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data currently available, wait on some more to show up
          synchronized (dataAvailable) {
            if (shouldStop) return;
            try {
              dataAvailable.wait(1000);
            } catch (InterruptedException ie) {
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn(this.getName() + " got an empty buffer, skipping");
        return;
      }

      WriterAndPath wap = null;

      long startTime = System.nanoTime();
      try {
        int editsCount = 0;

        for (Entry logEntry : entries) {
          if (wap == null) {
            wap = outputSink.getWriterAndPath(logEntry);
            if (wap == null) {
              // getWriterAndPath decided we don't need to write these edits
              // Message was already logged
              return;
            }
          }
          wap.w.append(logEntry);
          editsCount++;
        }
        // Pass along summary statistics
        wap.incrementEdits(editsCount);
        wap.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.fatal(this.getName() + " Got an IOException while writing log entry", e);
        throw e;
      }
    }

    void finish() {
      shouldStop = true;
    }
  }

  /**
   * Class that manages the output streams from the log splitting process.
   */
  class OutputSink {
    private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
          new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
    private final List<WriterThread> writerThreads = Lists.newArrayList();

    /* Set of regions which we've decided should not output edits */
    private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
        new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));

    private boolean hasClosed = false;

    /**
     * Start the threads that will pump data from the entryBuffers
     * to the output files.
     */
    synchronized void startWriterThreads(EntryBuffers entryBuffers) {
      // More threads could potentially write faster at the expense
      // of causing more disk seeks as the logs are split.
      // After a certain setting (probably around 3) the process will be
      // bound on the reader in the current implementation anyway.
      int numThreads = conf.getInt(
          "hbase.regionserver.hlog.splitlog.writer.threads", 3);

      for (int i = 0; i < numThreads; i++) {
        WriterThread t = new WriterThread(i);
        t.start();
        writerThreads.add(t);
      }
    }
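
    /*
     * A hedged tuning sketch: the thread count is read from configuration
     * before startWriterThreads() runs, e.g.
     *
     *   conf.setInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
     *
     * Per the comment above, values much beyond 3 mostly add disk seeks,
     * since the single reader thread becomes the bottleneck.
     */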

    List<Path> finishWritingAndClose() throws IOException {
      LOG.info("Waiting for split writer threads to finish");
      for (WriterThread t : writerThreads) {
        t.finish();
      }
      for (WriterThread t : writerThreads) {
        try {
          t.join();
        } catch (InterruptedException ie) {
          throw new IOException(ie);
        }
        checkForErrors();
      }
      LOG.info("Split writers finished");

      return closeStreams();
    }

    /**
     * Close all of the output streams.
     * @return the list of paths written.
     */
    private List<Path> closeStreams() throws IOException {
      Preconditions.checkState(!hasClosed);

      List<Path> paths = new ArrayList<Path>();
      List<IOException> thrown = Lists.newArrayList();

      for (WriterAndPath wap : logWriters.values()) {
        try {
          wap.w.close();
        } catch (IOException ioe) {
          LOG.error("Couldn't close log at " + wap.p, ioe);
          thrown.add(ioe);
          continue;
        }
        paths.add(wap.p);
        LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
            + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
      }
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }

      hasClosed = true;
      return paths;
    }

    /**
     * Get a writer and path for a log starting at the given entry.
     *
     * This function is threadsafe so long as multiple threads are always
     * acting on different regions.
     *
     * @return null if this region shouldn't output any logs
     */
    WriterAndPath getWriterAndPath(Entry entry) throws IOException {
      byte[] region = entry.getKey().getEncodedRegionName();
      WriterAndPath ret = logWriters.get(region);
      if (ret != null) {
        return ret;
      }

      // If we already decided that this region doesn't get any output
      // we don't need to check again.
      if (blacklistedRegions.contains(region)) {
        return null;
      }

      // Need to create writer
      Path regionedits = getRegionSplitEditsPath(fs,
          entry, rootDir);
      if (regionedits == null) {
        // Edits dir doesn't exist
        blacklistedRegions.add(region);
        return null;
      }
      deletePreexistingOldEdits(regionedits);
      Writer w = createWriter(fs, regionedits, conf);
      ret = new WriterAndPath(regionedits, w);
      logWriters.put(region, ret);
      LOG.debug("Creating writer path=" + regionedits + " region="
          + Bytes.toStringBinary(region));

      return ret;
    }

    /**
     * If the specified path exists, issue a warning and delete it.
     */
    private void deletePreexistingOldEdits(Path regionedits) throws IOException {
      if (fs.exists(regionedits)) {
        LOG.warn("Found existing old edits file. It could be the "
            + "result of a previous failed split attempt. Deleting "
            + regionedits + ", length="
            + fs.getFileStatus(regionedits).getLen());
        if (!fs.delete(regionedits, false)) {
          LOG.warn("Failed delete of old " + regionedits);
        }
      }
    }

    /**
     * @return a map from encoded region ID to the number of edits written out
     * for that region.
     */
    private Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
          Bytes.BYTES_COMPARATOR);
      synchronized (logWriters) {
        for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
          ret.put(entry.getKey(), entry.getValue().editsWritten);
        }
      }
      return ret;
    }
  }

  /**
   * Private data structure that wraps a Writer and its Path, also collecting
   * statistics about the data written to this output.
   */
  private final static class WriterAndPath {
    final Path p;
    final Writer w;

    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    WriterAndPath(final Path p, final Writer w) {
      this.p = p;
      this.w = w;
    }

    void incrementEdits(int edits) {
      editsWritten += edits;
    }

    void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }
  }
}