1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.NavigableSet;
29  import java.util.SortedSet;
30  import java.util.concurrent.CopyOnWriteArraySet;
31  import java.util.concurrent.locks.ReentrantReadWriteLock;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.FileUtil;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.HColumnDescriptor;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HRegionInfo;
43  import org.apache.hadoop.hbase.KeyValue;
44  import org.apache.hadoop.hbase.RemoteExceptionHandler;
45  import org.apache.hadoop.hbase.client.Scan;
46  import org.apache.hadoop.hbase.io.HeapSize;
47  import org.apache.hadoop.hbase.io.hfile.Compression;
48  import org.apache.hadoop.hbase.io.hfile.HFile;
49  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
50  import org.apache.hadoop.hbase.util.Bytes;
51  import org.apache.hadoop.hbase.util.ClassSize;
52  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
53  import org.apache.hadoop.util.StringUtils;
54  
55  import com.google.common.collect.ImmutableList;
56  import com.google.common.collect.Iterables;
57  
58  /**
59   * A Store holds a column family in a Region.  It's a memstore and a set of zero
60   * or more StoreFiles, which stretch backwards over time.
61   *
62   * <p>There's no reason to consider append-logging at this level; all logging
63   * and locking is handled at the HRegion level.  Store just provides
64   * services to manage sets of StoreFiles.  One of the most important of those
65   * services is compaction services where files are aggregated once they pass
66   * a configurable threshold.
67   *
68   * <p>The only thing having to do with logs that Store needs to deal with is
69   * the reconstructionLog.  This is a segment of an HRegion's log that might
70   * NOT be present upon startup.  If the param is NULL, there's nothing to do.
71   * If the param is non-NULL, we need to process the log to reconstruct
72   * a TreeMap that might not have been written to disk before the process
73   * died.
74   *
75   * <p>It's assumed that after this constructor returns, the reconstructionLog
76   * file will be deleted (by whoever has instantiated the Store).
77   *
78   * <p>Locking and transactions are handled at a higher level.  This API should
79   * not be called directly but by an HRegion manager.
80   */
81  public class Store implements HeapSize {
82    static final Log LOG = LogFactory.getLog(Store.class);
83    protected final MemStore memstore;
84    // This stores directory in the filesystem.
85    private final Path homedir;
86    private final HRegion region;
87    private final HColumnDescriptor family;
88    final FileSystem fs;
89    final Configuration conf;
90    // ttl in milliseconds.
91    protected long ttl;
92    private long majorCompactionTime;
93    private final int maxFilesToCompact;
94    private final long minCompactSize;
95    // compactRatio: double on purpose!  Float.MAX < Long.MAX < Double.MAX
96    // With float, java will downcast your long to float for comparisons (bad)
97    private double compactRatio;
98    private long lastCompactSize = 0;
99    /* how many bytes to write between status checks */
100   static int closeCheckInterval = 0;
101   private final long desiredMaxFileSize;
102   private final int blockingStoreFileCount;
103   private volatile long storeSize = 0L;
104   private final Object flushLock = new Object();
105   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
106   private final String storeNameStr;
107   private final boolean inMemory;
108 
109   /*
110    * List of store files inside this store. This is an immutable list that
111    * is atomically replaced when its contents change.
112    */
113   private ImmutableList<StoreFile> storefiles = null;
114 
115 
116   // All access must be synchronized.
117   private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
118     new CopyOnWriteArraySet<ChangedReadersObserver>();
119 
120   private final Object compactLock = new Object();
121   private final int compactionThreshold;
122   private final int blocksize;
123   private final boolean blockcache;
124   /** Compression algorithm for flush files and minor compaction */
125   private final Compression.Algorithm compression;
126   /** Compression algorithm for major compaction */
127   private final Compression.Algorithm compactionCompression;
128 
129   // Comparing KeyValues
130   final KeyValue.KVComparator comparator;
131 
132   /**
133    * Constructor
134    * @param basedir qualified path under which the region directory lives;
135    * generally the table subdirectory
136    * @param region
137    * @param family HColumnDescriptor for this column
138    * @param fs file system object
139    * @param conf configuration object
141    * @throws IOException
142    */
143   protected Store(Path basedir, HRegion region, HColumnDescriptor family,
144     FileSystem fs, Configuration conf)
145   throws IOException {
146     HRegionInfo info = region.regionInfo;
147     this.fs = fs;
148     this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
149     if (!this.fs.exists(this.homedir)) {
150       if (!this.fs.mkdirs(this.homedir))
151         throw new IOException("Failed create of: " + this.homedir.toString());
152     }
153     this.region = region;
154     this.family = family;
155     this.conf = conf;
156     this.blockcache = family.isBlockCacheEnabled();
157     this.blocksize = family.getBlocksize();
158     this.compression = family.getCompression();
159     // avoid overriding compression setting for major compactions if the user
160     // has not specified it separately
161     this.compactionCompression =
162       (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
163         family.getCompactionCompression() : this.compression;
164     this.comparator = info.getComparator();
165     // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
166     this.ttl = family.getTimeToLive();
167     if (ttl == HConstants.FOREVER) {
168       // default is unlimited ttl.
169       ttl = Long.MAX_VALUE;
170     } else if (ttl == -1) {
171       ttl = Long.MAX_VALUE;
172     } else {
173       // second -> ms adjust for user data
174       this.ttl *= 1000;
175     }
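    // Illustrative example: a family created with setTimeToLive(86400) (one
    // day, in seconds) yields ttl = 86,400,000 ms here, while a TTL of
    // HConstants.FOREVER or -1 becomes Long.MAX_VALUE, i.e. cells never expire.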
176     this.memstore = new MemStore(conf, this.comparator);
177     this.storeNameStr = Bytes.toString(this.family.getName());
178 
179     // By default, we compact if an HStore has more than
180     // "hbase.hstore.compactionThreshold" (default 3) store files
181     this.compactionThreshold = Math.max(2,
182       conf.getInt("hbase.hstore.compactionThreshold", 3));
183 
184     // Check if this is in-memory store
185     this.inMemory = family.isInMemory();
186 
187     // By default we split region if a file > HConstants.DEFAULT_MAX_FILE_SIZE.
188     long maxFileSize = info.getTableDesc().getMaxFileSize();
189     if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
190       maxFileSize = conf.getLong("hbase.hregion.max.filesize",
191         HConstants.DEFAULT_MAX_FILE_SIZE);
192     }
193     this.desiredMaxFileSize = maxFileSize;
194     this.blockingStoreFileCount =
195       conf.getInt("hbase.hstore.blockingStoreFiles", 7);
196 
197     this.majorCompactionTime = getNextMajorCompactTime();
198 
199     this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
200     this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
201         this.region.memstoreFlushSize);
202     this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
203 
204     if (Store.closeCheckInterval == 0) {
205       Store.closeCheckInterval = conf.getInt(
206           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
207     }
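    // Illustrative note: with the default of 10 MB, a compaction that writes
    // 100 MB of data re-checks region.areWritesEnabled() roughly ten times,
    // so a requested stop aborts the compaction within about 10 MB of output.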
208     this.storefiles = sortAndClone(loadStoreFiles());
209   }
210 
211   public HColumnDescriptor getFamily() {
212     return this.family;
213   }
214 
215   /**
216    * @return The maximum sequence id in all store files.
217    */
218   long getMaxSequenceId() {
219     return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
220   }
221 
222   /**
223    * @param tabledir
224    * @param encodedName Encoded region name.
225    * @param family
226    * @return Path to family/Store home directory.
227    */
228   public static Path getStoreHomedir(final Path tabledir,
229       final String encodedName, final byte [] family) {
230     return new Path(tabledir, new Path(encodedName,
231       new Path(Bytes.toString(family))));
232   }
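  // Example with hypothetical values: getStoreHomedir(new Path("/hbase/t1"),
  // "1028785192", Bytes.toBytes("cf")) resolves to "/hbase/t1/1028785192/cf".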
233 
234   /**
235    * Return the directory in which this store stores its
236    * StoreFiles
237    */
238   public Path getHomedir() {
239     return homedir;
240   }
241 
242   /*
243    * Creates an unsorted list of StoreFile loaded from the given directory.
244    * @throws IOException
245    */
246   private List<StoreFile> loadStoreFiles()
247   throws IOException {
248     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
249     FileStatus files[] = this.fs.listStatus(this.homedir);
250     for (int i = 0; files != null && i < files.length; i++) {
251       // Skip directories.
252       if (files[i].isDir()) {
253         continue;
254       }
255       Path p = files[i].getPath();
256       // Check for empty file.  Should never be the case but can happen
257       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
258       if (this.fs.getFileStatus(p).getLen() <= 0) {
259         LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
260         continue;
261       }
262       StoreFile curfile = null;
263       try {
264         curfile = new StoreFile(fs, p, blockcache, this.conf,
265             this.family.getBloomFilterType(), this.inMemory);
266         curfile.createReader();
267       } catch (IOException ioe) {
268         LOG.warn("Failed open of " + p + "; presumption is that file was " +
269           "corrupted at flush and lost edits picked up by commit log replay. " +
270           "Verify!", ioe);
271         continue;
272       }
273       long length = curfile.getReader().length();
274       this.storeSize += length;
275       if (LOG.isDebugEnabled()) {
276         LOG.debug("loaded " + curfile.toStringDetailed());
277       }
278       results.add(curfile);
279     }
280     return results;
281   }
282 
283   /**
284    * Adds a value to the memstore
285    *
286    * @param kv
287    * @return memstore size delta
288    */
289   protected long add(final KeyValue kv) {
290     lock.readLock().lock();
291     try {
292       return this.memstore.add(kv);
293     } finally {
294       lock.readLock().unlock();
295     }
296   }
297 
298   /**
299    * Adds a delete marker (a delete-type KeyValue) to the memstore
300    *
301    * @param kv
302    * @return memstore size delta
303    */
304   protected long delete(final KeyValue kv) {
305     lock.readLock().lock();
306     try {
307       return this.memstore.delete(kv);
308     } finally {
309       lock.readLock().unlock();
310     }
311   }
312 
313   /**
314    * @return All store files.
315    */
316   List<StoreFile> getStorefiles() {
317     return this.storefiles;
318   }
319 
320   public void bulkLoadHFile(String srcPathStr) throws IOException {
321     Path srcPath = new Path(srcPathStr);
322 
323     HFile.Reader reader  = null;
324     try {
325       LOG.info("Validating hfile at " + srcPath + " for inclusion in "
326           + "store " + this + " region " + this.region);
327       reader = new HFile.Reader(srcPath.getFileSystem(conf),
328           srcPath, null, false);
329       reader.loadFileInfo();
330 
331       byte[] firstKey = reader.getFirstRowKey();
332       byte[] lk = reader.getLastKey();
333       byte[] lastKey =
334           (lk == null) ? null :
335               KeyValue.createKeyValueFromKey(lk).getRow();
336 
337       LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
338           " last=" + Bytes.toStringBinary(lastKey));
339       LOG.debug("Region bounds: first=" +
340           Bytes.toStringBinary(region.getStartKey()) +
341           " last=" + Bytes.toStringBinary(region.getEndKey()));
342 
343       HRegionInfo hri = region.getRegionInfo();
344       if (!hri.containsRange(firstKey, lastKey)) {
345         throw new WrongRegionException(
346             "Bulk load file " + srcPathStr + " does not fit inside region "
347             + this.region);
348       }
349     } finally {
350       if (reader != null) reader.close();
351     }
352 
353     // Move the file if it's on another filesystem
354     FileSystem srcFs = srcPath.getFileSystem(conf);
355     if (!srcFs.equals(fs)) {
356       LOG.info("File " + srcPath + " on different filesystem than " +
357           "destination store - moving to this filesystem.");
358       Path tmpPath = getTmpPath();
359       FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
360       LOG.info("Copied to temporary path on dst filesystem: " + tmpPath);
361       srcPath = tmpPath;
362     }
363 
364     Path dstPath = StoreFile.getRandomFilename(fs, homedir);
365     LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
366     StoreFile.rename(fs, srcPath, dstPath);
367 
368     StoreFile sf = new StoreFile(fs, dstPath, blockcache,
369         this.conf, this.family.getBloomFilterType(), this.inMemory);
370     sf.createReader();
371 
372     LOG.info("Moved hfile " + srcPath + " into store directory " +
373         homedir + " - updating store file list.");
374 
375     // Append the new storefile into the list
376     this.lock.writeLock().lock();
377     try {
378       ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
379       newFiles.add(sf);
380       this.storefiles = sortAndClone(newFiles);
381       notifyChangedReadersObservers();
382     } finally {
383       this.lock.writeLock().unlock();
384     }
385     LOG.info("Successfully loaded store file " + srcPath
386         + " into store " + this + " (new location: " + dstPath + ")");
387   }
388 
389   /**
390    * Get a temporary path in this region. These temporary files
391    * will get cleaned up when the region is re-opened if they are
392    * still around.
393    */
394   private Path getTmpPath() throws IOException {
395     return StoreFile.getRandomFilename(
396         fs, region.getTmpDir());
397   }
398 
399   /**
400    * Close all the readers
401    *
402    * We don't need to worry about subsequent requests because the HRegion holds
403    * a write lock that will prevent any more reads or writes.
404    *
405    * @throws IOException
406    */
407   ImmutableList<StoreFile> close() throws IOException {
408     this.lock.writeLock().lock();
409     try {
410       ImmutableList<StoreFile> result = storefiles;
411 
412       // Clear so metrics don't find them.
413       storefiles = ImmutableList.of();
414 
415       for (StoreFile f: result) {
416         f.closeReader();
417       }
418       LOG.debug("closed " + this.storeNameStr);
419       return result;
420     } finally {
421       this.lock.writeLock().unlock();
422     }
423   }
424 
425   /**
426    * Snapshot this store's memstore.  Call before running
427    * {@link #flushCache(long, SortedSet, TimeRangeTracker)} so it has some work to do.
428    */
429   void snapshot() {
430     this.memstore.snapshot();
431   }
432 
433   /**
434    * Write out current snapshot.  Presumes {@link #snapshot()} has been called
435    * previously.
436    * @param logCacheFlushId flush sequence number
437    * @param snapshot
438    * @param snapshotTimeRangeTracker
439    * @return true if a compaction is needed
440    * @throws IOException
441    */
442   private StoreFile flushCache(final long logCacheFlushId,
443       SortedSet<KeyValue> snapshot,
444       TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
445     // If an exception happens flushing, we let it out without clearing
446     // the memstore snapshot.  The old snapshot will be returned when we say
447     // 'snapshot', the next time flush comes around.
448     return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
449   }
450 
451   /*
452    * @param cache
453    * @param logCacheFlushId
454    * @return StoreFile created.
455    * @throws IOException
456    */
457   private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
458       final long logCacheFlushId,
459       TimeRangeTracker snapshotTimeRangeTracker)
460       throws IOException {
461     StoreFile.Writer writer = null;
462     long flushed = 0;
463     // Don't flush if there are no entries.
464     if (set.size() == 0) {
465       return null;
466     }
467     long oldestTimestamp = System.currentTimeMillis() - ttl;
468     // TODO:  We can fail in the below block before we complete adding this
469     // flush to list of store files.  Add cleanup of anything put on filesystem
470     // if we fail.
471     synchronized (flushLock) {
472       // A. Write the map out to the disk
473       writer = createWriterInTmp(set.size());
474       writer.setTimeRangeTracker(snapshotTimeRangeTracker);
475       int entries = 0;
476       try {
477         for (KeyValue kv: set) {
478           if (!isExpired(kv, oldestTimestamp)) {
479             writer.append(kv);
480             entries++;
481             flushed += this.memstore.heapSizeChange(kv, true);
482           }
483         }
484       } finally {
485         // Write out the log sequence number that corresponds to this output
486         // hfile.  The hfile is current up to and including logCacheFlushId.
487         writer.appendMetadata(logCacheFlushId, false);
488         writer.close();
489       }
490     }
491 
492     // Write-out finished successfully, move into the right spot
493     Path dstPath = StoreFile.getUniqueFile(fs, homedir);
494     LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
495     if (!fs.rename(writer.getPath(), dstPath)) {
496       LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
497     }
498 
499     StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
500       this.conf, this.family.getBloomFilterType(), this.inMemory);
501     StoreFile.Reader r = sf.createReader();
502     this.storeSize += r.length();
503     if(LOG.isInfoEnabled()) {
504       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
505         ", sequenceid=" + logCacheFlushId +
506         ", memsize=" + StringUtils.humanReadableInt(flushed) +
507         ", filesize=" + StringUtils.humanReadableInt(r.length()));
508     }
509     return sf;
510   }
511 
512   /*
513    * @param maxKeyCount
514    * @return Writer for a new StoreFile in the tmp dir.
515    */
516   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
517   throws IOException {
518     return createWriterInTmp(maxKeyCount, this.compression);
519   }
520 
521   /*
522    * @param maxKeyCount
523    * @param compression Compression algorithm to use
524    * @return Writer for a new StoreFile in the tmp dir.
525    */
526   private StoreFile.Writer createWriterInTmp(int maxKeyCount,
527     Compression.Algorithm compression)
528   throws IOException {
529     return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
530         compression, this.comparator, this.conf,
531         this.family.getBloomFilterType(), maxKeyCount);
532   }
533 
534   /*
535    * Change storefiles, adding into place the new StoreFile produced by this flush.
536    * @param sf
537    * @param set That was used to make the passed file <code>p</code>.
538    * @throws IOException
539    * @return Whether compaction is required.
540    */
541   private boolean updateStorefiles(final StoreFile sf,
542                                    final SortedSet<KeyValue> set)
543   throws IOException {
544     this.lock.writeLock().lock();
545     try {
546       ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
547       newList.add(sf);
548       storefiles = sortAndClone(newList);
549       this.memstore.clearSnapshot(set);
550 
551       // Tell listeners of the change in readers.
552       notifyChangedReadersObservers();
553 
554       return this.storefiles.size() >= this.compactionThreshold;
555     } finally {
556       this.lock.writeLock().unlock();
557     }
558   }
559 
560   /*
561    * Notify all observers that set of Readers has changed.
562    * @throws IOException
563    */
564   private void notifyChangedReadersObservers() throws IOException {
565     for (ChangedReadersObserver o: this.changedReaderObservers) {
566       o.updateReaders();
567     }
568   }
569 
570   /*
571    * @param o Observer who wants to know about changes in set of Readers
572    */
573   void addChangedReaderObserver(ChangedReadersObserver o) {
574     this.changedReaderObservers.add(o);
575   }
576 
577   /*
578    * @param o Observer no longer interested in changes in set of Readers.
579    */
580   void deleteChangedReaderObserver(ChangedReadersObserver o) {
581     // We don't check if observer present; it may not be (legitimately)
582     this.changedReaderObservers.remove(o);
583   }
584 
585   //////////////////////////////////////////////////////////////////////////////
586   // Compaction
587   //////////////////////////////////////////////////////////////////////////////
588 
589   /**
590    * Compact the StoreFiles.  This method may take some time, so the calling
591    * thread must be able to block for long periods.
592    *
593    * <p>During this time, the Store can work as usual, getting values from
594    * StoreFiles and writing new StoreFiles from the memstore.
595    *
596    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
597    * completely written-out to disk.
598    *
599    * <p>The compactLock prevents multiple simultaneous compactions.
600    * The structureLock prevents us from interfering with other write operations.
601    *
602    * <p>We don't want to hold the structureLock for the whole time, as a compact()
603    * can be lengthy and we want to allow cache-flushes during this period.
604    *
605    * @param forceMajor True to force a major compaction regardless of thresholds
606    * @return row to split around if a split is needed, null otherwise
607    * @throws IOException
608    */
609   StoreSize compact(final boolean forceMajor) throws IOException {
610     boolean forceSplit = this.region.shouldForceSplit();
611     boolean majorcompaction = forceMajor;
612     synchronized (compactLock) {
613       this.lastCompactSize = 0;
614 
615       // filesToCompact are sorted oldest to newest.
616       List<StoreFile> filesToCompact = this.storefiles;
617       if (filesToCompact.isEmpty()) {
618         LOG.debug(this.storeNameStr + ": no store files to compact");
619         return null;
620       }
621 
622       // Check to see if we need to do a major compaction on this region.
623       // If so, set majorcompaction to true to skip the incremental
624       // compacting below. Only check if majorcompaction is not already true.
625       if (!majorcompaction) {
626         majorcompaction = isMajorCompaction(filesToCompact);
627       }
628 
629       boolean references = hasReferences(filesToCompact);
630       if (!majorcompaction && !references &&
631           (forceSplit || (filesToCompact.size() < compactionThreshold))) {
632         return checkSplit(forceSplit);
633       }
634 
635       /* get store file sizes for incremental compacting selection.
636        * normal skew:
637        *
638        *         older ----> newer
639        *     _
640        *    | |   _
641        *    | |  | |   _
642        *  --|-|- |-|- |-|---_-------_-------  minCompactSize
643        *    | |  | |  | |  | |  _  | |
644        *    | |  | |  | |  | | | | | |
645        *    | |  | |  | |  | | | | | |
646        */
647       int countOfFiles = filesToCompact.size();
648       long [] fileSizes = new long[countOfFiles];
649       long [] sumSize = new long[countOfFiles];
650       for (int i = countOfFiles-1; i >= 0; --i) {
651         StoreFile file = filesToCompact.get(i);
652         Path path = file.getPath();
653         if (path == null) {
654           LOG.error("Path is null for " + file);
655           return null;
656         }
657         StoreFile.Reader r = file.getReader();
658         if (r == null) {
659           LOG.error("StoreFile " + file + " has a null Reader");
660           return null;
661         }
662         fileSizes[i] = file.getReader().length();
663         // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
664         int tooFar = i + this.maxFilesToCompact - 1;
665         sumSize[i] = fileSizes[i]
666                    + ((i+1    < countOfFiles) ? sumSize[i+1]      : 0)
667                    - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
668       }
669 
670       long totalSize = 0;
671       if (!majorcompaction && !references) {
672         // we're doing a minor compaction, let's see what files are applicable
673         int start = 0;
674         double r = this.compactRatio;
675 
676         /* Start at the oldest file and stop when you find the first file that
677          * meets compaction criteria:
678          *   (1) a recently-flushed, small file (i.e. <= minCompactSize)
679          *      OR
680          *   (2) within the compactRatio of sum(newer_files)
681          * Given normal skew, any newer files will also meet this criteria
682          *
683          * Additional Note:
684          * If fileSizes.size() >> maxFilesToCompact, we will recurse on
685          * compact().  Consider the oldest files first to avoid a
686          * situation where we always compact [end-threshold,end).  Then, the
687          * last file becomes an aggregate of the previous compactions.
688          */
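        /* Worked example with illustrative numbers (assumes the default
         * compactionThreshold of 3, maxFilesToCompact of 10, and a small
         * minCompactSize): for five files sized, oldest to newest,
         * [300, 50, 23, 12, 12], sumSize[1] = 50+23+12+12 = 97.  Since
         * 300 > 1.2 * 97, the oldest file is skipped (start = 1); then
         * 50 <= 1.2 * 47, so the four files [50, 23, 12, 12] are selected
         * for a minor compaction.  Had the oldest file been 100 instead,
         * nothing would be skipped and, because start == 0 and
         * end == countOfFiles, the compaction is promoted to a major one.
         */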
689         while(countOfFiles - start >= this.compactionThreshold &&
690               fileSizes[start] >
691                 Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
692           ++start;
693         }
694         int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
695         totalSize = fileSizes[start]
696                   + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
697 
698         // if we don't have enough files to compact, just wait
699         if (end - start < this.compactionThreshold) {
700           if (LOG.isDebugEnabled()) {
701             LOG.debug("Skipped compaction of " + this.storeNameStr
702               + " because only " + (end - start) + " file(s) of size "
703               + StringUtils.humanReadableInt(totalSize)
704               + " meet compaction criteria.");
705           }
706           return checkSplit(forceSplit);
707         }
708 
709         if (0 == start && end == countOfFiles) {
710           // we decided all the files were candidates! major compact
711           majorcompaction = true;
712         } else {
713           filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(start,
714             end));
715         }
716       } else {
717         // all files included in this compaction
718         for (long i : fileSizes) {
719           totalSize += i;
720         }
721       }
722       this.lastCompactSize = totalSize;
723 
724       // Max-sequenceID is the last key in the files we're compacting
725       long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
726 
727       // Ready to go.  Have list of files to compact.
728       LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" +
729           this.storeNameStr +
730         (references? ", hasReferences=true,": " ") + " into " +
731           region.getTmpDir() + ", seqid=" + maxId +
732           ", totalSize=" + StringUtils.humanReadableInt(totalSize));
733       StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
734       // Move the compaction into place.
735       StoreFile sf = completeCompaction(filesToCompact, writer);
736       if (LOG.isInfoEnabled()) {
737         LOG.info("Completed" + (majorcompaction? " major ": " ") +
738           "compaction of " + filesToCompact.size() +
739           " file(s), new file=" + (sf == null? "none": sf.toString()) +
740           ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) +
741           "; total size for store is " + StringUtils.humanReadableInt(storeSize));
742       }
743     }
744     return checkSplit(forceSplit);
745   }
746 
747   /*
748    * Compact the most recent N files. Essentially a hook for testing.
749    */
750   protected void compactRecent(int N) throws IOException {
751     synchronized(compactLock) {
752       List<StoreFile> filesToCompact = this.storefiles;
753       int count = filesToCompact.size();
754       if (N > count) {
755         throw new RuntimeException("Not enough files");
756       }
757 
758       filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count));
759       long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
760       boolean majorcompaction = (N == count);
761 
762       // Ready to go.  Have list of files to compact.
763       StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
764       // Move the compaction into place.
765       StoreFile sf = completeCompaction(filesToCompact, writer);
766     }
767   }
768 
769   /*
770    * @param files
771    * @return True if any of the files in <code>files</code> are References.
772    */
773   private boolean hasReferences(Collection<StoreFile> files) {
774     if (files != null && files.size() > 0) {
775       for (StoreFile hsf: files) {
776         if (hsf.isReference()) {
777           return true;
778         }
779       }
780     }
781     return false;
782   }
783 
784   /*
785    * Gets lowest timestamp from files in a dir
786    *
787    * @param fs
788    * @param dir
789    * @throws IOException
790    */
791   private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException {
792     FileStatus[] stats = fs.listStatus(dir);
793     if (stats == null || stats.length == 0) {
794       return 0L;
795     }
796     long lowTimestamp = Long.MAX_VALUE;
797     for (int i = 0; i < stats.length; i++) {
798       long timestamp = stats[i].getModificationTime();
799       if (timestamp < lowTimestamp){
800         lowTimestamp = timestamp;
801       }
802     }
803     return lowTimestamp;
804   }
805 
806   /*
807    * @return True if we should run a major compaction.
808    */
809   boolean isMajorCompaction() throws IOException {
810     return isMajorCompaction(storefiles);
811   }
812 
813   /*
814    * @param filesToCompact Files to compact. Can be null.
815    * @return True if we should run a major compaction.
816    */
817   private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
818     boolean result = false;
819     if (filesToCompact == null || filesToCompact.isEmpty() ||
820         majorCompactionTime == 0) {
821       return result;
822     }
823     // TODO: Use better method for determining stamp of last major (HBASE-2990)
824     long lowTimestamp = getLowestTimestamp(fs,
825       filesToCompact.get(0).getPath().getParent());
826     long now = System.currentTimeMillis();
827     if (lowTimestamp > 0L && lowTimestamp < (now - this.majorCompactionTime)) {
828       // Major compaction time has elapsed.
829       if (filesToCompact.size() == 1) {
830         // Single file
831         StoreFile sf = filesToCompact.get(0);
832         long oldest =
833             (sf.getReader().timeRangeTracker == null) ?
834                 Long.MIN_VALUE :
835                 now - sf.getReader().timeRangeTracker.minimumTimestamp;
836         if (sf.isMajorCompaction() &&
837             (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
838           if (LOG.isDebugEnabled()) {
839             LOG.debug("Skipping major compaction of " + this.storeNameStr +
840                 " because one (major) compacted file only and oldestTime " +
841                 oldest + "ms is < ttl=" + this.ttl);
842           }
843         }
844         if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
845           LOG.debug("Major compaction triggered on store " + this.storeNameStr +
846             ", because keyvalues are outdated; time since last major compaction " +
847             (now - lowTimestamp) + "ms");
848           result = true;
849         }
850       } else {
851         if (LOG.isDebugEnabled()) {
852           LOG.debug("Major compaction triggered on store " + this.storeNameStr +
853             "; time since last major compaction " + (now - lowTimestamp) + "ms");
854         }
855         result = true;
856         this.majorCompactionTime = getNextMajorCompactTime();
857       }
858     }
859     return result;
860   }
861 
862   long getNextMajorCompactTime() {
863     // default = 24hrs
864     long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
865     if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
866       String strCompactionTime =
867         family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
868       ret = Long.parseLong(strCompactionTime);
869     }
870 
871     if (ret > 0) {
872       // default = 20% = +/- 4.8 hrs
873       double jitterPct =  conf.getFloat("hbase.hregion.majorcompaction.jitter",
874           0.20F);
875       if (jitterPct > 0) {
876         long jitter = Math.round(ret * jitterPct);
877         ret += jitter - Math.round(2L * jitter * Math.random());
878       }
879     }
880     return ret;
881   }
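  // Illustrative calculation: with the default period of 24 hours
  // (86,400,000 ms) and jitterPct = 0.20, jitter = 17,280,000 ms, so the
  // returned value is spread uniformly over roughly +/- 4.8 hours around the
  // configured period, staggering major compactions across stores.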
882 
883   /**
884    * Do a minor/major compaction.  Uses the scan infrastructure to make it easy.
885    *
886    * @param filesToCompact which files to compact
887    * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
888    * @param maxId Readers maximum sequence id.
889    * @return Product of compaction or null if all cells expired or deleted and
890    * nothing made it through the compaction.
891    * @throws IOException
892    */
893   private StoreFile.Writer compact(final List<StoreFile> filesToCompact,
894                                final boolean majorCompaction, final long maxId)
895       throws IOException {
896     // calculate maximum key count after compaction (for blooms)
897     int maxKeyCount = 0;
898     for (StoreFile file : filesToCompact) {
899       StoreFile.Reader r = file.getReader();
900       if (r != null) {
901         // NOTE: getFilterEntries could cause under-sized blooms if the user
902         //       switches bloom type (e.g. from ROW to ROWCOL)
903         long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
904           ? r.getFilterEntries() : r.getEntries();
905         maxKeyCount += keyCount;
906         if (LOG.isDebugEnabled()) {
907           LOG.debug("Compacting " + file +
908             ", keycount=" + keyCount +
909             ", bloomtype=" + r.getBloomFilterType().toString() +
910             ", size=" + StringUtils.humanReadableInt(r.length()) );
911         }
912       }
913     }
914 
915     // For each file, obtain a scanner:
916     List<StoreFileScanner> scanners = StoreFileScanner
917       .getScannersForStoreFiles(filesToCompact, false, false);
918 
919     // Make the instantiation lazy in case compaction produces no product; i.e.
920     // where all source cells are expired or deleted.
921     StoreFile.Writer writer = null;
922     try {
923       InternalScanner scanner = null;
924       try {
925         Scan scan = new Scan();
926         scan.setMaxVersions(family.getMaxVersions());
927         /* include deletes, unless we are doing a major compaction */
928         scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
929         int bytesWritten = 0;
930         // since scanner.next() can return 'false' but still be delivering data,
931         // we have to use a do/while loop.
932         ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
933         while (scanner.next(kvs)) {
934           if (writer == null && !kvs.isEmpty()) {
935             writer = createWriterInTmp(maxKeyCount,
936               this.compactionCompression);
937           }
938           if (writer != null) {
939             // output to writer:
940             for (KeyValue kv : kvs) {
941               writer.append(kv);
942 
943               // check periodically to see if a system stop is requested
944               if (Store.closeCheckInterval > 0) {
945                 bytesWritten += kv.getLength();
946                 if (bytesWritten > Store.closeCheckInterval) {
947                   bytesWritten = 0;
948                   if (!this.region.areWritesEnabled()) {
949                     writer.close();
950                     fs.delete(writer.getPath(), false);
951                     throw new InterruptedIOException(
952                         "Aborting compaction of store " + this +
953                         " in region " + this.region +
954                         " because user requested stop.");
955                   }
956                 }
957               }
958             }
959           }
960           kvs.clear();
961         }
962       } finally {
963         if (scanner != null) {
964           scanner.close();
965         }
966       }
967     } finally {
968       if (writer != null) {
969         writer.appendMetadata(maxId, majorCompaction);
970         writer.close();
971       }
972     }
973     return writer;
974   }
975 
976   /*
977    * It's assumed that the compactLock  will be acquired prior to calling this
978    * method!  Otherwise, it is not thread-safe!
979    *
980    * <p>It works by processing a compaction that's been written to disk.
981    *
982    * <p>It is usually invoked at the end of a compaction, but might also be
983    * invoked at HStore startup, if the prior execution died midway through.
984    *
985    * <p>Moving the compacted TreeMap into place means:
986    * <pre>
987    * 1) Moving the new compacted StoreFile into place
988    * 2) Unload all replaced StoreFiles, close and collect the list to delete.
989    * 3) Loading the new TreeMap.
990    * 4) Compute new store size
991    * </pre>
992    *
993    * @param compactedFiles list of files that were compacted
994    * @param compactedFile StoreFile that is the result of the compaction
995    * @return StoreFile created. May be null.
996    * @throws IOException
997    */
998   private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
999                                        final StoreFile.Writer compactedFile)
1000       throws IOException {
1001     // 1. Moving the new files into place -- if there is a new file (may not
1002     // be if all cells were expired or deleted).
1003     StoreFile result = null;
1004     if (compactedFile != null) {
1005       Path p = null;
1006       try {
1007         p = StoreFile.rename(this.fs, compactedFile.getPath(),
1008           StoreFile.getRandomFilename(fs, this.homedir));
1009       } catch (IOException e) {
1010         LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
1011         return null;
1012       }
1013       result = new StoreFile(this.fs, p, blockcache, this.conf,
1014           this.family.getBloomFilterType(), this.inMemory);
1015       result.createReader();
1016     }
1017     this.lock.writeLock().lock();
1018     try {
1019       try {
1020         // 2. Unloading
1021         // 3. Loading the new TreeMap.
1022         // Change this.storefiles so it reflects new state but do not
1023         // delete old store files until we have sent out notification of
1024         // change in case old files are still being accessed by outstanding
1025         // scanners.
1026         ArrayList<StoreFile> newStoreFiles = new ArrayList<StoreFile>();
1027         for (StoreFile sf : storefiles) {
1028           if (!compactedFiles.contains(sf)) {
1029             newStoreFiles.add(sf);
1030           }
1031         }
1032 
1033         // If a StoreFile result, move it into place.  May be null.
1034         if (result != null) {
1035           newStoreFiles.add(result);
1036         }
1037 
1038         this.storefiles = sortAndClone(newStoreFiles);
1039 
1040         // Tell observers that list of StoreFiles has changed.
1041         notifyChangedReadersObservers();
1042         // Finally, delete old store files.
1043         for (StoreFile hsf: compactedFiles) {
1044           hsf.deleteReader();
1045         }
1046       } catch (IOException e) {
1047         e = RemoteExceptionHandler.checkIOException(e);
1048         LOG.error("Failed replacing compacted files in " + this.storeNameStr +
1049           ". Compacted file is " + (result == null? "none": result.toString()) +
1050           ".  Files replaced " + compactedFiles.toString() +
1051           " some of which may have been already removed", e);
1052       }
1053       // 4. Compute new store size
1054       this.storeSize = 0L;
1055       for (StoreFile hsf : this.storefiles) {
1056         StoreFile.Reader r = hsf.getReader();
1057         if (r == null) {
1058           LOG.warn("StoreFile " + hsf + " has a null Reader");
1059           continue;
1060         }
1061         this.storeSize += r.length();
1062       }
1063     } finally {
1064       this.lock.writeLock().unlock();
1065     }
1066     return result;
1067   }
1068 
1069   public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
1070     Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
1071     ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
1072     return newList;
1073   }
1074 
1075   //////////////////////////////////////////////////////////////////////////////
1076   // Accessors.
1077   // (This is the only section that is directly useful!)
1078   //////////////////////////////////////////////////////////////////////////////
1079   /**
1080    * @return the number of files in this store
1081    */
1082   public int getNumberOfstorefiles() {
1083     return this.storefiles.size();
1084   }
1085 
1086   /*
1087    * @param wantedVersions How many versions were asked for.
1088    * @return wantedVersions or this family's maximum number of versions, whichever is less.
1089    */
1090   int versionsToReturn(final int wantedVersions) {
1091     if (wantedVersions <= 0) {
1092       throw new IllegalArgumentException("Number of versions must be > 0");
1093     }
1094     // Make sure we do not return more than maximum versions for this store.
1095     int maxVersions = this.family.getMaxVersions();
1096     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
1097   }
1098 
1099   static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
1100     return key.getTimestamp() < oldestTimestamp;
1101   }
1102 
1103   /**
1104    * Find the key that matches <i>row</i> exactly, or the one that immediately
1105    * precedes it. WARNING: Only use this method on a table where writes occur
1106    * with strictly increasing timestamps. This method assumes this pattern of
1107    * writes in order to make it reasonably performant.  Our search also
1108    * depends on the axiom that deletes are for cells in the container that
1109    * follows, whether a memstore snapshot or a storefile, not for the current
1110    * container: i.e. we'll see deletes before we come across the cells we
1111    * are to delete. The presumption is that the memstore#kvset is processed
1112    * before memstore#snapshot and so on.
1113    * @param kv First possible item on targeted row; i.e. empty columns, latest
1114    * timestamp and maximum type.
1115    * @return Found keyvalue or null if none found.
1116    * @throws IOException
1117    */
1118   KeyValue getRowKeyAtOrBefore(final KeyValue kv) throws IOException {
1119     GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
1120       this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion());
1121     this.lock.readLock().lock();
1122     try {
1123       // First go to the memstore.  Pick up deletes and candidates.
1124       this.memstore.getRowKeyAtOrBefore(state);
1125       // Check if match, if we got a candidate on the asked for 'kv' row.
1126       // Process each store file. Run through from newest to oldest.
1127       for (StoreFile sf : Iterables.reverse(storefiles)) {
1128         // Update the candidate keys from the current map file
1129         rowAtOrBeforeFromStoreFile(sf, state);
1130       }
1131       return state.getCandidate();
1132     } finally {
1133       this.lock.readLock().unlock();
1134     }
1135   }
1136 
1137   /*
1138    * Check an individual store file for the row at or before a given row.
1139    * @param f
1140    * @param state
1141    * @throws IOException
1142    */
1143   private void rowAtOrBeforeFromStoreFile(final StoreFile f,
1144                                           final GetClosestRowBeforeTracker state)
1145       throws IOException {
1146     StoreFile.Reader r = f.getReader();
1147     if (r == null) {
1148       LOG.warn("StoreFile " + f + " has a null Reader");
1149       return;
1150     }
1151     // TODO: Cache these keys rather than make each time?
1152     byte [] fk = r.getFirstKey();
1153     KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1154     byte [] lk = r.getLastKey();
1155     KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1156     KeyValue firstOnRow = state.getTargetKey();
1157     if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
1158       // If last key in file is not of the target table, no candidates in this
1159       // file.  Return.
1160       if (!state.isTargetTable(lastKV)) return;
1161       // If the row we're looking for is past the end of file, set search key to
1162       // last key. TODO: Cache last and first key rather than make each time.
1163       firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
1164     }
1165     // Get a scanner that caches blocks and that uses pread.
1166     HFileScanner scanner = r.getScanner(true, true);
1167     // Seek scanner.  If can't seek it, return.
1168     if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
1169     // If we found candidate on firstOnRow, just return. THIS WILL NEVER HAPPEN!
1170     // Unlikely that there'll be an instance of actual first row in table.
1171     if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
1172     // If here, need to start backing up.
1173     while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
1174        firstOnRow.getKeyLength())) {
1175       KeyValue kv = scanner.getKeyValue();
1176       if (!state.isTargetTable(kv)) break;
1177       if (!state.isBetterCandidate(kv)) break;
1178       // Make new first on row.
1179       firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
1180       // Seek scanner.  If can't seek it, break.
1181       if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
1182       // If we find something, break;
1183       if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
1184     }
1185   }
1186 
1187   /*
1188    * Seek the file scanner to firstOnRow or first entry in file.
1189    * @param scanner
1190    * @param firstOnRow
1191    * @param firstKV
1192    * @return True if we successfully seeked scanner.
1193    * @throws IOException
1194    */
1195   private boolean seekToScanner(final HFileScanner scanner,
1196                                 final KeyValue firstOnRow,
1197                                 final KeyValue firstKV)
1198       throws IOException {
1199     KeyValue kv = firstOnRow;
1200     // If firstOnRow < firstKV, set to firstKV
1201     if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
1202     int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
1203       kv.getKeyLength());
1204     return result >= 0;
1205   }
1206 
1207   /*
1208    * When we come in here, we are probably at the kv just before we break into
1209    * the row that firstOnRow is on.  Usually need to increment one time to get
1210    * on to the row we are interested in.
1211    * @param scanner
1212    * @param firstOnRow
1213    * @param state
1214    * @return True we found a candidate.
1215    * @throws IOException
1216    */
1217   private boolean walkForwardInSingleRow(final HFileScanner scanner,
1218                                          final KeyValue firstOnRow,
1219                                          final GetClosestRowBeforeTracker state)
1220       throws IOException {
1221     boolean foundCandidate = false;
1222     do {
1223       KeyValue kv = scanner.getKeyValue();
1224       // If we are not in the row, skip.
1225       if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
1226       // Did we go beyond the target row? If so break.
1227       if (state.isTooFar(kv, firstOnRow)) break;
1228       if (state.isExpired(kv)) {
1229         continue;
1230       }
1231       // If we added something, this row is a contender. break.
1232       if (state.handle(kv)) {
1233         foundCandidate = true;
1234         break;
1235       }
1236     } while(scanner.next());
1237     return foundCandidate;
1238   }
1239 
1240   /**
1241    * Determines if HStore can be split
1242    * @param force Whether to force a split or not.
1243    * @return a StoreSize if store can be split, null otherwise.
1244    */
1245   StoreSize checkSplit(final boolean force) {
1246     this.lock.readLock().lock();
1247     try {
1248       // Iterate through all store files
1249       if (this.storefiles.isEmpty()) {
1250         return null;
1251       }
1252       if (!force && (storeSize < this.desiredMaxFileSize)) {
1253         return null;
1254       }
1255 
1256       if (this.region.getRegionInfo().isMetaRegion()) {
1257         if (force) {
1258           LOG.warn("Cannot split meta regions in HBase 0.20");
1259         }
1260         return null;
1261       }
1262 
1263       // Not splitable if we find a reference store file present in the store.
1264       boolean splitable = true;
1265       long maxSize = 0L;
1266       StoreFile largestSf = null;
1267       for (StoreFile sf : storefiles) {
1268         if (splitable) {
1269           splitable = !sf.isReference();
1270           if (!splitable) {
1271             // RETURN IN MIDDLE OF FUNCTION!!! If not splitable, just return.
1272             if (LOG.isDebugEnabled()) {
1273               LOG.debug(sf +  " is not splittable");
1274             }
1275             return null;
1276           }
1277         }
1278         StoreFile.Reader r = sf.getReader();
1279         if (r == null) {
1280           LOG.warn("Storefile " + sf + " Reader is null");
1281           continue;
1282         }
1283         long size = r.length();
1284         if (size > maxSize) {
1285           // This is the largest one so far
1286           maxSize = size;
1287           largestSf = sf;
1288         }
1289       }
1290       // if the user explicitly set a split point, use that
1291       if (this.region.getSplitPoint() != null) {
1292         return new StoreSize(maxSize, this.region.getSplitPoint());
1293       }
1294       StoreFile.Reader r = largestSf.getReader();
1295       if (r == null) {
1296         LOG.warn("Storefile " + largestSf + " Reader is null");
1297         return null;
1298       }
1299       // Get first, last, and mid keys.  Midkey is the key that starts block
1300       // in middle of hfile.  Has column and timestamp.  Need to return just
1301       // the row we want to split on as midkey.
1302       byte [] midkey = r.midkey();
1303       if (midkey != null) {
1304         KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
1305         byte [] fk = r.getFirstKey();
1306         KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
1307         byte [] lk = r.getLastKey();
1308         KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
1309         // if the midkey is the same as the first and last keys, then we cannot
1310         // (ever) split this region.
1311         if (this.comparator.compareRows(mk, firstKey) == 0 &&
1312             this.comparator.compareRows(mk, lastKey) == 0) {
1313           if (LOG.isDebugEnabled()) {
1314             LOG.debug("cannot split because midkey is the same as first or " +
1315               "last row");
1316           }
1317           return null;
1318         }
1319         return new StoreSize(maxSize, mk.getRow());
1320       }
1321     } catch(IOException e) {
1322       LOG.warn("Failed getting store size for " + this.storeNameStr, e);
1323     } finally {
1324       this.lock.readLock().unlock();
1325     }
1326     return null;
1327   }
1328 
1329   /** @return aggregate size of the store files selected in this store's last compaction */
1330   public long getLastCompactSize() {
1331     return this.lastCompactSize;
1332   }
1333 
1334   /** @return aggregate size of HStore */
1335   public long getSize() {
1336     return storeSize;
1337   }
1338 
1339   //////////////////////////////////////////////////////////////////////////////
1340   // File administration
1341   //////////////////////////////////////////////////////////////////////////////
1342 
1343   /**
1344    * Return a scanner for both the memstore and the HStore files
1345    * @throws IOException
1346    */
1347   public KeyValueScanner getScanner(Scan scan,
1348       final NavigableSet<byte []> targetCols) throws IOException {
1349     lock.readLock().lock();
1350     try {
1351       return new StoreScanner(this, scan, targetCols);
1352     } finally {
1353       lock.readLock().unlock();
1354     }
1355   }
1356 
1357   @Override
1358   public String toString() {
1359     return this.storeNameStr;
1360   }
1361 
1362   /**
1363    * @return Count of store files
1364    */
1365   int getStorefilesCount() {
1366     return this.storefiles.size();
1367   }
1368 
1369   /**
1370    * @return The size of the store files, in bytes.
1371    */
1372   long getStorefilesSize() {
1373     long size = 0;
1374     for (StoreFile s: storefiles) {
1375       StoreFile.Reader r = s.getReader();
1376       if (r == null) {
1377         LOG.warn("StoreFile " + s + " has a null Reader");
1378         continue;
1379       }
1380       size += r.length();
1381     }
1382     return size;
1383   }
1384 
1385   /**
1386    * @return The size of the store file indexes, in bytes.
1387    */
1388   long getStorefilesIndexSize() {
1389     long size = 0;
1390     for (StoreFile s: storefiles) {
1391       StoreFile.Reader r = s.getReader();
1392       if (r == null) {
1393         LOG.warn("StoreFile " + s + " has a null Reader");
1394         continue;
1395       }
1396       size += r.indexSize();
1397     }
1398     return size;
1399   }
1400 
1401   /**
1402    * @return The priority that this store should have in the compaction queue
1403    */
1404   int getCompactPriority() {
1405     return this.blockingStoreFileCount - this.storefiles.size();
1406   }
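  // For example, with the default hbase.hstore.blockingStoreFiles of 7 and
  // four store files the priority is 3; smaller (or negative) values indicate
  // the store is nearer the blocking limit and should be compacted sooner.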
1407 
1408   /**
1409    * Datastructure that holds size and row to split a file around.
1410    * TODO: Take a KeyValue rather than row.
1411    */
1412   static class StoreSize {
1413     private final long size;
1414     private final byte [] row;
1415 
1416     StoreSize(long size, byte [] row) {
1417       this.size = size;
1418       this.row = row;
1419     }
1420     /* @return the size */
1421     long getSize() {
1422       return size;
1423     }
1424 
1425     byte [] getSplitRow() {
1426       return this.row;
1427     }
1428   }
1429 
1430   HRegion getHRegion() {
1431     return this.region;
1432   }
1433 
1434   HRegionInfo getHRegionInfo() {
1435     return this.region.regionInfo;
1436   }
1437 
1438   /**
1439    * Increments the value for the given row/family/qualifier.
1440    *
1441    * This function will always be seen as atomic by other readers
1442    * because it only puts a single KV to memstore. Thus no
1443    * read/write control necessary.
1444    *
1445    * @param row
1446    * @param f
1447    * @param qualifier
1448    * @param newValue the new value to set into memstore
1449    * @return memstore size delta
1450    * @throws IOException
1451    */
1452   public long updateColumnValue(byte [] row, byte [] f,
1453                                 byte [] qualifier, long newValue)
1454       throws IOException {
1455 
1456     this.lock.readLock().lock();
1457     try {
1458       long now = EnvironmentEdgeManager.currentTimeMillis();
1459 
1460       return this.memstore.updateColumnValue(row,
1461           f,
1462           qualifier,
1463           newValue,
1464           now);
1465 
1466     } finally {
1467       this.lock.readLock().unlock();
1468     }
1469   }
1470 
1471   /**
1472    * Adds or replaces the specified KeyValues.
1473    * <p>
1474    * For each KeyValue specified, if a cell with the same row, family, and
1475    * qualifier exists in MemStore, it will be replaced.  Otherwise, it will just
1476    * be inserted to MemStore.
1477    * <p>
1478    * This operation is atomic on each KeyValue (row/family/qualifier) but not
1479    * necessarily atomic across all of them.
1480    * @param kvs
1481    * @return memstore size delta
1482    * @throws IOException
1483    */
1484   public long upsert(List<KeyValue> kvs)
1485       throws IOException {
1486     this.lock.readLock().lock();
1487     try {
1488       // TODO: Make this operation atomic w/ RWCC
1489       return this.memstore.upsert(kvs);
1490     } finally {
1491       this.lock.readLock().unlock();
1492     }
1493   }
1494 
1495   public StoreFlusher getStoreFlusher(long cacheFlushId) {
1496     return new StoreFlusherImpl(cacheFlushId);
1497   }
1498 
1499   private class StoreFlusherImpl implements StoreFlusher {
1500 
1501     private long cacheFlushId;
1502     private SortedSet<KeyValue> snapshot;
1503     private StoreFile storeFile;
1504     private TimeRangeTracker snapshotTimeRangeTracker;
1505 
1506     private StoreFlusherImpl(long cacheFlushId) {
1507       this.cacheFlushId = cacheFlushId;
1508     }
1509 
1510     @Override
1511     public void prepare() {
1512       memstore.snapshot();
1513       this.snapshot = memstore.getSnapshot();
1514       this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
1515     }
1516 
1517     @Override
1518     public void flushCache() throws IOException {
1519       storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
1520     }
1521 
1522     @Override
1523     public boolean commit() throws IOException {
1524       if (storeFile == null) {
1525         return false;
1526       }
1527       // Add new file to store files.  Clear snapshot too while we have
1528       // the Store write lock.
1529       return Store.this.updateStorefiles(storeFile, snapshot);
1530     }
1531   }
1532 
1533   /**
1534    * See if there are too many store files in this store
1535    * @return true if the number of store files is greater than
1536    *  the number defined in compactionThreshold
1537    */
1538   public boolean hasTooManyStoreFiles() {
1539     return this.storefiles.size() > this.compactionThreshold;
1540   }
1541 
1542   public static final long FIXED_OVERHEAD = ClassSize.align(
1543       ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
1544       (6 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
1545       (4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));
1546 
1547   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
1548       ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
1549       ClassSize.CONCURRENT_SKIPLISTMAP +
1550       ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT);
1551 
1552   @Override
1553   public long heapSize() {
1554     return DEEP_OVERHEAD + this.memstore.heapSize();
1555   }
1556 }