001/*
002 *   Licensed to the Apache Software Foundation (ASF) under one
003 *   or more contributor license agreements.  See the NOTICE file
004 *   distributed with this work for additional information
005 *   regarding copyright ownership.  The ASF licenses this file
006 *   to you under the Apache License, Version 2.0 (the
007 *   "License"); you may not use this file except in compliance
008 *   with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *   Unless required by applicable law or agreed to in writing,
013 *   software distributed under the License is distributed on an
014 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *   KIND, either express or implied.  See the License for the
016 *   specific language governing permissions and limitations
017 *   under the License.
018 *
019 */
020
021package org.apache.directory.mavibot.btree;
022
023
024import java.io.File;
025import java.io.FileInputStream;
026import java.io.FileNotFoundException;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.util.ArrayList;
030import java.util.Arrays;
031import java.util.Comparator;
032import java.util.HashMap;
033import java.util.HashSet;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.TreeMap;
039import java.util.TreeSet;
040
041import org.apache.directory.mavibot.btree.comparator.IntComparator;
042import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
043import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
044import org.apache.directory.mavibot.btree.serializer.IntSerializer;
045
046
047/**
 * A class used to bulk load a BTree. It allows loading N elements into
 * a given BTree without having to inject them one by one, saving a lot of time.
050 * The second advantage is that the btree will be dense (the leaves will be
051 * complete, except the last one).
052 * This class can also be used to compact a BTree.
053 *
054 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
055 */
056public class BulkLoader<K, V>
057{
058    private BulkLoader()
059    {
060    };
061
062    static enum LevelEnum
063    {
064        LEAF,
065        NODE
066    }
067
068    /**
069     * A private class used to store the temporary sorted file. It's used by
070     * the bulkLoader
071     */
072    private static class SortedFile
073    {
074        /** the file that contains the values */
075        private File file;
076
077        /** The number of stored values */
078        private int nbValues;
079
080
081        /** A constructor for this class */
082        /*No Qualifier*/SortedFile( File file, int nbValues )
083        {
084            this.file = file;
085            this.nbValues = nbValues;
086        }
087    }
088
089
090    /**
091     * Process the data, and creates files to store them sorted if necessary, or store them
092     * TODO readElements.
093     *
094     * @param btree
095     * @param iterator
096     * @param sortedFiles
097     * @param tuples
098     * @param chunkSize
099     * @return
100     * @throws IOException
101     */
102    private static <K, V> int readElements( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, List<File> sortedFiles,
103        List<Tuple<K, V>> tuples, int chunkSize ) throws IOException
104    {
105        int nbRead = 0;
106        int nbIteration = 0;
107        int nbElems = 0;
108        boolean inMemory = true;
109        Set<K> keys = new HashSet<K>();
110
111        while ( true )
112        {
113            nbIteration++;
114            tuples.clear();
115            keys.clear();
116
117            // Read up to chukSize elements
118            while ( iterator.hasNext() && ( nbRead < chunkSize ) )
119            {
120                Tuple<K, V> tuple = iterator.next();
121                tuples.add( tuple );
122
123                if ( !keys.contains( tuple.getKey() ) )
124                {
125                    keys.add( tuple.getKey() );
126                    nbRead++;
127                }
128            }
129
130            if ( nbRead < chunkSize )
131            {
132                if ( nbIteration != 1 )
133                {
134                    // Flush the sorted data on disk and exit
135                    inMemory = false;
136
137                    sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
138                }
139
140                // Update the number of read elements
141                nbElems += nbRead;
142
143                break;
144            }
145            else
146            {
147                if ( !iterator.hasNext() )
148                {
149                    // special case : we have exactly chunkSize elements in the incoming data
150                    if ( nbIteration > 1 )
151                    {
152                        // Flush the sorted data on disk and exit
153                        inMemory = false;
154                        sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
155                    }
156
157                    // We have read all the data in one round trip, let's get out, no need
158                    // to store the data on disk
159
160                    // Update the number of read elements
161                    nbElems += nbRead;
162
163                    break;
164                }
165
166                // We have read chunkSize elements, we have to sort them on disk
167                nbElems += nbRead;
168                nbRead = 0;
169                sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
170            }
171        }
172
173        if ( !inMemory )
174        {
175            tuples.clear();
176        }
177
178        return nbElems;
179    }
180
181
182    /**
183     * Read all the sorted files, and inject them into one single big file containing all the 
184     * sorted and merged elements.
185     * @throws IOException 
186     */
187    private static <K, V> Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> processFiles( BTree<K, V> btree,
188        Iterator<Tuple<K, Set<V>>> dataIterator ) throws IOException
189    {
190        File file = File.createTempFile( "sortedUnique", "data" );
191        file.deleteOnExit();
192        FileOutputStream fos = new FileOutputStream( file );
193
194        // Number of read elements
195        int nbReads = 0;
196
197        // Flush the tuples on disk
198        while ( dataIterator.hasNext() )
199        {
200            nbReads++;
201
202            // grab a tuple
203            Tuple<K, Set<V>> tuple = dataIterator.next();
204
205            // Serialize the key
206            byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
207            fos.write( IntSerializer.serialize( bytesKey.length ) );
208            fos.write( bytesKey );
209
210            // Serialize the number of values
211            int nbValues = tuple.getValue().size();
212            fos.write( IntSerializer.serialize( nbValues ) );
213
214            // Serialize the values
215            for ( V value : tuple.getValue() )
216            {
217                byte[] bytesValue = btree.getValueSerializer().serialize( value );
218
219                // Serialize the value
220                fos.write( IntSerializer.serialize( bytesValue.length ) );
221                fos.write( bytesValue );
222            }
223        }
224
225        fos.flush();
226        fos.close();
227
228        FileInputStream fis = new FileInputStream( file );
229        Iterator<Tuple<K, Set<V>>> uniqueIterator = createUniqueFileIterator( btree, fis );
230        SortedFile sortedFile = new SortedFile( file, nbReads );
231
232        Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = new Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile>(
233            uniqueIterator, sortedFile );
234
235        return result;
236    }
237
238
239    /**
240     * Bulk Load data into a persisted BTree
241     *
242     * @param btree The persisted BTree in which we want to load the data
243     * @param iterator The iterator over the data to bulkload
244     * @param chunkSize The number of elements we may store in memory at each iteration
245     * @throws IOException If there is a problem while processing the data
246     */
247    public static <K, V> BTree<K, V> load( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, int chunkSize )
248        throws IOException
249    {
250        if ( btree == null )
251        {
252            throw new RuntimeException( "Invalid BTree : it's null" );
253        }
254
255        if ( iterator == null )
256        {
257            // Nothing to do...
258            return null;
259        }
260
261        // Iterate through the elements by chunk
262        boolean inMemory = true;
263
264        // The list of files we will use to store the sorted chunks
265        List<File> sortedFiles = new ArrayList<File>();
266
267        // An array of chukSize tuple max
268        List<Tuple<K, V>> tuples = new ArrayList<Tuple<K, V>>( chunkSize );
269
270        // Now, start to read all the tuples to sort them. We may use intermediate files
271        // for that purpose if we hit the threshold.
272        int nbElems = readElements( btree, iterator, sortedFiles, tuples, chunkSize );
273
274        // If the tuple list is empty, we have to process the load based on files, not in memory
275        if ( nbElems > 0 )
276        {
277            inMemory = tuples.size() > 0;
278        }
279
280        // Now that we have processed all the data, we can start storing them in the btree
281        Iterator<Tuple<K, Set<V>>> dataIterator = null;
282        FileInputStream[] streams = null;
283        BTree<K, V> resultBTree = null;
284
285        if ( inMemory )
286        {
287            // Here, we have all the data in memory, no need to merge files
288            // We will build a simple iterator over the data
289            dataIterator = createTupleIterator( btree, tuples );
290            resultBTree = bulkLoad( btree, dataIterator, nbElems );
291        }
292        else
293        {
294            // We first have to build an iterator over the files
295            int nbFiles = sortedFiles.size();
296            streams = new FileInputStream[nbFiles];
297
298            for ( int i = 0; i < nbFiles; i++ )
299            {
300                streams[i] = new FileInputStream( sortedFiles.get( i ) );
301            }
302
303            dataIterator = createIterator( btree, streams );
304
305            // Process the files, and construct one single file with an iterator
306            Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = processFiles( btree, dataIterator );
307            resultBTree = bulkLoad( btree, result.key, result.value.nbValues );
308            result.value.file.delete();
309        }
310
311        // Ok, we have an iterator over sorted elements, we can now load them in the 
312        // target btree.
313        // Now, close the FileInputStream, and delete them if we have some
314        if ( !inMemory )
315        {
316            int nbFiles = sortedFiles.size();
317
318            for ( int i = 0; i < nbFiles; i++ )
319            {
320                streams[i].close();
321                sortedFiles.get( i ).delete();
322            }
323        }
324
325        return resultBTree;
326    }
327
328
329    /**
330     * Creates a node leaf LevelInfo based on the number of elements in the lower level. We can store
331     * up to PageSize + 1 references to pages in a node.
332     */
333    /* no qualifier*/static <K, V> LevelInfo<K, V> computeLevel( BTree<K, V> btree, int nbElems, LevelEnum levelType )
334    {
335        int pageSize = btree.getPageSize();
336        int incrementNode = 0;
337
338        if ( levelType == LevelEnum.NODE )
339        {
340            incrementNode = 1;
341        }
342
343        LevelInfo<K, V> level = new LevelInfo<K, V>();
344        level.setType( ( levelType == LevelEnum.NODE ) );
345        level.setNbElems( nbElems );
346        level.setNbPages( nbElems / ( pageSize + incrementNode ) );
347        level.setLevelNumber( 0 );
348        level.setNbAddedElems( 0 );
349        level.setCurrentPos( 0 );
350
351        // Create the first level page
352        if ( nbElems <= pageSize + incrementNode )
353        {
354            if ( nbElems % ( pageSize + incrementNode ) != 0 )
355            {
356                level.setNbPages( 1 );
357            }
358
359            level.setNbElemsLimit( nbElems );
360
361            if ( level.isNode() )
362            {
363                level.setCurrentPage( BTreeFactory.createNode( btree, 0L, nbElems - 1 ) );
364            }
365            else
366            {
367                level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbElems ) );
368            }
369        }
370        else
371        {
372            int remaining = nbElems % ( pageSize + incrementNode );
373
374            if ( remaining == 0 )
375            {
376                level.setNbElemsLimit( nbElems );
377
378                if ( level.isNode() )
379                {
380                    level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
381                }
382                else
383                {
384                    level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
385                }
386            }
387            else
388            {
389                level.incNbPages();
390
391                if ( remaining < ( pageSize / 2 ) + incrementNode )
392                {
393                    level.setNbElemsLimit( nbElems - remaining - ( pageSize + incrementNode ) );
394
395                    if ( level.getNbElemsLimit() > 0 )
396                    {
397                        if ( level.isNode() )
398                        {
399                            level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
400                        }
401                        else
402                        {
403                            level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
404                        }
405                    }
406                    else
407                    {
408                        if ( level.isNode() )
409                        {
410                            level
411                                .setCurrentPage( BTreeFactory.createNode( btree, 0L, ( pageSize / 2 ) + remaining - 1 ) );
412                        }
413                        else
414                        {
415                            level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, ( pageSize / 2 ) + remaining ) );
416                        }
417                    }
418                }
419                else
420                {
421                    level.setNbElemsLimit( nbElems - remaining );
422
423                    if ( level.isNode() )
424                    {
425                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
426                    }
427                    else
428                    {
429                        level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
430                    }
431                }
432            }
433        }
434
435        return level;
436    }
437
438
439    /**
440     * Compute the number of pages necessary to store all the elements per level. The resulting list is
441     * reversed ( ie the leaves are on the left, the root page on the right.
442     */
443    /* No Qualifier */static <K, V> List<LevelInfo<K, V>> computeLevels( BTree<K, V> btree, int nbElems )
444    {
445        List<LevelInfo<K, V>> levelList = new ArrayList<LevelInfo<K, V>>();
446
447        // Compute the leaves info
448        LevelInfo<K, V> leafLevel = computeLevel( btree, nbElems, LevelEnum.LEAF );
449
450        levelList.add( leafLevel );
451        int nbPages = leafLevel.getNbPages();
452        int levelNumber = 1;
453
454        while ( nbPages > 1 )
455        {
456            // Compute the Nodes info
457            LevelInfo<K, V> nodeLevel = computeLevel( btree, nbPages, LevelEnum.NODE );
458            nodeLevel.setLevelNumber( levelNumber++ );
459            levelList.add( nodeLevel );
460            nbPages = nodeLevel.getNbPages();
461        }
462
463        return levelList;
464    }
465
466
467    /**
468     * Inject a tuple into a leaf
469     */
470    private static <K, V> void injectInLeaf( BTree<K, V> btree, Tuple<K, Set<V>> tuple, LevelInfo<K, V> leafLevel )
471    {
472        PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) leafLevel.getCurrentPage();
473
474        KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
475        leaf.setKey( leafLevel.getCurrentPos(), keyHolder );
476
477        if ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB )
478        {
479            ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
480            leaf.setValue( leafLevel.getCurrentPos(), valueHolder );
481        }
482
483        leafLevel.incCurrentPos();
484    }
485
486
487    private static <K, V> int computeNbElemsLeaf( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
488    {
489        int pageSize = btree.getPageSize();
490        int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();
491
492        if ( remaining < pageSize )
493        {
494            return remaining;
495        }
496        else if ( remaining == pageSize )
497        {
498            return pageSize;
499        }
500        else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
501        {
502            return pageSize;
503        }
504        else
505        {
506            return remaining - pageSize / 2;
507        }
508    }
509
510
511    /**
512     * Compute the number of nodes necessary to store all the elements.
513     */
514    /* No qualifier */int computeNbElemsNode( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
515    {
516        int pageSize = btree.getPageSize();
517        int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();
518
519        if ( remaining < pageSize + 1 )
520        {
521            return remaining;
522        }
523        else if ( remaining == pageSize + 1 )
524        {
525            return pageSize + 1;
526        }
527        else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
528        {
529            return pageSize + 1;
530        }
531        else
532        {
533            return remaining - pageSize / 2;
534        }
535    }
536
537
538    /**
539     * Inject a page reference into the root page.
540     */
541    private static <K, V> void injectInRoot( BTree<K, V> btree, Page<K, V> page, PageHolder<K, V> pageHolder,
542        LevelInfo<K, V> level ) throws IOException
543    {
544        PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();
545
546        if ( ( level.getCurrentPos() == 0 ) && ( node.getPage( 0 ) == null ) )
547        {
548            node.setPageHolder( 0, pageHolder );
549            level.incNbAddedElems();
550        }
551        else
552        {
553            // Inject the pageHolder and the page leftmost key
554            node.setPageHolder( level.getCurrentPos() + 1, pageHolder );
555            KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
556            node.setKey( level.getCurrentPos(), keyHolder );
557            level.incCurrentPos();
558            level.incNbAddedElems();
559
560            // Check that we haven't added the last element. If so,
561            // we have to write the page on disk and update the btree
562            if ( level.getNbAddedElems() == level.getNbElems() )
563            {
564                PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
565                    btree, node, 0L );
566                ( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
567            }
568        }
569
570        return;
571    }
572
573
574    /**
575     * Inject a page reference into a Node. This method will recurse if needed.
576     */
577    private static <K, V> void injectInNode( BTree<K, V> btree, Page<K, V> page, List<LevelInfo<K, V>> levels,
578        int levelIndex )
579        throws IOException
580    {
581        int pageSize = btree.getPageSize();
582        LevelInfo<K, V> level = levels.get( levelIndex );
583        PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();
584
585        // We first have to write the page on disk
586        PageHolder<K, V> pageHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, page, 0L );
587
588        // First deal with a node that has less than PageSize elements at this level.
589        // It will become the root node.
590        if ( level.getNbElems() <= pageSize + 1 )
591        {
592            injectInRoot( btree, page, pageHolder, level );
593
594            return;
595        }
596
597        // Now, we have some parent node. We process the 3 different use case :
598        // o Full node before the limit
599        // o Node over the limit but with at least N/2 elements
600        // o Node over the limit but with elements spread into 2 nodes
601        if ( level.getNbAddedElems() < level.getNbElemsLimit() )
602        {
603            // Ok, we haven't yet reached the incomplete pages (if any).
604            // Let's add the page reference into the node
605            // There is one special case : when we are adding the very first page 
606            // reference into a node. In this case, we don't store the key
607            if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
608            {
609                node.setPageHolder( 0, pageHolder );
610            }
611            else
612            {
613                // Inject the pageHolder and the page leftmost key
614                node.setPageHolder( level.getCurrentPos(), pageHolder );
615                KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
616                node.setKey( level.getCurrentPos() - 1, keyHolder );
617            }
618
619            // Now, increment this level nb of added elements
620            level.incCurrentPos();
621            level.incNbAddedElems();
622
623            // Check that we haven't added the last element. If so,
624            // we have to write the page on disk and update the parent's node
625            if ( level.getNbAddedElems() == level.getNbElems() )
626            {
627                //PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
628                //    btree, node, 0L );
629                //( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
630                injectInNode( btree, node, levels, levelIndex + 1 );
631
632                return;
633            }
634            else
635            {
636                // Check that we haven't completed the current node, and that this is not the root node.
637                if ( ( level.getCurrentPos() == pageSize + 1 ) && ( level.getLevelNumber() < levels.size() - 1 ) )
638                {
639                    // yes. We have to write the node on disk, update its parent
640                    // and create a new current node
641                    injectInNode( btree, node, levels, levelIndex + 1 );
642
643                    // The page is full, we have to create a new one, with a size depending on the remaining elements
644                    if ( level.getNbAddedElems() < level.getNbElemsLimit() )
645                    {
646                        // We haven't reached the limit, create a new full node
647                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
648                    }
649                    else if ( level.getNbElems() - level.getNbAddedElems() <= pageSize )
650                    {
651                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L,
652                            level.getNbElems() - level.getNbAddedElems() - 1 ) );
653                    }
654                    else
655                    {
656                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, ( level.getNbElems() - 1 )
657                            - ( level.getNbAddedElems() + 1 ) - pageSize / 2 ) );
658                    }
659
660                    level.setCurrentPos( 0 );
661                }
662            }
663
664            return;
665        }
666        else
667        {
668            // We are dealing with the last page or the last two pages 
669            // We can have either one single pages which can contain up to pageSize-1 elements
670            // or with two pages, the first one containing ( nbElems - limit ) - pageSize/2 elements
671            // and the second one will contain pageSize/2 elements. 
672            if ( level.getNbElems() - level.getNbElemsLimit() > pageSize )
673            {
674                // As the remaining elements are above a page size, they will be spread across
675                // two pages. We have two cases here, depending on the page we are filling
676                if ( level.getNbElems() - level.getNbAddedElems() <= pageSize / 2 + 1 )
677                {
678                    // As we have less than PageSize/2 elements to write, we are on the second page
679                    if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
680                    {
681                        node.setPageHolder( 0, pageHolder );
682                    }
683                    else
684                    {
685                        // Inject the pageHolder and the page leftmost key
686                        node.setPageHolder( level.getCurrentPos(), pageHolder );
687                        KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
688                            page.getLeftMostKey() );
689                        node.setKey( level.getCurrentPos() - 1, keyHolder );
690                    }
691
692                    // Now, increment this level nb of added elements
693                    level.incCurrentPos();
694                    level.incNbAddedElems();
695
696                    // Check if we are done with the page
697                    if ( level.getNbAddedElems() == level.getNbElems() )
698                    {
699                        // Yes, we have to update the parent
700                        injectInNode( btree, node, levels, levelIndex + 1 );
701                    }
702                }
703                else
704                {
705                    // This is the first page 
706                    if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
707                    {
708                        // First element of the page
709                        node.setPageHolder( 0, pageHolder );
710                    }
711                    else
712                    {
713                        // Any other following elements
714                        // Inject the pageHolder and the page leftmost key
715                        node.setPageHolder( level.getCurrentPos(), pageHolder );
716                        KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
717                            page.getLeftMostKey() );
718                        node.setKey( level.getCurrentPos() - 1, keyHolder );
719                    }
720
721                    // Now, increment this level nb of added elements
722                    level.incCurrentPos();
723                    level.incNbAddedElems();
724
725                    // Check if we are done with the page
726                    if ( level.getCurrentPos() == node.getNbElems() + 1 )
727                    {
728                        // Yes, we have to update the parent
729                        injectInNode( btree, node, levels, levelIndex + 1 );
730
731                        // An create a new one
732                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
733                        level.setCurrentPos( 0 );
734                    }
735                }
736            }
737            else
738            {
739                // Two cases : we don't have anything else to write, or this is a single page
740                if ( level.getNbAddedElems() == level.getNbElems() )
741                {
742                    // We are done with the page
743                    injectInNode( btree, node, levels, levelIndex + 1 );
744                }
745                else
746                {
747                    // We have some more elements to add in  the page
748                    // This is the first page 
749                    if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
750                    {
751                        // First element of the page
752                        node.setPageHolder( 0, pageHolder );
753                    }
754                    else
755                    {
756                        // Any other following elements
757                        // Inject the pageHolder and the page leftmost key
758                        node.setPageHolder( level.getCurrentPos(), pageHolder );
759                        KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
760                            page.getLeftMostKey() );
761                        node.setKey( level.getCurrentPos() - 1, keyHolder );
762                    }
763
764                    // Now, increment this level nb of added elements
765                    level.incCurrentPos();
766                    level.incNbAddedElems();
767
768                    // Check if we are done with the page
769                    if ( level.getCurrentPos() == node.getNbElems() + 1 )
770                    {
771                        // Yes, we have to update the parent
772                        injectInNode( btree, node, levels, levelIndex + 1 );
773
774                        // An create a new one
775                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
776                        level.setCurrentPos( 0 );
777                    }
778                }
779
780                return;
781            }
782        }
783    }
784
785
786    private static <K, V> BTree<K, V> bulkLoadSinglePage( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator,
787        int nbElems ) throws IOException
788    {
789        // Use the root page
790        Page<K, V> rootPage = btree.getRootPage();
791
792        // Initialize the root page
793        ( ( AbstractPage<K, V> ) rootPage ).setNbElems( nbElems );
794        KeyHolder<K>[] keys = new KeyHolder[nbElems];
795        ValueHolder<V>[] values = new ValueHolder[nbElems];
796
797        switch ( btree.getType() )
798        {
799            case IN_MEMORY:
800                ( ( InMemoryLeaf<K, V> ) rootPage ).values = values;
801                break;
802
803            default:
804                ( ( PersistedLeaf<K, V> ) rootPage ).values = values;
805        }
806
807        // We first have to inject data into the page
808        int pos = 0;
809
810        while ( dataIterator.hasNext() )
811        {
812            Tuple<K, Set<V>> tuple = dataIterator.next();
813
814            // Store the current element in the rootPage
815            KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
816            keys[pos] = keyHolder;
817
818            switch ( btree.getType() )
819            {
820                case IN_MEMORY:
821                    ValueHolder<V> valueHolder = new InMemoryValueHolder<V>( btree, ( V[] ) tuple.getValue()
822                        .toArray() );
823                    ( ( InMemoryLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
824                    break;
825
826                default:
827                    valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue()
828                        .toArray() );
829                    ( ( PersistedLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
830            }
831
832            pos++;
833        }
834
835        // Update the rootPage
836        ( ( AbstractPage<K, V> ) rootPage ).setKeys( keys );
837
838        // Update the btree with the nb of added elements, and write it$
839        BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
840        btreeHeader.setNbElems( nbElems );
841
842        return btree;
843    }
844
845
846    /**
847     * Construct the target BTree from the sorted data. We will use the nb of elements
848     * to determinate the structure of the BTree, as it must be balanced
849     */
850    private static <K, V> BTree<K, V> bulkLoad( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
851        throws IOException
852    {
853        int pageSize = btree.getPageSize();
854
855        // Special case : we can store all the element sin a single page
856        if ( nbElems <= pageSize )
857        {
858            return bulkLoadSinglePage( btree, dataIterator, nbElems );
859        }
860
861        // Ok, we will need more than one page to store the elements, which
862        // means we also will need more than one level.
863        // First, compute the needed number of levels.
864        List<LevelInfo<K, V>> levels = computeLevels( btree, nbElems );
865
866        // Now, let's fill the levels
867        LevelInfo<K, V> leafLevel = levels.get( 0 );
868
869        while ( dataIterator.hasNext() )
870        {
871            // let's fill page up to the point all the complete pages have been filled
872            if ( leafLevel.getNbAddedElems() < leafLevel.getNbElemsLimit() )
873            {
874                // grab a tuple
875                Tuple<K, Set<V>> tuple = dataIterator.next();
876
877                injectInLeaf( btree, tuple, leafLevel );
878                leafLevel.incNbAddedElems();
879
880                // The page is completed, update the parent's node and create a new current page
881                if ( leafLevel.getCurrentPos() == pageSize )
882                {
883                    injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );
884
885                    // The page is full, we have to create a new one
886                    leafLevel.setCurrentPage( BTreeFactory
887                        .createLeaf( btree, 0L, computeNbElemsLeaf( btree, leafLevel ) ) );
888                    leafLevel.setCurrentPos( 0 );
889                }
890            }
891            else
892            {
893                // We have to deal with uncompleted pages now (if we have any)
894                if ( leafLevel.getNbAddedElems() == nbElems )
895                {
896                    // First use case : we have injected all the elements in the btree : get out
897                    break;
898                }
899
900                if ( nbElems - leafLevel.getNbElemsLimit() > pageSize )
901                {
902                    // Second use case : the number of elements after the limit does not
903                    // fit in a page, that means we have to split it into
904                    // two pages
905
906                    // First page will contain nbElems - leafLevel.nbElemsLimit - PageSize/2 elements
907                    int nbToAdd = nbElems - leafLevel.getNbElemsLimit() - pageSize / 2;
908
909                    while ( nbToAdd > 0 )
910                    {
911                        // grab a tuple
912                        Tuple<K, Set<V>> tuple = dataIterator.next();
913
914                        injectInLeaf( btree, tuple, leafLevel );
915                        leafLevel.incNbAddedElems();
916                        nbToAdd--;
917                    }
918
919                    // Now inject the page into the node
920                    Page<K, V> currentPage = leafLevel.getCurrentPage();
921                    injectInNode( btree, currentPage, levels, 1 );
922
923                    // Create a new page for the remaining elements
924                    nbToAdd = pageSize / 2;
925                    leafLevel.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbToAdd ) );
926                    leafLevel.setCurrentPos( 0 );
927
928                    while ( nbToAdd > 0 )
929                    {
930                        // grab a tuple
931                        Tuple<K, Set<V>> tuple = dataIterator.next();
932
933                        injectInLeaf( btree, tuple, leafLevel );
934                        leafLevel.incNbAddedElems();
935                        nbToAdd--;
936                    }
937
938                    // And update the parent node
939                    Page<K, V> levelCurrentPage = leafLevel.getCurrentPage();
940                    injectInNode( btree, levelCurrentPage, levels, 1 );
941
942                    // We are done
943                    break;
944                }
945                else
946                {
947                    // Third use case : we can push all the elements in the last page.
948                    // Let's do it
949                    int nbToAdd = nbElems - leafLevel.getNbElemsLimit();
950
951                    while ( nbToAdd > 0 )
952                    {
953                        // grab a tuple
954                        Tuple<K, Set<V>> tuple = dataIterator.next();
955
956                        injectInLeaf( btree, tuple, leafLevel );
957                        leafLevel.incNbAddedElems();
958                        nbToAdd--;
959                    }
960
961                    // Now inject the page into the node
962                    injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );
963
964                    // and we are done
965                    break;
966                }
967            }
968        }
969
970        // Update the btree with the nb of added elements, and write it$
971        BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
972        btreeHeader.setNbElems( nbElems );
973
974        return btree;
975    }
976
977
978    /**
979     * Flush a list of tuples to disk after having sorted them. In the process, we may have to gather the values
980     * for the tuples having the same keys.
981     * @throws IOException 
982     */
983    private static <K, V> File flushToDisk( int fileNb, List<Tuple<K, V>> tuples, BTree<K, V> btree )
984        throws IOException
985    {
986        // Sort the tuples. 
987        Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
988
989        File file = File.createTempFile( "sorted", Integer.toString( fileNb ) );
990        file.deleteOnExit();
991        FileOutputStream fos = new FileOutputStream( file );
992
993        // Flush the tuples on disk
994        for ( Tuple<K, Set<V>> tuple : sortedTuples )
995        {
996            // Serialize the key
997            byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
998            fos.write( IntSerializer.serialize( bytesKey.length ) );
999            fos.write( bytesKey );
1000
1001            // Serialize the number of values
1002            int nbValues = tuple.getValue().size();
1003            fos.write( IntSerializer.serialize( nbValues ) );
1004
1005            // Serialize the values
1006            for ( V value : tuple.getValue() )
1007            {
1008                byte[] bytesValue = btree.getValueSerializer().serialize( value );
1009
1010                // Serialize the value
1011                fos.write( IntSerializer.serialize( bytesValue.length ) );
1012                fos.write( bytesValue );
1013            }
1014        }
1015
1016        fos.flush();
1017        fos.close();
1018
1019        return file;
1020    }
1021
1022
1023    /**
1024     * Sort a list of tuples, eliminating the duplicate keys and storing the values in a set when we 
1025     * have a duplicate key
1026     */
1027    private static <K, V> Tuple<K, Set<V>>[] sort( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1028    {
1029        Comparator<Tuple<K, Set<V>>> tupleComparator = new TupleComparator( btree.getKeyComparator(), btree
1030            .getValueComparator() );
1031
1032        // Sort the list
1033        Tuple<K, V>[] tuplesArray = ( Tuple<K, V>[] ) tuples.toArray( new Tuple[]
1034            {} );
1035
1036        // First, eliminate the equals keys. We use a map for that
1037        Map<K, Set<V>> mapTuples = new HashMap<K, Set<V>>();
1038
1039        for ( Tuple<K, V> tuple : tuplesArray )
1040        {
1041            // Is the key present in the map ?
1042            Set<V> foundSet = mapTuples.get( tuple.key );
1043
1044            if ( foundSet != null )
1045            {
1046                // We already have had such a key, add the value to the existing key
1047                foundSet.add( tuple.value );
1048            }
1049            else
1050            {
1051                // No such key present in the map : create a new set to store the values,
1052                // and add it in the map associated with the new key
1053                Set<V> set = new TreeSet<V>();
1054                set.add( tuple.value );
1055                mapTuples.put( tuple.key, set );
1056            }
1057        }
1058
1059        // Now, sort the map, by extracting all the key/values from the map
1060        int size = mapTuples.size();
1061        Tuple<K, Set<V>>[] sortedTuples = new Tuple[size];
1062        int pos = 0;
1063
1064        // We create an array containing all the elements
1065        for ( Map.Entry<K, Set<V>> entry : mapTuples.entrySet() )
1066        {
1067            sortedTuples[pos] = new Tuple<K, Set<V>>();
1068            sortedTuples[pos].key = entry.getKey();
1069            sortedTuples[pos].value = entry.getValue();
1070            pos++;
1071        }
1072
1073        // And we sort the array
1074        Arrays.sort( sortedTuples, tupleComparator );
1075
1076        return sortedTuples;
1077    }
1078
1079
1080    /**
1081     * Build an iterator over an array of sorted tuples, in memory
1082     */
1083    private static <K, V> Iterator<Tuple<K, Set<V>>> createTupleIterator( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1084    {
1085        final Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
1086
1087        Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1088        {
1089            private int pos = 0;
1090
1091
1092            @Override
1093            public Tuple<K, Set<V>> next()
1094            {
1095                // Return the current tuple, if any
1096                if ( pos < sortedTuples.length )
1097                {
1098                    Tuple<K, Set<V>> tuple = sortedTuples[pos];
1099                    pos++;
1100
1101                    return tuple;
1102                }
1103
1104                return null;
1105            }
1106
1107
1108            @Override
1109            public boolean hasNext()
1110            {
1111                return pos < sortedTuples.length;
1112            }
1113
1114
1115            @Override
1116            public void remove()
1117            {
1118            }
1119        };
1120
1121        return tupleIterator;
1122    }
1123
1124
1125    private static <K, V> Tuple<K, Set<V>> fetchTuple( BTree<K, V> btree, FileInputStream fis )
1126    {
1127        try
1128        {
1129            if ( fis.available() == 0 )
1130            {
1131                return null;
1132            }
1133
1134            Tuple<K, Set<V>> tuple = new Tuple<K, Set<V>>();
1135            tuple.value = new TreeSet<V>();
1136
1137            byte[] intBytes = new byte[4];
1138
1139            // Read the key length
1140            fis.read( intBytes );
1141            int keyLength = IntSerializer.deserialize( intBytes );
1142
1143            // Read the key
1144            byte[] keyBytes = new byte[keyLength];
1145            fis.read( keyBytes );
1146            K key = btree.getKeySerializer().fromBytes( keyBytes );
1147            tuple.key = key;
1148
1149            // get the number of values
1150            fis.read( intBytes );
1151            int nbValues = IntSerializer.deserialize( intBytes );
1152
1153            // Serialize the values
1154            for ( int i = 0; i < nbValues; i++ )
1155            {
1156                // Read the value length
1157                fis.read( intBytes );
1158                int valueLength = IntSerializer.deserialize( intBytes );
1159
1160                // Read the value
1161                byte[] valueBytes = new byte[valueLength];
1162                fis.read( valueBytes );
1163                V value = btree.getValueSerializer().fromBytes( valueBytes );
1164                tuple.value.add( value );
1165            }
1166
1167            return tuple;
1168        }
1169        catch ( IOException ioe )
1170        {
1171            return null;
1172        }
1173    }
1174
1175
1176    /**
1177     * Build an iterator over an array of sorted tuples, from files on the disk
1178     * @throws FileNotFoundException 
1179     */
1180    private static <K, V> Iterator<Tuple<K, Set<V>>> createIterator( final BTree<K, V> btree,
1181        final FileInputStream[] streams )
1182        throws FileNotFoundException
1183    {
1184        // The number of files we have to read from
1185        final int nbFiles = streams.length;
1186
1187        // We will read only one element at a time from each file
1188        final Tuple<K, Set<V>>[] readTuples = new Tuple[nbFiles];
1189        final TreeMap<Tuple<K, Integer>, Set<V>> candidates =
1190            new TreeMap<Tuple<K, Integer>, Set<V>>(
1191                new TupleComparator<K, Integer>( btree.getKeyComparator(), IntComparator.INSTANCE ) );
1192
1193        // Read the tuple from each files
1194        for ( int i = 0; i < nbFiles; i++ )
1195        {
1196            while ( true )
1197            {
1198                readTuples[i] = fetchTuple( btree, streams[i] );
1199
1200                if ( readTuples[i] != null )
1201                {
1202                    Tuple<K, Integer> candidate = new Tuple<K, Integer>( readTuples[i].key, i, btree.getKeySerializer()
1203                        .getComparator() );
1204
1205                    if ( !candidates.containsKey( candidate ) )
1206                    {
1207                        candidates.put( candidate, readTuples[i].getValue() );
1208                        break;
1209                    }
1210                    else
1211                    {
1212                        // We have to merge the pulled tuple with the existing one, and read one more tuple
1213                        Set<V> oldValues = candidates.get( candidate );
1214                        oldValues.addAll( readTuples[i].getValue() );
1215                    }
1216                }
1217                else
1218                {
1219                    break;
1220                }
1221            }
1222        }
1223
1224        Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1225        {
1226            @Override
1227            public Tuple<K, Set<V>> next()
1228            {
1229                // Get the first candidate
1230                Tuple<K, Integer> tupleCandidate = candidates.firstKey();
1231
1232                // Remove it from the set
1233                candidates.remove( tupleCandidate );
1234
1235                // Get the the next tuple from the stream we just got the tuple from
1236                Tuple<K, Set<V>> tuple = readTuples[tupleCandidate.value];
1237
1238                // fetch the next tuple from the file we just read teh candidate from 
1239                while ( true )
1240                {
1241                    // fetch it from the disk and store it into its reader
1242                    readTuples[tupleCandidate.value] = fetchTuple( btree, streams[tupleCandidate.value] );
1243
1244                    if ( readTuples[tupleCandidate.value] == null )
1245                    {
1246                        // No more tuple for this file
1247                        break;
1248                    }
1249
1250                    if ( readTuples[tupleCandidate.value] != null )
1251                    {
1252                        // And store it into the candidate set
1253                        Set<V> oldValues = candidates.get( readTuples[tupleCandidate.value] );
1254
1255                        if ( oldValues != null )
1256                        {
1257                            // We already have another element with the same key, merge them
1258                            oldValues.addAll( readTuples[tupleCandidate.value].value );
1259                        }
1260                        else
1261                        {
1262                            Tuple<K, Integer> newTuple = new Tuple<K, Integer>( readTuples[tupleCandidate.value].key,
1263                                tupleCandidate.value );
1264                            candidates.put( newTuple, readTuples[tupleCandidate.value].getValue() );
1265
1266                            // and exit the loop
1267                            break;
1268                        }
1269                    }
1270                }
1271
1272                // We can now return the found value
1273                return tuple;
1274            }
1275
1276
1277            @Override
1278            public boolean hasNext()
1279            {
1280                // Check that we have at least one element to read
1281                return !candidates.isEmpty();
1282            }
1283
1284
1285            @Override
1286            public void remove()
1287            {
1288            }
1289
1290        };
1291
1292        return tupleIterator;
1293    }
1294
1295
1296    /**
1297     * Build an iterator over an array of sorted tuples, from files on the disk
1298     * @throws FileNotFoundException 
1299     */
1300    private static <K, V> Iterator<Tuple<K, Set<V>>> createUniqueFileIterator( final BTree<K, V> btree,
1301        final FileInputStream stream )
1302        throws FileNotFoundException
1303    {
1304        Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1305        {
1306            boolean hasNext = true;
1307
1308
1309            @Override
1310            public Tuple<K, Set<V>> next()
1311            {
1312                // Get the tuple from the stream
1313                Tuple<K, Set<V>> tuple = fetchTuple( btree, stream );
1314
1315                // We can now return the found value
1316                return tuple;
1317            }
1318
1319
1320            @Override
1321            public boolean hasNext()
1322            {
1323                // Check that we have at least one element to read
1324                try
1325                {
1326                    return stream.available() > 0;
1327                }
1328                catch ( IOException e )
1329                {
1330                    return false;
1331                }
1332            }
1333
1334
1335            @Override
1336            public void remove()
1337            {
1338            }
1339
1340        };
1341
1342        return tupleIterator;
1343    }
1344
1345
    /**
     * Compact a given persisted BTree, making it dense : the intent is to store all the
     * values in newly created pages, each one of them containing as many elements
     * as its size.
     * <br/>
     * <b>This method is not implemented yet</b> : its body is empty, so calling it has
     * no effect on the BTree nor on the RecordManager.
     * <br/>
     * Once implemented, the RecordManager is expected to be stopped and restarted, so
     * this method should not be used on a running BTree.
     *
     * @param recordManager The associated recordManager
     * @param btree The BTree to compact
     */
    public static void compact( RecordManager recordManager, BTree<?, ?> btree )
    {
        // Nothing is done here for the moment
    }
1361
1362
1363    /**
1364     * Compact a given in-memory BTree, making it dense. All the values will be stored 
1365     * in newly created pages, each one of them containing as much elements
1366     * as it's size.
1367     * </br>
1368     *
1369     * @param btree The BTree to compact
1370     * @throws KeyNotFoundException 
1371     * @throws IOException 
1372     */
1373    public static BTree<?, ?> compact( BTree<?, ?> btree ) throws IOException, KeyNotFoundException
1374    {
1375        // First, create a new BTree which will contain all the elements
1376        InMemoryBTreeConfiguration configuration = new InMemoryBTreeConfiguration();
1377        configuration.setName( btree.getName() );
1378        configuration.setPageSize( btree.getPageSize() );
1379        configuration.setKeySerializer( btree.getKeySerializer() );
1380        configuration.setValueSerializer( btree.getValueSerializer() );
1381        configuration.setAllowDuplicates( btree.isAllowDuplicates() );
1382        configuration.setReadTimeOut( btree.getReadTimeOut() );
1383        configuration.setWriteBufferSize( btree.getWriteBufferSize() );
1384
1385        File file = ( ( InMemoryBTree ) btree ).getFile();
1386
1387        if ( file != null )
1388        {
1389            configuration.setFilePath( file.getPath() );
1390        }
1391
1392        // Create a new Btree Builder
1393        InMemoryBTreeBuilder btreeBuilder = new InMemoryBTreeBuilder( configuration );
1394
1395        // Create a cursor over the existing btree
1396        final TupleCursor cursor = btree.browse();
1397
1398        // Create an iterator that will iterate the existing btree
1399        Iterator<Tuple> tupleItr = new Iterator<Tuple>()
1400        {
1401            @Override
1402            public Tuple next()
1403            {
1404                try
1405                {
1406                    return cursor.next();
1407                }
1408                catch ( EndOfFileExceededException e )
1409                {
1410                    return null;
1411                }
1412                catch ( IOException e )
1413                {
1414                    return null;
1415                }
1416            }
1417
1418
1419            @Override
1420            public boolean hasNext()
1421            {
1422                try
1423                {
1424                    return cursor.hasNext();
1425                }
1426                catch ( EndOfFileExceededException e )
1427                {
1428                    return false;
1429                }
1430                catch ( IOException e )
1431                {
1432                    return false;
1433                }
1434            }
1435
1436
1437            @Override
1438            public void remove()
1439            {
1440            }
1441        };
1442
1443        // And finally, compact the btree
1444        return btreeBuilder.build( tupleItr );
1445    }
1446}