View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.regionserver;
22  
23  import java.lang.management.ManagementFactory;
24  import java.lang.management.RuntimeMXBean;
25  import java.rmi.UnexpectedException;
26  import java.util.Arrays;
27  import java.util.Collections;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.NavigableSet;
31  import java.util.SortedSet;
32  import java.util.concurrent.atomic.AtomicLong;
33  import java.util.concurrent.locks.ReentrantReadWriteLock;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.HBaseConfiguration;
39  import org.apache.hadoop.hbase.HConstants;
40  import org.apache.hadoop.hbase.KeyValue;
41  import org.apache.hadoop.hbase.client.Scan;
42  import org.apache.hadoop.hbase.io.HeapSize;
43  import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.ClassSize;
46  
47  /**
48   * The MemStore holds in-memory modifications to the Store.  Modifications
49   * are {@link KeyValue}s.  When asked to flush, current memstore is moved
50   * to snapshot and is cleared.  We continue to serve edits out of new memstore
51   * and backing snapshot until flusher reports in that the flush succeeded. At
52   * this point we let the snapshot go.
53   * TODO: Adjust size of the memstore when we remove items because they have
54   * been deleted.
55   * TODO: With new KVSLS, need to make sure we update HeapSize with difference
56   * in KV size.
57   */
58  public class MemStore implements HeapSize {
  private static final Log LOG = LogFactory.getLog(MemStore.class);

  // Configuration key read by the constructor: when true a MemStoreLAB
  // allocator is created and add()/delete() copy incoming KeyValues into it.
  static final String USEMSLAB_KEY =
    "hbase.hregion.memstore.mslab.enabled";
  private static final boolean USEMSLAB_DEFAULT = false;

  // Configuration key read by the constructor into reseekNumKeys (see below).
  static final String RESEEKMAX_KEY =
    "hbase.hregion.memstore.reseek.maxkeys";
  private static final int RESEEKMAX_DEFAULT = 32;

  // Kept so that snapshot() can build a fresh MemStoreLAB for the new kvset.
  private Configuration conf;

  // MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
  // better semantics.  The Map will overwrite if passed a key it already had
  // whereas the Set will not add new KV if key is same though value might be
  // different.  Value is not important -- just make sure always same
  // reference passed.
  // This is the live set that add(), delete() and upsert() write into.
  volatile KeyValueSkipListSet kvset;

  // Snapshot of memstore.  Made for flusher.  snapshot() moves kvset here and
  // clearSnapshot() replaces it with an empty set once the flush is done.
  volatile KeyValueSkipListSet snapshot;

  // Writers of individual KeyValues take the read lock; snapshot() and
  // clearSnapshot(), which swap the sets, take the write lock.
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  final KeyValue.KVComparator comparator;

  // Used comparing versions -- same r/c and ts but different type.
  final KeyValue.KVComparator comparatorIgnoreType;

  // Used comparing versions -- same r/c and type but different timestamp.
  final KeyValue.KVComparator comparatorIgnoreTimestamp;

  // Used to track own heapSize.  Starts at DEEP_OVERHEAD and is reset to
  // DEEP_OVERHEAD whenever snapshot() moves the kvset away.
  final AtomicLong size;

  // Timestamp ranges covered by kvset and by snapshot respectively; consulted
  // by shouldSeek().
  TimeRangeTracker timeRangeTracker;
  TimeRangeTracker snapshotTimeRangeTracker;
  
  // Null when the MSLAB feature is disabled (see USEMSLAB_KEY).
  MemStoreLAB allocator;

  // if a reseek has to scan over more than these number of keys, then
  // it morphs into a seek. A seek does a tree map-search while 
  // reseek does a linear scan.
  int reseekNumKeys;
103 
104   /**
105    * Default constructor. Used for tests.
106    */
107   public MemStore() {
108     this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
109   }
110 
111   /**
112    * Constructor.
113    * @param c Comparator
114    */
115   public MemStore(final Configuration conf,
116                   final KeyValue.KVComparator c) {
117     this.conf = conf;
118     this.comparator = c;
119     this.comparatorIgnoreTimestamp =
120       this.comparator.getComparatorIgnoringTimestamps();
121     this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
122     this.kvset = new KeyValueSkipListSet(c);
123     this.snapshot = new KeyValueSkipListSet(c);
124     timeRangeTracker = new TimeRangeTracker();
125     snapshotTimeRangeTracker = new TimeRangeTracker();
126     this.size = new AtomicLong(DEEP_OVERHEAD);
127     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
128       this.allocator = new MemStoreLAB(conf);
129     } else {
130       this.allocator = null;
131     }
132     this.reseekNumKeys = conf.getInt(RESEEKMAX_KEY, RESEEKMAX_DEFAULT);
133   }
134 
135   void dump() {
136     for (KeyValue kv: this.kvset) {
137       LOG.info(kv);
138     }
139     for (KeyValue kv: this.snapshot) {
140       LOG.info(kv);
141     }
142   }
143 
144   /**
145    * Creates a snapshot of the current memstore.
146    * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet<KeyValue>)}
147    * To get the snapshot made by this method, use {@link #getSnapshot()}
148    */
149   void snapshot() {
150     this.lock.writeLock().lock();
151     try {
152       // If snapshot currently has entries, then flusher failed or didn't call
153       // cleanup.  Log a warning.
154       if (!this.snapshot.isEmpty()) {
155         LOG.warn("Snapshot called again without clearing previous. " +
156           "Doing nothing. Another ongoing flush or did we fail last attempt?");
157       } else {
158         if (!this.kvset.isEmpty()) {
159           this.snapshot = this.kvset;
160           this.kvset = new KeyValueSkipListSet(this.comparator);
161           this.snapshotTimeRangeTracker = this.timeRangeTracker;
162           this.timeRangeTracker = new TimeRangeTracker();
163           // Reset heap to not include any keys
164           this.size.set(DEEP_OVERHEAD);
165           // Reset allocator so we get a fresh buffer for the new memstore
166           if (allocator != null) {
167             this.allocator = new MemStoreLAB(conf);
168           }
169         }
170       }
171     } finally {
172       this.lock.writeLock().unlock();
173     }
174   }
175 
176   /**
177    * Return the current snapshot.
178    * Called by flusher to get current snapshot made by a previous
179    * call to {@link #snapshot()}
180    * @return Return snapshot.
181    * @see {@link #snapshot()}
182    * @see {@link #clearSnapshot(SortedSet<KeyValue>)}
183    */
184   KeyValueSkipListSet getSnapshot() {
185     return this.snapshot;
186   }
187 
188   /**
189    * The passed snapshot was successfully persisted; it can be let go.
190    * @param ss The snapshot to clean out.
191    * @throws UnexpectedException
192    * @see {@link #snapshot()}
193    */
194   void clearSnapshot(final SortedSet<KeyValue> ss)
195   throws UnexpectedException {
196     this.lock.writeLock().lock();
197     try {
198       if (this.snapshot != ss) {
199         throw new UnexpectedException("Current snapshot is " +
200           this.snapshot + ", was passed " + ss);
201       }
202       // OK. Passed in snapshot is same as current snapshot.  If not-empty,
203       // create a new snapshot and let the old one go.
204       if (!ss.isEmpty()) {
205         this.snapshot = new KeyValueSkipListSet(this.comparator);
206         this.snapshotTimeRangeTracker = new TimeRangeTracker();
207       }
208     } finally {
209       this.lock.writeLock().unlock();
210     }
211   }
212 
213   /**
214    * Write an update
215    * @param kv
216    * @return approximate size of the passed key and value.
217    */
218   long add(final KeyValue kv) {
219     this.lock.readLock().lock();
220     try {
221       KeyValue toAdd = maybeCloneWithAllocator(kv);
222       return internalAdd(toAdd);
223     } finally {
224       this.lock.readLock().unlock();
225     }
226   }
227   
228   /**
229    * Internal version of add() that doesn't clone KVs with the
230    * allocator, and doesn't take the lock.
231    * 
232    * Callers should ensure they already have the read lock taken
233    */
234   private long internalAdd(final KeyValue toAdd) {
235     long s = heapSizeChange(toAdd, this.kvset.add(toAdd));
236     timeRangeTracker.includeTimestamp(toAdd);
237     this.size.addAndGet(s);
238     return s;
239   }
240 
241   private KeyValue maybeCloneWithAllocator(KeyValue kv) {
242     if (allocator == null) {
243       return kv;
244     }
245 
246     int len = kv.getLength();
247     Allocation alloc = allocator.allocateBytes(len);
248     if (alloc == null) {
249       // The allocation was too large, allocator decided
250       // not to do anything with it.
251       return kv;
252     }
253     assert alloc != null && alloc.getData() != null;
254     System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
255     KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
256     newKv.setMemstoreTS(kv.getMemstoreTS());
257     return newKv;
258   }
259 
260   /**
261    * Write a delete
262    * @param delete
263    * @return approximate size of the passed key and value.
264    */
265   long delete(final KeyValue delete) {
266     long s = 0;
267     this.lock.readLock().lock();
268     try {
269       KeyValue toAdd = maybeCloneWithAllocator(delete);
270       s += heapSizeChange(toAdd, this.kvset.add(toAdd));
271       timeRangeTracker.includeTimestamp(toAdd);
272     } finally {
273       this.lock.readLock().unlock();
274     }
275     this.size.addAndGet(s);
276     return s;
277   }
278 
279   /**
280    * @param kv Find the row that comes after this one.  If null, we return the
281    * first.
282    * @return Next row or null if none found.
283    */
284   KeyValue getNextRow(final KeyValue kv) {
285     this.lock.readLock().lock();
286     try {
287       return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
288     } finally {
289       this.lock.readLock().unlock();
290     }
291   }
292 
293   /*
294    * @param a
295    * @param b
296    * @return Return lowest of a or b or null if both a and b are null
297    */
298   private KeyValue getLowest(final KeyValue a, final KeyValue b) {
299     if (a == null) {
300       return b;
301     }
302     if (b == null) {
303       return a;
304     }
305     return comparator.compareRows(a, b) <= 0? a: b;
306   }
307 
308   /*
309    * @param key Find row that follows this one.  If null, return first.
310    * @param map Set to look in for a row beyond <code>row</code>.
311    * @return Next row or null if none found.  If one found, will be a new
312    * KeyValue -- can be destroyed by subsequent calls to this method.
313    */
314   private KeyValue getNextRow(final KeyValue key,
315       final NavigableSet<KeyValue> set) {
316     KeyValue result = null;
317     SortedSet<KeyValue> tail = key == null? set: set.tailSet(key);
318     // Iterate until we fall into the next row; i.e. move off current row
319     for (KeyValue kv: tail) {
320       if (comparator.compareRows(kv, key) <= 0)
321         continue;
322       // Note: Not suppressing deletes or expired cells.  Needs to be handled
323       // by higher up functions.
324       result = kv;
325       break;
326     }
327     return result;
328   }
329 
330   /**
331    * @param state column/delete tracking state
332    */
333   void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
334     this.lock.readLock().lock();
335     try {
336       getRowKeyAtOrBefore(kvset, state);
337       getRowKeyAtOrBefore(snapshot, state);
338     } finally {
339       this.lock.readLock().unlock();
340     }
341   }
342 
343   /*
344    * @param set
345    * @param state Accumulates deletes and candidates.
346    */
347   private void getRowKeyAtOrBefore(final NavigableSet<KeyValue> set,
348       final GetClosestRowBeforeTracker state) {
349     if (set.isEmpty()) {
350       return;
351     }
352     if (!walkForwardInSingleRow(set, state.getTargetKey(), state)) {
353       // Found nothing in row.  Try backing up.
354       getRowKeyBefore(set, state);
355     }
356   }
357 
358   /*
359    * Walk forward in a row from <code>firstOnRow</code>.  Presumption is that
360    * we have been passed the first possible key on a row.  As we walk forward
361    * we accumulate deletes until we hit a candidate on the row at which point
362    * we return.
363    * @param set
364    * @param firstOnRow First possible key on this row.
365    * @param state
366    * @return True if we found a candidate walking this row.
367    */
368   private boolean walkForwardInSingleRow(final SortedSet<KeyValue> set,
369       final KeyValue firstOnRow, final GetClosestRowBeforeTracker state) {
370     boolean foundCandidate = false;
371     SortedSet<KeyValue> tail = set.tailSet(firstOnRow);
372     if (tail.isEmpty()) return foundCandidate;
373     for (Iterator<KeyValue> i = tail.iterator(); i.hasNext();) {
374       KeyValue kv = i.next();
375       // Did we go beyond the target row? If so break.
376       if (state.isTooFar(kv, firstOnRow)) break;
377       if (state.isExpired(kv)) {
378         i.remove();
379         continue;
380       }
381       // If we added something, this row is a contender. break.
382       if (state.handle(kv)) {
383         foundCandidate = true;
384         break;
385       }
386     }
387     return foundCandidate;
388   }
389 
390   /*
391    * Walk backwards through the passed set a row at a time until we run out of
392    * set or until we get a candidate.
393    * @param set
394    * @param state
395    */
396   private void getRowKeyBefore(NavigableSet<KeyValue> set,
397       final GetClosestRowBeforeTracker state) {
398     KeyValue firstOnRow = state.getTargetKey();
399     for (Member p = memberOfPreviousRow(set, state, firstOnRow);
400         p != null; p = memberOfPreviousRow(p.set, state, firstOnRow)) {
401       // Make sure we don't fall out of our table.
402       if (!state.isTargetTable(p.kv)) break;
403       // Stop looking if we've exited the better candidate range.
404       if (!state.isBetterCandidate(p.kv)) break;
405       // Make into firstOnRow
406       firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
407       // If we find something, break;
408       if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
409     }
410   }
411 
412   /**
413    * Given the specs of a column, update it, first by inserting a new record,
414    * then removing the old one.  Since there is only 1 KeyValue involved, the memstoreTS
415    * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying
416    * store will ensure that the insert/delete each are atomic. A scanner/reader will either
417    * get the new value, or the old value and all readers will eventually only see the new
418    * value after the old was removed.
419    *
420    * @param row
421    * @param family
422    * @param qualifier
423    * @param newValue
424    * @param now
425    * @return  Timestamp
426    */
427   public long updateColumnValue(byte[] row,
428                                 byte[] family,
429                                 byte[] qualifier,
430                                 long newValue,
431                                 long now) {
432    this.lock.readLock().lock();
433     try {
434       KeyValue firstKv = KeyValue.createFirstOnRow(
435           row, family, qualifier);
436       // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
437       SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
438       if (!snSs.isEmpty()) {
439         KeyValue snKv = snSs.first();
440         // is there a matching KV in the snapshot?
441         if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
442           if (snKv.getTimestamp() == now) {
443             // poop,
444             now += 1;
445           }
446         }
447       }
448 
449       // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
450       // But the timestamp should also be max(now, mostRecentTsInMemstore)
451 
452       // so we cant add the new KV w/o knowing what's there already, but we also
453       // want to take this chance to delete some kvs. So two loops (sad)
454 
455       SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
456       Iterator<KeyValue> it = ss.iterator();
457       while ( it.hasNext() ) {
458         KeyValue kv = it.next();
459 
460         // if this isnt the row we are interested in, then bail:
461         if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
462           break; // rows dont match, bail.
463         }
464 
465         // if the qualifier matches and it's a put, just RM it out of the kvset.
466         if (kv.getType() == KeyValue.Type.Put.getCode() &&
467             kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
468           now = kv.getTimestamp();
469         }
470       }
471 
472       // create or update (upsert) a new KeyValue with
473       // 'now' and a 0 memstoreTS == immediately visible
474       return upsert(Arrays.asList(
475           new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
476       );
477     } finally {
478       this.lock.readLock().unlock();
479     }
480   }
481 
482   /**
483    * Update or insert the specified KeyValues.
484    * <p>
485    * For each KeyValue, insert into MemStore.  This will atomically upsert the
486    * value for that row/family/qualifier.  If a KeyValue did already exist,
487    * it will then be removed.
488    * <p>
489    * Currently the memstoreTS is kept at 0 so as each insert happens, it will
490    * be immediately visible.  May want to change this so it is atomic across
491    * all KeyValues.
492    * <p>
493    * This is called under row lock, so Get operations will still see updates
494    * atomically.  Scans will only see each KeyValue update as atomic.
495    *
496    * @param kvs
497    * @return change in memstore size
498    */
499   public long upsert(List<KeyValue> kvs) {
500    this.lock.readLock().lock();
501     try {
502       long size = 0;
503       for (KeyValue kv : kvs) {
504         kv.setMemstoreTS(0);
505         size += upsert(kv);
506       }
507       return size;
508     } finally {
509       this.lock.readLock().unlock();
510     }
511   }
512 
513   /**
514    * Inserts the specified KeyValue into MemStore and deletes any existing
515    * versions of the same row/family/qualifier as the specified KeyValue.
516    * <p>
517    * First, the specified KeyValue is inserted into the Memstore.
518    * <p>
519    * If there are any existing KeyValues in this MemStore with the same row,
520    * family, and qualifier, they are removed.
521    * <p>
522    * Callers must hold the read lock.
523    * 
524    * @param kv
525    * @return change in size of MemStore
526    */
527   private long upsert(KeyValue kv) {
528     // Add the KeyValue to the MemStore
529     // Use the internalAdd method here since we (a) already have a lock
530     // and (b) cannot safely use the MSLAB here without potentially
531     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
532     // test that triggers the pathological case if we don't avoid MSLAB
533     // here.
534     long addedSize = internalAdd(kv);
535 
536     // Get the KeyValues for the row/family/qualifier regardless of timestamp.
537     // For this case we want to clean up any other puts
538     KeyValue firstKv = KeyValue.createFirstOnRow(
539         kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
540         kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
541         kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
542     SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
543     Iterator<KeyValue> it = ss.iterator();
544     while ( it.hasNext() ) {
545       KeyValue cur = it.next();
546 
547       if (kv == cur) {
548         // ignore the one just put in
549         continue;
550       }
551       // if this isn't the row we are interested in, then bail
552       if (!kv.matchingRow(cur)) {
553         break;
554       }
555 
556       // if the qualifier matches and it's a put, remove it
557       if (kv.matchingQualifier(cur)) {
558 
559         // to be extra safe we only remove Puts that have a memstoreTS==0
560         if (kv.getType() == KeyValue.Type.Put.getCode() &&
561             kv.getMemstoreTS() == 0) {
562           // false means there was a change, so give us the size.
563           addedSize -= heapSizeChange(kv, true);
564           it.remove();
565         }
566       } else {
567         // past the column, done
568         break;
569       }
570     }
571     return addedSize;
572   }
573 
574   /*
575    * Immutable data structure to hold member found in set and the set it was
576    * found in.  Include set because it is carrying context.
577    */
578   private static class Member {
579     final KeyValue kv;
580     final NavigableSet<KeyValue> set;
581     Member(final NavigableSet<KeyValue> s, final KeyValue kv) {
582       this.kv = kv;
583       this.set = s;
584     }
585   }
586 
587   /*
588    * @param set Set to walk back in.  Pass a first in row or we'll return
589    * same row (loop).
590    * @param state Utility and context.
591    * @param firstOnRow First item on the row after the one we want to find a
592    * member in.
593    * @return Null or member of row previous to <code>firstOnRow</code>
594    */
595   private Member memberOfPreviousRow(NavigableSet<KeyValue> set,
596       final GetClosestRowBeforeTracker state, final KeyValue firstOnRow) {
597     NavigableSet<KeyValue> head = set.headSet(firstOnRow, false);
598     if (head.isEmpty()) return null;
599     for (Iterator<KeyValue> i = head.descendingIterator(); i.hasNext();) {
600       KeyValue found = i.next();
601       if (state.isExpired(found)) {
602         i.remove();
603         continue;
604       }
605       return new Member(head, found);
606     }
607     return null;
608   }
609 
610   /**
611    * @return scanner on memstore and snapshot in this order.
612    */
613   List<KeyValueScanner> getScanners() {
614     this.lock.readLock().lock();
615     try {
616       return Collections.<KeyValueScanner>singletonList(
617           new MemStoreScanner());
618     } finally {
619       this.lock.readLock().unlock();
620     }
621   }
622 
623   /**
624    * Check if this memstore may contain the required keys
625    * @param scan
626    * @return False if the key definitely does not exist in this Memstore
627    */
628   public boolean shouldSeek(Scan scan) {
629     return timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
630         snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange());
631   }
632 
633   public TimeRangeTracker getSnapshotTimeRangeTracker() {
634     return this.snapshotTimeRangeTracker;
635   }
636 
637   /*
638    * MemStoreScanner implements the KeyValueScanner.
639    * It lets the caller scan the contents of a memstore -- both current
640    * map and snapshot.
641    * This behaves as if it were a real scanner but does not maintain position.
642    */
643   protected class MemStoreScanner implements KeyValueScanner {
    // The next entry each underlying iterator will contribute, or null once
    // that iterator has nothing (visible) left.  Despite the names these hold
    // the next KeyValue, for either kvset or snapshot.
    private KeyValue kvsetNextRow = null;
    private KeyValue snapshotNextRow = null;

    // iterator based scanning.  Both are (re)created by seek() and nulled by
    // close().
    Iterator<KeyValue> kvsetIt;
    Iterator<KeyValue> snapshotIt;

    // Remaining step budget for the reseek operation in progress: set from
    // reseekNumKeys by reseek(), decremented by getNext(), zeroed by seek().
    int numIterReseek;
    
    /*
    Some notes...

      The iterators a memstore scanner holds are fixed when seek() creates
      them; they point into whatever kvset/snapshot sets existed at that
      moment.  When a snapshot is created, the kvset is swapped for a fresh,
      empty set and the old kvset becomes the snapshot.  During the
      snapshot->hfile transition, the memstore scanner is re-created by
      StoreScanner#updateReaders().  StoreScanner should potentially do
      something smarter by adjusting the existing memstore scanner.

      But there is a greater problem here, that being once a scanner has progressed
      during a snapshot scenario, we currently iterate past the kvset then 'finish' up.
      if a scan lasts a little while, there is a chance for new entries in kvset to
      become available but we will never see them.  This needs to be handled at the
      StoreScanner level with coordination with MemStoreScanner.

    */

    // Creates an unpositioned scanner; seek() must be called before the
    // iterators exist.
    MemStoreScanner() {
      super();

      //DebugPrint.println(" MS new@" + hashCode());
    }
678 
679     protected KeyValue getNext(Iterator<KeyValue> it) {
680       KeyValue ret = null;
681       long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
682       //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
683 
684       while (ret == null && it.hasNext()) {
685         KeyValue v = it.next();
686         if (v.getMemstoreTS() <= readPoint) {
687           // keep it.
688           ret = v;
689         }
690         numIterReseek--;
691         if (numIterReseek == 0) {
692           break;
693         }
694       }
695       return ret;
696     }
697 
698     public synchronized boolean seek(KeyValue key) {
699       if (key == null) {
700         close();
701         return false;
702       }
703       numIterReseek = 0;
704 
705       // kvset and snapshot will never be empty.
706       // if tailSet cant find anything, SS is empty (not null).
707       SortedSet<KeyValue> kvTail = kvset.tailSet(key);
708       SortedSet<KeyValue> snapshotTail = snapshot.tailSet(key);
709 
710       kvsetIt = kvTail.iterator();
711       snapshotIt = snapshotTail.iterator();
712 
713       kvsetNextRow = getNext(kvsetIt);
714       snapshotNextRow = getNext(snapshotIt);
715 
716 
717       //long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
718       //DebugPrint.println( " MS@" + hashCode() + " kvset seek: " + kvsetNextRow + " with size = " +
719       //    kvset.size() + " threadread = " + readPoint);
720       //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
721       //    snapshot.size() + " threadread = " + readPoint);
722 
723 
724       KeyValue lowest = getLowest();
725 
726       // has data := (lowest != null)
727       return lowest != null;
728     }
729 
730     @Override
731     public boolean reseek(KeyValue key) {
732       numIterReseek = reseekNumKeys;
733       while (kvsetNextRow != null &&
734           comparator.compare(kvsetNextRow, key) < 0) {
735         kvsetNextRow = getNext(kvsetIt);
736         // if we scanned enough entries but still not able to find the
737         // kv we are looking for, better cut our costs and do a tree
738         // scan using seek.
739         if (kvsetNextRow == null && numIterReseek == 0) {
740           return seek(key);
741         }
742       }
743 
744       while (snapshotNextRow != null &&
745           comparator.compare(snapshotNextRow, key) < 0) {
746         snapshotNextRow = getNext(snapshotIt);
747         // if we scanned enough entries but still not able to find the
748         // kv we are looking for, better cut our costs and do a tree
749         // scan using seek.
750         if (snapshotNextRow == null && numIterReseek == 0) {
751           return seek(key);
752         }
753       }
754       return (kvsetNextRow != null || snapshotNextRow != null);
755     }
756 
757     public synchronized KeyValue peek() {
758       //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
759       return getLowest();
760     }
761 
762 
763     public synchronized KeyValue next() {
764       KeyValue theNext = getLowest();
765 
766       if (theNext == null) {
767           return null;
768       }
769 
770       // Advance one of the iterators
771       if (theNext == kvsetNextRow) {
772         kvsetNextRow = getNext(kvsetIt);
773       } else {
774         snapshotNextRow = getNext(snapshotIt);
775       }
776 
777       //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint();
778       //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " +
779       //    getLowest() + " threadpoint=" + readpoint);
780       return theNext;
781     }
782 
783     protected KeyValue getLowest() {
784       return getLower(kvsetNextRow,
785           snapshotNextRow);
786     }
787 
788     /*
789      * Returns the lower of the two key values, or null if they are both null.
790      * This uses comparator.compare() to compare the KeyValue using the memstore
791      * comparator.
792      */
793     protected KeyValue getLower(KeyValue first, KeyValue second) {
794       if (first == null && second == null) {
795         return null;
796       }
797       if (first != null && second != null) {
798         int compare = comparator.compare(first, second);
799         return (compare <= 0 ? first : second);
800       }
801       return (first != null ? first : second);
802     }
803 
804     public synchronized void close() {
805       this.kvsetNextRow = null;
806       this.snapshotNextRow = null;
807 
808       this.kvsetIt = null;
809       this.snapshotIt = null;
810     }
811 
812     /**
813      * MemStoreScanner returns max value as sequence id because it will
814      * always have the latest data among all files.
815      */
816     @Override
817     public long getSequenceID() {
818       return Long.MAX_VALUE;
819     }
820   }
821 
  // Shallow size estimate of a MemStore instance: the object header plus
  // twelve reference-sized slots.
  // NOTE(review): the twelve presumably covers the instance fields declared
  // in this class -- confirm the count whenever a field is added or removed.
  public final static long FIXED_OVERHEAD = ClassSize.align(
      ClassSize.OBJECT + (12 * ClassSize.REFERENCE));

  // Size estimate of an empty MemStore including the objects it owns.  This
  // is the value the size counter starts from and is reset to by snapshot().
  // NOTE(review): which owned object each ClassSize term stands for is not
  // evident from this file -- verify against ClassSize and the field types.
  public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
      ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
      ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
      (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
829 
830   /*
831    * Calculate how the MemStore size has changed.  Includes overhead of the
832    * backing Map.
833    * @param kv
834    * @param notpresent True if the kv was NOT present in the set.
835    * @return Size
836    */
837   long heapSizeChange(final KeyValue kv, final boolean notpresent) {
838     return notpresent ?
839         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()):
840         0;
841   }
842 
843   /**
844    * Get the entire heap usage for this MemStore not including keys in the
845    * snapshot.
846    */
847   @Override
848   public long heapSize() {
849     return size.get();
850   }
851 
852   /**
853    * Get the heap usage of KVs in this MemStore.
854    */
855   public long keySize() {
856     return heapSize() - DEEP_OVERHEAD;
857   }
858 
859   /**
860    * Code to help figure if our approximation of object heap sizes is close
861    * enough.  See hbase-900.  Fills memstores then waits so user can heap
862    * dump and bring up resultant hprof in something like jprofiler which
863    * allows you get 'deep size' on objects.
864    * @param args main args
865    */
866   public static void main(String [] args) {
867     RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
868     LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
869       runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
870     LOG.info("vmInputArguments=" + runtime.getInputArguments());
871     MemStore memstore1 = new MemStore();
872     // TODO: x32 vs x64
873     long size = 0;
874     final int count = 10000;
875     byte [] fam = Bytes.toBytes("col");
876     byte [] qf = Bytes.toBytes("umn");
877     byte [] empty = new byte[0];
878     for (int i = 0; i < count; i++) {
879       // Give each its own ts
880       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
881     }
882     LOG.info("memstore1 estimated size=" + size);
883     for (int i = 0; i < count; i++) {
884       size += memstore1.add(new KeyValue(Bytes.toBytes(i), fam, qf, i, empty));
885     }
886     LOG.info("memstore1 estimated size (2nd loading of same data)=" + size);
887     // Make a variably sized memstore.
888     MemStore memstore2 = new MemStore();
889     for (int i = 0; i < count; i++) {
890       size += memstore2.add(new KeyValue(Bytes.toBytes(i), fam, qf, i,
891         new byte[i]));
892     }
893     LOG.info("memstore2 estimated size=" + size);
894     final int seconds = 30;
895     LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
896     for (int i = 0; i < seconds; i++) {
897       // Thread.sleep(1000);
898     }
899     LOG.info("Exiting.");
900   }
901 }