View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  
21  package org.apache.directory.mavibot.btree;
22  
23  
24  import java.io.File;
25  import java.io.FileInputStream;
26  import java.io.FileNotFoundException;
27  import java.io.FileOutputStream;
28  import java.io.IOException;
29  import java.util.ArrayList;
30  import java.util.Arrays;
31  import java.util.Comparator;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Set;
38  import java.util.TreeMap;
39  import java.util.TreeSet;
40  
41  import org.apache.directory.mavibot.btree.comparator.IntComparator;
42  import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
43  import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
44  import org.apache.directory.mavibot.btree.serializer.IntSerializer;
45  
46  
47  /**
48   * A class used to bulk load a BTree. It will allow the load of N elements in 
49   * a given BTree without to have to inject one by one, saving a lot of time.
50   * The second advantage is that the btree will be dense (the leaves will be
51   * complete, except the last one).
52   * This class can also be used to compact a BTree.
53   *
54   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
55   */
56  public class BulkLoader<K, V>
57  {
58      private BulkLoader()
59      {
60      };
61  
62      static enum LevelEnum
63      {
64          LEAF,
65          NODE
66      }
67  
68      /**
69       * A private class used to store the temporary sorted file. It's used by
70       * the bulkLoader
71       */
72      private static class SortedFile
73      {
74          /** the file that contains the values */
75          private File file;
76  
77          /** The number of stored values */
78          private int nbValues;
79  
80  
81          /** A constructor for this class */
82          /*No Qualifier*/SortedFile( File file, int nbValues )
83          {
84              this.file = file;
85              this.nbValues = nbValues;
86          }
87      }
88  
89  
90      /**
91       * Process the data, and creates files to store them sorted if necessary, or store them
92       * TODO readElements.
93       *
94       * @param btree
95       * @param iterator
96       * @param sortedFiles
97       * @param tuples
98       * @param chunkSize
99       * @return
100      * @throws IOException
101      */
102     private static <K, V> int readElements( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, List<File> sortedFiles,
103         List<Tuple<K, V>> tuples, int chunkSize ) throws IOException
104     {
105         int nbRead = 0;
106         int nbIteration = 0;
107         int nbElems = 0;
108         boolean inMemory = true;
109         Set<K> keys = new HashSet<K>();
110 
111         while ( true )
112         {
113             nbIteration++;
114             tuples.clear();
115             keys.clear();
116 
117             // Read up to chukSize elements
118             while ( iterator.hasNext() && ( nbRead < chunkSize ) )
119             {
120                 Tuple<K, V> tuple = iterator.next();
121                 tuples.add( tuple );
122 
123                 if ( !keys.contains( tuple.getKey() ) )
124                 {
125                     keys.add( tuple.getKey() );
126                     nbRead++;
127                 }
128             }
129 
130             if ( nbRead < chunkSize )
131             {
132                 if ( nbIteration != 1 )
133                 {
134                     // Flush the sorted data on disk and exit
135                     inMemory = false;
136 
137                     sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
138                 }
139 
140                 // Update the number of read elements
141                 nbElems += nbRead;
142 
143                 break;
144             }
145             else
146             {
147                 if ( !iterator.hasNext() )
148                 {
149                     // special case : we have exactly chunkSize elements in the incoming data
150                     if ( nbIteration > 1 )
151                     {
152                         // Flush the sorted data on disk and exit
153                         inMemory = false;
154                         sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
155                     }
156 
157                     // We have read all the data in one round trip, let's get out, no need
158                     // to store the data on disk
159 
160                     // Update the number of read elements
161                     nbElems += nbRead;
162 
163                     break;
164                 }
165 
166                 // We have read chunkSize elements, we have to sort them on disk
167                 nbElems += nbRead;
168                 nbRead = 0;
169                 sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
170             }
171         }
172 
173         if ( !inMemory )
174         {
175             tuples.clear();
176         }
177 
178         return nbElems;
179     }
180 
181 
182     /**
183      * Read all the sorted files, and inject them into one single big file containing all the 
184      * sorted and merged elements.
185      * @throws IOException 
186      */
187     private static <K, V> Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> processFiles( BTree<K, V> btree,
188         Iterator<Tuple<K, Set<V>>> dataIterator ) throws IOException
189     {
190         File file = File.createTempFile( "sortedUnique", "data" );
191         file.deleteOnExit();
192         FileOutputStream fos = new FileOutputStream( file );
193 
194         // Number of read elements
195         int nbReads = 0;
196 
197         // Flush the tuples on disk
198         while ( dataIterator.hasNext() )
199         {
200             nbReads++;
201 
202             // grab a tuple
203             Tuple<K, Set<V>> tuple = dataIterator.next();
204 
205             // Serialize the key
206             byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
207             fos.write( IntSerializer.serialize( bytesKey.length ) );
208             fos.write( bytesKey );
209 
210             // Serialize the number of values
211             int nbValues = tuple.getValue().size();
212             fos.write( IntSerializer.serialize( nbValues ) );
213 
214             // Serialize the values
215             for ( V value : tuple.getValue() )
216             {
217                 byte[] bytesValue = btree.getValueSerializer().serialize( value );
218 
219                 // Serialize the value
220                 fos.write( IntSerializer.serialize( bytesValue.length ) );
221                 fos.write( bytesValue );
222             }
223         }
224 
225         fos.flush();
226         fos.close();
227 
228         FileInputStream fis = new FileInputStream( file );
229         Iterator<Tuple<K, Set<V>>> uniqueIterator = createUniqueFileIterator( btree, fis );
230         SortedFile sortedFile = new SortedFile( file, nbReads );
231 
232         Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = new Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile>(
233             uniqueIterator, sortedFile );
234 
235         return result;
236     }
237 
238 
239     /**
240      * Bulk Load data into a persisted BTree
241      *
242      * @param btree The persisted BTree in which we want to load the data
243      * @param iterator The iterator over the data to bulkload
244      * @param chunkSize The number of elements we may store in memory at each iteration
245      * @throws IOException If there is a problem while processing the data
246      */
247     public static <K, V> BTree<K, V> load( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, int chunkSize )
248         throws IOException
249     {
250         if ( btree == null )
251         {
252             throw new RuntimeException( "Invalid BTree : it's null" );
253         }
254 
255         if ( iterator == null )
256         {
257             // Nothing to do...
258             return null;
259         }
260 
261         // Iterate through the elements by chunk
262         boolean inMemory = true;
263 
264         // The list of files we will use to store the sorted chunks
265         List<File> sortedFiles = new ArrayList<File>();
266 
267         // An array of chukSize tuple max
268         List<Tuple<K, V>> tuples = new ArrayList<Tuple<K, V>>( chunkSize );
269 
270         // Now, start to read all the tuples to sort them. We may use intermediate files
271         // for that purpose if we hit the threshold.
272         int nbElems = readElements( btree, iterator, sortedFiles, tuples, chunkSize );
273 
274         // If the tuple list is empty, we have to process the load based on files, not in memory
275         if ( nbElems > 0 )
276         {
277             inMemory = tuples.size() > 0;
278         }
279 
280         // Now that we have processed all the data, we can start storing them in the btree
281         Iterator<Tuple<K, Set<V>>> dataIterator = null;
282         FileInputStream[] streams = null;
283         BTree<K, V> resultBTree = null;
284 
285         if ( inMemory )
286         {
287             // Here, we have all the data in memory, no need to merge files
288             // We will build a simple iterator over the data
289             dataIterator = createTupleIterator( btree, tuples );
290             resultBTree = bulkLoad( btree, dataIterator, nbElems );
291         }
292         else
293         {
294             // We first have to build an iterator over the files
295             int nbFiles = sortedFiles.size();
296             streams = new FileInputStream[nbFiles];
297 
298             for ( int i = 0; i < nbFiles; i++ )
299             {
300                 streams[i] = new FileInputStream( sortedFiles.get( i ) );
301             }
302 
303             dataIterator = createIterator( btree, streams );
304 
305             // Process the files, and construct one single file with an iterator
306             Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = processFiles( btree, dataIterator );
307             resultBTree = bulkLoad( btree, result.key, result.value.nbValues );
308             result.value.file.delete();
309         }
310 
311         // Ok, we have an iterator over sorted elements, we can now load them in the 
312         // target btree.
313         // Now, close the FileInputStream, and delete them if we have some
314         if ( !inMemory )
315         {
316             int nbFiles = sortedFiles.size();
317 
318             for ( int i = 0; i < nbFiles; i++ )
319             {
320                 streams[i].close();
321                 sortedFiles.get( i ).delete();
322             }
323         }
324 
325         return resultBTree;
326     }
327 
328 
329     /**
330      * Creates a node leaf LevelInfo based on the number of elements in the lower level. We can store
331      * up to PageSize + 1 references to pages in a node.
332      */
333     /* no qualifier*/static <K, V> LevelInfo<K, V> computeLevel( BTree<K, V> btree, int nbElems, LevelEnum levelType )
334     {
335         int pageSize = btree.getPageSize();
336         int incrementNode = 0;
337 
338         if ( levelType == LevelEnum.NODE )
339         {
340             incrementNode = 1;
341         }
342 
343         LevelInfo<K, V> level = new LevelInfo<K, V>();
344         level.setType( ( levelType == LevelEnum.NODE ) );
345         level.setNbElems( nbElems );
346         level.setNbPages( nbElems / ( pageSize + incrementNode ) );
347         level.setLevelNumber( 0 );
348         level.setNbAddedElems( 0 );
349         level.setCurrentPos( 0 );
350 
351         // Create the first level page
352         if ( nbElems <= pageSize + incrementNode )
353         {
354             if ( nbElems % ( pageSize + incrementNode ) != 0 )
355             {
356                 level.setNbPages( 1 );
357             }
358 
359             level.setNbElemsLimit( nbElems );
360 
361             if ( level.isNode() )
362             {
363                 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, nbElems - 1 ) );
364             }
365             else
366             {
367                 level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbElems ) );
368             }
369         }
370         else
371         {
372             int remaining = nbElems % ( pageSize + incrementNode );
373 
374             if ( remaining == 0 )
375             {
376                 level.setNbElemsLimit( nbElems );
377 
378                 if ( level.isNode() )
379                 {
380                     level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
381                 }
382                 else
383                 {
384                     level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
385                 }
386             }
387             else
388             {
389                 level.incNbPages();
390 
391                 if ( remaining < ( pageSize / 2 ) + incrementNode )
392                 {
393                     level.setNbElemsLimit( nbElems - remaining - ( pageSize + incrementNode ) );
394 
395                     if ( level.getNbElemsLimit() > 0 )
396                     {
397                         if ( level.isNode() )
398                         {
399                             level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
400                         }
401                         else
402                         {
403                             level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
404                         }
405                     }
406                     else
407                     {
408                         if ( level.isNode() )
409                         {
410                             level
411                                 .setCurrentPage( BTreeFactory.createNode( btree, 0L, ( pageSize / 2 ) + remaining - 1 ) );
412                         }
413                         else
414                         {
415                             level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, ( pageSize / 2 ) + remaining ) );
416                         }
417                     }
418                 }
419                 else
420                 {
421                     level.setNbElemsLimit( nbElems - remaining );
422 
423                     if ( level.isNode() )
424                     {
425                         level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
426                     }
427                     else
428                     {
429                         level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
430                     }
431                 }
432             }
433         }
434 
435         return level;
436     }
437 
438 
439     /**
440      * Compute the number of pages necessary to store all the elements per level. The resulting list is
441      * reversed ( ie the leaves are on the left, the root page on the right.
442      */
443     /* No Qualifier */static <K, V> List<LevelInfo<K, V>> computeLevels( BTree<K, V> btree, int nbElems )
444     {
445         List<LevelInfo<K, V>> levelList = new ArrayList<LevelInfo<K, V>>();
446 
447         // Compute the leaves info
448         LevelInfo<K, V> leafLevel = computeLevel( btree, nbElems, LevelEnum.LEAF );
449 
450         levelList.add( leafLevel );
451         int nbPages = leafLevel.getNbPages();
452         int levelNumber = 1;
453 
454         while ( nbPages > 1 )
455         {
456             // Compute the Nodes info
457             LevelInfo<K, V> nodeLevel = computeLevel( btree, nbPages, LevelEnum.NODE );
458             nodeLevel.setLevelNumber( levelNumber++ );
459             levelList.add( nodeLevel );
460             nbPages = nodeLevel.getNbPages();
461         }
462 
463         return levelList;
464     }
465 
466 
467     /**
468      * Inject a tuple into a leaf
469      */
470     private static <K, V> void injectInLeaf( BTree<K, V> btree, Tuple<K, Set<V>> tuple, LevelInfo<K, V> leafLevel )
471     {
472         PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) leafLevel.getCurrentPage();
473 
474         KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
475         leaf.setKey( leafLevel.getCurrentPos(), keyHolder );
476 
477         if ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB )
478         {
479             ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
480             leaf.setValue( leafLevel.getCurrentPos(), valueHolder );
481         }
482 
483         leafLevel.incCurrentPos();
484     }
485 
486 
487     private static <K, V> int computeNbElemsLeaf( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
488     {
489         int pageSize = btree.getPageSize();
490         int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();
491 
492         if ( remaining < pageSize )
493         {
494             return remaining;
495         }
496         else if ( remaining == pageSize )
497         {
498             return pageSize;
499         }
500         else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
501         {
502             return pageSize;
503         }
504         else
505         {
506             return remaining - pageSize / 2;
507         }
508     }
509 
510 
511     /**
512      * Compute the number of nodes necessary to store all the elements.
513      */
514     /* No qualifier */int computeNbElemsNode( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
515     {
516         int pageSize = btree.getPageSize();
517         int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();
518 
519         if ( remaining < pageSize + 1 )
520         {
521             return remaining;
522         }
523         else if ( remaining == pageSize + 1 )
524         {
525             return pageSize + 1;
526         }
527         else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
528         {
529             return pageSize + 1;
530         }
531         else
532         {
533             return remaining - pageSize / 2;
534         }
535     }
536 
537 
538     /**
539      * Inject a page reference into the root page.
540      */
541     private static <K, V> void injectInRoot( BTree<K, V> btree, Page<K, V> page, PageHolder<K, V> pageHolder,
542         LevelInfo<K, V> level ) throws IOException
543     {
544         PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();
545 
546         if ( ( level.getCurrentPos() == 0 ) && ( node.getPage( 0 ) == null ) )
547         {
548             node.setPageHolder( 0, pageHolder );
549             level.incNbAddedElems();
550         }
551         else
552         {
553             // Inject the pageHolder and the page leftmost key
554             node.setPageHolder( level.getCurrentPos() + 1, pageHolder );
555             KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
556             node.setKey( level.getCurrentPos(), keyHolder );
557             level.incCurrentPos();
558             level.incNbAddedElems();
559 
560             // Check that we haven't added the last element. If so,
561             // we have to write the page on disk and update the btree
562             if ( level.getNbAddedElems() == level.getNbElems() )
563             {
564                 PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
565                     btree, node, 0L );
566                 ( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
567             }
568         }
569 
570         return;
571     }
572 
573 
574     /**
575      * Inject a page reference into a Node. This method will recurse if needed.
576      */
577     private static <K, V> void injectInNode( BTree<K, V> btree, Page<K, V> page, List<LevelInfo<K, V>> levels,
578         int levelIndex )
579         throws IOException
580     {
581         int pageSize = btree.getPageSize();
582         LevelInfo<K, V> level = levels.get( levelIndex );
583         PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();
584 
585         // We first have to write the page on disk
586         PageHolder<K, V> pageHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, page, 0L );
587 
588         // First deal with a node that has less than PageSize elements at this level.
589         // It will become the root node.
590         if ( level.getNbElems() <= pageSize + 1 )
591         {
592             injectInRoot( btree, page, pageHolder, level );
593 
594             return;
595         }
596 
597         // Now, we have some parent node. We process the 3 different use case :
598         // o Full node before the limit
599         // o Node over the limit but with at least N/2 elements
600         // o Node over the limit but with elements spread into 2 nodes
601         if ( level.getNbAddedElems() < level.getNbElemsLimit() )
602         {
603             // Ok, we haven't yet reached the incomplete pages (if any).
604             // Let's add the page reference into the node
605             // There is one special case : when we are adding the very first page 
606             // reference into a node. In this case, we don't store the key
607             if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
608             {
609                 node.setPageHolder( 0, pageHolder );
610             }
611             else
612             {
613                 // Inject the pageHolder and the page leftmost key
614                 node.setPageHolder( level.getCurrentPos(), pageHolder );
615                 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
616                 node.setKey( level.getCurrentPos() - 1, keyHolder );
617             }
618 
619             // Now, increment this level nb of added elements
620             level.incCurrentPos();
621             level.incNbAddedElems();
622 
623             // Check that we haven't added the last element. If so,
624             // we have to write the page on disk and update the parent's node
625             if ( level.getNbAddedElems() == level.getNbElems() )
626             {
627                 //PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
628                 //    btree, node, 0L );
629                 //( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
630                 injectInNode( btree, node, levels, levelIndex + 1 );
631 
632                 return;
633             }
634             else
635             {
636                 // Check that we haven't completed the current node, and that this is not the root node.
637                 if ( ( level.getCurrentPos() == pageSize + 1 ) && ( level.getLevelNumber() < levels.size() - 1 ) )
638                 {
639                     // yes. We have to write the node on disk, update its parent
640                     // and create a new current node
641                     injectInNode( btree, node, levels, levelIndex + 1 );
642 
643                     // The page is full, we have to create a new one, with a size depending on the remaining elements
644                     if ( level.getNbAddedElems() < level.getNbElemsLimit() )
645                     {
646                         // We haven't reached the limit, create a new full node
647                         level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
648                     }
649                     else if ( level.getNbElems() - level.getNbAddedElems() <= pageSize )
650                     {
651                         level.setCurrentPage( BTreeFactory.createNode( btree, 0L,
652                             level.getNbElems() - level.getNbAddedElems() - 1 ) );
653                     }
654                     else
655                     {
656                         level.setCurrentPage( BTreeFactory.createNode( btree, 0L, ( level.getNbElems() - 1 )
657                             - ( level.getNbAddedElems() + 1 ) - pageSize / 2 ) );
658                     }
659 
660                     level.setCurrentPos( 0 );
661                 }
662             }
663 
664             return;
665         }
666         else
667         {
668             // We are dealing with the last page or the last two pages 
669             // We can have either one single pages which can contain up to pageSize-1 elements
670             // or with two pages, the first one containing ( nbElems - limit ) - pageSize/2 elements
671             // and the second one will contain pageSize/2 elements. 
672             if ( level.getNbElems() - level.getNbElemsLimit() > pageSize )
673             {
674                 // As the remaining elements are above a page size, they will be spread across
675                 // two pages. We have two cases here, depending on the page we are filling
676                 if ( level.getNbElems() - level.getNbAddedElems() <= pageSize / 2 + 1 )
677                 {
678                     // As we have less than PageSize/2 elements to write, we are on the second page
679                     if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
680                     {
681                         node.setPageHolder( 0, pageHolder );
682                     }
683                     else
684                     {
685                         // Inject the pageHolder and the page leftmost key
686                         node.setPageHolder( level.getCurrentPos(), pageHolder );
687                         KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
688                             page.getLeftMostKey() );
689                         node.setKey( level.getCurrentPos() - 1, keyHolder );
690                     }
691 
692                     // Now, increment this level nb of added elements
693                     level.incCurrentPos();
694                     level.incNbAddedElems();
695 
696                     // Check if we are done with the page
697                     if ( level.getNbAddedElems() == level.getNbElems() )
698                     {
699                         // Yes, we have to update the parent
700                         injectInNode( btree, node, levels, levelIndex + 1 );
701                     }
702                 }
703                 else
704                 {
705                     // This is the first page 
706                     if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
707                     {
708                         // First element of the page
709                         node.setPageHolder( 0, pageHolder );
710                     }
711                     else
712                     {
713                         // Any other following elements
714                         // Inject the pageHolder and the page leftmost key
715                         node.setPageHolder( level.getCurrentPos(), pageHolder );
716                         KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
717                             page.getLeftMostKey() );
718                         node.setKey( level.getCurrentPos() - 1, keyHolder );
719                     }
720 
721                     // Now, increment this level nb of added elements
722                     level.incCurrentPos();
723                     level.incNbAddedElems();
724 
725                     // Check if we are done with the page
726                     if ( level.getCurrentPos() == node.getNbElems() + 1 )
727                     {
728                         // Yes, we have to update the parent
729                         injectInNode( btree, node, levels, levelIndex + 1 );
730 
731                         // An create a new one
732                         level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
733                         level.setCurrentPos( 0 );
734                     }
735                 }
736             }
737             else
738             {
739                 // Two cases : we don't have anything else to write, or this is a single page
740                 if ( level.getNbAddedElems() == level.getNbElems() )
741                 {
742                     // We are done with the page
743                     injectInNode( btree, node, levels, levelIndex + 1 );
744                 }
745                 else
746                 {
747                     // We have some more elements to add in  the page
748                     // This is the first page 
749                     if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
750                     {
751                         // First element of the page
752                         node.setPageHolder( 0, pageHolder );
753                     }
754                     else
755                     {
756                         // Any other following elements
757                         // Inject the pageHolder and the page leftmost key
758                         node.setPageHolder( level.getCurrentPos(), pageHolder );
759                         KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
760                             page.getLeftMostKey() );
761                         node.setKey( level.getCurrentPos() - 1, keyHolder );
762                     }
763 
764                     // Now, increment this level nb of added elements
765                     level.incCurrentPos();
766                     level.incNbAddedElems();
767 
768                     // Check if we are done with the page
769                     if ( level.getCurrentPos() == node.getNbElems() + 1 )
770                     {
771                         // Yes, we have to update the parent
772                         injectInNode( btree, node, levels, levelIndex + 1 );
773 
774                         // An create a new one
775                         level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
776                         level.setCurrentPos( 0 );
777                     }
778                 }
779 
780                 return;
781             }
782         }
783     }
784 
785 
786     private static <K, V> BTree<K, V> bulkLoadSinglePage( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator,
787         int nbElems ) throws IOException
788     {
789         // Use the root page
790         Page<K, V> rootPage = btree.getRootPage();
791 
792         // Initialize the root page
793         ( ( AbstractPage<K, V> ) rootPage ).setNbElems( nbElems );
794         KeyHolder<K>[] keys = new KeyHolder[nbElems];
795         ValueHolder<V>[] values = new ValueHolder[nbElems];
796 
797         switch ( btree.getType() )
798         {
799             case IN_MEMORY:
800                 ( ( InMemoryLeaf<K, V> ) rootPage ).values = values;
801                 break;
802 
803             default:
804                 ( ( PersistedLeaf<K, V> ) rootPage ).values = values;
805         }
806 
807         // We first have to inject data into the page
808         int pos = 0;
809 
810         while ( dataIterator.hasNext() )
811         {
812             Tuple<K, Set<V>> tuple = dataIterator.next();
813 
814             // Store the current element in the rootPage
815             KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
816             keys[pos] = keyHolder;
817 
818             switch ( btree.getType() )
819             {
820                 case IN_MEMORY:
821                     ValueHolder<V> valueHolder = new InMemoryValueHolder<V>( btree, ( V[] ) tuple.getValue()
822                         .toArray() );
823                     ( ( InMemoryLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
824                     break;
825 
826                 default:
827                     valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue()
828                         .toArray() );
829                     ( ( PersistedLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
830             }
831 
832             pos++;
833         }
834 
835         // Update the rootPage
836         ( ( AbstractPage<K, V> ) rootPage ).setKeys( keys );
837 
838         // Update the btree with the nb of added elements, and write it$
839         BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
840         btreeHeader.setNbElems( nbElems );
841 
842         return btree;
843     }
844 
845 
846     /**
847      * Construct the target BTree from the sorted data. We will use the nb of elements
848      * to determinate the structure of the BTree, as it must be balanced
849      */
850     private static <K, V> BTree<K, V> bulkLoad( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
851         throws IOException
852     {
853         int pageSize = btree.getPageSize();
854 
855         // Special case : we can store all the element sin a single page
856         if ( nbElems <= pageSize )
857         {
858             return bulkLoadSinglePage( btree, dataIterator, nbElems );
859         }
860 
861         // Ok, we will need more than one page to store the elements, which
862         // means we also will need more than one level.
863         // First, compute the needed number of levels.
864         List<LevelInfo<K, V>> levels = computeLevels( btree, nbElems );
865 
866         // Now, let's fill the levels
867         LevelInfo<K, V> leafLevel = levels.get( 0 );
868 
869         while ( dataIterator.hasNext() )
870         {
871             // let's fill page up to the point all the complete pages have been filled
872             if ( leafLevel.getNbAddedElems() < leafLevel.getNbElemsLimit() )
873             {
874                 // grab a tuple
875                 Tuple<K, Set<V>> tuple = dataIterator.next();
876 
877                 injectInLeaf( btree, tuple, leafLevel );
878                 leafLevel.incNbAddedElems();
879 
880                 // The page is completed, update the parent's node and create a new current page
881                 if ( leafLevel.getCurrentPos() == pageSize )
882                 {
883                     injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );
884 
885                     // The page is full, we have to create a new one
886                     leafLevel.setCurrentPage( BTreeFactory
887                         .createLeaf( btree, 0L, computeNbElemsLeaf( btree, leafLevel ) ) );
888                     leafLevel.setCurrentPos( 0 );
889                 }
890             }
891             else
892             {
893                 // We have to deal with uncompleted pages now (if we have any)
894                 if ( leafLevel.getNbAddedElems() == nbElems )
895                 {
896                     // First use case : we have injected all the elements in the btree : get out
897                     break;
898                 }
899 
900                 if ( nbElems - leafLevel.getNbElemsLimit() > pageSize )
901                 {
902                     // Second use case : the number of elements after the limit does not
903                     // fit in a page, that means we have to split it into
904                     // two pages
905 
906                     // First page will contain nbElems - leafLevel.nbElemsLimit - PageSize/2 elements
907                     int nbToAdd = nbElems - leafLevel.getNbElemsLimit() - pageSize / 2;
908 
909                     while ( nbToAdd > 0 )
910                     {
911                         // grab a tuple
912                         Tuple<K, Set<V>> tuple = dataIterator.next();
913 
914                         injectInLeaf( btree, tuple, leafLevel );
915                         leafLevel.incNbAddedElems();
916                         nbToAdd--;
917                     }
918 
919                     // Now inject the page into the node
920                     Page<K, V> currentPage = leafLevel.getCurrentPage();
921                     injectInNode( btree, currentPage, levels, 1 );
922 
923                     // Create a new page for the remaining elements
924                     nbToAdd = pageSize / 2;
925                     leafLevel.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbToAdd ) );
926                     leafLevel.setCurrentPos( 0 );
927 
928                     while ( nbToAdd > 0 )
929                     {
930                         // grab a tuple
931                         Tuple<K, Set<V>> tuple = dataIterator.next();
932 
933                         injectInLeaf( btree, tuple, leafLevel );
934                         leafLevel.incNbAddedElems();
935                         nbToAdd--;
936                     }
937 
938                     // And update the parent node
939                     Page<K, V> levelCurrentPage = leafLevel.getCurrentPage();
940                     injectInNode( btree, levelCurrentPage, levels, 1 );
941 
942                     // We are done
943                     break;
944                 }
945                 else
946                 {
947                     // Third use case : we can push all the elements in the last page.
948                     // Let's do it
949                     int nbToAdd = nbElems - leafLevel.getNbElemsLimit();
950 
951                     while ( nbToAdd > 0 )
952                     {
953                         // grab a tuple
954                         Tuple<K, Set<V>> tuple = dataIterator.next();
955 
956                         injectInLeaf( btree, tuple, leafLevel );
957                         leafLevel.incNbAddedElems();
958                         nbToAdd--;
959                     }
960 
961                     // Now inject the page into the node
962                     injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );
963 
964                     // and we are done
965                     break;
966                 }
967             }
968         }
969 
970         // Update the btree with the nb of added elements, and write it$
971         BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
972         btreeHeader.setNbElems( nbElems );
973 
974         return btree;
975     }
976 
977 
978     /**
979      * Flush a list of tuples to disk after having sorted them. In the process, we may have to gather the values
980      * for the tuples having the same keys.
981      * @throws IOException 
982      */
983     private static <K, V> File flushToDisk( int fileNb, List<Tuple<K, V>> tuples, BTree<K, V> btree )
984         throws IOException
985     {
986         // Sort the tuples. 
987         Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
988 
989         File file = File.createTempFile( "sorted", Integer.toString( fileNb ) );
990         file.deleteOnExit();
991         FileOutputStream fos = new FileOutputStream( file );
992 
993         // Flush the tuples on disk
994         for ( Tuple<K, Set<V>> tuple : sortedTuples )
995         {
996             // Serialize the key
997             byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
998             fos.write( IntSerializer.serialize( bytesKey.length ) );
999             fos.write( bytesKey );
1000 
1001             // Serialize the number of values
1002             int nbValues = tuple.getValue().size();
1003             fos.write( IntSerializer.serialize( nbValues ) );
1004 
1005             // Serialize the values
1006             for ( V value : tuple.getValue() )
1007             {
1008                 byte[] bytesValue = btree.getValueSerializer().serialize( value );
1009 
1010                 // Serialize the value
1011                 fos.write( IntSerializer.serialize( bytesValue.length ) );
1012                 fos.write( bytesValue );
1013             }
1014         }
1015 
1016         fos.flush();
1017         fos.close();
1018 
1019         return file;
1020     }
1021 
1022 
1023     /**
1024      * Sort a list of tuples, eliminating the duplicate keys and storing the values in a set when we 
1025      * have a duplicate key
1026      */
1027     private static <K, V> Tuple<K, Set<V>>[] sort( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1028     {
1029         Comparator<Tuple<K, Set<V>>> tupleComparator = new TupleComparator( btree.getKeyComparator(), btree
1030             .getValueComparator() );
1031 
1032         // Sort the list
1033         Tuple<K, V>[] tuplesArray = ( Tuple<K, V>[] ) tuples.toArray( new Tuple[]
1034             {} );
1035 
1036         // First, eliminate the equals keys. We use a map for that
1037         Map<K, Set<V>> mapTuples = new HashMap<K, Set<V>>();
1038 
1039         for ( Tuple<K, V> tuple : tuplesArray )
1040         {
1041             // Is the key present in the map ?
1042             Set<V> foundSet = mapTuples.get( tuple.key );
1043 
1044             if ( foundSet != null )
1045             {
1046                 // We already have had such a key, add the value to the existing key
1047                 foundSet.add( tuple.value );
1048             }
1049             else
1050             {
1051                 // No such key present in the map : create a new set to store the values,
1052                 // and add it in the map associated with the new key
1053                 Set<V> set = new TreeSet<V>();
1054                 set.add( tuple.value );
1055                 mapTuples.put( tuple.key, set );
1056             }
1057         }
1058 
1059         // Now, sort the map, by extracting all the key/values from the map
1060         int size = mapTuples.size();
1061         Tuple<K, Set<V>>[] sortedTuples = new Tuple[size];
1062         int pos = 0;
1063 
1064         // We create an array containing all the elements
1065         for ( Map.Entry<K, Set<V>> entry : mapTuples.entrySet() )
1066         {
1067             sortedTuples[pos] = new Tuple<K, Set<V>>();
1068             sortedTuples[pos].key = entry.getKey();
1069             sortedTuples[pos].value = entry.getValue();
1070             pos++;
1071         }
1072 
1073         // And we sort the array
1074         Arrays.sort( sortedTuples, tupleComparator );
1075 
1076         return sortedTuples;
1077     }
1078 
1079 
1080     /**
1081      * Build an iterator over an array of sorted tuples, in memory
1082      */
1083     private static <K, V> Iterator<Tuple<K, Set<V>>> createTupleIterator( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1084     {
1085         final Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
1086 
1087         Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1088         {
1089             private int pos = 0;
1090 
1091 
1092             @Override
1093             public Tuple<K, Set<V>> next()
1094             {
1095                 // Return the current tuple, if any
1096                 if ( pos < sortedTuples.length )
1097                 {
1098                     Tuple<K, Set<V>> tuple = sortedTuples[pos];
1099                     pos++;
1100 
1101                     return tuple;
1102                 }
1103 
1104                 return null;
1105             }
1106 
1107 
1108             @Override
1109             public boolean hasNext()
1110             {
1111                 return pos < sortedTuples.length;
1112             }
1113 
1114 
1115             @Override
1116             public void remove()
1117             {
1118             }
1119         };
1120 
1121         return tupleIterator;
1122     }
1123 
1124 
1125     private static <K, V> Tuple<K, Set<V>> fetchTuple( BTree<K, V> btree, FileInputStream fis )
1126     {
1127         try
1128         {
1129             if ( fis.available() == 0 )
1130             {
1131                 return null;
1132             }
1133 
1134             Tuple<K, Set<V>> tuple = new Tuple<K, Set<V>>();
1135             tuple.value = new TreeSet<V>();
1136 
1137             byte[] intBytes = new byte[4];
1138 
1139             // Read the key length
1140             fis.read( intBytes );
1141             int keyLength = IntSerializer.deserialize( intBytes );
1142 
1143             // Read the key
1144             byte[] keyBytes = new byte[keyLength];
1145             fis.read( keyBytes );
1146             K key = btree.getKeySerializer().fromBytes( keyBytes );
1147             tuple.key = key;
1148 
1149             // get the number of values
1150             fis.read( intBytes );
1151             int nbValues = IntSerializer.deserialize( intBytes );
1152 
1153             // Serialize the values
1154             for ( int i = 0; i < nbValues; i++ )
1155             {
1156                 // Read the value length
1157                 fis.read( intBytes );
1158                 int valueLength = IntSerializer.deserialize( intBytes );
1159 
1160                 // Read the value
1161                 byte[] valueBytes = new byte[valueLength];
1162                 fis.read( valueBytes );
1163                 V value = btree.getValueSerializer().fromBytes( valueBytes );
1164                 tuple.value.add( value );
1165             }
1166 
1167             return tuple;
1168         }
1169         catch ( IOException ioe )
1170         {
1171             return null;
1172         }
1173     }
1174 
1175 
1176     /**
1177      * Build an iterator over an array of sorted tuples, from files on the disk
1178      * @throws FileNotFoundException 
1179      */
1180     private static <K, V> Iterator<Tuple<K, Set<V>>> createIterator( final BTree<K, V> btree,
1181         final FileInputStream[] streams )
1182         throws FileNotFoundException
1183     {
1184         // The number of files we have to read from
1185         final int nbFiles = streams.length;
1186 
1187         // We will read only one element at a time from each file
1188         final Tuple<K, Set<V>>[] readTuples = new Tuple[nbFiles];
1189         final TreeMap<Tuple<K, Integer>, Set<V>> candidates =
1190             new TreeMap<Tuple<K, Integer>, Set<V>>(
1191                 new TupleComparator<K, Integer>( btree.getKeyComparator(), IntComparator.INSTANCE ) );
1192 
1193         // Read the tuple from each files
1194         for ( int i = 0; i < nbFiles; i++ )
1195         {
1196             while ( true )
1197             {
1198                 readTuples[i] = fetchTuple( btree, streams[i] );
1199 
1200                 if ( readTuples[i] != null )
1201                 {
1202                     Tuple<K, Integer> candidate = new Tuple<K, Integer>( readTuples[i].key, i, btree.getKeySerializer()
1203                         .getComparator() );
1204 
1205                     if ( !candidates.containsKey( candidate ) )
1206                     {
1207                         candidates.put( candidate, readTuples[i].getValue() );
1208                         break;
1209                     }
1210                     else
1211                     {
1212                         // We have to merge the pulled tuple with the existing one, and read one more tuple
1213                         Set<V> oldValues = candidates.get( candidate );
1214                         oldValues.addAll( readTuples[i].getValue() );
1215                     }
1216                 }
1217                 else
1218                 {
1219                     break;
1220                 }
1221             }
1222         }
1223 
1224         Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1225         {
1226             @Override
1227             public Tuple<K, Set<V>> next()
1228             {
1229                 // Get the first candidate
1230                 Tuple<K, Integer> tupleCandidate = candidates.firstKey();
1231 
1232                 // Remove it from the set
1233                 candidates.remove( tupleCandidate );
1234 
1235                 // Get the the next tuple from the stream we just got the tuple from
1236                 Tuple<K, Set<V>> tuple = readTuples[tupleCandidate.value];
1237 
1238                 // fetch the next tuple from the file we just read teh candidate from 
1239                 while ( true )
1240                 {
1241                     // fetch it from the disk and store it into its reader
1242                     readTuples[tupleCandidate.value] = fetchTuple( btree, streams[tupleCandidate.value] );
1243 
1244                     if ( readTuples[tupleCandidate.value] == null )
1245                     {
1246                         // No more tuple for this file
1247                         break;
1248                     }
1249 
1250                     if ( readTuples[tupleCandidate.value] != null )
1251                     {
1252                         // And store it into the candidate set
1253                         Set<V> oldValues = candidates.get( readTuples[tupleCandidate.value] );
1254 
1255                         if ( oldValues != null )
1256                         {
1257                             // We already have another element with the same key, merge them
1258                             oldValues.addAll( readTuples[tupleCandidate.value].value );
1259                         }
1260                         else
1261                         {
1262                             Tuple<K, Integer> newTuple = new Tuple<K, Integer>( readTuples[tupleCandidate.value].key,
1263                                 tupleCandidate.value );
1264                             candidates.put( newTuple, readTuples[tupleCandidate.value].getValue() );
1265 
1266                             // and exit the loop
1267                             break;
1268                         }
1269                     }
1270                 }
1271 
1272                 // We can now return the found value
1273                 return tuple;
1274             }
1275 
1276 
1277             @Override
1278             public boolean hasNext()
1279             {
1280                 // Check that we have at least one element to read
1281                 return !candidates.isEmpty();
1282             }
1283 
1284 
1285             @Override
1286             public void remove()
1287             {
1288             }
1289 
1290         };
1291 
1292         return tupleIterator;
1293     }
1294 
1295 
1296     /**
1297      * Build an iterator over an array of sorted tuples, from files on the disk
1298      * @throws FileNotFoundException 
1299      */
1300     private static <K, V> Iterator<Tuple<K, Set<V>>> createUniqueFileIterator( final BTree<K, V> btree,
1301         final FileInputStream stream )
1302         throws FileNotFoundException
1303     {
1304         Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1305         {
1306             boolean hasNext = true;
1307 
1308 
1309             @Override
1310             public Tuple<K, Set<V>> next()
1311             {
1312                 // Get the tuple from the stream
1313                 Tuple<K, Set<V>> tuple = fetchTuple( btree, stream );
1314 
1315                 // We can now return the found value
1316                 return tuple;
1317             }
1318 
1319 
1320             @Override
1321             public boolean hasNext()
1322             {
1323                 // Check that we have at least one element to read
1324                 try
1325                 {
1326                     return stream.available() > 0;
1327                 }
1328                 catch ( IOException e )
1329                 {
1330                     return false;
1331                 }
1332             }
1333 
1334 
1335             @Override
1336             public void remove()
1337             {
1338             }
1339 
1340         };
1341 
1342         return tupleIterator;
1343     }
1344 
1345 
1346     /**
1347      * Compact a given persisted BTree, making it dense. All the values will be stored 
1348      * in newly created pages, each one of them containing as much elements
1349      * as it's size.
1350      * </br>
1351      * The RecordManager will be stopped and restarted, do not use this method
1352      * on a running BTree.
1353      *
1354      * @param recordManager The associated recordManager
1355      * @param btree The BTree to compact
1356      */
1357     public static void compact( RecordManager recordManager, BTree<?, ?> btree )
1358     {
1359 
1360     }
1361 
1362 
1363     /**
1364      * Compact a given in-memory BTree, making it dense. All the values will be stored 
1365      * in newly created pages, each one of them containing as much elements
1366      * as it's size.
1367      * </br>
1368      *
1369      * @param btree The BTree to compact
1370      * @throws KeyNotFoundException 
1371      * @throws IOException 
1372      */
1373     public static BTree<?, ?> compact( BTree<?, ?> btree ) throws IOException, KeyNotFoundException
1374     {
1375         // First, create a new BTree which will contain all the elements
1376         InMemoryBTreeConfiguration configuration = new InMemoryBTreeConfiguration();
1377         configuration.setName( btree.getName() );
1378         configuration.setPageSize( btree.getPageSize() );
1379         configuration.setKeySerializer( btree.getKeySerializer() );
1380         configuration.setValueSerializer( btree.getValueSerializer() );
1381         configuration.setAllowDuplicates( btree.isAllowDuplicates() );
1382         configuration.setReadTimeOut( btree.getReadTimeOut() );
1383         configuration.setWriteBufferSize( btree.getWriteBufferSize() );
1384 
1385         File file = ( ( InMemoryBTree ) btree ).getFile();
1386 
1387         if ( file != null )
1388         {
1389             configuration.setFilePath( file.getPath() );
1390         }
1391 
1392         // Create a new Btree Builder
1393         InMemoryBTreeBuilder btreeBuilder = new InMemoryBTreeBuilder( configuration );
1394 
1395         // Create a cursor over the existing btree
1396         final TupleCursor cursor = btree.browse();
1397 
1398         // Create an iterator that will iterate the existing btree
1399         Iterator<Tuple> tupleItr = new Iterator<Tuple>()
1400         {
1401             @Override
1402             public Tuple next()
1403             {
1404                 try
1405                 {
1406                     return cursor.next();
1407                 }
1408                 catch ( EndOfFileExceededException e )
1409                 {
1410                     return null;
1411                 }
1412                 catch ( IOException e )
1413                 {
1414                     return null;
1415                 }
1416             }
1417 
1418 
1419             @Override
1420             public boolean hasNext()
1421             {
1422                 try
1423                 {
1424                     return cursor.hasNext();
1425                 }
1426                 catch ( EndOfFileExceededException e )
1427                 {
1428                     return false;
1429                 }
1430                 catch ( IOException e )
1431                 {
1432                     return false;
1433                 }
1434             }
1435 
1436 
1437             @Override
1438             public void remove()
1439             {
1440             }
1441         };
1442 
1443         // And finally, compact the btree
1444         return btreeBuilder.build( tupleItr );
1445     }
1446 }