/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

/**
 * A Store holds a column family in a Region.  It consists of a MemStore that
 * buffers recent updates and a set of zero or more StoreFiles holding data
 * that has been flushed to the filesystem.
 *
 * <p>The Store provides flush, compaction and split-point services for its
 * family.  The list of store files is kept as a sorted, immutable list that
 * is swapped atomically under the instance's read/write lock, so readers see
 * a consistent snapshot while flushes and compactions update it.
 */
public class Store implements HeapSize {
  static final Log LOG = LogFactory.getLog(Store.class);
  protected final MemStore memstore;
  // This store's directory in the filesystem.
  private final Path homedir;
  private final HRegion region;
  private final HColumnDescriptor family;
  final FileSystem fs;
  final Configuration conf;
  // TTL in milliseconds (Long.MAX_VALUE when the family keeps data forever).
  protected long ttl;
  private long majorCompactionTime;
  private final int maxFilesToCompact;
  private final long minCompactSize;
  // compactRatio is a double on purpose: with a float, the long file sizes
  // would be downcast to float for comparisons and lose precision.
  private double compactRatio;
  private long lastCompactSize = 0;
  // How many bytes to write between checks for a requested close.
  static int closeCheckInterval = 0;
  private final long desiredMaxFileSize;
  private final int blockingStoreFileCount;
  private volatile long storeSize = 0L;
  private final Object flushLock = new Object();
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final String storeNameStr;
  private final boolean inMemory;

  /*
   * List of store files inside this store.  This is an immutable list that
   * is atomically replaced whenever its contents change.
   */
  private ImmutableList<StoreFile> storefiles = null;

  // Observers to notify when the set of readers changes.
  private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
    new CopyOnWriteArraySet<ChangedReadersObserver>();

  private final Object compactLock = new Object();
  private final int compactionThreshold;
  private final int blocksize;
  private final boolean blockcache;
  /** Compression algorithm for flush files and minor compactions */
  private final Compression.Algorithm compression;
  /** Compression algorithm for major compactions */
  private final Compression.Algorithm compactionCompression;

  // Comparator for KeyValues in this store.
  final KeyValue.KVComparator comparator;

  /**
   * Constructor
   * @param basedir qualified path under which the region directory lives
   * @param region the region this store belongs to
   * @param family HColumnDescriptor for this column family
   * @param fs file system object
   * @param conf configuration object
   * @throws IOException if the store directory cannot be created
   */
  protected Store(Path basedir, HRegion region, HColumnDescriptor family,
      FileSystem fs, Configuration conf)
  throws IOException {
    HRegionInfo info = region.regionInfo;
    this.fs = fs;
    this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName());
    if (!this.fs.exists(this.homedir)) {
      if (!this.fs.mkdirs(this.homedir))
        throw new IOException("Failed create of: " + this.homedir.toString());
    }
    this.region = region;
    this.family = family;
    this.conf = conf;
    this.blockcache = family.isBlockCacheEnabled();
    this.blocksize = family.getBlocksize();
    this.compression = family.getCompression();
    // If no compaction-specific compression is configured, major compactions
    // use the same algorithm as flushes and minor compactions.
    this.compactionCompression =
      (family.getCompactionCompression() != Compression.Algorithm.NONE) ?
        family.getCompactionCompression() : this.compression;
    this.comparator = info.getComparator();
    // Get TTL.
    this.ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER) {
      // Default is unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else if (ttl == -1) {
      ttl = Long.MAX_VALUE;
    } else {
      // Seconds -> milliseconds; adjust for user data.
      this.ttl *= 1000;
    }
    this.memstore = new MemStore(conf, this.comparator);
    this.storeNameStr = Bytes.toString(this.family.getName());

    // By default, compact when storefile count >= compactionThreshold; the
    // floor of two keeps a lone file from being compacted with itself.
    this.compactionThreshold = Math.max(2,
      conf.getInt("hbase.hstore.compactionThreshold", 3));

    // Check if this is an in-memory store.
    this.inMemory = family.isInMemory();

    // Split the region if its aggregate size grows past the table-level max
    // file size, falling back to the cluster-wide default.
    long maxFileSize = info.getTableDesc().getMaxFileSize();
    if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
      maxFileSize = conf.getLong("hbase.hregion.max.filesize",
        HConstants.DEFAULT_MAX_FILE_SIZE);
    }
    this.desiredMaxFileSize = maxFileSize;
    this.blockingStoreFileCount =
      conf.getInt("hbase.hstore.blockingStoreFiles", 7);

    this.majorCompactionTime = getNextMajorCompactTime();

    this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
    this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
      this.region.memstoreFlushSize);
    this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);

    if (Store.closeCheckInterval == 0) {
      Store.closeCheckInterval = conf.getInt(
        "hbase.hstore.close.check.interval", 10 * 1000 * 1000 /* 10 MB */);
    }
    this.storefiles = sortAndClone(loadStoreFiles());
  }
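
  /*
   * The constructor above reads its tuning knobs from the Configuration.  As
   * an illustration only (the values shown are the defaults already coded
   * above, not recommendations), they could be set in hbase-site.xml:
   *
   *   <property>
   *     <name>hbase.hstore.compactionThreshold</name>
   *     <value>3</value>
   *   </property>
   *   <property>
   *     <name>hbase.hstore.compaction.ratio</name>
   *     <value>1.2</value>
   *   </property>
   *   <property>
   *     <name>hbase.hstore.compaction.max</name>
   *     <value>10</value>
   *   </property>
   */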

  public HColumnDescriptor getFamily() {
    return this.family;
  }

  /**
   * @return The maximum sequence id in all this store's files.
   */
  long getMaxSequenceId() {
    return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
  }

  /**
   * @param tabledir the table directory
   * @param encodedName encoded name of the region
   * @param family the column family name
   * @return Path to the family/store directory
   */
  public static Path getStoreHomedir(final Path tabledir,
      final String encodedName, final byte [] family) {
    return new Path(tabledir, new Path(encodedName,
      new Path(Bytes.toString(family))));
  }

  /**
   * @return the directory this store keeps its files in
   */
  public Path getHomedir() {
    return homedir;
  }

  /*
   * Creates a series of StoreFiles loaded from the store directory.
   * @throws IOException
   */
  private List<StoreFile> loadStoreFiles()
  throws IOException {
    ArrayList<StoreFile> results = new ArrayList<StoreFile>();
    FileStatus files[] = this.fs.listStatus(this.homedir);
    for (int i = 0; files != null && i < files.length; i++) {
      // Skip directories.
      if (files[i].isDir()) {
        continue;
      }
      Path p = files[i].getPath();
      // Skip empty files.  Should never happen, but can after a crash
      // during flush; see HBASE-646.
      if (this.fs.getFileStatus(p).getLen() <= 0) {
        LOG.warn("Skipping " + p + " because it's empty. HBASE-646 DATA LOSS?");
        continue;
      }
      StoreFile curfile = null;
      try {
        curfile = new StoreFile(fs, p, blockcache, this.conf,
          this.family.getBloomFilterType(), this.inMemory);
        curfile.createReader();
      } catch (IOException ioe) {
        LOG.warn("Failed open of " + p + "; presumption is that file was " +
          "corrupted at flush and lost edits picked up by commit log replay. " +
          "Verify!", ioe);
        continue;
      }
      long length = curfile.getReader().length();
      this.storeSize += length;
      if (LOG.isDebugEnabled()) {
        LOG.debug("loaded " + curfile.toStringDetailed());
      }
      results.add(curfile);
    }
    return results;
  }

  /**
   * Adds a value to the memstore.
   * @param kv the KeyValue to add
   * @return memstore size delta
   */
  protected long add(final KeyValue kv) {
    lock.readLock().lock();
    try {
      return this.memstore.add(kv);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * Adds a delete to the memstore.
   * @param kv the delete KeyValue to add
   * @return memstore size delta
   */
  protected long delete(final KeyValue kv) {
    lock.readLock().lock();
    try {
      return this.memstore.delete(kv);
    } finally {
      lock.readLock().unlock();
    }
  }

  /**
   * @return All store files, sorted, as an immutable list.
   */
  List<StoreFile> getStorefiles() {
    return this.storefiles;
  }

  public void bulkLoadHFile(String srcPathStr) throws IOException {
    Path srcPath = new Path(srcPathStr);

    HFile.Reader reader = null;
    try {
      LOG.info("Validating hfile at " + srcPath + " for inclusion in "
        + "store " + this + " region " + this.region);
      reader = new HFile.Reader(srcPath.getFileSystem(conf),
        srcPath, null, false);
      reader.loadFileInfo();

      byte[] firstKey = reader.getFirstRowKey();
      byte[] lk = reader.getLastKey();
      byte[] lastKey =
        (lk == null) ? null :
          KeyValue.createKeyValueFromKey(lk).getRow();

      LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) +
        " last=" + Bytes.toStringBinary(lastKey));
      LOG.debug("Region bounds: first=" +
        Bytes.toStringBinary(region.getStartKey()) +
        " last=" + Bytes.toStringBinary(region.getEndKey()));

      HRegionInfo hri = region.getRegionInfo();
      if (!hri.containsRange(firstKey, lastKey)) {
        throw new WrongRegionException(
          "Bulk load file " + srcPathStr + " does not fit inside region "
          + this.region);
      }
    } finally {
      if (reader != null) reader.close();
    }

    // Copy the file over if it lives on another filesystem.
    FileSystem srcFs = srcPath.getFileSystem(conf);
    if (!srcFs.equals(fs)) {
      LOG.info("File " + srcPath + " on different filesystem than " +
        "destination store - moving to this filesystem.");
      Path tmpPath = getTmpPath();
      FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
      LOG.info("Copied to temporary path on dst filesystem: " + tmpPath);
      srcPath = tmpPath;
    }

    Path dstPath = StoreFile.getRandomFilename(fs, homedir);
    LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath);
    StoreFile.rename(fs, srcPath, dstPath);

    StoreFile sf = new StoreFile(fs, dstPath, blockcache,
      this.conf, this.family.getBloomFilterType(), this.inMemory);
    sf.createReader();

    LOG.info("Moved hfile " + srcPath + " into store directory " +
      homedir + " - updating store file list.");

    // Append the new file and atomically swap in the new sorted list.
    this.lock.writeLock().lock();
    try {
      ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
      newFiles.add(sf);
      this.storefiles = sortAndClone(newFiles);
      notifyChangedReadersObservers();
    } finally {
      this.lock.writeLock().unlock();
    }
    LOG.info("Successfully loaded store file " + srcPath
      + " into store " + this + " (new location: " + dstPath + ")");
  }
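
  /*
   * Note: a caller typically hands bulkLoadHFile() the path of an HFile
   * written out by a MapReduce job.  A minimal, hypothetical invocation
   * (the path below is illustrative, not part of this codebase):
   *
   *   store.bulkLoadHFile("/user/loader/output/cf1/3a8bd9e7c4f14e0");
   *
   * The file must sort entirely inside this region's key range, or the
   * validation above throws WrongRegionException.
   */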

  /**
   * Get a temporary path in this region's tmp dir, to which a file can be
   * copied or moved before being renamed into the store.
   * @return a unique, randomly named path under the region tmp directory
   */
  private Path getTmpPath() throws IOException {
    return StoreFile.getRandomFilename(
      fs, region.getTmpDir());
  }

  /**
   * Close all the readers.
   *
   * <p>We don't need to worry about subsequent requests because the HRegion
   * holds a write lock that will prevent any more reads or writes.
   *
   * @return the set of StoreFiles that were previously open
   * @throws IOException on failure closing a reader
   */
  ImmutableList<StoreFile> close() throws IOException {
    this.lock.writeLock().lock();
    try {
      ImmutableList<StoreFile> result = storefiles;

      // Clear the list so metrics don't find these files.
      storefiles = ImmutableList.of();

      for (StoreFile f: result) {
        f.closeReader();
      }
      LOG.debug("closed " + this.storeNameStr);
      return result;
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * Snapshot this store's memstore.  Call before running
   * {@link #flushCache(long, SortedSet, TimeRangeTracker)} so it has a
   * snapshot to flush.
   */
  void snapshot() {
    this.memstore.snapshot();
  }

  /**
   * Write out the current memstore snapshot.
   * @param logCacheFlushId flush sequence number
   * @param snapshot the snapshotted edits to write
   * @param snapshotTimeRangeTracker time range covered by the snapshot
   * @return the StoreFile created, or null if the snapshot was empty
   * @throws IOException on flush failure
   */
  private StoreFile flushCache(final long logCacheFlushId,
      SortedSet<KeyValue> snapshot,
      TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
    // If an exception happens flushing, we let it out without clearing
    // the memstore snapshot.  The old snapshot will be returned when we say
    // 'snapshot', the next time flush comes around.
    return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
  }

  /*
   * @param set the memstore snapshot to flush
   * @param logCacheFlushId flush sequence number
   * @param snapshotTimeRangeTracker time range covered by the snapshot
   * @return the StoreFile created, or null if the snapshot was empty
   * @throws IOException
   */
  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
      final long logCacheFlushId,
      TimeRangeTracker snapshotTimeRangeTracker)
  throws IOException {
    StoreFile.Writer writer = null;
    long flushed = 0;
    // Don't flush if there are no entries.
    if (set.isEmpty()) {
      return null;
    }
    long oldestTimestamp = System.currentTimeMillis() - ttl;
    // TODO: We can fail in the below block before we complete adding this
    // flush to the list of store files.  Add cleanup of anything put on the
    // filesystem if we fail.
    synchronized (flushLock) {
      // Write the snapshot out to disk.
      writer = createWriterInTmp(set.size());
      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
      int entries = 0;
      try {
        for (KeyValue kv: set) {
          // Expired entries are not written out.
          if (!isExpired(kv, oldestTimestamp)) {
            writer.append(kv);
            entries++;
            flushed += this.memstore.heapSizeChange(kv, true);
          }
        }
      } finally {
        // Write out the log sequence number that corresponds to this output
        // hfile.  The hfile is current up to and including logCacheFlushId.
        writer.appendMetadata(logCacheFlushId, false);
        writer.close();
      }
    }

    // Move the flushed file from tmp into place in the store directory.
    Path dstPath = StoreFile.getUniqueFile(fs, homedir);
    LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
    if (!fs.rename(writer.getPath(), dstPath)) {
      LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
    }

    StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
      this.conf, this.family.getBloomFilterType(), this.inMemory);
    StoreFile.Reader r = sf.createReader();
    this.storeSize += r.length();
    if (LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
        ", sequenceid=" + logCacheFlushId +
        ", memsize=" + StringUtils.humanReadableInt(flushed) +
        ", filesize=" + StringUtils.humanReadableInt(r.length()));
    }
    return sf;
  }

  /*
   * @param maxKeyCount estimated number of keys the writer will receive
   * @return Writer for a new StoreFile in the tmp dir.
   */
  private StoreFile.Writer createWriterInTmp(int maxKeyCount)
  throws IOException {
    return createWriterInTmp(maxKeyCount, this.compression);
  }

  /*
   * @param maxKeyCount estimated number of keys the writer will receive
   * @param compression compression algorithm to use for the output file
   * @return Writer for a new StoreFile in the tmp dir.
   */
  private StoreFile.Writer createWriterInTmp(int maxKeyCount,
      Compression.Algorithm compression)
  throws IOException {
    return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
      compression, this.comparator, this.conf,
      this.family.getBloomFilterType(), maxKeyCount);
  }

  /*
   * Change storefiles by adding the given flushed file and clearing the
   * memstore snapshot it came from.
   * @param sf the flushed StoreFile
   * @param set the memstore snapshot that was flushed into sf
   * @return whether compaction is now required
   * @throws IOException
   */
  private boolean updateStorefiles(final StoreFile sf,
      final SortedSet<KeyValue> set)
  throws IOException {
    this.lock.writeLock().lock();
    try {
      ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles);
      newList.add(sf);
      storefiles = sortAndClone(newList);
      this.memstore.clearSnapshot(set);

      // Tell listeners of the change in readers.
      notifyChangedReadersObservers();

      return this.storefiles.size() >= this.compactionThreshold;
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /*
   * Notify all observers that the set of readers has changed.
   * @throws IOException
   */
  private void notifyChangedReadersObservers() throws IOException {
    for (ChangedReadersObserver o: this.changedReaderObservers) {
      o.updateReaders();
    }
  }

  /*
   * @param o Observer who wants to know about changes in the set of Readers
   */
  void addChangedReaderObserver(ChangedReadersObserver o) {
    this.changedReaderObservers.add(o);
  }

  /*
   * @param o Observer no longer interested in changes in the set of Readers
   */
  void deleteChangedReaderObserver(ChangedReadersObserver o) {
    // Removal from a CopyOnWriteArraySet is a no-op if o was never added.
    this.changedReaderObservers.remove(o);
  }

  /**
   * Compact the StoreFiles.  This method may take some time, so the calling
   * thread must be able to block for long periods.
   *
   * <p>During this time, the Store can work as usual, getting values from
   * StoreFiles and writing new StoreFiles from the memstore.
   *
   * <p>Existing StoreFiles are not destroyed until the new compacted file is
   * completely written out.
   *
   * <p>The compactLock prevents multiple simultaneous compactions.  The
   * store's read/write lock is only taken briefly when the file list is
   * swapped, so cache flushes can proceed while a compaction runs.
   *
   * @param forceMajor True to force a major compaction regardless of
   *   thresholds
   * @return row to split around if a split is needed, null otherwise
   * @throws IOException
   */
  StoreSize compact(final boolean forceMajor) throws IOException {
    boolean forceSplit = this.region.shouldForceSplit();
    boolean majorcompaction = forceMajor;
    synchronized (compactLock) {
      this.lastCompactSize = 0;

      // Grab the current immutable list of store files; oldest files first.
      List<StoreFile> filesToCompact = this.storefiles;
      if (filesToCompact.isEmpty()) {
        LOG.debug(this.storeNameStr + ": no store files to compact");
        return null;
      }

      // Promote to a major compaction if requested, or if enough time has
      // passed since the last one (see isMajorCompaction()).
      if (!majorcompaction) {
        majorcompaction = isMajorCompaction(filesToCompact);
      }

      // Not a major compaction, no reference files, and either a split was
      // requested or there aren't enough files yet: skip compacting and
      // just check whether the store should split.
      boolean references = hasReferences(filesToCompact);
      if (!majorcompaction && !references &&
          (forceSplit || (filesToCompact.size() < compactionThreshold))) {
        return checkSplit(forceSplit);
      }

      /* Get store file sizes for incremental compaction selection.
       *
       * sumSize[i] totals fileSizes over the half-open window
       * [i, i + maxFilesToCompact - 1), so that sumSize[start + 1] below is
       * the combined size of the newer files that file 'start' would be
       * compacted together with.
       */
      int countOfFiles = filesToCompact.size();
      long [] fileSizes = new long[countOfFiles];
      long [] sumSize = new long[countOfFiles];
      for (int i = countOfFiles - 1; i >= 0; --i) {
        StoreFile file = filesToCompact.get(i);
        Path path = file.getPath();
        if (path == null) {
          LOG.error("Path is null for " + file);
          return null;
        }
        StoreFile.Reader r = file.getReader();
        if (r == null) {
          LOG.error("StoreFile " + file + " has a null Reader");
          return null;
        }
        fileSizes[i] = file.getReader().length();
        // Maintain the running window sum of fileSizes[i, i+maxFilesToCompact-1).
        int tooFar = i + this.maxFilesToCompact - 1;
        sumSize[i] = fileSizes[i]
          + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
          - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
      }

      long totalSize = 0;
      if (!majorcompaction && !references) {
        // Minor compaction: pick the contiguous run of files to include.
        int start = 0;
        double r = this.compactRatio;

        /* Start at the oldest file and stop at the first file that meets
         * the compaction criteria:
         *   (1) a recently flushed, small file (size <= minCompactSize), OR
         *   (2) a file whose size is within compactRatio of the combined
         *       size of the newer files it would be compacted with
         *       (fileSizes[start] <= sumSize[start + 1] * r).
         * Given the normal skew of flushes (older files larger, newer files
         * smaller), every file newer than the first match also meets the
         * criteria, so the selection is the contiguous run from 'start' to
         * the newest file, capped at maxFilesToCompact.
         */
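        /* A concrete (hypothetical) walk-through of the selection rule,
         * assuming compactionThreshold=3, maxFilesToCompact=10, ratio=1.2,
         * minCompactSize smaller than every file, and file sizes, oldest
         * first, of [300, 30, 25, 20]:
         *   start=0: 300 > 1.2 * (30 + 25 + 20) = 90 -> skip the 300 file
         *   start=1:  30 <= 1.2 * (25 + 20)     = 54 -> stop
         * Files [30, 25, 20] get compacted; the 300 file is left alone.
         */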
        while (countOfFiles - start >= this.compactionThreshold &&
            fileSizes[start] >
              Math.max(minCompactSize, (long)(sumSize[start + 1] * r))) {
          ++start;
        }
        int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
        totalSize = fileSizes[start]
          + ((start + 1 < countOfFiles) ? sumSize[start + 1] : 0);

        // If we don't have enough files to compact, return without doing
        // anything (but first check whether we should split instead).
        if (end - start < this.compactionThreshold) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipped compaction of " + this.storeNameStr
              + " because only " + (end - start) + " file(s) of size "
              + StringUtils.humanReadableInt(totalSize)
              + " meet compaction criteria.");
          }
          return checkSplit(forceSplit);
        }

        if (0 == start && end == countOfFiles) {
          // All the files are candidates: promote to a major compaction.
          majorcompaction = true;
        } else {
          filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(start,
            end));
        }
      } else {
        // Major compaction or references: all files are included; just
        // total up their sizes.
        for (long i : fileSizes) {
          totalSize += i;
        }
      }
      this.lastCompactSize = totalSize;

      // Max-sequenceID is the last key in the files we're compacting.
      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);

      // Ready to go.  Have list of files to compact.
      LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" +
        this.storeNameStr +
        (references? ", hasReferences=true,": " ") + " into " +
        region.getTmpDir() + ", seqid=" + maxId +
        ", totalSize=" + StringUtils.humanReadableInt(totalSize));
      StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
      // Move the compaction into place.
      StoreFile sf = completeCompaction(filesToCompact, writer);
      if (LOG.isInfoEnabled()) {
        LOG.info("Completed" + (majorcompaction? " major ": " ") +
          "compaction of " + filesToCompact.size() +
          " file(s), new file=" + (sf == null? "none": sf.toString()) +
          ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) +
          "; total size for store is " + StringUtils.humanReadableInt(storeSize));
      }
    }
    return checkSplit(forceSplit);
  }

  /*
   * Compact the most recent N files.  Used in testing.
   * @param N number of newest files to compact
   */
  protected void compactRecent(int N) throws IOException {
    synchronized (compactLock) {
      List<StoreFile> filesToCompact = this.storefiles;
      int count = filesToCompact.size();
      if (N > count) {
        throw new RuntimeException("Not enough files");
      }

      filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count - N, count));
      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
      boolean majorcompaction = (N == count);

      // Run the compaction and swap the resulting file into place.
      StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId);
      completeCompaction(filesToCompact, writer);
    }
  }

  /*
   * @param files files in this store
   * @return True if any of the files are references (half-files created by
   *   a region split that point into a parent region's file)
   */
  private boolean hasReferences(Collection<StoreFile> files) {
    if (files != null && files.size() > 0) {
      for (StoreFile hsf: files) {
        if (hsf.isReference()) {
          return true;
        }
      }
    }
    return false;
  }

  /*
   * Gets the lowest modification time among the files in the given
   * directory; used to decide whether a major compaction is due.
   * @param fs file system
   * @param dir directory to inspect
   * @return 0 if the directory is empty, otherwise the lowest modification
   *   timestamp among its files
   * @throws IOException
   */
  private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException {
    FileStatus[] stats = fs.listStatus(dir);
    if (stats == null || stats.length == 0) {
      return 0L;
    }
    long lowTimestamp = Long.MAX_VALUE;
    for (int i = 0; i < stats.length; i++) {
      long timestamp = stats[i].getModificationTime();
      if (timestamp < lowTimestamp) {
        lowTimestamp = timestamp;
      }
    }
    return lowTimestamp;
  }

  /*
   * @return True if we should run a major compaction.
   */
  boolean isMajorCompaction() throws IOException {
    return isMajorCompaction(storefiles);
  }

  /*
   * @param filesToCompact files to consider for major compaction
   * @return True if we should run a major compaction.
   */
  private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
    boolean result = false;
    if (filesToCompact == null || filesToCompact.isEmpty() ||
        majorCompactionTime == 0) {
      return result;
    }
    // Compare the files' lowest timestamp against the major compaction period.
    long lowTimestamp = getLowestTimestamp(fs,
      filesToCompact.get(0).getPath().getParent());
    long now = System.currentTimeMillis();
    if (lowTimestamp > 0L && lowTimestamp < (now - this.majorCompactionTime)) {
      // Major compaction time has elapsed.
      if (filesToCompact.size() == 1) {
        // Single file: only re-compact if it contains expired KeyValues.
        StoreFile sf = filesToCompact.get(0);
        long oldest =
          (sf.getReader().timeRangeTracker == null) ?
            Long.MIN_VALUE :
            now - sf.getReader().timeRangeTracker.minimumTimestamp;
        if (sf.isMajorCompaction() &&
            (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Skipping major compaction of " + this.storeNameStr +
              " because one (major) compacted file only and oldestTime " +
              oldest + "ms is < ttl=" + this.ttl);
          }
        }
        if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
            ", because keyvalues outdated; time since last major compaction " +
            (now - lowTimestamp) + "ms");
          result = true;
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Major compaction triggered on store " + this.storeNameStr +
            "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true;
        this.majorCompactionTime = getNextMajorCompactTime();
      }
    }
    return result;
  }

  long getNextMajorCompactTime() {
    // Default is 24 hours between major compactions.
    long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
    if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
      String strCompactionTime =
        family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
      ret = Long.parseLong(strCompactionTime);
    }

    if (ret > 0) {
      // Apply a default jitter of +/-20% so major compactions across stores
      // don't all fire at the same instant; e.g. with the 24h default the
      // result lands uniformly in roughly [19.2h, 28.8h].
      double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
        0.20F);
      if (jitterPct > 0) {
        long jitter = Math.round(ret * jitterPct);
        ret += jitter - Math.round(2L * jitter * Math.random());
      }
    }
    return ret;
  }

  /**
   * Do a minor/major compaction of the given files into the tmp dir.
   * Uses the scan infrastructure to make it easy to implement.
   *
   * @param filesToCompact which files to compact
   * @param majorCompaction true to major compact (prune all deletes, max
   *   versions, etc)
   * @param maxId readers' maximum sequence id
   * @return the product of the compaction, or null if all cells expired or
   *   were deleted so no product file was written
   * @throws IOException
   */
  private StoreFile.Writer compact(final List<StoreFile> filesToCompact,
      final boolean majorCompaction, final long maxId)
  throws IOException {
    // Calculate the maximum key count after compaction (for bloom filters).
    int maxKeyCount = 0;
    for (StoreFile file : filesToCompact) {
      StoreFile.Reader r = file.getReader();
      if (r != null) {
        // NOTE: getFilterEntries could cause under-sized blooms if the user
        // switches bloom type (e.g. from ROW to ROWCOL).
        long keyCount = (r.getBloomFilterType() == family.getBloomFilterType())
          ? r.getFilterEntries() : r.getEntries();
        maxKeyCount += keyCount;
        if (LOG.isDebugEnabled()) {
          LOG.debug("Compacting " + file +
            ", keycount=" + keyCount +
            ", bloomtype=" + r.getBloomFilterType().toString() +
            ", size=" + StringUtils.humanReadableInt(r.length()));
        }
      }
    }

    // For each file, obtain a scanner.
    List<StoreFileScanner> scanners = StoreFileScanner
      .getScannersForStoreFiles(filesToCompact, false, false);

    // Instantiate the writer lazily in case the compaction produces no
    // product, i.e. all source cells are expired or deleted.
    StoreFile.Writer writer = null;
    try {
      InternalScanner scanner = null;
      try {
        Scan scan = new Scan();
        scan.setMaxVersions(family.getMaxVersions());
        // Retain deletes unless we are doing a major compaction.
        scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
        int bytesWritten = 0;
        // Write out the compacted cells batch by batch.
        ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
        while (scanner.next(kvs)) {
          if (writer == null && !kvs.isEmpty()) {
            writer = createWriterInTmp(maxKeyCount,
              this.compactionCompression);
          }
          if (writer != null) {
            // Output to writer.
            for (KeyValue kv : kvs) {
              writer.append(kv);

              // Check periodically to see if a system stop is requested.
              if (Store.closeCheckInterval > 0) {
                bytesWritten += kv.getLength();
                if (bytesWritten > Store.closeCheckInterval) {
                  bytesWritten = 0;
                  if (!this.region.areWritesEnabled()) {
                    writer.close();
                    fs.delete(writer.getPath(), false);
                    throw new InterruptedIOException(
                      "Aborting compaction of store " + this +
                      " in region " + this.region +
                      " because user requested stop.");
                  }
                }
              }
            }
          }
          kvs.clear();
        }
      } finally {
        if (scanner != null) {
          scanner.close();
        }
      }
    } finally {
      if (writer != null) {
        writer.appendMetadata(maxId, majorCompaction);
        writer.close();
      }
    }
    return writer;
  }

  /*
   * Commit a finished compaction.  The compactLock must be held by the
   * caller; this method is not otherwise thread-safe.
   *
   * Committing means:
   *   1) Moving the new compacted StoreFile into place.
   *   2) Swapping the store file list: drop the replaced files, add the new
   *      one, and notify observers.
   *   3) Deleting the readers/files that were compacted away.
   *   4) Recomputing the store size.
   *
   * @param compactedFiles list of files that were compacted
   * @param compactedFile writer for the result of the compaction, possibly
   *   null if everything was expired or deleted
   * @return the StoreFile swapped into place, or null
   * @throws IOException
   */
  private StoreFile completeCompaction(final List<StoreFile> compactedFiles,
      final StoreFile.Writer compactedFile)
  throws IOException {
    // 1. Move the new file into place.
    StoreFile result = null;
    if (compactedFile != null) {
      Path p = null;
      try {
        p = StoreFile.rename(this.fs, compactedFile.getPath(),
          StoreFile.getRandomFilename(fs, this.homedir));
      } catch (IOException e) {
        LOG.error("Failed move of compacted file " + compactedFile.getPath(), e);
        return null;
      }
      result = new StoreFile(this.fs, p, blockcache, this.conf,
        this.family.getBloomFilterType(), this.inMemory);
      result.createReader();
    }
    this.lock.writeLock().lock();
    try {
      try {
        // 2. Swap in the new store file list.  Readers holding the old
        // immutable list keep working against the files they already have
        // open; new readers pick up the new list.
        ArrayList<StoreFile> newStoreFiles = new ArrayList<StoreFile>();
        for (StoreFile sf : storefiles) {
          if (!compactedFiles.contains(sf)) {
            newStoreFiles.add(sf);
          }
        }

        // Add the compaction product, if there was one.
        if (result != null) {
          newStoreFiles.add(result);
        }

        this.storefiles = sortAndClone(newStoreFiles);

        // Tell observers that the set of readers has changed.
        notifyChangedReadersObservers();
        // 3. Delete the files that were compacted away.
        for (StoreFile hsf: compactedFiles) {
          hsf.deleteReader();
        }
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.error("Failed replacing compacted files in " + this.storeNameStr +
          ". Compacted file is " + (result == null? "none": result.toString()) +
          ". Files replaced " + compactedFiles.toString() +
          " some of which may have been already removed", e);
      }
      // 4. Recompute the store size.
      this.storeSize = 0L;
      for (StoreFile hsf : this.storefiles) {
        StoreFile.Reader r = hsf.getReader();
        if (r == null) {
          LOG.warn("StoreFile " + hsf + " has a null Reader");
          continue;
        }
        this.storeSize += r.length();
      }
    } finally {
      this.lock.writeLock().unlock();
    }
    return result;
  }

  public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
    Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
    return ImmutableList.copyOf(storeFiles);
  }

  /**
   * @return the number of files in this store
   */
  public int getNumberOfstorefiles() {
    return this.storefiles.size();
  }

  /*
   * @param wantedVersions how many versions were asked for
   * @return wantedVersions, capped at the family's configured max versions
   */
  int versionsToReturn(final int wantedVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    // Make sure we do not return more than available.
    int maxVersions = this.family.getMaxVersions();
    return wantedVersions > maxVersions ? maxVersions : wantedVersions;
  }

  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
    return key.getTimestamp() < oldestTimestamp;
  }

  /**
   * Find the key that matches <i>row</i> exactly, or the one that
   * immediately precedes it.  WARNING: Only use this method on a table
   * where writes occur with strictly increasing timestamps; the search
   * depends on that pattern to stay reasonably performant.
   *
   * <p>The memstore is consulted first, then each store file from newest to
   * oldest is given the chance to improve on the best candidate found so
   * far.
   *
   * @param kv First possible item on the targeted row; i.e. empty column,
   *   latest timestamp and maximum type
   * @return Found KeyValue, or null if none found
   * @throws IOException
   */
  KeyValue getRowKeyAtOrBefore(final KeyValue kv) throws IOException {
    GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
      this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion());
    this.lock.readLock().lock();
    try {
      // First go to the memstore.  Pick up deletes and candidates.
      this.memstore.getRowKeyAtOrBefore(state);
      // Then process each store file, running from newest to oldest.
      for (StoreFile sf : Iterables.reverse(storefiles)) {
        // Update the candidate keys from the current store file.
        rowAtOrBeforeFromStoreFile(sf, state);
      }
      return state.getCandidate();
    } finally {
      this.lock.readLock().unlock();
    }
  }

  /*
   * Check a store file for candidates at or before the row tracked by
   * 'state', updating the tracker with any better candidate found.
   * @param f the store file to check
   * @param state the tracker carrying the target row and best candidate
   * @throws IOException
   */
  private void rowAtOrBeforeFromStoreFile(final StoreFile f,
      final GetClosestRowBeforeTracker state)
  throws IOException {
    StoreFile.Reader r = f.getReader();
    if (r == null) {
      LOG.warn("StoreFile " + f + " has a null Reader");
      return;
    }
    byte [] fk = r.getFirstKey();
    KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
    byte [] lk = r.getLastKey();
    KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
    KeyValue firstOnRow = state.getTargetKey();
    if (this.comparator.compareRows(lastKV, firstOnRow) < 0) {
      // If the last key in the file is not of the target table, there are
      // no candidates in this file.
      if (!state.isTargetTable(lastKV)) return;
      // The target row is past the end of this file; search from the first
      // key on the file's last row instead.
      firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP);
    }
    // Get a scanner that caches blocks and uses positional read.
    HFileScanner scanner = r.getScanner(true, true);
    // Seek the scanner; if we can't, give up on this file.
    if (!seekToScanner(scanner, firstOnRow, firstKV)) return;
    // If a candidate turns up on the positioned row, we are done.
    if (walkForwardInSingleRow(scanner, firstOnRow, state)) return;
    // Otherwise back up a row at a time until a candidate is found or the
    // rows stop being useful.
    while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(),
        firstOnRow.getKeyLength())) {
      KeyValue kv = scanner.getKeyValue();
      if (!state.isTargetTable(kv)) break;
      if (!state.isBetterCandidate(kv)) break;
      // Make a new first-on-row for the row we backed up to.
      firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP);
      // Seek the scanner; if we can't, stop.
      if (!seekToScanner(scanner, firstOnRow, firstKV)) break;
      // If we find a candidate on this row, we are done.
      if (walkForwardInSingleRow(scanner, firstOnRow, state)) break;
    }
  }

  /*
   * Seek the file scanner to firstOnRow, or to the file's first key if
   * firstOnRow lies on the same row as the file's first entry.
   * @param scanner the file scanner to seek
   * @param firstOnRow the first KeyValue on the target row
   * @param firstKV the first KeyValue in the store file
   * @return True if the scanner ends up positioned on a valid entry
   * @throws IOException
   */
  private boolean seekToScanner(final HFileScanner scanner,
      final KeyValue firstOnRow,
      final KeyValue firstKV)
  throws IOException {
    KeyValue kv = firstOnRow;
    // If the file's first key is on the target row, seek to it directly.
    if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV;
    int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(),
      kv.getKeyLength());
    return result >= 0;
  }

  /*
   * When we come in here, the scanner is positioned at or just before
   * firstOnRow.  Walk forward within the row, handing candidates to 'state'
   * until it accepts one, we leave the row, or the file ends.
   * @param scanner positioned file scanner
   * @param firstOnRow the first KeyValue on the target row
   * @param state the tracker collecting candidates
   * @return True if a candidate was found on the target row
   * @throws IOException
   */
  private boolean walkForwardInSingleRow(final HFileScanner scanner,
      final KeyValue firstOnRow,
      final GetClosestRowBeforeTracker state)
  throws IOException {
    boolean foundCandidate = false;
    do {
      KeyValue kv = scanner.getKeyValue();
      // If we have not yet reached the row, skip forward.
      if (this.comparator.compareRows(kv, firstOnRow) < 0) continue;
      // Did we go beyond the target row?  If so, stop.
      if (state.isTooFar(kv, firstOnRow)) break;
      if (state.isExpired(kv)) {
        continue;
      }
      // If the tracker accepts this KeyValue as a candidate, we are done.
      if (state.handle(kv)) {
        foundCandidate = true;
        break;
      }
    } while (scanner.next());
    return foundCandidate;
  }

  /**
   * Determine if this store should be split.
   * @param force whether to force a split regardless of store size
   * @return a StoreSize holding the aggregate size and the row to split
   *   around, or null if the store cannot or should not be split
   */
  StoreSize checkSplit(final boolean force) {
    this.lock.readLock().lock();
    try {
      // Sanity checks before iterating the store files.
      if (this.storefiles.isEmpty()) {
        return null;
      }
      if (!force && (storeSize < this.desiredMaxFileSize)) {
        return null;
      }

      if (this.region.getRegionInfo().isMetaRegion()) {
        if (force) {
          LOG.warn("Cannot split meta regions in HBase 0.20");
        }
        return null;
      }

      // Not splitable if a reference store file is present in the store.
      boolean splitable = true;
      long maxSize = 0L;
      StoreFile largestSf = null;
      for (StoreFile sf : storefiles) {
        if (splitable) {
          splitable = !sf.isReference();
          if (!splitable) {
            // A store with references can't split; bail out of the check.
            if (LOG.isDebugEnabled()) {
              LOG.debug(sf + " is not splittable");
            }
            return null;
          }
        }
        StoreFile.Reader r = sf.getReader();
        if (r == null) {
          LOG.warn("Storefile " + sf + " Reader is null");
          continue;
        }
        long size = r.length();
        if (size > maxSize) {
          // This is the largest file so far.
          maxSize = size;
          largestSf = sf;
        }
      }
      // If the region carries an explicit split point, use it.
      if (this.region.getSplitPoint() != null) {
        return new StoreSize(maxSize, this.region.getSplitPoint());
      }
      StoreFile.Reader r = largestSf.getReader();
      if (r == null) {
        LOG.warn("Storefile " + largestSf + " Reader is null");
        return null;
      }
      // The midkey is the key starting the block in the middle of the
      // largest hfile.  It carries column and timestamp; we only want the
      // row to split on.
      byte [] midkey = r.midkey();
      if (midkey != null) {
        KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
        byte [] fk = r.getFirstKey();
        KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
        byte [] lk = r.getLastKey();
        KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
        // If the midkey row is the same as the first and last rows, the
        // region holds a single row and can never be split.
        if (this.comparator.compareRows(mk, firstKey) == 0 &&
            this.comparator.compareRows(mk, lastKey) == 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("cannot split because midkey is the same as first or " +
              "last row");
          }
          return null;
        }
        return new StoreSize(maxSize, mk.getRow());
      }
    } catch (IOException e) {
      LOG.warn("Failed getting store size for " + this.storeNameStr, e);
    } finally {
      this.lock.readLock().unlock();
    }
    return null;
  }

  /** @return aggregate size of the last compaction, in bytes */
  public long getLastCompactSize() {
    return this.lastCompactSize;
  }

  /** @return aggregate size of all StoreFiles in this store, in bytes */
  public long getSize() {
    return storeSize;
  }

  /**
   * Return a scanner for both the memstore and the store files.
   * @param scan the scan specification
   * @param targetCols columns to scan
   * @return a KeyValueScanner over this store
   * @throws IOException
   */
  public KeyValueScanner getScanner(Scan scan,
      final NavigableSet<byte []> targetCols) throws IOException {
    lock.readLock().lock();
    try {
      return new StoreScanner(this, scan, targetCols);
    } finally {
      lock.readLock().unlock();
    }
  }

  @Override
  public String toString() {
    return this.storeNameStr;
  }

  /**
   * @return the number of store files in this store
   */
  int getStorefilesCount() {
    return this.storefiles.size();
  }

  /**
   * @return the total size of all store files, in bytes
   */
  long getStorefilesSize() {
    long size = 0;
    for (StoreFile s: storefiles) {
      StoreFile.Reader r = s.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + s + " has a null Reader");
        continue;
      }
      size += r.length();
    }
    return size;
  }

  /**
   * @return the total byte size of all the store files' block indexes
   */
  long getStorefilesIndexSize() {
    long size = 0;
    for (StoreFile s: storefiles) {
      StoreFile.Reader r = s.getReader();
      if (r == null) {
        LOG.warn("StoreFile " + s + " has a null Reader");
        continue;
      }
      size += r.indexSize();
    }
    return size;
  }

  /**
   * @return the compaction priority: the current file count subtracted from
   *   the blocking file count, so stores closest to blocking writes get the
   *   smallest (highest-priority) values
   */
  int getCompactPriority() {
    return this.blockingStoreFileCount - this.storefiles.size();
  }

  /**
   * Datastructure that holds the size of a store and the row to split
   * around.
   */
  static class StoreSize {
    private final long size;
    private final byte [] row;

    StoreSize(long size, byte [] row) {
      this.size = size;
      this.row = row;
    }

    /* @return the aggregate size of the store */
    long getSize() {
      return size;
    }

    /* @return the row to split around */
    byte [] getSplitRow() {
      return this.row;
    }
  }

  HRegion getHRegion() {
    return this.region;
  }

  HRegionInfo getHRegionInfo() {
    return this.region.regionInfo;
  }

  /**
   * Updates the value for the given row/family/qualifier.
   *
   * <p>This function will always be seen as atomic by other readers because
   * it only puts a single KeyValue into the memstore, so no read/write
   * control is necessary beyond the memstore update itself.
   *
   * @param row row to update
   * @param f family to update
   * @param qualifier qualifier to update
   * @param newValue the new value to set into the memstore
   * @return memstore size delta
   * @throws IOException
   */
  public long updateColumnValue(byte [] row, byte [] f,
      byte [] qualifier, long newValue)
  throws IOException {
    this.lock.readLock().lock();
    try {
      long now = EnvironmentEdgeManager.currentTimeMillis();

      return this.memstore.updateColumnValue(row,
        f,
        qualifier,
        newValue,
        now);
    } finally {
      this.lock.readLock().unlock();
    }
  }

  /**
   * Adds or replaces the specified KeyValues.
   *
   * <p>For each KeyValue specified, if a cell with the same row, family and
   * qualifier already exists in the memstore, it will be replaced; otherwise
   * it is simply inserted.
   *
   * <p>This operation is atomic on each KeyValue (row/family/qualifier) but
   * not necessarily atomic across all of them.
   *
   * @param kvs the KeyValues to add or replace
   * @return memstore size delta
   * @throws IOException
   */
  public long upsert(List<KeyValue> kvs)
  throws IOException {
    this.lock.readLock().lock();
    try {
      return this.memstore.upsert(kvs);
    } finally {
      this.lock.readLock().unlock();
    }
  }

  public StoreFlusher getStoreFlusher(long cacheFlushId) {
    return new StoreFlusherImpl(cacheFlushId);
  }

  private class StoreFlusherImpl implements StoreFlusher {

    private long cacheFlushId;
    private SortedSet<KeyValue> snapshot;
    private StoreFile storeFile;
    private TimeRangeTracker snapshotTimeRangeTracker;

    private StoreFlusherImpl(long cacheFlushId) {
      this.cacheFlushId = cacheFlushId;
    }

    @Override
    public void prepare() {
      memstore.snapshot();
      this.snapshot = memstore.getSnapshot();
      this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
    }

    @Override
    public void flushCache() throws IOException {
      storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
    }

    @Override
    public boolean commit() throws IOException {
      if (storeFile == null) {
        return false;
      }
      // Add the new file to the store files, clear the snapshot, and notify
      // observers of the change in readers.
      return Store.this.updateStorefiles(storeFile, snapshot);
    }
  }
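
  /*
   * Flush lifecycle, as driven by the region (a sketch of the expected
   * calling sequence, not additional API):
   *
   *   StoreFlusher flusher = store.getStoreFlusher(sequenceId);
   *   flusher.prepare();     // snapshot the memstore
   *   flusher.flushCache();  // write the snapshot to a new StoreFile
   *   flusher.commit();      // swap the file in and clear the snapshot
   *
   * prepare() is expected to run while the region is blocking writes;
   * flushCache() can run concurrently with reads and writes; commit() takes
   * this store's write lock briefly to swap the file list.
   */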

  /**
   * Checks whether this store has too many store files.
   * @return True if the number of store files exceeds compactionThreshold
   */
  public boolean hasTooManyStoreFiles() {
    return this.storefiles.size() > this.compactionThreshold;
  }

  public static final long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT + (15 * ClassSize.REFERENCE) +
      (6 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
      (4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2));

  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
      ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
      ClassSize.CONCURRENT_SKIPLISTMAP +
      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT);

  @Override
  public long heapSize() {
    return DEEP_OVERHEAD + this.memstore.heapSize();
  }
}