1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.directory.mavibot.btree;
22
23
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileNotFoundException;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Comparator;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.TreeMap;
38 import java.util.TreeSet;
39
40 import org.apache.directory.mavibot.btree.comparator.IntComparator;
41 import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
42 import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
43 import org.apache.directory.mavibot.btree.serializer.IntSerializer;
44
45
46
47
48
49
50
51
52
53
54
55 public class BulkLoader<K, V>
56 {
/**
 * The two kinds of level handled while computing the layout of the tree :
 * a level made of leaves, or a level made of nodes.
 */
static enum LevelEnum
{
    /** The level contains leaves */
    LEAF,
    /** The level contains nodes */
    NODE
}
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 class LevelInfo
78 {
79
80 private int levelNumber;
81
82
83 int nbElems;
84
85
86 int nbPages;
87
88
89 int nbElemsLimit;
90
91
92 private boolean isNode;
93
94
95 Page<K, V> currentPage;
96
97
98 private int currentPos;
99
100
101 private int nbAddedElems;
102
103
104
105 public String toString()
106 {
107 StringBuilder sb = new StringBuilder();
108
109 if ( isNode )
110 {
111 sb.append( "NodeLevel[" );
112 sb.append( levelNumber );
113 sb.append( "] :" );
114 }
115 else
116 {
117 sb.append( "LeafLevel:" );
118 }
119
120 sb.append( "\n nbElems = " ).append( nbElems );
121 sb.append( "\n nbPages = " ).append( nbPages );
122 sb.append( "\n nbElemsLimit = " ).append( nbElemsLimit );
123 sb.append( "\n nbAddedElems = " ).append( nbAddedElems );
124 sb.append( "\n currentPos = " ).append( currentPos );
125 sb.append( "\n currentPage" );
126 sb.append( "\n nbKeys : " ).append( currentPage.getNbElems() );
127
128 return sb.toString();
129 }
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145 private int readElements( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, List<File> sortedFiles,
146 List<Tuple<K, V>> tuples, int chunkSize ) throws IOException
147 {
148 int nbRead = 0;
149 int nbIteration = 0;
150 int nbElems = 0;
151 boolean inMemory = true;
152
153 while ( true )
154 {
155 nbIteration++;
156 tuples.clear();
157
158
159 while ( iterator.hasNext() && ( nbRead < chunkSize ) )
160 {
161 Tuple<K, V> tuple = iterator.next();
162 tuples.add( tuple );
163 nbRead++;
164 }
165
166 if ( nbRead < chunkSize )
167 {
168 if ( nbIteration != 1 )
169 {
170
171 inMemory = false;
172
173 sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
174 }
175
176
177 nbElems += nbRead;
178
179 break;
180 }
181 else
182 {
183 if ( !iterator.hasNext() )
184 {
185
186 if ( nbIteration > 1 )
187 {
188
189 inMemory = false;
190 sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
191 }
192
193
194
195
196
197 nbElems += nbRead;
198
199 break;
200 }
201
202
203 nbElems += nbRead;
204 nbRead = 0;
205 sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
206 }
207 }
208
209 if ( !inMemory )
210 {
211 tuples.clear();
212 }
213
214 return nbElems;
215 }
216
217
218
219
220
221
222
223 private Tuple<Iterator<Tuple<K, Set<V>>>, Integer> processFiles( BTree<K, V> btree,
224 Iterator<Tuple<K, Set<V>>> dataIterator ) throws IOException
225 {
226 File file = File.createTempFile( "sortedUnique", "data" );
227 file.deleteOnExit();
228 FileOutputStream fos = new FileOutputStream( file );
229
230
231 int nbReads = 0;
232
233
234 while ( dataIterator.hasNext() )
235 {
236 nbReads++;
237
238
239 Tuple<K, Set<V>> tuple = dataIterator.next();
240
241
242 byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
243 fos.write( IntSerializer.serialize( bytesKey.length ) );
244 fos.write( bytesKey );
245
246
247 int nbValues = tuple.getValue().size();
248 fos.write( IntSerializer.serialize( nbValues ) );
249
250
251 for ( V value : tuple.getValue() )
252 {
253 byte[] bytesValue = btree.getValueSerializer().serialize( value );
254
255
256 fos.write( IntSerializer.serialize( bytesValue.length ) );
257 fos.write( bytesValue );
258 }
259 }
260
261 fos.flush();
262 fos.close();
263
264 FileInputStream fis = new FileInputStream( file );
265 Iterator<Tuple<K, Set<V>>> uniqueIterator = createUniqueFileIterator( btree, fis );
266
267 Tuple<Iterator<Tuple<K, Set<V>>>, Integer> result = new Tuple<Iterator<Tuple<K, Set<V>>>, Integer>(
268 uniqueIterator, nbReads );
269
270 return result;
271 }
272
273
274
275
276
277
278
279
280
281
282 public BTree<K, V> load( PersistedBTree<K, V> btree, Iterator<Tuple<K, V>> iterator, int chunkSize )
283 throws IOException
284 {
285 if ( btree == null )
286 {
287 throw new RuntimeException( "Invalid BTree : it's null" );
288 }
289
290 if ( iterator == null )
291 {
292
293 return null;
294 }
295
296
297 boolean inMemory = true;
298
299
300 List<File> sortedFiles = new ArrayList<File>();
301
302
303 List<Tuple<K, V>> tuples = new ArrayList<Tuple<K, V>>( chunkSize );
304
305
306
307 int nbElems = readElements( btree, iterator, sortedFiles, tuples, chunkSize );
308
309
310 if ( nbElems > 0 )
311 {
312 inMemory = tuples.size() > 0;
313 }
314
315
316 Iterator<Tuple<K, Set<V>>> dataIterator = null;
317 FileInputStream[] streams = null;
318 BTree<K, V> resultBTree = null;
319
320 if ( inMemory )
321 {
322
323
324 dataIterator = createTupleIterator( btree, tuples );
325 resultBTree = bulkLoad( btree, dataIterator, nbElems );
326 }
327 else
328 {
329
330 int nbFiles = sortedFiles.size();
331 streams = new FileInputStream[nbFiles];
332
333 for ( int i = 0; i < nbFiles; i++ )
334 {
335 streams[i] = new FileInputStream( sortedFiles.get( i ) );
336 }
337
338 dataIterator = createIterator( btree, streams );
339
340
341 Tuple<Iterator<Tuple<K, Set<V>>>, Integer> result = processFiles( btree, dataIterator );
342 resultBTree = bulkLoad( btree, result.key, result.value );
343 }
344
345
346
347
348 if ( !inMemory )
349 {
350 int nbFiles = sortedFiles.size();
351
352 for ( int i = 0; i < nbFiles; i++ )
353 {
354 streams[i].close();
355 sortedFiles.get( i ).delete();
356 }
357 }
358
359 return resultBTree;
360 }
361
362
363
364
365
366
367 LevelInfo computeLevel( BTree<K, V> btree, int nbElems, LevelEnum levelType )
368 {
369 int pageSize = btree.getPageSize();
370 int incrementNode = 0;
371
372 if ( levelType == LevelEnum.NODE )
373 {
374 incrementNode = 1;
375 }
376
377 LevelInfo level = new LevelInfo();
378 level.isNode = ( levelType == LevelEnum.NODE );
379 level.nbElems = nbElems;
380 level.nbPages = nbElems / ( pageSize + incrementNode );
381 level.levelNumber = 0;
382 level.nbAddedElems = 0;
383 level.currentPos = 0;
384
385
386 if ( nbElems <= pageSize + incrementNode )
387 {
388 if ( nbElems % ( pageSize + incrementNode ) != 0 )
389 {
390 level.nbPages = 1;
391 }
392
393 level.nbElemsLimit = nbElems;
394
395 if ( level.isNode )
396 {
397 level.currentPage = BTreeFactory.createNode( btree, 0L, nbElems - 1 );
398 }
399 else
400 {
401 level.currentPage = BTreeFactory.createLeaf( btree, 0L, nbElems );
402 }
403 }
404 else
405 {
406 int remaining = nbElems % ( pageSize + incrementNode );
407
408 if ( remaining == 0 )
409 {
410 level.nbElemsLimit = nbElems;
411
412 if ( level.isNode )
413 {
414 level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize );
415 }
416 else
417 {
418 level.currentPage = BTreeFactory.createLeaf( btree, 0L, pageSize );
419 }
420 }
421 else
422 {
423 level.nbPages++;
424
425 if ( remaining < ( pageSize / 2 ) + incrementNode )
426 {
427 level.nbElemsLimit = nbElems - remaining - ( pageSize + incrementNode );
428
429 if ( level.nbElemsLimit > 0 )
430 {
431 if ( level.isNode )
432 {
433 level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize );
434 }
435 else
436 {
437 level.currentPage = BTreeFactory.createLeaf( btree, 0L, pageSize );
438 }
439 }
440 else
441 {
442 if ( level.isNode )
443 {
444 level.currentPage = BTreeFactory.createNode( btree, 0L, ( pageSize / 2 ) + remaining - 1 );
445 }
446 else
447 {
448 level.currentPage = BTreeFactory.createLeaf( btree, 0L, ( pageSize / 2 ) + remaining );
449 }
450 }
451 }
452 else
453 {
454 level.nbElemsLimit = nbElems - remaining;
455
456 if ( level.isNode )
457 {
458 level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize );
459 }
460 else
461 {
462 level.currentPage = BTreeFactory.createLeaf( btree, 0L, pageSize );
463 }
464 }
465 }
466 }
467
468 return level;
469 }
470
471
472
473
474
475
476 List<LevelInfo> computeLevels( BTree<K, V> btree, int nbElems )
477 {
478 List<LevelInfo> levelList = new ArrayList<LevelInfo>();
479
480
481 LevelInfo leafLevel = computeLevel( btree, nbElems, LevelEnum.LEAF );
482
483 levelList.add( leafLevel );
484 int nbPages = leafLevel.nbPages;
485 int levelNumber = 1;
486
487 while ( nbPages > 1 )
488 {
489
490 LevelInfo nodeLevel = computeLevel( btree, nbPages, LevelEnum.NODE );
491 nodeLevel.levelNumber = levelNumber++;
492 levelList.add( nodeLevel );
493 nbPages = nodeLevel.nbPages;
494 }
495
496 return levelList;
497 }
498
499
500
501
502
503 private void injectInLeaf( BTree<K, V> btree, Tuple<K, Set<V>> tuple, LevelInfo leafLevel )
504 {
505 PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) leafLevel.currentPage;
506
507 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
508 ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
509 leaf.setKey( leafLevel.currentPos, keyHolder );
510 leaf.setValue( leafLevel.currentPos, valueHolder );
511
512 leafLevel.currentPos++;
513 }
514
515
516 private int computeNbElemsLeaf( BTree<K, V> btree, LevelInfo levelInfo )
517 {
518 int pageSize = btree.getPageSize();
519 int remaining = levelInfo.nbElems - levelInfo.nbAddedElems;
520
521 if ( remaining < pageSize )
522 {
523 return remaining;
524 }
525 else if ( remaining == pageSize )
526 {
527 return pageSize;
528 }
529 else if ( remaining > levelInfo.nbElems - levelInfo.nbElemsLimit )
530 {
531 return pageSize;
532 }
533 else
534 {
535 return remaining - pageSize / 2;
536 }
537 }
538
539
540
541
542
543 int computeNbElemsNode( BTree<K, V> btree, LevelInfo levelInfo )
544 {
545 int pageSize = btree.getPageSize();
546 int remaining = levelInfo.nbElems - levelInfo.nbAddedElems;
547
548 if ( remaining < pageSize + 1 )
549 {
550 return remaining;
551 }
552 else if ( remaining == pageSize + 1 )
553 {
554 return pageSize + 1;
555 }
556 else if ( remaining > levelInfo.nbElems - levelInfo.nbElemsLimit )
557 {
558 return pageSize + 1;
559 }
560 else
561 {
562 return remaining - pageSize / 2;
563 }
564 }
565
566
567
568
569
570 private void injectInRoot( BTree<K, V> btree, Page<K, V> page, PageHolder<K, V> pageHolder, LevelInfo level )
571 throws IOException
572 {
573 PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.currentPage;
574 if ( ( level.currentPos == 0 ) && ( node.getPage( 0 ) == null ) )
575
576 {
577 node.setPageHolder( 0, pageHolder );
578 level.nbAddedElems++;
579 }
580 else
581 {
582
583 node.setPageHolder( level.currentPos + 1, pageHolder );
584 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
585 node.setKey( level.currentPos, keyHolder );
586 level.currentPos++;
587 level.nbAddedElems++;
588
589
590
591 if ( level.nbAddedElems == level.nbElems )
592 {
593 PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
594 btree, node, 0L );
595 ( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
596 }
597 }
598
599 return;
600 }
601
602
603
604
605
606 private void injectInNode( BTree<K, V> btree, Page<K, V> page, List<LevelInfo> levels, int levelIndex )
607 throws IOException
608 {
609 int pageSize = btree.getPageSize();
610 LevelInfo level = levels.get( levelIndex );
611 PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.currentPage;
612
613
614 PageHolder<K, V> pageHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, page, 0L );
615
616
617
618 if ( level.nbElems <= pageSize + 1 )
619 {
620 injectInRoot( btree, page, pageHolder, level );
621
622 return;
623 }
624
625
626
627
628
629 if ( level.nbAddedElems < level.nbElemsLimit )
630 {
631
632
633
634
635 if ( ( level.currentPos == 0 ) && ( node.getKey( 0 ) == null ) )
636 {
637 node.setPageHolder( 0, pageHolder );
638 }
639 else
640 {
641
642 node.setPageHolder( level.currentPos, pageHolder );
643 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
644 node.setKey( level.currentPos - 1, keyHolder );
645 }
646
647
648 level.currentPos++;
649 level.nbAddedElems++;
650
651
652
653 if ( level.nbAddedElems == level.nbElems )
654 {
655
656
657
658 injectInNode( btree, node, levels, levelIndex + 1 );
659
660 return;
661 }
662 else
663 {
664
665 if ( ( level.currentPos == pageSize + 1 ) && ( level.levelNumber < levels.size() - 1 ) )
666 {
667
668
669 injectInNode( btree, node, levels, levelIndex + 1 );
670
671
672 if ( level.nbAddedElems < level.nbElemsLimit )
673 {
674
675 level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize );
676 }
677 else if ( level.nbElems - level.nbAddedElems <= pageSize )
678 {
679 level.currentPage = BTreeFactory.createNode( btree, 0L, level.nbElems - level.nbAddedElems - 1 );
680 }
681 else
682 {
683 level.currentPage = BTreeFactory.createNode( btree, 0L, ( level.nbElems - 1 )
684 - ( level.nbAddedElems + 1 ) - pageSize / 2 );
685 }
686
687 level.currentPos = 0;
688 }
689 }
690
691 return;
692 }
693 else
694 {
695
696
697
698
699 if ( level.nbElems - level.nbElemsLimit > pageSize )
700 {
701
702
703 if ( level.nbElems - level.nbAddedElems <= pageSize / 2 + 1 )
704 {
705
706 if ( ( level.currentPos == 0 ) && ( node.getKey( 0 ) == null ) )
707 {
708 node.setPageHolder( 0, pageHolder );
709 }
710 else
711 {
712
713 node.setPageHolder( level.currentPos, pageHolder );
714 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
715 page.getLeftMostKey() );
716 node.setKey( level.currentPos - 1, keyHolder );
717 }
718
719
720 level.currentPos++;
721 level.nbAddedElems++;
722
723
724 if ( level.nbAddedElems == level.nbElems )
725 {
726
727 injectInNode( btree, node, levels, levelIndex + 1 );
728 }
729 }
730 else
731 {
732
733 if ( ( level.currentPos == 0 ) && ( node.getKey( 0 ) == null ) )
734 {
735
736 node.setPageHolder( 0, pageHolder );
737 }
738 else
739 {
740
741
742 node.setPageHolder( level.currentPos, pageHolder );
743 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
744 page.getLeftMostKey() );
745 node.setKey( level.currentPos - 1, keyHolder );
746 }
747
748
749 level.currentPos++;
750 level.nbAddedElems++;
751
752
753 if ( level.currentPos == node.getNbElems() + 1 )
754 {
755
756 injectInNode( btree, node, levels, levelIndex + 1 );
757
758
759 level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize / 2 );
760 level.currentPos = 0;
761 }
762 }
763 }
764 else
765 {
766
767 if ( level.nbAddedElems == level.nbElems )
768 {
769
770 injectInNode( btree, node, levels, levelIndex + 1 );
771 }
772 else
773 {
774
775
776 if ( ( level.currentPos == 0 ) && ( node.getKey( 0 ) == null ) )
777 {
778
779 node.setPageHolder( 0, pageHolder );
780 }
781 else
782 {
783
784
785 node.setPageHolder( level.currentPos, pageHolder );
786 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
787 page.getLeftMostKey() );
788 node.setKey( level.currentPos - 1, keyHolder );
789 }
790
791
792 level.currentPos++;
793 level.nbAddedElems++;
794
795
796 if ( level.currentPos == node.getNbElems() + 1 )
797 {
798
799 injectInNode( btree, node, levels, levelIndex + 1 );
800
801
802 level.currentPage = BTreeFactory.createNode( btree, 0L, pageSize / 2 );
803 level.currentPos = 0;
804 }
805 }
806
807 return;
808 }
809 }
810 }
811
812
813 private BTree<K, V> bulkLoadSinglePage( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
814 throws IOException
815 {
816
817 PersistedLeaf<K, V> rootPage = ( PersistedLeaf<K, V> ) BTreeFactory.createLeaf( btree, 0L, nbElems );
818
819
820 int pos = 0;
821
822 while ( dataIterator.hasNext() )
823 {
824 Tuple<K, Set<V>> tuple = dataIterator.next();
825
826
827 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
828 ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
829 rootPage.setKey( pos, keyHolder );
830 rootPage.setValue( pos, valueHolder );
831 pos++;
832 }
833
834
835 ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, rootPage, 0L );
836
837
838 ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader().setRootPage( rootPage );
839 ( ( PersistedBTree<K, V> ) btree ).getBtreeHeader().setNbElems( nbElems );
840
841 return btree;
842 }
843
844
845
846
847
848
849 private BTree<K, V> bulkLoad( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
850 throws IOException
851 {
852 int pageSize = btree.getPageSize();
853
854
855 if ( nbElems <= pageSize )
856 {
857 return bulkLoadSinglePage( btree, dataIterator, nbElems );
858 }
859
860
861
862
863 List<LevelInfo> levels = computeLevels( btree, nbElems );
864
865
866 LevelInfo leafLevel = levels.get( 0 );
867
868 while ( dataIterator.hasNext() )
869 {
870
871 if ( leafLevel.nbAddedElems < leafLevel.nbElemsLimit )
872 {
873
874 Tuple<K, Set<V>> tuple = dataIterator.next();
875
876 injectInLeaf( btree, tuple, leafLevel );
877 leafLevel.nbAddedElems++;
878
879
880 if ( leafLevel.currentPos == pageSize )
881 {
882 injectInNode( btree, leafLevel.currentPage, levels, 1 );
883
884
885 leafLevel.currentPage = BTreeFactory.createLeaf( btree, 0L, computeNbElemsLeaf( btree, leafLevel ) );
886 leafLevel.currentPos = 0;
887 }
888 }
889 else
890 {
891
892 if ( leafLevel.nbAddedElems == nbElems )
893 {
894
895 break;
896 }
897
898 if ( nbElems - leafLevel.nbElemsLimit > pageSize )
899 {
900
901
902
903
904
905 int nbToAdd = nbElems - leafLevel.nbElemsLimit - pageSize / 2;
906
907 while ( nbToAdd > 0 )
908 {
909
910 Tuple<K, Set<V>> tuple = dataIterator.next();
911
912 injectInLeaf( btree, tuple, leafLevel );
913 leafLevel.nbAddedElems++;
914 nbToAdd--;
915 }
916
917
918 injectInNode( btree, leafLevel.currentPage, levels, 1 );
919
920
921 nbToAdd = pageSize / 2;
922 leafLevel.currentPage = BTreeFactory.createLeaf( btree, 0L, nbToAdd );
923 leafLevel.currentPos = 0;
924
925 while ( nbToAdd > 0 )
926 {
927
928 Tuple<K, Set<V>> tuple = dataIterator.next();
929
930 injectInLeaf( btree, tuple, leafLevel );
931 leafLevel.nbAddedElems++;
932 nbToAdd--;
933 }
934
935
936 injectInNode( btree, leafLevel.currentPage, levels, 1 );
937
938
939 break;
940 }
941 else
942 {
943
944
945 int nbToAdd = nbElems - leafLevel.nbElemsLimit;
946
947 while ( nbToAdd > 0 )
948 {
949
950 Tuple<K, Set<V>> tuple = dataIterator.next();
951
952 injectInLeaf( btree, tuple, leafLevel );
953 leafLevel.nbAddedElems++;
954 nbToAdd--;
955 }
956
957
958 injectInNode( btree, leafLevel.currentPage, levels, 1 );
959
960
961 break;
962 }
963 }
964 }
965
966 return btree;
967 }
968
969
970
971
972
973
974
975 private File flushToDisk( int fileNb, List<Tuple<K, V>> tuples, BTree<K, V> btree ) throws IOException
976 {
977
978 Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
979
980 File file = File.createTempFile( "sorted", Integer.toString( fileNb ) );
981 file.deleteOnExit();
982 FileOutputStream fos = new FileOutputStream( file );
983
984
985 for ( Tuple<K, Set<V>> tuple : sortedTuples )
986 {
987
988 byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
989 fos.write( IntSerializer.serialize( bytesKey.length ) );
990 fos.write( bytesKey );
991
992
993 int nbValues = tuple.getValue().size();
994 fos.write( IntSerializer.serialize( nbValues ) );
995
996
997 for ( V value : tuple.getValue() )
998 {
999 byte[] bytesValue = btree.getValueSerializer().serialize( value );
1000
1001
1002 fos.write( IntSerializer.serialize( bytesValue.length ) );
1003 fos.write( bytesValue );
1004 }
1005 }
1006
1007 fos.flush();
1008 fos.close();
1009
1010 return file;
1011 }
1012
1013
1014
1015
1016
1017
1018 private Tuple<K, Set<V>>[] sort( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1019 {
1020 Comparator<Tuple<K, Set<V>>> tupleComparator = new TupleComparator( btree.getKeyComparator(), btree
1021 .getValueComparator() );
1022
1023
1024 Tuple<K, V>[] tuplesArray = ( Tuple<K, V>[] ) tuples.toArray( new Tuple[]
1025 {} );
1026
1027
1028 Map<K, Set<V>> mapTuples = new HashMap<K, Set<V>>();
1029
1030 for ( Tuple<K, V> tuple : tuplesArray )
1031 {
1032
1033 Set<V> foundSet = mapTuples.get( tuple.key );
1034
1035 if ( foundSet != null )
1036 {
1037
1038 foundSet.add( tuple.value );
1039 }
1040 else
1041 {
1042
1043
1044 Set<V> set = new TreeSet<V>();
1045 set.add( tuple.value );
1046 mapTuples.put( tuple.key, set );
1047 }
1048 }
1049
1050
1051 int size = mapTuples.size();
1052 Tuple<K, Set<V>>[] sortedTuples = new Tuple[size];
1053 int pos = 0;
1054
1055
1056 for ( Map.Entry<K, Set<V>> entry : mapTuples.entrySet() )
1057 {
1058 sortedTuples[pos] = new Tuple<K, Set<V>>();
1059 sortedTuples[pos].key = entry.getKey();
1060 sortedTuples[pos].value = entry.getValue();
1061 pos++;
1062 }
1063
1064
1065 Arrays.sort( sortedTuples, tupleComparator );
1066
1067 return sortedTuples;
1068 }
1069
1070
1071
1072
1073
1074 private Iterator<Tuple<K, Set<V>>> createTupleIterator( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1075 {
1076 final Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
1077
1078 Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1079 {
1080 private int pos = 0;
1081
1082
1083 @Override
1084 public Tuple<K, Set<V>> next()
1085 {
1086
1087 if ( pos < sortedTuples.length )
1088 {
1089 Tuple<K, Set<V>> tuple = sortedTuples[pos];
1090 pos++;
1091
1092 return tuple;
1093 }
1094
1095 return null;
1096 }
1097
1098
1099 @Override
1100 public boolean hasNext()
1101 {
1102 return pos < sortedTuples.length;
1103 }
1104
1105
1106 @Override
1107 public void remove()
1108 {
1109 }
1110 };
1111
1112 return tupleIterator;
1113 }
1114
1115
1116 private Tuple<K, Set<V>> fetchTuple( BTree<K, V> btree, FileInputStream fis )
1117 {
1118 try
1119 {
1120 if ( fis.available() == 0 )
1121 {
1122 return null;
1123 }
1124
1125 Tuple<K, Set<V>> tuple = new Tuple<K, Set<V>>();
1126 tuple.value = new TreeSet<V>();
1127
1128 byte[] intBytes = new byte[4];
1129
1130
1131 fis.read( intBytes );
1132 int keyLength = IntSerializer.deserialize( intBytes );
1133
1134
1135 byte[] keyBytes = new byte[keyLength];
1136 fis.read( keyBytes );
1137 K key = btree.getKeySerializer().fromBytes( keyBytes );
1138 tuple.key = key;
1139
1140
1141 fis.read( intBytes );
1142 int nbValues = IntSerializer.deserialize( intBytes );
1143
1144
1145 for ( int i = 0; i < nbValues; i++ )
1146 {
1147
1148 fis.read( intBytes );
1149 int valueLength = IntSerializer.deserialize( intBytes );
1150
1151
1152 byte[] valueBytes = new byte[valueLength];
1153 fis.read( valueBytes );
1154 V value = btree.getValueSerializer().fromBytes( valueBytes );
1155 tuple.value.add( value );
1156 }
1157
1158 return tuple;
1159 }
1160 catch ( IOException ioe )
1161 {
1162 return null;
1163 }
1164 }
1165
1166
1167
1168
1169
1170
1171 private Iterator<Tuple<K, Set<V>>> createIterator( final BTree<K, V> btree, final FileInputStream[] streams )
1172 throws FileNotFoundException
1173 {
1174
1175 final int nbFiles = streams.length;
1176
1177
1178 final Tuple<K, Set<V>>[] readTuples = new Tuple[nbFiles];
1179 final TreeMap<Tuple<K, Integer>, Set<V>> candidates =
1180 new TreeMap<Tuple<K, Integer>, Set<V>>(
1181 new TupleComparator<K, Integer>( btree.getKeyComparator(), IntComparator.INSTANCE ) );
1182
1183
1184 for ( int i = 0; i < nbFiles; i++ )
1185 {
1186 while ( true )
1187 {
1188 readTuples[i] = fetchTuple( btree, streams[i] );
1189
1190 if ( readTuples[i] != null )
1191 {
1192 Tuple<K, Integer> candidate = new Tuple<K, Integer>( readTuples[i].key, i, btree.getKeySerializer()
1193 .getComparator() );
1194
1195 if ( !candidates.containsKey( candidate ) )
1196 {
1197 candidates.put( candidate, readTuples[i].getValue() );
1198 break;
1199 }
1200 else
1201 {
1202
1203 Set<V> oldValues = candidates.get( candidate );
1204 oldValues.addAll( readTuples[i].getValue() );
1205 }
1206 }
1207 else
1208 {
1209 break;
1210 }
1211 }
1212 }
1213
1214 Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1215 {
1216 @Override
1217 public Tuple<K, Set<V>> next()
1218 {
1219
1220 Tuple<K, Integer> tupleCandidate = candidates.firstKey();
1221
1222
1223 candidates.remove( tupleCandidate );
1224
1225
1226 Tuple<K, Set<V>> tuple = readTuples[tupleCandidate.value];
1227
1228
1229 while ( true )
1230 {
1231
1232 readTuples[tupleCandidate.value] = fetchTuple( btree, streams[tupleCandidate.value] );
1233
1234 if ( readTuples[tupleCandidate.value] == null )
1235 {
1236
1237 break;
1238 }
1239
1240 if ( readTuples[tupleCandidate.value] != null )
1241 {
1242
1243 Set<V> oldValues = candidates.get( readTuples[tupleCandidate.value] );
1244
1245 if ( oldValues != null )
1246 {
1247
1248 oldValues.addAll( readTuples[tupleCandidate.value].value );
1249 }
1250 else
1251 {
1252 Tuple<K, Integer> newTuple = new Tuple<K, Integer>( readTuples[tupleCandidate.value].key,
1253 tupleCandidate.value );
1254 candidates.put( newTuple, readTuples[tupleCandidate.value].getValue() );
1255
1256
1257 break;
1258 }
1259 }
1260 }
1261
1262
1263 return tuple;
1264 }
1265
1266
1267 @Override
1268 public boolean hasNext()
1269 {
1270
1271 return !candidates.isEmpty();
1272 }
1273
1274
1275 @Override
1276 public void remove()
1277 {
1278 }
1279
1280 };
1281
1282 return tupleIterator;
1283 }
1284
1285
1286
1287
1288
1289
1290 private Iterator<Tuple<K, Set<V>>> createUniqueFileIterator( final BTree<K, V> btree, final FileInputStream stream )
1291 throws FileNotFoundException
1292 {
1293 Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1294 {
1295 boolean hasNext = true;
1296
1297
1298 @Override
1299 public Tuple<K, Set<V>> next()
1300 {
1301
1302 Tuple<K, Set<V>> tuple = fetchTuple( btree, stream );
1303
1304
1305 return tuple;
1306 }
1307
1308
1309 @Override
1310 public boolean hasNext()
1311 {
1312
1313 try
1314 {
1315 return stream.available() > 0;
1316 }
1317 catch ( IOException e )
1318 {
1319 return false;
1320 }
1321 }
1322
1323
1324 @Override
1325 public void remove()
1326 {
1327 }
1328
1329 };
1330
1331 return tupleIterator;
1332 }
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
/**
 * Placeholder for the compaction of a B-tree managed by a RecordManager.
 * The body is empty : calling this method currently has no effect.
 *
 * @param recordManager The RecordManager (unused)
 * @param btree The B-tree (unused)
 */
public static void compact( RecordManager recordManager, BTree<?, ?> btree )
{

}
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362 public static BTree<?, ?> compact( BTree<?, ?> btree ) throws IOException, KeyNotFoundException
1363 {
1364
1365 InMemoryBTreeConfiguration configuration = new InMemoryBTreeConfiguration();
1366 configuration.setName( btree.getName() );
1367 configuration.setPageSize( btree.getPageSize() );
1368 configuration.setKeySerializer( btree.getKeySerializer() );
1369 configuration.setValueSerializer( btree.getValueSerializer() );
1370 configuration.setAllowDuplicates( btree.isAllowDuplicates() );
1371 configuration.setReadTimeOut( btree.getReadTimeOut() );
1372 configuration.setWriteBufferSize( btree.getWriteBufferSize() );
1373
1374 File file = ( ( InMemoryBTree ) btree ).getFile();
1375
1376 if ( file != null )
1377 {
1378 configuration.setFilePath( file.getPath() );
1379 }
1380
1381
1382 InMemoryBTreeBuilder btreeBuilder = new InMemoryBTreeBuilder( configuration );
1383
1384
1385 final TupleCursor cursor = btree.browse();
1386
1387
1388 Iterator<Tuple> tupleItr = new Iterator<Tuple>()
1389 {
1390 @Override
1391 public Tuple next()
1392 {
1393 try
1394 {
1395 return cursor.next();
1396 }
1397 catch ( EndOfFileExceededException e )
1398 {
1399 return null;
1400 }
1401 catch ( IOException e )
1402 {
1403 return null;
1404 }
1405 }
1406
1407
1408 @Override
1409 public boolean hasNext()
1410 {
1411 try
1412 {
1413 return cursor.hasNext();
1414 }
1415 catch ( EndOfFileExceededException e )
1416 {
1417 return false;
1418 }
1419 catch ( IOException e )
1420 {
1421 return false;
1422 }
1423 }
1424
1425
1426 @Override
1427 public void remove()
1428 {
1429 }
1430 };
1431
1432
1433 return btreeBuilder.build( tupleItr );
1434 }
1435 }