View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  
21  package org.apache.directory.mavibot.btree;
22  
23  
24  import java.io.File;
25  import java.io.FileInputStream;
26  import java.io.FileNotFoundException;
27  import java.io.FileOutputStream;
28  import java.io.IOException;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Comparator;
32  import java.util.HashMap;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.TreeMap;
38  import java.util.TreeSet;
39  
40  import org.apache.directory.mavibot.btree.comparator.IntComparator;
41  import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
42  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
43  import org.apache.directory.mavibot.btree.serializer.IntSerializer;
44  
45  
46  /**
47   * A class used to bulk load a BTree. It will allow the load of N elements in 
48   * a given BTree without to have to inject one by one, saving a lot of time.
49   * The second advantage is that the btree will be dense (the leaves will be
50   * complete, except the last one).
51   * This class can also be used to compact a BTree.
52   *
53   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
54   */
55  public class BulkLoader<K, V>
56  {
57      static enum LevelEnum
58      {
59          LEAF,
60          NODE
61      }
62  
63      /**
64       * A private class to store informations on a level. We have to keep :
65       * <ul>
66       * <li>The number of elements to store in this level</li>
67       * <li>A flag that tells if it's a leaf or a node level</li>
68       * <li>The number of pages necessary to store all the elements in a level</li>
69       * <li>The number of elements we can store in a complete page (we may have one or two 
70       * incomplete pages at the end)</li>
71       * <li>A flag that tells if we have some incomplete page at the end</li>
72       * </ul>
73       * TODO LevelInfo.
74       *
75       * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
76       */
77      /*private*/class LevelInfo
78      {
79          /** The level number */
80          private int levelNumber;
81  
82          /** Nb of elements for this level */
83          /*private*/int nbElems;
84  
85          /** The number of pages in this level */
86          /*private*/int nbPages;
87  
88          /** Nb of elements before we reach an incomplete page */
89          /*private*/int nbElemsLimit;
90  
91          /** A flag that tells if the level contains nodes or leaves */
92          private boolean isNode;
93  
94          /** The current page which contains the data until we move it to the resulting BTree */
95          /*private*/Page<K, V> currentPage;
96  
97          /** The current position in the currentPage */
98          private int currentPos;
99  
100         /** The number of already added elements for this level */
101         private int nbAddedElems;
102 
103 
104         /** @see Object#toString() */
105         public String toString()
106         {
107             StringBuilder sb = new StringBuilder();
108 
109             if ( isNode )
110             {
111                 sb.append( "NodeLevel[" );
112                 sb.append( levelNumber );
113                 sb.append( "] :" );
114             }
115             else
116             {
117                 sb.append( "LeafLevel:" );
118             }
119 
120             sb.append( "\n    nbElems           = " ).append( nbElems );
121             sb.append( "\n    nbPages           = " ).append( nbPages );
122             sb.append( "\n    nbElemsLimit      = " ).append( nbElemsLimit );
123             sb.append( "\n    nbAddedElems      = " ).append( nbAddedElems );
124             sb.append( "\n    currentPos        = " ).append( currentPos );
125             sb.append( "\n    currentPage" );
126             sb.append( "\n        nbKeys : " ).append( currentPage.getNbElems() );
127 
128             return sb.toString();
129         }
130     }
131 
132 
133     /**
134      * Process the data, and creates files to store them sorted if necessary, or store them
135      * TODO readElements.
136      *
137      * @param btree
138      * @param iterator
139      * @param sortedFiles
140      * @param tuples
141      * @param chunkSize
142      * @return
143      * @throws IOException
144      */
145     private int readElements( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, List<File> sortedFiles,
146         List<Tuple<K, V>> tuples, int chunkSize ) throws IOException
147     {
148         int nbRead = 0;
149         int nbIteration = 0;
150         int nbElems = 0;
151         boolean inMemory = true;
152 
153         while ( true )
154         {
155             nbIteration++;
156             tuples.clear();
157 
158             // Read up to chukSize elements
159             while ( iterator.hasNext() && ( nbRead < chunkSize ) )
160             {
161                 Tuple<K, V> tuple = iterator.next();
162                 tuples.add( tuple );
163                 nbRead++;
164             }
165 
166             if ( nbRead < chunkSize )
167             {
168                 if ( nbIteration != 1 )
169                 {
170                     // Flush the sorted data on disk and exit
171                     inMemory = false;
172 
173                     sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
174                 }
175 
176                 // Update the number of read elements
177                 nbElems += nbRead;
178 
179                 break;
180             }
181             else
182             {
183                 if ( !iterator.hasNext() )
184                 {
185                     // special case : we have exactly chunkSize elements in the incoming data
186                     if ( nbIteration > 1 )
187                     {
188                         // Flush the sorted data on disk and exit
189                         inMemory = false;
190                         sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
191                     }
192 
193                     // We have read all the data in one round trip, let's get out, no need
194                     // to store the data on disk
195 
196                     // Update the number of read elements
197                     nbElems += nbRead;
198 
199                     break;
200                 }
201 
202                 // We have read chunkSize elements, we have to sort them on disk
203                 nbElems += nbRead;
204                 nbRead = 0;
205                 sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
206             }
207         }
208 
209         if ( !inMemory )
210         {
211             tuples.clear();
212         }
213 
214         return nbElems;
215     }
216 
217 
218     /**
219      * Read all the sorted files, and inject them into one single big file containing all the 
220      * sorted and merged elements.
221      * @throws IOException 
222      */
223     private Tuple<Iterator<Tuple<K, Set<V>>>, Integer> processFiles( BTree<K, V> btree,
224         Iterator<Tuple<K, Set<V>>> dataIterator ) throws IOException
225     {
226         File file = File.createTempFile( "sortedUnique", "data" );
227         file.deleteOnExit();
228         FileOutputStream fos = new FileOutputStream( file );
229 
230         // Number of read elements
231         int nbReads = 0;
232 
233         // Flush the tuples on disk
234         while ( dataIterator.hasNext() )
235         {
236             nbReads++;
237 
238             // grab a tuple
239             Tuple<K, Set<V>> tuple = dataIterator.next();
240 
241             // Serialize the key
242             byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
243             fos.write( IntSerializer.serialize( bytesKey.length ) );
244             fos.write( bytesKey );
245 
246             // Serialize the number of values
247             int nbValues = tuple.getValue().size();
248             fos.write( IntSerializer.serialize( nbValues ) );
249 
250             // Serialize the values
251             for ( V value : tuple.getValue() )
252             {
253                 byte[] bytesValue = btree.getValueSerializer().serialize( value );
254 
255                 // Serialize the value
256                 fos.write( IntSerializer.serialize( bytesValue.length ) );
257                 fos.write( bytesValue );
258             }
259         }
260 
261         fos.flush();
262         fos.close();
263 
264         FileInputStream fis = new FileInputStream( file );
265         Iterator<Tuple<K, Set<V>>> uniqueIterator = createUniqueFileIterator( btree, fis );
266 
267         Tuple<Iterator<Tuple<K, Set<V>>>, Integer> result = new Tuple<Iterator<Tuple<K, Set<V>>>, Integer>(
268             uniqueIterator, nbReads );
269 
270         return result;
271     }
272 
273 
274     /**
275      * Bulk Load data into a persisted BTree
276      *
277      * @param btree The persisted BTree in which we want to load the data
278      * @param iterator The iterator over the data to bulkload
279      * @param chunkSize The number of elements we may store in memory at each iteration
280      * @throws IOException If there is a problem while processing the data
281      */
282     public BTree<K, V> load( PersistedBTree<K, V> btree, Iterator<Tuple<K, V>> iterator, int chunkSize )
283         throws IOException
284     {
285         if ( btree == null )
286         {
287             throw new RuntimeException( "Invalid BTree : it's null" );
288         }
289 
290         if ( iterator == null )
291         {
292             // Nothing to do...
293             return null;
294         }
295 
296         // Iterate through the elements by chunk
297         boolean inMemory = true;
298 
299         // The list of files we will use to store the sorted chunks
300         List<File> sortedFiles = new ArrayList<File>();
301 
302         // An array of chukSize tuple max
303         List<Tuple<K, V>> tuples = new ArrayList<Tuple<K, V>>( chunkSize );
304 
305         // Now, start to read all the tuples to sort them. We may use intermediate files
306         // for that purpose if we hit the threshold.
307         int nbElems = readElements( btree, iterator, sortedFiles, tuples, chunkSize );
308 
309         // If the tuple list is empty, we have to process the load based on files, not in memory
310         if ( nbElems > 0 )
311         {
312             inMemory = tuples.size() > 0;
313         }
314 
315         // Now that we have processed all the data, we can start storing them in the btree
316         Iterator<Tuple<K, Set<V>>> dataIterator = null;
317         FileInputStream[] streams = null;
318         BTree<K, V> resultBTree = null;
319 
320         if ( inMemory )
321         {
322             // Here, we have all the data in memory, no need to merge files
323             // We will build a simple iterator over the data
324             dataIterator = createTupleIterator( btree, tuples );
325             resultBTree = bulkLoad( btree, dataIterator, nbElems );
326         }
327         else
328         {
329             // We first have to build an iterator over the files
330             int nbFiles = sortedFiles.size();
331             streams = new FileInputStream[nbFiles];
332 
333             for ( int i = 0; i < nbFiles; i++ )
334             {
335                 streams[i] = new FileInputStream( sortedFiles.get( i ) );
336             }
337 
338             dataIterator = createIterator( btree, streams );
339 
340             // Process the files, and construct one single file with an iterator
341             Tuple<Iterator<Tuple<K, Set<V>>>, Integer> result = processFiles( btree, dataIterator );
342             resultBTree = bulkLoad( btree, result.key, result.value );
343         }
344 
345         // Ok, we have an iterator over sorted elements, we can now load them in the 
346         // target btree.
347         // Now, close the FileInputStream, and delete them if we have some
348         if ( !inMemory )
349         {
350             int nbFiles = sortedFiles.size();
351 
352             for ( int i = 0; i < nbFiles; i++ )
353             {
354                 streams[i].close();
355                 sortedFiles.get( i ).delete();
356             }
357         }
358 
359         return resultBTree;
360     }
361 
362 
363     /**
364      * Creates a node leaf LevelInfo based on the number of elements in the lower level. We can store
365      * up to PageSize + 1 references to pages in a node.
366      */
367     /* no qualifier*/LevelInfo computeLevel( BTree<K, V> btree, int nbElems, LevelEnum levelType )
368     {
369         int pageSize = btree.getPageSize();
370         int incrementNode = 0;
371 
372         if ( levelType == LevelEnum.NODE )
373         {
374             incrementNode = 1;
375         }
376 
377         LevelInfo level = new LevelInfo();
378         level.isNode = ( levelType == LevelEnum.NODE );
379         level.nbElems = nbElems;
380         level.nbPages = nbElems / ( pageSize + incrementNode );
381         level.levelNumber = 0;
382         level.nbAddedElems = 0;
383         level.currentPos = 0;
384 
385         // Create the first level page
386         if ( nbElems <= pageSize + incrementNode )
387         {
388             if ( nbElems % ( pageSize + incrementNode ) != 0 )
389             {
390                 level.nbPages = 1;
391             }
392 
393             level.nbElemsLimit = nbElems;
394 
395             if ( level.isNode )
396             {
397                 level.currentPage = BTreeFactory.createNode( btree, 0L, nbElems - 1 );
398             }
399             else
400             {
401                 level.currentPage = BTreeFactory.createLeaf( btree, 0L, nbElems );
402             }
403         }
404         else
405         {
406             int remaining = nbElems % ( pageSize + incrementNode );
407 
408             if ( remaining == 0 )
409             {
410                 level.nbElemsLimit = nbElems;
411 
412                 if ( level.isNode )
413                 {
414                     level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize );
415                 }
416                 else
417                 {
418                     level.currentPage = BTreeFactory.createLeaf( btree, 0L, pageSize );
419                 }
420             }
421             else
422             {
423                 level.nbPages++;
424 
425                 if ( remaining < ( pageSize / 2 ) + incrementNode )
426                 {
427                     level.nbElemsLimit = nbElems - remaining - ( pageSize + incrementNode );
428 
429                     if ( level.nbElemsLimit > 0 )
430                     {
431                         if ( level.isNode )
432                         {
433                             level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize );
434                         }
435                         else
436                         {
437                             level.currentPage = BTreeFactory.createLeaf( btree, 0L, pageSize );
438                         }
439                     }
440                     else
441                     {
442                         if ( level.isNode )
443                         {
444                             level.currentPage = BTreeFactory.createNode( btree, 0L, ( pageSize / 2 ) + remaining - 1 );
445                         }
446                         else
447                         {
448                             level.currentPage = BTreeFactory.createLeaf( btree, 0L, ( pageSize / 2 ) + remaining );
449                         }
450                     }
451                 }
452                 else
453                 {
454                     level.nbElemsLimit = nbElems - remaining;
455 
456                     if ( level.isNode )
457                     {
458                         level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize );
459                     }
460                     else
461                     {
462                         level.currentPage = BTreeFactory.createLeaf( btree, 0L, pageSize );
463                     }
464                 }
465             }
466         }
467 
468         return level;
469     }
470 
471 
472     /**
473      * Compute the number of pages necessary to store all the elements per level. The resulting list is
474      * reversed ( ie the leaves are on the left, the root page on the right.
475      */
476     /* No Qualifier */List<LevelInfo> computeLevels( BTree<K, V> btree, int nbElems )
477     {
478         List<LevelInfo> levelList = new ArrayList<LevelInfo>();
479 
480         // Compute the leaves info
481         LevelInfo leafLevel = computeLevel( btree, nbElems, LevelEnum.LEAF );
482 
483         levelList.add( leafLevel );
484         int nbPages = leafLevel.nbPages;
485         int levelNumber = 1;
486 
487         while ( nbPages > 1 )
488         {
489             // Compute the Nodes info
490             LevelInfo nodeLevel = computeLevel( btree, nbPages, LevelEnum.NODE );
491             nodeLevel.levelNumber = levelNumber++;
492             levelList.add( nodeLevel );
493             nbPages = nodeLevel.nbPages;
494         }
495 
496         return levelList;
497     }
498 
499 
500     /**
501      * Inject a tuple into a leaf
502      */
503     private void injectInLeaf( BTree<K, V> btree, Tuple<K, Set<V>> tuple, LevelInfo leafLevel )
504     {
505         PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) leafLevel.currentPage;
506 
507         KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
508         ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
509         leaf.setKey( leafLevel.currentPos, keyHolder );
510         leaf.setValue( leafLevel.currentPos, valueHolder );
511 
512         leafLevel.currentPos++;
513     }
514 
515 
516     private int computeNbElemsLeaf( BTree<K, V> btree, LevelInfo levelInfo )
517     {
518         int pageSize = btree.getPageSize();
519         int remaining = levelInfo.nbElems - levelInfo.nbAddedElems;
520 
521         if ( remaining < pageSize )
522         {
523             return remaining;
524         }
525         else if ( remaining == pageSize )
526         {
527             return pageSize;
528         }
529         else if ( remaining > levelInfo.nbElems - levelInfo.nbElemsLimit )
530         {
531             return pageSize;
532         }
533         else
534         {
535             return remaining - pageSize / 2;
536         }
537     }
538 
539 
540     /**
541      * Compute the number of nodes necessary to store all the elements.
542      */
543     /* No qualifier */int computeNbElemsNode( BTree<K, V> btree, LevelInfo levelInfo )
544     {
545         int pageSize = btree.getPageSize();
546         int remaining = levelInfo.nbElems - levelInfo.nbAddedElems;
547 
548         if ( remaining < pageSize + 1 )
549         {
550             return remaining;
551         }
552         else if ( remaining == pageSize + 1 )
553         {
554             return pageSize + 1;
555         }
556         else if ( remaining > levelInfo.nbElems - levelInfo.nbElemsLimit )
557         {
558             return pageSize + 1;
559         }
560         else
561         {
562             return remaining - pageSize / 2;
563         }
564     }
565 
566 
567     /**
568      * Inject a page reference into the root page.
569      */
570     private void injectInRoot( BTree<K, V> btree, Page<K, V> page, PageHolder<K, V> pageHolder, LevelInfo level )
571         throws IOException
572     {
573         PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.currentPage;
574         if ( ( level.currentPos == 0 ) && ( node.getPage( 0 ) == null ) )
575 
576         {
577             node.setPageHolder( 0, pageHolder );
578             level.nbAddedElems++;
579         }
580         else
581         {
582             // Inject the pageHolder and the page leftmost key
583             node.setPageHolder( level.currentPos + 1, pageHolder );
584             KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
585             node.setKey( level.currentPos, keyHolder );
586             level.currentPos++;
587             level.nbAddedElems++;
588 
589             // Check that we haven't added the last element. If so,
590             // we have to write the page on disk and update the btree
591             if ( level.nbAddedElems == level.nbElems )
592             {
593                 PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
594                     btree, node, 0L );
595                 ( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
596             }
597         }
598 
599         return;
600     }
601 
602 
603     /**
604      * Inject a page reference into a Node. This method will recurse if needed.
605      */
606     private void injectInNode( BTree<K, V> btree, Page<K, V> page, List<LevelInfo> levels, int levelIndex )
607         throws IOException
608     {
609         int pageSize = btree.getPageSize();
610         LevelInfo level = levels.get( levelIndex );
611         PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.currentPage;
612 
613         // We first have to write the page on disk
614         PageHolder<K, V> pageHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, page, 0L );
615 
616         // First deal with a node that has less than PageSize elements at this level.
617         // It will become the root node.
618         if ( level.nbElems <= pageSize + 1 )
619         {
620             injectInRoot( btree, page, pageHolder, level );
621 
622             return;
623         }
624 
625         // Now, we have some parent node. We process the 3 different use case :
626         // o Full node before the limit
627         // o Node over the limit but with at least N/2 elements
628         // o Node over the limit but with elements spread into 2 nodes
629         if ( level.nbAddedElems < level.nbElemsLimit )
630         {
631             // Ok, we haven't yet reached the incomplete pages (if any).
632             // Let's add the page reference into the node
633             // There is one special case : when we are adding the very first page 
634             // reference into a node. In this case, we don't store the key
635             if ( ( level.currentPos == 0 ) && ( node.getKey( 0 ) == null ) )
636             {
637                 node.setPageHolder( 0, pageHolder );
638             }
639             else
640             {
641                 // Inject the pageHolder and the page leftmost key
642                 node.setPageHolder( level.currentPos, pageHolder );
643                 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
644                 node.setKey( level.currentPos - 1, keyHolder );
645             }
646 
647             // Now, increment this level nb of added elements
648             level.currentPos++;
649             level.nbAddedElems++;
650 
651             // Check that we haven't added the last element. If so,
652             // we have to write the page on disk and update the parent's node
653             if ( level.nbAddedElems == level.nbElems )
654             {
655                 //PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
656                 //    btree, node, 0L );
657                 //( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
658                 injectInNode( btree, node, levels, levelIndex + 1 );
659 
660                 return;
661             }
662             else
663             {
664                 // Check that we haven't completed the current node, and that this is not the root node.
665                 if ( ( level.currentPos == pageSize + 1 ) && ( level.levelNumber < levels.size() - 1 ) )
666                 {
667                     // yes. We have to write the node on disk, update its parent
668                     // and create a new current node
669                     injectInNode( btree, node, levels, levelIndex + 1 );
670 
671                     // The page is full, we have to create a new one, with a size depending on the remaining elements
672                     if ( level.nbAddedElems < level.nbElemsLimit )
673                     {
674                         // We haven't reached the limit, create a new full node
675                         level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize );
676                     }
677                     else if ( level.nbElems - level.nbAddedElems <= pageSize )
678                     {
679                         level.currentPage = BTreeFactory.createNode( btree, 0L, level.nbElems - level.nbAddedElems - 1 );
680                     }
681                     else
682                     {
683                         level.currentPage = BTreeFactory.createNode( btree, 0L, ( level.nbElems - 1 )
684                             - ( level.nbAddedElems + 1 ) - pageSize / 2 );
685                     }
686 
687                     level.currentPos = 0;
688                 }
689             }
690 
691             return;
692         }
693         else
694         {
695             // We are dealing with the last page or the last two pages 
696             // We can have either one single pages which can contain up to pageSize-1 elements
697             // or with two pages, the first one containing ( nbElems - limit ) - pageSize/2 elements
698             // and the second one will contain pageSize/2 elements. 
699             if ( level.nbElems - level.nbElemsLimit > pageSize )
700             {
701                 // As the remaining elements are above a page size, they will be spread across
702                 // two pages. We have two cases here, depending on the page we are filling
703                 if ( level.nbElems - level.nbAddedElems <= pageSize / 2 + 1 )
704                 {
705                     // As we have less than PageSize/2 elements to write, we are on the second page
706                     if ( ( level.currentPos == 0 ) && ( node.getKey( 0 ) == null ) )
707                     {
708                         node.setPageHolder( 0, pageHolder );
709                     }
710                     else
711                     {
712                         // Inject the pageHolder and the page leftmost key
713                         node.setPageHolder( level.currentPos, pageHolder );
714                         KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
715                             page.getLeftMostKey() );
716                         node.setKey( level.currentPos - 1, keyHolder );
717                     }
718 
719                     // Now, increment this level nb of added elements
720                     level.currentPos++;
721                     level.nbAddedElems++;
722 
723                     // Check if we are done with the page
724                     if ( level.nbAddedElems == level.nbElems )
725                     {
726                         // Yes, we have to update the parent
727                         injectInNode( btree, node, levels, levelIndex + 1 );
728                     }
729                 }
730                 else
731                 {
732                     // This is the first page 
733                     if ( ( level.currentPos == 0 ) && ( node.getKey( 0 ) == null ) )
734                     {
735                         // First element of the page
736                         node.setPageHolder( 0, pageHolder );
737                     }
738                     else
739                     {
740                         // Any other following elements
741                         // Inject the pageHolder and the page leftmost key
742                         node.setPageHolder( level.currentPos, pageHolder );
743                         KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
744                             page.getLeftMostKey() );
745                         node.setKey( level.currentPos - 1, keyHolder );
746                     }
747 
748                     // Now, increment this level nb of added elements
749                     level.currentPos++;
750                     level.nbAddedElems++;
751 
752                     // Check if we are done with the page
753                     if ( level.currentPos == node.getNbElems() + 1 )
754                     {
755                         // Yes, we have to update the parent
756                         injectInNode( btree, node, levels, levelIndex + 1 );
757 
758                         // An create a new one
759                         level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize / 2 );
760                         level.currentPos = 0;
761                     }
762                 }
763             }
764             else
765             {
766                 // Two cases : we don't have anything else to write, or this is a single page
767                 if ( level.nbAddedElems == level.nbElems )
768                 {
769                     // We are done with the page
770                     injectInNode( btree, node, levels, levelIndex + 1 );
771                 }
772                 else
773                 {
774                     // We have some more elements to add in  the page
775                     // This is the first page 
776                     if ( ( level.currentPos == 0 ) && ( node.getKey( 0 ) == null ) )
777                     {
778                         // First element of the page
779                         node.setPageHolder( 0, pageHolder );
780                     }
781                     else
782                     {
783                         // Any other following elements
784                         // Inject the pageHolder and the page leftmost key
785                         node.setPageHolder( level.currentPos, pageHolder );
786                         KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
787                             page.getLeftMostKey() );
788                         node.setKey( level.currentPos - 1, keyHolder );
789                     }
790 
791                     // Now, increment this level nb of added elements
792                     level.currentPos++;
793                     level.nbAddedElems++;
794 
795                     // Check if we are done with the page
796                     if ( level.currentPos == node.getNbElems() + 1 )
797                     {
798                         // Yes, we have to update the parent
799                         injectInNode( btree, node, levels, levelIndex + 1 );
800 
801                         // An create a new one
802                         level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize / 2 );
803                         level.currentPos = 0;
804                     }
805                 }
806 
807                 return;
808             }
809         }
810     }
811 
812 
813     private BTree<K, V> bulkLoadSinglePage( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
814         throws IOException
815     {
816         // Create a new page
817         PersistedLeaf<K, V> rootPage = ( PersistedLeaf<K, V> ) BTreeFactory.createLeaf( btree, 0L, nbElems );
818 
819         // We first have to inject data into the page
820         int pos = 0;
821 
822         while ( dataIterator.hasNext() )
823         {
824             Tuple<K, Set<V>> tuple = dataIterator.next();
825 
826             // Store the current element in the rootPage
827             KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
828             ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
829             rootPage.setKey( pos, keyHolder );
830             rootPage.setValue( pos, valueHolder );
831             pos++;
832         }
833 
834         // Now write the page on disk
835         ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, rootPage, 0L );
836 
837         // Update the btree with the rootPage and the nb of added elements
838         ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader().setRootPage( rootPage );
839         ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader().setNbElems( nbElems );
840 
841         return btree;
842     }
843 
844 
845     /**
846      * Construct the target BTree from the sorted data. We will use the nb of elements
847      * to determinate the structure of the BTree, as it must be balanced
848      */
849     private BTree<K, V> bulkLoad( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
850         throws IOException
851     {
852         int pageSize = btree.getPageSize();
853 
854         // Special case : we can store all the element sin a single page
855         if ( nbElems <= pageSize )
856         {
857             return bulkLoadSinglePage( btree, dataIterator, nbElems );
858         }
859 
860         // Ok, we will need more than one page to store the elements, which
861         // means we also will need more than one level.
862         // First, compute the needed number of levels.
863         List<LevelInfo> levels = computeLevels( btree, nbElems );
864 
865         // Now, let's fill the levels
866         LevelInfo leafLevel = levels.get( 0 );
867 
868         while ( dataIterator.hasNext() )
869         {
870             // let's fill page up to the point all the complete pages have been filled
871             if ( leafLevel.nbAddedElems < leafLevel.nbElemsLimit )
872             {
873                 // grab a tuple
874                 Tuple<K, Set<V>> tuple = dataIterator.next();
875 
876                 injectInLeaf( btree, tuple, leafLevel );
877                 leafLevel.nbAddedElems++;
878 
879                 // The page is completed, update the parent's node and create a new current page
880                 if ( leafLevel.currentPos == pageSize )
881                 {
882                     injectInNode( btree, leafLevel.currentPage, levels, 1 );
883 
884                     // The page is full, we have to create a new one
885                     leafLevel.currentPage = BTreeFactory.createLeaf( btree, 0L, computeNbElemsLeaf( btree, leafLevel ) );
886                     leafLevel.currentPos = 0;
887                 }
888             }
889             else
890             {
891                 // We have to deal with uncompleted pages now (if we have any)
892                 if ( leafLevel.nbAddedElems == nbElems )
893                 {
894                     // First use case : we have injected all the elements in the btree : get out
895                     break;
896                 }
897 
898                 if ( nbElems - leafLevel.nbElemsLimit > pageSize )
899                 {
900                     // Second use case : the number of elements after the limit does not
901                     // fit in a page, that means we have to split it into
902                     // two pages
903 
904                     // First page will contain nbElems - leafLevel.nbElemsLimit - PageSize/2 elements
905                     int nbToAdd = nbElems - leafLevel.nbElemsLimit - pageSize / 2;
906 
907                     while ( nbToAdd > 0 )
908                     {
909                         // grab a tuple
910                         Tuple<K, Set<V>> tuple = dataIterator.next();
911 
912                         injectInLeaf( btree, tuple, leafLevel );
913                         leafLevel.nbAddedElems++;
914                         nbToAdd--;
915                     }
916 
917                     // Now inject the page into the node
918                     injectInNode( btree, leafLevel.currentPage, levels, 1 );
919 
920                     // Create a new page for the remaining elements
921                     nbToAdd = pageSize / 2;
922                     leafLevel.currentPage = BTreeFactory.createLeaf( btree, 0L, nbToAdd );
923                     leafLevel.currentPos = 0;
924 
925                     while ( nbToAdd > 0 )
926                     {
927                         // grab a tuple
928                         Tuple<K, Set<V>> tuple = dataIterator.next();
929 
930                         injectInLeaf( btree, tuple, leafLevel );
931                         leafLevel.nbAddedElems++;
932                         nbToAdd--;
933                     }
934 
935                     // And update the parent node
936                     injectInNode( btree, leafLevel.currentPage, levels, 1 );
937 
938                     // We are done
939                     break;
940                 }
941                 else
942                 {
943                     // Third use case : we can push all the elements in the last page.
944                     // Let's do it
945                     int nbToAdd = nbElems - leafLevel.nbElemsLimit;
946 
947                     while ( nbToAdd > 0 )
948                     {
949                         // grab a tuple
950                         Tuple<K, Set<V>> tuple = dataIterator.next();
951 
952                         injectInLeaf( btree, tuple, leafLevel );
953                         leafLevel.nbAddedElems++;
954                         nbToAdd--;
955                     }
956 
957                     // Now inject the page into the node
958                     injectInNode( btree, leafLevel.currentPage, levels, 1 );
959 
960                     // and we are done
961                     break;
962                 }
963             }
964         }
965 
966         return btree;
967     }
968 
969 
970     /**
971      * Flush a list of tuples to disk after having sorted them. In the process, we may have to gather the values
972      * for the tuples having the same keys.
973      * @throws IOException 
974      */
975     private File flushToDisk( int fileNb, List<Tuple<K, V>> tuples, BTree<K, V> btree ) throws IOException
976     {
977         // Sort the tuples. 
978         Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
979 
980         File file = File.createTempFile( "sorted", Integer.toString( fileNb ) );
981         file.deleteOnExit();
982         FileOutputStream fos = new FileOutputStream( file );
983 
984         // Flush the tuples on disk
985         for ( Tuple<K, Set<V>> tuple : sortedTuples )
986         {
987             // Serialize the key
988             byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
989             fos.write( IntSerializer.serialize( bytesKey.length ) );
990             fos.write( bytesKey );
991 
992             // Serialize the number of values
993             int nbValues = tuple.getValue().size();
994             fos.write( IntSerializer.serialize( nbValues ) );
995 
996             // Serialize the values
997             for ( V value : tuple.getValue() )
998             {
999                 byte[] bytesValue = btree.getValueSerializer().serialize( value );
1000 
1001                 // Serialize the value
1002                 fos.write( IntSerializer.serialize( bytesValue.length ) );
1003                 fos.write( bytesValue );
1004             }
1005         }
1006 
1007         fos.flush();
1008         fos.close();
1009 
1010         return file;
1011     }
1012 
1013 
1014     /**
1015      * Sort a list of tuples, eliminating the duplicate keys and storing the values in a set when we 
1016      * have a duplicate key
1017      */
1018     private Tuple<K, Set<V>>[] sort( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1019     {
1020         Comparator<Tuple<K, Set<V>>> tupleComparator = new TupleComparator( btree.getKeyComparator(), btree
1021             .getValueComparator() );
1022 
1023         // Sort the list
1024         Tuple<K, V>[] tuplesArray = ( Tuple<K, V>[] ) tuples.toArray( new Tuple[]
1025             {} );
1026 
1027         // First, eliminate the equals keys. We use a map for that
1028         Map<K, Set<V>> mapTuples = new HashMap<K, Set<V>>();
1029 
1030         for ( Tuple<K, V> tuple : tuplesArray )
1031         {
1032             // Is the key present in the map ?
1033             Set<V> foundSet = mapTuples.get( tuple.key );
1034 
1035             if ( foundSet != null )
1036             {
1037                 // We already have had such a key, add the value to the existing key
1038                 foundSet.add( tuple.value );
1039             }
1040             else
1041             {
1042                 // No such key present in the map : create a new set to store the values,
1043                 // and add it in the map associated with the new key
1044                 Set<V> set = new TreeSet<V>();
1045                 set.add( tuple.value );
1046                 mapTuples.put( tuple.key, set );
1047             }
1048         }
1049 
1050         // Now, sort the map, by extracting all the key/values from the map
1051         int size = mapTuples.size();
1052         Tuple<K, Set<V>>[] sortedTuples = new Tuple[size];
1053         int pos = 0;
1054 
1055         // We create an array containing all the elements
1056         for ( Map.Entry<K, Set<V>> entry : mapTuples.entrySet() )
1057         {
1058             sortedTuples[pos] = new Tuple<K, Set<V>>();
1059             sortedTuples[pos].key = entry.getKey();
1060             sortedTuples[pos].value = entry.getValue();
1061             pos++;
1062         }
1063 
1064         // And we sort the array
1065         Arrays.sort( sortedTuples, tupleComparator );
1066 
1067         return sortedTuples;
1068     }
1069 
1070 
1071     /**
1072      * Build an iterator over an array of sorted tuples, in memory
1073      */
1074     private Iterator<Tuple<K, Set<V>>> createTupleIterator( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1075     {
1076         final Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
1077 
1078         Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1079         {
1080             private int pos = 0;
1081 
1082 
1083             @Override
1084             public Tuple<K, Set<V>> next()
1085             {
1086                 // Return the current tuple, if any
1087                 if ( pos < sortedTuples.length )
1088                 {
1089                     Tuple<K, Set<V>> tuple = sortedTuples[pos];
1090                     pos++;
1091 
1092                     return tuple;
1093                 }
1094 
1095                 return null;
1096             }
1097 
1098 
1099             @Override
1100             public boolean hasNext()
1101             {
1102                 return pos < sortedTuples.length;
1103             }
1104 
1105 
1106             @Override
1107             public void remove()
1108             {
1109             }
1110         };
1111 
1112         return tupleIterator;
1113     }
1114 
1115 
1116     private Tuple<K, Set<V>> fetchTuple( BTree<K, V> btree, FileInputStream fis )
1117     {
1118         try
1119         {
1120             if ( fis.available() == 0 )
1121             {
1122                 return null;
1123             }
1124 
1125             Tuple<K, Set<V>> tuple = new Tuple<K, Set<V>>();
1126             tuple.value = new TreeSet<V>();
1127 
1128             byte[] intBytes = new byte[4];
1129 
1130             // Read the key length
1131             fis.read( intBytes );
1132             int keyLength = IntSerializer.deserialize( intBytes );
1133 
1134             // Read the key
1135             byte[] keyBytes = new byte[keyLength];
1136             fis.read( keyBytes );
1137             K key = btree.getKeySerializer().fromBytes( keyBytes );
1138             tuple.key = key;
1139 
1140             // get the number of values
1141             fis.read( intBytes );
1142             int nbValues = IntSerializer.deserialize( intBytes );
1143 
1144             // Serialize the values
1145             for ( int i = 0; i < nbValues; i++ )
1146             {
1147                 // Read the value length
1148                 fis.read( intBytes );
1149                 int valueLength = IntSerializer.deserialize( intBytes );
1150 
1151                 // Read the value
1152                 byte[] valueBytes = new byte[valueLength];
1153                 fis.read( valueBytes );
1154                 V value = btree.getValueSerializer().fromBytes( valueBytes );
1155                 tuple.value.add( value );
1156             }
1157 
1158             return tuple;
1159         }
1160         catch ( IOException ioe )
1161         {
1162             return null;
1163         }
1164     }
1165 
1166 
1167     /**
1168      * Build an iterator over an array of sorted tuples, from files on the disk
1169      * @throws FileNotFoundException 
1170      */
1171     private Iterator<Tuple<K, Set<V>>> createIterator( final BTree<K, V> btree, final FileInputStream[] streams )
1172         throws FileNotFoundException
1173     {
1174         // The number of files we have to read from
1175         final int nbFiles = streams.length;
1176 
1177         // We will read only one element at a time from each file
1178         final Tuple<K, Set<V>>[] readTuples = new Tuple[nbFiles];
1179         final TreeMap<Tuple<K, Integer>, Set<V>> candidates =
1180             new TreeMap<Tuple<K, Integer>, Set<V>>(
1181                 new TupleComparator<K, Integer>( btree.getKeyComparator(), IntComparator.INSTANCE ) );
1182 
1183         // Read the tuple from each files
1184         for ( int i = 0; i < nbFiles; i++ )
1185         {
1186             while ( true )
1187             {
1188                 readTuples[i] = fetchTuple( btree, streams[i] );
1189 
1190                 if ( readTuples[i] != null )
1191                 {
1192                     Tuple<K, Integer> candidate = new Tuple<K, Integer>( readTuples[i].key, i, btree.getKeySerializer()
1193                         .getComparator() );
1194 
1195                     if ( !candidates.containsKey( candidate ) )
1196                     {
1197                         candidates.put( candidate, readTuples[i].getValue() );
1198                         break;
1199                     }
1200                     else
1201                     {
1202                         // We have to merge the pulled tuple with the existing one, and read one more tuple
1203                         Set<V> oldValues = candidates.get( candidate );
1204                         oldValues.addAll( readTuples[i].getValue() );
1205                     }
1206                 }
1207                 else
1208                 {
1209                     break;
1210                 }
1211             }
1212         }
1213 
1214         Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1215         {
1216             @Override
1217             public Tuple<K, Set<V>> next()
1218             {
1219                 // Get the first candidate
1220                 Tuple<K, Integer> tupleCandidate = candidates.firstKey();
1221 
1222                 // Remove it from the set
1223                 candidates.remove( tupleCandidate );
1224 
1225                 // Get the the next tuple from the stream we just got the tuple from
1226                 Tuple<K, Set<V>> tuple = readTuples[tupleCandidate.value];
1227 
1228                 // fetch the next tuple from the file we just read teh candidate from 
1229                 while ( true )
1230                 {
1231                     // fetch it from the disk and store it into its reader
1232                     readTuples[tupleCandidate.value] = fetchTuple( btree, streams[tupleCandidate.value] );
1233 
1234                     if ( readTuples[tupleCandidate.value] == null )
1235                     {
1236                         // No more tuple for this file
1237                         break;
1238                     }
1239 
1240                     if ( readTuples[tupleCandidate.value] != null )
1241                     {
1242                         // And store it into the candidate set
1243                         Set<V> oldValues = candidates.get( readTuples[tupleCandidate.value] );
1244 
1245                         if ( oldValues != null )
1246                         {
1247                             // We already have another element with the same key, merge them
1248                             oldValues.addAll( readTuples[tupleCandidate.value].value );
1249                         }
1250                         else
1251                         {
1252                             Tuple<K, Integer> newTuple = new Tuple<K, Integer>( readTuples[tupleCandidate.value].key,
1253                                 tupleCandidate.value );
1254                             candidates.put( newTuple, readTuples[tupleCandidate.value].getValue() );
1255 
1256                             // and exit the loop
1257                             break;
1258                         }
1259                     }
1260                 }
1261 
1262                 // We can now return the found value
1263                 return tuple;
1264             }
1265 
1266 
1267             @Override
1268             public boolean hasNext()
1269             {
1270                 // Check that we have at least one element to read
1271                 return !candidates.isEmpty();
1272             }
1273 
1274 
1275             @Override
1276             public void remove()
1277             {
1278             }
1279 
1280         };
1281 
1282         return tupleIterator;
1283     }
1284 
1285 
1286     /**
1287      * Build an iterator over an array of sorted tuples, from files on the disk
1288      * @throws FileNotFoundException 
1289      */
1290     private Iterator<Tuple<K, Set<V>>> createUniqueFileIterator( final BTree<K, V> btree, final FileInputStream stream )
1291         throws FileNotFoundException
1292     {
1293         Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1294         {
1295             boolean hasNext = true;
1296 
1297 
1298             @Override
1299             public Tuple<K, Set<V>> next()
1300             {
1301                 // Get the tuple from the stream
1302                 Tuple<K, Set<V>> tuple = fetchTuple( btree, stream );
1303 
1304                 // We can now return the found value
1305                 return tuple;
1306             }
1307 
1308 
1309             @Override
1310             public boolean hasNext()
1311             {
1312                 // Check that we have at least one element to read
1313                 try
1314                 {
1315                     return stream.available() > 0;
1316                 }
1317                 catch ( IOException e )
1318                 {
1319                     return false;
1320                 }
1321             }
1322 
1323 
1324             @Override
1325             public void remove()
1326             {
1327             }
1328 
1329         };
1330 
1331         return tupleIterator;
1332     }
1333 
1334 
    /**
     * Compact a given persisted BTree, making it dense. All the values will be stored 
     * in newly created pages, each one of them containing as many elements
     * as its size.
     * <br/>
     * The RecordManager will be stopped and restarted, do not use this method
     * on a running BTree.
     * <br/>
     * NOTE : this describes the intended behavior. The method body is currently
     * empty, so calling it has no effect on the BTree nor on the RecordManager.
     *
     * @param recordManager The associated recordManager
     * @param btree The BTree to compact
     */
    public static void compact( RecordManager recordManager, BTree<?, ?> btree )
    {
        // Not implemented yet : nothing is done here

    }
1350 
1351 
1352     /**
1353      * Compact a given in-memory BTree, making it dense. All the values will be stored 
1354      * in newly created pages, each one of them containing as much elements
1355      * as it's size.
1356      * </br>
1357      *
1358      * @param btree The BTree to compact
1359      * @throws KeyNotFoundException 
1360      * @throws IOException 
1361      */
1362     public static BTree<?, ?> compact( BTree<?, ?> btree ) throws IOException, KeyNotFoundException
1363     {
1364         // First, create a new BTree which will contain all the elements
1365         InMemoryBTreeConfiguration configuration = new InMemoryBTreeConfiguration();
1366         configuration.setName( btree.getName() );
1367         configuration.setPageSize( btree.getPageSize() );
1368         configuration.setKeySerializer( btree.getKeySerializer() );
1369         configuration.setValueSerializer( btree.getValueSerializer() );
1370         configuration.setAllowDuplicates( btree.isAllowDuplicates() );
1371         configuration.setReadTimeOut( btree.getReadTimeOut() );
1372         configuration.setWriteBufferSize( btree.getWriteBufferSize() );
1373 
1374         File file = ( ( InMemoryBTree ) btree ).getFile();
1375 
1376         if ( file != null )
1377         {
1378             configuration.setFilePath( file.getPath() );
1379         }
1380 
1381         // Create a new Btree Builder
1382         InMemoryBTreeBuilder btreeBuilder = new InMemoryBTreeBuilder( configuration );
1383 
1384         // Create a cursor over the existing btree
1385         final TupleCursor cursor = btree.browse();
1386 
1387         // Create an iterator that will iterate the existing btree
1388         Iterator<Tuple> tupleItr = new Iterator<Tuple>()
1389         {
1390             @Override
1391             public Tuple next()
1392             {
1393                 try
1394                 {
1395                     return cursor.next();
1396                 }
1397                 catch ( EndOfFileExceededException e )
1398                 {
1399                     return null;
1400                 }
1401                 catch ( IOException e )
1402                 {
1403                     return null;
1404                 }
1405             }
1406 
1407 
1408             @Override
1409             public boolean hasNext()
1410             {
1411                 try
1412                 {
1413                     return cursor.hasNext();
1414                 }
1415                 catch ( EndOfFileExceededException e )
1416                 {
1417                     return false;
1418                 }
1419                 catch ( IOException e )
1420                 {
1421                     return false;
1422                 }
1423             }
1424 
1425 
1426             @Override
1427             public void remove()
1428             {
1429             }
1430         };
1431 
1432         // And finally, compact the btree
1433         return btreeBuilder.build( tupleItr );
1434     }
1435 }