1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.directory.mavibot.btree;
21
22
23 import java.io.Closeable;
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.RandomAccessFile;
29 import java.nio.ByteBuffer;
30 import java.nio.channels.FileChannel;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.locks.ReentrantLock;
33
34 import org.apache.directory.mavibot.btree.exception.InitializationException;
35 import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
36 import org.apache.directory.mavibot.btree.exception.MissingSerializerException;
37 import org.apache.directory.mavibot.btree.serializer.BufferHandler;
38 import org.apache.directory.mavibot.btree.serializer.LongSerializer;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42
43
44
45
46
47
48
49
50
51
52 {
53
54 protected static final Logger LOG = LoggerFactory.getLogger( InMemoryBTree.class );
55
56
57 public static final String DEFAULT_JOURNAL = "mavibot.log";
58
59
60 public static final String DATA_SUFFIX = ".db";
61
62
63 public static final String JOURNAL_SUFFIX = ".log";
64
65
66
67 private File file;
68
69
70 private boolean withJournal;
71
72
73 private File journal;
74
75 private File envDir;
76
77 private FileChannel journalChannel = null;
78
79
80
81
82
83
84 {
85 super();
86 btreeHeader = new BTreeHeader();
87 setType( BTreeTypeEnum.IN_MEMORY );
88 }
89
90
91
92
93
94
95
96
97
98 {
99 super();
100 String name = configuration.getName();
101
102 if ( name == null )
103 {
104 throw new IllegalArgumentException( "BTree name cannot be null" );
105 }
106
107 String filePath = configuration.getFilePath();
108
109 if ( filePath != null )
110 {
111 envDir = new File( filePath );
112 }
113
114 btreeHeader = new BTreeHeader();
115 btreeHeader.setName( name );
116 btreeHeader.setPageSize( configuration.getPageSize() );
117
118 keySerializer = configuration.getKeySerializer();
119 btreeHeader.setKeySerializerFQCN( keySerializer.getClass().getName() );
120
121 valueSerializer = configuration.getValueSerializer();
122 btreeHeader.setValueSerializerFQCN( valueSerializer.getClass().getName() );
123
124 readTimeOut = configuration.getReadTimeOut();
125 writeBufferSize = configuration.getWriteBufferSize();
126 btreeHeader.setAllowDuplicates( configuration.isAllowDuplicates() );
127 setType( configuration.getType() );
128
129 if ( keySerializer.getComparator() == null )
130 {
131 throw new IllegalArgumentException( "Comparator should not be null" );
132 }
133
134
135
136 rootPage = new InMemoryLeaf<K, V>( this );
137
138
139 try
140 {
141 init();
142 }
143 catch ( IOException ioe )
144 {
145 throw new InitializationException( ioe.getMessage() );
146 }
147 }
148
149
150
151
152
153
154
155 public void init() throws IOException
156 {
157
158 if ( envDir != null )
159 {
160 if ( !envDir.exists() )
161 {
162 boolean created = envDir.mkdirs();
163 if ( !created )
164 {
165 throw new IllegalStateException( "Could not create the directory " + envDir + " for storing data" );
166 }
167 }
168
169 this.file = new File( envDir, btreeHeader.getName() + DATA_SUFFIX );
170
171 this.journal = new File( envDir, file.getName() + JOURNAL_SUFFIX );
172 setType( BTreeTypeEnum.BACKED_ON_DISK );
173 }
174
175
176 readTransactions = new ConcurrentLinkedQueue<ReadTransaction<K, V>>();
177
178 writeLock = new ReentrantLock();
179
180
181
182 if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
183 {
184 if ( file.length() > 0 )
185 {
186
187 load( file );
188 }
189
190 withJournal = true;
191
192 FileOutputStream stream = new FileOutputStream( journal );
193 journalChannel = stream.getChannel();
194
195
196
197 if ( journal.length() > 0 )
198 {
199 applyJournal();
200 }
201 }
202 else
203 {
204 setType( BTreeTypeEnum.IN_MEMORY );
205 }
206
207
208
209
210 }
211
212
213
214
215
216 public void close() throws IOException
217 {
218
219
220
221
222 if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
223 {
224
225 flush();
226 journalChannel.close();
227 }
228
229 rootPage = null;
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245 protected Tuple<K, V> delete( K key, V value, long revision ) throws IOException
246 {
247 writeLock.lock();
248
249 try
250 {
251
252
253 Tuple<K, V> tuple = null;
254
255
256
257 DeleteResult<K, V> result = rootPage.delete( revision, key, value, null, -1 );
258
259 if ( result instanceof NotPresentResult )
260 {
261
262 return null;
263 }
264
265
266 Page<K, V> oldRootPage = rootPage;
267
268 if ( result instanceof RemoveResult )
269 {
270
271 RemoveResult<K, V> removeResult = ( RemoveResult<K, V> ) result;
272
273 Page<K, V> modifiedPage = removeResult.getModifiedPage();
274
275
276 rootPage = modifiedPage;
277 tuple = removeResult.getRemovedElement();
278 }
279
280 if ( withJournal )
281 {
282
283 writeToJournal( new Deletion<K, V>( key ) );
284 }
285
286
287 if ( tuple != null )
288 {
289 btreeHeader.decrementNbElems();
290 }
291
292
293 return tuple;
294 }
295 finally
296 {
297
298 writeLock.unlock();
299 }
300 }
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317 {
318 if ( key == null )
319 {
320 throw new IllegalArgumentException( "Key must not be null" );
321 }
322
323
324
325 V modifiedValue = null;
326
327
328
329
330 InsertResult<K, V> result = rootPage.insert( revision, key, value );
331
332 if ( result instanceof ModifyResult )
333 {
334 ModifyResult<K, V> modifyResult = ( ( ModifyResult<K, V> ) result );
335
336 Page<K, V> modifiedPage = modifyResult.getModifiedPage();
337
338
339
340 rootPage = modifiedPage;
341
342 modifiedValue = modifyResult.getModifiedValue();
343 }
344 else
345 {
346
347
348 SplitResult<K, V> splitResult = ( ( SplitResult<K, V> ) result );
349
350 K pivot = splitResult.getPivot();
351 Page<K, V> leftPage = splitResult.getLeftPage();
352 Page<K, V> rightPage = splitResult.getRightPage();
353 Page<K, V> newRootPage = null;
354
355
356 newRootPage = new InMemoryNode<K, V>( this, revision, pivot, leftPage, rightPage );
357
358 rootPage = newRootPage;
359 }
360
361
362 if ( withJournal )
363 {
364 writeToJournal( new Addition<K, V>( key, value ) );
365 }
366
367
368
369 if ( modifiedValue == null )
370 {
371 btreeHeader.incrementNbElems();
372 }
373
374
375 return result;
376 }
377
378
379
380
381
382
383
384
385
386
387 private void writeBuffer( FileChannel channel, ByteBuffer bb, byte[] buffer ) throws IOException
388 {
389 int size = buffer.length;
390 int pos = 0;
391
392
393 do
394 {
395 if ( bb.remaining() >= size )
396 {
397
398 bb.put( buffer, pos, size );
399 size = 0;
400 }
401 else
402 {
403
404 int len = bb.remaining();
405 size -= len;
406 bb.put( buffer, pos, len );
407 pos += len;
408
409 bb.flip();
410
411 channel.write( bb );
412
413 bb.clear();
414 }
415 }
416 while ( size > 0 );
417 }
418
419
420
421
422
423
424 public void flush( File file ) throws IOException
425 {
426 File parentFile = file.getParentFile();
427 File baseDirectory = null;
428
429 if ( parentFile != null )
430 {
431 baseDirectory = new File( file.getParentFile().getAbsolutePath() );
432 }
433 else
434 {
435 baseDirectory = new File( "." );
436 }
437
438
439 File tmpFileFD = File.createTempFile( "mavibot", null, baseDirectory );
440 FileOutputStream stream = new FileOutputStream( tmpFileFD );
441 FileChannel ch = stream.getChannel();
442
443
444 ByteBuffer bb = ByteBuffer.allocateDirect( writeBufferSize );
445
446 TupleCursor<K, V> cursor = browse();
447
448 if ( keySerializer == null )
449 {
450 throw new MissingSerializerException( "Cannot flush the btree without a Key serializer" );
451 }
452
453 if ( valueSerializer == null )
454 {
455 throw new MissingSerializerException( "Cannot flush the btree without a Value serializer" );
456 }
457
458
459 bb.putLong( btreeHeader.getNbElems() );
460
461 while ( cursor.hasNext() )
462 {
463 Tuple<K, V> tuple = cursor.next();
464
465 byte[] keyBuffer = keySerializer.serialize( tuple.getKey() );
466
467 writeBuffer( ch, bb, keyBuffer );
468
469 byte[] valueBuffer = valueSerializer.serialize( tuple.getValue() );
470
471 writeBuffer( ch, bb, valueBuffer );
472 }
473
474
475 if ( bb.position() > 0 )
476 {
477 bb.flip();
478 ch.write( bb );
479 }
480
481
482 ch.force( true );
483 ch.close();
484
485
486 File backupFile = File.createTempFile( "mavibot", null, baseDirectory );
487 file.renameTo( backupFile );
488
489
490 tmpFileFD.renameTo( file );
491
492
493 backupFile.delete();
494 }
495
496
497
498
499
500
501
502 private void applyJournal() throws IOException
503 {
504 long revision = generateRevision();
505
506 if ( !journal.exists() )
507 {
508 throw new IOException( "The journal does not exist" );
509 }
510
511 FileChannel channel =
512 new RandomAccessFile( journal, "rw" ).getChannel();
513 ByteBuffer buffer = ByteBuffer.allocate( 65536 );
514
515 BufferHandler bufferHandler = new BufferHandler( channel, buffer );
516
517
518 try
519 {
520 while ( true )
521 {
522
523 byte[] type = bufferHandler.read( 1 );
524
525 if ( type[0] == Modification.ADDITION )
526 {
527
528 K key = keySerializer.deserialize( bufferHandler );
529
530
531
532
533 V value = valueSerializer.deserialize( bufferHandler );
534
535
536
537
538 insert( key, value, revision );
539 }
540 else
541 {
542
543 K key = keySerializer.deserialize( bufferHandler );
544
545
546 delete( key, revision );
547 }
548 }
549 }
550 catch ( EOFException eofe )
551 {
552 eofe.printStackTrace();
553
554 journalChannel.truncate( 0 );
555 }
556 }
557
558
559
560
561
562
563
564
565
566 public void load( File file ) throws IOException
567 {
568 long revision = generateRevision();
569
570 if ( !file.exists() )
571 {
572 throw new IOException( "The file does not exist" );
573 }
574
575 FileChannel channel =
576 new RandomAccessFile( file, "rw" ).getChannel();
577 ByteBuffer buffer = ByteBuffer.allocate( 65536 );
578
579 BufferHandler bufferHandler = new BufferHandler( channel, buffer );
580
581 long nbElems = LongSerializer.deserialize( bufferHandler.read( 8 ) );
582 btreeHeader.setNbElems( nbElems );
583
584
585
586
587
588
589 boolean isJournalActivated = withJournal;
590
591 withJournal = false;
592
593
594 for ( long i = 0; i < nbElems; i++ )
595 {
596
597 K key = keySerializer.deserialize( bufferHandler );
598
599
600
601
602 V value = valueSerializer.deserialize( bufferHandler );
603
604
605
606
607 insert( key, value, revision );
608 }
609
610
611 withJournal = isJournalActivated;
612
613
614
615 }
616
617
618
619
620
621
622
623
624
625
626 public Page<K, V> getRootPage( long revision ) throws IOException, KeyNotFoundException
627 {
628
629 return rootPage;
630 }
631
632
633
634
635
636
637 public void flush() throws IOException
638 {
639 if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
640 {
641
642 flush( file );
643 journalChannel.truncate( 0 );
644 }
645 }
646
647
648
649
650
651 public File getFile()
652 {
653 return file;
654 }
655
656
657
658
659
660 public File getJournal()
661 {
662 return journal;
663 }
664
665
666
667
668
669 public boolean isInMemory()
670 {
671 return getType() == BTreeTypeEnum.IN_MEMORY;
672 }
673
674
675
676
677
678 public boolean isPersistent()
679 {
680 return getType() == BTreeTypeEnum.IN_MEMORY;
681 }
682
683
684 private void writeToJournal( Modification<K, V> modification )
685 throws IOException
686 {
687 if ( modification instanceof Addition )
688 {
689 byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
690 ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
691 bb.put( Modification.ADDITION );
692 bb.put( keyBuffer );
693 bb.flip();
694
695 journalChannel.write( bb );
696
697 byte[] valueBuffer = valueSerializer.serialize( modification.getValue() );
698 bb = ByteBuffer.allocateDirect( valueBuffer.length );
699 bb.put( valueBuffer );
700 bb.flip();
701
702 journalChannel.write( bb );
703 }
704 else if ( modification instanceof Deletion )
705 {
706 byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
707 ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
708 bb.put( Modification.DELETION );
709 bb.put( keyBuffer );
710 bb.flip();
711
712 journalChannel.write( bb );
713 }
714
715
716 journalChannel.force( true );
717 }
718
719
720
721
722
723 public void beginTransaction()
724 {
725
726 }
727
728
729
730
731
732 public void commit()
733 {
734
735 }
736
737
738
739
740
741 public void rollback()
742 {
743
744 }
745
746
747
748
749
750 public String toString()
751 {
752 StringBuilder sb = new StringBuilder();
753
754 switch ( getType() )
755 {
756 case IN_MEMORY:
757 sb.append( "In-memory " );
758 break;
759
760 case BACKED_ON_DISK:
761 sb.append( "Persistent " );
762 break;
763
764 }
765
766 sb.append( "BTree" );
767 sb.append( "[" ).append( btreeHeader.getName() ).append( "]" );
768 sb.append( "( pageSize:" ).append( btreeHeader.getPageSize() );
769
770 if ( rootPage != null )
771 {
772 sb.append( ", nbEntries:" ).append( btreeHeader.getNbElems() );
773 }
774 else
775 {
776 sb.append( ", nbEntries:" ).append( 0 );
777 }
778
779 sb.append( ", comparator:" );
780
781 if ( keySerializer.getComparator() == null )
782 {
783 sb.append( "null" );
784 }
785 else
786 {
787 sb.append( keySerializer.getComparator().getClass().getSimpleName() );
788 }
789
790 sb.append( ", DuplicatesAllowed: " ).append( btreeHeader.isAllowDuplicates() );
791
792 if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
793 {
794 try
795 {
796 sb.append( ", file : " );
797
798 if ( file != null )
799 {
800 sb.append( file.getCanonicalPath() );
801 }
802 else
803 {
804 sb.append( "Unknown" );
805 }
806
807 sb.append( ", journal : " );
808
809 if ( journal != null )
810 {
811 sb.append( journal.getCanonicalPath() );
812 }
813 else
814 {
815 sb.append( "Unkown" );
816 }
817 }
818 catch ( IOException ioe )
819 {
820
821 }
822 }
823
824 sb.append( ") : \n" );
825 sb.append( rootPage.dumpPage( "" ) );
826
827 return sb.toString();
828 }
829 }