1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.io.UnsupportedEncodingException;
26 import java.lang.reflect.Constructor;
27 import java.text.ParseException;
28 import java.util.AbstractList;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.NavigableMap;
37 import java.util.NavigableSet;
38 import java.util.Random;
39 import java.util.TreeMap;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.ConcurrentSkipListMap;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicBoolean;
45 import java.util.concurrent.atomic.AtomicInteger;
46 import java.util.concurrent.atomic.AtomicLong;
47 import java.util.concurrent.locks.ReentrantReadWriteLock;
48
49 import org.apache.commons.logging.Log;
50 import org.apache.commons.logging.LogFactory;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.fs.FSDataOutputStream;
53 import org.apache.hadoop.fs.FileStatus;
54 import org.apache.hadoop.fs.FileSystem;
55 import org.apache.hadoop.fs.Path;
56 import org.apache.hadoop.hbase.DoNotRetryIOException;
57 import org.apache.hadoop.hbase.DroppedSnapshotException;
58 import org.apache.hadoop.hbase.HBaseConfiguration;
59 import org.apache.hadoop.hbase.HColumnDescriptor;
60 import org.apache.hadoop.hbase.HConstants;
61 import org.apache.hadoop.hbase.HRegionInfo;
62 import org.apache.hadoop.hbase.HTableDescriptor;
63 import org.apache.hadoop.hbase.KeyValue;
64 import org.apache.hadoop.hbase.NotServingRegionException;
65 import org.apache.hadoop.hbase.UnknownScannerException;
66 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
67 import org.apache.hadoop.hbase.client.Delete;
68 import org.apache.hadoop.hbase.client.Get;
69 import org.apache.hadoop.hbase.client.Increment;
70 import org.apache.hadoop.hbase.client.Put;
71 import org.apache.hadoop.hbase.client.Result;
72 import org.apache.hadoop.hbase.client.Row;
73 import org.apache.hadoop.hbase.client.RowLock;
74 import org.apache.hadoop.hbase.client.Scan;
75 import org.apache.hadoop.hbase.filter.Filter;
76 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
77 import org.apache.hadoop.hbase.io.HeapSize;
78 import org.apache.hadoop.hbase.io.TimeRange;
79 import org.apache.hadoop.hbase.io.hfile.BlockCache;
80 import org.apache.hadoop.hbase.regionserver.wal.HLog;
81 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
82 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
83 import org.apache.hadoop.hbase.util.HashedBytes;
84 import org.apache.hadoop.hbase.util.Bytes;
85 import org.apache.hadoop.hbase.util.CancelableProgressable;
86 import org.apache.hadoop.hbase.util.ClassSize;
87 import org.apache.hadoop.hbase.util.CompressionTest;
88 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
89 import org.apache.hadoop.hbase.util.FSUtils;
90 import org.apache.hadoop.hbase.util.Pair;
91 import org.apache.hadoop.hbase.util.Writables;
92 import org.apache.hadoop.io.Writable;
93 import org.apache.hadoop.util.StringUtils;
94
95 import com.google.common.collect.Lists;
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 public class HRegion implements HeapSize {
/** Logger shared by all regions. */
public static final Log LOG = LogFactory.getLog(HRegion.class);
/** Name of the directory under the region directory that initialize() deletes. */
static final String MERGEDIR = "merges";

/** Set to true at the end of a successful close; reset to false by initialize(). */
final AtomicBoolean closed = new AtomicBoolean(false);

/**
 * Set to true by doClose() just before it takes the region write lock.
 * flushcache() and compactStores() bail out early when this is set.
 */
final AtomicBoolean closing = new AtomicBoolean(false);

// Row-lock bookkeeping. NOTE(review): the code that uses these maps
// (getLock/releaseRowLock) is outside this part of the file -- presumably
// lockedRows maps a locked row to a latch waiters block on, and lockIds maps
// an issued lock id back to its row; confirm there.
private final ConcurrentHashMap<HashedBytes, CountDownLatch> lockedRows =
  new ConcurrentHashMap<HashedBytes, CountDownLatch>();
private final ConcurrentHashMap<Integer, HashedBytes> lockIds =
  new ConcurrentHashMap<Integer, HashedBytes>();
private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
static private Random rand = new Random();

/** Column family name to its Store; populated by initialize(). */
protected final Map<byte [], Store> stores =
  new ConcurrentSkipListMap<byte [], Store>(Bytes.BYTES_RAWCOMPARATOR);

/**
 * Running memstore size for the region: grown by puts and deletes, reduced by
 * the flushed amount at the end of internalFlushcache().
 */
final AtomicLong memstoreSize = new AtomicLong(0);

/** Directory of the table this region belongs to; parent of {@link #regiondir}. */
final Path tableDir;

final HLog log;
final FileSystem fs;
final Configuration conf;
/** Read from "hbase.rowlock.wait.duration"; presumably milliseconds -- confirm at the use site. */
final int rowLockWaitDuration;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
final HRegionInfo regionInfo;
/** tableDir joined with the region's encoded name. */
final Path regiondir;
KeyValue.KVComparator comparator;

/** One-shot request flag: compactStores() reads it and resets it to false. */
private volatile boolean forceMajorCompaction = false;
/**
 * (elapsed seconds, total compacted size) of the last completed compaction;
 * reset to null at the start of every compaction attempt.
 */
private Pair<Long,Long> lastCompactInfo = null;

/** Monitor that serializes calls to close(boolean). */
private final Object closeLock = new Object();
185
186
187
188
189
190 static class WriteState {
191
192 volatile boolean flushing = false;
193
194 volatile boolean flushRequested = false;
195
196 volatile boolean compacting = false;
197
198 volatile boolean writesEnabled = true;
199
200 volatile boolean readOnly = false;
201
202
203
204
205
206
207 synchronized void setReadOnly(final boolean onOff) {
208 this.writesEnabled = !onOff;
209 this.readOnly = onOff;
210 }
211
212 boolean isReadOnly() {
213 return this.readOnly;
214 }
215
216 boolean isFlushRequested() {
217 return this.flushRequested;
218 }
219
220 static final long HEAP_SIZE = ClassSize.align(
221 ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
222 }
223
/** Flush/compaction/read-only flags; also the monitor used for wait/notifyAll during close. */
final WriteState writestate = new WriteState();

/** Table-level flush size, or "hbase.hregion.memstore.flush.size" when the table uses the default. */
final long memstoreFlushSize;
/** Wall-clock time at which the last flush attempt started (also set by initialize()). */
private volatile long lastFlushTime;
/** (elapsed seconds, flushed memstore size) per flush since the last getRecentFlushInfo() call. */
private List<Pair<Long,Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
final FlushRequester flushRequester;
/** memstoreFlushSize multiplied by "hbase.hregion.memstore.block.multiplier". */
private final long blockingMemStoreSize;
final long threadWakeFrequency;

/**
 * Region-level lock: flushes and compactions hold the read side, close holds
 * the write side.
 */
final ReentrantReadWriteLock lock =
  new ReentrantReadWriteLock();

/**
 * Updates lock: puts and deletes hold the read side while they append to the
 * WAL and memstore; the flush snapshot phase holds the write side.
 */
private final ReentrantReadWriteLock updatesLock =
  new ReentrantReadWriteLock();
private boolean splitRequest;
/** Cleared by compactStores(boolean) when a compaction yields a split row. */
private byte[] splitPoint = null;

private final ReadWriteConsistencyControl rwcc =
  new ReadWriteConsistencyControl();

/** Name of the file under the region directory that holds the serialized HRegionInfo. */
public final static String REGIONINFO_FILE = ".regioninfo";
249
250
251
252
/**
 * No-argument constructor that leaves every collaborator null and every size
 * at zero; only the row-lock wait duration gets its default value.
 * NOTE(review): presumably intended for tests/mocking -- confirm with callers.
 */
public HRegion(){
  // Collaborators: none.
  this.conf = null;
  this.fs = null;
  this.log = null;
  this.flushRequester = null;
  this.regionInfo = null;
  // Filesystem locations: none.
  this.tableDir = null;
  this.regiondir = null;
  // Sizes and timings.
  this.memstoreFlushSize = 0L;
  this.blockingMemStoreSize = 0L;
  this.threadWakeFrequency = 0L;
  this.rowLockWaitDuration = DEFAULT_ROWLOCK_WAIT_DURATION;
}
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290 public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
291 HRegionInfo regionInfo, FlushRequester flushRequester) {
292 this.tableDir = tableDir;
293 this.comparator = regionInfo.getComparator();
294 this.log = log;
295 this.fs = fs;
296 this.conf = conf;
297 this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
298 DEFAULT_ROWLOCK_WAIT_DURATION);
299 this.regionInfo = regionInfo;
300 this.flushRequester = flushRequester;
301 this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
302 10 * 1000);
303 String encodedNameStr = this.regionInfo.getEncodedName();
304 this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
305 long flushSize = regionInfo.getTableDesc().getMemStoreFlushSize();
306 if (flushSize == HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE) {
307 flushSize = conf.getLong("hbase.hregion.memstore.flush.size",
308 HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
309 }
310 this.memstoreFlushSize = flushSize;
311 this.blockingMemStoreSize = this.memstoreFlushSize *
312 conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
313 if (LOG.isDebugEnabled()) {
314
315 LOG.debug("Instantiated " + this);
316 }
317 }
318
319
320
321
322
323
324 public long initialize() throws IOException {
325 return initialize(null);
326 }
327
328
329
330
331
332
333
334
335 public long initialize(final CancelableProgressable reporter)
336 throws IOException {
337
338 this.closing.set(false);
339 this.closed.set(false);
340
341
342 checkRegioninfoOnFilesystem();
343
344
345 cleanupTmpDir();
346
347
348 long maxSeqId = -1;
349 for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
350 Store store = instantiateHStore(this.tableDir, c);
351 this.stores.put(c.getName(), store);
352 long storeSeqId = store.getMaxSequenceId();
353 if (storeSeqId > maxSeqId) {
354 maxSeqId = storeSeqId;
355 }
356 }
357
358 maxSeqId = replayRecoveredEditsIfAny(this.regiondir, maxSeqId, reporter);
359
360
361
362
363 SplitTransaction.cleanupAnySplitDetritus(this);
364 FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
365
366 this.writestate.setReadOnly(this.regionInfo.getTableDesc().isReadOnly());
367
368 this.writestate.compacting = false;
369 this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
370
371
372 long nextSeqid = maxSeqId + 1;
373 LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
374 return nextSeqid;
375 }
376
377
378
379
380
381
382
383 static void moveInitialFilesIntoPlace(final FileSystem fs,
384 final Path initialFiles, final Path regiondir)
385 throws IOException {
386 if (initialFiles != null && fs.exists(initialFiles)) {
387 if (!fs.rename(initialFiles, regiondir)) {
388 LOG.warn("Unable to rename " + initialFiles + " to " + regiondir);
389 }
390 }
391 }
392
393
394
395
396 public boolean hasReferences() {
397 for (Store store : this.stores.values()) {
398 for (StoreFile sf : store.getStorefiles()) {
399
400 if (sf.isReference()) return true;
401 }
402 }
403 return false;
404 }
405
406
407
408
409
410
411 private void checkRegioninfoOnFilesystem() throws IOException {
412 Path regioninfoPath = new Path(this.regiondir, REGIONINFO_FILE);
413 if (this.fs.exists(regioninfoPath) &&
414 this.fs.getFileStatus(regioninfoPath).getLen() > 0) {
415 return;
416 }
417
418
419
420
421 Path tmpPath = new Path(getTmpDir(), REGIONINFO_FILE);
422 FSDataOutputStream out = this.fs.create(tmpPath, true);
423 try {
424 this.regionInfo.write(out);
425 out.write('\n');
426 out.write('\n');
427 out.write(Bytes.toBytes(this.regionInfo.toString()));
428 } finally {
429 out.close();
430 }
431 if (!fs.rename(tmpPath, regioninfoPath)) {
432 throw new IOException("Unable to rename " + tmpPath + " to " +
433 regioninfoPath);
434 }
435 }
436
437
438 public HRegionInfo getRegionInfo() {
439 return this.regionInfo;
440 }
441
442
443 public boolean isClosed() {
444 return this.closed.get();
445 }
446
447
448
449
450 public boolean isClosing() {
451 return this.closing.get();
452 }
453
454 boolean areWritesEnabled() {
455 synchronized(this.writestate) {
456 return this.writestate.writesEnabled;
457 }
458 }
459
460 public ReadWriteConsistencyControl getRWCC() {
461 return rwcc;
462 }
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477 public List<StoreFile> close() throws IOException {
478 return close(false);
479 }
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495 public List<StoreFile> close(final boolean abort) throws IOException {
496
497
498 synchronized (closeLock) {
499 return doClose(abort);
500 }
501 }
502
/**
 * Performs the close. Callers are expected to hold {@code closeLock}
 * (see {@link #close(boolean)}).
 *
 * Steps: disable writes and wait for any running flush or compaction;
 * optionally run a "pre-flush" while updates are still allowed; set the
 * closing flag; take the region write lock; flush again (unless aborting);
 * close every store.
 *
 * @param abort true to skip both memstore flushes
 * @return the store files returned by each store's close, or null if the
 *   region was already closed
 * @throws IOException if a flush or a store close fails
 */
private List<StoreFile> doClose(final boolean abort)
throws IOException {
  if (isClosed()) {
    LOG.warn("Region " + this + " already closed");
    return null;
  }
  boolean wasFlushing = false;
  synchronized (writestate) {
    // Disable new flushes and compactions, then wait for running ones.
    // flushcache() and compactStores() call notifyAll() on writestate when
    // they finish.
    writestate.writesEnabled = false;
    wasFlushing = writestate.flushing;
    LOG.debug("Closing " + this + ": disabling compactions & flushes");
    while (writestate.compacting || writestate.flushing) {
      LOG.debug("waiting for" +
        (writestate.compacting ? " compaction" : "") +
        (writestate.flushing ?
          (writestate.compacting ? "," : "") + " cache flush" :
            "") + " to complete for region " + this);
      try {
        writestate.wait();
      } catch (InterruptedException iex) {
        // Deliberately ignored: the loop condition is re-checked and the
        // wait resumes. NOTE(review): the thread's interrupt status is not
        // restored afterwards.
      }
    }
  }
  // Pre-flush outside the region write lock so the flush done under the
  // write lock below has less left to write. Skipped on abort, when a flush
  // was already running on entry, or when the memstore is small.
  if (!abort && !wasFlushing && worthPreFlushing()) {
    LOG.info("Running close preflush of " + this.getRegionNameAsString());
    internalFlushcache();
  }
  this.closing.set(true);
  lock.writeLock().lock();
  try {
    if (this.isClosed()) {
      // Re-check now that the write lock is held.
      return null;
    }
    LOG.debug("Updates disabled for region " + this);
    // Final flush while holding the write lock; skipped when aborting.
    if (!abort) {
      internalFlushcache();
    }

    List<StoreFile> result = new ArrayList<StoreFile>();
    for (Store store : stores.values()) {
      result.addAll(store.close());
    }
    this.closed.set(true);
    LOG.info("Closed " + this);
    return result;
  } finally {
    lock.writeLock().unlock();
  }
}
560
561
562
563
564 private boolean worthPreFlushing() {
565 return this.memstoreSize.get() >
566 this.conf.getLong("hbase.hregion.preclose.flush.size", 1024 * 1024 * 5);
567 }
568
569
570
571
572
573
574 public byte [] getStartKey() {
575 return this.regionInfo.getStartKey();
576 }
577
578
579 public byte [] getEndKey() {
580 return this.regionInfo.getEndKey();
581 }
582
583
584 public long getRegionId() {
585 return this.regionInfo.getRegionId();
586 }
587
588
589 public byte [] getRegionName() {
590 return this.regionInfo.getRegionName();
591 }
592
593
594 public String getRegionNameAsString() {
595 return this.regionInfo.getRegionNameAsString();
596 }
597
598
599 public HTableDescriptor getTableDesc() {
600 return this.regionInfo.getTableDesc();
601 }
602
603
604 public HLog getLog() {
605 return this.log;
606 }
607
608
609 public Configuration getConf() {
610 return this.conf;
611 }
612
613
614 public Path getRegionDir() {
615 return this.regiondir;
616 }
617
618
619
620
621
622
623
624
625 public static Path getRegionDir(final Path tabledir, final String name) {
626 return new Path(tabledir, name);
627 }
628
629
630 public FileSystem getFilesystem() {
631 return this.fs;
632 }
633
634
635 public Pair<Long,Long> getLastCompactInfo() {
636 return this.lastCompactInfo;
637 }
638
639
640 public long getLastFlushTime() {
641 return this.lastFlushTime;
642 }
643
644
645 public List<Pair<Long,Long>> getRecentFlushInfo() {
646 this.lock.readLock().lock();
647 List<Pair<Long,Long>> ret = this.recentFlushes;
648 this.recentFlushes = new ArrayList<Pair<Long,Long>>();
649 this.lock.readLock().unlock();
650 return ret;
651 }
652
653
654
655
656
657
658
659
660
661 public long getLargestHStoreSize() {
662 long size = 0;
663 for (Store h: stores.values()) {
664 long storeSize = h.getSize();
665 if (storeSize > size) {
666 size = storeSize;
667 }
668 }
669 return size;
670 }
671
672
673
674
675
/**
 * Hook invoked by compactStores(boolean) just before the stores are
 * compacted. This implementation does nothing; presumably it exists for
 * subclasses to override -- confirm.
 *
 * @throws IOException declared for the benefit of overriding implementations
 */
void doRegionCompactionPrep() throws IOException {
}
678
679
680
681
682 private void cleanupTmpDir() throws IOException {
683 FSUtils.deleteDirectory(this.fs, getTmpDir());
684 }
685
686
687
688
689
690 Path getTmpDir() {
691 return new Path(getRegionDir(), ".tmp");
692 }
693
694 void setForceMajorCompaction(final boolean b) {
695 this.forceMajorCompaction = b;
696 }
697
698 boolean getForceMajorCompaction() {
699 return this.forceMajorCompaction;
700 }
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716 public byte [] compactStores() throws IOException {
717 boolean majorCompaction = this.forceMajorCompaction;
718 this.forceMajorCompaction = false;
719 return compactStores(majorCompaction);
720 }
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737 byte [] compactStores(final boolean majorCompaction)
738 throws IOException {
739 if (this.closing.get()) {
740 LOG.debug("Skipping compaction on " + this + " because closing");
741 return null;
742 }
743 lock.readLock().lock();
744 this.lastCompactInfo = null;
745 try {
746 if (this.closed.get()) {
747 LOG.debug("Skipping compaction on " + this + " because closed");
748 return null;
749 }
750 byte [] splitRow = null;
751 if (this.closed.get()) {
752 return splitRow;
753 }
754 try {
755 synchronized (writestate) {
756 if (!writestate.compacting && writestate.writesEnabled) {
757 writestate.compacting = true;
758 } else {
759 LOG.info("NOT compacting region " + this +
760 ": compacting=" + writestate.compacting + ", writesEnabled=" +
761 writestate.writesEnabled);
762 return splitRow;
763 }
764 }
765 LOG.info("Starting" + (majorCompaction? " major " : " ") +
766 "compaction on region " + this);
767 long startTime = EnvironmentEdgeManager.currentTimeMillis();
768 doRegionCompactionPrep();
769 long lastCompactSize = 0;
770 long maxSize = -1;
771 boolean completed = false;
772 try {
773 for (Store store: stores.values()) {
774 final Store.StoreSize ss = store.compact(majorCompaction);
775 lastCompactSize += store.getLastCompactSize();
776 if (ss != null && ss.getSize() > maxSize) {
777 maxSize = ss.getSize();
778 splitRow = ss.getSplitRow();
779 }
780 }
781 completed = true;
782 } catch (InterruptedIOException iioe) {
783 LOG.info("compaction interrupted by user: ", iioe);
784 } finally {
785 long now = EnvironmentEdgeManager.currentTimeMillis();
786 LOG.info(((completed) ? "completed" : "aborted")
787 + " compaction on region " + this
788 + " after " + StringUtils.formatTimeDiff(now, startTime));
789 if (completed) {
790 this.lastCompactInfo =
791 new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
792 }
793 }
794 } finally {
795 synchronized (writestate) {
796 writestate.compacting = false;
797 writestate.notifyAll();
798 }
799 }
800 if (splitRow != null) {
801 assert splitPoint == null || Bytes.equals(splitRow, splitPoint);
802 this.splitPoint = null;
803 }
804 return splitRow;
805 } finally {
806 lock.readLock().unlock();
807 }
808 }
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830 public boolean flushcache() throws IOException {
831
832 if (this.closing.get()) {
833 LOG.debug("Skipping flush on " + this + " because closing");
834 return false;
835 }
836 lock.readLock().lock();
837 try {
838 if (this.closed.get()) {
839 LOG.debug("Skipping flush on " + this + " because closed");
840 return false;
841 }
842 try {
843 synchronized (writestate) {
844 if (!writestate.flushing && writestate.writesEnabled) {
845 this.writestate.flushing = true;
846 } else {
847 if (LOG.isDebugEnabled()) {
848 LOG.debug("NOT flushing memstore for region " + this +
849 ", flushing=" +
850 writestate.flushing + ", writesEnabled=" +
851 writestate.writesEnabled);
852 }
853 return false;
854 }
855 }
856 return internalFlushcache();
857 } finally {
858 synchronized (writestate) {
859 writestate.flushing = false;
860 this.writestate.flushRequested = false;
861 writestate.notifyAll();
862 }
863 }
864 } finally {
865 lock.readLock().unlock();
866 }
867 }
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903 protected boolean internalFlushcache() throws IOException {
904 return internalFlushcache(this.log, -1);
905 }
906
907
908
909
910
911
912
913
914
915 protected boolean internalFlushcache(final HLog wal, final long myseqid)
916 throws IOException {
917 final long startTime = EnvironmentEdgeManager.currentTimeMillis();
918
919
920 this.lastFlushTime = startTime;
921
922 if (this.memstoreSize.get() <= 0) {
923 return false;
924 }
925 if (LOG.isDebugEnabled()) {
926 LOG.debug("Started memstore flush for " + this +
927 ", current region memstore size " +
928 StringUtils.humanReadableInt(this.memstoreSize.get()) +
929 ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
930 }
931
932
933
934
935
936
937
938
939 long sequenceId = -1L;
940 long completeSequenceId = -1L;
941
942
943
944
945 this.updatesLock.writeLock().lock();
946 final long currentMemStoreSize = this.memstoreSize.get();
947 List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
948 try {
949 sequenceId = (wal == null)? myseqid: wal.startCacheFlush();
950 completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
951
952 for (Store s : stores.values()) {
953 storeFlushers.add(s.getStoreFlusher(completeSequenceId));
954 }
955
956
957 for (StoreFlusher flusher : storeFlushers) {
958 flusher.prepare();
959 }
960 } finally {
961 this.updatesLock.writeLock().unlock();
962 }
963
964 LOG.debug("Finished snapshotting, commencing flushing stores");
965
966
967
968
969
970 boolean compactionRequested = false;
971 try {
972
973
974
975
976 for (StoreFlusher flusher : storeFlushers) {
977 flusher.flushCache();
978 }
979
980
981 for (StoreFlusher flusher : storeFlushers) {
982 boolean needsCompaction = flusher.commit();
983 if (needsCompaction) {
984 compactionRequested = true;
985 }
986 }
987 storeFlushers.clear();
988
989
990 this.memstoreSize.addAndGet(-currentMemStoreSize);
991 } catch (Throwable t) {
992
993
994
995
996
997
998 if (wal != null) wal.abortCacheFlush();
999 DroppedSnapshotException dse = new DroppedSnapshotException("region: " +
1000 Bytes.toStringBinary(getRegionName()));
1001 dse.initCause(t);
1002 throw dse;
1003 }
1004
1005
1006
1007
1008
1009
1010
1011
1012 if (wal != null) {
1013 wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
1014 regionInfo.getTableDesc().getName(), completeSequenceId,
1015 this.getRegionInfo().isMetaRegion());
1016 }
1017
1018
1019
1020 synchronized (this) {
1021 notifyAll();
1022 }
1023
1024 long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
1025 if (LOG.isDebugEnabled()) {
1026 LOG.info("Finished memstore flush of ~" +
1027 StringUtils.humanReadableInt(currentMemStoreSize) + " for region " +
1028 this + " in " + time + "ms, sequenceid=" + sequenceId +
1029 ", compaction requested=" + compactionRequested +
1030 ((wal == null)? "; wal=null": ""));
1031 }
1032 this.recentFlushes.add(new Pair<Long,Long>(time/1000,currentMemStoreSize));
1033
1034 return compactionRequested;
1035 }
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045 protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
1046 return currentSequenceId;
1047 }
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061 Result getClosestRowBefore(final byte [] row)
1062 throws IOException{
1063 return getClosestRowBefore(row, HConstants.CATALOG_FAMILY);
1064 }
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076 public Result getClosestRowBefore(final byte [] row, final byte [] family)
1077 throws IOException {
1078
1079
1080 KeyValue key = null;
1081 checkRow(row);
1082 startRegionOperation();
1083 try {
1084 Store store = getStore(family);
1085 KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
1086
1087 key = store.getRowKeyAtOrBefore(kv);
1088 if (key == null) {
1089 return null;
1090 }
1091 Get get = new Get(key.getRow());
1092 get.addFamily(family);
1093 return get(get, null);
1094 } finally {
1095 closeRegionOperation();
1096 }
1097 }
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109 public InternalScanner getScanner(Scan scan)
1110 throws IOException {
1111 return getScanner(scan, null);
1112 }
1113
1114 protected InternalScanner getScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
1115 startRegionOperation();
1116 try {
1117
1118 if(scan.hasFamilies()) {
1119 for(byte [] family : scan.getFamilyMap().keySet()) {
1120 checkFamily(family);
1121 }
1122 } else {
1123 for(byte[] family: regionInfo.getTableDesc().getFamiliesKeys()){
1124 scan.addFamily(family);
1125 }
1126 }
1127 return instantiateInternalScanner(scan, additionalScanners);
1128
1129 } finally {
1130 closeRegionOperation();
1131 }
1132 }
1133
1134 protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
1135 return new RegionScanner(scan, additionalScanners);
1136 }
1137
1138
1139
1140
1141 private void prepareDelete(Delete delete) throws IOException {
1142
1143 if(delete.getFamilyMap().isEmpty()){
1144 for(byte [] family : regionInfo.getTableDesc().getFamiliesKeys()){
1145
1146 delete.deleteFamily(family, delete.getTimeStamp());
1147 }
1148 } else {
1149 for(byte [] family : delete.getFamilyMap().keySet()) {
1150 if(family == null) {
1151 throw new NoSuchColumnFamilyException("Empty family is invalid");
1152 }
1153 checkFamily(family);
1154 }
1155 }
1156 }
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167 public void delete(Delete delete, Integer lockid, boolean writeToWAL)
1168 throws IOException {
1169 checkReadOnly();
1170 checkResources();
1171 Integer lid = null;
1172 startRegionOperation();
1173 try {
1174 byte [] row = delete.getRow();
1175
1176 lid = getLock(lockid, row, true);
1177
1178 try {
1179
1180 prepareDelete(delete);
1181 delete(delete.getFamilyMap(), writeToWAL);
1182 } finally {
1183 if(lockid == null) releaseRowLock(lid);
1184 }
1185 } finally {
1186 closeRegionOperation();
1187 }
1188 }
1189
1190
1191
1192
1193
1194
1195
1196 public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
1197 throws IOException {
1198 long now = EnvironmentEdgeManager.currentTimeMillis();
1199 byte [] byteNow = Bytes.toBytes(now);
1200 boolean flush = false;
1201
1202 updatesLock.readLock().lock();
1203
1204 try {
1205
1206 for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
1207
1208 byte[] family = e.getKey();
1209 List<KeyValue> kvs = e.getValue();
1210 Map<byte[], Integer> kvCount = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
1211
1212 for (KeyValue kv: kvs) {
1213
1214
1215 if (kv.isLatestTimestamp() && kv.isDeleteType()) {
1216 byte[] qual = kv.getQualifier();
1217 if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
1218
1219 Integer count = kvCount.get(qual);
1220 if (count == null) {
1221 kvCount.put(qual, 1);
1222 } else {
1223 kvCount.put(qual, count + 1);
1224 }
1225 count = kvCount.get(qual);
1226
1227 Get get = new Get(kv.getRow());
1228 get.setMaxVersions(count);
1229 get.addColumn(family, qual);
1230
1231 List<KeyValue> result = get(get);
1232
1233 if (result.size() < count) {
1234
1235 kv.updateLatestStamp(byteNow);
1236 continue;
1237 }
1238 if (result.size() > count) {
1239 throw new RuntimeException("Unexpected size: " + result.size());
1240 }
1241 KeyValue getkv = result.get(count - 1);
1242 Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
1243 getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
1244 } else {
1245 kv.updateLatestStamp(byteNow);
1246 }
1247 }
1248 }
1249
1250 if (writeToWAL) {
1251
1252
1253
1254
1255
1256
1257
1258
1259 WALEdit walEdit = new WALEdit();
1260 addFamilyMapToWALEdit(familyMap, walEdit);
1261 this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
1262 walEdit, now);
1263 }
1264
1265
1266 long addedSize = applyFamilyMapToMemstore(familyMap);
1267 flush = isFlushSize(memstoreSize.addAndGet(addedSize));
1268 } finally {
1269 this.updatesLock.readLock().unlock();
1270 }
1271
1272 if (flush) {
1273
1274 requestFlush();
1275 }
1276 }
1277
1278
1279
1280
1281
1282 public void put(Put put) throws IOException {
1283 this.put(put, null, put.getWriteToWAL());
1284 }
1285
1286
1287
1288
1289
1290
1291 public void put(Put put, boolean writeToWAL) throws IOException {
1292 this.put(put, null, writeToWAL);
1293 }
1294
1295
1296
1297
1298
1299
1300 public void put(Put put, Integer lockid) throws IOException {
1301 this.put(put, lockid, put.getWriteToWAL());
1302 }
1303
1304
1305
1306
1307
1308
1309
1310 public void put(Put put, Integer lockid, boolean writeToWAL)
1311 throws IOException {
1312 checkReadOnly();
1313
1314
1315
1316
1317
1318 checkResources();
1319 startRegionOperation();
1320 try {
1321
1322
1323
1324
1325
1326 byte [] row = put.getRow();
1327
1328 Integer lid = getLock(lockid, row, true);
1329
1330 try {
1331
1332 put(put.getFamilyMap(), writeToWAL);
1333 } finally {
1334 if(lockid == null) releaseRowLock(lid);
1335 }
1336 } finally {
1337 closeRegionOperation();
1338 }
1339 }
1340
1341
1342
1343
1344
1345
1346 private static class BatchOperationInProgress<T> {
1347 T[] operations;
1348 OperationStatusCode[] retCodes;
1349 int nextIndexToProcess = 0;
1350
1351 public BatchOperationInProgress(T[] operations) {
1352 this.operations = operations;
1353 retCodes = new OperationStatusCode[operations.length];
1354 Arrays.fill(retCodes, OperationStatusCode.NOT_RUN);
1355 }
1356
1357 public boolean isDone() {
1358 return nextIndexToProcess == operations.length;
1359 }
1360 }
1361
1362
1363
1364
1365
1366 public OperationStatusCode[] put(Put[] puts) throws IOException {
1367 @SuppressWarnings("unchecked")
1368 Pair<Put, Integer> putsAndLocks[] = new Pair[puts.length];
1369
1370 for (int i = 0; i < puts.length; i++) {
1371 putsAndLocks[i] = new Pair<Put, Integer>(puts[i], null);
1372 }
1373 return put(putsAndLocks);
1374 }
1375
1376
1377
1378
1379
1380
1381 public OperationStatusCode[] put(Pair<Put, Integer>[] putsAndLocks) throws IOException {
1382 BatchOperationInProgress<Pair<Put, Integer>> batchOp =
1383 new BatchOperationInProgress<Pair<Put,Integer>>(putsAndLocks);
1384
1385 while (!batchOp.isDone()) {
1386 checkReadOnly();
1387 checkResources();
1388
1389 long newSize;
1390 startRegionOperation();
1391 try {
1392 long addedSize = doMiniBatchPut(batchOp);
1393 newSize = memstoreSize.addAndGet(addedSize);
1394 } finally {
1395 closeRegionOperation();
1396 }
1397 if (isFlushSize(newSize)) {
1398 requestFlush();
1399 }
1400 }
1401 return batchOp.retCodes;
1402 }
1403
/**
 * Processes as many puts from the batch as can be row-locked without
 * blocking (only the first lock of a mini-batch is acquired blocking),
 * writes them to the write-ahead log in one append and applies them to the
 * memstore.
 *
 * On exit, successful or not, the row locks acquired here are released and
 * {@code batchOp.nextIndexToProcess} is advanced past everything examined.
 * If the method exits without success, examined operations still marked
 * NOT_RUN are marked FAILURE.
 *
 * @param batchOp batch progress; its status codes and next index are updated
 * @return the number of bytes added to the memstore (0 if nothing was ready
 *   to write)
 * @throws IOException if locking, the log append or the memstore update fails
 */
private long doMiniBatchPut(BatchOperationInProgress<Pair<Put, Integer>> batchOp) throws IOException {
  long now = EnvironmentEdgeManager.currentTimeMillis();
  byte[] byteNow = Bytes.toBytes(now);
  // True once the updates read lock is held, so finally knows to release it.
  boolean locked = false;

  // Row locks acquired by this call (not caller-provided ones); released in finally.
  List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
  // This mini-batch covers operations [firstIndex, lastIndexExclusive).
  int firstIndex = batchOp.nextIndexToProcess;
  int lastIndexExclusive = firstIndex;
  boolean success = false;
  try {
    // STEP 1: validate families and acquire row locks for as many puts as
    // possible.
    int numReadyToWrite = 0;
    while (lastIndexExclusive < batchOp.operations.length) {
      Pair<Put, Integer> nextPair = batchOp.operations[lastIndexExclusive];
      Put put = nextPair.getFirst();
      Integer providedLockId = nextPair.getSecond();

      // A put naming an unknown family is marked BAD_FAMILY and skipped;
      // it does not fail the rest of the batch.
      try {
        checkFamilies(put.getFamilyMap().keySet());
      } catch (NoSuchColumnFamilyException nscf) {
        LOG.warn("No such column family in batch put", nscf);
        batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.BAD_FAMILY;
        lastIndexExclusive++;
        continue;
      }

      // Block only while nothing is ready yet, so each mini-batch makes
      // progress; afterwards take only locks that are immediately available.
      boolean shouldBlock = numReadyToWrite == 0;
      Integer acquiredLockId = getLock(providedLockId, put.getRow(), shouldBlock);
      if (acquiredLockId == null) {
        // Could not get the lock without blocking: end this mini-batch here.
        assert !shouldBlock : "Should never fail to get lock when blocking";
        break;
      }
      if (providedLockId == null) {
        acquiredLocks.add(acquiredLockId);
      }
      lastIndexExclusive++;
      numReadyToWrite++;
    }
    // Nothing lockable and valid in this pass (e.g. only BAD_FAMILY puts).
    if (numReadyToWrite <= 0) return 0L;

    // STEP 2: stamp the KeyValues of every put in the range with the
    // current time (see updateKVTimestamps for which ones are affected).
    for (int i = firstIndex; i < lastIndexExclusive; i++) {
      updateKVTimestamps(
          batchOp.operations[i].getFirst().getFamilyMap().values(),
          byteNow);
    }

    this.updatesLock.readLock().lock();
    locked = true;

    // STEP 3: collect the edits of all still-pending puts that want WAL
    // logging into a single WALEdit.
    WALEdit walEdit = new WALEdit();
    for (int i = firstIndex; i < lastIndexExclusive; i++) {
      // Skip puts that already have a final status (e.g. BAD_FAMILY).
      if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;

      Put p = batchOp.operations[i].getFirst();
      if (!p.getWriteToWAL()) continue;
      addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
    }

    // Append to the log. NOTE(review): this runs even when walEdit ended up
    // empty because no put asked for WAL logging.
    this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
        walEdit, now);

    // STEP 4: apply the pending puts to the memstore and mark them SUCCESS.
    long addedSize = 0;
    for (int i = firstIndex; i < lastIndexExclusive; i++) {
      if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;

      Put p = batchOp.operations[i].getFirst();
      addedSize += applyFamilyMapToMemstore(p.getFamilyMap());
      batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
    }
    success = true;
    return addedSize;
  } finally {
    if (locked)
      this.updatesLock.readLock().unlock();

    for (Integer toRelease : acquiredLocks) {
      releaseRowLock(toRelease);
    }
    if (!success) {
      // Anything examined but not completed is reported as a failure.
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        if (batchOp.retCodes[i] == OperationStatusCode.NOT_RUN) {
          batchOp.retCodes[i] = OperationStatusCode.FAILURE;
        }
      }
    }
    // The next mini-batch resumes after the last operation examined here.
    batchOp.nextIndexToProcess = lastIndexExclusive;
  }
}
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532 public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
1533 byte [] expectedValue, Writable w, Integer lockId, boolean writeToWAL)
1534 throws IOException{
1535 checkReadOnly();
1536
1537
1538 checkResources();
1539 boolean isPut = w instanceof Put;
1540 if (!isPut && !(w instanceof Delete))
1541 throw new DoNotRetryIOException("Action must be Put or Delete");
1542 Row r = (Row)w;
1543 if (Bytes.compareTo(row, r.getRow()) != 0) {
1544 throw new DoNotRetryIOException("Action's getRow must match the passed row");
1545 }
1546
1547 startRegionOperation();
1548 try {
1549 RowLock lock = isPut ? ((Put)w).getRowLock() : ((Delete)w).getRowLock();
1550 Get get = new Get(row, lock);
1551 checkFamily(family);
1552 get.addColumn(family, qualifier);
1553
1554
1555 Integer lid = getLock(lockId, get.getRow(), true);
1556 List<KeyValue> result = new ArrayList<KeyValue>();
1557 try {
1558 result = get(get);
1559
1560 boolean matches = false;
1561 if (result.size() == 0 &&
1562 (expectedValue == null || expectedValue.length == 0)) {
1563 matches = true;
1564 } else if (result.size() == 1) {
1565
1566 byte [] actualValue = result.get(0).getValue();
1567 matches = Bytes.equals(expectedValue, actualValue);
1568 }
1569
1570 if (matches) {
1571
1572 if (isPut) {
1573 put(((Put)w).getFamilyMap(), writeToWAL);
1574 } else {
1575 Delete d = (Delete)w;
1576 prepareDelete(d);
1577 delete(d.getFamilyMap(), writeToWAL);
1578 }
1579 return true;
1580 }
1581 return false;
1582 } finally {
1583 if(lockId == null) releaseRowLock(lid);
1584 }
1585 } finally {
1586 closeRegionOperation();
1587 }
1588 }
1589
1590
1591
1592
1593
1594
1595 private void updateKVTimestamps(
1596 final Iterable<List<KeyValue>> keyLists, final byte[] now) {
1597 for (List<KeyValue> keys: keyLists) {
1598 if (keys == null) continue;
1599 for (KeyValue key : keys) {
1600 key.updateLatestStamp(now);
1601 }
1602 }
1603 }
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614 private void checkResources() {
1615
1616
1617 if (this.getRegionInfo().isMetaRegion()) return;
1618
1619 boolean blocked = false;
1620 while (this.memstoreSize.get() > this.blockingMemStoreSize) {
1621 requestFlush();
1622 if (!blocked) {
1623 LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
1624 "' on region " + Bytes.toStringBinary(getRegionName()) +
1625 ": memstore size " +
1626 StringUtils.humanReadableInt(this.memstoreSize.get()) +
1627 " is >= than blocking " +
1628 StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
1629 }
1630 blocked = true;
1631 synchronized(this) {
1632 try {
1633 wait(threadWakeFrequency);
1634 } catch (InterruptedException e) {
1635
1636 }
1637 }
1638 }
1639 if (blocked) {
1640 LOG.info("Unblocking updates for region " + this + " '"
1641 + Thread.currentThread().getName() + "'");
1642 }
1643 }
1644
1645
1646
1647
1648 protected void checkReadOnly() throws IOException {
1649 if (this.writestate.isReadOnly()) {
1650 throw new IOException("region is read only");
1651 }
1652 }
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662 private void put(final byte [] family, final List<KeyValue> edits)
1663 throws IOException {
1664 Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
1665 familyMap.put(family, edits);
1666 this.put(familyMap, true);
1667 }
1668
1669
1670
1671
1672
1673
1674
1675
1676 private void put(final Map<byte [], List<KeyValue>> familyMap,
1677 boolean writeToWAL) throws IOException {
1678 long now = EnvironmentEdgeManager.currentTimeMillis();
1679 byte[] byteNow = Bytes.toBytes(now);
1680 boolean flush = false;
1681 this.updatesLock.readLock().lock();
1682 try {
1683 checkFamilies(familyMap.keySet());
1684 updateKVTimestamps(familyMap.values(), byteNow);
1685
1686
1687
1688
1689
1690 if (writeToWAL) {
1691 WALEdit walEdit = new WALEdit();
1692 addFamilyMapToWALEdit(familyMap, walEdit);
1693 this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
1694 walEdit, now);
1695 }
1696
1697 long addedSize = applyFamilyMapToMemstore(familyMap);
1698 flush = isFlushSize(memstoreSize.addAndGet(addedSize));
1699 } finally {
1700 this.updatesLock.readLock().unlock();
1701 }
1702 if (flush) {
1703
1704 requestFlush();
1705 }
1706 }
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717 private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
1718 ReadWriteConsistencyControl.WriteEntry w = null;
1719 long size = 0;
1720 try {
1721 w = rwcc.beginMemstoreInsert();
1722
1723 for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
1724 byte[] family = e.getKey();
1725 List<KeyValue> edits = e.getValue();
1726
1727 Store store = getStore(family);
1728 for (KeyValue kv: edits) {
1729 kv.setMemstoreTS(w.getWriteNumber());
1730 size += store.add(kv);
1731 }
1732 }
1733 } finally {
1734 rwcc.completeMemstoreInsert(w);
1735 }
1736 return size;
1737 }
1738
1739
1740
1741
1742
1743 private void checkFamilies(Collection<byte[]> families)
1744 throws NoSuchColumnFamilyException {
1745 for (byte[] family : families) {
1746 checkFamily(family);
1747 }
1748 }
1749
1750
1751
1752
1753
1754
1755
1756 private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
1757 WALEdit walEdit) {
1758 for (List<KeyValue> edits : familyMap.values()) {
1759 for (KeyValue kv : edits) {
1760 walEdit.add(kv);
1761 }
1762 }
1763 }
1764
1765 private void requestFlush() {
1766 if (this.flushRequester == null) {
1767 return;
1768 }
1769 synchronized (writestate) {
1770 if (this.writestate.isFlushRequested()) {
1771 return;
1772 }
1773 writestate.flushRequested = true;
1774 }
1775
1776 this.flushRequester.requestFlush(this);
1777 if (LOG.isDebugEnabled()) {
1778 LOG.debug("Flush requested on " + this);
1779 }
1780 }
1781
1782
1783
1784
1785
1786 private boolean isFlushSize(final long size) {
1787 return size > this.memstoreFlushSize;
1788 }
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826 protected long replayRecoveredEditsIfAny(final Path regiondir,
1827 final long minSeqId, final CancelableProgressable reporter)
1828 throws UnsupportedEncodingException, IOException {
1829 long seqid = minSeqId;
1830 NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
1831 if (files == null || files.isEmpty()) return seqid;
1832 for (Path edits: files) {
1833 if (edits == null || !this.fs.exists(edits)) {
1834 LOG.warn("Null or non-existent edits file: " + edits);
1835 continue;
1836 }
1837 if (isZeroLengthThenDelete(this.fs, edits)) continue;
1838 try {
1839 seqid = replayRecoveredEdits(edits, seqid, reporter);
1840 } catch (IOException e) {
1841 boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
1842 if (skipErrors) {
1843 Path p = HLog.moveAsideBadEditsFile(fs, edits);
1844 LOG.error("hbase.skip.errors=true so continuing. Renamed " + edits +
1845 " as " + p, e);
1846 } else {
1847 throw e;
1848 }
1849 }
1850 }
1851 if (seqid > minSeqId) {
1852
1853 internalFlushcache(null, seqid);
1854 }
1855
1856 for (Path file: files) {
1857 if (!this.fs.delete(file, false)) {
1858 LOG.error("Failed delete of " + file);
1859 } else {
1860 LOG.debug("Deleted recovered.edits file=" + file);
1861 }
1862 }
1863 return seqid;
1864 }
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
/**
 * Replays the entries of one recovered-edits file into this region's stores.
 *
 * <p>Entries whose sequence number is not greater than the current one are
 * skipped, as are KeyValues in the log's meta family, KeyValues whose entry
 * belongs to a different region, and KeyValues whose family has no store.
 * An EOF, or an IOException caused by a ParseException, moves the file aside
 * and ends replay of this file without failing.
 *
 * @param edits the recovered-edits file to read
 * @param minSeqId entries at or below this sequence id are skipped
 * @param reporter optional progress callback; replay is aborted with an
 *        IOException if it reports failure
 * @return the highest sequence id applied, or {@code minSeqId} if none was
 * @throws IOException on any other read failure or reporter failure
 */
private long replayRecoveredEdits(final Path edits,
    final long minSeqId, final CancelableProgressable reporter)
throws IOException {
  LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId);
  HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
  try {
    long currentEditSeqId = minSeqId;
    long firstSeqIdInLog = -1;
    long skippedEdits = 0;
    long editsCount = 0;
    long intervalEdits = 0;
    HLog.Entry entry;
    // Cached across KeyValues so consecutive edits to one family reuse the lookup.
    Store store = null;

    try {
      // Number of edits to process between checks of the reporting clock.
      int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
          2000);
      // Minimum time between progress reports; defaults to half of the
      // configured assignment timeout.
      int period = this.conf.getInt("hbase.hstore.report.period",
          this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout",
              180000) / 2);
      long lastReport = EnvironmentEdgeManager.currentTimeMillis();

      while ((entry = reader.next()) != null) {
        HLogKey key = entry.getKey();
        WALEdit val = entry.getEdit();

        if (reporter != null) {
          intervalEdits += val.size();
          if (intervalEdits >= interval) {
            // Enough edits seen; check whether a report is due.
            intervalEdits = 0;
            long cur = EnvironmentEdgeManager.currentTimeMillis();
            if (lastReport + period <= cur) {
              // A false return from the reporter aborts the replay.
              if(!reporter.progress()) {
                String msg = "Progressable reporter failed, stopping replay";
                LOG.warn(msg);
                throw new IOException(msg);
              }
              lastReport = cur;
            }
          }
        }

        if (firstSeqIdInLog == -1) {
          firstSeqIdInLog = key.getLogSeqNum();
        }
        // Skip entries at or below the sequence id already reached.
        if (key.getLogSeqNum() <= currentEditSeqId) {
          skippedEdits++;
          continue;
        }
        currentEditSeqId = key.getLogSeqNum();
        boolean flush = false;
        for (KeyValue kv: val.getKeyValues()) {
          // Skip log-meta KeyValues and entries written for another region.
          if (kv.matchingFamily(HLog.METAFAMILY) ||
              !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
            skippedEdits++;
            continue;
          }
          // Look up the store only when the family changes.
          if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
            store = this.stores.get(kv.getFamily());
          }
          if (store == null) {
            // This region has no store for the KeyValue's family.
            LOG.warn("No family for " + kv);
            skippedEdits++;
            continue;
          }
          // flush holds the result for the most recently restored KeyValue.
          flush = restoreEdit(store, kv);
          editsCount++;
        }
        if (flush) internalFlushcache(null, currentEditSeqId);
      }
    } catch (EOFException eof) {
      Path p = HLog.moveAsideBadEditsFile(fs, edits);
      LOG.warn("Encountered EOF. Most likely due to Master failure during " +
          "log spliting, so we have this data in another edit.  " +
          "Continuing, but renaming " + edits + " as " + p, eof);
    } catch (IOException ioe) {
      // A ParseException cause is treated as file corruption: move the file
      // aside and carry on. Anything else is rethrown.
      if (ioe.getCause() instanceof ParseException) {
        Path p = HLog.moveAsideBadEditsFile(fs, edits);
        LOG.warn("File corruption encountered!  " +
            "Continuing, but renaming " + edits + " as " + p, ioe);
      } else {
        throw ioe;
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
          ", firstSequenceidInLog=" + firstSeqIdInLog +
          ", maxSequenceidInLog=" + currentEditSeqId);
    }
    return currentEditSeqId;
  } finally {
    reader.close();
  }
}
1986
1987
1988
1989
1990
1991
1992
1993 protected boolean restoreEdit(final Store s, final KeyValue kv) {
1994 return isFlushSize(this.memstoreSize.addAndGet(s.add(kv)));
1995 }
1996
1997
1998
1999
2000
2001
2002
2003 private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
2004 throws IOException {
2005 FileStatus stat = fs.getFileStatus(p);
2006 if (stat.getLen() > 0) return false;
2007 LOG.warn("File " + p + " is zero-length, deleting.");
2008 fs.delete(p, false);
2009 return true;
2010 }
2011
2012 protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
2013 throws IOException {
2014 return new Store(tableDir, this, c, this.fs, this.conf);
2015 }
2016
2017
2018
2019
2020
2021
2022
2023
2024 public Store getStore(final byte [] column) {
2025 return this.stores.get(column);
2026 }
2027
2028
2029
2030
2031
2032
2033 private void checkRow(final byte [] row) throws IOException {
2034 if(!rowIsInRange(regionInfo, row)) {
2035 throw new WrongRegionException("Requested row out of range for " +
2036 "HRegion " + this + ", startKey='" +
2037 Bytes.toStringBinary(regionInfo.getStartKey()) + "', getEndKey()='" +
2038 Bytes.toStringBinary(regionInfo.getEndKey()) + "', row='" +
2039 Bytes.toStringBinary(row) + "'");
2040 }
2041 }
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066 public Integer obtainRowLock(final byte [] row) throws IOException {
2067 startRegionOperation();
2068 try {
2069 return internalObtainRowLock(row, true);
2070 } finally {
2071 closeRegionOperation();
2072 }
2073 }
2074
2075
2076
2077
2078
2079
2080
2081 public Integer tryObtainRowLock(final byte[] row) throws IOException {
2082 startRegionOperation();
2083 try {
2084 return internalObtainRowLock(row, false);
2085 } finally {
2086 closeRegionOperation();
2087 }
2088 }
2089
2090
2091
2092
2093
2094
2095
2096 private Integer internalObtainRowLock(final byte[] row, boolean waitForLock)
2097 throws IOException {
2098 checkRow(row);
2099 startRegionOperation();
2100 try {
2101 HashedBytes rowKey = new HashedBytes(row);
2102 CountDownLatch rowLatch = new CountDownLatch(1);
2103
2104
2105 while (true) {
2106 CountDownLatch existingLatch = lockedRows.putIfAbsent(rowKey, rowLatch);
2107 if (existingLatch == null) {
2108 break;
2109 } else {
2110
2111 if (!waitForLock) {
2112 return null;
2113 }
2114 try {
2115 if (!existingLatch.await(this.rowLockWaitDuration,
2116 TimeUnit.MILLISECONDS)) {
2117 return null;
2118 }
2119 } catch (InterruptedException ie) {
2120
2121 }
2122 }
2123 }
2124
2125
2126 while (true) {
2127 Integer lockId = lockIdGenerator.incrementAndGet();
2128 HashedBytes existingRowKey = lockIds.putIfAbsent(lockId, rowKey);
2129 if (existingRowKey == null) {
2130 return lockId;
2131 } else {
2132
2133 lockIdGenerator.set(rand.nextInt());
2134 }
2135 }
2136 } finally {
2137 closeRegionOperation();
2138 }
2139 }
2140
2141
2142
2143
2144
2145
2146 byte[] getRowFromLock(final Integer lockid) {
2147 HashedBytes rowKey = lockIds.get(lockid);
2148 return rowKey == null ? null : rowKey.getBytes();
2149 }
2150
2151
2152
2153
2154
2155 void releaseRowLock(final Integer lockId) {
2156 HashedBytes rowKey = lockIds.remove(lockId);
2157 if (rowKey == null) {
2158 LOG.warn("Release unknown lockId: " + lockId);
2159 return;
2160 }
2161 CountDownLatch rowLatch = lockedRows.remove(rowKey);
2162 if (rowLatch == null) {
2163 LOG.error("Releases row not locked, lockId: " + lockId + " row: "
2164 + rowKey);
2165 return;
2166 }
2167 rowLatch.countDown();
2168 }
2169
2170
2171
2172
2173
2174
2175 boolean isRowLocked(final Integer lockId) {
2176 return lockIds.containsKey(lockId);
2177 }
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188 private Integer getLock(Integer lockid, byte [] row, boolean waitForLock)
2189 throws IOException {
2190 Integer lid = null;
2191 if (lockid == null) {
2192 lid = internalObtainRowLock(row, waitForLock);
2193 } else {
2194 if (!isRowLocked(lockid)) {
2195 throw new IOException("Invalid row lock");
2196 }
2197 lid = lockid;
2198 }
2199 return lid;
2200 }
2201
2202 public void bulkLoadHFile(String hfilePath, byte[] familyName)
2203 throws IOException {
2204 startRegionOperation();
2205 try {
2206 Store store = getStore(familyName);
2207 if (store == null) {
2208 throw new DoNotRetryIOException(
2209 "No such column family " + Bytes.toStringBinary(familyName));
2210 }
2211 store.bulkLoadHFile(hfilePath);
2212 } finally {
2213 closeRegionOperation();
2214 }
2215
2216 }
2217
2218
2219 @Override
2220 public boolean equals(Object o) {
2221 if (!(o instanceof HRegion)) {
2222 return false;
2223 }
2224 return this.hashCode() == ((HRegion)o).hashCode();
2225 }
2226
2227 @Override
2228 public int hashCode() {
2229 return Bytes.hashCode(this.regionInfo.getRegionName());
2230 }
2231
2232 @Override
2233 public String toString() {
2234 return this.regionInfo.getRegionNameAsString();
2235 }
2236
2237
2238 public Path getTableDir() {
2239 return this.tableDir;
2240 }
2241
2242
2243
2244
2245
2246
/**
 * Scanner over this region. It merges one scanner per requested store
 * (plus any additional scanners supplied by the caller) through a
 * KeyValueHeap and applies the scan's filter and stop row while returning
 * results.
 */
class RegionScanner implements InternalScanner {
  // Heap over all underlying scanners; set to null by close().
  KeyValueHeap storeHeap = null;
  // Row at which the scan stops, or null when the scan has no stop row.
  private final byte [] stopRow;
  private Filter filter;
  // Scratch list holding the KeyValues gathered by the current next() call.
  private List<KeyValue> results = new ArrayList<KeyValue>();
  // Limit used by next(List) when the caller supplies none.
  private int batch;
  // -1 for a get-scan, 0 otherwise; used as the comparison bound in isStopRow.
  private int isScan;
  // Set by close(); next() refuses to run once this is true.
  private boolean filterClosed = false;
  // Read point captured at construction and re-applied on every next().
  private long readPt;

  /** @return the enclosing region's HRegionInfo */
  public HRegionInfo getRegionName() {
    return regionInfo;
  }

  /**
   * @param scan the scan to run
   * @param additionalScanners extra scanners to merge in, or null
   * @throws IOException if a store scanner cannot be created
   */
  RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {

    this.filter = scan.getFilter();
    this.batch = scan.getBatch();
    // An empty end row means "no stop row".
    if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
      this.stopRow = null;
    } else {
      this.stopRow = scan.getStopRow();
    }
    // With -1 the stop row itself is not treated as a stop (see isStopRow).
    this.isScan = scan.isGetScan() ? -1 : 0;

    this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);

    List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
    if (additionalScanners != null) {
      scanners.addAll(additionalScanners);
    }

    // One scanner per family named in the scan.
    for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
        scan.getFamilyMap().entrySet()) {
      Store store = stores.get(entry.getKey());
      scanners.add(store.getScanner(scan, entry.getValue()));
    }
    this.storeHeap = new KeyValueHeap(scanners, comparator);
  }

  /** Same as the two-argument constructor with no additional scanners. */
  RegionScanner(Scan scan) throws IOException {
    this(scan, null);
  }

  /**
   * Resets the filter, if there is one.
   */
  protected void resetFilters() {
    if (filter != null) {
      filter.reset();
    }
  }

  /**
   * Fetches the next batch of KeyValues, up to {@code limit}.
   *
   * @param outResults list the gathered KeyValues are appended to
   * @param limit maximum number of KeyValues to gather
   * @return false if the filter reports that all remaining rows are
   *         filtered; otherwise the result of the internal scan step
   * @throws IOException an UnknownScannerException if the scanner has been
   *         closed, or any failure from the underlying scan
   */
  @Override
  public synchronized boolean next(List<KeyValue> outResults, int limit)
      throws IOException {
    if (this.filterClosed) {
      throw new UnknownScannerException("Scanner was closed (timed out?) " +
          "after we renewed it. Could be caused by a very slow scanner " +
          "or a lengthy garbage collection");
    }
    startRegionOperation();
    try {

      // Use the read point captured when this scanner was created.
      ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);

      results.clear();
      boolean returnResult = nextInternal(limit);

      outResults.addAll(results);
      resetFilters();
      if (isFilterDone()) {
        return false;
      }
      return returnResult;
    } finally {
      closeRegionOperation();
    }
  }

  /** Same as {@link #next(List, int)} using the scan's batch as the limit. */
  @Override
  public synchronized boolean next(List<KeyValue> outResults)
      throws IOException {
    return next(outResults, batch);
  }

  /**
   * @return true if there is a filter and it reports all remaining rows filtered
   */
  synchronized boolean isFilterDone() {
    return this.filter != null && this.filter.filterAllRemaining();
  }

  /**
   * Gathers KeyValues for the next row that passes the filter into
   * {@code results}.
   *
   * @param limit maximum number of KeyValues to gather
   * @return true if {@code limit} was reached or the row after the gathered
   *         one is not a stop row; false once a stop row is reached
   */
  private boolean nextInternal(int limit) throws IOException {
    while (true) {
      byte [] currentRow = peekRow();
      if (isStopRow(currentRow)) {
        // At the stop row: give the filter its row-level say on what was gathered.
        if (filter != null && filter.hasFilterRow()) {
          filter.filterRow(results);
        }
        if (filter != null && filter.filterRow()) {
          results.clear();
        }

        return false;
      } else if (filterRowKey(currentRow)) {
        // Row key rejected by the filter: skip the whole row.
        nextRow(currentRow);
      } else {
        byte [] nextRow;
        // Pull KeyValues until the heap moves to another row or the limit is hit.
        do {
          this.storeHeap.next(results, limit - results.size());
          if (limit > 0 && results.size() == limit) {
            if (this.filter != null && filter.hasFilterRow()) throw new IncompatibleFilterException(
                "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
            return true;
          }
        } while (Bytes.equals(currentRow, nextRow = peekRow()));

        final boolean stopRow = isStopRow(nextRow);

        // The row is complete; let the filter edit or reject it as a whole.
        if (filter != null && filter.hasFilterRow()) {
          filter.filterRow(results);
        }

        if (results.isEmpty() || filterRow()) {
          // Nothing to return for this row: clear state and, unless the
          // next row is a stop row, try the following one.
          nextRow(currentRow);

          if (!stopRow) continue;
        }
        return !stopRow;
      }
    }
  }

  /** @return true if there is a filter and it rejects the current row */
  private boolean filterRow() {
    return filter != null
        && filter.filterRow();
  }
  /** @return true if there is a filter and it rejects the given row key */
  private boolean filterRowKey(byte[] row) {
    return filter != null
        && filter.filterRowKey(row, 0, row.length);
  }

  /**
   * Advances the heap past every remaining KeyValue of {@code currentRow},
   * then clears the gathered results and resets the filter.
   */
  protected void nextRow(byte [] currentRow) throws IOException {
    while (Bytes.equals(currentRow, peekRow())) {
      this.storeHeap.next(MOCKED_LIST);
    }
    results.clear();
    resetFilters();
  }

  /** @return the row of the KeyValue at the top of the heap, or null if the heap is empty */
  private byte[] peekRow() {
    KeyValue kv = this.storeHeap.peek();
    return kv == null ? null : kv.getRow();
  }

  /**
   * @return true if {@code currentRow} is null, or a stop row is set and
   *         comparing it with {@code currentRow} yields a value no greater
   *         than {@code isScan}
   */
  private boolean isStopRow(byte [] currentRow) {
    return currentRow == null ||
        (stopRow != null &&
        comparator.compareRows(stopRow, 0, stopRow.length,
            currentRow, 0, currentRow.length) <= isScan);
  }

  /** Closes the underlying heap, if still open, and marks this scanner closed. */
  @Override
  public synchronized void close() {
    if (storeHeap != null) {
      storeHeap.close();
      storeHeap = null;
    }
    this.filterClosed = true;
  }
}
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457 public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
2458 HRegionInfo regionInfo, FlushRequester flushListener) {
2459 try {
2460 @SuppressWarnings("unchecked")
2461 Class<? extends HRegion> regionClass =
2462 (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);
2463
2464 Constructor<? extends HRegion> c =
2465 regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
2466 Configuration.class, HRegionInfo.class, FlushRequester.class);
2467
2468 return c.newInstance(tableDir, log, fs, conf, regionInfo, flushListener);
2469 } catch (Throwable e) {
2470
2471 throw new IllegalStateException("Could not instantiate a region instance.", e);
2472 }
2473 }
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488 public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
2489 final Configuration conf)
2490 throws IOException {
2491 Path tableDir =
2492 HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName());
2493 Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName());
2494 FileSystem fs = FileSystem.get(conf);
2495 fs.mkdirs(regionDir);
2496 HRegion region = HRegion.newHRegion(tableDir,
2497 new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME),
2498 new Path(regionDir, HConstants.HREGION_OLDLOGDIR_NAME), conf),
2499 fs, conf, info, null);
2500 region.initialize();
2501 return region;
2502 }
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516 public static HRegion openHRegion(final HRegionInfo info, final HLog wal,
2517 final Configuration conf)
2518 throws IOException {
2519 return openHRegion(info, wal, conf, null, null);
2520 }
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536 public static HRegion openHRegion(final HRegionInfo info, final HLog wal,
2537 final Configuration conf, final FlushRequester flusher,
2538 final CancelableProgressable reporter)
2539 throws IOException {
2540 if (LOG.isDebugEnabled()) {
2541 LOG.debug("Opening region: " + info);
2542 }
2543 if (info == null) {
2544 throw new NullPointerException("Passed region info is null");
2545 }
2546 Path dir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
2547 info.getTableDesc().getName());
2548 HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
2549 flusher);
2550 return r.openHRegion(reporter);
2551 }
2552
2553
2554
2555
2556
2557
2558
2559
2560 protected HRegion openHRegion(final CancelableProgressable reporter)
2561 throws IOException {
2562 checkCompressionCodecs();
2563
2564 long seqid = initialize(reporter);
2565 if (this.log != null) {
2566 this.log.setSequenceNumber(seqid);
2567 }
2568 return this;
2569 }
2570
2571 private void checkCompressionCodecs() throws IOException {
2572 for (HColumnDescriptor fam: regionInfo.getTableDesc().getColumnFamilies()) {
2573 CompressionTest.testCompression(fam.getCompression());
2574 CompressionTest.testCompression(fam.getCompactionCompression());
2575 }
2576 }
2577
2578
2579
2580
2581
2582
2583
2584
2585
2586
2587
2588 public static void addRegionToMETA(HRegion meta, HRegion r)
2589 throws IOException {
2590 meta.checkResources();
2591
2592 byte[] row = r.getRegionName();
2593 Integer lid = meta.obtainRowLock(row);
2594 try {
2595 final List<KeyValue> edits = new ArrayList<KeyValue>(1);
2596 edits.add(new KeyValue(row, HConstants.CATALOG_FAMILY,
2597 HConstants.REGIONINFO_QUALIFIER,
2598 EnvironmentEdgeManager.currentTimeMillis(),
2599 Writables.getBytes(r.getRegionInfo())));
2600 meta.put(HConstants.CATALOG_FAMILY, edits);
2601 } finally {
2602 meta.releaseRowLock(lid);
2603 }
2604 }
2605
2606
2607
2608
2609
2610
2611
2612
2613
2614 public static void deleteRegion(FileSystem fs, Path rootdir, HRegionInfo info)
2615 throws IOException {
2616 deleteRegion(fs, HRegion.getRegionDir(rootdir, info));
2617 }
2618
2619 private static void deleteRegion(FileSystem fs, Path regiondir)
2620 throws IOException {
2621 if (LOG.isDebugEnabled()) {
2622 LOG.debug("DELETING region " + regiondir.toString());
2623 }
2624 if (!fs.delete(regiondir, true)) {
2625 LOG.warn("Failed delete of " + regiondir);
2626 }
2627 }
2628
2629
2630
2631
2632
2633
2634
2635
2636 public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
2637 return new Path(
2638 HTableDescriptor.getTableDir(rootdir, info.getTableDesc().getName()),
2639 info.getEncodedName());
2640 }
2641
2642
2643
2644
2645
2646
2647
2648
2649
2650 public static boolean rowIsInRange(HRegionInfo info, final byte [] row) {
2651 return ((info.getStartKey().length == 0) ||
2652 (Bytes.compareTo(info.getStartKey(), row) <= 0)) &&
2653 ((info.getEndKey().length == 0) ||
2654 (Bytes.compareTo(info.getEndKey(), row) > 0));
2655 }
2656
2657
2658
2659
2660
2661
2662
2663
2664
2665
2666 public static void makeColumnFamilyDirs(FileSystem fs, Path tabledir,
2667 final HRegionInfo hri, byte [] colFamily)
2668 throws IOException {
2669 Path dir = Store.getStoreHomedir(tabledir, hri.getEncodedName(), colFamily);
2670 if (!fs.mkdirs(dir)) {
2671 LOG.warn("Failed to create " + dir);
2672 }
2673 }
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683 public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
2684 throws IOException {
2685 HRegion a = srcA;
2686 HRegion b = srcB;
2687
2688
2689
2690 if (srcA.getStartKey() == null) {
2691 if (srcB.getStartKey() == null) {
2692 throw new IOException("Cannot merge two regions with null start key");
2693 }
2694
2695 } else if ((srcB.getStartKey() == null) ||
2696 (Bytes.compareTo(srcA.getStartKey(), srcB.getStartKey()) > 0)) {
2697 a = srcB;
2698 b = srcA;
2699 }
2700
2701 if (!(Bytes.compareTo(a.getEndKey(), b.getStartKey()) == 0)) {
2702 throw new IOException("Cannot merge non-adjacent regions");
2703 }
2704 return merge(a, b);
2705 }
2706
2707
2708
2709
2710
2711
2712
2713
2714
/**
 * Merges two regions of the same table into a new region.
 *
 * <p>Both regions are flushed and compacted, a new region directory is
 * created for the combined key range, both source regions are closed and
 * their store files renamed into the new region, the new region is
 * initialized and compacted, and finally the source region directories are
 * deleted.
 *
 * @param a one region
 * @param b the other region
 * @return the new merged region
 * @throws IOException if the regions belong to different tables, the new
 *         region directory already exists, two files of one family share a
 *         sequence id, or any filesystem step fails
 */
public static HRegion merge(HRegion a, HRegion b) throws IOException {
  if (!a.getRegionInfo().getTableDesc().getNameAsString().equals(
      b.getRegionInfo().getTableDesc().getNameAsString())) {
    throw new IOException("Regions do not belong to the same table");
  }

  FileSystem fs = a.getFilesystem();

  // Flush both regions before compacting them.
  a.flushcache();
  b.flushcache();

  // Compact each region (passing true to compactStores) and, at debug
  // level, list the files it is left with.
  a.compactStores(true);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Files for region: " + a);
    listPaths(fs, a.getRegionDir());
  }
  b.compactStores(true);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Files for region: " + b);
    listPaths(fs, b.getRegionDir());
  }

  Configuration conf = a.getConf();
  HTableDescriptor tabledesc = a.getTableDesc();
  HLog log = a.getLog();
  Path tableDir = a.getTableDir();

  // Start key of the merged region: empty if either start key is empty,
  // otherwise the smaller of the two.
  final byte[] startKey =
    (a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
         HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
     || b.comparator.matchingRows(b.getStartKey(), 0,
            b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
            HConstants.EMPTY_BYTE_ARRAY.length))
    ? HConstants.EMPTY_BYTE_ARRAY
    : (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
           b.getStartKey(), 0, b.getStartKey().length) <= 0
       ? a.getStartKey()
       : b.getStartKey());
  // End key of the merged region: empty if either end key is empty,
  // otherwise the larger of the two.
  final byte[] endKey =
    (a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
         HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
     || a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
            HConstants.EMPTY_BYTE_ARRAY, 0,
            HConstants.EMPTY_BYTE_ARRAY.length))
    ? HConstants.EMPTY_BYTE_ARRAY
    : (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
           b.getEndKey(), 0, b.getEndKey().length) <= 0
       ? b.getEndKey()
       : a.getEndKey());

  HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
  LOG.info("Creating new region " + newRegionInfo.toString());
  String encodedName = newRegionInfo.getEncodedName();
  Path newRegionDir = HRegion.getRegionDir(a.getTableDir(), encodedName);
  // Refuse to merge into a directory that already exists.
  if(fs.exists(newRegionDir)) {
    throw new IOException("Cannot merge; target file collision at " +
        newRegionDir);
  }
  fs.mkdirs(newRegionDir);

  LOG.info("starting merge of regions: " + a + " and " + b +
    " into new region " + newRegionInfo.toString() +
      " with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
      Bytes.toStringBinary(endKey) + ">");

  // Close both source regions and group the store files they return by family.
  Map<byte [], List<StoreFile>> byFamily =
    new TreeMap<byte [], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
  byFamily = filesByFamily(byFamily, a.close());
  byFamily = filesByFamily(byFamily, b.close());
  for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
    byte [] colFamily = es.getKey();
    makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
    List<StoreFile> srcFiles = es.getValue();
    // When a family has exactly two files, they must not carry the same
    // max sequence id.
    if (srcFiles.size() == 2) {
      long seqA = srcFiles.get(0).getMaxSequenceId();
      long seqB = srcFiles.get(1).getMaxSequenceId();
      if (seqA == seqB) {
        throw new IOException("Files have same sequenceid: " + seqA);
      }
    }
    // Move each source file to a unique name in the new region's family directory.
    for (StoreFile hsf: srcFiles) {
      StoreFile.rename(fs, hsf.getPath(),
        StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir,
          newRegionInfo.getEncodedName(), colFamily)));
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Files for new region");
    listPaths(fs, newRegionDir);
  }
  // The merged region is created with no flush requester.
  HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf, newRegionInfo, null);
  dstRegion.initialize();
  dstRegion.compactStores();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Files for new region");
    listPaths(fs, dstRegion.getRegionDir());
  }
  // The source region directories are no longer needed.
  deleteRegion(fs, a.getRegionDir());
  deleteRegion(fs, b.getRegionDir());

  LOG.info("merge completed. New region is " + dstRegion);

  return dstRegion;
}
2830
2831
2832
2833
2834
2835
2836
2837
2838 private static Map<byte [], List<StoreFile>> filesByFamily(
2839 Map<byte [], List<StoreFile>> byFamily, List<StoreFile> storeFiles) {
2840 for (StoreFile src: storeFiles) {
2841 byte [] family = src.getFamily();
2842 List<StoreFile> v = byFamily.get(family);
2843 if (v == null) {
2844 v = new ArrayList<StoreFile>();
2845 byFamily.put(family, v);
2846 }
2847 v.add(src);
2848 }
2849 return byFamily;
2850 }
2851
2852
2853
2854
2855
2856 boolean isMajorCompaction() throws IOException {
2857 for (Store store: this.stores.values()) {
2858 if (store.isMajorCompaction()) {
2859 return true;
2860 }
2861 }
2862 return false;
2863 }
2864
2865
2866
2867
2868
2869
2870
2871
2872 private static void listPaths(FileSystem fs, Path dir) throws IOException {
2873 if (LOG.isDebugEnabled()) {
2874 FileStatus[] stats = fs.listStatus(dir);
2875 if (stats == null || stats.length == 0) {
2876 return;
2877 }
2878 for (int i = 0; i < stats.length; i++) {
2879 String path = stats[i].getPath().toString();
2880 if (stats[i].isDir()) {
2881 LOG.debug("d " + path);
2882 listPaths(fs, stats[i].getPath());
2883 } else {
2884 LOG.debug("f " + path + " size=" + stats[i].getLen());
2885 }
2886 }
2887 }
2888 }
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900 public Result get(final Get get, final Integer lockid) throws IOException {
2901
2902 if (get.hasFamilies()) {
2903 for (byte [] family: get.familySet()) {
2904 checkFamily(family);
2905 }
2906 } else {
2907 for (byte[] family: regionInfo.getTableDesc().getFamiliesKeys()) {
2908 get.addFamily(family);
2909 }
2910 }
2911 List<KeyValue> result = get(get);
2912
2913 return new Result(result);
2914 }
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930 private List<KeyValue> getLastIncrement(final Get get) throws IOException {
2931 InternalScan iscan = new InternalScan(get);
2932
2933 List<KeyValue> results = new ArrayList<KeyValue>();
2934
2935
2936 iscan.checkOnlyMemStore();
2937 InternalScanner scanner = null;
2938 try {
2939 scanner = getScanner(iscan);
2940 scanner.next(results);
2941 } finally {
2942 if (scanner != null)
2943 scanner.close();
2944 }
2945
2946
2947 int expected = 0;
2948 Map<byte[], NavigableSet<byte[]>> familyMap = get.getFamilyMap();
2949 for (NavigableSet<byte[]> qfs : familyMap.values()) {
2950 expected += qfs.size();
2951 }
2952
2953
2954 if (results.size() == expected) {
2955 return results;
2956 }
2957
2958
2959 if (results != null && !results.isEmpty()) {
2960
2961 for (KeyValue kv : results) {
2962 byte [] family = kv.getFamily();
2963 NavigableSet<byte[]> qfs = familyMap.get(family);
2964 qfs.remove(kv.getQualifier());
2965 if (qfs.isEmpty()) familyMap.remove(family);
2966 expected--;
2967 }
2968
2969 Get newGet = new Get(get.getRow());
2970 for (Map.Entry<byte[], NavigableSet<byte[]>> f : familyMap.entrySet()) {
2971 byte [] family = f.getKey();
2972 for (byte [] qualifier : f.getValue()) {
2973 newGet.addColumn(family, qualifier);
2974 }
2975 }
2976 newGet.setTimeRange(get.getTimeRange().getMin(),
2977 get.getTimeRange().getMax());
2978 iscan = new InternalScan(newGet);
2979 }
2980
2981
2982 List<KeyValue> fileResults = new ArrayList<KeyValue>();
2983 iscan.checkOnlyStoreFiles();
2984 scanner = null;
2985 try {
2986 scanner = getScanner(iscan);
2987 scanner.next(fileResults);
2988 } finally {
2989 if (scanner != null)
2990 scanner.close();
2991 }
2992
2993
2994 results.addAll(fileResults);
2995 Collections.sort(results, KeyValue.COMPARATOR);
2996 return results;
2997 }
2998
2999
3000
3001
3002 private List<KeyValue> get(final Get get) throws IOException {
3003 Scan scan = new Scan(get);
3004
3005 List<KeyValue> results = new ArrayList<KeyValue>();
3006
3007 InternalScanner scanner = null;
3008 try {
3009 scanner = getScanner(scan);
3010 scanner.next(results);
3011 } finally {
3012 if (scanner != null)
3013 scanner.close();
3014 }
3015 return results;
3016 }
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029 public Result increment(Increment increment, Integer lockid,
3030 boolean writeToWAL)
3031 throws IOException {
3032
3033 byte [] row = increment.getRow();
3034 checkRow(row);
3035 TimeRange tr = increment.getTimeRange();
3036 boolean flush = false;
3037 WALEdit walEdits = null;
3038 List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
3039 List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
3040 long now = EnvironmentEdgeManager.currentTimeMillis();
3041 long size = 0;
3042
3043
3044 startRegionOperation();
3045 try {
3046 Integer lid = getLock(lockid, row, true);
3047 this.updatesLock.readLock().lock();
3048 try {
3049
3050 for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
3051 increment.getFamilyMap().entrySet()) {
3052
3053 Store store = stores.get(family.getKey());
3054
3055
3056 Get get = new Get(row);
3057 for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
3058 get.addColumn(family.getKey(), column.getKey());
3059 }
3060 get.setTimeRange(tr.getMin(), tr.getMax());
3061 List<KeyValue> results = getLastIncrement(get);
3062
3063
3064
3065 int idx = 0;
3066 for (Map.Entry<byte [], Long> column : family.getValue().entrySet()) {
3067 long amount = column.getValue();
3068 if (idx < results.size() &&
3069 results.get(idx).matchingQualifier(column.getKey())) {
3070 amount += Bytes.toLong(results.get(idx).getValue());
3071 idx++;
3072 }
3073
3074
3075 KeyValue newKV = new KeyValue(row, family.getKey(), column.getKey(),
3076 now, Bytes.toBytes(amount));
3077 kvs.add(newKV);
3078
3079
3080 if (writeToWAL) {
3081 if (walEdits == null) {
3082 walEdits = new WALEdit();
3083 }
3084 walEdits.add(newKV);
3085 }
3086 }
3087
3088
3089 size += store.upsert(kvs);
3090 allKVs.addAll(kvs);
3091 kvs.clear();
3092 }
3093
3094
3095 if (writeToWAL) {
3096 this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
3097 walEdits, now);
3098 }
3099
3100 size = this.memstoreSize.addAndGet(size);
3101 flush = isFlushSize(size);
3102 } finally {
3103 this.updatesLock.readLock().unlock();
3104 releaseRowLock(lid);
3105 }
3106 } finally {
3107 closeRegionOperation();
3108 }
3109
3110 if (flush) {
3111
3112 requestFlush();
3113 }
3114
3115 return new Result(allKVs);
3116 }
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128 public long incrementColumnValue(byte [] row, byte [] family,
3129 byte [] qualifier, long amount, boolean writeToWAL)
3130 throws IOException {
3131 checkRow(row);
3132 boolean flush = false;
3133
3134 long result = amount;
3135 startRegionOperation();
3136 try {
3137 Integer lid = obtainRowLock(row);
3138 this.updatesLock.readLock().lock();
3139 try {
3140 Store store = stores.get(family);
3141
3142
3143 Get get = new Get(row);
3144 get.addColumn(family, qualifier);
3145
3146 List<KeyValue> results = getLastIncrement(get);
3147
3148 if (!results.isEmpty()) {
3149 KeyValue kv = results.get(0);
3150 byte [] buffer = kv.getBuffer();
3151 int valueOffset = kv.getValueOffset();
3152 result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
3153 }
3154
3155
3156 KeyValue newKv = new KeyValue(row, family,
3157 qualifier, EnvironmentEdgeManager.currentTimeMillis(),
3158 Bytes.toBytes(result));
3159
3160
3161 if (writeToWAL) {
3162 long now = EnvironmentEdgeManager.currentTimeMillis();
3163 WALEdit walEdit = new WALEdit();
3164 walEdit.add(newKv);
3165 this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
3166 walEdit, now);
3167 }
3168
3169
3170
3171
3172 long size = store.updateColumnValue(row, family, qualifier, result);
3173
3174 size = this.memstoreSize.addAndGet(size);
3175 flush = isFlushSize(size);
3176 } finally {
3177 this.updatesLock.readLock().unlock();
3178 releaseRowLock(lid);
3179 }
3180 } finally {
3181 closeRegionOperation();
3182 }
3183
3184 if (flush) {
3185
3186 requestFlush();
3187 }
3188
3189 return result;
3190 }
3191
3192
3193
3194
3195
3196
3197 private void checkFamily(final byte [] family)
3198 throws NoSuchColumnFamilyException {
3199 if(!regionInfo.getTableDesc().hasFamily(family)) {
3200 throw new NoSuchColumnFamilyException("Column family " +
3201 Bytes.toStringBinary(family) + " does not exist in region " + this
3202 + " in table " + regionInfo.getTableDesc());
3203 }
3204 }
3205
3206 public static final long FIXED_OVERHEAD = ClassSize.align(
3207 ClassSize.OBJECT +
3208 (4 * Bytes.SIZEOF_LONG) +
3209 Bytes.SIZEOF_INT +
3210 (2 * Bytes.SIZEOF_BOOLEAN) +
3211 ClassSize.ARRAY +
3212 (23 * ClassSize.REFERENCE));
3213
3214 public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
3215 ClassSize.OBJECT +
3216 (2 * ClassSize.ATOMIC_BOOLEAN) +
3217 ClassSize.ATOMIC_LONG +
3218 ClassSize.ATOMIC_INTEGER +
3219 (2 * ClassSize.CONCURRENT_HASHMAP) +
3220 WriteState.HEAP_SIZE +
3221 ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY +
3222 (2 * ClassSize.REENTRANT_LOCK) +
3223 ClassSize.ARRAYLIST +
3224 ReadWriteConsistencyControl.FIXED_SIZE
3225 ;
3226
3227 @Override
3228 public long heapSize() {
3229 long heapSize = DEEP_OVERHEAD;
3230 for(Store store : this.stores.values()) {
3231 heapSize += store.heapSize();
3232 }
3233
3234 return heapSize;
3235 }
3236
3237
3238
3239
3240
3241 private static void printUsageAndExit(final String message) {
3242 if (message != null && message.length() > 0) System.out.println(message);
3243 System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]");
3244 System.out.println("Options:");
3245 System.out.println(" major_compact Pass this option to major compact " +
3246 "passed region.");
3247 System.out.println("Default outputs scan of passed region.");
3248 System.exit(1);
3249 }
3250
3251
3252
3253
3254
3255
3256
3257
3258
3259
3260
3261 private static void processTable(final FileSystem fs, final Path p,
3262 final HLog log, final Configuration c,
3263 final boolean majorCompact)
3264 throws IOException {
3265 HRegion region = null;
3266 String rootStr = Bytes.toString(HConstants.ROOT_TABLE_NAME);
3267 String metaStr = Bytes.toString(HConstants.META_TABLE_NAME);
3268
3269 if (p.getName().startsWith(rootStr)) {
3270 region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null);
3271 } else if (p.getName().startsWith(metaStr)) {
3272 region = HRegion.newHRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO,
3273 null);
3274 } else {
3275 throw new IOException("Not a known catalog table: " + p.toString());
3276 }
3277 try {
3278 region.initialize();
3279 if (majorCompact) {
3280 region.compactStores(true);
3281 } else {
3282
3283 Scan scan = new Scan();
3284
3285 InternalScanner scanner = region.getScanner(scan);
3286 try {
3287 List<KeyValue> kvs = new ArrayList<KeyValue>();
3288 boolean done = false;
3289 do {
3290 kvs.clear();
3291 done = scanner.next(kvs);
3292 if (kvs.size() > 0) LOG.info(kvs);
3293 } while (done);
3294 } finally {
3295 scanner.close();
3296 }
3297
3298 }
3299 } finally {
3300 region.close();
3301 }
3302 }
3303
3304 boolean shouldForceSplit() {
3305 return this.splitRequest;
3306 }
3307
3308 byte[] getSplitPoint() {
3309 return this.splitPoint;
3310 }
3311
3312 void forceSplit(byte[] sp) {
3313
3314
3315 this.splitRequest = true;
3316 if (sp != null) {
3317 this.splitPoint = sp;
3318 }
3319 }
3320
3321
3322
3323
3324 protected void prepareToSplit() {
3325
3326 }
3327
3328
3329
3330
3331 public int getCompactPriority() {
3332 int count = Integer.MAX_VALUE;
3333 for(Store store : stores.values()) {
3334 count = Math.min(count, store.getCompactPriority());
3335 }
3336 return count;
3337 }
3338
3339
3340
3341
3342
3343
3344 public boolean hasTooManyStoreFiles() {
3345 for(Store store : stores.values()) {
3346 if(store.hasTooManyStoreFiles()) {
3347 return true;
3348 }
3349 }
3350 return false;
3351 }
3352
3353
3354
3355
3356
3357
3358
3359
3360 private void startRegionOperation() throws NotServingRegionException {
3361 if (this.closing.get()) {
3362 throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
3363 " is closing");
3364 }
3365 lock.readLock().lock();
3366 if (this.closed.get()) {
3367 lock.readLock().unlock();
3368 throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
3369 " is closed");
3370 }
3371 }
3372
3373
3374
3375
3376
3377 private void closeRegionOperation(){
3378 lock.readLock().unlock();
3379 }
3380
3381
3382
3383
3384 private static final List<KeyValue> MOCKED_LIST = new AbstractList<KeyValue>() {
3385
3386 @Override
3387 public void add(int index, KeyValue element) {
3388
3389 }
3390
3391 @Override
3392 public boolean addAll(int index, Collection<? extends KeyValue> c) {
3393 return false;
3394 }
3395
3396 @Override
3397 public KeyValue get(int index) {
3398 throw new UnsupportedOperationException();
3399 }
3400
3401 @Override
3402 public int size() {
3403 return 0;
3404 }
3405 };
3406
3407
3408
3409
3410
3411
3412
3413
3414
3415
3416
3417
3418 public static void main(String[] args) throws IOException {
3419 if (args.length < 1) {
3420 printUsageAndExit(null);
3421 }
3422 boolean majorCompact = false;
3423 if (args.length > 1) {
3424 if (!args[1].toLowerCase().startsWith("major")) {
3425 printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
3426 }
3427 majorCompact = true;
3428 }
3429 final Path tableDir = new Path(args[0]);
3430 final Configuration c = HBaseConfiguration.create();
3431 final FileSystem fs = FileSystem.get(c);
3432 final Path logdir = new Path(c.get("hbase.tmp.dir"),
3433 "hlog" + tableDir.getName()
3434 + EnvironmentEdgeManager.currentTimeMillis());
3435 final Path oldLogDir = new Path(c.get("hbase.tmp.dir"),
3436 HConstants.HREGION_OLDLOGDIR_NAME);
3437 final HLog log = new HLog(fs, logdir, oldLogDir, c);
3438 try {
3439 processTable(fs, tableDir, log, c, majorCompact);
3440 } finally {
3441 log.close();
3442 BlockCache bc = StoreFile.getBlockCache(c);
3443 if (bc != null) bc.shutdown();
3444 }
3445 }
3446 }