View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree.managed;
21  
22  
23  import java.io.Closeable;
24  import java.io.IOException;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.FileChannel;
27  import java.util.Comparator;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import net.sf.ehcache.Cache;
32  import net.sf.ehcache.config.CacheConfiguration;
33  
34  import org.apache.directory.mavibot.btree.BTreeHeader;
35  import org.apache.directory.mavibot.btree.DeleteResult;
36  import org.apache.directory.mavibot.btree.InsertResult;
37  import org.apache.directory.mavibot.btree.ModifyResult;
38  import org.apache.directory.mavibot.btree.NotPresentResult;
39  import org.apache.directory.mavibot.btree.Page;
40  import org.apache.directory.mavibot.btree.ParentPos;
41  import org.apache.directory.mavibot.btree.RemoveResult;
42  import org.apache.directory.mavibot.btree.SplitResult;
43  import org.apache.directory.mavibot.btree.Transaction;
44  import org.apache.directory.mavibot.btree.Tuple;
45  import org.apache.directory.mavibot.btree.TupleCursor;
46  import org.apache.directory.mavibot.btree.ValueCursor;
47  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
48  import org.apache.directory.mavibot.btree.serializer.ElementSerializer;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  
53  /**
54   * The B+Tree MVCC data structure.
55   * 
56   * @param <K> The type for the keys
57   * @param <V> The type for the stored values
58   *
59   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
60   */
61  public class BTree<K, V> implements Closeable
62  {
63      /** The LoggerFactory used by this class */
64      protected static final Logger LOG = LoggerFactory.getLogger( BTree.class );
65  
66      /** The Header for a managed BTree */
67      private BTreeHeader btreeHeader;
68  
69      /** Default page size (number of entries per node) */
70      public static final int DEFAULT_PAGE_SIZE = 16;
71  
72      /** Default size of the buffer used to write data on disk. Around 1Mb */
73      public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096 * 250;
74  
75      /** The default journal name */
76      public static final String DEFAULT_JOURNAL = "mavibot.log";
77  
78      /** The default data file suffix */
79      public static final String DATA_SUFFIX = ".db";
80  
81      /** The default journal file suffix */
82      public static final String JOURNAL_SUFFIX = ".log";
83  
84      /** The current rootPage */
85      protected volatile Page<K, V> rootPage;
86  
87      /** The list of read transactions being executed */
88      private ConcurrentLinkedQueue<Transaction<K, V>> readTransactions;
89  
90      /** The size of the buffer used to write data in disk */
91      private int writeBufferSize;
92  
93      /** The Key serializer used for this tree.*/
94      private ElementSerializer<K> keySerializer;
95  
96      /** The Value serializer used for this tree. */
97      private ElementSerializer<V> valueSerializer;
98  
99      /** The RecordManager if the BTree is managed */
100     private RecordManager recordManager;
101 
102     /** A lock used to protect the write operation against concurrent access */
103     private ReentrantLock writeLock;
104 
105     /** The thread responsible for the cleanup of timed out reads */
106     private Thread readTransactionsThread;
107 
108     /** Define a default delay for a read transaction. This is 10 seconds */
109     public static final long DEFAULT_READ_TIMEOUT = 10 * 1000L;
110 
111     /** The read transaction timeout */
112     private long readTimeOut = DEFAULT_READ_TIMEOUT;
113 
114     /** The cache associated with this BTree */
115     private Cache cache;
116 
117     /** The cache size, default to 1000 elements */
118     private int cacheSize = DEFAULT_CACHE_SIZE;
119 
120     /** The default number of pages to keep in memory */
121     private static final int DEFAULT_CACHE_SIZE = 1000;
122     
123     /** A flag indicating if this BTree is a Sub BTree */
124     private boolean isSubBtree = false;
125 
126     /** The number of stored Values before we switch to a BTree */
127     private static final int DEFAULT_VALUE_THRESHOLD_UP = 8;
128 
129     /** The number of stored Values before we switch back to an array */
130     private static final int DEFAULT_VALUE_THRESHOLD_LOW = 1;
131 
132     /** The configuration for the array <-> BTree switch */
133     /*No qualifier*/static int valueThresholdUp = DEFAULT_VALUE_THRESHOLD_UP;
134     /*No qualifier*/static int valueThresholdLow = DEFAULT_VALUE_THRESHOLD_LOW;
135 
136     /**
137      * Create a thread that is responsible of cleaning the transactions when
138      * they hit the timeout
139      */
140     private void createTransactionManager()
141     {
142         Runnable readTransactionTask = new Runnable()
143         {
144             public void run()
145             {
146                 try
147                 {
148                     Transaction<K, V> transaction = null;
149 
150                     while ( !Thread.currentThread().isInterrupted() )
151                     {
152                         long timeoutDate = System.currentTimeMillis() - readTimeOut;
153                         long t0 = System.currentTimeMillis();
154                         int nbTxns = 0;
155 
156                         // Loop on all the transactions from the queue
157                         while ( ( transaction = readTransactions.peek() ) != null )
158                         {
159                             nbTxns++;
160 
161                             if ( transaction.isClosed() )
162                             {
163                                 // The transaction is already closed, remove it from the queue
164                                 readTransactions.poll();
165                                 continue;
166                             }
167 
168                             // Check if the transaction has timed out
169                             if ( transaction.getCreationDate() < timeoutDate )
170                             {
171                                 transaction.close();
172                                 readTransactions.poll();
173                                 continue;
174                             }
175 
176                             // We need to stop now
177                             break;
178                         }
179 
180                         long t1 = System.currentTimeMillis();
181 
182                         if ( nbTxns > 0 )
183                         {
184                             System.out.println( "Processing old txn : " + nbTxns + ", " + ( t1 - t0 ) + "ms" );
185                         }
186 
187                         // Wait until we reach the timeout
188                         Thread.sleep( readTimeOut );
189                     }
190                 }
191                 catch ( InterruptedException ie )
192                 {
193                     //System.out.println( "Interrupted" );
194                 }
195                 catch ( Exception e )
196                 {
197                     throw new RuntimeException( e );
198                 }
199             }
200         };
201 
202         readTransactionsThread = new Thread( readTransactionTask );
203         readTransactionsThread.setDaemon( true );
204         readTransactionsThread.start();
205     }
206 
207 
208     /**
209      * Creates a new BTree, with no initialization. 
210      */
211     public BTree()
212     {
213         btreeHeader = new BTreeHeader();
214     }
215 
216 
217     /**
218      * Creates a new in-memory BTree using the BTreeConfiguration to initialize the 
219      * BTree
220      * 
221      * @param comparator The comparator to use
222      */
223     public BTree( BTreeConfiguration<K, V> configuration ) throws IOException
224     {
225         String name = configuration.getName();
226 
227         if ( name == null )
228         {
229             throw new IllegalArgumentException( "BTree name cannot be null" );
230         }
231 
232         btreeHeader = new BTreeHeader();
233         btreeHeader.setName( name );
234         btreeHeader.setPageSize( configuration.getPageSize() );
235         isSubBtree = configuration.isSubBtree();
236 
237         keySerializer = configuration.getKeySerializer();
238         btreeHeader.setKeySerializerFQCN( keySerializer.getClass().getName() );
239 
240         valueSerializer = configuration.getValueSerializer();
241         btreeHeader.setValueSerializerFQCN( valueSerializer.getClass().getName() );
242 
243         readTimeOut = configuration.getReadTimeOut();
244         writeBufferSize = configuration.getWriteBufferSize();
245         btreeHeader.setAllowDuplicates( configuration.isAllowDuplicates() );
246         cacheSize = configuration.getCacheSize();
247 
248         if ( keySerializer.getComparator() == null )
249         {
250             throw new IllegalArgumentException( "Comparator should not be null" );
251         }
252 
253         // Create the first root page, with revision 0L. It will be empty
254         // and increment the revision at the same time
255         rootPage = new Leaf<K, V>( this );
256         
257         if ( isSubBtree )
258         {
259             // The subBTree inherit its cache from its parent BTree
260             this.cache = configuration.getParentBTree().getCache();
261             this.writeLock = configuration.getParentBTree().getWriteLock();
262             readTransactions = new ConcurrentLinkedQueue<Transaction<K, V>>();
263         }
264 
265         // Now, initialize the BTree
266         init();
267     }
268 
269 
270     /**
271      * Creates a new in-memory BTree with a default page size and key/value serializers.
272      * 
273      * @param comparator The comparator to use
274      */
275     public BTree( String name, ElementSerializer<K> keySerializer, ElementSerializer<V> valueSerializer )
276         throws IOException
277     {
278         this( name, keySerializer, valueSerializer, false );
279     }
280 
281 
282     public BTree( String name, ElementSerializer<K> keySerializer, ElementSerializer<V> valueSerializer,
283         boolean allowDuplicates )
284         throws IOException
285     {
286         this( name, null, keySerializer, valueSerializer, DEFAULT_PAGE_SIZE, allowDuplicates, DEFAULT_CACHE_SIZE );
287     }
288 
289 
290     public BTree( String name, ElementSerializer<K> keySerializer, ElementSerializer<V> valueSerializer,
291         boolean allowDuplicates, int cacheSize )
292         throws IOException
293     {
294         this( name, null, keySerializer, valueSerializer, DEFAULT_PAGE_SIZE, allowDuplicates, cacheSize );
295     }
296 
297 
298     /**
299      * Creates a new in-memory BTree with a default page size and key/value serializers.
300      * 
301      * @param comparator The comparator to use
302      */
303     public BTree( String name, ElementSerializer<K> keySerializer, ElementSerializer<V> valueSerializer, int pageSize )
304         throws IOException
305     {
306         this( name, null, keySerializer, valueSerializer, pageSize );
307     }
308 
309 
310     /**
311      * Creates a new BTree with a default page size and a comparator, with an associated file.
312      * @param comparator The comparator to use
313      * @param serializer The serializer to use
314      */
315     public BTree( String name, String path, ElementSerializer<K> keySerializer, ElementSerializer<V> valueSerializer )
316         throws IOException
317     {
318         this( name, path, keySerializer, valueSerializer, DEFAULT_PAGE_SIZE );
319     }
320 
321 
322     /**
323      * 
324      * Creates a new instance of BTree with the given name and store it under the given dataDir if provided.
325      *
326      * @param name the name of the BTree
327      * @param dataDir the name of the data directory with absolute path
328      * @param keySerializer key serializer
329      * @param valueSerializer value serializer
330      * @param pageSize size of the page
331      * @throws IOException
332      */
333     public BTree( String name, String dataDir, ElementSerializer<K> keySerializer,
334         ElementSerializer<V> valueSerializer,
335         int pageSize )
336         throws IOException
337     {
338         this( name, dataDir, keySerializer, valueSerializer, pageSize, false, DEFAULT_CACHE_SIZE );
339     }
340 
341 
342     public BTree( String name, String dataDir, ElementSerializer<K> keySerializer,
343         ElementSerializer<V> valueSerializer,
344         int pageSize, boolean allowDuplicates )
345         throws IOException
346     {
347         this( name, dataDir, keySerializer, valueSerializer, pageSize, allowDuplicates, DEFAULT_CACHE_SIZE );
348     }
349 
350 
351     public BTree( String name, String dataDir, ElementSerializer<K> keySerializer,
352         ElementSerializer<V> valueSerializer,
353         int pageSize, boolean allowDuplicates, int cacheSize )
354         throws IOException
355     {
356         btreeHeader = new BTreeHeader();
357         btreeHeader.setName( name );
358 
359         setPageSize( pageSize );
360         writeBufferSize = DEFAULT_WRITE_BUFFER_SIZE;
361 
362         this.cacheSize = cacheSize;
363 
364         this.keySerializer = keySerializer;
365 
366         btreeHeader.setKeySerializerFQCN( keySerializer.getClass().getName() );
367 
368         this.valueSerializer = valueSerializer;
369 
370         btreeHeader.setValueSerializerFQCN( valueSerializer.getClass().getName() );
371 
372         btreeHeader.setAllowDuplicates( allowDuplicates );
373 
374         // Create the first root page, with revision 0L. It will be empty
375         // and increment the revision at the same time
376         rootPage = new Leaf<K, V>( this );
377 
378         // Now, call the init() method
379         init();
380     }
381 
382 
383     /**
384      * Initialize the BTree.
385      * 
386      * @throws IOException If we get some exception while initializing the BTree
387      */
388     public void init() throws IOException
389     {
390         if ( !isSubBtree )
391         {
392             // This is not a subBtree, we have to initialize the cache
393 
394             // Create the queue containing the pending read transactions
395             readTransactions = new ConcurrentLinkedQueue<Transaction<K, V>>();
396     
397             writeLock = new ReentrantLock();
398     
399             // Initialize the caches
400             CacheConfiguration cacheConfiguration = new CacheConfiguration();
401             cacheConfiguration.setName( "pages" );
402             cacheConfiguration.setEternal( true );
403             cacheConfiguration.setOverflowToDisk( false );
404             cacheConfiguration.setCacheLoaderTimeoutMillis( 0 );
405             cacheConfiguration.setMaxElementsInMemory( cacheSize );
406             cacheConfiguration.setMemoryStoreEvictionPolicy( "LRU" );
407     
408             cache = new Cache( cacheConfiguration );
409             cache.initialise();
410         }
411 
412         // Initialize the txnManager thread
413         //FIXME we should NOT create a new transaction manager thread for each BTree
414         //createTransactionManager();
415     }
416 
417 
418     /**
419      * Return the cache we use in this BTree
420      */
421     /* No qualifier */Cache getCache()
422     {
423         return cache;
424     }
425 
426 
427     /**
428      * Return the cache we use in this BTree
429      */
430     /* No qualifier */ReentrantLock getWriteLock()
431     {
432         return writeLock;
433     }
434 
435 
436     /**
437      * Return the cache we use in this BTree
438      */
439     /* No qualifier */ConcurrentLinkedQueue<Transaction<K, V>> getReadTransactions()
440     {
441         return readTransactions;
442     }
443 
444 
445     /**
446      * Close the BTree, cleaning up all the data structure
447      */
448     public void close() throws IOException
449     {
450         // Stop the readTransaction thread
451         // readTransactionsThread.interrupt();
452         // readTransactions.clear();
453 
454         rootPage = null;
455     }
456 
457 
458     /**
459      * @return the btreeOffset
460      */
461     /* No qualifier*/long getBtreeOffset()
462     {
463         return btreeHeader.getBTreeOffset();
464     }
465 
466 
467     /**
468      * @param btreeOffset the btreeOffset to set
469      */
470     /* No qualifier*/void setBtreeOffset( long btreeOffset )
471     {
472         btreeHeader.setBTreeOffset( btreeOffset );
473     }
474 
475 
476     /**
477      * @return the rootPageOffset
478      */
479     /* No qualifier*/long getRootPageOffset()
480     {
481         return btreeHeader.getRootPageOffset();
482     }
483 
484 
485     /**
486      * @param rootPageOffset the rootPageOffset to set
487      */
488     /* No qualifier*/void setRootPageOffset( long rootPageOffset )
489     {
490         btreeHeader.setRootPageOffset( rootPageOffset );
491     }
492 
493 
494     /**
495      * @return the nextBTreeOffset
496      */
497     /* No qualifier*/long getNextBTreeOffset()
498     {
499         return btreeHeader.getNextBTreeOffset();
500     }
501 
502 
503     /**
504      * @param nextBTreeOffset the nextBTreeOffset to set
505      */
506     /* No qualifier*/void setNextBTreeOffset( long nextBTreeOffset )
507     {
508         btreeHeader.setNextBTreeOffset( nextBTreeOffset );
509     }
510 
511 
512     /**
513      * Gets the number which is a power of 2 immediately above the given positive number.
514      */
515     private int getPowerOf2( int size )
516     {
517         int newSize = --size;
518         newSize |= newSize >> 1;
519         newSize |= newSize >> 2;
520         newSize |= newSize >> 4;
521         newSize |= newSize >> 8;
522         newSize |= newSize >> 16;
523         newSize++;
524 
525         return newSize;
526     }
527 
528 
529     /**
530      * Set the maximum number of elements we can store in a page. This must be a
531      * number greater than 1, and a power of 2. The default page size is 16.
532      * <br/>
533      * If the provided size is below 2, we will default to DEFAULT_PAGE_SIZE.<br/>
534      * If the provided size is not a power of 2, we will select the closest power of 2
535      * higher than the given number<br/>
536      * 
537      * @param pageSize The requested page size
538      */
539     public void setPageSize( int pageSize )
540     {
541         if ( pageSize <= 2 )
542         {
543             btreeHeader.setPageSize( DEFAULT_PAGE_SIZE );
544         }
545         else
546         {
547             btreeHeader.setPageSize( getPowerOf2( pageSize ) );
548         }
549     }
550 
551 
552     /**
553      * Set the new root page for this tree. Used for debug purpose only. The revision
554      * will always be 0;
555      * 
556      * @param root the new root page.
557      */
558     /* No qualifier */void setRoot( Page<K, V> root )
559     {
560         rootPage = root;
561     }
562 
563 
564     /**
565      * Gets the RecordManager for a managed BTree
566      * 
567      * @return The recordManager if the BTree is managed
568      */
569     /* No qualifier */RecordManager getRecordManager()
570     {
571         return recordManager;
572     }
573 
574 
575     /**
576      * Inject a RecordManager for a managed BTree
577      * 
578      * @param recordManager The injected RecordManager
579      */
580     /* No qualifier */void setRecordManager( RecordManager recordManager )
581     {
582         this.recordManager = recordManager;
583     }
584 
585 
586     /**
587      * @return the pageSize
588      */
589     public int getPageSize()
590     {
591         return btreeHeader.getPageSize();
592     }
593 
594 
595     /**
596      * Generates a new revision number. It's only used by the Page instances.
597      * 
598      * @return a new incremental revision number
599      */
600     /** No qualifier */
601     long generateRevision()
602     {
603         return btreeHeader.incrementRevision();
604     }
605 
606 
607     /**
608      * Insert an entry in the BTree.
609      * <p>
610      * We will replace the value if the provided key already exists in the
611      * btree.
612      *
613      * @param key Inserted key
614      * @param value Inserted value
615      * @return Existing value, if any.
616      * @throws IOException TODO
617      */
618     public V insert( K key, V value ) throws IOException
619     {
620         long revision = generateRevision();
621 
622         V existingValue = null;
623 
624         try
625         {
626             // Commented atm, we will have to play around the idea of transactions later
627             writeLock.lock();
628 
629             InsertResult<K, V> result = insert( key, value, revision );
630 
631             if ( result instanceof ModifyResult )
632             {
633                 existingValue = ( ( ModifyResult<K, V> ) result ).getModifiedValue();
634             }
635         }
636         finally
637         {
638             // See above
639             writeLock.unlock();
640         }
641 
642         return existingValue;
643     }
644 
645 
646     /**
647      * Delete the entry which key is given as a parameter. If the entry exists, it will
648      * be removed from the tree, the old tuple will be returned. Otherwise, null is returned.
649      * 
650      * @param key The key for the entry we try to remove
651      * @return A Tuple<K, V> containing the removed entry, or null if it's not found.
652      */
653     public Tuple<K, V> delete( K key ) throws IOException
654     {
655         if ( key == null )
656         {
657             throw new IllegalArgumentException( "Key must not be null" );
658         }
659 
660         long revision = generateRevision();
661 
662         Tuple<K, V> deleted = delete( key, revision );
663 
664         return deleted;
665     }
666 
667 
668     /**
669      * Delete the value from an entry associated with the given key. If the value
670      * If the value is present, it will be deleted first, later if there are no other 
671      * values associated with this key(which can happen when duplicates are enabled), 
672      * we will remove the key from the tree.
673      * 
674      * @param key The key for the entry we try to remove
675      * @param value The value to delete (can be null)
676      * @return A Tuple<K, V> containing the removed entry, or null if it's not found.
677      */
678     public Tuple<K, V> delete( K key, V value ) throws IOException
679     {
680         if ( key == null )
681         {
682             throw new IllegalArgumentException( "Key must not be null" );
683         }
684 
685         if ( value == null )
686         {
687             throw new IllegalArgumentException( "Value must not be null" );
688         }
689 
690         long revision = generateRevision();
691 
692         Tuple<K, V> deleted = delete( key, value, revision );
693 
694         return deleted;
695     }
696 
697 
698     /**
699      * Delete the entry which key is given as a parameter. If the entry exists, it will
700      * be removed from the tree, the old tuple will be returned. Otherwise, null is returned.
701      * 
702      * @param key The key for the entry we try to remove
703      * @return A Tuple<K, V> containing the removed entry, or null if it's not found.
704      */
705     private Tuple<K, V> delete( K key, long revision ) throws IOException
706     {
707         return delete( key, null, revision );
708     }
709 
710 
711     /**
712      * 
713      * Deletes the given <key,value> pair if both key and value match. If the given value is null
714      * and there is no null value associated with the given key then the entry with the given key
715      * will be removed.
716      *
717      * @param key The key to be removed 
718      * @param value The value to be removed (can be null, and when no null value exists the key will be removed irrespective of the value)
719      * @param revision The revision to be associated with this operation
720      * @return
721      * @throws IOException
722      */
723     private Tuple<K, V> delete( K key, V value, long revision ) throws IOException
724     {
725         writeLock.lock();
726 
727         try
728         {
729             // If the key exists, the existing value will be replaced. We store it
730             // to return it to the caller.
731             Tuple<K, V> tuple = null;
732 
733             // Try to delete the entry starting from the root page. Here, the root
734             // page may be either a Node or a Leaf
735             DeleteResult<K, V> result = rootPage.delete( revision, key, value, null, -1 );
736 
737             if ( result instanceof NotPresentResult )
738             {
739                 // Key not found.
740                 return null;
741             }
742 
743             // Keep the oldRootPage so that we can later access it
744             Page<K, V> oldRootPage = rootPage;
745 
746             if ( result instanceof RemoveResult )
747             {
748                 // The element was found, and removed
749                 RemoveResult<K, V> removeResult = ( RemoveResult<K, V> ) result;
750 
751                 Page<K, V> modifiedPage = removeResult.getModifiedPage();
752 
753                 // Write the modified page on disk
754                 // Note that we don't use the holder, the new root page will
755                 // remain in memory.
756                 PageHolder<K, V> holder = recordManager.writePage( this, modifiedPage,
757                     revision );
758 
759                 // Store the offset on disk in the page in memory
760                 ( ( AbstractPage<K, V> ) modifiedPage ).setOffset( ( ( PageHolder<K, V> ) holder )
761                     .getOffset() );
762 
763                 // Store the last offset on disk in the page in memory
764                 ( ( AbstractPage<K, V> ) modifiedPage )
765                     .setLastOffset( ( ( PageHolder<K, V> ) holder )
766                         .getLastOffset() );
767 
768                 // This is a new root
769                 rootPage = modifiedPage;
770                 tuple = removeResult.getRemovedElement();
771             }
772 
773             // Decrease the number of elements in the current tree if the deletion is successful
774             if ( tuple != null )
775             {
776                 btreeHeader.decrementNbElems();
777 
778                 // We have to update the rootPage on disk
779                 // Update the BTree header now
780                 recordManager.updateBtreeHeader( this, ( ( AbstractPage<K, V> ) rootPage ).getOffset() );
781             }
782 
783             recordManager.addFreePages( this, result.getCopiedPages() );
784 
785             // Store the created rootPage into the revision BTree, this will be stored in RecordManager only if revisions are set to keep
786             recordManager.storeRootPage( this, rootPage );
787 
788             // Return the value we have found if it was modified
789             return tuple;
790         }
791         finally
792         {
793             // See above
794             writeLock.unlock();
795         }
796     }
797 
798 
799     /**
800      * Find a value in the tree, given its key. If the key is not found,
801      * it will throw a KeyNotFoundException. <br/>
802      * Note that we can get a null value stored, or many values.
803      * 
804      * @param key The key we are looking at
805      * @return The found value, or null if the key is not present in the tree
806      * @throws KeyNotFoundException If the key is not found in the BTree
807      * @throws IOException TODO
808      */
809     public V get( K key ) throws IOException, KeyNotFoundException
810     {
811         return rootPage.get( key );
812     }
813 
814 
815     /**
816      * @see Page#getValues(Object)
817      */
818     public ValueCursor<V> getValues( K key ) throws IOException, KeyNotFoundException
819     {
820         return rootPage.getValues( key );
821     }
822 
823 
824     /**
825      * Find a value in the tree, given its key, at a specific revision. If the key is not found,
826      * it will throw a KeyNotFoundException. <br/>
827      * Note that we can get a null value stored, or many values.
828      * 
829      * @param revision The revision for which we want to find a key
830      * @param key The key we are looking at
831      * @return The found value, or null if the key is not present in the tree
832      * @throws KeyNotFoundException If the key is not found in the BTree
833      * @throws IOException If there was an issue while fetching data from the disk
834      */
835     public V get( long revision, K key ) throws IOException, KeyNotFoundException
836     {
837         // Fetch the root page for this revision
838         Page<K, V> revisionRootPage = getRootPage( revision );
839 
840         return revisionRootPage.get( key );
841     }
842 
843 
844     /**
845      * Checks if the given key exists.
846      *  
847      * @param key The key we are looking at
848      * @return true if the key is present, false otherwise
849      * @throws IOException If we have an error while trying to access the page
850      */
851     public boolean hasKey( K key ) throws IOException
852     {
853         if ( key == null )
854         {
855             return false;
856         }
857 
858         return rootPage.hasKey( key );
859     }
860 
861 
862     /**
863      * Checks if the given key exists for a given revision.
864      *  
865      * @param revision The revision for which we want to find a key
866      * @param key The key we are looking at
867      * @return true if the key is present, false otherwise
868      * @throws IOException If we have an error while trying to access the page
869      * @throws KeyNotFoundException If the key is not found in the BTree
870      */
871     public boolean hasKey( long revision, K key ) throws IOException, KeyNotFoundException
872     {
873         if ( key == null )
874         {
875             return false;
876         }
877 
878         // Fetch the root page for this revision
879         Page<K, V> revisionRootPage = getRootPage( revision );
880 
881         return revisionRootPage.hasKey( key );
882     }
883 
884 
885     /**
886      * Checks if the BTree contains the given key with the given value.
887      * 
888      * @param key The key we are looking for
889      * @param value The value associated with the given key
890      * @return true if the key and value are associated with each other, false otherwise
891      */
892     public boolean contains( K key, V value ) throws IOException
893     {
894         return rootPage.contains( key, value );
895     }
896 
897 
898     /**
899      * Checks if the BTree contains the given key with the given value for a given revision
900      * 
901      * @param revision The revision we would like to browse
902      * @param key The key we are looking for
903      * @param value The value associated with the given key
904      * @return true if the key and value are associated with each other, false otherwise
905      * @throws KeyNotFoundException If the key is not found in the BTree
906      */
907     public boolean contains( long revision, K key, V value ) throws IOException, KeyNotFoundException
908     {
909         // Fetch the root page for this revision
910         Page<K, V> revisionRootPage = getRootPage( revision );
911 
912         return revisionRootPage.contains( key, value );
913     }
914 
915 
916     /**
917      * Creates a cursor starting at the beginning of the tree
918      * 
919      * @return A cursor on the btree
920      * @throws IOException
921      */
922     public TupleCursor<K, V> browse() throws IOException
923     {
924         Transaction<K, V> transaction = beginReadTransaction();
925 
926         // Fetch the root page for this revision
927         ParentPos<K, V>[] stack = new ParentPos[32];
928         TupleCursor<K, V> cursor = rootPage.browse( transaction, stack, 0 );
929         
930         // Set the position before the first element
931         cursor.beforeFirst();
932 
933         return cursor;
934     }
935 
936 
937     /**
938      * Creates a cursor starting at the beginning of the tree, for a given revision
939      * 
940      * @param revision The revision we would like to browse
941      * @return A cursor on the btree
942      * @throws IOException If we had an issue while fetching data from the disk
943      * @throws KeyNotFoundException If the key is not found in the BTree
944      */
945     public TupleCursor<K, V> browse( long revision ) throws IOException, KeyNotFoundException
946     {
947         Transaction<K, V> transaction = beginReadTransaction();
948 
949         // Fetch the root page for this revision
950         Page<K, V> revisionRootPage = getRootPage( revision );
951 
952         // And get the cursor
953         TupleCursor<K, V> cursor = revisionRootPage.browse( transaction, new ParentPos[32], 0 );
954 
955         return cursor;
956     }
957 
958 
959     /**
960      * Creates a cursor starting on the given key
961      * 
962      * @param key The key which is the starting point. If the key is not found,
963      * then the cursor will always return null.
964      * @return A cursor on the btree
965      * @throws IOException
966      */
967     public TupleCursor<K, V> browseFrom( K key ) throws IOException
968     {
969         Transaction<K, V> transaction = beginReadTransaction();
970 
971         // Fetch the root page for this revision
972         @SuppressWarnings("unchecked")
973         ParentPos<K, V>[] stack = new ParentPos[32];
974         TupleCursor<K, V> cursor = rootPage.browse( key, transaction, stack, 0 );
975 
976         return cursor;
977     }
978 
979 
980     /**
981      * Creates a cursor starting on the given key at the given revision
982      * 
983      * @param The revision we are looking for
984      * @param key The key which is the starting point. If the key is not found,
985      * then the cursor will always return null.
986      * @return A cursor on the btree
987      * @throws IOException If wxe had an issue reading the BTree from disk
988      * @throws KeyNotFoundException  If we can't find a rootPage for this revision
989      */
990     public TupleCursor<K, V> browseFrom( long revision, K key ) throws IOException, KeyNotFoundException
991     {
992         Transaction<K, V> transaction = beginReadTransaction();
993 
994         // Fetch the rootPage for this revision
995         Page<K, V> revisionRootPage = getRootPage( revision );
996 
997         // And get the cursor
998         TupleCursor<K, V> cursor = revisionRootPage.browse( key, transaction, new ParentPos[32], 0 );
999 
1000         return cursor;
1001     }
1002 
1003 
1004     /**
1005      * Insert an entry in the BTree.
1006      * <p>
1007      * We will replace the value if the provided key already exists in the
1008      * btree.
1009      * <p>
1010      * The revision number is the revision to use to insert the data.
1011      *
1012      * @param key Inserted key
1013      * @param value Inserted value
1014      * @param revision The revision to use
1015      * @return an instance of the InsertResult.
1016      */
1017     /*No qualifier*/InsertResult<K, V> insert( K key, V value, long revision ) throws IOException
1018     {
1019         if ( key == null )
1020         {
1021             throw new IllegalArgumentException( "Key must not be null" );
1022         }
1023 
1024         // If the key exists, the existing value will be replaced. We store it
1025         // to return it to the caller.
1026         V modifiedValue = null;
1027 
1028         // Try to insert the new value in the tree at the right place,
1029         // starting from the root page. Here, the root page may be either
1030         // a Node or a Leaf
1031         InsertResult<K, V> result = rootPage.insert( revision, key, value );
1032 
1033         if ( result instanceof ModifyResult )
1034         {
1035             ModifyResult<K, V> modifyResult = ( ( ModifyResult<K, V> ) result );
1036 
1037             Page<K, V> modifiedPage = modifyResult.getModifiedPage();
1038 
1039             // Write the modified page on disk
1040             // Note that we don't use the holder, the new root page will
1041             // remain in memory.
1042             PageHolder<K, V> holder = recordManager.writePage( this, modifiedPage,
1043                 revision );
1044             
1045             // The root has just been modified, we haven't split it
1046             // Get it and make it the current root page
1047             rootPage = modifiedPage;
1048 
1049             modifiedValue = modifyResult.getModifiedValue();
1050         }
1051         else
1052         {
1053             // We have split the old root, create a new one containing
1054             // only the pivotal we got back
1055             SplitResult<K, V> splitResult = ( ( SplitResult<K, V> ) result );
1056 
1057             K pivot = splitResult.getPivot();
1058             Page<K, V> leftPage = splitResult.getLeftPage();
1059             Page<K, V> rightPage = splitResult.getRightPage();
1060             Page<K, V> newRootPage = null;
1061 
1062             // If the BTree is managed, we have to write the two pages that were created
1063             // and to keep a track of the two offsets for the upper node
1064             PageHolder<K, V> holderLeft = recordManager.writePage( this,
1065                 leftPage, revision );
1066 
1067             PageHolder<K, V> holderRight = recordManager.writePage( this,
1068                 rightPage, revision );
1069 
1070             // Create the new rootPage
1071             newRootPage = new Node<K, V>( this, revision, pivot, holderLeft, holderRight );
1072 
1073             // If the BTree is managed, we now have to write the page on disk
1074             // and to add this page to the list of modified pages
1075             PageHolder<K, V> holder = recordManager
1076                 .writePage( this, newRootPage, revision );
1077 
1078             rootPage = newRootPage;
1079         }
1080 
1081         // Increase the number of element in the current tree if the insertion is successful
1082         // and does not replace an element
1083         if ( modifiedValue == null )
1084         {
1085             btreeHeader.incrementNbElems();
1086         }
1087 
1088         // If the BTree is managed, we have to update the rootPage on disk
1089         // Update the BTree header now
1090         recordManager.updateBtreeHeader( this, ( ( AbstractPage<K, V> ) rootPage ).getOffset() );
1091 
1092         // Moved the free pages into the list of free pages
1093         recordManager.addFreePages( this, result.getCopiedPages() );
1094 
1095         // Store the created rootPage into the revision BTree, this will be stored in RecordManager only if revisions are set to keep
1096         recordManager.storeRootPage( this, rootPage );
1097 
1098         // Return the value we have found if it was modified
1099         return result;
1100     }
1101 
1102 
1103     /**
1104      * Starts a Read Only transaction. If the transaction is not closed, it will be 
1105      * automatically closed after the timeout
1106      * @return The created transaction
1107      */
1108     private Transaction<K, V> beginReadTransaction()
1109     {
1110         Transaction<K, V> readTransaction = new Transaction<K, V>( rootPage, btreeHeader.getRevision() - 1,
1111             System.currentTimeMillis() );
1112 
1113         readTransactions.add( readTransaction );
1114 
1115         return readTransaction;
1116     }
1117 
1118 
1119     /**
1120      * @return the comparator
1121      */
1122     public Comparator<K> getComparator()
1123     {
1124         return keySerializer.getComparator();
1125     }
1126 
1127 
1128     /**
1129      * @param keySerializer the Key serializer to set
1130      */
1131     public void setKeySerializer( ElementSerializer<K> keySerializer )
1132     {
1133         this.keySerializer = keySerializer;
1134         btreeHeader.setKeySerializerFQCN( keySerializer.getClass().getName() );
1135     }
1136 
1137 
1138     /**
1139      * @param valueSerializer the Value serializer to set
1140      */
1141     public void setValueSerializer( ElementSerializer<V> valueSerializer )
1142     {
1143         this.valueSerializer = valueSerializer;
1144         btreeHeader.setValueSerializerFQCN( valueSerializer.getClass().getName() );
1145     }
1146 
1147 
1148     /**
1149      * Write the data in the ByteBuffer, and eventually on disk if needed.
1150      * 
1151      * @param channel The channel we want to write to
1152      * @param bb The ByteBuffer we want to feed
1153      * @param buffer The data to inject
1154      * @throws IOException If the write failed
1155      */
1156     private void writeBuffer( FileChannel channel, ByteBuffer bb, byte[] buffer ) throws IOException
1157     {
1158         int size = buffer.length;
1159         int pos = 0;
1160 
1161         // Loop until we have written all the data
1162         do
1163         {
1164             if ( bb.remaining() >= size )
1165             {
1166                 // No flush, as the ByteBuffer is big enough
1167                 bb.put( buffer, pos, size );
1168                 size = 0;
1169             }
1170             else
1171             {
1172                 // Flush the data on disk, reinitialize the ByteBuffer
1173                 int len = bb.remaining();
1174                 size -= len;
1175                 bb.put( buffer, pos, len );
1176                 pos += len;
1177 
1178                 bb.flip();
1179 
1180                 channel.write( bb );
1181 
1182                 bb.clear();
1183             }
1184         }
1185         while ( size > 0 );
1186     }
1187 
1188 
1189     /**
1190      * Get the rootPzge associated to a give revision.
1191      * 
1192      * @param revision The revision we are looking for
1193      * @return The rootPage associated to this revision
1194      * @throws IOException If we had an issue while accessing the underlying file
1195      * @throws KeyNotFoundException If the revision does not exist for this Btree
1196      */
1197     private Page<K, V> getRootPage( long revision ) throws IOException, KeyNotFoundException
1198     {
1199         return recordManager.getRootPage( this, revision );
1200     }
1201 
1202 
1203     /**
1204      * Flush the latest revision to disk. We will replace the current file by the new one, as
1205      * we flush in a temporary file.
1206      */
1207     public void flush() throws IOException
1208     {
1209     }
1210 
1211 
1212     /**
1213      * @return the readTimeOut
1214      */
1215     public long getReadTimeOut()
1216     {
1217         return readTimeOut;
1218     }
1219 
1220 
1221     /**
1222      * @param readTimeOut the readTimeOut to set
1223      */
1224     public void setReadTimeOut( long readTimeOut )
1225     {
1226         this.readTimeOut = readTimeOut;
1227     }
1228 
1229 
1230     /**
1231      * @return the name
1232      */
1233     public String getName()
1234     {
1235         return btreeHeader.getName();
1236     }
1237 
1238 
1239     /**
1240      * @param name the name to set
1241      */
1242     public void setName( String name )
1243     {
1244         btreeHeader.setName( name );
1245     }
1246 
1247 
1248     /**
1249      * @return the writeBufferSize
1250      */
1251     public int getWriteBufferSize()
1252     {
1253         return writeBufferSize;
1254     }
1255 
1256 
1257     /**
1258      * @param writeBufferSize the writeBufferSize to set
1259      */
1260     public void setWriteBufferSize( int writeBufferSize )
1261     {
1262         this.writeBufferSize = writeBufferSize;
1263     }
1264 
1265 
1266     /**
1267      * Create a ValueHolder depending on the kind of holder we want.
1268      * 
1269      * @param value The value to store
1270      * @return The value holder
1271      */
1272     @SuppressWarnings("unchecked")
1273     /* no qualifier */ValueHolder<V> createValueHolder( V value )
1274     {
1275         return new ValueHolder<V>( this, value );
1276     }
1277 
1278 
1279     /**
1280      * Create a ValueHolder depending on the kind of holder we want.
1281      * 
1282      * @param value The value to store
1283      * @return The value holder
1284      */
1285     /* no qualifier */PageHolder<K, V> createPageHolder( Page<K, V> value )
1286     {
1287         return new PageHolder<K, V>( this, value,
1288             value.getOffset(), value.getLastOffset() );
1289     }
1290 
1291 
1292     /**
1293      * @return the keySerializer
1294      */
1295     public ElementSerializer<K> getKeySerializer()
1296     {
1297         return keySerializer;
1298     }
1299 
1300 
1301     /**
1302      * @return the keySerializer FQCN
1303      */
1304     public String getKeySerializerFQCN()
1305     {
1306         return btreeHeader.getKeySerializerFQCN();
1307     }
1308 
1309 
1310     /**
1311      * @return the valueSerializer
1312      */
1313     public ElementSerializer<V> getValueSerializer()
1314     {
1315         return valueSerializer;
1316     }
1317 
1318 
1319     /**
1320      * @return the valueSerializer FQCN
1321      */
1322     public String getValueSerializerFQCN()
1323     {
1324         return btreeHeader.getValueSerializerFQCN();
1325     }
1326 
1327 
1328     /** 
1329      * @return The current BTree revision
1330      */
1331     public long getRevision()
1332     {
1333         return btreeHeader.getRevision();
1334     }
1335 
1336 
1337     /**
1338      * @param revision the revision to set
1339      */
1340     /* No qualifier */void setRevision( long revision )
1341     {
1342         btreeHeader.setRevision( revision );
1343     }
1344 
1345 
1346     /** 
1347      * @return The current number of elements in the BTree
1348      */
1349     public long getNbElems()
1350     {
1351         return btreeHeader.getNbElems();
1352     }
1353 
1354 
1355     /**
1356      * @param nbElems the nbElems to set
1357      */
1358     /* No qualifier */void setNbElems( long nbElems )
1359     {
1360         btreeHeader.setNbElems( nbElems );
1361     }
1362 
1363 
1364     /**
1365      * @return true if this BTree allow duplicate values
1366      */
1367     public boolean isAllowDuplicates()
1368     {
1369         return btreeHeader.isAllowDuplicates();
1370     }
1371 
1372 
1373     /* No qualifier */void setAllowDuplicates( boolean allowDuplicates )
1374     {
1375         btreeHeader.setAllowDuplicates( allowDuplicates );
1376     }
1377 
1378 
1379     /**
1380      * @see Object#toString()
1381      */
1382     public String toString()
1383     {
1384         StringBuilder sb = new StringBuilder();
1385 
1386         sb.append( "Managed BTree" );
1387         sb.append( "[" ).append( btreeHeader.getName() ).append( "]" );
1388         sb.append( "( pageSize:" ).append( btreeHeader.getPageSize() );
1389 
1390         if ( rootPage != null )
1391         {
1392             sb.append( ", nbEntries:" ).append( btreeHeader.getNbElems() );
1393         }
1394         else
1395         {
1396             sb.append( ", nbEntries:" ).append( 0 );
1397         }
1398 
1399         sb.append( ", comparator:" );
1400 
1401         if ( keySerializer.getComparator() == null )
1402         {
1403             sb.append( "null" );
1404         }
1405         else
1406         {
1407             sb.append( keySerializer.getComparator().getClass().getSimpleName() );
1408         }
1409 
1410         sb.append( ", DuplicatesAllowed: " ).append( btreeHeader.isAllowDuplicates() );
1411 
1412         sb.append( ") : \n" );
1413         sb.append( rootPage.dumpPage( "" ) );
1414 
1415         return sb.toString();
1416     }
1417 }