View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree.managed;
21  
22  
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.RandomAccessFile;
26  import java.nio.ByteBuffer;
27  import java.nio.channels.FileChannel;
28  import java.util.ArrayList;
29  import java.util.HashSet;
30  import java.util.LinkedHashMap;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import org.apache.directory.mavibot.btree.Page;
37  import org.apache.directory.mavibot.btree.exception.BTreeAlreadyManagedException;
38  import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
39  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
40  import org.apache.directory.mavibot.btree.serializer.ElementSerializer;
41  import org.apache.directory.mavibot.btree.serializer.IntSerializer;
42  import org.apache.directory.mavibot.btree.serializer.LongArraySerializer;
43  import org.apache.directory.mavibot.btree.serializer.LongSerializer;
44  import org.apache.directory.mavibot.btree.util.Strings;
45  import org.slf4j.Logger;
46  import org.slf4j.LoggerFactory;
47  
48  
49  /**
50   * The RecordManager is used to manage the file in which we will store the BTrees. 
51   * A RecordManager will manage more than one BTree.<br/>
52   * 
53   * It stores data in fixed size pages (default size is 512 bytes), which may be linked one to 
54   * the other if the data we want to store is too big for a page.
55   *  
56   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
57   */
58  public class RecordManager
59  {
60      /** The logger used by this class */
61      protected static final Logger LOG = LoggerFactory.getLogger( RecordManager.class );
62  
63      /** A dedicated logger for the check, registered under the "RM_CHECK" name */
64      protected static final Logger LOG_CHECK = LoggerFactory.getLogger( "RM_CHECK" );
65  
66      /** The associated file */
67      private File file;
68  
69      /** The channel used to read and write data */
70      private FileChannel fileChannel;
71  
72      /** The number of stored BTrees, as found in the file header */
73      private int nbBtree;
74  
75      /** The offsets of the first and last free page (NO_PAGE on a brand new file) */
76      private long firstFreePage;
77      private long lastFreePage;
78  
79      /** The list of available free pages */
80      List<PageIO> freePages = new ArrayList<PageIO>();
81  
82      /** Public counters. Going by their names they track the freed, created and reused pages, and the header/PageIO updates - TODO confirm */
83      public AtomicLong nbFreedPages = new AtomicLong( 0 );
84      public AtomicLong nbCreatedPages = new AtomicLong( 0 );
85      public AtomicLong nbReusedPages = new AtomicLong( 0 );
86      public AtomicLong nbUpdateRMHeader = new AtomicLong( 0 );
87      public AtomicLong nbUpdateBTreeHeader = new AtomicLong( 0 );
88      public AtomicLong nbUpdatePageIOs = new AtomicLong( 0 );
89  
90      /** The offset of the end of the file, set from the size of the file channel */
91      private long endOfFileOffset;
92  
93      /** 
94       * A BTree used to manage the pages that have been copied in a new version.
95       * Those pages can be reclaimed when the associated version is dead. 
96       **/
97      private BTree<RevisionName, long[]> copiedPageBTree;
98  
99      /** A BTree used to store all the valid revisions for all the stored BTrees */
100     private BTree<RevisionName, Long> revisionBTree;
101 
102     /** A constant for an offset on a non existing page */
103     private static final long NO_PAGE = -1L;
104 
105     /** The size, in bytes, of the field holding the number of stored BTrees in the header */
106     private static final int NB_TREE_SIZE = 4;
107 
108     /** The size, in bytes, of the field holding the page size in the header */
109     private static final int PAGE_SIZE = 4;
110 
111     /** The size, in bytes, of the data size in a page */
112     private static final int DATA_SIZE = 4;
113 
114     /** The size, in bytes, of the link to the next page */
115     private static final int LINK_SIZE = 8;
116 
117     /** The sizes, in bytes, of a byte, an int and a long */
118     private static final int BYTE_SIZE = 1;
119     private static final int INT_SIZE = 4;
120     private static final int LONG_SIZE = 8;
121 
122     /** The sizes, in bytes, of the links to the first and last free page */
123     private static final int FIRST_FREE_PAGE_SIZE = 8;
124     private static final int LAST_FREE_PAGE_SIZE = 8;
125 
126     /** The default page size */
127     private static final int DEFAULT_PAGE_SIZE = 512;
128 
129     /** The header size. It is static, and replaced by the page size given to each new instance */
130     private static int HEADER_SIZE = DEFAULT_PAGE_SIZE;
131 
132     /** A global buffer used to store the header. It is static, and re-allocated by each new instance */
133     private static ByteBuffer HEADER_BUFFER;
134 
135     /** A static buffer used to store the header, re-allocated by each new instance */
136     private static byte[] HEADER_BYTES;
137 
138     /** The length of an Offset, as a negative value : the four bytes are -8 when read as a big endian int */
139     private static byte[] LONG_LENGTH = new byte[]
140         { ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xF8 };
141 
142     /** The RecordManager underlying page size. */
143     private int pageSize = DEFAULT_PAGE_SIZE;
144 
145     /** The managed BTrees, stored by name in the order they have been added */
146     private Map<String, BTree<Object, Object>> managedBTrees;
147 
148     /** The offset of the last added BTree, NO_PAGE if none has been added */
149     private long lastAddedBTreeOffset = NO_PAGE;
150 
151     /** The default file name, used when a directory is given */
152     private static final String DEFAULT_FILE_NAME = "mavibot.db";
153 
154     /** A deserializer for Offsets */
155     private static final LongSerializer OFFSET_SERIALIZER = new LongSerializer();
156 
    /** The name of the internal BTree storing the revisions */
157     private static final String REVISION_BTREE_NAME = "_revisionBTree_";
158 
    /** The name of the internal BTree storing the copied pages */
159     private static final String COPIED_PAGE_BTREE_NAME = "_copiedPageBTree_";
160 
161     /** A flag set to true if we want to keep old revisions */
162     private boolean keepRevisions;
163     
164     /** A flag used by internal btrees */
165     public static final boolean INTERNAL_BTREE = true;
166     
167     /** A flag used by normal btrees, presumably the ones which are not internal to the RecordManager */
168     public static final boolean NORMAL_BTREE = false;
169 
170 
171     /**
172      * Create a Record manager which will either create the underlying file
173      * or load an existing one. If a folder is provided, then we will create
174      * a file with a default name : mavibot.db
    * <br/>
    * The default page size (DEFAULT_PAGE_SIZE) is used.
175      * 
176      * @param fileName The file name, or a folder name
177      */
178     public RecordManager( String fileName )
179     {
180         this( fileName, DEFAULT_PAGE_SIZE );
181     }
182 
183 
184     /**
185      * Create a Record manager which will either create the underlying file
186      * or load an existing one. If a folder is provided, then we will use
187      * the file with the default name in it : mavibot.db
188      * <br/>
189      * A file which does not exist, or which is empty, is initialized as a brand
190      * new RecordManager file, otherwise its content is loaded. If the file cannot
191      * be created, the error is logged and the instance is left unusable.
192      * 
193      * @param fileName The file name, or a folder name
194      * @param pageSize the size of a page on disk
195      * @throws RuntimeException If the file cannot be opened, initialized or loaded
196      */
197     public RecordManager( String fileName, int pageSize )
198     {
199         managedBTrees = new LinkedHashMap<String, BTree<Object, Object>>();
200 
201         // The header buffers are static : they are resized by each new instance
202         HEADER_BUFFER = ByteBuffer.allocate( pageSize );
203         HEADER_BYTES = new byte[pageSize];
204         HEADER_SIZE = pageSize;
205 
206         // If we were given a directory, we use the default file name in it
207         File tmpFile = new File( fileName );
208         File target = tmpFile.isDirectory() ? new File( tmpFile, DEFAULT_FILE_NAME ) : tmpFile;
209 
210         // A missing or empty file must be initialized, whatever the way it has been
211         // designated : loading an empty file would leave the internal BTrees unset.
212         boolean isNewFile = !target.exists() || ( target.length() == 0 );
213 
214         if ( isNewFile )
215         {
216             try
217             {
218                 target.createNewFile();
219             }
220             catch ( IOException ioe )
221             {
222                 // The exception is passed as the last argument so that its stack trace is logged
223                 LOG.error( "Cannot create the file {}", target.getName(), ioe );
224                 return;
225             }
226         }
227 
228         file = target;
229         RandomAccessFile randomFile = null;
230 
231         try
232         {
233             randomFile = new RandomAccessFile( file, "rw" );
234             fileChannel = randomFile.getChannel();
235 
236             // get the current end of file offset
237             endOfFileOffset = fileChannel.size();
238 
239             if ( isNewFile )
240             {
241                 this.pageSize = pageSize;
242                 initRecordManager();
243             }
244             else
245             {
246                 loadRecordManager();
247             }
248         }
249         catch ( Exception e )
250         {
251             LOG.error( "Error while initializing the RecordManager : {}", e.getMessage() );
252             LOG.error( "", e );
253 
254             // Don't leak the file handle : nobody else will be able to close it
255             if ( randomFile != null )
256             {
257                 try
258                 {
259                     randomFile.close();
260                 }
261                 catch ( IOException ignored )
262                 {
263                     // Nothing more we can do, the original error is rethrown below
264                 }
265             }
266 
267             throw new RuntimeException( e );
268         }
269     }
272 
273 
274     /**
275      * We will create a brand new RecordManager file, containing nothing, but the header, 
276      * a BTree to manage the old revisions we want to keep and
277      * a BTree used to manage pages associated with old versions.
278      * <br/>
279      * The Header contains the following details :
280      * <pre>
281      * +-----------+
282      * | PageSize  | 4 bytes : The size of a physical page (default to 512)
283      * +-----------+
284      * |  NbTree   | 4 bytes : The number of managed BTrees (at least 1)
285      * +-----------+ 
286      * | FirstFree | 8 bytes : The offset of the first free page
287      * +-----------+
288      * | LastFree  | 8 bytes : The offset of the last free page
289      * +-----------+
290      * </pre>
291      * 
292      * We then store the BTree managing the pages that have been copied when we have added
293      * or deleted an element in the BTree. They are associated with a version.
294      * 
295      * Last, we add the bTree that keep a track on each revision we can have access to.
296      * 
297      * @throws IOException If the file size cannot be read, or propagated from the header
298      * update or from the injection of the internal BTrees
299      */
300     private void initRecordManager() throws IOException
301     {
302         // Create a new Header : no BTree and no free page yet
303         nbBtree = 0;
304         firstFreePage = NO_PAGE;
305         lastFreePage = NO_PAGE;
306         updateRecordManagerHeader();
307 
308         // Set the offset of the end of the file
309         endOfFileOffset = fileChannel.size();
310 
311         // Now, initialize the Copied Page BTree and the Revision BTree
312         copiedPageBTree = new BTree<RevisionName, long[]>( COPIED_PAGE_BTREE_NAME, new RevisionNameSerializer(),
313             new LongArraySerializer() );
314         revisionBTree = new BTree<RevisionName, Long>( REVISION_BTREE_NAME, new RevisionNameSerializer(),
315             new LongSerializer() );
316 
317         // Inject these BTrees into the RecordManager
318         try
319         {
320             manage( copiedPageBTree );
321             manage( revisionBTree );
322         }
323         catch ( BTreeAlreadyManagedException btame )
324         {
325             // Can't happen on a brand new file, but leave a trace if it ever does
326             LOG.error( "The internal BTrees of a new RecordManager are already managed", btame );
327         }
328     }
329 
330 
331     /**
332      * Load the BTrees from the disk. Nothing is done if the file is empty.
    * <br/>
    * The header is read first (page size, number of BTrees, first and last free
    * page), then the two internal BTrees (the copied pages BTree and the revision
    * BTree), and last the remaining BTrees, which are stored into the managedBTrees
    * map. Each BTree gives the offset of the next one.
333      * 
    * @throws IOException If the file cannot be read
334      * @throws InstantiationException Propagated from loadBTree
335      * @throws IllegalAccessException Propagated from loadBTree
336      * @throws ClassNotFoundException Propagated from loadBTree
337      */
338     private void loadRecordManager() throws IOException, ClassNotFoundException, IllegalAccessException,
339         InstantiationException
340     {
341         if ( fileChannel.size() != 0 )
342         {
343             ByteBuffer header = ByteBuffer.allocate( HEADER_SIZE );
344 
345             // The file exists, we have to load the data now 
            // NOTE(review): the number of bytes actually read is not checked
346             fileChannel.read( header );
347 
348             header.rewind();
349 
350             // The page size
351             pageSize = header.getInt();
352 
353             // The number of managed BTrees
354             nbBtree = header.getInt();
355 
356             // The first and last free page
357             firstFreePage = header.getLong();
358             lastFreePage = header.getLong();
359 
360             // Now read each BTree. The first one is the one which
361             // manage the modified pages. Once read, we can discard all
362             // the pages that are stored in it, as we have restarted 
363             // the RecordManager.
            // NOTE(review): HEADER_SIZE is the page size given to the constructor, not
            // the page size we have just read from the file - confirm they always match
364             long btreeOffset = HEADER_SIZE;
365 
366             PageIO[] pageIos = readPageIOs( HEADER_SIZE, Long.MAX_VALUE );
367 
368             // Create the BTree
369             copiedPageBTree = BTreeFactory.<RevisionName, long[]> createBTree();
370             copiedPageBTree.setBtreeOffset( btreeOffset );
371 
372             loadBTree( pageIos, copiedPageBTree );
373             long nextBtreeOffset = copiedPageBTree.getNextBTreeOffset();
374 
375             // And the Revision BTree
376             pageIos = readPageIOs( nextBtreeOffset, Long.MAX_VALUE );
377 
378             revisionBTree = BTreeFactory.<RevisionName, Long> createBTree();
379             revisionBTree.setBtreeOffset( nextBtreeOffset );
380 
381             loadBTree( pageIos, revisionBTree );
382             nextBtreeOffset = revisionBTree.getNextBTreeOffset();
383 
384             // Then process the next ones. We start at 2, presumably because nbBtree
            // includes the two internal BTrees loaded above - TODO confirm
385             for ( int i = 2; i < nbBtree; i++ )
386             {
387                 // Create the BTree
388                 BTree<Object, Object> btree = BTreeFactory.createBTree();
389                 btree.setBtreeOffset( nextBtreeOffset );
390                 lastAddedBTreeOffset = nextBtreeOffset;
391 
392                 // Read the associated pages
393                 pageIos = readPageIOs( nextBtreeOffset, Long.MAX_VALUE );
394 
395                 // Load the BTree
396                 loadBTree( pageIos, btree );
397                 nextBtreeOffset = btree.getNextBTreeOffset();
398 
399                 // Store it into the managedBtrees map
400                 managedBTrees.put( btree.getName(), btree );
401             }
402 
403             // We are done ! Let's finish with the last initialization parts
404             endOfFileOffset = fileChannel.size();
405         }
406     }
407 
408 
409     /**
410      * Collects the chain of PageIOs starting at the given position. The first page
411      * is always fetched, then the links to the next pages are followed until the end
412      * of the chain is reached, or until the pages read so far can hold 'limit' bytes.
413      * 
414      * @param position The position of the first page
415      * @param limit The amount of data to cover. A value which is not strictly positive
416      * means there is no limit
417      * @return An array containing the pages, in the order they are linked
418      * @throws IOException If we had an issue while reading the pages
419      * @throws EndOfFileExceededException Propagated from the fetching of the pages
420      */
421     private PageIO[] readPageIOs( long position, long limit ) throws IOException, EndOfFileExceededException
422     {
423         LOG.debug( "Read PageIOs at position {}", position );
424 
425         // No limit given : follow the whole chain
426         long maxData = ( limit <= 0 ) ? Long.MAX_VALUE : limit;
427 
428         // The first page holds less data than the next ones : an int is reserved
429         // on top of the link to the next page
430         PageIO head = fetchPage( position );
431         head.setSize();
432         List<PageIO> chain = new ArrayList<PageIO>();
433         chain.add( head );
434         long covered = pageSize - LONG_SIZE - INT_SIZE;
435         long next = head.getNextPage();
436 
437         // Follow the links while there is a next page and we still need more data
438         while ( ( covered < maxData ) && ( next != NO_PAGE ) )
439         {
440             PageIO current = fetchPage( next );
441             chain.add( current );
442             next = current.getNextPage();
443             covered += pageSize - LONG_SIZE;
444 
445             if ( next == NO_PAGE )
446             {
447                 current.setNextPage( NO_PAGE );
448             }
449         }
450 
451         LOG.debug( "Nb of PageIOs read : {}", chain.size() );
452 
453         return chain.toArray( new PageIO[chain.size()] );
454     }
457 
458 
459     /**
460      * Read a BTree from the disk. The meta-data are read from the beginning of the
461      * given pages, in this order : the revision, the number of elements, the rootPage
462      * offset, the next BTree offset, the BTree page size, the name, the key serializer
463      * FQCN, the value serializer FQCN and the allowDuplicates flag. The rootPage is then
464      * read and injected into the BTree.
465      * <br/>
466      * The name and the FQCNs may be stored with a 0 length : they are then set to an
467      * empty String.
468      * 
469      * @param pageIos The list of pages containing the meta-data
470      * @param btree The BTree we have to initialize
471      * @throws EndOfFileExceededException Propagated from the reading of the rootPage pages
472      * @throws IOException If the rootPage cannot be read
473      * @throws InstantiationException Propagated from the BTreeFactory or BTree calls
474      * @throws IllegalAccessException Propagated from the BTreeFactory or BTree calls
475      * @throws ClassNotFoundException Propagated from the BTreeFactory or BTree calls
476      */
477     private <K, V> void loadBTree( PageIO[] pageIos, BTree<K, V> btree ) throws EndOfFileExceededException,
478         IOException, ClassNotFoundException, IllegalAccessException, InstantiationException
479     {
480         long dataPos = 0L;
481 
482         // The BTree current revision
483         long revision = readLong( pageIos, dataPos );
484         BTreeFactory.setRevision( btree, revision );
485         dataPos += LONG_SIZE;
486 
487         // The nb elems in the tree
488         long nbElems = readLong( pageIos, dataPos );
489         BTreeFactory.setNbElems( btree, nbElems );
490         dataPos += LONG_SIZE;
491 
492         // The BTree rootPage offset
493         long rootPageOffset = readLong( pageIos, dataPos );
494         BTreeFactory.setRootPageOffset( btree, rootPageOffset );
495         dataPos += LONG_SIZE;
496 
497         // The next BTree offset
498         long nextBTreeOffset = readLong( pageIos, dataPos );
499         BTreeFactory.setNextBTreeOffset( btree, nextBTreeOffset );
500         dataPos += LONG_SIZE;
501 
502         // The BTree page size
503         int btreePageSize = readInt( pageIos, dataPos );
504         BTreeFactory.setPageSize( btree, btreePageSize );
505         dataPos += INT_SIZE;
506 
507         // The tree name. Note that readBytes returns null when the stored length is 0 :
508         // only the length has to be skipped then, and the buffer must not be dereferenced
509         ByteBuffer btreeNameBytes = readBytes( pageIos, dataPos );
510         dataPos += INT_SIZE + ( btreeNameBytes == null ? 0 : btreeNameBytes.limit() );
511         String btreeName = ( btreeNameBytes == null ) ? "" : Strings.utf8ToString( btreeNameBytes );
512         BTreeFactory.setName( btree, btreeName );
513 
514         // The keySerializer FQCN
515         ByteBuffer keySerializerBytes = readBytes( pageIos, dataPos );
516         dataPos += INT_SIZE + ( keySerializerBytes == null ? 0 : keySerializerBytes.limit() );
517         String keySerializerFqcn = ( keySerializerBytes == null ) ? "" : Strings.utf8ToString( keySerializerBytes );
518         BTreeFactory.setKeySerializer( btree, keySerializerFqcn );
519 
520         // The valueSerializer FQCN
521         ByteBuffer valueSerializerBytes = readBytes( pageIos, dataPos );
522         dataPos += INT_SIZE + ( valueSerializerBytes == null ? 0 : valueSerializerBytes.limit() );
523         String valueSerializerFqcn = ( valueSerializerBytes == null ) ? "" : Strings.utf8ToString( valueSerializerBytes );
524         BTreeFactory.setValueSerializer( btree, valueSerializerFqcn );
525 
526         // The BTree allowDuplicates flag
527         int allowDuplicates = readInt( pageIos, dataPos );
528         btree.setAllowDuplicates( allowDuplicates != 0 );
529         dataPos += INT_SIZE;
530 
531         // Now, init the BTree
532         btree.init();
533 
534         // Now, load the rootPage, which can be a Leaf or a Node, depending 
535         // on the number of elements in the tree : if it's above the pageSize,
536         // it's a Node, otherwise it's a Leaf
537 
538         // Read the rootPage pages on disk
539         PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );
540 
541         Page<K, V> btreeRoot = readPage( btree, rootPageIos );
542         BTreeFactory.setRecordManager( btree, this );
543 
544         BTreeFactory.setRoot( btree, btreeRoot );
545     }
550 
551 
    /**
     * Creates a Node for the given BTree through BTreeFactory.createNode, and reads
     * the PageIOs stored at the given offset. The content of those PageIOs is not
     * used : the Node is returned as it has been created.
     * <br/>
     * NOTE(review): this looks like an unfinished method - confirm it is still needed.
     * 
     * @param btree The BTree the node belongs to
     * @param offset The position of the first PageIO of the node
     * @param revision The revision of the node
     * @param nbElems The number of elements of the node
     * @return The created Node
     * @throws IOException If the PageIOs cannot be read
     */
552     private <K, V> Page<K, V> readNode( BTree<K, V> btree, long offset, long revision, int nbElems ) throws IOException
553     {
554         Page<K, V> node = BTreeFactory.createNode( btree, revision, nbElems );
555 
556         // Read the pages on disk. The result is ignored, only the errors of the read can have an effect
557         PageIO[] pageIos = readPageIOs( offset, Long.MAX_VALUE );
558 
559         return node;
560     }
561 
562 
563     /**
564      * Reads the page of the given btree stored at the given offset, and stores its first and last offsets in it.
565      */
566     public <K, V> Page<K, V> deserialize( BTree<K, V> btree, long offset ) throws EndOfFileExceededException, IOException
567     {
568         PageIO[] pageIos = readPageIOs( offset, Long.MAX_VALUE );
569         Page<K, V> page = readPage( btree, pageIos );
570         AbstractPage<K, V> abstractPage = ( AbstractPage<K, V> ) page;
571         abstractPage.setOffset( pageIos[0].getOffset() );
572         abstractPage.setLastOffset( pageIos[pageIos.length - 1].getOffset() );
573         return page;
574     }
575 
576 
577     /**
578      * Deserialize a page from the PageIOs it is stored in. The data start with the
579      * revision (a long) and the number of elements (an int), followed by a block of
580      * bytes containing the keys and the values. A negative number of elements
581      * denotes a Node, otherwise the page is a Leaf.
582      * 
583      * @param btree The BTree the page belongs to
584      * @param pageIos The PageIOs containing the serialized page
585      * @return The deserialized page, a Leaf or a Node
586      * @throws IOException Propagated from the Node deserialization
587      */
588     private <K, V> Page<K, V> readPage( BTree<K, V> btree, PageIO[] pageIos ) throws IOException
589     {
590         // The revision comes first, then the number of elements
591         long revision = readLong( pageIos, 0L );
592         int nbElems = readInt( pageIos, LONG_SIZE );
593 
594         // Then we have one big blob of data holding all the keys and values :
595         // read it into a ByteBuffer which will be processed below
596         ByteBuffer byteBuffer = readBytes( pageIos, LONG_SIZE + INT_SIZE );
597 
598         // Only a Node is stored with a negative number of elements. Note that
599         // a Leaf can have 0 elements, and it's the root page then.
600         if ( nbElems < 0 )
601         {
602             return readNodeKeysAndValues( btree, -nbElems, revision, byteBuffer, pageIos );
603         }
604 
605         return readLeafKeysAndValues( btree, nbElems, revision, byteBuffer, pageIos );
606     }
614 
615 
616     /**
617      * Deserialize a Leaf from some PageIOs. For each element, the buffer contains
618      * the number of values, then the values (the offset of a sub-btree when this
619      * number is negative, a length followed by the serialized values otherwise),
620      * then the key length and the key bytes.
621      * 
622      * @param btree The BTree the leaf belongs to
623      * @param nbElems The number of elements stored in the leaf
624      * @param revision The revision of the leaf
625      * @param byteBuffer The buffer containing the serialized keys and values
626      * @param pageIos The pages the leaf has been read from
627      * @return The deserialized Leaf
628      */
629     private <K, V> Leaf<K, V> readLeafKeysAndValues( BTree<K, V> btree, int nbElems, long revision, ByteBuffer byteBuffer,
630         PageIO[] pageIos )
631     {
632         Leaf<K, V> leaf = BTreeFactory.createLeaf( btree, revision, nbElems );
633 
634         // Keep track of the place the leaf is stored on disk
635         leaf.setOffset( pageIos[0].getOffset() );
636         leaf.setLastOffset( pageIos[pageIos.length - 1].getOffset() );
637 
638         for ( int pos = 0; pos < nbElems; pos++ )
639         {
640             int nbValues = byteBuffer.getInt();
641             boolean isSubBtree = nbValues < 0;
642 
643             // A sub-btree is stored as its offset (a long), an array of values is
644             // stored as its length followed by its bytes
645             int nbBytes = isSubBtree ? LONG_SIZE : byteBuffer.getInt();
646             byte[] valueBytes = new byte[nbBytes];
647             byteBuffer.get( valueBytes );
648 
649             // The number of values of a sub-btree is negative : switch to a positive
650             // value but as we start at -1 for 0 value, add 1.
651             int nbStored = isSubBtree ? 1 - nbValues : nbValues;
652             BTreeFactory.setValue( leaf, pos, new ValueHolder<V>( btree, nbStored, valueBytes ) );
653 
654             // The key is stored as its length followed by its bytes
655             byte[] keyBytes = new byte[byteBuffer.getInt()];
656             byteBuffer.get( keyBytes );
657             BTreeFactory.setKey( leaf, pos, keyBytes );
658         }
659 
660         return leaf;
661     }
671 
672 
673     /**
674      * Deserialize a Node from some PageIos. For each key, the buffer contains the first
675      * and last offsets of a page, the key length and the serialized key. The first and
676      * last offsets of one more page are stored after the last key.
677      * 
678      * @param btree The BTree the node belongs to
679      * @param nbElems The number of keys stored in the node
680      * @param revision The revision of the node
681      * @param byteBuffer The buffer containing the serialized keys and page offsets
682      * @param pageIos The pages the node has been read from (not used here)
683      * @return The deserialized Node
684      * @throws IOException Presumably propagated from the serializers
685      */
686     private <K, V> Node<K, V> readNodeKeysAndValues( BTree<K, V> btree, int nbElems, long revision, ByteBuffer byteBuffer,
687         PageIO[] pageIos ) throws IOException
688     {
689         Node<K, V> node = BTreeFactory.createNode( btree, revision, nbElems );
690 
691         for ( int pos = 0; pos < nbElems; pos++ )
692         {
693             // Each key is preceded by a page, stored as two Offsets
694             long firstOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
695             long finalOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
696             node.setValue( pos, new PageHolder<K, V>( btree, null, firstOffset, finalOffset ) );
697 
698             // The key length tells where the next element starts, whatever the serializer has consumed
699             int keyLength = byteBuffer.getInt();
700             int keyStart = byteBuffer.position();
701             K key = btree.getKeySerializer().deserialize( byteBuffer );
702             byteBuffer.position( keyStart + keyLength );
703             BTreeFactory.setKey( node, pos, key );
704         }
705 
706         // As it's a node, one more page follows the last key
707         long firstOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
708         long finalOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
709         node.setValue( nbElems, new PageHolder<K, V>( btree, null, firstOffset, finalOffset ) );
710 
711         return node;
712     }
714 
715 
716     /**
717      * Read a byte[] from pages. The byte[] is stored as an int giving its length,
718      * followed by the bytes themselves, which may be spread across many pages.
719      * 
720      * @param pageIos The pages we want to read the byte[] from
721      * @param position The position of the length in the data stored in those pages
722      * @return A ByteBuffer containing the bytes, ready to be read, or null if the
723      * stored length is 0
724      */
725     private ByteBuffer readBytes( PageIO[] pageIos, long position )
726     {
727         // Read the byte[] length first
728         int length = readInt( pageIos, position );
729 
730         if ( length == 0 )
731         {
732             // No bytes to read : return null, without touching any page
733             return null;
734         }
735 
736         position += INT_SIZE;
737 
738         // Compute the page from which we will read the data given the
739         // current position, and the position in this page
740         int pageNb = computePageNb( position );
741         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
742 
743         ByteBuffer bytes = ByteBuffer.allocate( length );
744         int toRead = length;
745 
746         while ( true )
747         {
748             ByteBuffer pageData = pageIos[pageNb].getData();
749             int chunk = Math.min( toRead, pageData.capacity() - pagePos );
750 
751             // Copy through a duplicate : the position, limit and mark of the page
752             // buffer are left untouched. A mark()/reset() pair on the page buffer would
753             // fail with an InvalidMarkException if its position were above pagePos.
754             ByteBuffer window = pageData.duplicate();
755             window.limit( pagePos + chunk );
756             window.position( pagePos );
757             bytes.put( window );
758             toRead -= chunk;
759 
760             if ( toRead == 0 )
761             {
762                 break;
763             }
764 
765             // The remaining bytes start right after the link in the next page
766             pageNb++;
767             pagePos = LINK_SIZE;
768         }
769 
770         bytes.rewind();
771 
772         return bytes;
773     }
784 
785 
786     /**
787      * Read an int from pages. The int may be split across two consecutive pages, in
    * which case it is rebuilt byte by byte, the most significant byte first.
    * 
788      * @param pageIos The pages we want to read the int from
789      * @param position The position in the data stored in those pages
790      * @return The int we have read
791      */
792     private int readInt( PageIO[] pageIos, long position )
793     {
794         // Compute the page from which we will read the data given the 
795         // current position
796         int pageNb = computePageNb( position );
797 
798         // Compute the position in the current page : we skip one link (LONG_SIZE) per page
        // up to this one, plus INT_SIZE bytes (presumably the data size kept in the first page)
799         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
800 
801         ByteBuffer pageData = pageIos[pageNb].getData();
        // The number of bytes left in this page, starting from pagePos
802         int remaining = pageData.capacity() - pagePos;
803         int value = 0;
804 
805         if ( remaining >= INT_SIZE )
806         {
            // The whole int is in the current page
807             value = pageData.getInt( pagePos );
808         }
809         else
810         {
            // The int is split : take the 'remaining' first bytes from this page...
            // NOTE(review): if remaining is 0 or negative, no case matches in either switch
            // and 0 is returned - presumably computePageNb prevents this, to be confirmed
811             value = 0;
812 
813             switch ( remaining )
814             {
815                 case 3:
816                     value += ( ( pageData.get( pagePos + 2 ) & 0x00FF ) << 8 );
817                     // Fallthrough !!!
818 
819                 case 2:
820                     value += ( ( pageData.get( pagePos + 1 ) & 0x00FF ) << 16 );
821                     // Fallthrough !!!
822 
823                 case 1:
                    // No mask needed here : the sign extension bits are shifted out
824                     value += ( pageData.get( pagePos ) << 24 );
825                     break;
826             }
827 
828             // Now deal with the next page : the missing bytes are stored right after its link
829             pageData = pageIos[pageNb + 1].getData();
830             pagePos = LINK_SIZE;
831 
            // ... and the ( 4 - remaining ) last bytes from the next page. The cases are
            // ordered the other way around, as the fewer bytes we had, the more we need
832             switch ( remaining )
833             {
834                 case 1:
835                     value += ( pageData.get( pagePos ) & 0x00FF ) << 16;
836                     // fallthrough !!!
837 
838                 case 2:
839                     value += ( pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 8;
840                     // fallthrough !!!
841 
842                 case 3:
843                     value += ( pageData.get( pagePos + 3 - remaining ) & 0x00FF );
844                     break;
845             }
846         }
847 
848         return value;
849     }
850 
851 
852     /**
853      * Read a byte from pages 
854      * @param pageIos The pages we want to read the byte from
855      * @param position The position in the data stored in those pages
856      * @return The byte we have read
857      */
858     private byte readByte( PageIO[] pageIos, long position )
859     {
860         // Compute the page in which we will store the data given the 
861         // current position
862         int pageNb = computePageNb( position );
863 
864         // Compute the position in the current page
865         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
866 
867         ByteBuffer pageData = pageIos[pageNb].getData();
868         byte value = 0;
869 
870         value = pageData.get( pagePos );
871 
872         return value;
873     }
874 
875 
876     /**
877      * Read a long from pages 
878      * @param pageIos The pages we want to read the long from
879      * @param position The position in the data stored in those pages
880      * @return The long we have read
881      */
882     private long readLong( PageIO[] pageIos, long position )
883     {
884         // Compute the page in which we will store the data given the 
885         // current position
886         int pageNb = computePageNb( position );
887 
888         // Compute the position in the current page
889         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
890 
891         ByteBuffer pageData = pageIos[pageNb].getData();
892         int remaining = pageData.capacity() - pagePos;
893         long value = 0L;
894 
895         if ( remaining >= LONG_SIZE )
896         {
897             value = pageData.getLong( pagePos );
898         }
899         else
900         {
901             switch ( remaining )
902             {
903                 case 7:
904                     value += ( ( ( long ) pageData.get( pagePos + 6 ) & 0x00FF ) << 8 );
905                     // Fallthrough !!!
906 
907                 case 6:
908                     value += ( ( ( long ) pageData.get( pagePos + 5 ) & 0x00FF ) << 16 );
909                     // Fallthrough !!!
910 
911                 case 5:
912                     value += ( ( ( long ) pageData.get( pagePos + 4 ) & 0x00FF ) << 24 );
913                     // Fallthrough !!!
914 
915                 case 4:
916                     value += ( ( ( long ) pageData.get( pagePos + 3 ) & 0x00FF ) << 32 );
917                     // Fallthrough !!!
918 
919                 case 3:
920                     value += ( ( ( long ) pageData.get( pagePos + 2 ) & 0x00FF ) << 40 );
921                     // Fallthrough !!!
922 
923                 case 2:
924                     value += ( ( ( long ) pageData.get( pagePos + 1 ) & 0x00FF ) << 48 );
925                     // Fallthrough !!!
926 
927                 case 1:
928                     value += ( ( long ) pageData.get( pagePos ) << 56 );
929                     break;
930             }
931 
932             // Now deal with the next page
933             pageData = pageIos[pageNb + 1].getData();
934             pagePos = LINK_SIZE;
935 
936             switch ( remaining )
937             {
938                 case 1:
939                     value += ( ( long ) pageData.get( pagePos ) & 0x00FF ) << 48;
940                     // fallthrough !!!
941 
942                 case 2:
943                     value += ( ( long ) pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 40;
944                     // fallthrough !!!
945 
946                 case 3:
947                     value += ( ( long ) pageData.get( pagePos + 3 - remaining ) & 0x00FF ) << 32;
948                     // fallthrough !!!
949 
950                 case 4:
951                     value += ( ( long ) pageData.get( pagePos + 4 - remaining ) & 0x00FF ) << 24;
952                     // fallthrough !!!
953 
954                 case 5:
955                     value += ( ( long ) pageData.get( pagePos + 5 - remaining ) & 0x00FF ) << 16;
956                     // fallthrough !!!
957 
958                 case 6:
959                     value += ( ( long ) pageData.get( pagePos + 6 - remaining ) & 0x00FF ) << 8;
960                     // fallthrough !!!
961 
962                 case 7:
963                     value += ( ( long ) pageData.get( pagePos + 7 - remaining ) & 0x00FF );
964                     break;
965             }
966         }
967 
968         return value;
969     }
970 
971 
972     /**
973      * Manage a BTree. The btree will be added and managed by this RecordManager. We will create a 
974      * new RootPage for this added BTree, which will contain no data. 
975      *  
976      * @param btree The new BTree to manage.
977      */
978     public synchronized <K, V> void manage( BTree<K, V> btree ) throws BTreeAlreadyManagedException, IOException
979     {
980         manage( ( BTree<Object, Object> ) btree, NORMAL_BTREE );
981     }
982 
983 
984     /**
985      * works the same as @see #manage(BTree) except the given tree will not be linked to top level trees that will be
986      * loaded initially if the internalTree flag is set to true
987      * 
988      * @param btree The new BTree to manage.
989      * @param internalTree flag indicating if this is an internal tree
990      * 
991      * @throws BTreeAlreadyManagedException
992      * @throws IOException
993      */
994     public synchronized <K, V> void manage( BTree<K, V> btree, boolean internalTree )
995         throws BTreeAlreadyManagedException,
996         IOException
997     {
998         LOG.debug( "Managing the btree {} which is an internam tree : {}", btree.getName(), internalTree );
999         BTreeFactory.setRecordManager( btree, this );
1000 
1001         String name = btree.getName();
1002 
1003         if ( managedBTrees.containsKey( name ) )
1004         {
1005             // There is already a BTree with this name in the recordManager...
1006             LOG.error( "There is already a BTree named '{}' managed by this recordManager", name );
1007             throw new BTreeAlreadyManagedException( name );
1008         }
1009 
1010         // Do not add the BTree if it's internal into the Map of managed btrees, otherwise we will 
1011         // not discard it when reloading a page wth internal btrees
1012         if ( !internalTree )
1013         {
1014             managedBTrees.put( name, ( BTree<Object, Object> ) btree );
1015         } 
1016 
1017         // We will add the newly managed BTree at the end of the header.
1018         byte[] btreeNameBytes = Strings.getBytesUtf8( name );
1019         byte[] keySerializerBytes = Strings.getBytesUtf8( btree.getKeySerializerFQCN() );
1020         byte[] valueSerializerBytes = Strings.getBytesUtf8( btree.getValueSerializerFQCN() );
1021 
1022         int bufferSize =
1023             INT_SIZE + // The name size
1024                 btreeNameBytes.length + // The name
1025                 INT_SIZE + // The keySerializerBytes size 
1026                 keySerializerBytes.length + // The keySerializerBytes
1027                 INT_SIZE + // The valueSerializerBytes size
1028                 valueSerializerBytes.length + // The valueSerializerBytes
1029                 INT_SIZE + // The page size
1030                 LONG_SIZE + // The revision
1031                 LONG_SIZE + // the number of element
1032                 LONG_SIZE + // the nextBtree offset
1033                 LONG_SIZE + // The root offset
1034                 INT_SIZE; // The allowDuplicates flag
1035 
1036         // Get the pageIOs we need to store the data. We may need more than one.
1037         PageIO[] pageIos = getFreePageIOs( bufferSize );
1038 
1039         // Store the BTree Offset into the BTree
1040         long btreeOffset = pageIos[0].getOffset();
1041         btree.setBtreeOffset( btreeOffset );
1042 
1043         // Now store the BTree data in the pages :
1044         // - the BTree revision
1045         // - the BTree number of elements
1046         // - The RootPage offset
1047         // - The next Btree offset 
1048         // - the BTree page size
1049         // - the BTree name
1050         // - the keySerializer FQCN
1051         // - the valueSerializer FQCN
1052         // - the flags that tell if the dups are allowed
1053         // Starts at 0
1054         long position = 0L;
1055 
1056         // The BTree current revision
1057         position = store( position, btree.getRevision(), pageIos );
1058 
1059         // The nb elems in the tree
1060         position = store( position, btree.getNbElems(), pageIos );
1061 
1062         // Serialize the BTree root page
1063         Page<K, V> rootPage = BTreeFactory.getRoot( btree );
1064 
1065         PageIO[] rootPageIos = serializePage( btree, btree.getRevision(), rootPage );
1066 
1067         // Get the reference on the first page
1068         PageIO rootPageIo = rootPageIos[0];
1069 
1070         // Now, we can inject the BTree rootPage offset into the BTree header
1071         position = store( position, rootPageIo.getOffset(), pageIos );
1072         btree.setRootPageOffset( rootPageIo.getOffset() );
1073         ( ( Leaf<K, V> ) rootPage ).setOffset( rootPageIo.getOffset() );
1074 
1075         // The next BTree Header offset (-1L, as it's a new BTree)
1076         position = store( position, NO_PAGE, pageIos );
1077 
1078         // The BTree page size
1079         position = store( position, btree.getPageSize(), pageIos );
1080 
1081         // The tree name
1082         position = store( position, btreeNameBytes, pageIos );
1083 
1084         // The keySerializer FQCN
1085         position = store( position, keySerializerBytes, pageIos );
1086 
1087         // The valueSerialier FQCN
1088         position = store( position, valueSerializerBytes, pageIos );
1089 
1090         // The allowDuplicates flag
1091         position = store( position, ( btree.isAllowDuplicates() ? 1 : 0 ), pageIos );
1092 
1093         // And flush the pages to disk now
1094         LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() );
1095         flushPages( pageIos );
1096         LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() );
1097         flushPages( rootPageIos );
1098 
1099         // Now, if this added BTree is not the first BTree, we have to link it with the 
1100         // latest added BTree
1101         if ( !internalTree )
1102         {
1103             nbBtree++;
1104 
1105             if ( lastAddedBTreeOffset != NO_PAGE )
1106             {
1107                 // We have to update the nextBtreeOffset from the previous BTreeHeader
1108                 pageIos = readPageIOs( lastAddedBTreeOffset, LONG_SIZE + LONG_SIZE + LONG_SIZE + LONG_SIZE );
1109                 store( LONG_SIZE + LONG_SIZE + LONG_SIZE, btreeOffset, pageIos );
1110 
1111                 // Write the pages on disk
1112                 LOG.debug( "Updated the previous btree pointer on the added BTree {}", btree.getName() );
1113                 flushPages( pageIos );
1114             }
1115 
1116             lastAddedBTreeOffset = btreeOffset;
1117 
1118             // Last, not least, update the number of managed BTrees in the header
1119             updateRecordManagerHeader();
1120         }
1121 
1122         if ( LOG_CHECK.isDebugEnabled() )
1123         {
1124             check();
1125         }
1126     }
1127 
1128 
1129     /**
1130      * Serialize a new Page. It will contain the following data :<br/>
1131      * <ul>
1132      * <li>the revision : a long</li>
1133      * <li>the number of elements : an int (if <= 0, it's a Node, otherwise it's a Leaf)</li>
1134      * <li>the size of the values/keys when serialized
1135      * <li>the keys : an array of serialized keys</li>
1136      * <li>the values : an array of references to the children pageIO offset (stored as long)
1137      * if it's a Node, or a list of values if it's a Leaf</li>
1138      * <li></li>
1139      * </ul>
1140      * 
1141      * @param revision The node revision
1142      * @param keys The keys to serialize
1143      * @param children The references to the children
1144      * @return An array of pages containing the serialized node
1145      * @throws IOException
1146      */
1147     private <K, V> PageIO[] serializePage( BTree<K, V> btree, long revision, Page<K, V> page ) throws IOException
1148     {
1149         int nbElems = page.getNbElems();
1150 
1151         if ( nbElems == 0 )
1152         {
1153             return serializeRootPage( revision );
1154         }
1155         else
1156         {
1157             // Prepare a list of byte[] that will contain the serialized page
1158             int nbBuffers = 1 + 1 + 1 + nbElems * 3;
1159             int dataSize = 0;
1160             int serializedSize = 0;
1161 
1162             if ( page instanceof Node )
1163             {
1164                 // A Node has one more value to store
1165                 nbBuffers++;
1166             }
1167 
1168             // Now, we can create the list with the right size
1169             List<byte[]> serializedData = new ArrayList<byte[]>( nbBuffers );
1170 
1171             // The revision
1172             byte[] buffer = LongSerializer.serialize( revision );
1173             serializedData.add( buffer );
1174             serializedSize += buffer.length;
1175 
1176             // The number of elements
1177             // Make it a negative value if it's a Node
1178             int pageNbElems = nbElems;
1179 
1180             if ( page instanceof Node )
1181             {
1182                 pageNbElems = -nbElems;
1183             }
1184 
1185             buffer = IntSerializer.serialize( pageNbElems );
1186             serializedData.add( buffer );
1187             serializedSize += buffer.length;
1188 
1189             // Iterate on the keys and values. We first serialize the value, then the key
1190             // until we are done with all of them. If we are serializing a page, we have
1191             // to serialize one more value
1192             for ( int pos = 0; pos < nbElems; pos++ )
1193             {
1194                 // Start with the value
1195                 if ( page instanceof Node )
1196                 {
1197                     dataSize += serializeNodeValue( ( Node<K, V> ) page, pos, serializedData );
1198                     dataSize += serializeNodeKey( ( Node<K, V> ) page, pos, serializedData );
1199                 }
1200                 else
1201                 {
1202                     dataSize += serializeLeafValue( ( Leaf<K, V> ) page, pos, serializedData );
1203                     dataSize += serializeLeafKey( ( Leaf<K, V> ) page, pos, serializedData );
1204                 }
1205             }
1206 
1207             // Nodes have one more value to serialize
1208             if ( page instanceof Node )
1209             {
1210                 dataSize += serializeNodeValue( ( Node<K, V> ) page, nbElems, serializedData );
1211             }
1212 
1213             // Store the data size
1214             buffer = IntSerializer.serialize( dataSize );
1215             serializedData.add( 2, buffer );
1216             serializedSize += buffer.length;
1217 
1218             serializedSize += dataSize;
1219 
1220             // We are done. Allocate the pages we need to store the data
1221             PageIO[] pageIos = getFreePageIOs( serializedSize );
1222 
1223             // And store the data into those pages
1224             long position = 0L;
1225 
1226             for ( byte[] bytes : serializedData )
1227             {
1228                 position = storeRaw( position, bytes, pageIos );
1229             }
1230 
1231             return pageIos;
1232         }
1233     }
1234 
1235 
1236     /**
1237      * Serialize a Node's key
1238      */
1239     private <K, V> int serializeNodeKey( Node<K, V> node, int pos, List<byte[]> serializedData )
1240     {
1241         KeyHolder<K> holder = node.getKeyHolder( pos );
1242         byte[] buffer = holder.getRaw();
1243         
1244         // We have to store the serialized key length
1245         byte[] length = IntSerializer.serialize( buffer.length );
1246         serializedData.add( length );
1247 
1248         // And store the serialized key now
1249         serializedData.add( buffer );
1250 
1251         return buffer.length + INT_SIZE;
1252     }
1253 
1254 
1255     /**
1256      * Serialize a Node's Value. We store the two offsets of the child page.
1257      */
1258     private <K, V> int serializeNodeValue( Node<K, V> node, int pos, List<byte[]> serializedData )
1259         throws IOException
1260     {
1261         // For a node, we just store the children's offsets
1262         Page<K, V> child = node.getReference( pos );
1263 
1264         // The first offset
1265         byte[] buffer = LongSerializer.serialize( child.getOffset() );
1266         serializedData.add( buffer );
1267         int dataSize = buffer.length;
1268 
1269         // The last offset
1270         buffer = LongSerializer.serialize( child.getLastOffset() );
1271         serializedData.add( buffer );
1272         dataSize += buffer.length;
1273 
1274         return dataSize;
1275     }
1276 
1277 
1278     /**
1279      * Serialize a Leaf's key
1280      */
1281     private <K, V> int serializeLeafKey( Leaf<K, V> leaf, int pos, List<byte[]> serializedData )
1282     {
1283         int dataSize = 0;
1284         KeyHolder<K> keyHolder = leaf.getKeyHolder( pos );
1285         byte[] keyData = keyHolder.getRaw();
1286 
1287         if ( keyData != null )
1288         {
1289             // We have to store the serialized key length
1290             byte[] length = IntSerializer.serialize( keyData.length );
1291             serializedData.add( length );
1292 
1293             // And the key data
1294             serializedData.add( keyData );
1295             dataSize += keyData.length + INT_SIZE;
1296         }
1297         else
1298         {
1299             serializedData.add( IntSerializer.serialize( 0 ) );
1300             dataSize += INT_SIZE;
1301         }
1302 
1303         return dataSize;
1304     }
1305 
1306 
1307     /**
1308      * Serialize a Leaf's Value. We store 
1309      */
1310     private <K, V> int serializeLeafValue( Leaf<K, V> leaf, int pos, List<byte[]> serializedData )
1311         throws IOException
1312     {
1313         // The value can be an Array or a sub-btree, but we don't care
1314         // we just iterate on all the values
1315         ValueHolder<V> valueHolder = leaf.getValue( pos );
1316         int dataSize = 0;
1317 
1318         if ( !valueHolder.isSubBtree() )
1319         {
1320             int nbValues = valueHolder.size();
1321             
1322             // Write the nb elements first
1323             byte[] buffer = IntSerializer.serialize( nbValues );
1324             serializedData.add( buffer );
1325             dataSize = INT_SIZE;
1326 
1327             // We have a serialized value. Just flush it
1328             byte[] data = valueHolder.getRaw();
1329             dataSize += data.length;
1330             
1331             // Store the data size
1332             buffer = IntSerializer.serialize( data.length );
1333             serializedData.add( buffer );
1334             dataSize += INT_SIZE;
1335 
1336             // and add the data
1337             serializedData.add( data );
1338         }
1339         else
1340         {
1341             // First take the number of values
1342             int nbValues = valueHolder.size();
1343     
1344             if ( nbValues == 0 )
1345             {
1346                 // No value. 
1347                 byte[] buffer = IntSerializer.serialize( nbValues );
1348                 serializedData.add( buffer );
1349     
1350                 return buffer.length;
1351             }
1352 
1353             if ( valueHolder.isSubBtree() )
1354             {
1355                 // Store the nbVlues as a negative number. We add 1 so that 0 is not confused with an Array value 
1356                 byte[] buffer = IntSerializer.serialize( -( nbValues + 1 ) );
1357                 serializedData.add( buffer );
1358                 dataSize += buffer.length;
1359     
1360                 // the BTree offset
1361                 buffer = LongSerializer.serialize( valueHolder.getOffset() );
1362                 serializedData.add( buffer );
1363                 dataSize += buffer.length;
1364             }
1365             else
1366             {
1367                 // This is an array, store the nb of values as a positive number
1368                 byte[] buffer = IntSerializer.serialize( nbValues );
1369                 serializedData.add( buffer );
1370                 dataSize += buffer.length;
1371     
1372                 // Now store each value
1373                 byte[] data = valueHolder.getRaw();
1374                 buffer = IntSerializer.serialize( data.length );
1375                 serializedData.add( buffer );
1376                 dataSize += buffer.length;
1377     
1378                 if ( data.length > 0 )
1379                 {
1380                     serializedData.add( data );
1381                 }
1382     
1383                 dataSize += data.length;
1384             }
1385         }
1386 
1387         return dataSize;
1388     }
1389 
1390 
1391     /**
1392      * Write a root page with no elements in it
1393      */
1394     private PageIO[] serializeRootPage( long revision ) throws IOException
1395     {
1396         // We will have 1 single page if we have no elements
1397         PageIO[] pageIos = new PageIO[1];
1398 
1399         // This is either a new root page or a new page that will be filled later
1400         PageIO newPage = fetchNewPage();
1401 
1402         // We need first to create a byte[] that will contain all the data
1403         // For the root page, this is easy, as we only have to store the revision, 
1404         // and the number of elements, which is 0.
1405         long position = 0L;
1406 
1407         position = store( position, revision, newPage );
1408         position = store( position, 0, newPage );
1409 
1410         // Update the page size now
1411         newPage.setSize( ( int ) position );
1412 
1413         // Insert the result into the array of PageIO
1414         pageIos[0] = newPage;
1415 
1416         return pageIos;
1417     }
1418 
1419 
1420     /**
1421      * Update the header, injecting the nbBtree, firstFreePage and lastFreePage
1422      */
1423     private void updateRecordManagerHeader() throws IOException
1424     {
1425         // The page size
1426         HEADER_BYTES[0] = ( byte ) ( pageSize >>> 24 );
1427         HEADER_BYTES[1] = ( byte ) ( pageSize >>> 16 );
1428         HEADER_BYTES[2] = ( byte ) ( pageSize >>> 8 );
1429         HEADER_BYTES[3] = ( byte ) ( pageSize );
1430 
1431         // The number of managed BTree (currently we have only one : the discardedPage BTree
1432         HEADER_BYTES[4] = ( byte ) ( nbBtree >>> 24 );
1433         HEADER_BYTES[5] = ( byte ) ( nbBtree >>> 16 );
1434         HEADER_BYTES[6] = ( byte ) ( nbBtree >>> 8 );
1435         HEADER_BYTES[7] = ( byte ) ( nbBtree );
1436 
1437         // The first free page
1438         HEADER_BYTES[8] = ( byte ) ( firstFreePage >>> 56 );
1439         HEADER_BYTES[9] = ( byte ) ( firstFreePage >>> 48 );
1440         HEADER_BYTES[10] = ( byte ) ( firstFreePage >>> 40 );
1441         HEADER_BYTES[11] = ( byte ) ( firstFreePage >>> 32 );
1442         HEADER_BYTES[12] = ( byte ) ( firstFreePage >>> 24 );
1443         HEADER_BYTES[13] = ( byte ) ( firstFreePage >>> 16 );
1444         HEADER_BYTES[14] = ( byte ) ( firstFreePage >>> 8 );
1445         HEADER_BYTES[15] = ( byte ) ( firstFreePage );
1446 
1447         // The last free page
1448         HEADER_BYTES[16] = ( byte ) ( lastFreePage >>> 56 );
1449         HEADER_BYTES[17] = ( byte ) ( lastFreePage >>> 48 );
1450         HEADER_BYTES[18] = ( byte ) ( lastFreePage >>> 40 );
1451         HEADER_BYTES[19] = ( byte ) ( lastFreePage >>> 32 );
1452         HEADER_BYTES[20] = ( byte ) ( lastFreePage >>> 24 );
1453         HEADER_BYTES[21] = ( byte ) ( lastFreePage >>> 16 );
1454         HEADER_BYTES[22] = ( byte ) ( lastFreePage >>> 8 );
1455         HEADER_BYTES[23] = ( byte ) ( lastFreePage );
1456 
1457         // Write the header on disk
1458         HEADER_BUFFER.put( HEADER_BYTES );
1459         HEADER_BUFFER.flip();
1460 
1461         LOG.debug( "Update RM header, FF : {}, LF : {}", firstFreePage, lastFreePage );
1462         fileChannel.write( HEADER_BUFFER, 0 );
1463         HEADER_BUFFER.clear();
1464 
1465         nbUpdateRMHeader.incrementAndGet();
1466     }
1467 
1468 
1469     /**
1470      * Update the BTree header after a BTree modification. We update the following fields :
1471      * <ul>
1472      * <li>the revision</li>
1473      * <li>the number of elements</li>
1474      * <li>the rootPage offset</li>
1475      * </ul>
1476      * @param btree
1477      * @throws IOException 
1478      * @throws EndOfFileExceededException 
1479      */
1480     /* No qualifier*/<K, V> void updateBtreeHeader( BTree<K, V> btree, long rootPageOffset )
1481         throws EndOfFileExceededException,
1482         IOException
1483     {
1484         // Read the pageIOs associated with this BTree
1485         long offset = btree.getBtreeOffset();
1486         long headerSize = LONG_SIZE + LONG_SIZE + LONG_SIZE;
1487 
1488         PageIO[] pageIos = readPageIOs( offset, headerSize );
1489 
1490         // Now, update the revision
1491         long position = 0;
1492 
1493         position = store( position, btree.getRevision(), pageIos );
1494         position = store( position, btree.getNbElems(), pageIos );
1495         position = store( position, rootPageOffset, pageIos );
1496 
1497         // Write the pages on disk
1498         if ( LOG.isDebugEnabled() )
1499         {
1500             LOG.debug( "-----> Flushing the '{}' BTreeHeader", btree.getName() );
1501             LOG.debug( "  revision : " + btree.getRevision() + ", NbElems : " + btree.getNbElems() + ", root offset : "
1502                 + rootPageOffset );
1503         }
1504 
1505         flushPages( pageIos );
1506 
1507         nbUpdateBTreeHeader.incrementAndGet();
1508 
1509         if ( LOG_CHECK.isDebugEnabled() )
1510         {
1511             check();
1512         }
1513     }
1514 
1515 
1516     /**
1517      * Write the pages in the disk, either at the end of the file, or at
1518      * the position they were taken from.
1519      * 
1520      * @param pageIos The list of pages to write
1521      * @throws IOException If the write failed
1522      */
1523     private void flushPages( PageIO... pageIos ) throws IOException
1524     {
1525         if ( LOG.isDebugEnabled() )
1526         {
1527             for ( PageIO pageIo : pageIos )
1528             {
1529                 dump( pageIo );
1530             }
1531         }
1532 
1533         for ( PageIO pageIo : pageIos )
1534         {
1535             pageIo.getData().rewind();
1536 
1537             if ( fileChannel.size() < ( pageIo.getOffset() + pageSize ) )
1538             {
1539                 LOG.debug( "Adding a page at the end of the file" );
1540                 // This is a page we have to add to the file
1541                 fileChannel.write( pageIo.getData(), fileChannel.size() );
1542                 //fileChannel.force( false );
1543             }
1544             else
1545             {
1546                 LOG.debug( "Writing a page at position {}", pageIo.getOffset() );
1547                 fileChannel.write( pageIo.getData(), pageIo.getOffset() );
1548                 //fileChannel.force( false );
1549             }
1550 
1551             nbUpdatePageIOs.incrementAndGet();
1552 
1553             pageIo.getData().rewind();
1554         }
1555     }
1556 
1557 
1558     /**
1559      * Compute the page in which we will store data given an offset, when 
1560      * we have a list of pages.
1561      * 
1562      * @param offset The position in the data
1563      * @return The page number in which the offset will start
1564      */
1565     private int computePageNb( long offset )
1566     {
1567         long pageNb = 0;
1568 
1569         offset -= pageSize - LINK_SIZE - PAGE_SIZE;
1570 
1571         if ( offset < 0 )
1572         {
1573             return ( int ) pageNb;
1574         }
1575 
1576         pageNb = 1 + offset / ( pageSize - LINK_SIZE );
1577 
1578         return ( int ) pageNb;
1579     }
1580 
1581 
1582     /**
1583      * Stores a byte[] into one ore more pageIO (depending if the long is stored
1584      * across a boundary or not)
1585      * 
1586      * @param position The position in a virtual byte[] if all the pages were contiguous
1587      * @param bytes The byte[] to serialize
1588      * @param pageIos The pageIOs we have to store the data in
1589      * @return The new position
1590      */
1591     private long store( long position, byte[] bytes, PageIO... pageIos )
1592     {
1593         if ( bytes != null )
1594         {
1595             // Write the bytes length
1596             position = store( position, bytes.length, pageIos );
1597 
1598             // Compute the page in which we will store the data given the 
1599             // current position
1600             int pageNb = computePageNb( position );
1601 
1602             // Get back the buffer in this page
1603             ByteBuffer pageData = pageIos[pageNb].getData();
1604 
1605             // Compute the position in the current page
1606             int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1607 
1608             // Compute the remaining size in the page
1609             int remaining = pageData.capacity() - pagePos;
1610             int nbStored = bytes.length;
1611 
1612             // And now, write the bytes until we have none
1613             while ( nbStored > 0 )
1614             {
1615                 if ( remaining > nbStored )
1616                 {
1617                     pageData.mark();
1618                     pageData.position( pagePos );
1619                     pageData.put( bytes, bytes.length - nbStored, nbStored );
1620                     pageData.reset();
1621                     nbStored = 0;
1622                 }
1623                 else
1624                 {
1625                     pageData.mark();
1626                     pageData.position( pagePos );
1627                     pageData.put( bytes, bytes.length - nbStored, remaining );
1628                     pageData.reset();
1629                     pageNb++;
1630                     pageData = pageIos[pageNb].getData();
1631                     pagePos = LINK_SIZE;
1632                     nbStored -= remaining;
1633                     remaining = pageData.capacity() - pagePos;
1634                 }
1635             }
1636 
1637             // We are done
1638             position += bytes.length;
1639         }
1640         else
1641         {
1642             // No bytes : write 0 and return
1643             position = store( position, 0, pageIos );
1644         }
1645 
1646         return position;
1647     }
1648 
1649 
1650     /**
1651      * Stores a byte[] into one ore more pageIO (depending if the long is stored
1652      * across a boundary or not). We don't add the byte[] size, it's already present
1653      * in the received byte[].
1654      * 
1655      * @param position The position in a virtual byte[] if all the pages were contiguous
1656      * @param bytes The byte[] to serialize
1657      * @param pageIos The pageIOs we have to store the data in
1658      * @return The new position
1659      */
1660     private long storeRaw( long position, byte[] bytes, PageIO... pageIos )
1661     {
1662         if ( bytes != null )
1663         {
1664             // Compute the page in which we will store the data given the 
1665             // current position
1666             int pageNb = computePageNb( position );
1667 
1668             // Get back the buffer in this page
1669             ByteBuffer pageData = pageIos[pageNb].getData();
1670 
1671             // Compute the position in the current page
1672             int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1673 
1674             // Compute the remaining size in the page
1675             int remaining = pageData.capacity() - pagePos;
1676             int nbStored = bytes.length;
1677 
1678             // And now, write the bytes until we have none
1679             while ( nbStored > 0 )
1680             {
1681                 if ( remaining > nbStored )
1682                 {
1683                     pageData.mark();
1684                     pageData.position( pagePos );
1685                     pageData.put( bytes, bytes.length - nbStored, nbStored );
1686                     pageData.reset();
1687                     nbStored = 0;
1688                 }
1689                 else
1690                 {
1691                     pageData.mark();
1692                     pageData.position( pagePos );
1693                     pageData.put( bytes, bytes.length - nbStored, remaining );
1694                     pageData.reset();
1695                     pageNb++;
1696 
1697                     if ( pageNb == pageIos.length )
1698                     {
1699                         // We can stop here : we have reach the end of the page
1700                         break;
1701                     }
1702 
1703                     pageData = pageIos[pageNb].getData();
1704                     pagePos = LINK_SIZE;
1705                     nbStored -= remaining;
1706                     remaining = pageData.capacity() - pagePos;
1707                 }
1708             }
1709 
1710             // We are done
1711             position += bytes.length;
1712         }
1713         else
1714         {
1715             // No bytes : write 0 and return
1716             position = store( position, 0, pageIos );
1717         }
1718 
1719         return position;
1720     }
1721 
1722 
1723     /**
1724      * Stores an Integer into one ore more pageIO (depending if the int is stored
1725      * across a boundary or not)
1726      * 
1727      * @param position The position in a virtual byte[] if all the pages were contiguous
1728      * @param value The int to serialize
1729      * @param pageIos The pageIOs we have to store the data in
1730      * @return The new position
1731      */
1732     private long store( long position, int value, PageIO... pageIos )
1733     {
1734         // Compute the page in which we will store the data given the 
1735         // current position
1736         int pageNb = computePageNb( position );
1737 
1738         // Compute the position in the current page
1739         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1740 
1741         // Get back the buffer in this page
1742         ByteBuffer pageData = pageIos[pageNb].getData();
1743 
1744         // Compute the remaining size in the page
1745         int remaining = pageData.capacity() - pagePos;
1746 
1747         if ( remaining < INT_SIZE )
1748         {
1749             // We have to copy the serialized length on two pages
1750 
1751             switch ( remaining )
1752             {
1753                 case 3:
1754                     pageData.put( pagePos + 2, ( byte ) ( value >>> 8 ) );
1755                     // Fallthrough !!!
1756 
1757                 case 2:
1758                     pageData.put( pagePos + 1, ( byte ) ( value >>> 16 ) );
1759                     // Fallthrough !!!
1760 
1761                 case 1:
1762                     pageData.put( pagePos, ( byte ) ( value >>> 24 ) );
1763                     break;
1764             }
1765 
1766             // Now deal with the next page
1767             pageData = pageIos[pageNb + 1].getData();
1768             pagePos = LINK_SIZE;
1769 
1770             switch ( remaining )
1771             {
1772                 case 1:
1773                     pageData.put( pagePos, ( byte ) ( value >>> 16 ) );
1774                     // fallthrough !!!
1775 
1776                 case 2:
1777                     pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 8 ) );
1778                     // fallthrough !!!
1779 
1780                 case 3:
1781                     pageData.put( pagePos + 3 - remaining, ( byte ) ( value ) );
1782                     break;
1783             }
1784         }
1785         else
1786         {
1787             // Store the value in the page at the selected position
1788             pageData.putInt( pagePos, value );
1789         }
1790 
1791         // Increment the position to reflect the addition of an Int (4 bytes)
1792         position += INT_SIZE;
1793 
1794         return position;
1795     }
1796 
1797 
1798     /**
1799      * Stores a Long into one ore more pageIO (depending if the long is stored
1800      * across a boundary or not)
1801      * 
1802      * @param position The position in a virtual byte[] if all the pages were contiguous
1803      * @param value The long to serialize
1804      * @param pageIos The pageIOs we have to store the data in
1805      * @return The new position
1806      */
1807     private long store( long position, long value, PageIO... pageIos )
1808     {
1809         // Compute the page in which we will store the data given the 
1810         // current position
1811         int pageNb = computePageNb( position );
1812 
1813         // Compute the position in the current page
1814         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1815 
1816         // Get back the buffer in this page
1817         ByteBuffer pageData = pageIos[pageNb].getData();
1818 
1819         // Compute the remaining size in the page
1820         int remaining = pageData.capacity() - pagePos;
1821 
1822         if ( remaining < LONG_SIZE )
1823         {
1824             // We have to copy the serialized length on two pages
1825 
1826             switch ( remaining )
1827             {
1828                 case 7:
1829                     pageData.put( pagePos + 6, ( byte ) ( value >>> 8 ) );
1830                     // Fallthrough !!!
1831 
1832                 case 6:
1833                     pageData.put( pagePos + 5, ( byte ) ( value >>> 16 ) );
1834                     // Fallthrough !!!
1835 
1836                 case 5:
1837                     pageData.put( pagePos + 4, ( byte ) ( value >>> 24 ) );
1838                     // Fallthrough !!!
1839 
1840                 case 4:
1841                     pageData.put( pagePos + 3, ( byte ) ( value >>> 32 ) );
1842                     // Fallthrough !!!
1843 
1844                 case 3:
1845                     pageData.put( pagePos + 2, ( byte ) ( value >>> 40 ) );
1846                     // Fallthrough !!!
1847 
1848                 case 2:
1849                     pageData.put( pagePos + 1, ( byte ) ( value >>> 48 ) );
1850                     // Fallthrough !!!
1851 
1852                 case 1:
1853                     pageData.put( pagePos, ( byte ) ( value >>> 56 ) );
1854                     break;
1855             }
1856 
1857             // Now deal with the next page
1858             pageData = pageIos[pageNb + 1].getData();
1859             pagePos = LINK_SIZE;
1860 
1861             switch ( remaining )
1862             {
1863                 case 1:
1864                     pageData.put( pagePos, ( byte ) ( value >>> 48 ) );
1865                     // fallthrough !!!
1866 
1867                 case 2:
1868                     pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 40 ) );
1869                     // fallthrough !!!
1870 
1871                 case 3:
1872                     pageData.put( pagePos + 3 - remaining, ( byte ) ( value >>> 32 ) );
1873                     // fallthrough !!!
1874 
1875                 case 4:
1876                     pageData.put( pagePos + 4 - remaining, ( byte ) ( value >>> 24 ) );
1877                     // fallthrough !!!
1878 
1879                 case 5:
1880                     pageData.put( pagePos + 5 - remaining, ( byte ) ( value >>> 16 ) );
1881                     // fallthrough !!!
1882 
1883                 case 6:
1884                     pageData.put( pagePos + 6 - remaining, ( byte ) ( value >>> 8 ) );
1885                     // fallthrough !!!
1886 
1887                 case 7:
1888                     pageData.put( pagePos + 7 - remaining, ( byte ) ( value ) );
1889                     break;
1890             }
1891         }
1892         else
1893         {
1894             // Store the value in the page at the selected position
1895             pageData.putLong( pagePos, value );
1896         }
1897 
1898         // Increment the position to reflect the addition of an Long (8 bytes)
1899         position += LONG_SIZE;
1900 
1901         return position;
1902     }
1903 
1904 
1905     /**
1906      * Stores a new page on disk. We will add the modified page into the tree of copied pages.
1907      * The new page is serialized and saved on disk.
1908      * 
1909      * @param oldPage
1910      * @param oldRevision
1911      * @param newPage
1912      * @param newRevision
1913      * @return The offset of the new page
1914      * @throws IOException 
1915      */
1916     /* No qualifier*/<K, V> PageHolder<K, V> writePage( BTree<K, V> btree, Page<K, V> newPage,
1917         long newRevision )
1918         throws IOException
1919     {
1920         // We first need to save the new page on disk
1921         PageIO[] pageIos = serializePage( btree, newRevision, newPage );
1922         
1923         LOG.debug( "Write data for '{}' btree ", btree.getName() );
1924         
1925         // Write the page on disk
1926         flushPages( pageIos );
1927 
1928         // Build the resulting reference
1929         long offset = pageIos[0].getOffset();
1930         long lastOffset = pageIos[pageIos.length - 1].getOffset();
1931         PageHolder<K, V> valueHolder = new PageHolder<K, V>( btree, newPage, offset,
1932             lastOffset );
1933 
1934         if ( LOG_CHECK.isDebugEnabled() )
1935         {
1936             check();
1937         }
1938 
1939         return valueHolder;
1940     }
1941 
1942 
1943     /**
1944      * Compute the number of pages needed to store some specific size of data.
1945      * 
1946      * @param dataSize The size of the data we want to store in pages
1947      * @return The number of pages needed
1948      */
1949     private int computeNbPages( int dataSize )
1950     {
1951         if ( dataSize <= 0 )
1952         {
1953             return 0;
1954         }
1955 
1956         // Compute the number of pages needed.
1957         // Considering that each page can contain PageSize bytes,
1958         // but that the first 8 bytes are used for links and we 
1959         // use 4 bytes to store the data size, the number of needed
1960         // pages is :
1961         // NbPages = ( (dataSize - (PageSize - 8 - 4 )) / (PageSize - 8) ) + 1 
1962         // NbPages += ( if (dataSize - (PageSize - 8 - 4 )) % (PageSize - 8) > 0 : 1 : 0 )
1963         int availableSize = ( pageSize - LONG_SIZE );
1964         int nbNeededPages = 1;
1965 
1966         // Compute the number of pages that will be full but the first page
1967         if ( dataSize > availableSize - INT_SIZE )
1968         {
1969             int remainingSize = dataSize - ( availableSize - INT_SIZE );
1970             nbNeededPages += remainingSize / availableSize;
1971             int remain = remainingSize % availableSize;
1972 
1973             if ( remain > 0 )
1974             {
1975                 nbNeededPages++;
1976             }
1977         }
1978 
1979         return nbNeededPages;
1980     }
1981 
1982 
1983     /**
1984      * Get as many pages as needed to store the data of the given size
1985      *  
1986      * @param dataSize The data size
1987      * @return An array of pages, enough to store the full data
1988      */
1989     private PageIO[] getFreePageIOs( int dataSize ) throws IOException
1990     {
1991         if ( dataSize == 0 )
1992         {
1993             return new PageIO[]
1994                 {};
1995         }
1996 
1997         int nbNeededPages = computeNbPages( dataSize );
1998 
1999         PageIO[] pageIOs = new PageIO[nbNeededPages];
2000 
2001         // The first page : set the size
2002         pageIOs[0] = fetchNewPage();
2003         pageIOs[0].setSize( dataSize );
2004 
2005         for ( int i = 1; i < nbNeededPages; i++ )
2006         {
2007             pageIOs[i] = fetchNewPage();
2008 
2009             // Create the link
2010             pageIOs[i - 1].setNextPage( pageIOs[i].getOffset() );
2011         }
2012 
2013         return pageIOs;
2014     }
2015 
2016 
2017     /**
2018      * Return a new Page. We take one of the existing free pages, or we create
2019      * a new page at the end of the file.
2020      * 
2021      * @return The fetched PageIO
2022      */
2023     private PageIO fetchNewPage() throws IOException
2024     {
2025         if ( firstFreePage == NO_PAGE )
2026         {
2027             nbCreatedPages.incrementAndGet();
2028 
2029             // We don't have any free page. Reclaim some new page at the end
2030             // of the file
2031             PageIO newPage = new PageIO( endOfFileOffset );
2032 
2033             endOfFileOffset += pageSize;
2034 
2035             ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
2036 
2037             newPage.setData( data );
2038             newPage.setNextPage( NO_PAGE );
2039             newPage.setSize( 0 );
2040 
2041             LOG.debug( "Requiring a new page at offset {}", newPage.getOffset() );
2042 
2043             return newPage;
2044         }
2045         else
2046         {
2047             nbReusedPages.incrementAndGet();
2048 
2049             // We have some existing free page. Fetch it from disk
2050             PageIO pageIo = fetchPage( firstFreePage );
2051 
2052             // Update the firstFreePage pointer
2053             firstFreePage = pageIo.getNextPage();
2054 
2055             // overwrite the data of old page
2056             ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
2057             pageIo.setData( data );
2058 
2059             pageIo.setNextPage( NO_PAGE );
2060             pageIo.setSize( 0 );
2061 
2062             LOG.debug( "Reused page at offset {}", pageIo.getOffset() );
2063 
2064             // If we don't have any more free page, update the last free page pointer too
2065             if ( firstFreePage == NO_PAGE )
2066             {
2067                 lastFreePage = NO_PAGE;
2068             }
2069 
2070             // Update the header
2071             updateRecordManagerHeader();
2072 
2073             return pageIo;
2074         }
2075     }
2076 
2077 
2078     /**
2079      * fetch a page from disk, knowing its position in the file.
2080      * 
2081      * @param offset The position in the file
2082      * @return The found page
2083      */
2084     private PageIO fetchPage( long offset ) throws IOException, EndOfFileExceededException
2085     {
2086         if ( fileChannel.size() < offset + pageSize )
2087         {
2088             // Error : we are past the end of the file
2089             throw new EndOfFileExceededException( "We are fetching a page on " + offset +
2090                 " when the file's size is " + fileChannel.size() );
2091         }
2092         else
2093         {
2094             // Read the page
2095             fileChannel.position( offset );
2096 
2097             ByteBuffer data = ByteBuffer.allocate( pageSize );
2098             fileChannel.read( data );
2099             data.rewind();
2100 
2101             PageIO readPage = new PageIO( offset );
2102             readPage.setData( data );
2103 
2104             return readPage;
2105         }
2106     }
2107 
2108 
2109     /**
2110      * @return the pageSize
2111      */
2112     public int getPageSize()
2113     {
2114         return pageSize;
2115     }
2116 
2117 
2118     public void setPageSize( int pageSize )
2119     {
2120         if ( this.pageSize != -1 )
2121         {
2122         }
2123         else
2124         {
2125             this.pageSize = pageSize;
2126         }
2127     }
2128 
2129 
2130     /**
2131      * Close the RecordManager and flush everything on disk
2132      */
2133     public void close() throws IOException
2134     {
2135         // TODO : we must wait for the last write to finish
2136 
2137         for ( BTree<Object, Object> tree : managedBTrees.values() )
2138         {
2139             tree.close();
2140         }
2141 
2142         managedBTrees.clear();
2143 
2144         // Write the data
2145         fileChannel.force( true );
2146 
2147         // And close the channel
2148         fileChannel.close();
2149     }
2150     
2151     
2152     /** Hex chars */
2153     private static final byte[] HEX_CHAR = new byte[]
2154         { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
2155 
2156     
2157     public static String dump( byte octet )
2158     {
2159         return new String( new byte[]
2160             { HEX_CHAR[( octet & 0x00F0 ) >> 4], HEX_CHAR[octet & 0x000F] } );
2161     }
2162 
2163     /**
2164      * Dump a pageIO
2165      */
2166     private void dump( PageIO pageIo )
2167     {
2168         ByteBuffer buffer = pageIo.getData();
2169         buffer.mark();
2170         byte[] longBuffer = new byte[LONG_SIZE];
2171         byte[] intBuffer = new byte[INT_SIZE]; 
2172         
2173         // get the next page offset
2174         buffer.get( longBuffer );
2175         long nextOffset = LongSerializer.deserialize( longBuffer );
2176         
2177         // Get the data size 
2178         buffer.get( intBuffer );
2179         int size = IntSerializer.deserialize( intBuffer );
2180         
2181         buffer.reset();
2182         
2183         System.out.println( "PageIO[" + Long.toHexString( pageIo.getOffset() ) + "], size = " + size + ", NEXT PageIO:" + Long.toHexString( nextOffset ) );
2184         System.out.println( " 0  1  2  3  4  5  6  7  8  9  A  B  C  D  E  F " );
2185         System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );
2186         
2187         int position = buffer.position();
2188         
2189         for ( int i = 0; i < buffer.limit(); i+= 16 )
2190         {
2191             System.out.print( "|" );
2192             
2193             for ( int j = 0; j < 16; j++ )
2194             {
2195                 System.out.print( dump( buffer.get() ) );
2196                 
2197                 if ( j == 15 )
2198                 {
2199                     System.out.println( "|" );
2200                 }
2201                 else
2202                 {
2203                     System.out.print( " " );
2204                 }
2205             }
2206         }
2207         
2208         System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );
2209 
2210         buffer.reset();
2211     }
2212 
2213 
2214     /**
2215      * Dump the RecordManager file
2216      * @throws IOException 
2217      */
2218     public void dump() throws IOException
2219     {
2220         RandomAccessFile randomFile = new RandomAccessFile( file, "r" );
2221         FileChannel fileChannel = randomFile.getChannel();
2222 
2223         ByteBuffer header = ByteBuffer.allocate( HEADER_SIZE );
2224 
2225         // load the header 
2226         fileChannel.read( header );
2227 
2228         header.rewind();
2229 
2230         // The page size
2231         int pageSize = header.getInt();
2232 
2233         // The number of managed BTrees
2234         int nbBTree = header.getInt();
2235 
2236         // The first and last free page
2237         long firstFreePage = header.getLong();
2238         long lastFreePage = header.getLong();
2239 
2240         if ( LOG.isDebugEnabled() )
2241         {
2242             LOG.debug( "RecordManager" );
2243             LOG.debug( "-------------" );
2244             LOG.debug( "  Header " );
2245             LOG.debug( "    '{}'", Strings.dumpBytes( header.array() ) );
2246             LOG.debug( "    page size : {}", pageSize );
2247             LOG.debug( "    nbTree : {}", nbBTree );
2248             LOG.debug( "    firstFreePage : {}", firstFreePage );
2249             LOG.debug( "    lastFreePage : {}", lastFreePage );
2250         }
2251 
2252         long position = HEADER_SIZE;
2253 
2254         // Dump the BTrees
2255         for ( int i = 0; i < nbBTree; i++ )
2256         {
2257             LOG.debug( "  Btree[{}]", i );
2258             PageIO[] pageIos = readPageIOs( position, Long.MAX_VALUE );
2259 
2260             for ( PageIO pageIo : pageIos )
2261             {
2262                 LOG.debug( "    {}", pageIo );
2263             }
2264         }
2265 
2266         randomFile.close();
2267     }
2268 
2269 
2270     /**
2271      * Get the number of managed trees. We don't count the CopiedPage BTree. and the Revsion BTree
2272      * 
2273      * @return The number of managed BTrees
2274      */
2275     public int getNbManagedTrees()
2276     {
2277         return nbBtree - 2;
2278     }
2279 
2280 
2281     /**
2282      * Get the managed trees. We don't return the CopiedPage BTree nor the Revision BTree.
2283      * 
2284      * @return The managed BTrees
2285      */
2286     public Set<String> getManagedTrees()
2287     {
2288         Set<String> btrees = new HashSet<String>( managedBTrees.keySet() );
2289 
2290         btrees.remove( COPIED_PAGE_BTREE_NAME );
2291         btrees.remove( REVISION_BTREE_NAME );
2292 
2293         return btrees;
2294     }
2295 
2296 
2297     /**
2298      * Store a reference to an old rootPage into the Revision BTree
2299      * 
2300      * @param btree The BTree we want to keep an old RootPage for
2301      * @param rootPage The old rootPage
2302      * @throws IOException If we have an issue while writing on disk
2303      */
2304     /* No qualifier */<K, V> void storeRootPage( BTree<K, V> btree, Page<K, V> rootPage ) throws IOException
2305     {
2306         if ( !isKeepRevisions() )
2307         {
2308             return;
2309         }
2310 
2311         if ( ( btree == copiedPageBTree ) || ( btree == revisionBTree ) )
2312         {
2313             return;
2314         }
2315 
2316         RevisionName revisionName = new RevisionName( rootPage.getRevision(), btree.getName() );
2317 
2318         revisionBTree.insert( revisionName, rootPage.getOffset(), 0 );
2319 
2320         if ( LOG_CHECK.isDebugEnabled() )
2321         {
2322             check();
2323         }
2324     }
2325 
2326 
2327     /**
2328      * Fetch the rootPage of a given BTree for a given revision.
2329      * 
2330      * @param btree The BTree we are interested in
2331      * @param revision The revision we want to get back
2332      * @return The rootPage for this BTree and this revision, if any
2333      * @throws KeyNotFoundException If we can't find the rootPage for this revision and this BTree
2334      * @throws IOException If we had an ise while accessing the data on disk
2335      */
2336     /* No qualifier */<K, V> Page<K, V> getRootPage( BTree<K, V> btree, long revision ) throws KeyNotFoundException,
2337         IOException
2338     {
2339         if ( btree.getRevision() == revision )
2340         {
2341             // We are asking for the current revision
2342             return btree.rootPage;
2343         }
2344 
2345         RevisionName revisionName = new RevisionName( revision, btree.getName() );
2346         long rootPageOffset = revisionBTree.get( revisionName );
2347 
2348         // Read the rootPage pages on disk
2349         PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );
2350 
2351         Page<K, V> btreeRoot = readPage( btree, rootPageIos );
2352 
2353         return btreeRoot;
2354     }
2355 
2356 
2357     /**
2358      * Get one managed trees, knowing its name. 
2359      * 
2360      * @return The managed BTrees
2361      */
2362     public <K, V> BTree<K, V> getManagedTree( String name )
2363     {
2364         return ( BTree<K, V> ) managedBTrees.get( name );
2365     }
2366 
2367 
2368     /**
2369      * Move a list of pages to the free page list. A logical page is associated with on 
2370      * or physical PageIO, which are on the disk. We have to move all those PagIO instance
2371      * to the free list, and do the same in memory (we try to keep a reference to a set of 
2372      * free pages.
2373      *  
2374      * @param btree The BTree which were owning the pages
2375      * @param pages The pages to free
2376      * @throws IOException 
2377      * @throws EndOfFileExceededException 
2378      */
2379     /* Package protected */<K, V> void addFreePages( BTree<K, V> btree, List<Page<K, V>> pages )
2380         throws EndOfFileExceededException,
2381         IOException
2382     {
2383         if ( ( btree == copiedPageBTree ) || ( btree == revisionBTree ) )
2384         {
2385             return;
2386         }
2387 
2388         if ( ( pages == null ) || pages.isEmpty() )
2389         {
2390             return;
2391         }
2392 
2393         if ( !keepRevisions )
2394         {
2395             // if the btree doesn't keep revisions, we can safely move
2396             // the pages to the free page list.
2397             // NOTE : potential improvement : we can update the header only when
2398             // we have processed all the logical pages.
2399 
2400             for ( Page<K, V> page : pages )
2401             {
2402                 // Retrieve all the PageIO associated with this logical page
2403                 long firstOffset = page.getOffset();
2404 
2405                 // skip the page with offset 0, this is the first in-memory root page that
2406                 // was copied during first insert in a BTree.
2407                 // a Node or Leaf will *never* have 0 or -1 as its offset 
2408                 if ( firstOffset == NO_PAGE )
2409                 {
2410                     continue;
2411                 }
2412 
2413                 long lastOffset = page.getLastOffset();
2414 
2415                 // Update the pointers
2416                 if ( firstFreePage == NO_PAGE )
2417                 {
2418                     // We don't have yet any free pageIos. The
2419                     // PageIOs for this Page will be used
2420                     firstFreePage = firstOffset;
2421                 }
2422                 else
2423                 {
2424                     // We add the Page's PageIOs before the 
2425                     // existing free pages.
2426                     long offset = page.getLastOffset();
2427 
2428                     if ( offset == NO_PAGE )
2429                     {
2430                         offset = page.getOffset();
2431                     }
2432 
2433                     // Fetch the pageIO
2434                     PageIO pageIo = fetchPage( offset );
2435 
2436                     // Link it to the first free page
2437                     pageIo.setNextPage( firstFreePage );
2438 
2439                     LOG.debug( "Flushing the first free page" );
2440 
2441                     // And flush it to disk
2442                     flushPages( pageIo );
2443 
2444                     // We can update the lastFreePage offset 
2445                     firstFreePage = firstOffset;
2446                 }
2447             }
2448 
2449             // Last, not least, flush the header
2450             updateRecordManagerHeader();
2451         }
2452         else
2453         {
2454             LOG.debug( "We should not get there" );
2455 
2456             for ( Page<K, V> p : pages )
2457             {
2458                 addFreePage( btree, p );
2459             }
2460         }
2461     }
2462 
2463 
2464     /**
2465      * 
2466      * TODO addFreePage.
2467      *
2468      * @param btree
2469      * @param freePage
2470      */
2471     private <K, V> void addFreePage( BTree<K, V> btree, Page<K, V> freePage )
2472     {
2473         try
2474         {
2475             RevisionName revision = new RevisionName( freePage.getRevision(), btree.getName() );
2476             long[] offsetArray = null;
2477 
2478             if ( copiedPageBTree.hasKey( revision ) )
2479             {
2480                 offsetArray = copiedPageBTree.get( revision );
2481                 long[] tmp = new long[offsetArray.length + 1];
2482                 System.arraycopy( offsetArray, 0, tmp, 0, offsetArray.length );
2483                 offsetArray = tmp;
2484             }
2485             else
2486             {
2487                 offsetArray = new long[1];
2488             }
2489 
2490             offsetArray[offsetArray.length - 1] = freePage.getOffset();
2491 
2492             copiedPageBTree.insert( revision, offsetArray, 0 );
2493         }
2494         catch ( Exception e )
2495         {
2496             throw new RuntimeException( e );
2497         }
2498     }
2499 
2500 
2501     /**
2502      * @return the keepRevisions
2503      */
2504     public boolean isKeepRevisions()
2505     {
2506         return keepRevisions;
2507     }
2508 
2509 
2510     /**
2511      * @param keepRevisions the keepRevisions to set
2512      */
2513     public void setKeepRevisions( boolean keepRevisions )
2514     {
2515         this.keepRevisions = keepRevisions;
2516     }
2517 
2518 
2519     /**
2520      * Creates a BTree and automatically adds it to the list of managed btrees
2521      * 
2522      * @param name the name of the BTree
2523      * @param keySerializer key serializer
2524      * @param valueSerializer value serializer
2525      * @param allowDuplicates flag for allowing duplicate keys 
2526      * @return a managed BTree
2527      * @throws IOException
2528      * @throws BTreeAlreadyManagedException
2529      */
2530     @SuppressWarnings("all")
2531     public <K, V> BTree<K, V> addBTree( String name, ElementSerializer<K> keySerializer,
2532         ElementSerializer<V> valueSerializer,
2533         boolean allowDuplicates ) throws IOException, BTreeAlreadyManagedException
2534     {
2535         BTreeConfiguration config = new BTreeConfiguration();
2536 
2537         config.setName( name );
2538         config.setKeySerializer( keySerializer );
2539         config.setValueSerializer( valueSerializer );
2540         config.setAllowDuplicates( allowDuplicates );
2541 
2542         BTree btree = new BTree( config );
2543         manage( btree );
2544 
2545         if ( LOG_CHECK.isDebugEnabled() )
2546         {
2547             check();
2548         }
2549 
2550         return btree;
2551     }
2552 
2553 
2554     private void setCheckedPage( long[] checkedPages, long offset, int pageSize )
2555     {
2556         long pageOffset = ( offset - HEADER_SIZE ) / pageSize;
2557         int index = ( int ) ( pageOffset / 64L );
2558         long mask = ( 1L << ( pageOffset % 64L ) );
2559         long bits = checkedPages[index];
2560 
2561         if ( ( bits & mask ) == 1 )
2562         {
2563             throw new RuntimeException( "The page at : " + offset + " has already been checked" );
2564         }
2565 
2566         checkedPages[index] |= mask;
2567 
2568     }
2569 
2570 
2571     /**
2572      * Check the free pages
2573      * 
2574      * @param checkedPages
2575      * @throws IOException 
2576      */
2577     private void checkFreePages( long[] checkedPages, int pageSize, long firstFreePage, long lastFreePage )
2578         throws IOException
2579     {
2580         if ( firstFreePage == NO_PAGE )
2581         {
2582             if ( lastFreePage == NO_PAGE )
2583             {
2584                 return;
2585             }
2586             else
2587             {
2588                 throw new RuntimeException( "Wrong last free page : " + lastFreePage );
2589             }
2590         }
2591 
2592         if ( lastFreePage != NO_PAGE )
2593         {
2594             throw new RuntimeException( "Wrong last free page : " + lastFreePage );
2595         }
2596 
2597         // Now, read all the free pages
2598         long currentOffset = firstFreePage;
2599         long fileSize = fileChannel.size();
2600 
2601         while ( currentOffset != NO_PAGE )
2602         {
2603             if ( currentOffset > fileSize )
2604             {
2605                 throw new RuntimeException( "Wrong free page offset, above file size : " + currentOffset );
2606             }
2607 
2608             try
2609             {
2610                 PageIO pageIo = fetchPage( currentOffset );
2611 
2612                 if ( currentOffset != pageIo.getOffset() )
2613                 {
2614                     throw new RuntimeException( "PageIO offset is incorrect : " + currentOffset + "-"
2615                         + pageIo.getOffset() );
2616                 }
2617 
2618                 setCheckedPage( checkedPages, currentOffset, pageSize );
2619 
2620                 long newOffset = pageIo.getNextPage();
2621                 currentOffset = newOffset;
2622             }
2623             catch ( IOException ioe )
2624             {
2625                 throw new RuntimeException( "Cannot fetch page at : " + currentOffset );
2626             }
2627         }
2628     }
2629 
2630 
2631     /**
2632      * Check the root page for a given BTree
2633      * @throws IOException 
2634      * @throws EndOfFileExceededException 
2635      */
2636     private void checkRoot( long[] checkedPages, long offset, int pageSize, long nbBTreeElems,
2637         ElementSerializer keySerializer, ElementSerializer valueSerializer, boolean allowDuplicates )
2638         throws EndOfFileExceededException, IOException
2639     {
2640         // Read the rootPage pages on disk
2641         PageIO[] rootPageIos = readPageIOs( offset, Long.MAX_VALUE );
2642 
2643         // Deserialize the rootPage now
2644         long position = 0L;
2645 
2646         // The revision
2647         long revision = readLong( rootPageIos, position );
2648         position += LONG_SIZE;
2649 
2650         // The number of elements in the page
2651         int nbElems = readInt( rootPageIos, position );
2652         position += INT_SIZE;
2653 
2654         // The size of the data containing the keys and values
2655         ByteBuffer byteBuffer = null;
2656 
2657         // Reads the bytes containing all the keys and values, if we have some
2658         ByteBuffer data = readBytes( rootPageIos, position );
2659 
2660         if ( nbElems >= 0 )
2661         {
2662             // Its a leaf
2663 
2664             // Check the page offset
2665             long pageOffset = rootPageIos[0].getOffset();
2666 
2667             if ( ( pageOffset < 0 ) || ( pageOffset > fileChannel.size() ) )
2668             {
2669                 throw new RuntimeException( "The page offset is incorrect : " + pageOffset );
2670             }
2671 
2672             // Check the page last offset
2673             long pageLastOffset = rootPageIos[rootPageIos.length - 1].getOffset();
2674 
2675             if ( ( pageLastOffset <= 0 ) || ( pageLastOffset > fileChannel.size() ) )
2676             {
2677                 throw new RuntimeException( "The page last offset is incorrect : " + pageLastOffset );
2678             }
2679 
2680             // Read each value and key
2681             for ( int i = 0; i < nbElems; i++ )
2682             {
2683                 // Just deserialize all the keys and values
2684                 if ( allowDuplicates )
2685                 {
2686                     /*
2687                     long value = OFFSET_SERIALIZER.deserialize( byteBuffer );
2688 
2689                     rootPageIos = readPageIOs( value, Long.MAX_VALUE );
2690 
2691                     BTree dupValueContainer = BTreeFactory.createBTree();
2692                     dupValueContainer.setBtreeOffset( value );
2693 
2694                     try
2695                     {
2696                         loadBTree( pageIos, dupValueContainer );
2697                     }
2698                     catch ( Exception e )
2699                     {
2700                         // should not happen
2701                         throw new RuntimeException( e );
2702                     }
2703                     */
2704                 }
2705                 else
2706                 {
2707                     valueSerializer.deserialize( byteBuffer );
2708                 }
2709 
2710                 keySerializer.deserialize( byteBuffer );
2711             }
2712         }
2713         else
2714         {
2715             /*
2716             // It's a node
2717             int nodeNbElems = -nbElems;
2718 
2719             // Read each value and key
2720             for ( int i = 0; i < nodeNbElems; i++ )
2721             {
2722                 // This is an Offset
2723                 long offset = OFFSET_SERIALIZER.deserialize( byteBuffer );
2724                 long lastOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
2725 
2726                 ElementHolder valueHolder = new ReferenceHolder( btree, null, offset, lastOffset );
2727                 ( ( Node ) page ).setValue( i, valueHolder );
2728 
2729                 Object key = btree.getKeySerializer().deserialize( byteBuffer );
2730                 BTreeFactory.setKey( page, i, key );
2731             }
2732 
2733             // and read the last value, as it's a node
2734             long offset = OFFSET_SERIALIZER.deserialize( byteBuffer );
2735             long lastOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
2736 
2737             ElementHolder valueHolder = new ReferenceHolder( btree, null, offset, lastOffset );
2738             ( ( Node ) page ).setValue( nodeNbElems, valueHolder );*/
2739         }
2740     }
2741 
2742 
2743     /**
2744      * Check a BTree
2745      * @throws IllegalAccessException 
2746      * @throws InstantiationException 
2747      * @throws ClassNotFoundException 
2748      */
2749     private long checkBTree( long[] checkedPages, PageIO[] pageIos, int pageSize, boolean isLast )
2750         throws EndOfFileExceededException, IOException, InstantiationException, IllegalAccessException,
2751         ClassNotFoundException
2752     {
2753         long dataPos = 0L;
2754 
2755         // The BTree current revision
2756         long revision = readLong( pageIos, dataPos );
2757         dataPos += LONG_SIZE;
2758 
2759         // The nb elems in the tree
2760         long nbElems = readLong( pageIos, dataPos );
2761         dataPos += LONG_SIZE;
2762 
2763         // The BTree rootPage offset
2764         long rootPageOffset = readLong( pageIos, dataPos );
2765 
2766         if ( ( rootPageOffset < 0 ) || ( rootPageOffset > fileChannel.size() ) )
2767         {
2768             throw new RuntimeException( "The rootpage is incorrect : " + rootPageOffset );
2769         }
2770 
2771         dataPos += LONG_SIZE;
2772 
2773         // The next BTree offset
2774         long nextBTreeOffset = readLong( pageIos, dataPos );
2775 
2776         if ( ( ( rootPageOffset < 0 ) && ( !isLast ) ) || ( nextBTreeOffset > fileChannel.size() ) )
2777         {
2778             throw new RuntimeException( "The rootpage is incorrect : " + rootPageOffset );
2779         }
2780 
2781         dataPos += LONG_SIZE;
2782 
2783         // The BTree page size
2784         int btreePageSize = readInt( pageIos, dataPos );
2785 
2786         if ( ( btreePageSize < 2 ) || ( ( btreePageSize & ( ~btreePageSize + 1 ) ) != btreePageSize ) )
2787         {
2788             throw new RuntimeException( "The BTree page size is not a power of 2 : " + btreePageSize );
2789         }
2790 
2791         dataPos += INT_SIZE;
2792 
2793         // The tree name
2794         ByteBuffer btreeNameBytes = readBytes( pageIos, dataPos );
2795         dataPos += INT_SIZE;
2796 
2797         dataPos += btreeNameBytes.limit();
2798         String btreeName = Strings.utf8ToString( btreeNameBytes );
2799 
2800         // The keySerializer FQCN
2801         ByteBuffer keySerializerBytes = readBytes( pageIos, dataPos );
2802 
2803         String keySerializerFqcn = null;
2804         dataPos += INT_SIZE;
2805 
2806         if ( keySerializerBytes != null )
2807         {
2808             dataPos += keySerializerBytes.limit();
2809             keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
2810         }
2811         else
2812         {
2813             keySerializerFqcn = "";
2814         }
2815 
2816         // The valueSerialier FQCN
2817         ByteBuffer valueSerializerBytes = readBytes( pageIos, dataPos );
2818 
2819         String valueSerializerFqcn = null;
2820         dataPos += INT_SIZE;
2821 
2822         if ( valueSerializerBytes != null )
2823         {
2824             dataPos += valueSerializerBytes.limit();
2825             valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
2826         }
2827         else
2828         {
2829             valueSerializerFqcn = "";
2830         }
2831 
2832         // The BTree allowDuplicates flag
2833         int allowDuplicates = readInt( pageIos, dataPos );
2834         dataPos += INT_SIZE;
2835 
2836         // Now, check the rootPage, which can be a Leaf or a Node, depending 
2837         // on the number of elements in the tree : if it's above the pageSize,
2838         // it's a Node, otherwise it's a Leaf
2839         Class<?> valueSerializer = Class.forName( valueSerializerFqcn );
2840         Class<?> keySerializer = Class.forName( keySerializerFqcn );
2841 
2842         /*
2843         checkRoot( checkedPages, rootPageOffset, pageSize, nbElems,
2844             ( ElementSerializer<?> ) keySerializer.newInstance(),
2845             ( ElementSerializer<?> ) valueSerializer.newInstance(), allowDuplicates != 0 );
2846         */
2847 
2848         return nextBTreeOffset;
2849     }
2850 
2851 
2852     /**
2853      * Check each BTree we manage
2854      * @throws IOException 
2855      * @throws EndOfFileExceededException 
2856      * @throws ClassNotFoundException 
2857      * @throws IllegalAccessException 
2858      * @throws InstantiationException 
2859      */
2860     private void checkBTrees( long[] checkedPages, int pageSize, int nbBTrees ) throws EndOfFileExceededException,
2861         IOException, InstantiationException, IllegalAccessException, ClassNotFoundException
2862     {
2863         // Iterate on each BTree until we have exhausted all of them. The number
2864         // of btrees is just used to check that we have the correct number
2865         // of stored BTrees, as they are all linked.
2866         long position = HEADER_SIZE;
2867 
2868         for ( int i = 0; i < nbBTrees; i++ )
2869         {
2870             // Load the pageIOs containing the BTree
2871             PageIO[] pageIos = readPageIOs( position, Long.MAX_VALUE );
2872 
2873             // Check that they are correctly linked and not already used
2874             int pageNb = 0;
2875 
2876             for ( PageIO currentPageIo : pageIos )
2877             {
2878                 // 
2879                 long nextPageOffset = currentPageIo.getNextPage();
2880 
2881                 if ( pageNb == pageIos.length - 1 )
2882                 {
2883                     if ( nextPageOffset != NO_PAGE )
2884                     {
2885                         throw new RuntimeException( "The pointer to the next page is not valid, expected NO_PAGE" );
2886                     }
2887                 }
2888                 else
2889                 {
2890                     if ( nextPageOffset == NO_PAGE )
2891                     {
2892                         throw new RuntimeException( "The pointer to the next page is not valid, NO_PAGE" );
2893                     }
2894                 }
2895 
2896                 if ( ( nextPageOffset != NO_PAGE ) && ( ( nextPageOffset - HEADER_SIZE ) % pageSize != 0 ) )
2897                 {
2898                     throw new RuntimeException( "The pointer to the next page is not valid" );
2899                 }
2900 
2901                 // Update the array of processed pages
2902                 setCheckedPage( checkedPages, currentPageIo.getOffset(), pageSize );
2903             }
2904 
2905             // Now check the BTree
2906             long nextBTree = checkBTree( checkedPages, pageIos, pageSize, i == nbBTrees - 1 );
2907 
2908             if ( ( nextBTree == NO_PAGE ) && ( i < nbBTrees - 1 ) )
2909             {
2910                 throw new RuntimeException( "The pointer to the next BTree is incorrect" );
2911             }
2912 
2913             position = nextBTree;
2914         }
2915     }
2916 
2917 
2918     /**
2919      * Check the whole file
2920      */
2921     private void check()
2922     {
2923         try
2924         {
2925             // First check the header
2926             ByteBuffer header = ByteBuffer.allocate( HEADER_SIZE );
2927             long fileSize = fileChannel.size();
2928 
2929             if ( fileSize < HEADER_SIZE )
2930             {
2931                 throw new RuntimeException( "File size too small : " + fileSize );
2932             }
2933 
2934             // Read the header
2935             fileChannel.read( header, 0L );
2936             header.flip();
2937 
2938             // The page size. It must be a power of 2, and above 16.
2939             int pageSize = header.getInt();
2940 
2941             if ( ( pageSize < 0 ) || ( pageSize < 32 ) || ( ( pageSize & ( ~pageSize + 1 ) ) != pageSize ) )
2942             {
2943                 throw new RuntimeException( "Wrong page size : " + pageSize );
2944             }
2945 
2946             // Compute the number of pages in this file
2947             long nbPages = ( fileSize - HEADER_SIZE ) / pageSize;
2948 
2949             // The number of trees. It must be at least 2 and > 0
2950             int nbBTrees = header.getInt();
2951 
2952             if ( nbBTrees < 0 )
2953             {
2954                 throw new RuntimeException( "Wrong nb trees : " + nbBTrees );
2955             }
2956 
2957             // The first free page offset. It must be either -1 or below file size
2958             // and its value must be a modulo of pageSize
2959             long firstFreePage = header.getLong();
2960 
2961             if ( firstFreePage > fileSize )
2962             {
2963                 throw new RuntimeException( "First free page pointing after the end of the file : " + firstFreePage );
2964             }
2965 
2966             if ( ( firstFreePage != NO_PAGE ) && ( ( ( firstFreePage - HEADER_SIZE ) % pageSize ) != 0 ) )
2967             {
2968                 throw new RuntimeException( "First free page not pointing to a correct offset : " + firstFreePage );
2969             }
2970 
2971             // The last free page offset. It must be -1
2972             long lastFreePage = header.getLong();
2973 
2974             if ( ( ( lastFreePage != NO_PAGE ) && ( ( ( lastFreePage - HEADER_SIZE ) % pageSize ) != 0 ) ) )
2975             //|| ( lastFreePage != 0 ) )
2976             {
2977                 throw new RuntimeException( "Invalid last free page : " + lastFreePage );
2978             }
2979 
2980             int nbPageBits = ( int ) ( nbPages / 64 );
2981 
2982             // Create an array of pages to be checked
2983             // We use one bit per page. It's 0 when the page
2984             // hasn't been checked, 1 otherwise.
2985             long[] checkedPages = new long[nbPageBits + 1];
2986 
2987             // Then the free files
2988             checkFreePages( checkedPages, pageSize, firstFreePage, lastFreePage );
2989 
2990             // The BTrees
2991             checkBTrees( checkedPages, pageSize, nbBTrees );
2992         }
2993         catch ( Exception e )
2994         {
2995             // We catch the exception and rethrow it immediately to be able to
2996             // put a breakpoint here
2997             e.printStackTrace();
2998             throw new RuntimeException( "Error : " + e.getMessage() );
2999         }
3000     }
3001 
3002 
3003     /**
3004      * Loads a BTree holding the values of a duplicate key
3005      * This tree is also called as dups tree or sub tree
3006      * 
3007      * @param offset the offset of the BTree header
3008      * @return the deserialized BTree
3009      */
3010     /* No qualifier */<K, V> BTree<K, V> loadDupsBTree( long offset )
3011     {
3012         try
3013         {
3014             PageIO[] pageIos = readPageIOs( offset, Long.MAX_VALUE );
3015 
3016             BTree<K, V> dupValueContainer = BTreeFactory.createBTree();
3017             dupValueContainer.setBtreeOffset( offset );
3018 
3019             loadBTree( pageIos, dupValueContainer );
3020 
3021             return dupValueContainer;
3022         }
3023         catch ( Exception e )
3024         {
3025             // should not happen
3026             throw new RuntimeException( e );
3027         }
3028     }
3029 
3030 
3031     /**
3032      * @see Object#toString()
3033      */
3034     public String toString()
3035     {
3036         StringBuilder sb = new StringBuilder();
3037 
3038         sb.append( "RM free pages : [" );
3039 
3040         if ( firstFreePage != NO_PAGE )
3041         {
3042             long current = firstFreePage;
3043             boolean isFirst = true;
3044 
3045             while ( current != NO_PAGE )
3046             {
3047                 if ( isFirst )
3048                 {
3049                     isFirst = false;
3050                 }
3051                 else
3052                 {
3053                     sb.append( ", " );
3054                 }
3055 
3056                 PageIO pageIo;
3057 
3058                 try
3059                 {
3060                     pageIo = fetchPage( current );
3061                     sb.append( pageIo.getOffset() );
3062                     current = pageIo.getNextPage();
3063                 }
3064                 catch ( EndOfFileExceededException e )
3065                 {
3066                     e.printStackTrace();
3067                 }
3068                 catch ( IOException e )
3069                 {
3070                     e.printStackTrace();
3071                 }
3072 
3073             }
3074         }
3075 
3076         sb.append( "]" );
3077 
3078         return sb.toString();
3079     }
3080 }