View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree.memory;
21  
22  
23  import static org.apache.directory.mavibot.btree.memory.InternalUtil.setDupsContainer;
24  
25  import java.io.IOException;
26  import java.lang.reflect.Array;
27  import java.util.LinkedList;
28  
29  import org.apache.directory.mavibot.btree.Tuple;
30  import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
31  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
32  
33  
34  /**
35   * A MVCC Leaf. It stores the keys and values. It does not have any children.
36   * 
37   * @param <K> The type for the Key
38   * @param <V> The type for the stored value
39   *
40   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
41   */
42  /* No qualifier */class Leaf<K, V> extends AbstractPage<K, V>
43  {
44      /** Values associated with keys */
45      protected ElementHolder<V, K, V>[] values;
46  
47  
/**
 * Creates a leaf attached to the given B-tree. The whole initialization is
 * delegated to the parent class constructor : the values array is not
 * allocated by this constructor.
 * 
 * @param btree The BTree this page belongs to.
 */
/* No qualifier */Leaf( BTree<K, V> btree )
{
    super( btree );
}
57  
58  
59      /**
60       * Internal constructor used to create Page instance used when a page is being copied or overflow
61       * 
62       * @param btree The BTree this page belongs to.
63       * @param revision The page revision
64       * @param nbElems The number of elements this page will contain
65       */
66      @SuppressWarnings("unchecked")
67      // Cannot create an array of generic objects
68      /* No qualifier */Leaf( BTree<K, V> btree, long revision, int nbElems )
69      {
70          super( btree, revision, nbElems );
71  
72          if ( btree.isAllowDuplicates() )
73          {
74              this.values = ( MultipleMemoryHolder<K, V>[] ) Array.newInstance( MultipleMemoryHolder.class,
75                  nbElems );
76          }
77          else
78          {
79              this.values = ( MemoryHolder<K, V>[] ) Array.newInstance( MemoryHolder.class, nbElems );
80          }
81      }
82  
83  
84      /**
85       * {@inheritDoc}
86       */
87      public InsertResult<K, V> insert( long revision, K key, V value ) throws IOException
88      {
89          // Find the key into this leaf
90          int pos = findPos( key );
91  
92          if ( pos < 0 )
93          {
94              // We already have the key in the page : replace the value
95              // into a copy of this page, unless the page has already be copied
96              int index = -( pos + 1 );
97  
98              // Replace the existing value in a copy of the current page
99              InsertResult<K, V> result = replaceElement( revision, key, value, index );
100 
101             return result;
102         }
103 
104         // The key is not present in the leaf. We have to add it in the page
105         if ( nbElems < btree.getPageSize() )
106         {
107             // The current page is not full, it can contain the added element.
108             // We insert it into a copied page and return the result
109             Page<K, V> modifiedPage = addElement( revision, key, value, pos );
110 
111             InsertResult<K, V> result = new ModifyResult<K, V>( modifiedPage, null );
112             result.addCopiedPage( this );
113 
114             return result;
115         }
116         else
117         {
118             // The Page is already full : we split it and return the overflow element,
119             // after having created two pages.
120             InsertResult<K, V> result = addAndSplit( revision, key, value, pos );
121             result.addCopiedPage( this );
122 
123             return result;
124         }
125     }
126 
127 
128     /**
129      * {@inheritDoc}
130      */
131     @SuppressWarnings("unchecked")
132     public DeleteResult<K, V> delete( long revision, K key, V value, Page<K, V> parent, int parentPos )
133         throws IOException
134     {
135         // Check that the leaf is not empty
136         if ( nbElems == 0 )
137         {
138             // Empty leaf
139             return NotPresentResult.NOT_PRESENT;
140         }
141 
142         // Find the key in the page
143         int pos = findPos( key );
144 
145         if ( pos >= 0 )
146         {
147             // Not found : return the not present result.
148             return NotPresentResult.NOT_PRESENT;
149         }
150 
151         // Get the removed element
152         Tuple<K, V> removedElement = null;
153 
154         // flag to detect if a key was completely removed
155         boolean keyRemoved = false;
156 
157         int index = -( pos + 1 );
158 
159         if ( btree.isAllowDuplicates() )
160         {
161             MultipleMemoryHolder<K, V> mvHolder = ( MultipleMemoryHolder<K, V> ) values[index];
162 
163             //FIXME should the internal sub-tree should be deleted from RM?
164 
165             V existingVal = mvHolder.getValue( btree );
166 
167             if ( value == null ) // this is a case to delete entire <K,sub-BTree> or <K,single-V>
168             {
169                 removedElement = new Tuple<K, V>( keys[index], existingVal ); // the entire value was removed
170                 keyRemoved = true;
171             }
172             else
173             {
174                 if ( mvHolder.isSingleValue() )
175                 {
176                     if ( btree.getValueSerializer().compare( value, existingVal ) == 0 )
177                     {
178                         removedElement = new Tuple<K, V>( keys[index], existingVal ); // the entire value was removed
179                         keyRemoved = true;
180                     }
181                     else
182                     // value is not found
183                     {
184                         return NotPresentResult.NOT_PRESENT;
185                     }
186                 }
187                 else
188                 {
189                     BTree<V, V> dups = ( BTree<V, V> ) existingVal;
190 
191                     if ( dups.hasKey( value ) )
192                     {
193                         dups.delete( value );
194 
195                         if ( dups.getNbElems() == 0 )
196                         {
197                             keyRemoved = true;
198                         }
199                         else
200                         {
201                             if ( dups.getNbElems() == 1 )
202                             {
203                                 //FIXME switch to single value mode
204                                 mvHolder.switchToSingleValMode();
205                             }
206                         }
207 
208                         removedElement = new Tuple<K, V>( keys[index], value ); // we deleted only one value (even if it is from a tree of size 1)
209                     }
210                     else
211                     // value is not found
212                     {
213                         return NotPresentResult.NOT_PRESENT;
214                     }
215                 }
216             }
217         }
218         else
219         {
220             V existing = values[index].getValue( btree );
221 
222             if ( ( ( existing == null ) && ( value == null ) ) || ( value == null ) )
223             {
224                 removedElement = new Tuple<K, V>( keys[index], existing );
225                 keyRemoved = true;
226             }
227             else if ( btree.getValueSerializer().compare( value, existing ) == 0 )
228             {
229                 removedElement = new Tuple<K, V>( keys[index], value );
230                 keyRemoved = true;
231             }
232             else
233             {
234                 return NotPresentResult.NOT_PRESENT;
235             }
236         }
237 
238         Leaf<K, V> newLeaf = null;
239 
240         if ( keyRemoved )
241         {
242             newLeaf = new Leaf<K, V>( btree, revision, nbElems - 1 );
243         }
244         else
245         {
246             newLeaf = new Leaf<K, V>( btree, revision, nbElems );
247         }
248 
249         // Create the result
250         DeleteResult<K, V> defaultResult = new RemoveResult<K, V>( newLeaf, removedElement );
251 
252         // If the parent is null, then this page is the root page.
253         if ( parent == null )
254         {
255             // Just remove the entry if it's present
256             copyAfterRemovingElement( keyRemoved, newLeaf, index );
257 
258             // The current page is added in the copied page list
259             defaultResult.addCopiedPage( this );
260 
261             return defaultResult;
262         }
263         else if ( keyRemoved )
264         {
265             // The current page is not the root. Check if the leaf has more than N/2
266             // elements
267             int halfSize = btree.getPageSize() / 2;
268 
269             if ( nbElems == halfSize )
270             {
271                 // We have to find a sibling now, and either borrow an entry from it
272                 // if it has more than N/2 elements, or to merge the two pages.
273                 // Check in both next and previous page, if they have the same parent
274                 // and select the biggest page with the same parent to borrow an element.
275                 int siblingPos = selectSibling( ( Node<K, V> ) parent, parentPos );
276                 Leaf<K, V> sibling = ( Leaf<K, V> ) ( ( ( Node<K, V> ) parent ).children[siblingPos].getValue( btree ) );
277 
278                 if ( sibling.getNbElems() == halfSize )
279                 {
280                     // We will merge the current page with its sibling
281                     DeleteResult<K, V> result = mergeWithSibling( removedElement, revision, sibling,
282                         ( siblingPos < parentPos ), index );
283 
284                     return result;
285                 }
286                 else
287                 {
288                     // We can borrow the element from the left sibling
289                     if ( siblingPos < parentPos )
290                     {
291                         DeleteResult<K, V> result = borrowFromLeft( removedElement, revision, sibling, index );
292 
293                         return result;
294                     }
295                     else
296                     {
297                         // Borrow from the right sibling
298                         DeleteResult<K, V> result = borrowFromRight( removedElement, revision, sibling, index );
299 
300                         return result;
301                     }
302                 }
303             }
304             else
305             {
306                 // The page has more than N/2 elements.
307                 // We simply remove the element from the page, and if it was the leftmost,
308                 // we return the new pivot (it will replace any instance of the removed
309                 // key in its parents)
310                 copyAfterRemovingElement( keyRemoved, newLeaf, index );
311 
312                 // The current page is added in the copied page list
313                 defaultResult.addCopiedPage( this );
314 
315                 return defaultResult;
316             }
317         }
318         else
319         {
320             // Last, not least : we can copy the full page
321             // Copy the keys and the values
322             System.arraycopy( keys, 0, newLeaf.keys, 0, nbElems );
323             System.arraycopy( values, 0, newLeaf.values, 0, nbElems );
324 
325             // The current page is added in the copied page list
326             defaultResult.addCopiedPage( this );
327 
328             return defaultResult;
329         }
330     }
331 
332 
333     /**
334      * Merges the sibling with the current leaf, after having removed the element in the page.
335      * 
336      * @param revision The new revision
337      * @param sibling The sibling we will merge with
338      * @param isLeft Tells if the sibling is on the left or on the right
339      * @param pos The position of the removed element
340      * @return The new created leaf containing the sibling and the old page.
341      * @throws IOException If we have an error while trying to access the page
342      */
343     private DeleteResult<K, V> mergeWithSibling( Tuple<K, V> removedElement, long revision, Leaf<K, V> sibling,
344         boolean isLeft, int pos )
345         throws EndOfFileExceededException, IOException
346     {
347         // Create the new page. It will contain N - 1 elements (the maximum number)
348         // as we merge two pages that contain N/2 elements minus the one we remove
349         Leaf<K, V> newLeaf = new Leaf<K, V>( btree, revision, btree.getPageSize() - 1 );
350 
351         if ( isLeft )
352         {
353             // The sibling is on the left
354             // Copy all the elements from the sibling first
355             System.arraycopy( sibling.keys, 0, newLeaf.keys, 0, sibling.nbElems );
356             System.arraycopy( sibling.values, 0, newLeaf.values, 0, sibling.nbElems );
357 
358             // Copy all the elements from the page up to the deletion position
359             System.arraycopy( keys, 0, newLeaf.keys, sibling.nbElems, pos );
360             System.arraycopy( values, 0, newLeaf.values, sibling.nbElems, pos );
361 
362             // And copy the remaining elements after the deletion point
363             System.arraycopy( keys, pos + 1, newLeaf.keys, sibling.nbElems + pos, nbElems - pos - 1 );
364             System.arraycopy( values, pos + 1, newLeaf.values, sibling.nbElems + pos, nbElems - pos - 1 );
365         }
366         else
367         {
368             // The sibling is on the right
369             // Copy all the elements from the page up to the deletion position
370             System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
371             System.arraycopy( values, 0, newLeaf.values, 0, pos );
372 
373             // Then copy the remaining elements after the deletion point
374             System.arraycopy( keys, pos + 1, newLeaf.keys, pos, nbElems - pos - 1 );
375             System.arraycopy( values, pos + 1, newLeaf.values, pos, nbElems - pos - 1 );
376 
377             // And copy all the elements from the sibling
378             System.arraycopy( sibling.keys, 0, newLeaf.keys, nbElems - 1, sibling.nbElems );
379             System.arraycopy( sibling.values, 0, newLeaf.values, nbElems - 1, sibling.nbElems );
380         }
381 
382         // And create the result
383         DeleteResult<K, V> result = new MergedWithSiblingResult<K, V>( newLeaf,
384             removedElement );
385 
386         result.addCopiedPage( this );
387         result.addCopiedPage( sibling );
388 
389         return result;
390     }
391 
392 
393     /**
394      * Borrows an element from the left sibling, creating a new sibling with one
395      * less element and creating a new page where the element to remove has been
396      * deleted and the borrowed element added on the left.
397      * 
398      * @param revision The new revision for all the pages
399      * @param sibling The left sibling
400      * @param pos The position of the element to remove
401      * @return The resulting pages
402      * @throws IOException If we have an error while trying to access the page 
403      */
404     private DeleteResult<K, V> borrowFromLeft( Tuple<K, V> removedElement, long revision, Leaf<K, V> sibling, int pos )
405         throws IOException
406     {
407         // The sibling is on the left, borrow the rightmost element
408         K siblingKey = sibling.keys[sibling.getNbElems() - 1];
409         ElementHolder<V, K, V> siblingValue = sibling.values[sibling.getNbElems() - 1];
410 
411         // Create the new sibling, with one less element at the end
412         Leaf<K, V> newSibling = ( Leaf<K, V> ) sibling.copy( revision, sibling.getNbElems() - 1 );
413 
414         // Create the new page and add the new element at the beginning
415         // First copy the current page, with the same size
416         Leaf<K, V> newLeaf = new Leaf<K, V>( btree, revision, nbElems );
417 
418         // Insert the borrowed element
419         newLeaf.keys[0] = siblingKey;
420         newLeaf.values[0] = siblingValue;
421 
422         // Copy the keys and the values up to the insertion position,
423         System.arraycopy( keys, 0, newLeaf.keys, 1, pos );
424         System.arraycopy( values, 0, newLeaf.values, 1, pos );
425 
426         // And copy the remaining elements
427         System.arraycopy( keys, pos + 1, newLeaf.keys, pos + 1, keys.length - pos - 1 );
428         System.arraycopy( values, pos + 1, newLeaf.values, pos + 1, values.length - pos - 1 );
429 
430         DeleteResult<K, V> result = new BorrowedFromLeftResult<K, V>( newLeaf, newSibling, removedElement );
431 
432         // Add the copied pages to the list
433         result.addCopiedPage( this );
434         result.addCopiedPage( sibling );
435 
436         return result;
437     }
438 
439 
440     /**
441      * Borrows an element from the right sibling, creating a new sibling with one
442      * less element and creating a new page where the element to remove has been
443      * deleted and the borrowed element added on the right.
444      * 
445      * @param revision The new revision for all the pages
446      * @param sibling The right sibling
447      * @param pos The position of the element to remove
448      * @return The resulting pages
449      * @throws IOException If we have an error while trying to access the page 
450      */
451     private DeleteResult<K, V> borrowFromRight( Tuple<K, V> removedElement, long revision, Leaf<K, V> sibling, int pos )
452         throws IOException
453     {
454         // The sibling is on the left, borrow the rightmost element
455         K siblingKey = sibling.keys[0];
456         ElementHolder<V, K, V> siblingHolder = sibling.values[0];
457 
458         // Create the new sibling
459         Leaf<K, V> newSibling = new Leaf<K, V>( btree, revision, sibling.getNbElems() - 1 );
460 
461         // Copy the keys and the values from 1 to N in the new sibling
462         System.arraycopy( sibling.keys, 1, newSibling.keys, 0, sibling.nbElems - 1 );
463         System.arraycopy( sibling.values, 1, newSibling.values, 0, sibling.nbElems - 1 );
464 
465         // Create the new page and add the new element at the end
466         // First copy the current page, with the same size
467         Leaf<K, V> newLeaf = new Leaf<K, V>( btree, revision, nbElems );
468 
469         // Insert the borrowed element at the end
470         newLeaf.keys[nbElems - 1] = siblingKey;
471         newLeaf.values[nbElems - 1] = siblingHolder;
472 
473         // Copy the keys and the values up to the deletion position,
474         System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
475         System.arraycopy( values, 0, newLeaf.values, 0, pos );
476 
477         // And copy the remaining elements
478         System.arraycopy( keys, pos + 1, newLeaf.keys, pos, keys.length - pos - 1 );
479         System.arraycopy( values, pos + 1, newLeaf.values, pos, values.length - pos - 1 );
480 
481         DeleteResult<K, V> result = new BorrowedFromRightResult<K, V>( newLeaf, newSibling, removedElement );
482 
483         // Add the copied pages to the list
484         result.addCopiedPage( this );
485         result.addCopiedPage( sibling );
486 
487         return result;
488     }
489 
490 
491     /**
492      * Copies the elements of the current page to a new page
493      *
494      * @param keyRemoved a flag stating if the key was removed
495      * @param newLeaf The new page into which the remaining keys and values will be copied
496      * @param pos The position into the page of the element to remove
497      * @throws IOException If we have an error while trying to access the page
498      */
499     private void copyAfterRemovingElement( boolean keyRemoved, Leaf<K, V> newLeaf, int pos ) throws IOException
500     {
501         if ( keyRemoved )
502         {
503             // Deal with the special case of a page with only one element by skipping
504             // the copy, as we won't have any remaining  element in the page
505             if ( nbElems == 1 )
506             {
507                 return;
508             }
509 
510             // Copy the keys and the values up to the insertion position
511             System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
512             System.arraycopy( values, 0, newLeaf.values, 0, pos );
513 
514             // And copy the elements after the position
515             System.arraycopy( keys, pos + 1, newLeaf.keys, pos, keys.length - pos - 1 );
516             System.arraycopy( values, pos + 1, newLeaf.values, pos, values.length - pos - 1 );
517         }
518         else
519         // one of the many values of the same key was removed, no change in the number of keys
520         {
521             System.arraycopy( keys, 0, newLeaf.keys, 0, nbElems );
522             System.arraycopy( values, 0, newLeaf.values, 0, nbElems );
523         }
524     }
525 
526 
527     /**
528      * {@inheritDoc}
529      */
530     public V get( K key ) throws KeyNotFoundException, IOException
531     {
532         int pos = findPos( key );
533 
534         if ( pos < 0 )
535         {
536             if ( btree.isAllowDuplicates() )
537             {
538                 MultipleMemoryHolder<K, V> mvHolder = ( MultipleMemoryHolder<K, V> ) values[-( pos + 1 )];
539                 if ( mvHolder.isSingleValue() )
540                 {
541                     return mvHolder.getValue( btree );
542                 }
543                 else
544                 {
545                     // always return the first value for get(key) when duplicates are allowed
546                     BTree<V, V> dupTree = ( BTree<V, V> ) mvHolder.getValue( btree );
547                     return dupTree.rootPage.getLeftMostKey();
548                 }
549             }
550 
551             V v = values[-( pos + 1 )].getValue( btree );
552 
553             return v;
554         }
555         else
556         {
557             throw KEY_NOT_FOUND_EXCEPTION;
558         }
559     }
560 
561 
562     /**
563      * {@inheritDoc}
564      */
565     @Override
566     public DuplicateKeyVal<V> getValues( K key ) throws KeyNotFoundException, IOException, IllegalArgumentException
567     {
568         if ( !btree.isAllowDuplicates() )
569         {
570             throw new IllegalArgumentException( "Duplicates are not allowed in this tree" );
571         }
572 
573         int pos = findPos( key );
574 
575         if ( pos < 0 )
576         {
577             MultipleMemoryHolder<K, V> mvHolder = ( MultipleMemoryHolder<K, V> ) values[-( pos + 1 )];
578 
579             if ( mvHolder.isSingleValue() )
580             {
581                 return new DuplicateKeyVal<V>( mvHolder.getValue( btree ) );
582             }
583 
584             return new DuplicateKeyVal<V>( ( BTree<V, V> ) mvHolder.getValue( btree ) );
585         }
586         else
587         {
588             throw KEY_NOT_FOUND_EXCEPTION;
589         }
590     }
591 
592 
593     /**
594      * {@inheritDoc}
595      */
596     public boolean hasKey( K key )
597     {
598         int pos = findPos( key );
599 
600         if ( pos < 0 )
601         {
602             return true;
603         }
604 
605         return false;
606     }
607 
608 
609     @Override
610     public boolean contains( K key, V value ) throws IOException
611     {
612         int pos = findPos( key );
613 
614         if ( pos < 0 )
615         {
616             if ( btree.isAllowDuplicates() )
617             {
618                 MultipleMemoryHolder<K, V> mvHolder = ( MultipleMemoryHolder<K, V> ) values[-( pos + 1 )];
619                 if ( mvHolder.isSingleValue() )
620                 {
621                     return ( btree.getValueSerializer().compare( value, mvHolder.getValue( btree ) ) == 0 );
622                 }
623                 else
624                 {
625                     // always return the first value for get(key) when duplicates are allowed
626                     BTree<V, V> dupTree = ( ( BTree<V, V> ) mvHolder.getValue( btree ) );
627                     return dupTree.hasKey( value );
628                 }
629             }
630             else
631             {
632                 V v = values[-( pos + 1 )].getValue( btree );
633                 return ( btree.getValueSerializer().compare( value, v ) == 0 );
634             }
635         }
636         else
637         {
638             return false;
639         }
640     }
641 
642 
643     /**
644      * {@inheritDoc}
645      */
646     public ElementHolder<V, K, V> getValue( int pos )
647     {
648         if ( pos < nbElems )
649         {
650             return values[pos];
651         }
652         else
653         {
654             return null;
655         }
656     }
657 
658 
659     /**
660      * Sets the value at a give position
661      * @param pos The position in the values array
662      * @param value the value to inject
663      */
664     public void setValue( int pos, ElementHolder<V, K, V> value )
665     {
666         values[pos] = value;
667     }
668 
669 
670     /**
671      * {@inheritDoc}
672      */
673     public CursorImpl<K, V> browse( K key, Transaction<K, V> transaction, LinkedList<ParentPos<K, V>> stack )
674     {
675         int pos = findPos( key );
676         CursorImpl<K, V> cursor = null;
677 
678         if ( pos < 0 )
679         {
680             int index = -( pos + 1 );
681 
682             // The first element has been found. Create the cursor
683             ParentPos<K, V> parentPos = new ParentPos<K, V>( this, index );
684             setDupsContainer( parentPos, btree );
685             stack.push( parentPos );
686 
687             cursor = new CursorImpl<K, V>( btree, transaction, stack );
688         }
689         else
690         {
691             // The key has not been found. Select the value just above, if we have one
692             if ( pos < nbElems )
693             {
694                 ParentPos<K, V> parentPos = new ParentPos<K, V>( this, pos );
695                 setDupsContainer( parentPos, btree );
696                 stack.push( parentPos );
697 
698                 cursor = new CursorImpl<K, V>( btree, transaction, stack );
699             }
700             else
701             {
702                 // Not found : return a null cursor
703                 stack.push( new ParentPos<K, V>( null, -1 ) );
704 
705                 return new CursorImpl<K, V>( btree, transaction, stack );
706             }
707         }
708 
709         return cursor;
710     }
711 
712 
713     /**
714      * {@inheritDoc}
715      */
716     public CursorImpl<K, V> browse( Transaction<K, V> transaction, LinkedList<ParentPos<K, V>> stack )
717     {
718         int pos = 0;
719         CursorImpl<K, V> cursor = null;
720 
721         if ( nbElems == 0 )
722         {
723             // The tree is empty, it's the root, we have nothing to return
724             stack.push( new ParentPos<K, V>( null, -1 ) );
725 
726             return new CursorImpl<K, V>( btree, transaction, stack );
727         }
728         else
729         {
730             // Start at the beginning of the page
731             ParentPos<K, V> parentPos = new ParentPos<K, V>( this, pos );
732 
733             setDupsContainer( parentPos, btree );
734 
735             stack.push( parentPos );
736 
737             cursor = new CursorImpl<K, V>( btree, transaction, stack );
738         }
739 
740         return cursor;
741     }
742 
743 
744     /**
745      * Copy the current page and all of the keys, values and children, if it's not a leaf.
746      * 
747      * @param revision The new revision
748      * @param nbElems The number of elements to copy
749      * @return The copied page
750      */
751     private Page<K, V> copy( long revision, int nbElems )
752     {
753         Leaf<K, V> newLeaf = new Leaf<K, V>( btree, revision, nbElems );
754 
755         // Copy the keys and the values
756         System.arraycopy( keys, 0, newLeaf.keys, 0, nbElems );
757         System.arraycopy( values, 0, newLeaf.values, 0, nbElems );
758 
759         return newLeaf;
760     }
761 
762 
763     /**
764      * Copy the current page if needed, and replace the value at the position we have found the key.
765      * 
766      * @param revision The new page revision
767      * @param key The new key
768      * @param value the new value
769      * @param pos The position of the key in the page
770      * @return The copied page
771      * @throws IOException If we have an error while trying to access the page
772      */
773     private InsertResult<K, V> replaceElement( long revision, K key, V value, int pos )
774         throws IOException
775     {
776         Leaf<K, V> newLeaf = this;
777 
778         if ( this.revision != revision )
779         {
780             // The page hasn't been modified yet, we need to copy it first
781             newLeaf = ( Leaf<K, V> ) copy( revision, nbElems );
782         }
783 
784         V oldValue = null;
785 
786         if ( btree.isAllowDuplicates() )
787         {
788             MultipleMemoryHolder<K, V> mvHolder = ( MultipleMemoryHolder<K, V> ) newLeaf.values[pos];
789 
790             if ( mvHolder.isSingleValue() )
791             {
792                 V singleVal = mvHolder.getValue( btree );
793 
794                 // see if the given value already matches
795                 if ( btree.getValueSerializer().compare( value, singleVal ) == 0 )
796                 {
797                     oldValue = value;
798                 }
799                 else
800                 // create a sub-tree
801                 {
802                     mvHolder.createAndSwitchToSubTree();
803                 }
804             }
805 
806             // if oldValue is null then the given 'value' is different and needs to be added
807             // ('oldValue' will be not null if it matched with the 'singleValue', see above nested if block )
808             if ( oldValue == null )
809             {
810                 BTree<V, V> dupValues = ( BTree<V, V> ) mvHolder.getValue( btree );
811 
812                 // return value will always be null  here 
813                 if ( !dupValues.hasKey( value ) )
814                 {
815                     dupValues.insert( value, null, 0 );
816                 }
817                 else
818                 {
819                     oldValue = value;
820                 }
821             }
822         }
823         else
824         {
825             // Now we can inject the value
826             oldValue = newLeaf.values[pos].getValue( btree );
827             newLeaf.values[pos] = btree.createValueHolder( value );
828         }
829 
830         // Create the result
831         InsertResult<K, V> result = new ModifyResult<K, V>( newLeaf, oldValue );
832         result.addCopiedPage( this );
833 
834         return result;
835     }
836 
837 
838     /**
839      * Adds a new <K, V> into a copy of the current page at a given position. We return the
840      * modified page. The new page will have one more element than the current page.
841      * 
842      * @param revision The revision of the modified page
843      * @param key The key to insert
844      * @param value The value to insert
845      * @param pos The position into the page
846      * @return The modified page with the <K,V> element added
847      */
848     private Page<K, V> addElement( long revision, K key, V value, int pos )
849     {
850         // First copy the current page, but add one element in the copied page
851         Leaf<K, V> newLeaf = new Leaf<K, V>( btree, revision, nbElems + 1 );
852 
853         // Atm, store the value in memory
854 
855         ElementHolder valueHolder = null;
856 
857         if ( btree.isAllowDuplicates() )
858         {
859             valueHolder = new MultipleMemoryHolder<K, V>( btree, value );
860         }
861         else
862         {
863             valueHolder = new MemoryHolder<K, V>( btree, value );
864         }
865 
866         //ValueHolder<K, V> valueHolder = btree.createHolder( value );
867 
868         // Deal with the special case of an empty page
869         if ( nbElems == 0 )
870         {
871             newLeaf.keys[0] = key;
872 
873             newLeaf.values[0] = valueHolder;
874         }
875         else
876         {
877             // Copy the keys and the values up to the insertion position
878             System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
879             System.arraycopy( values, 0, newLeaf.values, 0, pos );
880 
881             // Add the new element
882             newLeaf.keys[pos] = key;
883             newLeaf.values[pos] = valueHolder;
884 
885             // And copy the remaining elements
886             System.arraycopy( keys, pos, newLeaf.keys, pos + 1, keys.length - pos );
887             System.arraycopy( values, pos, newLeaf.values, pos + 1, values.length - pos );
888         }
889 
890         return newLeaf;
891     }
892 
893 
894     /**
895      * Split a full page into two new pages, a left, a right and a pivot element. The new pages will
896      * each contains half of the original elements. <br/>
897      * The pivot will be computed, depending on the place
898      * we will inject the newly added element. <br/>
899      * If the newly added element is in the middle, we will use it
900      * as a pivot. Otherwise, we will use either the last element in the left page if the element is added
901      * on the left, or the first element in the right page if it's added on the right.
902      * 
903      * @param revision The new revision for all the created pages
904      * @param key The key to add
905      * @param value The value to add
906      * @param pos The position of the insertion of the new element
907      * @return An OverflowPage containing the pivot, and the new left and right pages
908      */
909     private InsertResult<K, V> addAndSplit( long revision, K key, V value, int pos )
910     {
911         int middle = btree.getPageSize() >> 1;
912         Leaf<K, V> leftLeaf = null;
913         Leaf<K, V> rightLeaf = null;
914         ElementHolder<V, K, V> valueHolder = btree.createValueHolder( value );
915 
916         // Determinate where to store the new value
917         if ( pos <= middle )
918         {
919             // The left page will contain the new value
920             leftLeaf = new Leaf<K, V>( btree, revision, middle + 1 );
921 
922             // Copy the keys and the values up to the insertion position
923             System.arraycopy( keys, 0, leftLeaf.keys, 0, pos );
924             System.arraycopy( values, 0, leftLeaf.values, 0, pos );
925 
926             // Add the new element
927             leftLeaf.keys[pos] = key;
928             leftLeaf.values[pos] = valueHolder;
929 
930             // And copy the remaining elements
931             System.arraycopy( keys, pos, leftLeaf.keys, pos + 1, middle - pos );
932             System.arraycopy( values, pos, leftLeaf.values, pos + 1, middle - pos );
933 
934             // Now, create the right page
935             rightLeaf = new Leaf<K, V>( btree, revision, middle );
936 
937             // Copy the keys and the values in the right page
938             System.arraycopy( keys, middle, rightLeaf.keys, 0, middle );
939             System.arraycopy( values, middle, rightLeaf.values, 0, middle );
940         }
941         else
942         {
943             // Create the left page
944             leftLeaf = new Leaf<K, V>( btree, revision, middle );
945 
946             // Copy all the element into the left page
947             System.arraycopy( keys, 0, leftLeaf.keys, 0, middle );
948             System.arraycopy( values, 0, leftLeaf.values, 0, middle );
949 
950             // Now, create the right page
951             rightLeaf = new Leaf<K, V>( btree, revision, middle + 1 );
952 
953             int rightPos = pos - middle;
954 
955             // Copy the keys and the values up to the insertion position
956             System.arraycopy( keys, middle, rightLeaf.keys, 0, rightPos );
957             System.arraycopy( values, middle, rightLeaf.values, 0, rightPos );
958 
959             // Add the new element
960             rightLeaf.keys[rightPos] = key;
961             rightLeaf.values[rightPos] = valueHolder;
962 
963             // And copy the remaining elements
964             System.arraycopy( keys, pos, rightLeaf.keys, rightPos + 1, nbElems - pos );
965             System.arraycopy( values, pos, rightLeaf.values, rightPos + 1, nbElems - pos );
966         }
967 
968         // Get the pivot
969         K pivot = rightLeaf.keys[0];
970 
971         // Create the result
972         InsertResult<K, V> result = new SplitResult<K, V>( pivot, leftLeaf, rightLeaf );
973 
974         return result;
975     }
976 
977 
978     /**
979      * {@inheritDoc}
980      */
981     public K getLeftMostKey()
982     {
983         return keys[0];
984     }
985 
986 
987     /**
988      * {@inheritDoc}
989      */
990     public K getRightMostKey()
991     {
992         return keys[nbElems - 1];
993     }
994 
995 
996     /**
997      * {@inheritDoc}
998      */
999     public Tuple<K, V> findLeftMost() throws IOException
1000     {
1001         V val = null;
1002 
1003         if ( btree.isAllowDuplicates() )
1004         {
1005             BTree<V, V> dupTree = ( BTree<V, V> ) values[0].getValue( btree );
1006             val = dupTree.rootPage.getLeftMostKey();
1007         }
1008         else
1009         {
1010             val = values[0].getValue( btree );
1011         }
1012 
1013         return new Tuple<K, V>( keys[0], val );
1014     }
1015 
1016 
1017     /**
1018      * {@inheritDoc}
1019      */
1020     public Tuple<K, V> findRightMost() throws EndOfFileExceededException, IOException
1021     {
1022         V val = null;
1023 
1024         if ( btree.isAllowDuplicates() )
1025         {
1026             BTree<V, V> dupTree = ( BTree<V, V> ) values[nbElems - 1].getValue( btree );
1027             val = dupTree.rootPage.getRightMostKey();
1028         }
1029         else
1030         {
1031             val = values[nbElems - 1].getValue( btree );
1032         }
1033 
1034         return new Tuple<K, V>( keys[nbElems - 1], val );
1035     }
1036 
1037 
1038     /**
1039      * @see Object#toString()
1040      */
1041     public String toString()
1042     {
1043         StringBuilder sb = new StringBuilder();
1044 
1045         sb.append( "Leaf[" );
1046         sb.append( super.toString() );
1047 
1048         sb.append( "] -> {" );
1049 
1050         if ( nbElems > 0 )
1051         {
1052             boolean isFirst = true;
1053 
1054             for ( int i = 0; i < nbElems; i++ )
1055             {
1056                 if ( isFirst )
1057                 {
1058                     isFirst = false;
1059                 }
1060                 else
1061                 {
1062                     sb.append( ", " );
1063                 }
1064 
1065                 sb.append( "<" ).append( keys[i] ).append( "," ).append( values[i] ).append( ">" );
1066             }
1067         }
1068 
1069         sb.append( "}" );
1070 
1071         return sb.toString();
1072     }
1073 
1074 
1075     /**
1076      * {@inheritDoc}
1077      */
1078     public String dumpPage( String tabs )
1079     {
1080         StringBuilder sb = new StringBuilder();
1081 
1082         sb.append( tabs );
1083 
1084         if ( nbElems > 0 )
1085         {
1086             boolean isFirst = true;
1087 
1088             for ( int i = 0; i < nbElems; i++ )
1089             {
1090                 if ( isFirst )
1091                 {
1092                     isFirst = false;
1093                 }
1094                 else
1095                 {
1096                     sb.append( ", " );
1097                 }
1098 
1099                 sb.append( "<" ).append( keys[i] ).append( "," ).append( values[i] ).append( ">" );
1100             }
1101         }
1102 
1103         sb.append( "\n" );
1104 
1105         return sb.toString();
1106     }
1107 }