View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree;
21  
22  
23  import java.io.Closeable;
24  import java.io.IOException;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.FileChannel;
27  import java.util.Map;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.locks.ReentrantLock;
30  
31  import net.sf.ehcache.Cache;
32  import net.sf.ehcache.Status;
33  import net.sf.ehcache.config.CacheConfiguration;
34  
35  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  
40  /**
41   * The B+Tree MVCC data structure.
42   *
43   * @param <K> The type for the keys
44   * @param <V> The type for the stored values
45   *
46   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
47   */
48  public class PersistedBTree<K, V> extends AbstractBTree<K, V> implements Closeable
49  {
    /** The logger used by this class */
    protected static final Logger LOG = LoggerFactory.getLogger( PersistedBTree.class );

    /** The RecordManager this BTree hands its page writes and header updates to, if the BTree is managed */
    private RecordManager recordManager;

    /**
     * The page cache associated with this BTree. A regular BTree creates it in init(),
     * a sub BTree reuses the cache of its parent BTree.
     */
    protected Cache cache;

    /** The default number of pages to keep in memory */
    static final int DEFAULT_CACHE_SIZE = 1000;

    /** The maximum number of elements the cache keeps in memory, defaults to DEFAULT_CACHE_SIZE */
    protected int cacheSize = DEFAULT_CACHE_SIZE;

    /** A flag indicating if this BTree is a Sub BTree (it then shares the cache and write lock of its parent) */
    private boolean isSubBtree = false;

    /** The default number of stored Values before we switch to a BTree */
    private static final int DEFAULT_VALUE_THRESHOLD_UP = 8;

    /** The default number of stored Values before we switch back to an array */
    private static final int DEFAULT_VALUE_THRESHOLD_LOW = 1;

    /** The configuration for the array <-> BTree switch (static : shared by every instance) */
    /*No qualifier*/static int valueThresholdUp = DEFAULT_VALUE_THRESHOLD_UP;
    /*No qualifier*/static int valueThresholdLow = DEFAULT_VALUE_THRESHOLD_LOW;

    /** A lock to protect the lazy creation of the write transaction */
    protected ReentrantLock createTransaction = new ReentrantLock();
80  
81  
82      /**
83       * Creates a new BTree, with no initialization.
84       */
85      /* no qualifier */PersistedBTree()
86      {
87          btreeHeader = new BTreeHeader();
88          setType( BTreeTypeEnum.PERSISTED );
89      }
90  
91  
92      /**
93       * Creates a new persisted BTree using the BTreeConfiguration to initialize the
94       * BTree
95       *
96       * @param configuration The configuration to use
97       */
98      /* no qualifier */PersistedBTree( PersistedBTreeConfiguration<K, V> configuration )
99      {
100         super();
101         String name = configuration.getName();
102 
103         if ( name == null )
104         {
105             throw new IllegalArgumentException( "BTree name cannot be null" );
106         }
107 
108         btreeHeader = new BTreeHeader();
109         btreeHeader.setName( name );
110         btreeHeader.setPageSize( configuration.getPageSize() );
111         isSubBtree = configuration.isSubBtree();
112 
113         keySerializer = configuration.getKeySerializer();
114         btreeHeader.setKeySerializerFQCN( keySerializer.getClass().getName() );
115 
116         valueSerializer = configuration.getValueSerializer();
117         btreeHeader.setValueSerializerFQCN( valueSerializer.getClass().getName() );
118 
119         readTimeOut = configuration.getReadTimeOut();
120         writeBufferSize = configuration.getWriteBufferSize();
121         btreeHeader.setAllowDuplicates( configuration.isAllowDuplicates() );
122         cacheSize = configuration.getCacheSize();
123         setType( BTreeTypeEnum.PERSISTED );
124 
125         if ( keySerializer.getComparator() == null )
126         {
127             throw new IllegalArgumentException( "Comparator should not be null" );
128         }
129 
130         // Create the first root page, with revision 0L. It will be empty
131         // and increment the revision at the same time
132         rootPage = new PersistedLeaf<K, V>( this );
133 
134         if ( isSubBtree )
135         {
136             // The subBTree inherit its cache from its parent BTree
137             this.cache = ( ( PersistedBTree<K, V> ) configuration.getParentBTree() ).getCache();
138             this.writeLock = ( ( PersistedBTree<K, V> ) configuration.getParentBTree() ).getWriteLock();
139             readTransactions = new ConcurrentLinkedQueue<ReadTransaction<K, V>>();
140         }
141 
142         // Now, initialize the BTree
143         init();
144     }
145 
146 
147     /**
148      * Initialize the BTree.
149      *
150      * @throws IOException If we get some exception while initializing the BTree
151      */
152     public void init()
153     {
154         if ( !isSubBtree )
155         {
156             // This is not a subBtree, we have to initialize the cache
157 
158             // Create the queue containing the pending read transactions
159             readTransactions = new ConcurrentLinkedQueue<ReadTransaction<K, V>>();
160 
161             writeLock = new ReentrantLock();
162 
163             // Initialize the caches
164             CacheConfiguration cacheConfiguration = new CacheConfiguration();
165             cacheConfiguration.setName( "pages" );
166             cacheConfiguration.setEternal( true );
167             cacheConfiguration.setOverflowToDisk( false );
168             cacheConfiguration.setCacheLoaderTimeoutMillis( 0 );
169             cacheConfiguration.setMaxElementsInMemory( cacheSize );
170             cacheConfiguration.setMemoryStoreEvictionPolicy( "LRU" );
171 
172             cache = new Cache( cacheConfiguration );
173             cache.initialise();
174         }
175 
176         // Initialize the txnManager thread
177         //FIXME we should NOT create a new transaction manager thread for each BTree
178         //createTransactionManager();
179     }
180 
181 
182     /**
183      * Return the cache we use in this BTree
184      */
185     /* No qualifier */Cache getCache()
186     {
187         return cache;
188     }
189 
190 
191     /**
192      * Return the cache we use in this BTree
193      */
194     /* No qualifier */ReentrantLock getWriteLock()
195     {
196         return writeLock;
197     }
198 
199 
200     /**
201      * Return the cache we use in this BTree
202      */
203     /* No qualifier */ConcurrentLinkedQueue<ReadTransaction<K, V>> getReadTransactions()
204     {
205         return readTransactions;
206     }
207 
208 
209     /**
210      * Close the BTree, cleaning up all the data structure
211      */
212     public void close() throws IOException
213     {
214         // Stop the readTransaction thread
215         // readTransactionsThread.interrupt();
216         // readTransactions.clear();
217 
218         // Clean the cache
219         if ( cache.getStatus() == Status.STATUS_ALIVE )
220         {
221             cache.removeAll();
222         }
223 
224         cache.dispose();
225 
226         rootPage = null;
227     }
228 
229 
230     /**
231      * @return the btreeOffset
232      */
233     /* No qualifier*/long getBtreeOffset()
234     {
235         return btreeHeader.getBTreeOffset();
236     }
237 
238 
239     /**
240      * @param btreeOffset the btreeOffset to set
241      */
242     /* No qualifier*/void setBtreeOffset( long btreeOffset )
243     {
244         btreeHeader.setBTreeOffset( btreeOffset );
245     }
246 
247 
248     /**
249      * @return the rootPageOffset
250      */
251     /* No qualifier*/long getRootPageOffset()
252     {
253         return btreeHeader.getRootPageOffset();
254     }
255 
256 
257     /**
258      * @param rootPageOffset the rootPageOffset to set
259      */
260     /* No qualifier*/void setRootPageOffset( long rootPageOffset )
261     {
262         btreeHeader.setRootPageOffset( rootPageOffset );
263     }
264 
265 
266     /**
267      * @return the nextBTreeOffset
268      */
269     /* No qualifier*/long getNextBTreeOffset()
270     {
271         return btreeHeader.getNextBTreeOffset();
272     }
273 
274 
275     /**
276      * @param nextBTreeOffset the nextBTreeOffset to set
277      */
278     /* No qualifier*/void setNextBTreeOffset( long nextBTreeOffset )
279     {
280         btreeHeader.setNextBTreeOffset( nextBTreeOffset );
281     }
282 
283 
284     /**
285      * Gets the RecordManager for a managed BTree
286      *
287      * @return The recordManager if the BTree is managed
288      */
289     /* No qualifier */RecordManager getRecordManager()
290     {
291         return recordManager;
292     }
293 
294 
295     /**
296      * Inject a RecordManager for a managed BTree
297      *
298      * @param recordManager The injected RecordManager
299      */
300     /* No qualifier */void setRecordManager( RecordManager recordManager )
301     {
302         this.recordManager = recordManager;
303     }
304 
305 
306     /**
307      *
308      * Deletes the given <key,value> pair if both key and value match. If the given value is null
309      * and there is no null value associated with the given key then the entry with the given key
310      * will be removed.
311      *
312      * @param key The key to be removed
313      * @param value The value to be removed (can be null, and when no null value exists the key will be removed irrespective of the value)
314      * @param revision The revision to be associated with this operation
315      * @return
316      * @throws IOException
317      */
318     protected Tuple<K, V> delete( K key, V value, long revision ) throws IOException
319     {
320         writeLock.lock();
321 
322         try
323         {
324             // If the key exists, the existing value will be replaced. We store it
325             // to return it to the caller.
326             Tuple<K, V> tuple = null;
327 
328             // Try to delete the entry starting from the root page. Here, the root
329             // page may be either a Node or a Leaf
330             DeleteResult<K, V> result = rootPage.delete( revision, key, value, null, -1 );
331 
332             if ( result instanceof NotPresentResult )
333             {
334                 // Key not found.
335                 return null;
336             }
337 
338             // Keep the oldRootPage so that we can later access it
339             Page<K, V> oldRootPage = rootPage;
340 
341             if ( result instanceof RemoveResult )
342             {
343                 // The element was found, and removed
344                 RemoveResult<K, V> removeResult = ( RemoveResult<K, V> ) result;
345 
346                 Page<K, V> modifiedPage = removeResult.getModifiedPage();
347 
348                 // Write the modified page on disk
349                 // Note that we don't use the holder, the new root page will
350                 // remain in memory.
351                 PageHolder<K, V> holder = writePage( modifiedPage, revision );
352 
353                 // Store the offset on disk in the page in memory
354                 ( ( AbstractPage<K, V> ) modifiedPage ).setOffset( ( ( PersistedPageHolder<K, V> ) holder )
355                     .getOffset() );
356 
357                 // Store the last offset on disk in the page in memory
358                 ( ( AbstractPage<K, V> ) modifiedPage )
359                     .setLastOffset( ( ( PersistedPageHolder<K, V> ) holder )
360                         .getLastOffset() );
361 
362                 // This is a new root
363                 rootPage = modifiedPage;
364                 tuple = removeResult.getRemovedElement();
365             }
366 
367             // Decrease the number of elements in the current tree if the deletion is successful
368             if ( tuple != null )
369             {
370                 btreeHeader.decrementNbElems();
371 
372                 // We have to update the rootPage on disk
373                 // Update the BTree header now
374                 recordManager.updateBtreeHeader( this, ( ( AbstractPage<K, V> ) rootPage ).getOffset() );
375             }
376 
377             recordManager.addFreePages( this, result.getCopiedPages() );
378 
379             // Update the RecordManager header
380             recordManager.updateRecordManagerHeader();
381 
382             // Store the created rootPage into the revision BTree, this will be stored in RecordManager only if revisions are set to keep
383             recordManager.storeRootPage( this, rootPage );
384 
385             // Return the value we have found if it was modified
386             return tuple;
387         }
388         finally
389         {
390             // See above
391             writeLock.unlock();
392         }
393     }
394 
395 
396     /**
397      * Insert an entry in the BTree.
398      * <p>
399      * We will replace the value if the provided key already exists in the
400      * btree.
401      * <p>
402      * The revision number is the revision to use to insert the data.
403      *
404      * @param key Inserted key
405      * @param value Inserted value
406      * @param revision The revision to use
407      * @return an instance of the InsertResult.
408      */
409     /* no qualifier */InsertResult<K, V> insert( K key, V value, long revision ) throws IOException
410     {
411         if ( key == null )
412         {
413             throw new IllegalArgumentException( "Key must not be null" );
414         }
415 
416         // If the key exists, the existing value will be replaced. We store it
417         // to return it to the caller.
418         V modifiedValue = null;
419 
420         // Try to insert the new value in the tree at the right place,
421         // starting from the root page. Here, the root page may be either
422         // a Node or a Leaf
423         InsertResult<K, V> result = rootPage.insert( revision, key, value );
424 
425         if ( result instanceof ModifyResult )
426         {
427             ModifyResult<K, V> modifyResult = ( ( ModifyResult<K, V> ) result );
428 
429             Page<K, V> modifiedPage = modifyResult.getModifiedPage();
430 
431             // Write the modified page on disk
432             // Note that we don't use the holder, the new root page will
433             // remain in memory.
434             writePage( modifiedPage, revision );
435 
436             // The root has just been modified, we haven't split it
437             // Get it and make it the current root page
438             rootPage = modifiedPage;
439 
440             modifiedValue = modifyResult.getModifiedValue();
441         }
442         else
443         {
444             // We have split the old root, create a new one containing
445             // only the pivotal we got back
446             SplitResult<K, V> splitResult = ( ( SplitResult<K, V> ) result );
447 
448             K pivot = splitResult.getPivot();
449             Page<K, V> leftPage = splitResult.getLeftPage();
450             Page<K, V> rightPage = splitResult.getRightPage();
451             Page<K, V> newRootPage = null;
452 
453             // If the BTree is managed, we have to write the two pages that were created
454             // and to keep a track of the two offsets for the upper node
455             PageHolder<K, V> holderLeft = writePage( leftPage, revision );
456 
457             PageHolder<K, V> holderRight = writePage( rightPage, revision );
458 
459             // Create the new rootPage
460             newRootPage = new PersistedNode<K, V>( this, revision, pivot, holderLeft, holderRight );
461 
462             // If the BTree is managed, we now have to write the page on disk
463             // and to add this page to the list of modified pages
464             PageHolder<K, V> holder = writePage( newRootPage, revision );
465 
466             rootPage = newRootPage;
467         }
468 
469         // Increase the number of element in the current tree if the insertion is successful
470         // and does not replace an element
471         if ( modifiedValue == null )
472         {
473             btreeHeader.incrementNbElems();
474         }
475 
476         // If the BTree is managed, we have to update the rootPage on disk
477         // Update the RecordManager header
478         if ( ( writeTransaction == null ) || !writeTransaction.isStarted() )
479         {
480             recordManager.updateRecordManagerHeader();
481 
482             // Update the BTree header now
483             recordManager.updateBtreeHeader( this, ( ( AbstractPage<K, V> ) rootPage ).getOffset() );
484 
485             // Moved the free pages into the list of free pages
486             recordManager.addFreePages( this, result.getCopiedPages() );
487 
488             // Store the created rootPage into the revision BTree, this will be stored in RecordManager only if revisions are set to keep
489             recordManager.storeRootPage( this, rootPage );
490         }
491 
492         // Return the value we have found if it was modified
493         return result;
494     }
495 
496 
497     /**
498      * Write the data in the ByteBuffer, and eventually on disk if needed.
499      *
500      * @param channel The channel we want to write to
501      * @param bb The ByteBuffer we want to feed
502      * @param buffer The data to inject
503      * @throws IOException If the write failed
504      */
505     private void writeBuffer( FileChannel channel, ByteBuffer bb, byte[] buffer ) throws IOException
506     {
507         int size = buffer.length;
508         int pos = 0;
509 
510         // Loop until we have written all the data
511         do
512         {
513             if ( bb.remaining() >= size )
514             {
515                 // No flush, as the ByteBuffer is big enough
516                 bb.put( buffer, pos, size );
517                 size = 0;
518             }
519             else
520             {
521                 // Flush the data on disk, reinitialize the ByteBuffer
522                 int len = bb.remaining();
523                 size -= len;
524                 bb.put( buffer, pos, len );
525                 pos += len;
526 
527                 bb.flip();
528 
529                 channel.write( bb );
530 
531                 bb.clear();
532             }
533         }
534         while ( size > 0 );
535     }
536 
537 
538     /**
539      * Write a page either in the pending pages if the transaction is started,
540      * or directly on disk.
541      */
542     private PageHolder<K, V> writePage( Page<K, V> modifiedPage, long revision ) throws IOException
543     {
544         if ( ( writeTransaction != null ) && writeTransaction.isStarted() )
545         {
546             Map<Page<?, ?>, BTree<?, ?>> pendingPages = recordManager.getPendingPages();
547             pendingPages.put( modifiedPage, this );
548 
549             PageHolder<K, V> pageHolder = new PageHolder<K, V>( this, modifiedPage );
550 
551             return pageHolder;
552         }
553         else
554         {
555             PageHolder<K, V> pageHolder = recordManager.writePage( this, modifiedPage, revision );
556 
557             return pageHolder;
558         }
559     }
560 
561     /**
562      * Get the rootPzge associated to a give revision.
563      *
564      * @param revision The revision we are looking for
565      * @return The rootPage associated to this revision
566      * @throws IOException If we had an issue while accessing the underlying file
567      * @throws KeyNotFoundException If the revision does not exist for this Btree
568      */
569     public Page<K, V> getRootPage( long revision ) throws IOException, KeyNotFoundException
570     {
571         return recordManager.getRootPage( this, revision );
572     }
573 
574 
575     /**
576      * Starts a transaction
577      */
578     public void beginTransaction()
579     {
580         createTransaction.lock();
581 
582         if ( writeTransaction == null )
583         {
584             writeTransaction = new WriteTransaction( recordManager );
585         }
586 
587         createTransaction.unlock();
588 
589         writeTransaction.start();
590     }
591 
592 
593     /**
594      * Commits a transaction
595      */
596     public void commit()
597     {
598         createTransaction.lock();
599 
600         if ( writeTransaction == null )
601         {
602             writeTransaction = new WriteTransaction( recordManager );
603         }
604 
605         createTransaction.unlock();
606 
607         writeTransaction.commit();
608     }
609 
610 
611     /**
612      * Rollback a transaction
613      */
614     public void rollback()
615     {
616         createTransaction.lock();
617 
618         if ( writeTransaction == null )
619         {
620             writeTransaction = new WriteTransaction( recordManager );
621         }
622 
623         createTransaction.unlock();
624 
625         writeTransaction.rollback();
626     }
627 
628 
629 
630 
631     /**
632      * @see Object#toString()
633      */
634     public String toString()
635     {
636         StringBuilder sb = new StringBuilder();
637 
638         sb.append( "Managed BTree" );
639         sb.append( "[" ).append( btreeHeader.getName() ).append( "]" );
640         sb.append( "( pageSize:" ).append( btreeHeader.getPageSize() );
641 
642         if ( rootPage != null )
643         {
644             sb.append( ", nbEntries:" ).append( btreeHeader.getNbElems() );
645         }
646         else
647         {
648             sb.append( ", nbEntries:" ).append( 0 );
649         }
650 
651         sb.append( ", comparator:" );
652 
653         if ( keySerializer.getComparator() == null )
654         {
655             sb.append( "null" );
656         }
657         else
658         {
659             sb.append( keySerializer.getComparator().getClass().getSimpleName() );
660         }
661 
662         sb.append( ", DuplicatesAllowed: " ).append( btreeHeader.isAllowDuplicates() );
663 
664         sb.append( ") : \n" );
665         sb.append( rootPage.dumpPage( "" ) );
666 
667         return sb.toString();
668     }
669 }