View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree;
21  
22  
23  import java.io.Closeable;
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.FileOutputStream;
27  import java.io.IOException;
28  import java.io.RandomAccessFile;
29  import java.nio.ByteBuffer;
30  import java.nio.channels.FileChannel;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  import java.util.concurrent.locks.ReentrantLock;
33  
34  import org.apache.directory.mavibot.btree.exception.InitializationException;
35  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
36  import org.apache.directory.mavibot.btree.exception.MissingSerializerException;
37  import org.apache.directory.mavibot.btree.serializer.BufferHandler;
38  import org.apache.directory.mavibot.btree.serializer.LongSerializer;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  
43  /**
44   * The B+Tree MVCC data structure.
45   *
46   * @param <K> The type for the keys
47   * @param <V> The type for the stored values
48   *
49   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
50   */
51  /* No qualifier */class InMemoryBTree<K, V> extends AbstractBTree<K, V> implements Closeable
52  {
53      /** The LoggerFactory used by this class */
54      protected static final Logger LOG = LoggerFactory.getLogger( InMemoryBTree.class );
55  
56      /** The default journal name */
57      public static final String DEFAULT_JOURNAL = "mavibot.log";
58  
59      /** The default data file suffix */
60      public static final String DATA_SUFFIX = ".db";
61  
62      /** The default journal file suffix */
63      public static final String JOURNAL_SUFFIX = ".log";
64  
65      /** The type to use to create the keys */
66      /** The associated file. If null, this is an in-memory btree  */
67      private File file;
68  
69      /** A flag used to tell the BTree that the journal is activated */
70      private boolean withJournal;
71  
72      /** The associated journal. If null, this is an in-memory btree  */
73      private File journal;
74  
75      private File envDir;
76  
77      private FileChannel journalChannel = null;
78  
79  
80      /**
81       * Creates a new BTree, with no initialization.
82       */
83      /* no qualifier */InMemoryBTree()
84      {
85          super();
86          btreeHeader = new BTreeHeader();
87          setType( BTreeTypeEnum.IN_MEMORY );
88      }
89  
90  
91      /**
92       * Creates a new in-memory BTree using the BTreeConfiguration to initialize the
93       * BTree
94       *
95       * @param configuration The configuration to use
96       */
97      /* no qualifier */InMemoryBTree( InMemoryBTreeConfiguration<K, V> configuration )
98      {
99          super();
100         String name = configuration.getName();
101 
102         if ( name == null )
103         {
104             throw new IllegalArgumentException( "BTree name cannot be null" );
105         }
106 
107         String filePath = configuration.getFilePath();
108 
109         if ( filePath != null )
110         {
111             envDir = new File( filePath );
112         }
113 
114         btreeHeader = new BTreeHeader();
115         btreeHeader.setName( name );
116         btreeHeader.setPageSize( configuration.getPageSize() );
117 
118         keySerializer = configuration.getKeySerializer();
119         btreeHeader.setKeySerializerFQCN( keySerializer.getClass().getName() );
120 
121         valueSerializer = configuration.getValueSerializer();
122         btreeHeader.setValueSerializerFQCN( valueSerializer.getClass().getName() );
123 
124         readTimeOut = configuration.getReadTimeOut();
125         writeBufferSize = configuration.getWriteBufferSize();
126         btreeHeader.setAllowDuplicates( configuration.isAllowDuplicates() );
127         setType( configuration.getType() );
128 
129         if ( keySerializer.getComparator() == null )
130         {
131             throw new IllegalArgumentException( "Comparator should not be null" );
132         }
133 
134         // Create the first root page, with revision 0L. It will be empty
135         // and increment the revision at the same time
136         rootPage = new InMemoryLeaf<K, V>( this );
137 
138         // Now, initialize the BTree
139         try
140         {
141             init();
142         }
143         catch ( IOException ioe )
144         {
145             throw new InitializationException( ioe.getMessage() );
146         }
147     }
148 
149 
150     /**
151      * Initialize the BTree.
152      *
153      * @throws IOException If we get some exception while initializing the BTree
154      */
155     public void init() throws IOException
156     {
157         // if not in-memory then default to persist mode instead of managed
158         if ( envDir != null )
159         {
160             if ( !envDir.exists() )
161             {
162                 boolean created = envDir.mkdirs();
163                 if ( !created )
164                 {
165                     throw new IllegalStateException( "Could not create the directory " + envDir + " for storing data" );
166                 }
167             }
168 
169             this.file = new File( envDir, btreeHeader.getName() + DATA_SUFFIX );
170 
171             this.journal = new File( envDir, file.getName() + JOURNAL_SUFFIX );
172             setType( BTreeTypeEnum.BACKED_ON_DISK );
173         }
174 
175         // Create the queue containing the pending read transactions
176         readTransactions = new ConcurrentLinkedQueue<ReadTransaction<K, V>>();
177 
178         writeLock = new ReentrantLock();
179 
180         // Check the files and create them if missing
181         // Create the queue containing the modifications, if it's not a in-memory btree
182         if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
183         {
184             if ( file.length() > 0 )
185             {
186                 // We have some existing file, load it
187                 load( file );
188             }
189 
190             withJournal = true;
191 
192             FileOutputStream stream = new FileOutputStream( journal );
193             journalChannel = stream.getChannel();
194 
195             // If the journal is not empty, we have to read it
196             // and to apply all the modifications to the current file
197             if ( journal.length() > 0 )
198             {
199                 applyJournal();
200             }
201         }
202         else
203         {
204             setType( BTreeTypeEnum.IN_MEMORY );
205         }
206 
207         // Initialize the txnManager thread
208         //FIXME we should NOT create a new transaction manager thread for each BTree
209         //createTransactionManager();
210     }
211 
212 
213     /**
214      * Close the BTree, cleaning up all the data structure
215      */
216     public void close() throws IOException
217     {
218         // Stop the readTransaction thread
219         // readTransactionsThread.interrupt();
220         // readTransactions.clear();
221 
222         if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
223         {
224             // Flush the data
225             flush();
226             journalChannel.close();
227         }
228 
229         rootPage = null;
230     }
231 
232 
233     /**
234      *
235      * Deletes the given <key,value> pair if both key and value match. If the given value is null
236      * and there is no null value associated with the given key then the entry with the given key
237      * will be removed.
238      *
239      * @param key The key to be removed
240      * @param value The value to be removed (can be null, and when no null value exists the key will be removed irrespective of the value)
241      * @param revision The revision to be associated with this operation
242      * @return
243      * @throws IOException
244      */
245     protected Tuple<K, V> delete( K key, V value, long revision ) throws IOException
246     {
247         writeLock.lock();
248 
249         try
250         {
251             // If the key exists, the existing value will be replaced. We store it
252             // to return it to the caller.
253             Tuple<K, V> tuple = null;
254 
255             // Try to delete the entry starting from the root page. Here, the root
256             // page may be either a Node or a Leaf
257             DeleteResult<K, V> result = rootPage.delete( revision, key, value, null, -1 );
258 
259             if ( result instanceof NotPresentResult )
260             {
261                 // Key not found.
262                 return null;
263             }
264 
265             // Keep the oldRootPage so that we can later access it
266             Page<K, V> oldRootPage = rootPage;
267 
268             if ( result instanceof RemoveResult )
269             {
270                 // The element was found, and removed
271                 RemoveResult<K, V> removeResult = ( RemoveResult<K, V> ) result;
272 
273                 Page<K, V> modifiedPage = removeResult.getModifiedPage();
274 
275                 // This is a new root
276                 rootPage = modifiedPage;
277                 tuple = removeResult.getRemovedElement();
278             }
279 
280             if ( withJournal )
281             {
282                 // Inject the modification into the modification queue
283                 writeToJournal( new Deletion<K, V>( key ) );
284             }
285 
286             // Decrease the number of elements in the current tree if the deletion is successful
287             if ( tuple != null )
288             {
289                 btreeHeader.decrementNbElems();
290             }
291 
292             // Return the value we have found if it was modified
293             return tuple;
294         }
295         finally
296         {
297             // See above
298             writeLock.unlock();
299         }
300     }
301 
302 
303     /**
304      * Insert an entry in the BTree.
305      * <p>
306      * We will replace the value if the provided key already exists in the
307      * btree.
308      * <p>
309      * The revision number is the revision to use to insert the data.
310      *
311      * @param key Inserted key
312      * @param value Inserted value
313      * @param revision The revision to use
314      * @return an instance of the InsertResult.
315      */
316     /* no qualifier */InsertResult<K, V> insert( K key, V value, long revision ) throws IOException
317     {
318         if ( key == null )
319         {
320             throw new IllegalArgumentException( "Key must not be null" );
321         }
322 
323         // If the key exists, the existing value will be replaced. We store it
324         // to return it to the caller.
325         V modifiedValue = null;
326 
327         // Try to insert the new value in the tree at the right place,
328         // starting from the root page. Here, the root page may be either
329         // a Node or a Leaf
330         InsertResult<K, V> result = rootPage.insert( revision, key, value );
331 
332         if ( result instanceof ModifyResult )
333         {
334             ModifyResult<K, V> modifyResult = ( ( ModifyResult<K, V> ) result );
335 
336             Page<K, V> modifiedPage = modifyResult.getModifiedPage();
337 
338             // The root has just been modified, we haven't split it
339             // Get it and make it the current root page
340             rootPage = modifiedPage;
341 
342             modifiedValue = modifyResult.getModifiedValue();
343         }
344         else
345         {
346             // We have split the old root, create a new one containing
347             // only the pivotal we got back
348             SplitResult<K, V> splitResult = ( ( SplitResult<K, V> ) result );
349 
350             K pivot = splitResult.getPivot();
351             Page<K, V> leftPage = splitResult.getLeftPage();
352             Page<K, V> rightPage = splitResult.getRightPage();
353             Page<K, V> newRootPage = null;
354 
355             // Create the new rootPage
356             newRootPage = new InMemoryNode<K, V>( this, revision, pivot, leftPage, rightPage );
357 
358             rootPage = newRootPage;
359         }
360 
361         // Inject the modification into the modification queue
362         if ( withJournal )
363         {
364             writeToJournal( new Addition<K, V>( key, value ) );
365         }
366 
367         // Increase the number of element in the current tree if the insertion is successful
368         // and does not replace an element
369         if ( modifiedValue == null )
370         {
371             btreeHeader.incrementNbElems();
372         }
373 
374         // Return the value we have found if it was modified
375         return result;
376     }
377 
378 
379     /**
380      * Write the data in the ByteBuffer, and eventually on disk if needed.
381      *
382      * @param channel The channel we want to write to
383      * @param bb The ByteBuffer we want to feed
384      * @param buffer The data to inject
385      * @throws IOException If the write failed
386      */
387     private void writeBuffer( FileChannel channel, ByteBuffer bb, byte[] buffer ) throws IOException
388     {
389         int size = buffer.length;
390         int pos = 0;
391 
392         // Loop until we have written all the data
393         do
394         {
395             if ( bb.remaining() >= size )
396             {
397                 // No flush, as the ByteBuffer is big enough
398                 bb.put( buffer, pos, size );
399                 size = 0;
400             }
401             else
402             {
403                 // Flush the data on disk, reinitialize the ByteBuffer
404                 int len = bb.remaining();
405                 size -= len;
406                 bb.put( buffer, pos, len );
407                 pos += len;
408 
409                 bb.flip();
410 
411                 channel.write( bb );
412 
413                 bb.clear();
414             }
415         }
416         while ( size > 0 );
417     }
418 
419 
420     /**
421      * Flush the latest revision to disk
422      * @param file The file into which the data will be written
423      */
424     public void flush( File file ) throws IOException
425     {
426         File parentFile = file.getParentFile();
427         File baseDirectory = null;
428 
429         if ( parentFile != null )
430         {
431             baseDirectory = new File( file.getParentFile().getAbsolutePath() );
432         }
433         else
434         {
435             baseDirectory = new File( "." );
436         }
437 
438         // Create a temporary file in the same directory to flush the current btree
439         File tmpFileFD = File.createTempFile( "mavibot", null, baseDirectory );
440         FileOutputStream stream = new FileOutputStream( tmpFileFD );
441         FileChannel ch = stream.getChannel();
442 
443         // Create a buffer containing 200 4Kb pages (around 1Mb)
444         ByteBuffer bb = ByteBuffer.allocateDirect( writeBufferSize );
445 
446         TupleCursor<K, V> cursor = browse();
447 
448         if ( keySerializer == null )
449         {
450             throw new MissingSerializerException( "Cannot flush the btree without a Key serializer" );
451         }
452 
453         if ( valueSerializer == null )
454         {
455             throw new MissingSerializerException( "Cannot flush the btree without a Value serializer" );
456         }
457 
458         // Write the number of elements first
459         bb.putLong( btreeHeader.getNbElems() );
460 
461         while ( cursor.hasNext() )
462         {
463             Tuple<K, V> tuple = cursor.next();
464 
465             byte[] keyBuffer = keySerializer.serialize( tuple.getKey() );
466 
467             writeBuffer( ch, bb, keyBuffer );
468 
469             byte[] valueBuffer = valueSerializer.serialize( tuple.getValue() );
470 
471             writeBuffer( ch, bb, valueBuffer );
472         }
473 
474         // Write the buffer if needed
475         if ( bb.position() > 0 )
476         {
477             bb.flip();
478             ch.write( bb );
479         }
480 
481         // Flush to the disk for real
482         ch.force( true );
483         ch.close();
484 
485         // Rename the current file to save a backup
486         File backupFile = File.createTempFile( "mavibot", null, baseDirectory );
487         file.renameTo( backupFile );
488 
489         // Rename the temporary file to the initial file
490         tmpFileFD.renameTo( file );
491 
492         // We can now delete the backup file
493         backupFile.delete();
494     }
495 
496 
497     /**
498      * Inject all the modification from the journal into the btree
499      *
500      * @throws IOException If we had some issue while reading the journal
501      */
502     private void applyJournal() throws IOException
503     {
504         long revision = generateRevision();
505 
506         if ( !journal.exists() )
507         {
508             throw new IOException( "The journal does not exist" );
509         }
510 
511         FileChannel channel =
512             new RandomAccessFile( journal, "rw" ).getChannel();
513         ByteBuffer buffer = ByteBuffer.allocate( 65536 );
514 
515         BufferHandler bufferHandler = new BufferHandler( channel, buffer );
516 
517         // Loop on all the elements, store them in lists atm
518         try
519         {
520             while ( true )
521             {
522                 // Read the type
523                 byte[] type = bufferHandler.read( 1 );
524 
525                 if ( type[0] == Modification.ADDITION )
526                 {
527                     // Read the key
528                     K key = keySerializer.deserialize( bufferHandler );
529 
530                     //keys.add( key );
531 
532                     // Read the value
533                     V value = valueSerializer.deserialize( bufferHandler );
534 
535                     //values.add( value );
536 
537                     // Inject the data in the tree. (to be replaced by a bulk load)
538                     insert( key, value, revision );
539                 }
540                 else
541                 {
542                     // Read the key
543                     K key = keySerializer.deserialize( bufferHandler );
544 
545                     // Remove the key from the tree
546                     delete( key, revision );
547                 }
548             }
549         }
550         catch ( EOFException eofe )
551         {
552             eofe.printStackTrace();
553             // Done reading the journal. truncate it
554             journalChannel.truncate( 0 );
555         }
556     }
557 
558 
559     /**
560      * Read the data from the disk into this BTree. All the existing data in the
561      * BTree are kept, the read data will be associated with a new revision.
562      *
563      * @param file
564      * @throws IOException
565      */
566     public void load( File file ) throws IOException
567     {
568         long revision = generateRevision();
569 
570         if ( !file.exists() )
571         {
572             throw new IOException( "The file does not exist" );
573         }
574 
575         FileChannel channel =
576             new RandomAccessFile( file, "rw" ).getChannel();
577         ByteBuffer buffer = ByteBuffer.allocate( 65536 );
578 
579         BufferHandler bufferHandler = new BufferHandler( channel, buffer );
580 
581         long nbElems = LongSerializer.deserialize( bufferHandler.read( 8 ) );
582         btreeHeader.setNbElems( nbElems );
583 
584         // Prepare a list of keys and values read from the disk
585         //List<K> keys = new ArrayList<K>();
586         //List<V> values = new ArrayList<V>();
587 
588         // desactivate the journal while we load the file
589         boolean isJournalActivated = withJournal;
590 
591         withJournal = false;
592 
593         // Loop on all the elements, store them in lists atm
594         for ( long i = 0; i < nbElems; i++ )
595         {
596             // Read the key
597             K key = keySerializer.deserialize( bufferHandler );
598 
599             //keys.add( key );
600 
601             // Read the value
602             V value = valueSerializer.deserialize( bufferHandler );
603 
604             //values.add( value );
605 
606             // Inject the data in the tree. (to be replaced by a bulk load)
607             insert( key, value, revision );
608         }
609 
610         // Restore the withJournal value
611         withJournal = isJournalActivated;
612 
613         // Now, process the lists to create the btree
614         // TODO... BulkLoad
615     }
616 
617 
618     /**
619      * Get the rootPzge associated to a give revision.
620      *
621      * @param revision The revision we are looking for
622      * @return The rootPage associated to this revision
623      * @throws IOException If we had an issue while accessing the underlying file
624      * @throws KeyNotFoundException If the revision does not exist for this Btree
625      */
626     public Page<K, V> getRootPage( long revision ) throws IOException, KeyNotFoundException
627     {
628         // Atm, the in-memory BTree does not support searches in many revisions
629         return rootPage;
630     }
631 
632 
633     /**
634      * Flush the latest revision to disk. We will replace the current file by the new one, as
635      * we flush in a temporary file.
636      */
637     public void flush() throws IOException
638     {
639         if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
640         {
641             // Then flush the file
642             flush( file );
643             journalChannel.truncate( 0 );
644         }
645     }
646 
647 
648     /**
649      * @return the file
650      */
651     public File getFile()
652     {
653         return file;
654     }
655 
656 
657     /**
658      * @return the journal
659      */
660     public File getJournal()
661     {
662         return journal;
663     }
664 
665 
666     /**
667      * @return true if the BTree is fully in memory
668      */
669     public boolean isInMemory()
670     {
671         return getType() == BTreeTypeEnum.IN_MEMORY;
672     }
673 
674 
675     /**
676      * @return true if the BTree is persisted on disk
677      */
678     public boolean isPersistent()
679     {
680         return getType() == BTreeTypeEnum.IN_MEMORY;
681     }
682 
683 
684     private void writeToJournal( Modification<K, V> modification )
685         throws IOException
686     {
687         if ( modification instanceof Addition )
688         {
689             byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
690             ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
691             bb.put( Modification.ADDITION );
692             bb.put( keyBuffer );
693             bb.flip();
694 
695             journalChannel.write( bb );
696 
697             byte[] valueBuffer = valueSerializer.serialize( modification.getValue() );
698             bb = ByteBuffer.allocateDirect( valueBuffer.length );
699             bb.put( valueBuffer );
700             bb.flip();
701 
702             journalChannel.write( bb );
703         }
704         else if ( modification instanceof Deletion )
705         {
706             byte[] keyBuffer = keySerializer.serialize( modification.getKey() );
707             ByteBuffer bb = ByteBuffer.allocateDirect( keyBuffer.length + 1 );
708             bb.put( Modification.DELETION );
709             bb.put( keyBuffer );
710             bb.flip();
711 
712             journalChannel.write( bb );
713         }
714 
715         // Flush to the disk for real
716         journalChannel.force( true );
717     }
718 
719 
720     /**
721      * Starts a transaction
722      */
723     public void beginTransaction()
724     {
725         // Does nothing...
726     }
727 
728 
729     /**
730      * Commits a transaction
731      */
732     public void commit()
733     {
734         // Does nothing...
735     }
736 
737 
738     /**
739      * Rollback a transaction
740      */
741     public void rollback()
742     {
743         // Does nothing...
744     }
745 
746 
747     /**
748      * @see Object#toString()
749      */
750     public String toString()
751     {
752         StringBuilder sb = new StringBuilder();
753 
754         switch ( getType() )
755         {
756             case IN_MEMORY:
757                 sb.append( "In-memory " );
758                 break;
759 
760             case BACKED_ON_DISK:
761                 sb.append( "Persistent " );
762                 break;
763 
764         }
765 
766         sb.append( "BTree" );
767         sb.append( "[" ).append( btreeHeader.getName() ).append( "]" );
768         sb.append( "( pageSize:" ).append( btreeHeader.getPageSize() );
769 
770         if ( rootPage != null )
771         {
772             sb.append( ", nbEntries:" ).append( btreeHeader.getNbElems() );
773         }
774         else
775         {
776             sb.append( ", nbEntries:" ).append( 0 );
777         }
778 
779         sb.append( ", comparator:" );
780 
781         if ( keySerializer.getComparator() == null )
782         {
783             sb.append( "null" );
784         }
785         else
786         {
787             sb.append( keySerializer.getComparator().getClass().getSimpleName() );
788         }
789 
790         sb.append( ", DuplicatesAllowed: " ).append( btreeHeader.isAllowDuplicates() );
791 
792         if ( getType() == BTreeTypeEnum.BACKED_ON_DISK )
793         {
794             try
795             {
796                 sb.append( ", file : " );
797 
798                 if ( file != null )
799                 {
800                     sb.append( file.getCanonicalPath() );
801                 }
802                 else
803                 {
804                     sb.append( "Unknown" );
805                 }
806 
807                 sb.append( ", journal : " );
808 
809                 if ( journal != null )
810                 {
811                     sb.append( journal.getCanonicalPath() );
812                 }
813                 else
814                 {
815                     sb.append( "Unkown" );
816                 }
817             }
818             catch ( IOException ioe )
819             {
820                 // There is little we can do here...
821             }
822         }
823 
824         sb.append( ") : \n" );
825         sb.append( rootPage.dumpPage( "" ) );
826 
827         return sb.toString();
828     }
829 }