View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree;
21  
22  
23  import java.io.File;
24  import java.io.IOException;
25  import java.io.RandomAccessFile;
26  import java.nio.ByteBuffer;
27  import java.nio.channels.FileChannel;
28  import java.util.ArrayList;
29  import java.util.HashSet;
30  import java.util.LinkedHashMap;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.atomic.AtomicLong;
35  
36  import org.apache.directory.mavibot.btree.exception.BTreeAlreadyManagedException;
37  import org.apache.directory.mavibot.btree.exception.BTreeCreationException;
38  import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
39  import org.apache.directory.mavibot.btree.exception.FreePageException;
40  import org.apache.directory.mavibot.btree.exception.InvalidBTreeException;
41  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
42  import org.apache.directory.mavibot.btree.exception.RecordManagerException;
43  import org.apache.directory.mavibot.btree.serializer.ElementSerializer;
44  import org.apache.directory.mavibot.btree.serializer.IntSerializer;
45  import org.apache.directory.mavibot.btree.serializer.LongArraySerializer;
46  import org.apache.directory.mavibot.btree.serializer.LongSerializer;
47  import org.apache.directory.mavibot.btree.util.Strings;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  
51  
52  /**
53   * The RecordManager is used to manage the file in which we will store the BTrees.
54   * A RecordManager will manage more than one BTree.<br/>
55   *
56   * It stores data in fixed size pages (default size is 512 bytes), which may be linked one to
57   * the other if the data we want to store is too big for a page.
58   *
59   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
60   */
61  public class RecordManager
62  {
63      /** The logger used by this class */
64      protected static final Logger LOG = LoggerFactory.getLogger( RecordManager.class );
65  
66      /** A dedicated logger for the check */
67      protected static final Logger LOG_CHECK = LoggerFactory.getLogger( "RM_CHECK" );
68  
69      /** The associated file */
70      private File file;
71  
72      /** The channel used to read and write data */
73      private FileChannel fileChannel;
74  
75      /** The number of stored BTrees */
76      private int nbBtree;
77  
78      /** The offset of the first free page (NO_PAGE when there is none) */
79      private long firstFreePage;
80  
81      /** The list of available free pages */
82      List<PageIO> freePages = new ArrayList<PageIO>();
83  
84      /** Some counters to track page and header activity (freed, created, reused pages, header and PageIO updates) */
85      public AtomicLong nbFreedPages = new AtomicLong( 0 );
86      public AtomicLong nbCreatedPages = new AtomicLong( 0 );
87      public AtomicLong nbReusedPages = new AtomicLong( 0 );
88      public AtomicLong nbUpdateRMHeader = new AtomicLong( 0 );
89      public AtomicLong nbUpdateBTreeHeader = new AtomicLong( 0 );
90      public AtomicLong nbUpdatePageIOs = new AtomicLong( 0 );
91  
92      /** The offset of the end of the file */
93      private long endOfFileOffset;
94  
95      /**
96       * A Btree used to manage the page that has been copied in a new version.
97       * Those pages can be reclaimed when the associated version is dead.
98       **/
99      private BTree<RevisionName, long[]> copiedPageBTree;
100 
101     /** A BTree used to store all the valid revisions for all the stored BTrees */
102     private BTree<RevisionName, Long> revisionBTree;
103 
104     /** A constant for an offset on a non existing page */
105     private static final long NO_PAGE = -1L;
106 
107     /** Presumably the size (in bytes) of the page-size field stored in the header - TODO confirm */
108     private static final int PAGE_SIZE = 4;
109 
110     /** The size of the link to next page */
111     private static final int LINK_SIZE = 8;
112 
113     /** The sizes, in bytes, of the primitive types stored in the pages */
114     private static final int BYTE_SIZE = 1;
115     private static final int INT_SIZE = 4;
116     private static final int LONG_SIZE = 8;
117 
118     /** The default page size */
119     private static final int DEFAULT_PAGE_SIZE = 512;
120 
121     /** The header size. Static, but overwritten with the requested page size by each constructor call */
122     private static int HEADER_SIZE = DEFAULT_PAGE_SIZE;
123 
124     /** A global buffer used to store the header. Static, but re-allocated by each constructor call */
125     private static ByteBuffer HEADER_BUFFER;
126 
127     /** A static buffer used to store the header. Re-allocated by each constructor call */
128     private static byte[] HEADER_BYTES;
129 
130     /** The length of an Offset as a negative value : the int -8 (0xFFFFFFF8) in big-endian byte order */
131     private static byte[] LONG_LENGTH = new byte[]
132         { ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xFF, ( byte ) 0xF8 };
133 
134     /** The RecordManager underlying page size. */
135     private int pageSize = DEFAULT_PAGE_SIZE;
136 
137     /** The set of managed BTrees */
138     private Map<String, BTree<Object, Object>> managedBTrees;
139 
140     /** The offset on the last added BTree */
141     private long lastAddedBTreeOffset = NO_PAGE;
142 
143     /** The default file name */
144     private static final String DEFAULT_FILE_NAME = "mavibot.db";
145 
146     /** A deserializer for Offsets */
147     private static final LongSerializer OFFSET_SERIALIZER = new LongSerializer();
148 
        /** The name of the internal BTree storing the valid revisions */
149     private static final String REVISION_BTREE_NAME = "_revisionBTree_";
150 
        /** The name of the internal BTree storing the copied pages */
151     private static final String COPIED_PAGE_BTREE_NAME = "_copiedPageBTree_";
152 
153     /** A flag set to true if we want to keep old revisions */
154     private boolean keepRevisions;
155 
156     /** A flag used by internal btrees */
157     public static final boolean INTERNAL_BTREE = true;
158 
159     /** A flag used by normal (non internal) btrees */
160     public static final boolean NORMAL_BTREE = false;
161 
162     /** A map of pending pages */
163     private Map<Page<?, ?>, BTree<?, ?>> pendingPages = new LinkedHashMap<Page<?, ?>, BTree<?, ?>>();
164 
165     /** The Btree of Btrees */
166     private BTree<NameRevision, Long> btreeOfBtrees;
167 
        /** The name of the Btree of Btrees (BOB) */
168     private static final String BOB_ONE_NAME = "_BTREE_OF_BTREES_";
169 
170     /** The two latest revisions of the BOB */
171     private long bobCurrentRevision;
172     private long bobOldRevision;
173 
174     /**
175      * Create a Record manager which will either create the underlying file
176      * or load an existing one. If a folder is provided, then we will create
177      * a file with a default name : mavibot.db. The default page size is used.
178      *
179      * @param fileName The file name, or a folder name
180      */
181     public RecordManager( String fileName )
182     {
183         this( fileName, DEFAULT_PAGE_SIZE );
184     }
185 
186 
187     /**
188      * Create a Record manager which will either create the underlying file
189      * or load an existing one. If a folder is provider, then we will create
190      * a file with a default name : mavibot.db
191      *
192      * @param name The file name, or a folder name
193      * @param pageSize the size of a page on disk
194      */
195     public RecordManager( String fileName, int pageSize )
196     {
197         managedBTrees = new LinkedHashMap<String, BTree<Object, Object>>();
198 
199         HEADER_BUFFER = ByteBuffer.allocate( pageSize );
200         HEADER_BYTES = new byte[pageSize];
201         HEADER_SIZE = pageSize;
202 
203         // Open the file or create it
204         File tmpFile = new File( fileName );
205 
206         if ( tmpFile.isDirectory() )
207         {
208             // It's a directory. Check that we don't have an existing mavibot file
209             tmpFile = new File( tmpFile, DEFAULT_FILE_NAME );
210         }
211 
212         // We have to create a new file, if it does not already exist
213         boolean isNewFile = createFile( tmpFile );
214 
215         try
216         {
217             RandomAccessFile randomFile = new RandomAccessFile( file, "rw" );
218             fileChannel = randomFile.getChannel();
219 
220             // get the current end of file offset
221             endOfFileOffset = fileChannel.size();
222 
223             if ( isNewFile )
224             {
225                 this.pageSize = pageSize;
226                 initRecordManager();
227             }
228             else
229             {
230                 loadRecordManager();
231             }
232         }
233         catch ( Exception e )
234         {
235             LOG.error( "Error while initializing the RecordManager : {}", e.getMessage() );
236             LOG.error( "", e );
237             throw new RecordManagerException( e );
238         }
239     }
240 
241 
242     /**
243      * Create the mavibot file if it does not exist
244      */
245     private boolean createFile( File mavibotFile )
246     {
247         try
248         {
249             boolean creation = mavibotFile.createNewFile();
250 
251             file = mavibotFile;
252 
253             if ( mavibotFile.length() == 0 )
254             {
255                 return true;
256             }
257             else
258             {
259                 return creation;
260             }
261         }
262         catch ( IOException ioe )
263         {
264             LOG.error( "Cannot create the file {}", mavibotFile.getName() );
265             return false;
266         }
267 
268     }
269 
270     /**
271      * We will create a brand new RecordManager file, containing nothing, but the header,
272      * a BTree to manage the old revisions we want to keep and
273      * a BTree used to manage pages associated with old versions.
274      * <br/>
275      * The Header contains the following details :
276      * <pre>
277      * +---------------+
278      * | PageSize      | 4 bytes : The size of a physical page (default to 512)
279      * +---------------+
280      * |  NbTree       | 4 bytes : The number of managed BTrees (at least 1)
281      * +---------------+
282      * | FirstFree     | 8 bytes : The offset of the first free page
283      * +---------------+
284      * | currentBoB    | 1 byte : The current BoB in use
285      * +---------------+
286      * | BoB offset[0] | 8 bytes : The offset of the first BoB
287      * +---------------+
288      * | BoB offset[1] | 8 bytes : The offset of the second BoB
289      * +---------------+
290      * </pre>
         * NOTE(review): loadRecordManager() reads two longs right after FirstFree, with no
         * 1 byte 'currentBoB' field in between. The layout above should be checked against
         * what updateRecordManagerHeader() really writes.
291      *
292      * We then store the BTree managing the pages that have been copied when we have added
293      * or deleted an element in the BTree. They are associated with a version.
294      *
295      * Last, we add the bTree that keep a track on each revision we can have access to.
         *
         * @throws IOException If the header or one of the internal BTrees cannot be written
296      */
297     private void initRecordManager() throws IOException
298     {
299         // Create a new Header
300         nbBtree = 0;
301         firstFreePage = NO_PAGE;
302         bobCurrentRevision = 0L;
303         bobOldRevision = 0L;
304 
            // The header fields above must be set before this call
305         updateRecordManagerHeader();
306 
307         // Set the offset of the end of the file
308         endOfFileOffset = fileChannel.size();
309 
310         // First, create the btree of btrees <NameRevision, Long>
            // Note that this btree is created here, but not passed to manage() below
311         btreeOfBtrees = BTreeFactory.createPersistedBTree( BOB_ONE_NAME, new NameRevisionSerializer(),
312             new LongSerializer() );
313 
314         // Now, initialize the Copied Page BTree
315         copiedPageBTree = BTreeFactory.createPersistedBTree( COPIED_PAGE_BTREE_NAME, new RevisionNameSerializer(),
316             new LongArraySerializer() );
317 
318         // and initialize the Revision BTree
319         revisionBTree = BTreeFactory.createPersistedBTree( REVISION_BTREE_NAME, new RevisionNameSerializer(),
320             new LongSerializer() );
321 
322         // Inject these BTrees into the RecordManager. The order matters : loadRecordManager()
            // expects the Copied Page BTree first, then the Revision BTree
323         try
324         {
325             manage( copiedPageBTree );
326             manage( revisionBTree );
327         }
328         catch ( BTreeAlreadyManagedException btame )
329         {
330             // Can't happen here.
331         }
332 
333         // We are all set !
334     }
335 
336 
337     /**
338      * Load the BTrees from the disk.
339      *
340      * @throws InstantiationException
341      * @throws IllegalAccessException
342      * @throws ClassNotFoundException
343      */
344     private void loadRecordManager() throws IOException, ClassNotFoundException, IllegalAccessException,
345         InstantiationException
346     {
347         if ( fileChannel.size() != 0 )
348         {
349             ByteBuffer header = ByteBuffer.allocate( HEADER_SIZE );
350 
351             // The file exists, we have to load the data now
352             fileChannel.read( header );
353 
354             header.rewind();
355 
356             // read the RecordManager Header :
357             // +----------------+
358             // | PageSize       | 4 bytes : The size of a physical page (default to 4096)
359             // +----------------+
360             // | NbTree         | 4 bytes : The number of managed BTrees (at least 1)
361             // +----------------+
362             // | FirstFree      | 8 bytes : The offset of the first free page
363             // +----------------+
364             // | BoB old offset | 8 bytes : The previous BoB revision
365             // +----------------+
366             // | BoB new offset | 8 bytes : The current BoB revision
367             // +----------------+
368 
369             // The page size
370             pageSize = header.getInt();
371 
372             // The number of managed BTrees
373             nbBtree = header.getInt();
374 
375             // The first and last free page
376             firstFreePage = header.getLong();
377 
378             // The BOB revisions
379             long bobRevision1 = header.getLong();
380             long bobRevision2 = header.getLong();
381 
382             if ( bobRevision1 < bobRevision2 )
383             {
384                 bobOldRevision = bobRevision1;
385                 bobCurrentRevision = bobRevision2;
386             }
387             else if ( bobRevision1 > bobRevision2 )
388             {
389                 bobOldRevision = bobRevision2;
390                 bobCurrentRevision = bobRevision1;
391             }
392             else
393             {
394                 // Special case : the RecordManage has been shtudown correctly
395                 bobOldRevision = bobRevision1;
396                 bobCurrentRevision = bobRevision2;
397             }
398 
399             // Now read each BTree. The first one is the one which
400             // manage the modified pages. Once read, we can discard all
401             // the pages that are stored in it, as we have restarted
402             // the RecordManager.
403             long btreeOffset = HEADER_SIZE;
404 
405             PageIO[] pageIos = readPageIOs( HEADER_SIZE, Long.MAX_VALUE );
406 
407             // Create the BTree
408             copiedPageBTree = BTreeFactory.<RevisionName, long[]> createPersistedBTree();
409             ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBTree ).setBtreeOffset( btreeOffset );
410 
411             loadBTree( pageIos, copiedPageBTree );
412             long nextBtreeOffset = ( ( PersistedBTree<RevisionName, long[]> ) copiedPageBTree ).getNextBTreeOffset();
413 
414             // And the Revision BTree
415             pageIos = readPageIOs( nextBtreeOffset, Long.MAX_VALUE );
416 
417             revisionBTree = BTreeFactory.<RevisionName, Long> createPersistedBTree();
418             ( ( PersistedBTree<RevisionName, Long> ) revisionBTree ).setBtreeOffset( nextBtreeOffset );
419 
420             loadBTree( pageIos, revisionBTree );
421             nextBtreeOffset = ( ( PersistedBTree<RevisionName, Long> ) revisionBTree ).getNextBTreeOffset();
422 
423             // Then process the next ones
424             for ( int i = 2; i < nbBtree; i++ )
425             {
426                 // Create the BTree
427                 BTree<Object, Object> btree = BTreeFactory.createPersistedBTree();
428                 ( ( PersistedBTree<Object, Object> ) btree ).setRecordManager( this );
429                 ( ( PersistedBTree<Object, Object> ) btree ).setBtreeOffset( nextBtreeOffset );
430                 lastAddedBTreeOffset = nextBtreeOffset;
431 
432                 // Read the associated pages
433                 pageIos = readPageIOs( nextBtreeOffset, Long.MAX_VALUE );
434 
435                 // Load the BTree
436                 loadBTree( pageIos, btree );
437                 nextBtreeOffset = ( ( PersistedBTree<Object, Object> ) btree ).getNextBTreeOffset();
438 
439                 // Store it into the managedBtrees map
440                 managedBTrees.put( btree.getName(), btree );
441             }
442 
443             // We are done ! Let's finish with the last initialization parts
444             endOfFileOffset = fileChannel.size();
445         }
446     }
447 
448 
449     /**
450      * Reads all the PageIOs that are linked to the page at the given position, including
451      * the first page.
452      *
453      * @param position The position of the first page
454      * @return An array of pages
455      */
456     private PageIO[] readPageIOs( long position, long limit ) throws IOException, EndOfFileExceededException
457     {
458         LOG.debug( "Read PageIOs at position {}", position );
459 
460         if ( limit <= 0 )
461         {
462             limit = Long.MAX_VALUE;
463         }
464 
465         PageIO firstPage = fetchPage( position );
466         firstPage.setSize();
467         List<PageIO> listPages = new ArrayList<PageIO>();
468         listPages.add( firstPage );
469         long dataRead = pageSize - LONG_SIZE - INT_SIZE;
470 
471         // Iterate on the pages, if needed
472         long nextPage = firstPage.getNextPage();
473 
474         if ( ( dataRead < limit ) && ( nextPage != NO_PAGE ) )
475         {
476             while ( dataRead < limit )
477             {
478                 PageIO page = fetchPage( nextPage );
479                 listPages.add( page );
480                 nextPage = page.getNextPage();
481                 dataRead += pageSize - LONG_SIZE;
482 
483                 if ( nextPage == NO_PAGE )
484                 {
485                     page.setNextPage( NO_PAGE );
486                     break;
487                 }
488             }
489         }
490 
491         LOG.debug( "Nb of PageIOs read : {}", listPages.size() );
492 
493         // Return
494         return listPages.toArray( new PageIO[]
495             {} );
496     }
497 
498 
499     /**
500      * Read a BTree from the disk. The meta-data are at the given position in the list of pages.
501      *
502      * @param pageIos The list of pages containing the meta-data
503      * @param btree The BTree we have to initialize
504      * @throws InstantiationException
505      * @throws IllegalAccessException
506      * @throws ClassNotFoundException
507      */
508     private <K, V> void loadBTree( PageIO[] pageIos, BTree<K, V> btree ) throws EndOfFileExceededException,
509         IOException, ClassNotFoundException, IllegalAccessException, InstantiationException
510     {
511         long dataPos = 0L;
512 
513         // The BTree current revision
514         long revision = readLong( pageIos, dataPos );
515         BTreeFactory.setRevision( btree, revision );
516         dataPos += LONG_SIZE;
517 
518         // The nb elems in the tree
519         long nbElems = readLong( pageIos, dataPos );
520         BTreeFactory.setNbElems( btree, nbElems );
521         dataPos += LONG_SIZE;
522 
523         // The BTree rootPage offset
524         long rootPageOffset = readLong( pageIos, dataPos );
525         BTreeFactory.setRootPageOffset( btree, rootPageOffset );
526         dataPos += LONG_SIZE;
527 
528         // The next BTree offset
529         long nextBTreeOffset = readLong( pageIos, dataPos );
530         BTreeFactory.setNextBTreeOffset( btree, nextBTreeOffset );
531         dataPos += LONG_SIZE;
532 
533         // The BTree page size
534         int btreePageSize = readInt( pageIos, dataPos );
535         BTreeFactory.setPageSize( btree, btreePageSize );
536         dataPos += INT_SIZE;
537 
538         // The tree name
539         ByteBuffer btreeNameBytes = readBytes( pageIos, dataPos );
540         dataPos += INT_SIZE + btreeNameBytes.limit();
541         String btreeName = Strings.utf8ToString( btreeNameBytes );
542         BTreeFactory.setName( btree, btreeName );
543 
544         // The keySerializer FQCN
545         ByteBuffer keySerializerBytes = readBytes( pageIos, dataPos );
546         dataPos += INT_SIZE + keySerializerBytes.limit();
547 
548         String keySerializerFqcn = "";
549 
550         if ( keySerializerBytes != null )
551         {
552             keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
553         }
554 
555         BTreeFactory.setKeySerializer( btree, keySerializerFqcn );
556 
557         // The valueSerialier FQCN
558         ByteBuffer valueSerializerBytes = readBytes( pageIos, dataPos );
559 
560         String valueSerializerFqcn = "";
561         dataPos += INT_SIZE + valueSerializerBytes.limit();
562 
563         if ( valueSerializerBytes != null )
564         {
565             valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
566         }
567 
568         BTreeFactory.setValueSerializer( btree, valueSerializerFqcn );
569 
570         // The BTree allowDuplicates flag
571         int allowDuplicates = readInt( pageIos, dataPos );
572         ( ( PersistedBTree<K, V> ) btree ).setAllowDuplicates( allowDuplicates != 0 );
573         dataPos += INT_SIZE;
574 
575         // Now, init the BTree
576         btree.init();
577 
578         ( ( PersistedBTree<K, V> ) btree ).setRecordManager( this );
579 
580         // Now, load the rootPage, which can be a Leaf or a Node, depending
581         // on the number of elements in the tree : if it's above the pageSize,
582         // it's a Node, otherwise it's a Leaf
583 
584         // Read the rootPage pages on disk
585         PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );
586 
587         Page<K, V> btreeRoot = readPage( btree, rootPageIos );
588         BTreeFactory.setRecordManager( btree, this );
589 
590         BTreeFactory.setRootPage( btree, btreeRoot );
591     }
592 
593 
594     private <K, V> Page<K, V> readNode( BTree<K, V> btree, long offset, long revision, int nbElems ) throws IOException
595     {
596         Page<K, V> node = BTreeFactory.createNode( btree, revision, nbElems );
597 
598         // Read the rootPage pages on disk
599         PageIO[] pageIos = readPageIOs( offset, Long.MAX_VALUE );
600 
601         return node;
602     }
603 
604 
605     public <K, V> Page<K, V> deserialize( BTree<K, V> btree, long offset ) throws EndOfFileExceededException,
606         IOException
607     {
608         PageIO[] rootPageIos = readPageIOs( offset, Long.MAX_VALUE );
609 
610         Page<K, V> page = readPage( btree, rootPageIos );
611 
612         ( ( AbstractPage<K, V> ) page ).setOffset( rootPageIos[0].getOffset() );
613         ( ( AbstractPage<K, V> ) page ).setLastOffset( rootPageIos[rootPageIos.length - 1].getOffset() );
614 
615         return page;
616     }
617 
618 
619     private <K, V> Page<K, V> readPage( BTree<K, V> btree, PageIO[] pageIos ) throws IOException
620     {
621         // Deserialize the rootPage now
622         long position = 0L;
623 
624         // The revision
625         long revision = readLong( pageIos, position );
626         position += LONG_SIZE;
627 
628         // The number of elements in the page
629         int nbElems = readInt( pageIos, position );
630         position += INT_SIZE;
631 
632         // The size of the data containing the keys and values
633         Page<K, V> page = null;
634 
635         // Reads the bytes containing all the keys and values, if we have some
636         // We read  big blog of data into  ByteBuffer, then we will process
637         // this ByteBuffer
638         ByteBuffer byteBuffer = readBytes( pageIos, position );
639 
640         // Now, deserialize the data block. If the number of elements
641         // is positive, it's a Leaf, otherwise it's a Node
642         // Note that only a leaf can have 0 elements, and it's the root page then.
643         if ( nbElems >= 0 )
644         {
645             // It's a leaf
646             page = readLeafKeysAndValues( btree, nbElems, revision, byteBuffer, pageIos );
647         }
648         else
649         {
650             // It's a node
651             page = readNodeKeysAndValues( btree, -nbElems, revision, byteBuffer, pageIos );
652         }
653 
654         return page;
655     }
656 
657 
658     /**
659      * Deserialize a Leaf from some PageIOs
660      */
661     private <K, V> PersistedLeaf<K, V> readLeafKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
662         ByteBuffer byteBuffer,
663         PageIO[] pageIos )
664     {
665         // Its a leaf, create it
666         PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) BTreeFactory.createLeaf( btree, revision, nbElems );
667 
668         // Store the page offset on disk
669         leaf.setOffset( pageIos[0].getOffset() );
670         leaf.setLastOffset( pageIos[pageIos.length - 1].getOffset() );
671 
672         int[] keyLengths = new int[nbElems];
673         int[] valueLengths = new int[nbElems];
674 
675         // Read each key and value
676         for ( int i = 0; i < nbElems; i++ )
677         {
678             // Read the number of values
679             int nbValues = byteBuffer.getInt();
680             PersistedValueHolder<V> valueHolder = null;
681 
682             if ( nbValues < 0 )
683             {
684                 // This is a sub-btree
685                 byte[] btreeOffsetBytes = new byte[LONG_SIZE];
686                 byteBuffer.get( btreeOffsetBytes );
687 
688                 // Create the valueHolder. As the number of values is negative, we have to switch
689                 // to a positive value but as we start at -1 for 0 value, add 1.
690                 valueHolder = new PersistedValueHolder<V>( btree, 1 - nbValues, btreeOffsetBytes );
691             }
692             else
693             {
694                 // This is an array
695                 // Read the value's array length
696                 valueLengths[i] = byteBuffer.getInt();
697 
698                 // This is an Array of values, read the byte[] associated with it
699                 byte[] arrayBytes = new byte[valueLengths[i]];
700                 byteBuffer.get( arrayBytes );
701                 valueHolder = new PersistedValueHolder<V>( btree, nbValues, arrayBytes );
702             }
703 
704             BTreeFactory.setValue( btree, leaf, i, valueHolder );
705 
706             keyLengths[i] = byteBuffer.getInt();
707             byte[] data = new byte[keyLengths[i]];
708             byteBuffer.get( data );
709             BTreeFactory.setKey( btree, leaf, i, data );
710         }
711 
712         return leaf;
713     }
714 
715 
716     /**
717      * Deserialize a Node from some PageIos
718      */
719     private <K, V> PersistedNode<K, V> readNodeKeysAndValues( BTree<K, V> btree, int nbElems, long revision,
720         ByteBuffer byteBuffer,
721         PageIO[] pageIos ) throws IOException
722     {
723         PersistedNode<K, V> node = ( PersistedNode<K, V> ) BTreeFactory.createNode( btree, revision, nbElems );
724 
725         // Read each value and key
726         for ( int i = 0; i < nbElems; i++ )
727         {
728             // This is an Offset
729             long offset = OFFSET_SERIALIZER.deserialize( byteBuffer );
730             long lastOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
731 
732             PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset );
733             node.setValue( i, valueHolder );
734 
735             // Read the key length
736             int keyLength = byteBuffer.getInt();
737 
738             int currentPosition = byteBuffer.position();
739 
740             // and the key value
741             K key = btree.getKeySerializer().deserialize( byteBuffer );
742 
743             // Set the new position now
744             byteBuffer.position( currentPosition + keyLength );
745 
746             BTreeFactory.setKey( btree, node, i, key );
747         }
748 
749         // and read the last value, as it's a node
750         long offset = OFFSET_SERIALIZER.deserialize( byteBuffer );
751         long lastOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
752 
753         PersistedPageHolder<K, V> valueHolder = new PersistedPageHolder<K, V>( btree, null, offset, lastOffset );
754         node.setValue( nbElems, valueHolder );
755 
756         return node;
757     }
758 
759 
760     /**
761      * Read a byte[] from pages.
762      * @param pageIos The pages we want to read the byte[] from
763      * @param position The position in the data stored in those pages
764      * @return The byte[] we have read
765      */
766     private ByteBuffer readBytes( PageIO[] pageIos, long position )
767     {
768         // Read the byte[] length first
769         int length = readInt( pageIos, position );
770         position += INT_SIZE;
771 
772         // Compute the page in which we will store the data given the
773         // current position
774         int pageNb = computePageNb( position );
775 
776         // Compute the position in the current page
777         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
778 
779         ByteBuffer pageData = pageIos[pageNb].getData();
780         int remaining = pageData.capacity() - pagePos;
781 
782         if ( length == 0 )
783         {
784             // No bytes to read : return null;
785             return null;
786         }
787         else
788         {
789             ByteBuffer bytes = ByteBuffer.allocate( length );
790             int bytesPos = 0;
791 
792             while ( length > 0 )
793             {
794                 if ( length <= remaining )
795                 {
796                     pageData.mark();
797                     pageData.position( pagePos );
798                     int oldLimit = pageData.limit();
799                     pageData.limit( pagePos + length );
800                     bytes.put( pageData );
801                     pageData.limit( oldLimit );
802                     pageData.reset();
803                     bytes.rewind();
804 
805                     return bytes;
806                 }
807 
808                 pageData.mark();
809                 pageData.position( pagePos );
810                 int oldLimit = pageData.limit();
811                 pageData.limit( pagePos + remaining );
812                 bytes.put( pageData );
813                 pageData.limit( oldLimit );
814                 pageData.reset();
815                 pageNb++;
816                 pagePos = LINK_SIZE;
817                 bytesPos += remaining;
818                 pageData = pageIos[pageNb].getData();
819                 length -= remaining;
820                 remaining = pageData.capacity() - pagePos;
821             }
822 
823             bytes.rewind();
824 
825             return bytes;
826         }
827     }
828 
829 
830     /**
831      * Read an int from pages
832      * @param pageIos The pages we want to read the int from
833      * @param position The position in the data stored in those pages
834      * @return The int we have read
835      */
836     private int readInt( PageIO[] pageIos, long position )
837     {
838         // Compute the page in which we will store the data given the
839         // current position
840         int pageNb = computePageNb( position );
841 
842         // Compute the position in the current page
843         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
844 
845         ByteBuffer pageData = pageIos[pageNb].getData();
846         int remaining = pageData.capacity() - pagePos;
847         int value = 0;
848 
849         if ( remaining >= INT_SIZE )
850         {
851             value = pageData.getInt( pagePos );
852         }
853         else
854         {
855             value = 0;
856 
857             switch ( remaining )
858             {
859                 case 3:
860                     value += ( ( pageData.get( pagePos + 2 ) & 0x00FF ) << 8 );
861                     // Fallthrough !!!
862 
863                 case 2:
864                     value += ( ( pageData.get( pagePos + 1 ) & 0x00FF ) << 16 );
865                     // Fallthrough !!!
866 
867                 case 1:
868                     value += ( pageData.get( pagePos ) << 24 );
869                     break;
870             }
871 
872             // Now deal with the next page
873             pageData = pageIos[pageNb + 1].getData();
874             pagePos = LINK_SIZE;
875 
876             switch ( remaining )
877             {
878                 case 1:
879                     value += ( pageData.get( pagePos ) & 0x00FF ) << 16;
880                     // fallthrough !!!
881 
882                 case 2:
883                     value += ( pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 8;
884                     // fallthrough !!!
885 
886                 case 3:
887                     value += ( pageData.get( pagePos + 3 - remaining ) & 0x00FF );
888                     break;
889             }
890         }
891 
892         return value;
893     }
894 
895 
896     /**
897      * Read a byte from pages
898      * @param pageIos The pages we want to read the byte from
899      * @param position The position in the data stored in those pages
900      * @return The byte we have read
901      */
902     private byte readByte( PageIO[] pageIos, long position )
903     {
904         // Compute the page in which we will store the data given the
905         // current position
906         int pageNb = computePageNb( position );
907 
908         // Compute the position in the current page
909         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
910 
911         ByteBuffer pageData = pageIos[pageNb].getData();
912         byte value = 0;
913 
914         value = pageData.get( pagePos );
915 
916         return value;
917     }
918 
919 
920     /**
921      * Read a long from pages
922      * @param pageIos The pages we want to read the long from
923      * @param position The position in the data stored in those pages
924      * @return The long we have read
925      */
926     private long readLong( PageIO[] pageIos, long position )
927     {
928         // Compute the page in which we will store the data given the
929         // current position
930         int pageNb = computePageNb( position );
931 
932         // Compute the position in the current page
933         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
934 
935         ByteBuffer pageData = pageIos[pageNb].getData();
936         int remaining = pageData.capacity() - pagePos;
937         long value = 0L;
938 
939         if ( remaining >= LONG_SIZE )
940         {
941             value = pageData.getLong( pagePos );
942         }
943         else
944         {
945             switch ( remaining )
946             {
947                 case 7:
948                     value += ( ( ( long ) pageData.get( pagePos + 6 ) & 0x00FF ) << 8 );
949                     // Fallthrough !!!
950 
951                 case 6:
952                     value += ( ( ( long ) pageData.get( pagePos + 5 ) & 0x00FF ) << 16 );
953                     // Fallthrough !!!
954 
955                 case 5:
956                     value += ( ( ( long ) pageData.get( pagePos + 4 ) & 0x00FF ) << 24 );
957                     // Fallthrough !!!
958 
959                 case 4:
960                     value += ( ( ( long ) pageData.get( pagePos + 3 ) & 0x00FF ) << 32 );
961                     // Fallthrough !!!
962 
963                 case 3:
964                     value += ( ( ( long ) pageData.get( pagePos + 2 ) & 0x00FF ) << 40 );
965                     // Fallthrough !!!
966 
967                 case 2:
968                     value += ( ( ( long ) pageData.get( pagePos + 1 ) & 0x00FF ) << 48 );
969                     // Fallthrough !!!
970 
971                 case 1:
972                     value += ( ( long ) pageData.get( pagePos ) << 56 );
973                     break;
974             }
975 
976             // Now deal with the next page
977             pageData = pageIos[pageNb + 1].getData();
978             pagePos = LINK_SIZE;
979 
980             switch ( remaining )
981             {
982                 case 1:
983                     value += ( ( long ) pageData.get( pagePos ) & 0x00FF ) << 48;
984                     // fallthrough !!!
985 
986                 case 2:
987                     value += ( ( long ) pageData.get( pagePos + 2 - remaining ) & 0x00FF ) << 40;
988                     // fallthrough !!!
989 
990                 case 3:
991                     value += ( ( long ) pageData.get( pagePos + 3 - remaining ) & 0x00FF ) << 32;
992                     // fallthrough !!!
993 
994                 case 4:
995                     value += ( ( long ) pageData.get( pagePos + 4 - remaining ) & 0x00FF ) << 24;
996                     // fallthrough !!!
997 
998                 case 5:
999                     value += ( ( long ) pageData.get( pagePos + 5 - remaining ) & 0x00FF ) << 16;
1000                     // fallthrough !!!
1001 
1002                 case 6:
1003                     value += ( ( long ) pageData.get( pagePos + 6 - remaining ) & 0x00FF ) << 8;
1004                     // fallthrough !!!
1005 
1006                 case 7:
1007                     value += ( ( long ) pageData.get( pagePos + 7 - remaining ) & 0x00FF );
1008                     break;
1009             }
1010         }
1011 
1012         return value;
1013     }
1014 
1015 
1016     /**
1017      * Manage a BTree. The btree will be added and managed by this RecordManager. We will create a
1018      * new RootPage for this added BTree, which will contain no data.
1019      *
1020      * @param btree The new BTree to manage.
1021      */
1022     public synchronized <K, V> void manage( BTree<K, V> btree ) throws BTreeAlreadyManagedException, IOException
1023     {
1024         manage( ( BTree<Object, Object> ) btree, NORMAL_BTREE );
1025     }
1026 
1027 
1028     /**
1029      * works the same as @see #manage(BTree) except the given tree will not be linked to top level trees that will be
1030      * loaded initially if the internalTree flag is set to true
1031      *
1032      * @param btree The new BTree to manage.
1033      * @param internalTree flag indicating if this is an internal tree
1034      *
1035      * @throws BTreeAlreadyManagedException
1036      * @throws IOException
1037      */
1038     public synchronized <K, V> void manage( BTree<K, V> btree, boolean internalTree )
1039         throws BTreeAlreadyManagedException,
1040         IOException
1041     {
1042         LOG.debug( "Managing the btree {} which is an internam tree : {}", btree.getName(), internalTree );
1043         BTreeFactory.setRecordManager( btree, this );
1044 
1045         String name = btree.getName();
1046 
1047         if ( managedBTrees.containsKey( name ) )
1048         {
1049             // There is already a BTree with this name in the recordManager...
1050             LOG.error( "There is already a BTree named '{}' managed by this recordManager", name );
1051             throw new BTreeAlreadyManagedException( name );
1052         }
1053 
1054         // Do not add the BTree if it's internal into the Map of managed btrees, otherwise we will
1055         // not discard it when reloading a page wth internal btrees
1056         if ( !internalTree )
1057         {
1058             managedBTrees.put( name, ( BTree<Object, Object> ) btree );
1059         }
1060 
1061         // We will add the newly managed BTree at the end of the header.
1062         byte[] btreeNameBytes = Strings.getBytesUtf8( name );
1063         byte[] keySerializerBytes = Strings.getBytesUtf8( btree.getKeySerializerFQCN() );
1064         byte[] valueSerializerBytes = Strings.getBytesUtf8( btree.getValueSerializerFQCN() );
1065 
1066         int bufferSize =
1067             INT_SIZE + // The name size
1068                 btreeNameBytes.length + // The name
1069                 INT_SIZE + // The keySerializerBytes size
1070                 keySerializerBytes.length + // The keySerializerBytes
1071                 INT_SIZE + // The valueSerializerBytes size
1072                 valueSerializerBytes.length + // The valueSerializerBytes
1073                 INT_SIZE + // The page size
1074                 LONG_SIZE + // The revision
1075                 LONG_SIZE + // the number of element
1076                 LONG_SIZE + // the nextBtree offset
1077                 LONG_SIZE + // The root offset
1078                 INT_SIZE; // The allowDuplicates flag
1079 
1080         // Get the pageIOs we need to store the data. We may need more than one.
1081         PageIO[] pageIos = getFreePageIOs( bufferSize );
1082 
1083         // Store the BTree Offset into the BTree
1084         long btreeOffset = pageIos[0].getOffset();
1085         ( ( PersistedBTree<K, V> ) btree ).setBtreeOffset( btreeOffset );
1086 
1087         // Now store the BTree data in the pages :
1088         // - the BTree revision
1089         // - the BTree number of elements
1090         // - The RootPage offset
1091         // - The next Btree offset
1092         // - the BTree page size
1093         // - the BTree name
1094         // - the keySerializer FQCN
1095         // - the valueSerializer FQCN
1096         // - the flags that tell if the dups are allowed
1097         // Starts at 0
1098         long position = 0L;
1099 
1100         // The BTree current revision
1101         position = store( position, btree.getRevision(), pageIos );
1102 
1103         // The nb elems in the tree
1104         position = store( position, btree.getNbElems(), pageIos );
1105 
1106         // Serialize the BTree root page
1107         Page<K, V> rootPage = BTreeFactory.getRootPage( btree );
1108 
1109         PageIO[] rootPageIos = serializePage( btree, btree.getRevision(), rootPage );
1110 
1111         // Get the reference on the first page
1112         PageIO rootPageIo = rootPageIos[0];
1113 
1114         // Now, we can inject the BTree rootPage offset into the BTree header
1115         position = store( position, rootPageIo.getOffset(), pageIos );
1116         ( ( PersistedBTree<K, V> ) btree ).setRootPageOffset( rootPageIo.getOffset() );
1117         ( ( PersistedLeaf<K, V> ) rootPage ).setOffset( rootPageIo.getOffset() );
1118 
1119         // The next BTree Header offset (-1L, as it's a new BTree)
1120         position = store( position, NO_PAGE, pageIos );
1121 
1122         // The BTree page size
1123         position = store( position, btree.getPageSize(), pageIos );
1124 
1125         // The tree name
1126         position = store( position, btreeNameBytes, pageIos );
1127 
1128         // The keySerializer FQCN
1129         position = store( position, keySerializerBytes, pageIos );
1130 
1131         // The valueSerialier FQCN
1132         position = store( position, valueSerializerBytes, pageIos );
1133 
1134         // The allowDuplicates flag
1135         position = store( position, ( btree.isAllowDuplicates() ? 1 : 0 ), pageIos );
1136 
1137         // And flush the pages to disk now
1138         LOG.debug( "Flushing the newly managed '{}' btree header", btree.getName() );
1139         flushPages( pageIos );
1140         LOG.debug( "Flushing the newly managed '{}' btree rootpage", btree.getName() );
1141         flushPages( rootPageIos );
1142 
1143         // Now, if this added BTree is not the first BTree, we have to link it with the
1144         // latest added BTree
1145         if ( !internalTree )
1146         {
1147             nbBtree++;
1148 
1149             if ( lastAddedBTreeOffset != NO_PAGE )
1150             {
1151                 // We have to update the nextBtreeOffset from the previous BTreeHeader
1152                 pageIos = readPageIOs( lastAddedBTreeOffset, LONG_SIZE + LONG_SIZE + LONG_SIZE + LONG_SIZE );
1153                 store( LONG_SIZE + LONG_SIZE + LONG_SIZE, btreeOffset, pageIos );
1154 
1155                 // Write the pages on disk
1156                 LOG.debug( "Updated the previous btree pointer on the added BTree {}", btree.getName() );
1157                 flushPages( pageIos );
1158             }
1159 
1160             lastAddedBTreeOffset = btreeOffset;
1161 
1162             // Last, not least, update the number of managed BTrees in the header
1163             updateRecordManagerHeader();
1164         }
1165 
1166         if ( LOG_CHECK.isDebugEnabled() )
1167         {
1168             check();
1169         }
1170     }
1171 
1172 
1173     /**
1174      * Serialize a new Page. It will contain the following data :<br/>
1175      * <ul>
1176      * <li>the revision : a long</li>
1177      * <li>the number of elements : an int (if <= 0, it's a Node, otherwise it's a Leaf)</li>
1178      * <li>the size of the values/keys when serialized
1179      * <li>the keys : an array of serialized keys</li>
1180      * <li>the values : an array of references to the children pageIO offset (stored as long)
1181      * if it's a Node, or a list of values if it's a Leaf</li>
1182      * <li></li>
1183      * </ul>
1184      *
1185      * @param revision The node revision
1186      * @param keys The keys to serialize
1187      * @param children The references to the children
1188      * @return An array of pages containing the serialized node
1189      * @throws IOException
1190      */
1191     private <K, V> PageIO[] serializePage( BTree<K, V> btree, long revision, Page<K, V> page ) throws IOException
1192     {
1193         int nbElems = page.getNbElems();
1194 
1195         if ( nbElems == 0 )
1196         {
1197             return serializeRootPage( revision );
1198         }
1199         else
1200         {
1201             // Prepare a list of byte[] that will contain the serialized page
1202             int nbBuffers = 1 + 1 + 1 + nbElems * 3;
1203             int dataSize = 0;
1204             int serializedSize = 0;
1205 
1206             if ( page.isNode() )
1207             {
1208                 // A Node has one more value to store
1209                 nbBuffers++;
1210             }
1211 
1212             // Now, we can create the list with the right size
1213             List<byte[]> serializedData = new ArrayList<byte[]>( nbBuffers );
1214 
1215             // The revision
1216             byte[] buffer = LongSerializer.serialize( revision );
1217             serializedData.add( buffer );
1218             serializedSize += buffer.length;
1219 
1220             // The number of elements
1221             // Make it a negative value if it's a Node
1222             int pageNbElems = nbElems;
1223 
1224             if ( page.isNode() )
1225             {
1226                 pageNbElems = -nbElems;
1227             }
1228 
1229             buffer = IntSerializer.serialize( pageNbElems );
1230             serializedData.add( buffer );
1231             serializedSize += buffer.length;
1232 
1233             // Iterate on the keys and values. We first serialize the value, then the key
1234             // until we are done with all of them. If we are serializing a page, we have
1235             // to serialize one more value
1236             for ( int pos = 0; pos < nbElems; pos++ )
1237             {
1238                 // Start with the value
1239                 if ( page.isNode() )
1240                 {
1241                     dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, pos, serializedData );
1242                     dataSize += serializeNodeKey( ( PersistedNode<K, V> ) page, pos, serializedData );
1243                 }
1244                 else
1245                 {
1246                     dataSize += serializeLeafValue( ( PersistedLeaf<K, V> ) page, pos, serializedData );
1247                     dataSize += serializeLeafKey( ( PersistedLeaf<K, V> ) page, pos, serializedData );
1248                 }
1249             }
1250 
1251             // Nodes have one more value to serialize
1252             if ( page.isNode() )
1253             {
1254                 dataSize += serializeNodeValue( ( PersistedNode<K, V> ) page, nbElems, serializedData );
1255             }
1256 
1257             // Store the data size
1258             buffer = IntSerializer.serialize( dataSize );
1259             serializedData.add( 2, buffer );
1260             serializedSize += buffer.length;
1261 
1262             serializedSize += dataSize;
1263 
1264             // We are done. Allocate the pages we need to store the data
1265             PageIO[] pageIos = getFreePageIOs( serializedSize );
1266 
1267             // And store the data into those pages
1268             long position = 0L;
1269 
1270             for ( byte[] bytes : serializedData )
1271             {
1272                 position = storeRaw( position, bytes, pageIos );
1273             }
1274 
1275             return pageIos;
1276         }
1277     }
1278 
1279 
1280     /**
1281      * Serialize a Node's key
1282      */
1283     private <K, V> int serializeNodeKey( PersistedNode<K, V> node, int pos, List<byte[]> serializedData )
1284     {
1285         KeyHolder<K> holder = node.getKeyHolder( pos );
1286         byte[] buffer = ( ( PersistedKeyHolder<K> ) holder ).getRaw();
1287 
1288         // We have to store the serialized key length
1289         byte[] length = IntSerializer.serialize( buffer.length );
1290         serializedData.add( length );
1291 
1292         // And store the serialized key now if not null
1293         if ( buffer.length != 0 )
1294         {
1295             serializedData.add( buffer );
1296         }
1297 
1298         return buffer.length + INT_SIZE;
1299     }
1300 
1301 
1302     /**
1303      * Serialize a Node's Value. We store the two offsets of the child page.
1304      */
1305     private <K, V> int serializeNodeValue( PersistedNode<K, V> node, int pos, List<byte[]> serializedData )
1306         throws IOException
1307     {
1308         // For a node, we just store the children's offsets
1309         Page<K, V> child = node.getReference( pos );
1310 
1311         // The first offset
1312         byte[] buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getOffset() );
1313         serializedData.add( buffer );
1314         int dataSize = buffer.length;
1315 
1316         // The last offset
1317         buffer = LongSerializer.serialize( ( ( AbstractPage<K, V> ) child ).getLastOffset() );
1318         serializedData.add( buffer );
1319         dataSize += buffer.length;
1320 
1321         return dataSize;
1322     }
1323 
1324 
1325     /**
1326      * Serialize a Leaf's key
1327      */
1328     private <K, V> int serializeLeafKey( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData )
1329     {
1330         int dataSize = 0;
1331         KeyHolder<K> keyHolder = leaf.getKeyHolder( pos );
1332         byte[] keyData = ( ( PersistedKeyHolder<K> ) keyHolder ).getRaw();
1333 
1334         if ( keyData != null )
1335         {
1336             // We have to store the serialized key length
1337             byte[] length = IntSerializer.serialize( keyData.length );
1338             serializedData.add( length );
1339 
1340             // And the key data
1341             serializedData.add( keyData );
1342             dataSize += keyData.length + INT_SIZE;
1343         }
1344         else
1345         {
1346             serializedData.add( IntSerializer.serialize( 0 ) );
1347             dataSize += INT_SIZE;
1348         }
1349 
1350         return dataSize;
1351     }
1352 
1353 
1354     /**
1355      * Serialize a Leaf's Value. We store
1356      */
1357     private <K, V> int serializeLeafValue( PersistedLeaf<K, V> leaf, int pos, List<byte[]> serializedData )
1358         throws IOException
1359     {
1360         // The value can be an Array or a sub-btree, but we don't care
1361         // we just iterate on all the values
1362         ValueHolder<V> valueHolder = leaf.getValue( pos );
1363         int dataSize = 0;
1364         int nbValues = valueHolder.size();
1365 
1366         if ( !valueHolder.isSubBtree() )
1367         {
1368             // Write the nb elements first
1369             byte[] buffer = IntSerializer.serialize( nbValues );
1370             serializedData.add( buffer );
1371             dataSize = INT_SIZE;
1372 
1373             // We have a serialized value. Just flush it
1374             byte[] data = ( ( PersistedValueHolder<V> ) valueHolder ).getRaw();
1375             dataSize += data.length;
1376 
1377             // Store the data size
1378             buffer = IntSerializer.serialize( data.length );
1379             serializedData.add( buffer );
1380             dataSize += INT_SIZE;
1381 
1382             // and add the data if it's not 0
1383             if ( data.length > 0 )
1384             {
1385                 serializedData.add( data );
1386             }
1387         }
1388         else
1389         {
1390             if ( nbValues == 0 )
1391             {
1392                 // No value.
1393                 byte[] buffer = IntSerializer.serialize( nbValues );
1394                 serializedData.add( buffer );
1395 
1396                 return buffer.length;
1397             }
1398 
1399             if ( valueHolder.isSubBtree() )
1400             {
1401                 // Store the nbVlues as a negative number. We add 1 so that 0 is not confused with an Array value
1402                 byte[] buffer = IntSerializer.serialize( -( nbValues + 1 ) );
1403                 serializedData.add( buffer );
1404                 dataSize += buffer.length;
1405 
1406                 // the BTree offset
1407                 buffer = LongSerializer.serialize( ( ( PersistedValueHolder<V> ) valueHolder ).getOffset() );
1408                 serializedData.add( buffer );
1409                 dataSize += buffer.length;
1410             }
1411             else
1412             {
1413                 // This is an array, store the nb of values as a positive number
1414                 byte[] buffer = IntSerializer.serialize( nbValues );
1415                 serializedData.add( buffer );
1416                 dataSize += buffer.length;
1417 
1418                 // Now store each value
1419                 byte[] data = ( ( PersistedValueHolder<V> ) valueHolder ).getRaw();
1420                 buffer = IntSerializer.serialize( data.length );
1421                 serializedData.add( buffer );
1422                 dataSize += buffer.length;
1423 
1424                 if ( data.length > 0 )
1425                 {
1426                     serializedData.add( data );
1427                 }
1428 
1429                 dataSize += data.length;
1430             }
1431         }
1432 
1433         return dataSize;
1434     }
1435 
1436 
1437     /**
1438      * Write a root page with no elements in it
1439      */
1440     private PageIO[] serializeRootPage( long revision ) throws IOException
1441     {
1442         // We will have 1 single page if we have no elements
1443         PageIO[] pageIos = new PageIO[1];
1444 
1445         // This is either a new root page or a new page that will be filled later
1446         PageIO newPage = fetchNewPage();
1447 
1448         // We need first to create a byte[] that will contain all the data
1449         // For the root page, this is easy, as we only have to store the revision,
1450         // and the number of elements, which is 0.
1451         long position = 0L;
1452 
1453         position = store( position, revision, newPage );
1454         position = store( position, 0, newPage );
1455 
1456         // Update the page size now
1457         newPage.setSize( ( int ) position );
1458 
1459         // Insert the result into the array of PageIO
1460         pageIos[0] = newPage;
1461 
1462         return pageIos;
1463     }
1464 
1465 
1466     /**
1467      * Update the header, injecting the following data :
1468      * <pre>
1469      * +---------------+
1470      * | PageSize      | 4 bytes : The size of a physical page (default to 4096)
1471      * +---------------+
1472      * | NbTree        | 4 bytes : The number of managed BTrees (at least 1)
1473      * +---------------+
1474      * | FirstFree     | 8 bytes : The offset of the first free page
1475      * +---------------+
1476      * | currentBoB    | 1 byte : The current BoB in use
1477      * +---------------+
1478      * | BoB offset[0] | 8 bytes : The offset of the first BoB
1479      * +---------------+
1480      * | BoB offset[1] | 8 bytes : The offset of the second BoB
1481      * +---------------+
1482      * </pre>
1483      */
1484     public void updateRecordManagerHeader() throws IOException
1485     {
1486         // The page size
1487         HEADER_BYTES[0] = ( byte ) ( pageSize >>> 24 );
1488         HEADER_BYTES[1] = ( byte ) ( pageSize >>> 16 );
1489         HEADER_BYTES[2] = ( byte ) ( pageSize >>> 8 );
1490         HEADER_BYTES[3] = ( byte ) ( pageSize );
1491 
1492         // The number of managed BTree (currently we have only one : the discardedPage BTree
1493         HEADER_BYTES[4] = ( byte ) ( nbBtree >>> 24 );
1494         HEADER_BYTES[5] = ( byte ) ( nbBtree >>> 16 );
1495         HEADER_BYTES[6] = ( byte ) ( nbBtree >>> 8 );
1496         HEADER_BYTES[7] = ( byte ) ( nbBtree );
1497 
1498         // The first free page
1499         HEADER_BYTES[8] = ( byte ) ( firstFreePage >>> 56 );
1500         HEADER_BYTES[9] = ( byte ) ( firstFreePage >>> 48 );
1501         HEADER_BYTES[10] = ( byte ) ( firstFreePage >>> 40 );
1502         HEADER_BYTES[11] = ( byte ) ( firstFreePage >>> 32 );
1503         HEADER_BYTES[12] = ( byte ) ( firstFreePage >>> 24 );
1504         HEADER_BYTES[13] = ( byte ) ( firstFreePage >>> 16 );
1505         HEADER_BYTES[14] = ( byte ) ( firstFreePage >>> 8 );
1506         HEADER_BYTES[15] = ( byte ) ( firstFreePage );
1507 
1508         // The offset of the first BoB
1509         HEADER_BYTES[17] = ( byte ) ( bobOldRevision >>> 56 );
1510         HEADER_BYTES[18] = ( byte ) ( bobOldRevision >>> 48 );
1511         HEADER_BYTES[19] = ( byte ) ( bobOldRevision >>> 40 );
1512         HEADER_BYTES[20] = ( byte ) ( bobOldRevision >>> 32 );
1513         HEADER_BYTES[21] = ( byte ) ( bobOldRevision >>> 24 );
1514         HEADER_BYTES[22] = ( byte ) ( bobOldRevision >>> 16 );
1515         HEADER_BYTES[23] = ( byte ) ( bobOldRevision >>> 8 );
1516         HEADER_BYTES[24] = ( byte ) ( bobOldRevision );
1517 
1518         // The offset of the second BoB
1519         HEADER_BYTES[17] = ( byte ) ( bobCurrentRevision >>> 56 );
1520         HEADER_BYTES[18] = ( byte ) ( bobCurrentRevision >>> 48 );
1521         HEADER_BYTES[19] = ( byte ) ( bobCurrentRevision >>> 40 );
1522         HEADER_BYTES[20] = ( byte ) ( bobCurrentRevision >>> 32 );
1523         HEADER_BYTES[21] = ( byte ) ( bobCurrentRevision >>> 24 );
1524         HEADER_BYTES[22] = ( byte ) ( bobCurrentRevision >>> 16 );
1525         HEADER_BYTES[23] = ( byte ) ( bobCurrentRevision >>> 8 );
1526         HEADER_BYTES[24] = ( byte ) ( bobCurrentRevision );
1527 
1528         // Write the header on disk
1529         HEADER_BUFFER.put( HEADER_BYTES );
1530         HEADER_BUFFER.flip();
1531 
1532         LOG.debug( "Update RM header, FF : {}", firstFreePage );
1533         fileChannel.write( HEADER_BUFFER, 0 );
1534         HEADER_BUFFER.clear();
1535 
1536         nbUpdateRMHeader.incrementAndGet();
1537     }
1538 
1539 
1540     /**
1541      * Update the BTree header after a BTree modification. This will make the latest modification
1542      * visible.
1543      * We update the following fields :
1544      * <ul>
1545      * <li>the revision</li>
1546      * <li>the number of elements</li>
1547      * <li>the reference to the current BTree revisions</li>
1548      * <li>the reference to the old BTree revisions</li>
1549      * </ul>
1550      * @param btree
1551      * @throws IOException
1552      * @throws EndOfFileExceededException
1553      */
1554     /* No qualifier*/<K, V> void updateBtreeHeader( BTree<K, V> btree, long rootPageOffset )
1555         throws EndOfFileExceededException,
1556         IOException
1557     {
1558         // Read the pageIOs associated with this BTree
1559         long offset = ( ( PersistedBTree<K, V> ) btree ).getBtreeOffset();
1560         long headerSize = LONG_SIZE + LONG_SIZE + LONG_SIZE;
1561 
1562         PageIO[] pageIos = readPageIOs( offset, headerSize );
1563 
1564         // Now, update the revision
1565         long position = 0;
1566 
1567         position = store( position, btree.getRevision(), pageIos );
1568         position = store( position, btree.getNbElems(), pageIos );
1569         position = store( position, rootPageOffset, pageIos );
1570 
1571         // Write the pages on disk
1572         if ( LOG.isDebugEnabled() )
1573         {
1574             LOG.debug( "-----> Flushing the '{}' BTreeHeader", btree.getName() );
1575             LOG.debug( "  revision : " + btree.getRevision() + ", NbElems : " + btree.getNbElems() + ", root offset : "
1576                 + rootPageOffset );
1577         }
1578 
1579         flushPages( pageIos );
1580 
1581         nbUpdateBTreeHeader.incrementAndGet();
1582 
1583         if ( LOG_CHECK.isDebugEnabled() )
1584         {
1585             check();
1586         }
1587     }
1588 
1589 
1590     /**
1591      * Write the pages in the disk, either at the end of the file, or at
1592      * the position they were taken from.
1593      *
1594      * @param pageIos The list of pages to write
1595      * @throws IOException If the write failed
1596      */
1597     private void flushPages( PageIO... pageIos ) throws IOException
1598     {
1599         if ( LOG.isDebugEnabled() )
1600         {
1601             for ( PageIO pageIo : pageIos )
1602             {
1603                 dump( pageIo );
1604             }
1605         }
1606 
1607         for ( PageIO pageIo : pageIos )
1608         {
1609             pageIo.getData().rewind();
1610 
1611             if ( fileChannel.size() < ( pageIo.getOffset() + pageSize ) )
1612             {
1613                 LOG.debug( "Adding a page at the end of the file" );
1614                 // This is a page we have to add to the file
1615                 fileChannel.write( pageIo.getData(), fileChannel.size() );
1616                 //fileChannel.force( false );
1617             }
1618             else
1619             {
1620                 LOG.debug( "Writing a page at position {}", pageIo.getOffset() );
1621                 fileChannel.write( pageIo.getData(), pageIo.getOffset() );
1622                 //fileChannel.force( false );
1623             }
1624 
1625             nbUpdatePageIOs.incrementAndGet();
1626 
1627             pageIo.getData().rewind();
1628         }
1629     }
1630 
1631 
1632     /**
1633      * Compute the page in which we will store data given an offset, when
1634      * we have a list of pages.
1635      *
1636      * @param offset The position in the data
1637      * @return The page number in which the offset will start
1638      */
1639     private int computePageNb( long offset )
1640     {
1641         long pageNb = 0;
1642 
1643         offset -= pageSize - LINK_SIZE - PAGE_SIZE;
1644 
1645         if ( offset < 0 )
1646         {
1647             return ( int ) pageNb;
1648         }
1649 
1650         pageNb = 1 + offset / ( pageSize - LINK_SIZE );
1651 
1652         return ( int ) pageNb;
1653     }
1654 
1655 
1656     /**
1657      * Stores a byte[] into one ore more pageIO (depending if the long is stored
1658      * across a boundary or not)
1659      *
1660      * @param position The position in a virtual byte[] if all the pages were contiguous
1661      * @param bytes The byte[] to serialize
1662      * @param pageIos The pageIOs we have to store the data in
1663      * @return The new position
1664      */
1665     private long store( long position, byte[] bytes, PageIO... pageIos )
1666     {
1667         if ( bytes != null )
1668         {
1669             // Write the bytes length
1670             position = store( position, bytes.length, pageIos );
1671 
1672             // Compute the page in which we will store the data given the
1673             // current position
1674             int pageNb = computePageNb( position );
1675 
1676             // Get back the buffer in this page
1677             ByteBuffer pageData = pageIos[pageNb].getData();
1678 
1679             // Compute the position in the current page
1680             int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1681 
1682             // Compute the remaining size in the page
1683             int remaining = pageData.capacity() - pagePos;
1684             int nbStored = bytes.length;
1685 
1686             // And now, write the bytes until we have none
1687             while ( nbStored > 0 )
1688             {
1689                 if ( remaining > nbStored )
1690                 {
1691                     pageData.mark();
1692                     pageData.position( pagePos );
1693                     pageData.put( bytes, bytes.length - nbStored, nbStored );
1694                     pageData.reset();
1695                     nbStored = 0;
1696                 }
1697                 else
1698                 {
1699                     pageData.mark();
1700                     pageData.position( pagePos );
1701                     pageData.put( bytes, bytes.length - nbStored, remaining );
1702                     pageData.reset();
1703                     pageNb++;
1704                     pageData = pageIos[pageNb].getData();
1705                     pagePos = LINK_SIZE;
1706                     nbStored -= remaining;
1707                     remaining = pageData.capacity() - pagePos;
1708                 }
1709             }
1710 
1711             // We are done
1712             position += bytes.length;
1713         }
1714         else
1715         {
1716             // No bytes : write 0 and return
1717             position = store( position, 0, pageIos );
1718         }
1719 
1720         return position;
1721     }
1722 
1723 
1724     /**
1725      * Stores a byte[] into one ore more pageIO (depending if the long is stored
1726      * across a boundary or not). We don't add the byte[] size, it's already present
1727      * in the received byte[].
1728      *
1729      * @param position The position in a virtual byte[] if all the pages were contiguous
1730      * @param bytes The byte[] to serialize
1731      * @param pageIos The pageIOs we have to store the data in
1732      * @return The new position
1733      */
1734     private long storeRaw( long position, byte[] bytes, PageIO... pageIos )
1735     {
1736         if ( bytes != null )
1737         {
1738             // Compute the page in which we will store the data given the
1739             // current position
1740             int pageNb = computePageNb( position );
1741 
1742             // Get back the buffer in this page
1743             ByteBuffer pageData = pageIos[pageNb].getData();
1744 
1745             // Compute the position in the current page
1746             int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1747 
1748             // Compute the remaining size in the page
1749             int remaining = pageData.capacity() - pagePos;
1750             int nbStored = bytes.length;
1751 
1752             // And now, write the bytes until we have none
1753             while ( nbStored > 0 )
1754             {
1755                 if ( remaining > nbStored )
1756                 {
1757                     pageData.mark();
1758                     pageData.position( pagePos );
1759                     pageData.put( bytes, bytes.length - nbStored, nbStored );
1760                     pageData.reset();
1761                     nbStored = 0;
1762                 }
1763                 else
1764                 {
1765                     pageData.mark();
1766                     pageData.position( pagePos );
1767                     pageData.put( bytes, bytes.length - nbStored, remaining );
1768                     pageData.reset();
1769                     pageNb++;
1770 
1771                     if ( pageNb == pageIos.length )
1772                     {
1773                         // We can stop here : we have reach the end of the page
1774                         break;
1775                     }
1776 
1777                     pageData = pageIos[pageNb].getData();
1778                     pagePos = LINK_SIZE;
1779                     nbStored -= remaining;
1780                     remaining = pageData.capacity() - pagePos;
1781                 }
1782             }
1783 
1784             // We are done
1785             position += bytes.length;
1786         }
1787         else
1788         {
1789             // No bytes : write 0 and return
1790             position = store( position, 0, pageIos );
1791         }
1792 
1793         return position;
1794     }
1795 
1796 
1797     /**
1798      * Stores an Integer into one ore more pageIO (depending if the int is stored
1799      * across a boundary or not)
1800      *
1801      * @param position The position in a virtual byte[] if all the pages were contiguous
1802      * @param value The int to serialize
1803      * @param pageIos The pageIOs we have to store the data in
1804      * @return The new position
1805      */
1806     private long store( long position, int value, PageIO... pageIos )
1807     {
1808         // Compute the page in which we will store the data given the
1809         // current position
1810         int pageNb = computePageNb( position );
1811 
1812         // Compute the position in the current page
1813         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1814 
1815         // Get back the buffer in this page
1816         ByteBuffer pageData = pageIos[pageNb].getData();
1817 
1818         // Compute the remaining size in the page
1819         int remaining = pageData.capacity() - pagePos;
1820 
1821         if ( remaining < INT_SIZE )
1822         {
1823             // We have to copy the serialized length on two pages
1824 
1825             switch ( remaining )
1826             {
1827                 case 3:
1828                     pageData.put( pagePos + 2, ( byte ) ( value >>> 8 ) );
1829                     // Fallthrough !!!
1830 
1831                 case 2:
1832                     pageData.put( pagePos + 1, ( byte ) ( value >>> 16 ) );
1833                     // Fallthrough !!!
1834 
1835                 case 1:
1836                     pageData.put( pagePos, ( byte ) ( value >>> 24 ) );
1837                     break;
1838             }
1839 
1840             // Now deal with the next page
1841             pageData = pageIos[pageNb + 1].getData();
1842             pagePos = LINK_SIZE;
1843 
1844             switch ( remaining )
1845             {
1846                 case 1:
1847                     pageData.put( pagePos, ( byte ) ( value >>> 16 ) );
1848                     // fallthrough !!!
1849 
1850                 case 2:
1851                     pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 8 ) );
1852                     // fallthrough !!!
1853 
1854                 case 3:
1855                     pageData.put( pagePos + 3 - remaining, ( byte ) ( value ) );
1856                     break;
1857             }
1858         }
1859         else
1860         {
1861             // Store the value in the page at the selected position
1862             pageData.putInt( pagePos, value );
1863         }
1864 
1865         // Increment the position to reflect the addition of an Int (4 bytes)
1866         position += INT_SIZE;
1867 
1868         return position;
1869     }
1870 
1871 
1872     /**
1873      * Stores a Long into one ore more pageIO (depending if the long is stored
1874      * across a boundary or not)
1875      *
1876      * @param position The position in a virtual byte[] if all the pages were contiguous
1877      * @param value The long to serialize
1878      * @param pageIos The pageIOs we have to store the data in
1879      * @return The new position
1880      */
1881     private long store( long position, long value, PageIO... pageIos )
1882     {
1883         // Compute the page in which we will store the data given the
1884         // current position
1885         int pageNb = computePageNb( position );
1886 
1887         // Compute the position in the current page
1888         int pagePos = ( int ) ( position + ( pageNb + 1 ) * LONG_SIZE + INT_SIZE ) - pageNb * pageSize;
1889 
1890         // Get back the buffer in this page
1891         ByteBuffer pageData = pageIos[pageNb].getData();
1892 
1893         // Compute the remaining size in the page
1894         int remaining = pageData.capacity() - pagePos;
1895 
1896         if ( remaining < LONG_SIZE )
1897         {
1898             // We have to copy the serialized length on two pages
1899 
1900             switch ( remaining )
1901             {
1902                 case 7:
1903                     pageData.put( pagePos + 6, ( byte ) ( value >>> 8 ) );
1904                     // Fallthrough !!!
1905 
1906                 case 6:
1907                     pageData.put( pagePos + 5, ( byte ) ( value >>> 16 ) );
1908                     // Fallthrough !!!
1909 
1910                 case 5:
1911                     pageData.put( pagePos + 4, ( byte ) ( value >>> 24 ) );
1912                     // Fallthrough !!!
1913 
1914                 case 4:
1915                     pageData.put( pagePos + 3, ( byte ) ( value >>> 32 ) );
1916                     // Fallthrough !!!
1917 
1918                 case 3:
1919                     pageData.put( pagePos + 2, ( byte ) ( value >>> 40 ) );
1920                     // Fallthrough !!!
1921 
1922                 case 2:
1923                     pageData.put( pagePos + 1, ( byte ) ( value >>> 48 ) );
1924                     // Fallthrough !!!
1925 
1926                 case 1:
1927                     pageData.put( pagePos, ( byte ) ( value >>> 56 ) );
1928                     break;
1929             }
1930 
1931             // Now deal with the next page
1932             pageData = pageIos[pageNb + 1].getData();
1933             pagePos = LINK_SIZE;
1934 
1935             switch ( remaining )
1936             {
1937                 case 1:
1938                     pageData.put( pagePos, ( byte ) ( value >>> 48 ) );
1939                     // fallthrough !!!
1940 
1941                 case 2:
1942                     pageData.put( pagePos + 2 - remaining, ( byte ) ( value >>> 40 ) );
1943                     // fallthrough !!!
1944 
1945                 case 3:
1946                     pageData.put( pagePos + 3 - remaining, ( byte ) ( value >>> 32 ) );
1947                     // fallthrough !!!
1948 
1949                 case 4:
1950                     pageData.put( pagePos + 4 - remaining, ( byte ) ( value >>> 24 ) );
1951                     // fallthrough !!!
1952 
1953                 case 5:
1954                     pageData.put( pagePos + 5 - remaining, ( byte ) ( value >>> 16 ) );
1955                     // fallthrough !!!
1956 
1957                 case 6:
1958                     pageData.put( pagePos + 6 - remaining, ( byte ) ( value >>> 8 ) );
1959                     // fallthrough !!!
1960 
1961                 case 7:
1962                     pageData.put( pagePos + 7 - remaining, ( byte ) ( value ) );
1963                     break;
1964             }
1965         }
1966         else
1967         {
1968             // Store the value in the page at the selected position
1969             pageData.putLong( pagePos, value );
1970         }
1971 
1972         // Increment the position to reflect the addition of an Long (8 bytes)
1973         position += LONG_SIZE;
1974 
1975         return position;
1976     }
1977 
1978 
1979     /**
1980      * Stores a new page on disk. We will add the modified page into the tree of copied pages.
1981      * The new page is serialized and saved on disk.
1982      *
1983      * @param btree The persistedBTree we will create a new PageHolder for
1984      * @param newPage The page to write on disk
1985      * @param newRevision The page's revision
1986      * @return A PageHolder containing the copied page
1987      * @throws IOException If the page can't be written on disk
1988      */
1989     /* No qualifier*/<K, V> PageHolder<K, V> writePage( BTree<K, V> btree, Page<K, V> newPage,
1990         long newRevision ) throws IOException
1991     {
1992         // We first need to save the new page on disk
1993         PageIO[] pageIos = serializePage( btree, newRevision, newPage );
1994 
1995         LOG.debug( "Write data for '{}' btree ", btree.getName() );
1996 
1997         // Write the page on disk
1998         flushPages( pageIos );
1999 
2000         // Build the resulting reference
2001         long offset = pageIos[0].getOffset();
2002         long lastOffset = pageIos[pageIos.length - 1].getOffset();
2003         PersistedPageHolder<K, V> pageHolder = new PersistedPageHolder<K, V>( btree, newPage, offset,
2004             lastOffset );
2005 
2006         if ( LOG_CHECK.isDebugEnabled() )
2007         {
2008             check();
2009         }
2010 
2011         return pageHolder;
2012     }
2013 
2014 
2015     /**
2016      * Compute the number of pages needed to store some specific size of data.
2017      *
2018      * @param dataSize The size of the data we want to store in pages
2019      * @return The number of pages needed
2020      */
2021     private int computeNbPages( int dataSize )
2022     {
2023         if ( dataSize <= 0 )
2024         {
2025             return 0;
2026         }
2027 
2028         // Compute the number of pages needed.
2029         // Considering that each page can contain PageSize bytes,
2030         // but that the first 8 bytes are used for links and we
2031         // use 4 bytes to store the data size, the number of needed
2032         // pages is :
2033         // NbPages = ( (dataSize - (PageSize - 8 - 4 )) / (PageSize - 8) ) + 1
2034         // NbPages += ( if (dataSize - (PageSize - 8 - 4 )) % (PageSize - 8) > 0 : 1 : 0 )
2035         int availableSize = ( pageSize - LONG_SIZE );
2036         int nbNeededPages = 1;
2037 
2038         // Compute the number of pages that will be full but the first page
2039         if ( dataSize > availableSize - INT_SIZE )
2040         {
2041             int remainingSize = dataSize - ( availableSize - INT_SIZE );
2042             nbNeededPages += remainingSize / availableSize;
2043             int remain = remainingSize % availableSize;
2044 
2045             if ( remain > 0 )
2046             {
2047                 nbNeededPages++;
2048             }
2049         }
2050 
2051         return nbNeededPages;
2052     }
2053 
2054 
2055     /**
2056      * Get as many pages as needed to store the data of the given size
2057      *
2058      * @param dataSize The data size
2059      * @return An array of pages, enough to store the full data
2060      */
2061     private PageIO[] getFreePageIOs( int dataSize ) throws IOException
2062     {
2063         if ( dataSize == 0 )
2064         {
2065             return new PageIO[]
2066                 {};
2067         }
2068 
2069         int nbNeededPages = computeNbPages( dataSize );
2070 
2071         PageIO[] pageIOs = new PageIO[nbNeededPages];
2072 
2073         // The first page : set the size
2074         pageIOs[0] = fetchNewPage();
2075         pageIOs[0].setSize( dataSize );
2076 
2077         for ( int i = 1; i < nbNeededPages; i++ )
2078         {
2079             pageIOs[i] = fetchNewPage();
2080 
2081             // Create the link
2082             pageIOs[i - 1].setNextPage( pageIOs[i].getOffset() );
2083         }
2084 
2085         return pageIOs;
2086     }
2087 
2088 
2089     /**
2090      * Return a new Page. We take one of the existing free pages, or we create
2091      * a new page at the end of the file.
2092      *
2093      * @return The fetched PageIO
2094      */
2095     private PageIO fetchNewPage() throws IOException
2096     {
2097         if ( firstFreePage == NO_PAGE )
2098         {
2099             nbCreatedPages.incrementAndGet();
2100 
2101             // We don't have any free page. Reclaim some new page at the end
2102             // of the file
2103             PageIO newPage = new PageIO( endOfFileOffset );
2104 
2105             endOfFileOffset += pageSize;
2106 
2107             ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
2108 
2109             newPage.setData( data );
2110             newPage.setNextPage( NO_PAGE );
2111             newPage.setSize( 0 );
2112 
2113             LOG.debug( "Requiring a new page at offset {}", newPage.getOffset() );
2114 
2115             return newPage;
2116         }
2117         else
2118         {
2119             nbReusedPages.incrementAndGet();
2120 
2121             // We have some existing free page. Fetch it from disk
2122             PageIO pageIo = fetchPage( firstFreePage );
2123 
2124             // Update the firstFreePage pointer
2125             firstFreePage = pageIo.getNextPage();
2126 
2127             // overwrite the data of old page
2128             ByteBuffer data = ByteBuffer.allocateDirect( pageSize );
2129             pageIo.setData( data );
2130 
2131             pageIo.setNextPage( NO_PAGE );
2132             pageIo.setSize( 0 );
2133 
2134             LOG.debug( "Reused page at offset {}", pageIo.getOffset() );
2135 
2136             return pageIo;
2137         }
2138     }
2139 
2140 
2141     /**
2142      * fetch a page from disk, knowing its position in the file.
2143      *
2144      * @param offset The position in the file
2145      * @return The found page
2146      */
2147     private PageIO fetchPage( long offset ) throws IOException, EndOfFileExceededException
2148     {
2149         if ( fileChannel.size() < offset + pageSize )
2150         {
2151             // Error : we are past the end of the file
2152             throw new EndOfFileExceededException( "We are fetching a page on " + offset +
2153                 " when the file's size is " + fileChannel.size() );
2154         }
2155         else
2156         {
2157             // Read the page
2158             fileChannel.position( offset );
2159 
2160             ByteBuffer data = ByteBuffer.allocate( pageSize );
2161             fileChannel.read( data );
2162             data.rewind();
2163 
2164             PageIO readPage = new PageIO( offset );
2165             readPage.setData( data );
2166 
2167             return readPage;
2168         }
2169     }
2170 
2171 
2172     /**
2173      * @return the pageSize
2174      */
2175     public int getPageSize()
2176     {
2177         return pageSize;
2178     }
2179 
2180 
2181     public void setPageSize( int pageSize )
2182     {
2183         if ( this.pageSize != -1 )
2184         {
2185         }
2186         else
2187         {
2188             this.pageSize = pageSize;
2189         }
2190     }
2191 
2192 
2193     /**
2194      * Close the RecordManager and flush everything on disk
2195      */
2196     public void close() throws IOException
2197     {
2198         // TODO : we must wait for the last write to finish
2199 
2200         for ( BTree<Object, Object> tree : managedBTrees.values() )
2201         {
2202             tree.close();
2203         }
2204 
2205         managedBTrees.clear();
2206 
2207         // Write the data
2208         fileChannel.force( true );
2209 
2210         // And close the channel
2211         fileChannel.close();
2212     }
2213 
2214     /** Hex chars */
2215     private static final byte[] HEX_CHAR = new byte[]
2216         { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
2217 
2218 
2219     public static String dump( byte octet )
2220     {
2221         return new String( new byte[]
2222             { HEX_CHAR[( octet & 0x00F0 ) >> 4], HEX_CHAR[octet & 0x000F] } );
2223     }
2224 
2225 
2226     /**
2227      * Dump a pageIO
2228      */
2229     private void dump( PageIO pageIo )
2230     {
2231         ByteBuffer buffer = pageIo.getData();
2232         buffer.mark();
2233         byte[] longBuffer = new byte[LONG_SIZE];
2234         byte[] intBuffer = new byte[INT_SIZE];
2235 
2236         // get the next page offset
2237         buffer.get( longBuffer );
2238         long nextOffset = LongSerializer.deserialize( longBuffer );
2239 
2240         // Get the data size
2241         buffer.get( intBuffer );
2242         int size = IntSerializer.deserialize( intBuffer );
2243 
2244         buffer.reset();
2245 
2246         System.out.println( "PageIO[" + Long.toHexString( pageIo.getOffset() ) + "], size = " + size + ", NEXT PageIO:"
2247             + Long.toHexString( nextOffset ) );
2248         System.out.println( " 0  1  2  3  4  5  6  7  8  9  A  B  C  D  E  F " );
2249         System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );
2250 
2251         int position = buffer.position();
2252 
2253         for ( int i = 0; i < buffer.limit(); i += 16 )
2254         {
2255             System.out.print( "|" );
2256 
2257             for ( int j = 0; j < 16; j++ )
2258             {
2259                 System.out.print( dump( buffer.get() ) );
2260 
2261                 if ( j == 15 )
2262                 {
2263                     System.out.println( "|" );
2264                 }
2265                 else
2266                 {
2267                     System.out.print( " " );
2268                 }
2269             }
2270         }
2271 
2272         System.out.println( "+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+--+" );
2273 
2274         buffer.reset();
2275     }
2276 
2277 
2278     /**
2279      * Dump the RecordManager file
2280      * @throws IOException
2281      */
2282     public void dump() throws IOException
2283     {
2284         RandomAccessFile randomFile = new RandomAccessFile( file, "r" );
2285         FileChannel fileChannel = randomFile.getChannel();
2286 
2287         ByteBuffer header = ByteBuffer.allocate( HEADER_SIZE );
2288 
2289         // load the header
2290         fileChannel.read( header );
2291 
2292         header.rewind();
2293 
2294         // The page size
2295         int pageSize = header.getInt();
2296 
2297         // The number of managed BTrees
2298         int nbBTree = header.getInt();
2299 
2300         // The first and last free page
2301         long firstFreePage = header.getLong();
2302 
2303         if ( LOG.isDebugEnabled() )
2304         {
2305             LOG.debug( "RecordManager" );
2306             LOG.debug( "-------------" );
2307             LOG.debug( "  Header " );
2308             LOG.debug( "    '{}'", Strings.dumpBytes( header.array() ) );
2309             LOG.debug( "    page size : {}", pageSize );
2310             LOG.debug( "    nbTree : {}", nbBTree );
2311             LOG.debug( "    firstFreePage : {}", firstFreePage );
2312         }
2313 
2314         long position = HEADER_SIZE;
2315 
2316         // Dump the BTrees
2317         for ( int i = 0; i < nbBTree; i++ )
2318         {
2319             LOG.debug( "  Btree[{}]", i );
2320             PageIO[] pageIos = readPageIOs( position, Long.MAX_VALUE );
2321 
2322             for ( PageIO pageIo : pageIos )
2323             {
2324                 LOG.debug( "    {}", pageIo );
2325             }
2326         }
2327 
2328         randomFile.close();
2329     }
2330 
2331 
2332     /**
2333      * Get the number of managed trees. We don't count the CopiedPage BTree. and the Revsion BTree
2334      *
2335      * @return The number of managed BTrees
2336      */
2337     public int getNbManagedTrees()
2338     {
2339         return nbBtree - 2;
2340     }
2341 
2342 
2343     /**
2344      * Get the managed trees. We don't return the CopiedPage BTree nor the Revision BTree.
2345      *
2346      * @return The managed BTrees
2347      */
2348     public Set<String> getManagedTrees()
2349     {
2350         Set<String> btrees = new HashSet<String>( managedBTrees.keySet() );
2351 
2352         btrees.remove( COPIED_PAGE_BTREE_NAME );
2353         btrees.remove( REVISION_BTREE_NAME );
2354 
2355         return btrees;
2356     }
2357 
2358 
2359     /**
2360      * Store a reference to an old rootPage into the Revision BTree
2361      *
2362      * @param btree The BTree we want to keep an old RootPage for
2363      * @param rootPage The old rootPage
2364      * @throws IOException If we have an issue while writing on disk
2365      */
2366     /* No qualifier */<K, V> void storeRootPage( BTree<K, V> btree, Page<K, V> rootPage ) throws IOException
2367     {
2368         if ( !isKeepRevisions() )
2369         {
2370             return;
2371         }
2372 
2373         if ( ( btree == copiedPageBTree ) || ( btree == revisionBTree ) )
2374         {
2375             return;
2376         }
2377 
2378         RevisionName revisionName = new RevisionName( rootPage.getRevision(), btree.getName() );
2379 
2380         ( ( AbstractBTree<RevisionName, Long> ) revisionBTree ).insert( revisionName,
2381             ( ( AbstractPage<K, V> ) rootPage ).getOffset(), 0 );
2382 
2383         if ( LOG_CHECK.isDebugEnabled() )
2384         {
2385             check();
2386         }
2387     }
2388 
2389 
2390     /**
2391      * Fetch the rootPage of a given BTree for a given revision.
2392      *
2393      * @param btree The BTree we are interested in
2394      * @param revision The revision we want to get back
2395      * @return The rootPage for this BTree and this revision, if any
2396      * @throws KeyNotFoundException If we can't find the rootPage for this revision and this BTree
2397      * @throws IOException If we had an ise while accessing the data on disk
2398      */
2399     /* No qualifier */<K, V> Page<K, V> getRootPage( BTree<K, V> btree, long revision ) throws KeyNotFoundException,
2400         IOException
2401     {
2402         if ( btree.getRevision() == revision )
2403         {
2404             // We are asking for the current revision
2405             return btree.getRootPage();
2406         }
2407 
2408         RevisionName revisionName = new RevisionName( revision, btree.getName() );
2409         long rootPageOffset = revisionBTree.get( revisionName );
2410 
2411         // Read the rootPage pages on disk
2412         PageIO[] rootPageIos = readPageIOs( rootPageOffset, Long.MAX_VALUE );
2413 
2414         Page<K, V> btreeRoot = readPage( btree, rootPageIos );
2415 
2416         return btreeRoot;
2417     }
2418 
2419 
2420     /**
2421      * Get one managed trees, knowing its name.
2422      *
2423      * @return The managed BTrees
2424      */
2425     public <K, V> BTree<K, V> getManagedTree( String name )
2426     {
2427         return ( BTree<K, V> ) managedBTrees.get( name );
2428     }
2429 
2430 
2431     /**
2432      * Move a list of pages to the free page list. A logical page is associated with on
2433      * or physical PageIO, which are on the disk. We have to move all those PagIO instance
2434      * to the free list, and do the same in memory (we try to keep a reference to a set of
2435      * free pages.
2436      *
2437      * @param btree The BTree which were owning the pages
2438      * @param pages The pages to free
2439      * @throws IOException
2440      * @throws EndOfFileExceededException
2441      */
2442     /* Package protected */<K, V> void addFreePages( BTree<K, V> btree, List<Page<K, V>> pages )
2443         throws EndOfFileExceededException,
2444         IOException
2445     {
2446         if ( ( btree == copiedPageBTree ) || ( btree == revisionBTree ) )
2447         {
2448             return;
2449         }
2450 
2451         if ( ( pages == null ) || pages.isEmpty() )
2452         {
2453             return;
2454         }
2455 
2456         if ( !keepRevisions )
2457         {
2458             // if the btree doesn't keep revisions, we can safely move
2459             // the pages to the free page list.
2460             // NOTE : potential improvement : we can update the header only when
2461             // we have processed all the logical pages.
2462 
2463             for ( Page<K, V> page : pages )
2464             {
2465                 // Retrieve all the PageIO associated with this logical page
2466                 long firstOffset = ( ( AbstractPage<K, V> ) page ).getOffset();
2467 
2468                 // skip the page with offset 0, this is the first in-memory root page that
2469                 // was copied during first insert in a BTree.
2470                 // a Node or Leaf will *never* have 0 or -1 as its offset
2471                 if ( firstOffset == NO_PAGE )
2472                 {
2473                     continue;
2474                 }
2475 
2476                 long lastOffset = ( ( AbstractPage<K, V> ) page ).getLastOffset();
2477 
2478                 // Update the pointers
2479                 if ( firstFreePage == NO_PAGE )
2480                 {
2481                     // We don't have yet any free pageIos. The
2482                     // PageIOs for this Page will be used
2483                     firstFreePage = firstOffset;
2484                 }
2485                 else
2486                 {
2487                     // We add the Page's PageIOs before the
2488                     // existing free pages.
2489                     long offset = ( ( AbstractPage<K, V> ) page ).getLastOffset();
2490 
2491                     if ( offset == NO_PAGE )
2492                     {
2493                         offset = ( ( AbstractPage<K, V> ) page ).getOffset();
2494                     }
2495 
2496                     // Fetch the pageIO
2497                     PageIO pageIo = fetchPage( offset );
2498 
2499                     // Link it to the first free page
2500                     pageIo.setNextPage( firstFreePage );
2501 
2502                     LOG.debug( "Flushing the first free page" );
2503 
2504                     // And flush it to disk
2505                     flushPages( pageIo );
2506 
2507                     // We can update the firstFreePage offset
2508                     firstFreePage = firstOffset;
2509                 }
2510             }
2511         }
2512         else
2513         {
2514             LOG.debug( "We should not get there" );
2515 
2516             for ( Page<K, V> p : pages )
2517             {
2518                 addFreePage( btree, p );
2519             }
2520         }
2521     }
2522 
2523 
2524     /**
2525      *
2526      * TODO addFreePage.
2527      *
2528      * @param btree
2529      * @param freePage
2530      */
2531     private <K, V> void addFreePage( BTree<K, V> btree, Page<K, V> freePage )
2532     {
2533         try
2534         {
2535             RevisionName revision = new RevisionName( freePage.getRevision(), btree.getName() );
2536             long[] offsetArray = null;
2537 
2538             if ( copiedPageBTree.hasKey( revision ) )
2539             {
2540                 offsetArray = copiedPageBTree.get( revision );
2541                 long[] tmp = new long[offsetArray.length + 1];
2542                 System.arraycopy( offsetArray, 0, tmp, 0, offsetArray.length );
2543                 offsetArray = tmp;
2544             }
2545             else
2546             {
2547                 offsetArray = new long[1];
2548             }
2549 
2550             offsetArray[offsetArray.length - 1] = ( ( AbstractPage<K, V> ) freePage ).getOffset();
2551 
2552             ( ( AbstractBTree<RevisionName, long[]> ) copiedPageBTree ).insert( revision, offsetArray, 0 );
2553         }
2554         catch ( Exception e )
2555         {
2556             throw new FreePageException( e );
2557         }
2558     }
2559 
2560 
2561     /**
2562      * @return the keepRevisions
2563      */
2564     public boolean isKeepRevisions()
2565     {
2566         return keepRevisions;
2567     }
2568 
2569 
2570     /**
2571      * @param keepRevisions the keepRevisions to set
2572      */
2573     public void setKeepRevisions( boolean keepRevisions )
2574     {
2575         this.keepRevisions = keepRevisions;
2576     }
2577 
2578 
2579     /**
2580      * Creates a BTree and automatically adds it to the list of managed btrees
2581      *
2582      * @param name the name of the BTree
2583      * @param keySerializer key serializer
2584      * @param valueSerializer value serializer
2585      * @param allowDuplicates flag for allowing duplicate keys
2586      * @return a managed BTree
2587      * @throws IOException
2588      * @throws BTreeAlreadyManagedException
2589      */
2590     @SuppressWarnings("all")
2591     public <K, V> BTree<K, V> addBTree( String name, ElementSerializer<K> keySerializer,
2592         ElementSerializer<V> valueSerializer,
2593         boolean allowDuplicates ) throws IOException, BTreeAlreadyManagedException
2594     {
2595         PersistedBTreeConfiguration config = new PersistedBTreeConfiguration();
2596 
2597         config.setName( name );
2598         config.setKeySerializer( keySerializer );
2599         config.setValueSerializer( valueSerializer );
2600         config.setAllowDuplicates( allowDuplicates );
2601 
2602         BTree btree = new PersistedBTree( config );
2603         manage( btree );
2604 
2605         if ( LOG_CHECK.isDebugEnabled() )
2606         {
2607             check();
2608         }
2609 
2610         return btree;
2611     }
2612 
2613 
2614     private void setCheckedPage( long[] checkedPages, long offset, int pageSize )
2615     {
2616         long pageOffset = ( offset - HEADER_SIZE ) / pageSize;
2617         int index = ( int ) ( pageOffset / 64L );
2618         long mask = ( 1L << ( pageOffset % 64L ) );
2619         long bits = checkedPages[index];
2620 
2621         if ( ( bits & mask ) == 1 )
2622         {
2623             throw new RecordManagerException( "The page at : " + offset + " has already been checked" );
2624         }
2625 
2626         checkedPages[index] |= mask;
2627 
2628     }
2629 
2630 
2631     /**
2632      * Check the free pages
2633      *
2634      * @param checkedPages
2635      * @throws IOException
2636      */
2637     private void checkFreePages( long[] checkedPages, int pageSize, long firstFreePage )
2638         throws IOException
2639     {
2640         if ( firstFreePage == NO_PAGE )
2641         {
2642             return;
2643         }
2644 
2645         // Now, read all the free pages
2646         long currentOffset = firstFreePage;
2647         long fileSize = fileChannel.size();
2648 
2649         while ( currentOffset != NO_PAGE )
2650         {
2651             if ( currentOffset > fileSize )
2652             {
2653                 throw new FreePageException( "Wrong free page offset, above file size : " + currentOffset );
2654             }
2655 
2656             try
2657             {
2658                 PageIO pageIo = fetchPage( currentOffset );
2659 
2660                 if ( currentOffset != pageIo.getOffset() )
2661                 {
2662                     throw new InvalidBTreeException( "PageIO offset is incorrect : " + currentOffset + "-"
2663                         + pageIo.getOffset() );
2664                 }
2665 
2666                 setCheckedPage( checkedPages, currentOffset, pageSize );
2667 
2668                 long newOffset = pageIo.getNextPage();
2669                 currentOffset = newOffset;
2670             }
2671             catch ( IOException ioe )
2672             {
2673                 throw new InvalidBTreeException( "Cannot fetch page at : " + currentOffset );
2674             }
2675         }
2676     }
2677 
2678 
2679     /**
2680      * Check the root page for a given BTree
2681      * @throws IOException
2682      * @throws EndOfFileExceededException
2683      */
2684     private void checkRoot( long[] checkedPages, long offset, int pageSize, long nbBTreeElems,
2685         ElementSerializer keySerializer, ElementSerializer valueSerializer, boolean allowDuplicates )
2686         throws EndOfFileExceededException, IOException
2687     {
2688         // Read the rootPage pages on disk
2689         PageIO[] rootPageIos = readPageIOs( offset, Long.MAX_VALUE );
2690 
2691         // Deserialize the rootPage now
2692         long position = 0L;
2693 
2694         // The revision
2695         long revision = readLong( rootPageIos, position );
2696         position += LONG_SIZE;
2697 
2698         // The number of elements in the page
2699         int nbElems = readInt( rootPageIos, position );
2700         position += INT_SIZE;
2701 
2702         // The size of the data containing the keys and values
2703         ByteBuffer byteBuffer = null;
2704 
2705         // Reads the bytes containing all the keys and values, if we have some
2706         ByteBuffer data = readBytes( rootPageIos, position );
2707 
2708         if ( nbElems >= 0 )
2709         {
2710             // Its a leaf
2711 
2712             // Check the page offset
2713             long pageOffset = rootPageIos[0].getOffset();
2714 
2715             if ( ( pageOffset < 0 ) || ( pageOffset > fileChannel.size() ) )
2716             {
2717                 throw new InvalidBTreeException( "The page offset is incorrect : " + pageOffset );
2718             }
2719 
2720             // Check the page last offset
2721             long pageLastOffset = rootPageIos[rootPageIos.length - 1].getOffset();
2722 
2723             if ( ( pageLastOffset <= 0 ) || ( pageLastOffset > fileChannel.size() ) )
2724             {
2725                 throw new InvalidBTreeException( "The page last offset is incorrect : " + pageLastOffset );
2726             }
2727 
2728             // Read each value and key
2729             for ( int i = 0; i < nbElems; i++ )
2730             {
2731                 // Just deserialize all the keys and values
2732                 if ( allowDuplicates )
2733                 {
2734                     /*
2735                     long value = OFFSET_SERIALIZER.deserialize( byteBuffer );
2736 
2737                     rootPageIos = readPageIOs( value, Long.MAX_VALUE );
2738 
2739                     BTree dupValueContainer = BTreeFactory.createBTree();
2740                     dupValueContainer.setBtreeOffset( value );
2741 
2742                     try
2743                     {
2744                         loadBTree( pageIos, dupValueContainer );
2745                     }
2746                     catch ( Exception e )
2747                     {
2748                         // should not happen
2749                         throw new InvalidBTreeException( e );
2750                     }
2751                     */
2752                 }
2753                 else
2754                 {
2755                     valueSerializer.deserialize( byteBuffer );
2756                 }
2757 
2758                 keySerializer.deserialize( byteBuffer );
2759             }
2760         }
2761         else
2762         {
2763             /*
2764             // It's a node
2765             int nodeNbElems = -nbElems;
2766 
2767             // Read each value and key
2768             for ( int i = 0; i < nodeNbElems; i++ )
2769             {
2770                 // This is an Offset
2771                 long offset = OFFSET_SERIALIZER.deserialize( byteBuffer );
2772                 long lastOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
2773 
2774                 ElementHolder valueHolder = new ReferenceHolder( btree, null, offset, lastOffset );
2775                 ( ( Node ) page ).setValue( i, valueHolder );
2776 
2777                 Object key = btree.getKeySerializer().deserialize( byteBuffer );
2778                 BTreeFactory.setKey( page, i, key );
2779             }
2780 
2781             // and read the last value, as it's a node
2782             long offset = OFFSET_SERIALIZER.deserialize( byteBuffer );
2783             long lastOffset = OFFSET_SERIALIZER.deserialize( byteBuffer );
2784 
2785             ElementHolder valueHolder = new ReferenceHolder( btree, null, offset, lastOffset );
2786             ( ( Node ) page ).setValue( nodeNbElems, valueHolder );*/
2787         }
2788     }
2789 
2790 
2791     /**
2792      * Check a BTree
2793      * @throws IllegalAccessException
2794      * @throws InstantiationException
2795      * @throws ClassNotFoundException
2796      */
2797     private long checkBTree( long[] checkedPages, PageIO[] pageIos, int pageSize, boolean isLast )
2798         throws EndOfFileExceededException, IOException, InstantiationException, IllegalAccessException,
2799         ClassNotFoundException
2800     {
2801         long dataPos = 0L;
2802 
2803         // The BTree current revision
2804         long revision = readLong( pageIos, dataPos );
2805         dataPos += LONG_SIZE;
2806 
2807         // The nb elems in the tree
2808         long nbElems = readLong( pageIos, dataPos );
2809         dataPos += LONG_SIZE;
2810 
2811         // The BTree rootPage offset
2812         long rootPageOffset = readLong( pageIos, dataPos );
2813 
2814         if ( ( rootPageOffset < 0 ) || ( rootPageOffset > fileChannel.size() ) )
2815         {
2816             throw new InvalidBTreeException( "The rootpage is incorrect : " + rootPageOffset );
2817         }
2818 
2819         dataPos += LONG_SIZE;
2820 
2821         // The next BTree offset
2822         long nextBTreeOffset = readLong( pageIos, dataPos );
2823 
2824         if ( ( ( rootPageOffset < 0 ) && ( !isLast ) ) || ( nextBTreeOffset > fileChannel.size() ) )
2825         {
2826             throw new InvalidBTreeException( "The rootpage is incorrect : " + rootPageOffset );
2827         }
2828 
2829         dataPos += LONG_SIZE;
2830 
2831         // The BTree page size
2832         int btreePageSize = readInt( pageIos, dataPos );
2833 
2834         if ( ( btreePageSize < 2 ) || ( ( btreePageSize & ( ~btreePageSize + 1 ) ) != btreePageSize ) )
2835         {
2836             throw new InvalidBTreeException( "The BTree page size is not a power of 2 : " + btreePageSize );
2837         }
2838 
2839         dataPos += INT_SIZE;
2840 
2841         // The tree name
2842         ByteBuffer btreeNameBytes = readBytes( pageIos, dataPos );
2843         dataPos += INT_SIZE;
2844 
2845         dataPos += btreeNameBytes.limit();
2846         String btreeName = Strings.utf8ToString( btreeNameBytes );
2847 
2848         // The keySerializer FQCN
2849         ByteBuffer keySerializerBytes = readBytes( pageIos, dataPos );
2850 
2851         String keySerializerFqcn = null;
2852         dataPos += INT_SIZE;
2853 
2854         if ( keySerializerBytes != null )
2855         {
2856             dataPos += keySerializerBytes.limit();
2857             keySerializerFqcn = Strings.utf8ToString( keySerializerBytes );
2858         }
2859         else
2860         {
2861             keySerializerFqcn = "";
2862         }
2863 
2864         // The valueSerialier FQCN
2865         ByteBuffer valueSerializerBytes = readBytes( pageIos, dataPos );
2866 
2867         String valueSerializerFqcn = null;
2868         dataPos += INT_SIZE;
2869 
2870         if ( valueSerializerBytes != null )
2871         {
2872             dataPos += valueSerializerBytes.limit();
2873             valueSerializerFqcn = Strings.utf8ToString( valueSerializerBytes );
2874         }
2875         else
2876         {
2877             valueSerializerFqcn = "";
2878         }
2879 
2880         // The BTree allowDuplicates flag
2881         int allowDuplicates = readInt( pageIos, dataPos );
2882         dataPos += INT_SIZE;
2883 
2884         // Now, check the rootPage, which can be a Leaf or a Node, depending
2885         // on the number of elements in the tree : if it's above the pageSize,
2886         // it's a Node, otherwise it's a Leaf
2887         Class<?> valueSerializer = Class.forName( valueSerializerFqcn );
2888         Class<?> keySerializer = Class.forName( keySerializerFqcn );
2889 
2890         /*
2891         checkRoot( checkedPages, rootPageOffset, pageSize, nbElems,
2892             ( ElementSerializer<?> ) keySerializer.newInstance(),
2893             ( ElementSerializer<?> ) valueSerializer.newInstance(), allowDuplicates != 0 );
2894         */
2895 
2896         return nextBTreeOffset;
2897     }
2898 
2899 
2900     /**
2901      * Check each BTree we manage
2902      * @throws IOException
2903      * @throws EndOfFileExceededException
2904      * @throws ClassNotFoundException
2905      * @throws IllegalAccessException
2906      * @throws InstantiationException
2907      */
2908     private void checkBTrees( long[] checkedPages, int pageSize, int nbBTrees ) throws EndOfFileExceededException,
2909         IOException, InstantiationException, IllegalAccessException, ClassNotFoundException
2910     {
2911         // Iterate on each BTree until we have exhausted all of them. The number
2912         // of btrees is just used to check that we have the correct number
2913         // of stored BTrees, as they are all linked.
2914         long position = HEADER_SIZE;
2915 
2916         for ( int i = 0; i < nbBTrees; i++ )
2917         {
2918             // Load the pageIOs containing the BTree
2919             PageIO[] pageIos = readPageIOs( position, Long.MAX_VALUE );
2920 
2921             // Check that they are correctly linked and not already used
2922             int pageNb = 0;
2923 
2924             for ( PageIO currentPageIo : pageIos )
2925             {
2926                 //
2927                 long nextPageOffset = currentPageIo.getNextPage();
2928 
2929                 if ( pageNb == pageIos.length - 1 )
2930                 {
2931                     if ( nextPageOffset != NO_PAGE )
2932                     {
2933                         throw new InvalidBTreeException( "The pointer to the next page is not valid, expected NO_PAGE" );
2934                     }
2935                 }
2936                 else
2937                 {
2938                     if ( nextPageOffset == NO_PAGE )
2939                     {
2940                         throw new InvalidBTreeException( "The pointer to the next page is not valid, NO_PAGE" );
2941                     }
2942                 }
2943 
2944                 if ( ( nextPageOffset != NO_PAGE ) && ( ( nextPageOffset - HEADER_SIZE ) % pageSize != 0 ) )
2945                 {
2946                     throw new InvalidBTreeException( "The pointer to the next page is not valid" );
2947                 }
2948 
2949                 // Update the array of processed pages
2950                 setCheckedPage( checkedPages, currentPageIo.getOffset(), pageSize );
2951             }
2952 
2953             // Now check the BTree
2954             long nextBTree = checkBTree( checkedPages, pageIos, pageSize, i == nbBTrees - 1 );
2955 
2956             if ( ( nextBTree == NO_PAGE ) && ( i < nbBTrees - 1 ) )
2957             {
2958                 throw new InvalidBTreeException( "The pointer to the next BTree is incorrect" );
2959             }
2960 
2961             position = nextBTree;
2962         }
2963     }
2964 
2965 
2966     /**
2967      * Check the whole file
2968      */
2969     private void check()
2970     {
2971         try
2972         {
2973             // First check the header
2974             ByteBuffer header = ByteBuffer.allocate( HEADER_SIZE );
2975             long fileSize = fileChannel.size();
2976 
2977             if ( fileSize < HEADER_SIZE )
2978             {
2979                 throw new InvalidBTreeException( "File size too small : " + fileSize );
2980             }
2981 
2982             // Read the header
2983             fileChannel.read( header, 0L );
2984             header.flip();
2985 
2986             // The page size. It must be a power of 2, and above 16.
2987             int pageSize = header.getInt();
2988 
2989             if ( ( pageSize < 0 ) || ( pageSize < 32 ) || ( ( pageSize & ( ~pageSize + 1 ) ) != pageSize ) )
2990             {
2991                 throw new InvalidBTreeException( "Wrong page size : " + pageSize );
2992             }
2993 
2994             // Compute the number of pages in this file
2995             long nbPages = ( fileSize - HEADER_SIZE ) / pageSize;
2996 
2997             // The number of trees. It must be at least 2 and > 0
2998             int nbBTrees = header.getInt();
2999 
3000             if ( nbBTrees < 0 )
3001             {
3002                 throw new InvalidBTreeException( "Wrong nb trees : " + nbBTrees );
3003             }
3004 
3005             // The first free page offset. It must be either -1 or below file size
3006             // and its value must be a modulo of pageSize
3007             long firstFreePage = header.getLong();
3008 
3009             if ( firstFreePage > fileSize )
3010             {
3011                 throw new InvalidBTreeException( "First free page pointing after the end of the file : "
3012                     + firstFreePage );
3013             }
3014 
3015             if ( ( firstFreePage != NO_PAGE ) && ( ( ( firstFreePage - HEADER_SIZE ) % pageSize ) != 0 ) )
3016             {
3017                 throw new InvalidBTreeException( "First free page not pointing to a correct offset : " + firstFreePage );
3018             }
3019 
3020             int nbPageBits = ( int ) ( nbPages / 64 );
3021 
3022             // Create an array of pages to be checked
3023             // We use one bit per page. It's 0 when the page
3024             // hasn't been checked, 1 otherwise.
3025             long[] checkedPages = new long[nbPageBits + 1];
3026 
3027             // Then the free files
3028             checkFreePages( checkedPages, pageSize, firstFreePage );
3029 
3030             // The BTrees
3031             checkBTrees( checkedPages, pageSize, nbBTrees );
3032         }
3033         catch ( Exception e )
3034         {
3035             // We catch the exception and rethrow it immediately to be able to
3036             // put a breakpoint here
3037             e.printStackTrace();
3038             throw new InvalidBTreeException( "Error : " + e.getMessage() );
3039         }
3040     }
3041 
3042 
3043     /**
3044      * Loads a BTree holding the values of a duplicate key
3045      * This tree is also called as dups tree or sub tree
3046      *
3047      * @param offset the offset of the BTree header
3048      * @return the deserialized BTree
3049      */
3050     /* No qualifier */<K, V> BTree<K, V> loadDupsBTree( long offset )
3051     {
3052         try
3053         {
3054             PageIO[] pageIos = readPageIOs( offset, Long.MAX_VALUE );
3055 
3056             BTree<K, V> subBtree = BTreeFactory.createPersistedBTree();
3057             ( ( PersistedBTree<K, V> ) subBtree ).setBtreeOffset( offset );
3058 
3059             loadBTree( pageIos, subBtree );
3060 
3061             return subBtree;
3062         }
3063         catch ( Exception e )
3064         {
3065             // should not happen
3066             throw new BTreeCreationException( e );
3067         }
3068     }
3069 
3070 
3071     /**
3072      * @return the pendingPages
3073      */
3074     /* no qualifier*/ <K, V> Map<Page<?, ?>, BTree<?, ?>> getPendingPages()
3075     {
3076         return pendingPages;
3077     }
3078 
3079 
3080     /**
3081      * @see Object#toString()
3082      */
3083     public String toString()
3084     {
3085         StringBuilder sb = new StringBuilder();
3086 
3087         sb.append( "RM free pages : [" );
3088 
3089         if ( firstFreePage != NO_PAGE )
3090         {
3091             long current = firstFreePage;
3092             boolean isFirst = true;
3093 
3094             while ( current != NO_PAGE )
3095             {
3096                 if ( isFirst )
3097                 {
3098                     isFirst = false;
3099                 }
3100                 else
3101                 {
3102                     sb.append( ", " );
3103                 }
3104 
3105                 PageIO pageIo;
3106 
3107                 try
3108                 {
3109                     pageIo = fetchPage( current );
3110                     sb.append( pageIo.getOffset() );
3111                     current = pageIo.getNextPage();
3112                 }
3113                 catch ( EndOfFileExceededException e )
3114                 {
3115                     e.printStackTrace();
3116                 }
3117                 catch ( IOException e )
3118                 {
3119                     e.printStackTrace();
3120                 }
3121 
3122             }
3123         }
3124 
3125         sb.append( "]" );
3126 
3127         return sb.toString();
3128     }
3129 }