View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree;
21  
22  
23  import static org.apache.directory.mavibot.btree.BTreeTypeEnum.PERSISTED_SUB;
24  
25  import java.io.IOException;
26  import java.lang.reflect.Array;
27  
28  import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
29  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
30  
31  
32  /**
33   * A MVCC Leaf. It stores the keys and values. It does not have any children.
34   *
35   * @param <K> The type for the Key
36   * @param <V> The type for the stored value
37   *
38   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
39   */
40  /* No qualifier */class PersistedLeaf<K, V> extends AbstractPage<K, V>
41  {
42      /** Values associated with keys */
43      protected ValueHolder<V>[] values;
44  
45  
46      /**
47       * Constructor used to create a new Leaf when we read it from a file.
48       *
49       * @param btree The BTree this page belongs to.
50       */
51      PersistedLeaf( BTree<K, V> btree )
52      {
53          super( btree );
54      }
55  
56  
57      /**
58       * Internal constructor used to create Page instance used when a page is being copied or overflow
59       *
60       * @param btree The BTree this page belongs to.
61       * @param revision The page revision
62       * @param nbElems The number of elements this page will contain
63       */
64      @SuppressWarnings("unchecked")
65      PersistedLeaf( BTree<K, V> btree, long revision, int nbElems )
66      {
67          super( btree, revision, nbElems );
68          if ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB )
69          {
70              values = ( ValueHolder<V>[] ) Array.newInstance( PersistedValueHolder.class, nbElems );
71          }
72      }
73  
74  
75      /**
76       * {@inheritDoc}
77       */
78      public InsertResult<K, V> insert( K key, V value, long revision ) throws IOException
79      {
80          // Find the key into this leaf
81          int pos = findPos( key );
82  
83          boolean isSubTree = ( btree.getType() == PERSISTED_SUB );
84  
85          if ( pos < 0 )
86          {
87              // We already have the key in the page : replace the value
88              // into a copy of this page, unless the page has already be copied
89              int index = -( pos + 1 );
90  
91              if ( isSubTree )
92              {
93                  return ExistsResult.EXISTS;
94              }
95  
96              // Replace the existing value in a copy of the current page
97              InsertResult<K, V> result = replaceElement( revision, key, value, index );
98  
99              return result;
100         }
101 
102         // The key is not present in the leaf. We have to add it in the page
103         if ( nbElems < btree.getPageSize() )
104         {
105             // The current page is not full, it can contain the added element.
106             // We insert it into a copied page and return the result
107             Page<K, V> modifiedPage = null;
108 
109             if ( isSubTree )
110             {
111                 modifiedPage = addSubTreeElement( revision, key, pos );
112             }
113             else
114             {
115                 modifiedPage = addElement( revision, key, value, pos );
116             }
117 
118             InsertResult<K, V> result = new ModifyResult<K, V>( modifiedPage, null );
119             result.addCopiedPage( this );
120 
121             return result;
122         }
123         else
124         {
125             // The Page is already full : we split it and return the overflow element,
126             // after having created two pages.
127             InsertResult<K, V> result = null;
128 
129             if ( isSubTree )
130             {
131                 result = addAndSplitSubTree( revision, key, pos );
132             }
133             else
134             {
135                 result = addAndSplit( revision, key, value, pos );
136             }
137 
138             result.addCopiedPage( this );
139 
140             return result;
141         }
142     }
143 
144 
145     /**
146      * {@inheritDoc}
147      */
148     @SuppressWarnings("unchecked")
149     /* no qualifier */DeleteResult<K, V> delete( K key, V value, long revision, Page<K, V> parent, int parentPos )
150         throws IOException
151     {
152         // Check that the leaf is not empty
153         if ( nbElems == 0 )
154         {
155             // Empty leaf
156             return NotPresentResult.NOT_PRESENT;
157         }
158 
159         // Find the key in the page
160         int pos = findPos( key );
161 
162         if ( pos >= 0 )
163         {
164             // Not found : return the not present result.
165             return NotPresentResult.NOT_PRESENT;
166         }
167 
168         // Get the removed element
169         Tuple<K, V> removedElement = null;
170 
171         // flag to detect if a key was completely removed
172         boolean keyRemoved = false;
173 
174         int index = -( pos + 1 );
175 
176         boolean isNotSubTree = ( btree.getType() != PERSISTED_SUB );
177 
178         ValueHolder<V> valueHolder = null;
179 
180         if ( isNotSubTree )
181         {
182             valueHolder = values[index];
183         }
184         else
185         // set value to null, just incase if a non-null value passed while deleting a key from from sub-btree
186         {
187             value = null;
188         }
189 
190         if ( value == null )
191         {
192             // we have to delete the whole value
193             removedElement = new Tuple<K, V>( keys[index].getKey(), value ); // the entire value was removed
194             keyRemoved = true;
195         }
196         else
197         {
198             if ( valueHolder.contains( value ) )
199             {
200                 keyRemoved = ( valueHolder.size() == 1 );
201 
202                 removedElement = new Tuple<K, V>( keys[index].getKey(), value ); // only one value was removed
203             }
204             else
205             {
206                 return NotPresentResult.NOT_PRESENT;
207             }
208         }
209 
210         PersistedLeaf<K, V> newLeaf = null;
211 
212         if ( keyRemoved )
213         {
214             // No value, we can remove the key
215             newLeaf = new PersistedLeaf<K, V>( btree, revision, nbElems - 1 );
216         }
217         else
218         {
219             // Copy the page as we will delete a value from a ValueHolder
220             newLeaf = new PersistedLeaf<K, V>( btree, revision, nbElems );
221         }
222 
223         // Create the result
224         DeleteResult<K, V> defaultResult = new RemoveResult<K, V>( newLeaf, removedElement );
225 
226         // If the parent is null, then this page is the root page.
227         if ( parent == null )
228         {
229             // Just remove the entry if it's present, or replace it if we have more than one value in the ValueHolder
230             copyAfterRemovingElement( keyRemoved, value, newLeaf, index );
231 
232             // The current page is added in the copied page list
233             defaultResult.addCopiedPage( this );
234 
235             return defaultResult;
236         }
237         else if ( keyRemoved )
238         {
239             // The current page is not the root. Check if the leaf has more than N/2
240             // elements
241             int halfSize = btree.getPageSize() / 2;
242 
243             if ( nbElems == halfSize )
244             {
245                 // We have to find a sibling now, and either borrow an entry from it
246                 // if it has more than N/2 elements, or to merge the two pages.
247                 // Check in both next and previous page, if they have the same parent
248                 // and select the biggest page with the same parent to borrow an element.
249                 int siblingPos = selectSibling( parent, parentPos );
250                 PersistedLeaf<K, V> sibling = ( PersistedLeaf<K, V> ) ( ( ( PersistedNode<K, V> ) parent )
251                     .getPage( siblingPos ) );
252 
253                 if ( sibling.getNbElems() == halfSize )
254                 {
255                     // We will merge the current page with its sibling
256                     DeleteResult<K, V> result = mergeWithSibling( removedElement, revision, sibling,
257                         ( siblingPos < parentPos ), index );
258 
259                     return result;
260                 }
261                 else
262                 {
263                     // We can borrow the element from the left sibling
264                     if ( siblingPos < parentPos )
265                     {
266                         DeleteResult<K, V> result = borrowFromLeft( removedElement, revision, sibling, index );
267 
268                         return result;
269                     }
270                     else
271                     {
272                         // Borrow from the right sibling
273                         DeleteResult<K, V> result = borrowFromRight( removedElement, revision, sibling, index );
274 
275                         return result;
276                     }
277                 }
278             }
279             else
280             {
281                 // The page has more than N/2 elements.
282                 // We simply remove the element from the page, and if it was the leftmost,
283                 // we return the new pivot (it will replace any instance of the removed
284                 // key in its parents)
285                 copyAfterRemovingElement( true, value, newLeaf, index );
286 
287                 // The current page is added in the copied page list
288                 defaultResult.addCopiedPage( this );
289 
290                 return defaultResult;
291             }
292         }
293         else
294         {
295             // Last, not least : we can copy the full page
296             // Copy the keys and the values
297             System.arraycopy( keys, 0, newLeaf.keys, 0, nbElems );
298             System.arraycopy( values, 0, newLeaf.values, 0, nbElems );
299 
300             // Replace the ValueHolder now
301             try
302             {
303                 ValueHolder<V> newValueHolder = valueHolder.clone();
304                 newValueHolder.remove( value );
305 
306                 newLeaf.values[pos] = newValueHolder;
307             }
308             catch ( CloneNotSupportedException e )
309             {
310                 throw new RuntimeException( e );
311             }
312 
313             // The current page is added in the copied page list
314             defaultResult.addCopiedPage( this );
315 
316             return defaultResult;
317         }
318     }
319 
320 
321     /**
322      * Merges the sibling with the current leaf, after having removed the element in the page.
323      *
324      * @param revision The new revision
325      * @param sibling The sibling we will merge with
326      * @param isLeft Tells if the sibling is on the left or on the right
327      * @param pos The position of the removed element
328      * @return The new created leaf containing the sibling and the old page.
329      * @throws IOException If we have an error while trying to access the page
330      */
331     private DeleteResult<K, V> mergeWithSibling( Tuple<K, V> removedElement, long revision,
332         PersistedLeaf<K, V> sibling,
333         boolean isLeft, int pos )
334         throws EndOfFileExceededException, IOException
335     {
336         boolean isNotSubTree = ( btree.getType() != PERSISTED_SUB );
337 
338         // Create the new page. It will contain N - 1 elements (the maximum number)
339         // as we merge two pages that contain N/2 elements minus the one we remove
340         PersistedLeaf<K, V> newLeaf = new PersistedLeaf<K, V>( btree, revision, btree.getPageSize() - 1 );
341 
342         if ( isLeft )
343         {
344             // The sibling is on the left
345             // Copy all the elements from the sibling first
346             System.arraycopy( sibling.keys, 0, newLeaf.keys, 0, sibling.nbElems );
347             if ( isNotSubTree )
348             {
349                 System.arraycopy( sibling.values, 0, newLeaf.values, 0, sibling.nbElems );
350             }
351 
352             // Copy all the elements from the page up to the deletion position
353             System.arraycopy( keys, 0, newLeaf.keys, sibling.nbElems, pos );
354             if ( isNotSubTree )
355             {
356                 System.arraycopy( values, 0, newLeaf.values, sibling.nbElems, pos );
357             }
358 
359             // And copy the remaining elements after the deletion point
360             System.arraycopy( keys, pos + 1, newLeaf.keys, sibling.nbElems + pos, nbElems - pos - 1 );
361             if ( isNotSubTree )
362             {
363                 System.arraycopy( values, pos + 1, newLeaf.values, sibling.nbElems + pos, nbElems - pos - 1 );
364             }
365         }
366         else
367         {
368             // The sibling is on the right
369             // Copy all the elements from the page up to the deletion position
370             System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
371             if ( isNotSubTree )
372             {
373                 System.arraycopy( values, 0, newLeaf.values, 0, pos );
374             }
375 
376             // Then copy the remaining elements after the deletion point
377             System.arraycopy( keys, pos + 1, newLeaf.keys, pos, nbElems - pos - 1 );
378             if ( isNotSubTree )
379             {
380                 System.arraycopy( values, pos + 1, newLeaf.values, pos, nbElems - pos - 1 );
381             }
382 
383             // And copy all the elements from the sibling
384             System.arraycopy( sibling.keys, 0, newLeaf.keys, nbElems - 1, sibling.nbElems );
385             if ( isNotSubTree )
386             {
387                 System.arraycopy( sibling.values, 0, newLeaf.values, nbElems - 1, sibling.nbElems );
388             }
389         }
390 
391         // And create the result
392         DeleteResult<K, V> result = new MergedWithSiblingResult<K, V>( newLeaf, removedElement );
393 
394         result.addCopiedPage( this );
395         result.addCopiedPage( sibling );
396 
397         return result;
398     }
399 
400 
401     /**
402      * Borrows an element from the left sibling, creating a new sibling with one
403      * less element and creating a new page where the element to remove has been
404      * deleted and the borrowed element added on the left.
405      *
406      * @param revision The new revision for all the pages
407      * @param sibling The left sibling
408      * @param pos The position of the element to remove
409      * @return The resulting pages
410      * @throws IOException If we have an error while trying to access the page
411      */
412     private DeleteResult<K, V> borrowFromLeft( Tuple<K, V> removedElement, long revision, PersistedLeaf<K, V> sibling,
413         int pos )
414         throws IOException
415     {
416         boolean isNotSubTree = ( btree.getType() != PERSISTED_SUB );
417 
418         // The sibling is on the left, borrow the rightmost element
419         K siblingKey = sibling.keys[sibling.getNbElems() - 1].getKey();
420         ValueHolder<V> siblingValue = null;
421         if ( isNotSubTree )
422         {
423             siblingValue = sibling.values[sibling.getNbElems() - 1];
424         }
425 
426         // Create the new sibling, with one less element at the end
427         PersistedLeaf<K, V> newSibling = ( PersistedLeaf<K, V> ) sibling.copy( revision, sibling.getNbElems() - 1 );
428 
429         // Create the new page and add the new element at the beginning
430         // First copy the current page, with the same size
431         PersistedLeaf<K, V> newLeaf = new PersistedLeaf<K, V>( btree, revision, nbElems );
432 
433         // Insert the borrowed element
434         newLeaf.keys[0] = new PersistedKeyHolder<K>( btree.getKeySerializer(), siblingKey );
435         if ( isNotSubTree )
436         {
437             newLeaf.values[0] = siblingValue;
438         }
439 
440         // Copy the keys and the values up to the insertion position,
441         System.arraycopy( keys, 0, newLeaf.keys, 1, pos );
442         if ( isNotSubTree )
443         {
444             System.arraycopy( values, 0, newLeaf.values, 1, pos );
445         }
446 
447         // And copy the remaining elements
448         System.arraycopy( keys, pos + 1, newLeaf.keys, pos + 1, keys.length - pos - 1 );
449         if ( isNotSubTree )
450         {
451             System.arraycopy( values, pos + 1, newLeaf.values, pos + 1, values.length - pos - 1 );
452         }
453 
454         DeleteResult<K, V> result = new BorrowedFromLeftResult<K, V>( newLeaf, newSibling, removedElement );
455 
456         // Add the copied pages to the list
457         result.addCopiedPage( this );
458         result.addCopiedPage( sibling );
459 
460         return result;
461     }
462 
463 
464     /**
465      * Borrows an element from the right sibling, creating a new sibling with one
466      * less element and creating a new page where the element to remove has been
467      * deleted and the borrowed element added on the right.
468      *
469      * @param revision The new revision for all the pages
470      * @param sibling The right sibling
471      * @param pos The position of the element to remove
472      * @return The resulting pages
473      * @throws IOException If we have an error while trying to access the page
474      */
475     private DeleteResult<K, V> borrowFromRight( Tuple<K, V> removedElement, long revision, PersistedLeaf<K, V> sibling,
476         int pos )
477         throws IOException
478     {
479         boolean isNotSubTree = ( btree.getType() != PERSISTED_SUB );
480 
481         // The sibling is on the left, borrow the rightmost element
482         K siblingKey = sibling.keys[0].getKey();
483         ValueHolder<V> siblingHolder = null;
484         if ( isNotSubTree )
485         {
486             siblingHolder = sibling.values[0];
487         }
488 
489         // Create the new sibling
490         PersistedLeaf<K, V> newSibling = new PersistedLeaf<K, V>( btree, revision, sibling.getNbElems() - 1 );
491 
492         // Copy the keys and the values from 1 to N in the new sibling
493         System.arraycopy( sibling.keys, 1, newSibling.keys, 0, sibling.nbElems - 1 );
494         if ( isNotSubTree )
495         {
496             System.arraycopy( sibling.values, 1, newSibling.values, 0, sibling.nbElems - 1 );
497         }
498 
499         // Create the new page and add the new element at the end
500         // First copy the current page, with the same size
501         PersistedLeaf<K, V> newLeaf = new PersistedLeaf<K, V>( btree, revision, nbElems );
502 
503         // Insert the borrowed element at the end
504         newLeaf.keys[nbElems - 1] = new PersistedKeyHolder<K>( btree.getKeySerializer(), siblingKey );
505         if ( isNotSubTree )
506         {
507             newLeaf.values[nbElems - 1] = siblingHolder;
508         }
509 
510         // Copy the keys and the values up to the deletion position,
511         System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
512         if ( isNotSubTree )
513         {
514             System.arraycopy( values, 0, newLeaf.values, 0, pos );
515         }
516 
517         // And copy the remaining elements
518         System.arraycopy( keys, pos + 1, newLeaf.keys, pos, keys.length - pos - 1 );
519         if ( isNotSubTree )
520         {
521             System.arraycopy( values, pos + 1, newLeaf.values, pos, values.length - pos - 1 );
522         }
523 
524         DeleteResult<K, V> result = new BorrowedFromRightResult<K, V>( newLeaf, newSibling, removedElement );
525 
526         // Add the copied pages to the list
527         result.addCopiedPage( this );
528         result.addCopiedPage( sibling );
529 
530         return result;
531     }
532 
533 
534     /**
535      * Copies the elements of the current page to a new page
536      *
537      * @param keyRemoved a flag stating if the key was removed
538      * @param newLeaf The new page into which the remaining keys and values will be copied
539      * @param pos The position into the page of the element to remove
540      * @throws IOException If we have an error while trying to access the page
541      */
542     private void copyAfterRemovingElement( boolean keyRemoved, V removedValue, PersistedLeaf<K, V> newLeaf, int pos )
543         throws IOException
544     {
545         boolean isNotSubTree = ( btree.getType() != PERSISTED_SUB );
546 
547         if ( keyRemoved )
548         {
549             // Deal with the special case of a page with only one element by skipping
550             // the copy, as we won't have any remaining  element in the page
551             if ( nbElems == 1 )
552             {
553                 return;
554             }
555 
556             // Copy the keys and the values up to the insertion position
557             System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
558             if ( isNotSubTree )
559             {
560                 System.arraycopy( values, 0, newLeaf.values, 0, pos );
561             }
562 
563             // And copy the elements after the position
564             System.arraycopy( keys, pos + 1, newLeaf.keys, pos, keys.length - pos - 1 );
565             if ( isNotSubTree )
566             {
567                 System.arraycopy( values, pos + 1, newLeaf.values, pos, values.length - pos - 1 );
568             }
569         }
570         else
571         // one of the many values of the same key was removed, no change in the number of keys
572         {
573             System.arraycopy( keys, 0, newLeaf.keys, 0, nbElems );
574             System.arraycopy( values, 0, newLeaf.values, 0, nbElems );
575 
576             // We still have to clone the modified value holder
577             ValueHolder<V> valueHolder = newLeaf.values[pos];
578 
579             try
580             {
581                 ValueHolder<V> newValueHolder = valueHolder.clone();
582 
583                 newValueHolder.remove( removedValue );
584 
585                 newLeaf.values[pos] = newValueHolder;
586             }
587             catch ( CloneNotSupportedException e )
588             {
589                 // TODO Auto-generated catch block
590                 e.printStackTrace();
591             }
592         }
593     }
594 
595 
596     /**
597      * {@inheritDoc}
598      */
599     public V get( K key ) throws KeyNotFoundException, IOException
600     {
601         int pos = findPos( key );
602 
603         if ( pos < 0 )
604         {
605             ValueHolder<V> valueHolder = values[-( pos + 1 )];
606 
607             ValueCursor<V> cursor = valueHolder.getCursor();
608 
609             cursor.beforeFirst();
610 
611             if ( cursor.hasNext() )
612             {
613                 V value = cursor.next();
614 
615                 return value;
616             }
617             else
618             {
619                 return null;
620             }
621         }
622         else
623         {
624             throw KeyNotFoundException.INSTANCE;
625         }
626     }
627 
628 
629     /**
630      * {@inheritDoc}
631      */
632     /* No qualifier */KeyHolder<K> getKeyHolder( int pos )
633     {
634         if ( pos < nbElems )
635         {
636             return keys[pos];
637         }
638         else
639         {
640             return null;
641         }
642     }
643 
644 
645     /**
646      * {@inheritDoc}
647      */
648     @Override
649     public ValueCursor<V> getValues( K key ) throws KeyNotFoundException, IOException, IllegalArgumentException
650     {
651         if ( !btree.isAllowDuplicates() )
652         {
653             throw new IllegalArgumentException( "Duplicates are not allowed in this tree" );
654         }
655 
656         int pos = findPos( key );
657 
658         if ( pos < 0 )
659         {
660             ValueHolder<V> valueHolder = values[-( pos + 1 )];
661 
662             return valueHolder.getCursor();
663         }
664         else
665         {
666             throw KeyNotFoundException.INSTANCE;
667         }
668     }
669 
670 
671     /**
672      * {@inheritDoc}
673      */
674     public boolean hasKey( K key )
675     {
676         int pos = findPos( key );
677 
678         if ( pos < 0 )
679         {
680             return true;
681         }
682 
683         return false;
684     }
685 
686 
687     @Override
688     public boolean contains( K key, V value ) throws IOException
689     {
690         int pos = findPos( key );
691 
692         if ( pos < 0 )
693         {
694             ValueHolder<V> valueHolder = values[-( pos + 1 )];
695 
696             return valueHolder.contains( value );
697         }
698         else
699         {
700             return false;
701         }
702     }
703 
704 
705     /**
706      * {@inheritDoc}
707      */
708     /* no qualifier */ValueHolder<V> getValue( int pos )
709     {
710         if ( pos < nbElems )
711         {
712             return values[pos];
713         }
714         else
715         {
716             return null;
717         }
718     }
719 
720 
721     /**
722      * Sets the value at a give position
723      * @param pos The position in the values array
724      * @param value the value to inject
725      */
726     /* no qualifier */void setValue( int pos, ValueHolder<V> value )
727     {
728         values[pos] = value;
729     }
730 
731 
732     /**
733      * {@inheritDoc}
734      */
735     public TupleCursor<K, V> browse( K key, ReadTransaction<K, V> transaction, ParentPos<K, V>[] stack, int depth )
736     {
737         int pos = findPos( key );
738 
739         // First use case : the leaf is empty (this is a root page)
740         if ( nbElems == 0 )
741         {
742             // We have to return an empty cursor
743             return new EmptyTupleCursor<K, V>();
744         }
745 
746         // The cursor we will return
747         TupleCursor<K, V> cursor = new TupleCursor<K, V>( transaction, stack, depth );
748 
749         // Depending on the position, we will proceed differently :
750         // 1) if the key is found in the page, the cursor will be 
751         // set to this position.
752         // 2) The key has not been found, but is in the middle of the
753         // page (ie, other keys above the one we are looking for exist),
754         // the cursor will be set to the current position
755         // 3) The key has not been found, and we are at the end of
756         // the page. We have to fetch the next key in yhe B-tree
757         if ( pos < 0 )
758         {
759             // The key has been found.
760             pos = -( pos + 1 );
761 
762             // Start at the found position in the page
763             ParentPos<K, V> parentPos = new ParentPos<K, V>( this, pos );
764 
765             // Create the value cursor
766             parentPos.valueCursor = values[pos].getCursor();
767 
768             // And store this position in the stack
769             stack[depth] = parentPos;
770 
771             return cursor;
772         }
773         else
774         {
775             // The key has not been found, there are keys above this one. 
776             // Select the value just above
777             if ( pos < nbElems )
778             {
779                 // There is at least one key above the one we are looking for.
780                 // This will be the starting point.
781                 ParentPos<K, V> parentPos = new ParentPos<K, V>( this, pos );
782 
783                 // Create the value cursor
784                 parentPos.valueCursor = values[pos].getCursor();
785 
786                 stack[depth] = parentPos;
787 
788                 return cursor;
789             }
790             else
791             {
792                 // We are at the end of a leaf. We have to see if we have other
793                 // keys on the right.
794                 if ( depth == 0 )
795                 {
796                     // No children, we are at the end of the root page
797                     stack[depth] = new ParentPos<K, V>( this, pos );
798 
799                     // As we are done, set the cursor at the end
800                     try
801                     {
802                         cursor.afterLast();
803                     }
804                     catch ( IOException e )
805                     {
806                         e.printStackTrace();
807                     }
808 
809                     return cursor;
810                 }
811                 else
812                 {
813                     // We have to find the adjacent key in the B-tree
814                     boolean isLast = true;
815                     stack[depth] = new ParentPos<K, V>( this, pos );
816 
817                     // Check each upper node, starting from the direct parent
818                     int stackIndex = depth - 1;
819 
820                     for ( int i = stackIndex; i >= 0; i-- )
821                     {
822                         if ( stack[i].pos < stack[i].page.getNbElems() )
823                         {
824                             isLast = false;
825                             break;
826                         }
827 
828                         stackIndex--;
829                     }
830 
831                     if ( isLast )
832                     {
833                         // We don't have any more elements
834                         try
835                         {
836                             cursor.afterLast();
837                         }
838                         catch ( IOException e )
839                         {
840                             e.printStackTrace();
841                         }
842 
843                         return cursor;
844                     }
845 
846                     return cursor;
847                 }
848             }
849         }
850     }
851 
852 
853     /**
854      * {@inheritDoc}
855      */
856     public TupleCursor<K, V> browse( ReadTransaction<K, V> transaction, ParentPos<K, V>[] stack, int depth )
857     {
858         int pos = 0;
859         TupleCursor<K, V> cursor = null;
860 
861         if ( nbElems == 0 )
862         {
863             // The tree is empty, it's the root, we have nothing to return
864             stack[depth] = new ParentPos<K, V>( null, -1 );
865 
866             return new TupleCursor<K, V>( transaction, stack, depth );
867         }
868         else
869         {
870             // Start at the beginning of the page
871             ParentPos<K, V> parentPos = new ParentPos<K, V>( this, pos );
872 
873             // Create the value cursor
874             parentPos.valueCursor = values[0].getCursor();
875 
876             stack[depth] = parentPos;
877 
878             cursor = new TupleCursor<K, V>( transaction, stack, depth );
879         }
880 
881         return cursor;
882     }
883 
884 
885     /**
886      * Copy the current page and all of the keys, values and children, if it's not a leaf.
887      *
888      * @param revision The new revision
889      * @param nbElems The number of elements to copy
890      * @return The copied page
891      */
892     private Page<K, V> copy( long revision, int nbElems )
893     {
894         PersistedLeaf<K, V> newLeaf = new PersistedLeaf<K, V>( btree, revision, nbElems );
895 
896         // Copy the keys and the values
897         System.arraycopy( keys, 0, newLeaf.keys, 0, nbElems );
898 
899         if ( values != null )
900         {
901             // It' not enough to copy the ValueHolder, we have to clone them
902             // as ValueHolders are mutable
903             int pos = 0;
904 
905             for ( ValueHolder<V> valueHolder : values )
906             {
907                 try
908                 {
909                     newLeaf.values[pos++] = valueHolder.clone();
910                 }
911                 catch ( CloneNotSupportedException e )
912                 {
913                     // TODO Auto-generated catch block
914                     e.printStackTrace();
915                 }
916 
917                 // Stop when we have copied nbElems values
918                 if ( pos == nbElems )
919                 {
920                     break;
921                 }
922             }
923         }
924 
925         return newLeaf;
926     }
927 
928 
929     /**
930      * Copy the current page if needed, and replace the value at the position we have found the key.
931      *
932      * @param revision The new page revision
933      * @param key The new key
934      * @param value the new value
935      * @param pos The position of the key in the page
936      * @return The copied page
937      * @throws IOException If we have an error while trying to access the page
938      */
939     private InsertResult<K, V> replaceElement( long revision, K key, V value, int pos )
940         throws IOException
941     {
942         PersistedLeaf<K, V> newLeaf = this;
943 
944         // Get the previous value from the leaf (it's a copy)
945         ValueHolder<V> valueHolder = values[pos];
946 
947         boolean valueExists = valueHolder.contains( value );
948 
949         if ( this.revision != revision )
950         {
951             // The page hasn't been modified yet, we need to copy it first
952             newLeaf = ( PersistedLeaf<K, V> ) copy( revision, nbElems );
953         }
954 
955         // Get the previous value from the leaf (it's a copy)
956         valueHolder = newLeaf.values[pos];
957         V replacedValue = null;
958 
959         if ( !valueExists && btree.isAllowDuplicates() )
960         {
961             valueHolder.add( value );
962             newLeaf.values[pos] = valueHolder;
963         }
964         else if ( valueExists && btree.isAllowDuplicates() )
965         {
966             // As strange as it sounds, we need to remove the value to reinject it.
967             // There are cases where the value retrieval just use one part of the
968             // value only (typically for LDAP Entries, where we use the DN)
969             replacedValue = valueHolder.remove( value );
970             valueHolder.add( value );
971         }
972         else if ( !btree.isAllowDuplicates() )
973         {
974             replacedValue = valueHolder.replaceValueArray( value );
975         }
976 
977         // Create the result
978         InsertResult<K, V> result = new ModifyResult<K, V>( newLeaf, replacedValue );
979         result.addCopiedPage( this );
980 
981         return result;
982     }
983 
984 
985     /**
986      * Adds a new <K, V> into a copy of the current page at a given position. We return the
987      * modified page. The new page will have one more element than the current page.
988      *
989      * @param revision The revision of the modified page
990      * @param key The key to insert
991      * @param value The value to insert
992      * @param pos The position into the page
993      * @return The modified page with the <K,V> element added
994      */
995     private Page<K, V> addElement( long revision, K key, V value, int pos )
996     {
997         // First copy the current page, but add one element in the copied page
998         PersistedLeaf<K, V> newLeaf = new PersistedLeaf<K, V>( btree, revision, nbElems + 1 );
999 
1000         // Create the value holder
1001         ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, value );
1002 
1003         // Deal with the special case of an empty page
1004         if ( nbElems == 0 )
1005         {
1006             newLeaf.keys[0] = new PersistedKeyHolder<K>( btree.getKeySerializer(), key );
1007 
1008             newLeaf.values[0] = valueHolder;
1009         }
1010         else
1011         {
1012             // Copy the keys and the values up to the insertion position
1013             System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
1014             System.arraycopy( values, 0, newLeaf.values, 0, pos );
1015 
1016             // Add the new element
1017             newLeaf.keys[pos] = new PersistedKeyHolder<K>( btree.getKeySerializer(), key );
1018             newLeaf.values[pos] = valueHolder;
1019 
1020             // And copy the remaining elements
1021             System.arraycopy( keys, pos, newLeaf.keys, pos + 1, keys.length - pos );
1022             System.arraycopy( values, pos, newLeaf.values, pos + 1, values.length - pos );
1023         }
1024 
1025         return newLeaf;
1026     }
1027 
1028 
1029     /**
1030      * Split a full page into two new pages, a left, a right and a pivot element. The new pages will
1031      * each contains half of the original elements. <br/>
1032      * The pivot will be computed, depending on the place
1033      * we will inject the newly added element. <br/>
1034      * If the newly added element is in the middle, we will use it
1035      * as a pivot. Otherwise, we will use either the last element in the left page if the element is added
1036      * on the left, or the first element in the right page if it's added on the right.
1037      *
1038      * @param revision The new revision for all the created pages
1039      * @param key The key to add
1040      * @param value The value to add
1041      * @param pos The position of the insertion of the new element
1042      * @return An OverflowPage containing the pivot, and the new left and right pages
1043      */
1044     private InsertResult<K, V> addAndSplit( long revision, K key, V value, int pos )
1045     {
1046         int middle = btree.getPageSize() >> 1;
1047         PersistedLeaf<K, V> leftLeaf = null;
1048         PersistedLeaf<K, V> rightLeaf = null;
1049         ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, value );
1050 
1051         // Determinate where to store the new value
1052         if ( pos <= middle )
1053         {
1054             // The left page will contain the new value
1055             leftLeaf = new PersistedLeaf<K, V>( btree, revision, middle + 1 );
1056 
1057             // Copy the keys and the values up to the insertion position
1058             System.arraycopy( keys, 0, leftLeaf.keys, 0, pos );
1059             System.arraycopy( values, 0, leftLeaf.values, 0, pos );
1060 
1061             // Add the new element
1062             leftLeaf.keys[pos] = new PersistedKeyHolder<K>( btree.getKeySerializer(), key );
1063             leftLeaf.values[pos] = valueHolder;
1064 
1065             // And copy the remaining elements
1066             System.arraycopy( keys, pos, leftLeaf.keys, pos + 1, middle - pos );
1067             System.arraycopy( values, pos, leftLeaf.values, pos + 1, middle - pos );
1068 
1069             // Now, create the right page
1070             rightLeaf = new PersistedLeaf<K, V>( btree, revision, middle );
1071 
1072             // Copy the keys and the values in the right page
1073             System.arraycopy( keys, middle, rightLeaf.keys, 0, middle );
1074             System.arraycopy( values, middle, rightLeaf.values, 0, middle );
1075         }
1076         else
1077         {
1078             // Create the left page
1079             leftLeaf = new PersistedLeaf<K, V>( btree, revision, middle );
1080 
1081             // Copy all the element into the left page
1082             System.arraycopy( keys, 0, leftLeaf.keys, 0, middle );
1083             System.arraycopy( values, 0, leftLeaf.values, 0, middle );
1084 
1085             // Now, create the right page
1086             rightLeaf = new PersistedLeaf<K, V>( btree, revision, middle + 1 );
1087 
1088             int rightPos = pos - middle;
1089 
1090             // Copy the keys and the values up to the insertion position
1091             System.arraycopy( keys, middle, rightLeaf.keys, 0, rightPos );
1092             System.arraycopy( values, middle, rightLeaf.values, 0, rightPos );
1093 
1094             // Add the new element
1095             rightLeaf.keys[rightPos] = new PersistedKeyHolder<K>( btree.getKeySerializer(), key );
1096             rightLeaf.values[rightPos] = valueHolder;
1097 
1098             // And copy the remaining elements
1099             System.arraycopy( keys, pos, rightLeaf.keys, rightPos + 1, nbElems - pos );
1100             System.arraycopy( values, pos, rightLeaf.values, rightPos + 1, nbElems - pos );
1101         }
1102 
1103         // Get the pivot
1104         K pivot = rightLeaf.keys[0].getKey();
1105 
1106         // Create the result
1107         InsertResult<K, V> result = new SplitResult<K, V>( pivot, leftLeaf, rightLeaf );
1108 
1109         return result;
1110     }
1111 
1112 
1113     /**
1114      * {@inheritDoc}
1115      */
1116     public K getLeftMostKey()
1117     {
1118         return keys[0].getKey();
1119     }
1120 
1121 
1122     /**
1123      * {@inheritDoc}
1124      */
1125     public K getRightMostKey()
1126     {
1127         return keys[nbElems - 1].getKey();
1128     }
1129 
1130 
1131     /**
1132      * {@inheritDoc}
1133      */
1134     public Tuple<K, V> findLeftMost() throws IOException
1135     {
1136         K key = keys[0].getKey();
1137 
1138         boolean isSubTree = ( btree.getType() == PERSISTED_SUB );
1139 
1140         if ( isSubTree )
1141         {
1142             return new Tuple<K, V>( key, null );
1143         }
1144 
1145         ValueCursor<V> cursor = values[0].getCursor();
1146 
1147         try
1148         {
1149             cursor.beforeFirst();
1150             if ( cursor.hasNext() )
1151             {
1152                 return new Tuple<K, V>( key, cursor.next() );
1153             }
1154             else
1155             {
1156                 // Null value
1157                 return new Tuple<K, V>( key, null );
1158             }
1159         }
1160         finally
1161         {
1162             cursor.close();
1163         }
1164     }
1165 
1166 
1167     /**
1168      * {@inheritDoc}
1169      */
1170     public Tuple<K, V> findRightMost() throws EndOfFileExceededException, IOException
1171     {
1172 
1173         K key = keys[nbElems - 1].getKey();
1174 
1175         boolean isSubTree = ( btree.getType() == PERSISTED_SUB );
1176 
1177         if ( isSubTree )
1178         {
1179             return new Tuple<K, V>( key, null );
1180         }
1181 
1182         ValueCursor<V> cursor = values[nbElems - 1].getCursor();
1183 
1184         try
1185         {
1186             cursor.afterLast();
1187 
1188             if ( cursor.hasPrev() )
1189             {
1190                 return new Tuple<K, V>( key, cursor.prev() );
1191             }
1192             else
1193             {
1194                 // Null value
1195                 return new Tuple<K, V>( key, null );
1196             }
1197         }
1198         finally
1199         {
1200             cursor.close();
1201         }
1202     }
1203 
1204 
1205     /**
1206      * {@inheritDoc}
1207      */
1208     public boolean isLeaf()
1209     {
1210         return true;
1211     }
1212 
1213 
1214     /**
1215      * {@inheritDoc}
1216      */
1217     public boolean isNode()
1218     {
1219         return false;
1220     }
1221 
1222 
1223     /**
1224      * @see Object#toString()
1225      */
1226     public String toString()
1227     {
1228         StringBuilder sb = new StringBuilder();
1229 
1230         sb.append( "Leaf[" );
1231         sb.append( super.toString() );
1232 
1233         sb.append( "] -> {" );
1234 
1235         if ( nbElems > 0 )
1236         {
1237             boolean isFirst = true;
1238 
1239             for ( int i = 0; i < nbElems; i++ )
1240             {
1241                 if ( isFirst )
1242                 {
1243                     isFirst = false;
1244                 }
1245                 else
1246                 {
1247                     sb.append( ", " );
1248                 }
1249 
1250                 sb.append( "<" ).append( keys[i] ).append( "," );
1251 
1252                 if ( values != null )
1253                 {
1254                     sb.append( values[i] );
1255                 }
1256                 else
1257                 {
1258                     sb.append( "null" );
1259                 }
1260 
1261                 sb.append( ">" );
1262             }
1263         }
1264 
1265         sb.append( "}" );
1266 
1267         return sb.toString();
1268     }
1269 
1270 
1271     /**
1272      * same as {@link #addElement(long, Object, Object, int)} except the values are not copied.
1273      * This method is only used while inserting an element into a sub-BTree.
1274      */
1275     private Page<K, V> addSubTreeElement( long revision, K key, int pos )
1276     {
1277         // First copy the current page, but add one element in the copied page
1278         PersistedLeaf<K, V> newLeaf = new PersistedLeaf<K, V>( btree, revision, nbElems + 1 );
1279 
1280         // Deal with the special case of an empty page
1281         if ( nbElems == 0 )
1282         {
1283             newLeaf.keys[0] = new PersistedKeyHolder<K>( btree.getKeySerializer(), key );
1284         }
1285         else
1286         {
1287             // Copy the keys and the values up to the insertion position
1288             System.arraycopy( keys, 0, newLeaf.keys, 0, pos );
1289 
1290             // Add the new element
1291             newLeaf.keys[pos] = new PersistedKeyHolder<K>( btree.getKeySerializer(), key );
1292 
1293             // And copy the remaining elements
1294             System.arraycopy( keys, pos, newLeaf.keys, pos + 1, keys.length - pos );
1295         }
1296 
1297         return newLeaf;
1298     }
1299 
1300 
1301     /**
1302      * same as {@link #addAndSplit(long, Object, Object, int)} except the values are not copied.
1303      * This method is only used while inserting an element into a sub-BTree.
1304      */
1305     private InsertResult<K, V> addAndSplitSubTree( long revision, K key, int pos )
1306     {
1307         int middle = btree.getPageSize() >> 1;
1308         PersistedLeaf<K, V> leftLeaf = null;
1309         PersistedLeaf<K, V> rightLeaf = null;
1310 
1311         // Determinate where to store the new value
1312         if ( pos <= middle )
1313         {
1314             // The left page will contain the new value
1315             leftLeaf = new PersistedLeaf<K, V>( btree, revision, middle + 1 );
1316 
1317             // Copy the keys and the values up to the insertion position
1318             System.arraycopy( keys, 0, leftLeaf.keys, 0, pos );
1319 
1320             // Add the new element
1321             leftLeaf.keys[pos] = new PersistedKeyHolder<K>( btree.getKeySerializer(), key );
1322 
1323             // And copy the remaining elements
1324             System.arraycopy( keys, pos, leftLeaf.keys, pos + 1, middle - pos );
1325 
1326             // Now, create the right page
1327             rightLeaf = new PersistedLeaf<K, V>( btree, revision, middle );
1328 
1329             // Copy the keys and the values in the right page
1330             System.arraycopy( keys, middle, rightLeaf.keys, 0, middle );
1331         }
1332         else
1333         {
1334             // Create the left page
1335             leftLeaf = new PersistedLeaf<K, V>( btree, revision, middle );
1336 
1337             // Copy all the element into the left page
1338             System.arraycopy( keys, 0, leftLeaf.keys, 0, middle );
1339 
1340             // Now, create the right page
1341             rightLeaf = new PersistedLeaf<K, V>( btree, revision, middle + 1 );
1342 
1343             int rightPos = pos - middle;
1344 
1345             // Copy the keys and the values up to the insertion position
1346             System.arraycopy( keys, middle, rightLeaf.keys, 0, rightPos );
1347 
1348             // Add the new element
1349             rightLeaf.keys[rightPos] = new PersistedKeyHolder<K>( btree.getKeySerializer(), key );
1350 
1351             // And copy the remaining elements
1352             System.arraycopy( keys, pos, rightLeaf.keys, rightPos + 1, nbElems - pos );
1353         }
1354 
1355         // Get the pivot
1356         K pivot = rightLeaf.keys[0].getKey();
1357 
1358         // Create the result
1359         InsertResult<K, V> result = new SplitResult<K, V>( pivot, leftLeaf, rightLeaf );
1360 
1361         return result;
1362     }
1363 
1364 
1365     /**
1366      * {@inheritDoc}
1367      */
1368     public KeyCursor<K> browseKeys( ReadTransaction<K, K> transaction, ParentPos<K, K>[] stack, int depth )
1369     {
1370         int pos = 0;
1371         KeyCursor<K> cursor = null;
1372 
1373         if ( nbElems == 0 )
1374         {
1375             // The tree is empty, it's the root, we have nothing to return
1376             stack[depth] = new ParentPos<K, K>( null, -1 );
1377 
1378             return new KeyCursor<K>( transaction, stack, depth );
1379         }
1380         else
1381         {
1382             // Start at the beginning of the page
1383             ParentPos<K, K> parentPos = new ParentPos( this, pos );
1384 
1385             stack[depth] = parentPos;
1386 
1387             cursor = new KeyCursor<K>( transaction, stack, depth );
1388         }
1389 
1390         return cursor;
1391     }
1392 
1393 
1394     /**
1395      * sets the values to null
1396      * WARNING: only used by the internal API (especially during the bulk loading)
1397      */
1398     /* no qualifier */void _clearValues_()
1399     {
1400         values = null;
1401     }
1402 
1403 
1404     /**
1405      * {@inheritDoc}
1406      */
1407     public String dumpPage( String tabs )
1408     {
1409         StringBuilder sb = new StringBuilder();
1410 
1411         sb.append( tabs );
1412 
1413         if ( nbElems > 0 )
1414         {
1415             boolean isFirst = true;
1416 
1417             for ( int i = 0; i < nbElems; i++ )
1418             {
1419                 if ( isFirst )
1420                 {
1421                     isFirst = false;
1422                 }
1423                 else
1424                 {
1425                     sb.append( ", " );
1426                 }
1427 
1428                 sb.append( "<" ).append( keys[i] ).append( "," );
1429 
1430                 if ( values != null )
1431                 {
1432                     sb.append( values[i] );
1433                 }
1434                 else
1435                 {
1436                     sb.append( "null" );
1437                 }
1438 
1439                 sb.append( ">" );
1440             }
1441         }
1442 
1443         sb.append( "\n" );
1444 
1445         return sb.toString();
1446     }
1447 }