1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver;
22
23 import java.lang.management.ManagementFactory;
24 import java.lang.management.RuntimeMXBean;
25 import java.rmi.UnexpectedException;
26 import java.util.Arrays;
27 import java.util.Collections;
28 import java.util.Iterator;
29 import java.util.List;
30 import java.util.NavigableSet;
31 import java.util.SortedSet;
32 import java.util.concurrent.atomic.AtomicLong;
33 import java.util.concurrent.locks.ReentrantReadWriteLock;
34
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.io.HeapSize;
43 import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.hbase.util.ClassSize;
46
47
48
49
50
51
52
53
54
55
56
57
58 public class MemStore implements HeapSize {
59 private static final Log LOG = LogFactory.getLog(MemStore.class);
60
61 static final String USEMSLAB_KEY =
62 "hbase.hregion.memstore.mslab.enabled";
63 private static final boolean USEMSLAB_DEFAULT = false;
64
65 static final String RESEEKMAX_KEY =
66 "hbase.hregion.memstore.reseek.maxkeys";
67 private static final int RESEEKMAX_DEFAULT = 32;
68
69 private Configuration conf;
70
71
72
73
74
75
76 volatile KeyValueSkipListSet kvset;
77
78
79 volatile KeyValueSkipListSet snapshot;
80
81 final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
82
83 final KeyValue.KVComparator comparator;
84
85
86 final KeyValue.KVComparator comparatorIgnoreType;
87
88
89 final KeyValue.KVComparator comparatorIgnoreTimestamp;
90
91
92 final AtomicLong size;
93
94 TimeRangeTracker timeRangeTracker;
95 TimeRangeTracker snapshotTimeRangeTracker;
96
97 MemStoreLAB allocator;
98
99
100
101
102 int reseekNumKeys;
103
104
105
106
/**
 * Default constructor. Delegates to the two-argument constructor with a
 * configuration from {@code HBaseConfiguration.create()} and
 * {@code KeyValue.COMPARATOR} as the ordering.
 */
public MemStore() {
  this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
}
110
111
112
113
114
115 public MemStore(final Configuration conf,
116 final KeyValue.KVComparator c) {
117 this.conf = conf;
118 this.comparator = c;
119 this.comparatorIgnoreTimestamp =
120 this.comparator.getComparatorIgnoringTimestamps();
121 this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
122 this.kvset = new KeyValueSkipListSet(c);
123 this.snapshot = new KeyValueSkipListSet(c);
124 timeRangeTracker = new TimeRangeTracker();
125 snapshotTimeRangeTracker = new TimeRangeTracker();
126 this.size = new AtomicLong(DEEP_OVERHEAD);
127 if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
128 this.allocator = new MemStoreLAB(conf);
129 } else {
130 this.allocator = null;
131 }
132 this.reseekNumKeys = conf.getInt(RESEEKMAX_KEY, RESEEKMAX_DEFAULT);
133 }
134
135 void dump() {
136 for (KeyValue kv: this.kvset) {
137 LOG.info(kv);
138 }
139 for (KeyValue kv: this.snapshot) {
140 LOG.info(kv);
141 }
142 }
143
144
145
146
147
148
149 void snapshot() {
150 this.lock.writeLock().lock();
151 try {
152
153
154 if (!this.snapshot.isEmpty()) {
155 LOG.warn("Snapshot called again without clearing previous. " +
156 "Doing nothing. Another ongoing flush or did we fail last attempt?");
157 } else {
158 if (!this.kvset.isEmpty()) {
159 this.snapshot = this.kvset;
160 this.kvset = new KeyValueSkipListSet(this.comparator);
161 this.snapshotTimeRangeTracker = this.timeRangeTracker;
162 this.timeRangeTracker = new TimeRangeTracker();
163
164 this.size.set(DEEP_OVERHEAD);
165
166 if (allocator != null) {
167 this.allocator = new MemStoreLAB(conf);
168 }
169 }
170 }
171 } finally {
172 this.lock.writeLock().unlock();
173 }
174 }
175
176
177
178
179
180
181
182
183
184 KeyValueSkipListSet getSnapshot() {
185 return this.snapshot;
186 }
187
188
189
190
191
192
193
194 void clearSnapshot(final SortedSet<KeyValue> ss)
195 throws UnexpectedException {
196 this.lock.writeLock().lock();
197 try {
198 if (this.snapshot != ss) {
199 throw new UnexpectedException("Current snapshot is " +
200 this.snapshot + ", was passed " + ss);
201 }
202
203
204 if (!ss.isEmpty()) {
205 this.snapshot = new KeyValueSkipListSet(this.comparator);
206 this.snapshotTimeRangeTracker = new TimeRangeTracker();
207 }
208 } finally {
209 this.lock.writeLock().unlock();
210 }
211 }
212
213
214
215
216
217
218 long add(final KeyValue kv) {
219 this.lock.readLock().lock();
220 try {
221 KeyValue toAdd = maybeCloneWithAllocator(kv);
222 return internalAdd(toAdd);
223 } finally {
224 this.lock.readLock().unlock();
225 }
226 }
227
228
229
230
231
232
233
234 private long internalAdd(final KeyValue toAdd) {
235 long s = heapSizeChange(toAdd, this.kvset.add(toAdd));
236 timeRangeTracker.includeTimestamp(toAdd);
237 this.size.addAndGet(s);
238 return s;
239 }
240
241 private KeyValue maybeCloneWithAllocator(KeyValue kv) {
242 if (allocator == null) {
243 return kv;
244 }
245
246 int len = kv.getLength();
247 Allocation alloc = allocator.allocateBytes(len);
248 if (alloc == null) {
249
250
251 return kv;
252 }
253 assert alloc != null && alloc.getData() != null;
254 System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
255 KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
256 newKv.setMemstoreTS(kv.getMemstoreTS());
257 return newKv;
258 }
259
260
261
262
263
264
265 long delete(final KeyValue delete) {
266 long s = 0;
267 this.lock.readLock().lock();
268 try {
269 KeyValue toAdd = maybeCloneWithAllocator(delete);
270 s += heapSizeChange(toAdd, this.kvset.add(toAdd));
271 timeRangeTracker.includeTimestamp(toAdd);
272 } finally {
273 this.lock.readLock().unlock();
274 }
275 this.size.addAndGet(s);
276 return s;
277 }
278
279
280
281
282
283
284 KeyValue getNextRow(final KeyValue kv) {
285 this.lock.readLock().lock();
286 try {
287 return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
288 } finally {
289 this.lock.readLock().unlock();
290 }
291 }
292
293
294
295
296
297
298 private KeyValue getLowest(final KeyValue a, final KeyValue b) {
299 if (a == null) {
300 return b;
301 }
302 if (b == null) {
303 return a;
304 }
305 return comparator.compareRows(a, b) <= 0? a: b;
306 }
307
308
309
310
311
312
313
314 private KeyValue getNextRow(final KeyValue key,
315 final NavigableSet<KeyValue> set) {
316 KeyValue result = null;
317 SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
318
319 for (KeyValue kv: tail) {
320 if (comparator.compareRows(kv, key) <= 0)
321 continue;
322
323
324 result = kv;
325 break;
326 }
327 return result;
328 }
329
330
331
332
333 void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
334 this.lock.readLock().lock();
335 try {
336 getRowKeyAtOrBefore(kvset, state);
337 getRowKeyAtOrBefore(snapshot, state);
338 } finally {
339 this.lock.readLock().unlock();
340 }
341 }
342
343
344
345
346
347 private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
348 final GetClosestRowBeforeTracker state) {
349 if (set.isEmpty()) {
350 return;
351 }
352 if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
353
354 getRowKeyBefore(set, state);
355 }
356 }
357
358
359
360
361
362
363
364
365
366
367
368 private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
369 final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
370 boolean foundCandidate = false;
371 SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
372 if (tail.isEmpty()) return foundCandidate;
373 for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
374 KeyValue kv = i.next();
375
376 if (state.isTooFar(kv, firstOnRow)) break;
377 if (state.isExpired(kv)) {
378 i.remove();
379 continue;
380 }
381
382 if (state.handle(kv)) {
383 foundCandidate = true;
384 break;
385 }
386 }
387 return foundCandidate;
388 }
389
390
391
392
393
394
395
396 private void getRowKeyBefore(NavigableSet<KeyValue> set,
397 final GetClosestRowBeforeTracker state) {
398 KeyValue firstOnRow = state.getTargetKey();
399 for (Member p = memberOfPreviousRow(set, state, firstOnRow);
400 p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
401
402 if (!state.isTargetTable(p.kv)) break;
403
404 if (!state.isBetterCandidate(p.kv)) break;
405
406 firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
407
408 if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
409 }
410 }
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427 public long updateColumnValue(byte[] row,
428 byte[] family,
429 byte[] qualifier,
430 long newValue,
431 long now) {
432 this.lock.readLock().lock();
433 try {
434 KeyValue firstKv = KeyValue.createFirstOnRow(
435 row, family, qualifier);
436
437 SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
438 if (!snSs.isEmpty()) {
439 KeyValue snKv = snSs.first();
440
441 if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
442 if (snKv.getTimestamp() == now) {
443
444 now += 1;
445 }
446 }
447 }
448
449
450
451
452
453
454
455 SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
456 Iterator<KeyValue> it = ss.iterator();
457 while ( it.hasNext() ) {
458 KeyValue kv = it.next();
459
460
461 if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
462 break;
463 }
464
465
466 if (kv.getType() == KeyValue.Type.Put.getCode() &&
467 kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
468 now = kv.getTimestamp();
469 }
470 }
471
472
473
474 return upsert(Arrays.asList(
475 new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
476 );
477 } finally {
478 this.lock.readLock().unlock();
479 }
480 }
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499 public long upsert(List<KeyValue> kvs) {
500 this.lock.readLock().lock();
501 try {
502 long size = 0;
503 for (KeyValue kv : kvs) {
504 kv.setMemstoreTS(0);
505 size += upsert(kv);
506 }
507 return size;
508 } finally {
509 this.lock.readLock().unlock();
510 }
511 }
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527 private long upsert(KeyValue kv) {
528
529
530
531
532
533
534 long addedSize = internalAdd(kv);
535
536
537
538 KeyValue firstKv = KeyValue.createFirstOnRow(
539 kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
540 kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
541 kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
542 SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
543 Iterator<KeyValue> it = ss.iterator();
544 while ( it.hasNext() ) {
545 KeyValue cur = it.next();
546
547 if (kv == cur) {
548
549 continue;
550 }
551
552 if (!kv.matchingRow(cur)) {
553 break;
554 }
555
556
557 if (kv.matchingQualifier(cur)) {
558
559
560 if (kv.getType() == KeyValue.Type.Put.getCode() &&
561 kv.getMemstoreTS() == 0) {
562
563 addedSize -= heapSizeChange(kv, true);
564 it.remove();
565 }
566 } else {
567
568 break;
569 }
570 }
571 return addedSize;
572 }
573
574
575
576
577
578 private static class Member {
579 final KeyValue kv;
580 final NavigableSet<KeyValue> set;
581 Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
582 this.kv = kv;
583 this.set = s;
584 }
585 }
586
587
588
589
590
591
592
593
594
595 private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
596 final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
597 NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
598 if (head.isEmpty()) return null;
599 for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
600 KeyValue found = i.next();
601 if (state.isExpired(found)) {
602 i.remove();
603 continue;
604 }
605 return new Member(head, found);
606 }
607 return null;
608 }
609
610
611
612
613 List<KeyValueScanner> getScanners() {
614 this.lock.readLock().lock();
615 try {
616 return Collections.<KeyValueScanner>singletonList(
617 new MemStoreScanner());
618 } finally {
619 this.lock.readLock().unlock();
620 }
621 }
622
623
624
625
626
627
628 public boolean shouldSeek(Scan scan) {
629 return timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
630 snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange());
631 }
632
633 public TimeRangeTracker getSnapshotTimeRangeTracker() {
634 return this.snapshotTimeRangeTracker;
635 }
636
637
638
639
640
641
642
643 protected class MemStoreScanner implements KeyValueScanner {
644
645 private KeyValue kvsetNextRow = null;
646 private KeyValue snapshotNextRow = null;
647
648
649 Iterator<KeyValue> kvsetIt;
650 Iterator<KeyValue> snapshotIt;
651
652
653 int numIterReseek;
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673 MemStoreScanner() {
674 super();
675
676
677 }
678
679 protected KeyValue getNext(Iterator<KeyValue> it) {
680 KeyValue ret = null;
681 long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
682
683
684 while (ret == null && it.hasNext()) {
685 KeyValue v = it.next();
686 if (v.getMemstoreTS() <= readPoint) {
687
688 ret = v;
689 }
690 numIterReseek--;
691 if (numIterReseek == 0) {
692 break;
693 }
694 }
695 return ret;
696 }
697
698 public synchronized boolean seek(KeyValue key) {
699 if (key == null) {
700 close();
701 return false;
702 }
703 numIterReseek = 0;
704
705
706
707 SortedSet<KeyValue> kvTail = kvset.tailSet(key);
708 SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
709
710 kvsetIt = kvTail.iterator();
711 snapshotIt = snapshotTail.iterator();
712
713 kvsetNextRow = getNext(kvsetIt);
714 snapshotNextRow = getNext(snapshotIt);
715
716
717
718
719
720
721
722
723
724 KeyValue lowest = getLowest();
725
726
727 return lowest != null;
728 }
729
730 @Override
731 public boolean reseek(KeyValue key) {
732 numIterReseek = reseekNumKeys;
733 while (kvsetNextRow != null &&
734 comparator.compare(kvsetNextRow, key) < 0) {
735 kvsetNextRow = getNext(kvsetIt);
736
737
738
739 if (kvsetNextRow == null && numIterReseek == 0) {
740 return seek(key);
741 }
742 }
743
744 while (snapshotNextRow != null &&
745 comparator.compare(snapshotNextRow, key) < 0) {
746 snapshotNextRow = getNext(snapshotIt);
747
748
749
750 if (snapshotNextRow == null && numIterReseek == 0) {
751 return seek(key);
752 }
753 }
754 return (kvsetNextRow != null || snapshotNextRow != null);
755 }
756
757 public synchronized KeyValue peek() {
758
759 return getLowest();
760 }
761
762
763 public synchronized KeyValue next() {
764 KeyValue theNext = getLowest();
765
766 if (theNext == null) {
767 return null;
768 }
769
770
771 if (theNext == kvsetNextRow) {
772 kvsetNextRow = getNext(kvsetIt);
773 } else {
774 snapshotNextRow = getNext(snapshotIt);
775 }
776
777
778
779
780 return theNext;
781 }
782
783 protected KeyValue getLowest() {
784 return getLower(kvsetNextRow,
785 snapshotNextRow);
786 }
787
788
789
790
791
792
793 protected KeyValue getLower(KeyValue first, KeyValue second) {
794 if (first == null && second == null) {
795 return null;
796 }
797 if (first != null && second != null) {
798 int compare = comparator.compare(first, second);
799 return (compare <= 0 ? first : second);
800 }
801 return (first != null ? first : second);
802 }
803
804 public synchronized void close() {
805 this.kvsetNextRow = null;
806 this.snapshotNextRow = null;
807
808 this.kvsetIt = null;
809 this.snapshotIt = null;
810 }
811
812
813
814
815
816 @Override
817 public long getSequenceID() {
818 return Long.MAX_VALUE;
819 }
820 }
821
/**
 * Shallow size estimate for a MemStore instance: the object header plus
 * twelve reference-sized slots, rounded by ClassSize.align.
 * NOTE(review): the slot count has to be kept in step with the instance
 * fields declared in this class -- confirm whenever a field is added.
 */
public final static long FIXED_OVERHEAD = ClassSize.align(
  ClassSize.OBJECT + (12 * ClassSize.REFERENCE));

/**
 * FIXED_OVERHEAD plus the estimated sizes of a reentrant lock, an atomic
 * long, a copy-on-write array set and list, and two concurrent skip-list
 * maps. This is the value the size counter starts from and the value
 * keySize() subtracts.
 * NOTE(review): the two skip-list maps presumably stand for the live set
 * and the snapshot; the copy-on-write terms have no visible counterpart in
 * this class -- confirm they are still wanted.
 */
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
  ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
  ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
  (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
829
830
831
832
833
834
835
836
837 long heapSizeChange(final KeyValue kv, final boolean notpresent) {
838 return notpresent ?
839 ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
840 0;
841 }
842
843
844
845
846
847 @Override
848 public long heapSize() {
849 return size.get();
850 }
851
852
853
854
855 public long keySize() {
856 return heapSize() - DEEP_OVERHEAD;
857 }
858
859
860
861
862
863
864
865
866 public static void main(String [] args) {
867 RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
868 LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
869 runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
870 LOG.info("vmInputArguments=" + runtime.getInputArguments());
871 MemStore memstore1 = new MemStore();
872
873 long size = 0;
874 final int count = 10000;
875 byte [] fam = Bytes.toBytes("col");
876 byte [] qf = Bytes.toBytes("umn");
877 byte [] empty = new byte[0];
878 for (int i = 0; i < count; i++) {
879
880 size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
881 }
882 LOG.info("memstore1 estimated size=" + size);
883 for (int i = 0; i < count; i++) {
884 size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
885 }
886 LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
887
888 MemStore memstore2 = new MemStore();
889 for (int i = 0; i < count; i++) {
890 size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
891 new byte[i]));
892 }
893 LOG.info("memstore2 estimated size=" + size);
894 final int seconds = 30;
895 LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
896 for (int i = 0; i < seconds; i++) {
897
898 }
899 LOG.info("Exiting.");
900 }
901 }