1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.directory.mavibot.btree;
22
23
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileNotFoundException;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Comparator;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.TreeMap;
39 import java.util.TreeSet;
40
41 import org.apache.directory.mavibot.btree.comparator.IntComparator;
42 import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException;
43 import org.apache.directory.mavibot.btree.exception.KeyNotFoundException;
44 import org.apache.directory.mavibot.btree.serializer.IntSerializer;
45
46
47
48
49
50
51
52
53
54
55
56 public class BulkLoader<K, V>
57 {
58 private BulkLoader()
59 {
60 };
61
62 static enum LevelEnum
63 {
64 LEAF,
65 NODE
66 }
67
68
69
70
71
72 private static class SortedFile
73 {
74
75 private File file;
76
77
78 private int nbValues;
79
80
81
82 SortedFile( File file, int nbValues )
83 {
84 this.file = file;
85 this.nbValues = nbValues;
86 }
87 }
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102 private static <K, V> int readElements( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, List<File> sortedFiles,
103 List<Tuple<K, V>> tuples, int chunkSize ) throws IOException
104 {
105 int nbRead = 0;
106 int nbIteration = 0;
107 int nbElems = 0;
108 boolean inMemory = true;
109 Set<K> keys = new HashSet<K>();
110
111 while ( true )
112 {
113 nbIteration++;
114 tuples.clear();
115 keys.clear();
116
117
118 while ( iterator.hasNext() && ( nbRead < chunkSize ) )
119 {
120 Tuple<K, V> tuple = iterator.next();
121 tuples.add( tuple );
122
123 if ( !keys.contains( tuple.getKey() ) )
124 {
125 keys.add( tuple.getKey() );
126 nbRead++;
127 }
128 }
129
130 if ( nbRead < chunkSize )
131 {
132 if ( nbIteration != 1 )
133 {
134
135 inMemory = false;
136
137 sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
138 }
139
140
141 nbElems += nbRead;
142
143 break;
144 }
145 else
146 {
147 if ( !iterator.hasNext() )
148 {
149
150 if ( nbIteration > 1 )
151 {
152
153 inMemory = false;
154 sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
155 }
156
157
158
159
160
161 nbElems += nbRead;
162
163 break;
164 }
165
166
167 nbElems += nbRead;
168 nbRead = 0;
169 sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
170 }
171 }
172
173 if ( !inMemory )
174 {
175 tuples.clear();
176 }
177
178 return nbElems;
179 }
180
181
182
183
184
185
186
187 private static <K, V> Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> processFiles( BTree<K, V> btree,
188 Iterator<Tuple<K, Set<V>>> dataIterator ) throws IOException
189 {
190 File file = File.createTempFile( "sortedUnique", "data" );
191 file.deleteOnExit();
192 FileOutputStream fos = new FileOutputStream( file );
193
194
195 int nbReads = 0;
196
197
198 while ( dataIterator.hasNext() )
199 {
200 nbReads++;
201
202
203 Tuple<K, Set<V>> tuple = dataIterator.next();
204
205
206 byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
207 fos.write( IntSerializer.serialize( bytesKey.length ) );
208 fos.write( bytesKey );
209
210
211 int nbValues = tuple.getValue().size();
212 fos.write( IntSerializer.serialize( nbValues ) );
213
214
215 for ( V value : tuple.getValue() )
216 {
217 byte[] bytesValue = btree.getValueSerializer().serialize( value );
218
219
220 fos.write( IntSerializer.serialize( bytesValue.length ) );
221 fos.write( bytesValue );
222 }
223 }
224
225 fos.flush();
226 fos.close();
227
228 FileInputStream fis = new FileInputStream( file );
229 Iterator<Tuple<K, Set<V>>> uniqueIterator = createUniqueFileIterator( btree, fis );
230 SortedFile sortedFile = new SortedFile( file, nbReads );
231
232 Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = new Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile>(
233 uniqueIterator, sortedFile );
234
235 return result;
236 }
237
238
239
240
241
242
243
244
245
246
247 public static <K, V> BTree<K, V> load( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, int chunkSize )
248 throws IOException
249 {
250 if ( btree == null )
251 {
252 throw new RuntimeException( "Invalid BTree : it's null" );
253 }
254
255 if ( iterator == null )
256 {
257
258 return null;
259 }
260
261
262 boolean inMemory = true;
263
264
265 List<File> sortedFiles = new ArrayList<File>();
266
267
268 List<Tuple<K, V>> tuples = new ArrayList<Tuple<K, V>>( chunkSize );
269
270
271
272 int nbElems = readElements( btree, iterator, sortedFiles, tuples, chunkSize );
273
274
275 if ( nbElems > 0 )
276 {
277 inMemory = tuples.size() > 0;
278 }
279
280
281 Iterator<Tuple<K, Set<V>>> dataIterator = null;
282 FileInputStream[] streams = null;
283 BTree<K, V> resultBTree = null;
284
285 if ( inMemory )
286 {
287
288
289 dataIterator = createTupleIterator( btree, tuples );
290 resultBTree = bulkLoad( btree, dataIterator, nbElems );
291 }
292 else
293 {
294
295 int nbFiles = sortedFiles.size();
296 streams = new FileInputStream[nbFiles];
297
298 for ( int i = 0; i < nbFiles; i++ )
299 {
300 streams[i] = new FileInputStream( sortedFiles.get( i ) );
301 }
302
303 dataIterator = createIterator( btree, streams );
304
305
306 Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = processFiles( btree, dataIterator );
307 resultBTree = bulkLoad( btree, result.key, result.value.nbValues );
308 result.value.file.delete();
309 }
310
311
312
313
314 if ( !inMemory )
315 {
316 int nbFiles = sortedFiles.size();
317
318 for ( int i = 0; i < nbFiles; i++ )
319 {
320 streams[i].close();
321 sortedFiles.get( i ).delete();
322 }
323 }
324
325 return resultBTree;
326 }
327
328
329
330
331
332
333 static <K, V> LevelInfo<K, V> computeLevel( BTree<K, V> btree, int nbElems, LevelEnum levelType )
334 {
335 int pageSize = btree.getPageSize();
336 int incrementNode = 0;
337
338 if ( levelType == LevelEnum.NODE )
339 {
340 incrementNode = 1;
341 }
342
343 LevelInfo<K, V> level = new LevelInfo<K, V>();
344 level.setType( ( levelType == LevelEnum.NODE ) );
345 level.setNbElems( nbElems );
346 level.setNbPages( nbElems / ( pageSize + incrementNode ) );
347 level.setLevelNumber( 0 );
348 level.setNbAddedElems( 0 );
349 level.setCurrentPos( 0 );
350
351
352 if ( nbElems <= pageSize + incrementNode )
353 {
354 if ( nbElems % ( pageSize + incrementNode ) != 0 )
355 {
356 level.setNbPages( 1 );
357 }
358
359 level.setNbElemsLimit( nbElems );
360
361 if ( level.isNode() )
362 {
363 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, nbElems - 1 ) );
364 }
365 else
366 {
367 level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbElems ) );
368 }
369 }
370 else
371 {
372 int remaining = nbElems % ( pageSize + incrementNode );
373
374 if ( remaining == 0 )
375 {
376 level.setNbElemsLimit( nbElems );
377
378 if ( level.isNode() )
379 {
380 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
381 }
382 else
383 {
384 level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
385 }
386 }
387 else
388 {
389 level.incNbPages();
390
391 if ( remaining < ( pageSize / 2 ) + incrementNode )
392 {
393 level.setNbElemsLimit( nbElems - remaining - ( pageSize + incrementNode ) );
394
395 if ( level.getNbElemsLimit() > 0 )
396 {
397 if ( level.isNode() )
398 {
399 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
400 }
401 else
402 {
403 level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
404 }
405 }
406 else
407 {
408 if ( level.isNode() )
409 {
410 level
411 .setCurrentPage( BTreeFactory.createNode( btree, 0L, ( pageSize / 2 ) + remaining - 1 ) );
412 }
413 else
414 {
415 level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, ( pageSize / 2 ) + remaining ) );
416 }
417 }
418 }
419 else
420 {
421 level.setNbElemsLimit( nbElems - remaining );
422
423 if ( level.isNode() )
424 {
425 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
426 }
427 else
428 {
429 level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
430 }
431 }
432 }
433 }
434
435 return level;
436 }
437
438
439
440
441
442
443 static <K, V> List<LevelInfo<K, V>> computeLevels( BTree<K, V> btree, int nbElems )
444 {
445 List<LevelInfo<K, V>> levelList = new ArrayList<LevelInfo<K, V>>();
446
447
448 LevelInfo<K, V> leafLevel = computeLevel( btree, nbElems, LevelEnum.LEAF );
449
450 levelList.add( leafLevel );
451 int nbPages = leafLevel.getNbPages();
452 int levelNumber = 1;
453
454 while ( nbPages > 1 )
455 {
456
457 LevelInfo<K, V> nodeLevel = computeLevel( btree, nbPages, LevelEnum.NODE );
458 nodeLevel.setLevelNumber( levelNumber++ );
459 levelList.add( nodeLevel );
460 nbPages = nodeLevel.getNbPages();
461 }
462
463 return levelList;
464 }
465
466
467
468
469
470 private static <K, V> void injectInLeaf( BTree<K, V> btree, Tuple<K, Set<V>> tuple, LevelInfo<K, V> leafLevel )
471 {
472 PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) leafLevel.getCurrentPage();
473
474 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
475 leaf.setKey( leafLevel.getCurrentPos(), keyHolder );
476
477 if ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB )
478 {
479 ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
480 leaf.setValue( leafLevel.getCurrentPos(), valueHolder );
481 }
482
483 leafLevel.incCurrentPos();
484 }
485
486
487 private static <K, V> int computeNbElemsLeaf( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
488 {
489 int pageSize = btree.getPageSize();
490 int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();
491
492 if ( remaining < pageSize )
493 {
494 return remaining;
495 }
496 else if ( remaining == pageSize )
497 {
498 return pageSize;
499 }
500 else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
501 {
502 return pageSize;
503 }
504 else
505 {
506 return remaining - pageSize / 2;
507 }
508 }
509
510
511
512
513
514 int computeNbElemsNode( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
515 {
516 int pageSize = btree.getPageSize();
517 int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();
518
519 if ( remaining < pageSize + 1 )
520 {
521 return remaining;
522 }
523 else if ( remaining == pageSize + 1 )
524 {
525 return pageSize + 1;
526 }
527 else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
528 {
529 return pageSize + 1;
530 }
531 else
532 {
533 return remaining - pageSize / 2;
534 }
535 }
536
537
538
539
540
541 private static <K, V> void injectInRoot( BTree<K, V> btree, Page<K, V> page, PageHolder<K, V> pageHolder,
542 LevelInfo<K, V> level ) throws IOException
543 {
544 PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();
545
546 if ( ( level.getCurrentPos() == 0 ) && ( node.getPage( 0 ) == null ) )
547 {
548 node.setPageHolder( 0, pageHolder );
549 level.incNbAddedElems();
550 }
551 else
552 {
553
554 node.setPageHolder( level.getCurrentPos() + 1, pageHolder );
555 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
556 node.setKey( level.getCurrentPos(), keyHolder );
557 level.incCurrentPos();
558 level.incNbAddedElems();
559
560
561
562 if ( level.getNbAddedElems() == level.getNbElems() )
563 {
564 PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
565 btree, node, 0L );
566 ( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
567 }
568 }
569
570 return;
571 }
572
573
574
575
576
577 private static <K, V> void injectInNode( BTree<K, V> btree, Page<K, V> page, List<LevelInfo<K, V>> levels,
578 int levelIndex )
579 throws IOException
580 {
581 int pageSize = btree.getPageSize();
582 LevelInfo<K, V> level = levels.get( levelIndex );
583 PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();
584
585
586 PageHolder<K, V> pageHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, page, 0L );
587
588
589
590 if ( level.getNbElems() <= pageSize + 1 )
591 {
592 injectInRoot( btree, page, pageHolder, level );
593
594 return;
595 }
596
597
598
599
600
601 if ( level.getNbAddedElems() < level.getNbElemsLimit() )
602 {
603
604
605
606
607 if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
608 {
609 node.setPageHolder( 0, pageHolder );
610 }
611 else
612 {
613
614 node.setPageHolder( level.getCurrentPos(), pageHolder );
615 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
616 node.setKey( level.getCurrentPos() - 1, keyHolder );
617 }
618
619
620 level.incCurrentPos();
621 level.incNbAddedElems();
622
623
624
625 if ( level.getNbAddedElems() == level.getNbElems() )
626 {
627
628
629
630 injectInNode( btree, node, levels, levelIndex + 1 );
631
632 return;
633 }
634 else
635 {
636
637 if ( ( level.getCurrentPos() == pageSize + 1 ) && ( level.getLevelNumber() < levels.size() - 1 ) )
638 {
639
640
641 injectInNode( btree, node, levels, levelIndex + 1 );
642
643
644 if ( level.getNbAddedElems() < level.getNbElemsLimit() )
645 {
646
647 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
648 }
649 else if ( level.getNbElems() - level.getNbAddedElems() <= pageSize )
650 {
651 level.setCurrentPage( BTreeFactory.createNode( btree, 0L,
652 level.getNbElems() - level.getNbAddedElems() - 1 ) );
653 }
654 else
655 {
656 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, ( level.getNbElems() - 1 )
657 - ( level.getNbAddedElems() + 1 ) - pageSize / 2 ) );
658 }
659
660 level.setCurrentPos( 0 );
661 }
662 }
663
664 return;
665 }
666 else
667 {
668
669
670
671
672 if ( level.getNbElems() - level.getNbElemsLimit() > pageSize )
673 {
674
675
676 if ( level.getNbElems() - level.getNbAddedElems() <= pageSize / 2 + 1 )
677 {
678
679 if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
680 {
681 node.setPageHolder( 0, pageHolder );
682 }
683 else
684 {
685
686 node.setPageHolder( level.getCurrentPos(), pageHolder );
687 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
688 page.getLeftMostKey() );
689 node.setKey( level.getCurrentPos() - 1, keyHolder );
690 }
691
692
693 level.incCurrentPos();
694 level.incNbAddedElems();
695
696
697 if ( level.getNbAddedElems() == level.getNbElems() )
698 {
699
700 injectInNode( btree, node, levels, levelIndex + 1 );
701 }
702 }
703 else
704 {
705
706 if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
707 {
708
709 node.setPageHolder( 0, pageHolder );
710 }
711 else
712 {
713
714
715 node.setPageHolder( level.getCurrentPos(), pageHolder );
716 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
717 page.getLeftMostKey() );
718 node.setKey( level.getCurrentPos() - 1, keyHolder );
719 }
720
721
722 level.incCurrentPos();
723 level.incNbAddedElems();
724
725
726 if ( level.getCurrentPos() == node.getNbElems() + 1 )
727 {
728
729 injectInNode( btree, node, levels, levelIndex + 1 );
730
731
732 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
733 level.setCurrentPos( 0 );
734 }
735 }
736 }
737 else
738 {
739
740 if ( level.getNbAddedElems() == level.getNbElems() )
741 {
742
743 injectInNode( btree, node, levels, levelIndex + 1 );
744 }
745 else
746 {
747
748
749 if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
750 {
751
752 node.setPageHolder( 0, pageHolder );
753 }
754 else
755 {
756
757
758 node.setPageHolder( level.getCurrentPos(), pageHolder );
759 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
760 page.getLeftMostKey() );
761 node.setKey( level.getCurrentPos() - 1, keyHolder );
762 }
763
764
765 level.incCurrentPos();
766 level.incNbAddedElems();
767
768
769 if ( level.getCurrentPos() == node.getNbElems() + 1 )
770 {
771
772 injectInNode( btree, node, levels, levelIndex + 1 );
773
774
775 level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
776 level.setCurrentPos( 0 );
777 }
778 }
779
780 return;
781 }
782 }
783 }
784
785
786 private static <K, V> BTree<K, V> bulkLoadSinglePage( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator,
787 int nbElems ) throws IOException
788 {
789
790 Page<K, V> rootPage = btree.getRootPage();
791
792
793 ( ( AbstractPage<K, V> ) rootPage ).setNbElems( nbElems );
794 KeyHolder<K>[] keys = new KeyHolder[nbElems];
795 ValueHolder<V>[] values = new ValueHolder[nbElems];
796
797 switch ( btree.getType() )
798 {
799 case IN_MEMORY:
800 ( ( InMemoryLeaf<K, V> ) rootPage ).values = values;
801 break;
802
803 default:
804 ( ( PersistedLeaf<K, V> ) rootPage ).values = values;
805 }
806
807
808 int pos = 0;
809
810 while ( dataIterator.hasNext() )
811 {
812 Tuple<K, Set<V>> tuple = dataIterator.next();
813
814
815 KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
816 keys[pos] = keyHolder;
817
818 switch ( btree.getType() )
819 {
820 case IN_MEMORY:
821 ValueHolder<V> valueHolder = new InMemoryValueHolder<V>( btree, ( V[] ) tuple.getValue()
822 .toArray() );
823 ( ( InMemoryLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
824 break;
825
826 default:
827 valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue()
828 .toArray() );
829 ( ( PersistedLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
830 }
831
832 pos++;
833 }
834
835
836 ( ( AbstractPage<K, V> ) rootPage ).setKeys( keys );
837
838
839 BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
840 btreeHeader.setNbElems( nbElems );
841
842 return btree;
843 }
844
845
846
847
848
849
850 private static <K, V> BTree<K, V> bulkLoad( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
851 throws IOException
852 {
853 int pageSize = btree.getPageSize();
854
855
856 if ( nbElems <= pageSize )
857 {
858 return bulkLoadSinglePage( btree, dataIterator, nbElems );
859 }
860
861
862
863
864 List<LevelInfo<K, V>> levels = computeLevels( btree, nbElems );
865
866
867 LevelInfo<K, V> leafLevel = levels.get( 0 );
868
869 while ( dataIterator.hasNext() )
870 {
871
872 if ( leafLevel.getNbAddedElems() < leafLevel.getNbElemsLimit() )
873 {
874
875 Tuple<K, Set<V>> tuple = dataIterator.next();
876
877 injectInLeaf( btree, tuple, leafLevel );
878 leafLevel.incNbAddedElems();
879
880
881 if ( leafLevel.getCurrentPos() == pageSize )
882 {
883 injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );
884
885
886 leafLevel.setCurrentPage( BTreeFactory
887 .createLeaf( btree, 0L, computeNbElemsLeaf( btree, leafLevel ) ) );
888 leafLevel.setCurrentPos( 0 );
889 }
890 }
891 else
892 {
893
894 if ( leafLevel.getNbAddedElems() == nbElems )
895 {
896
897 break;
898 }
899
900 if ( nbElems - leafLevel.getNbElemsLimit() > pageSize )
901 {
902
903
904
905
906
907 int nbToAdd = nbElems - leafLevel.getNbElemsLimit() - pageSize / 2;
908
909 while ( nbToAdd > 0 )
910 {
911
912 Tuple<K, Set<V>> tuple = dataIterator.next();
913
914 injectInLeaf( btree, tuple, leafLevel );
915 leafLevel.incNbAddedElems();
916 nbToAdd--;
917 }
918
919
920 Page<K, V> currentPage = leafLevel.getCurrentPage();
921 injectInNode( btree, currentPage, levels, 1 );
922
923
924 nbToAdd = pageSize / 2;
925 leafLevel.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbToAdd ) );
926 leafLevel.setCurrentPos( 0 );
927
928 while ( nbToAdd > 0 )
929 {
930
931 Tuple<K, Set<V>> tuple = dataIterator.next();
932
933 injectInLeaf( btree, tuple, leafLevel );
934 leafLevel.incNbAddedElems();
935 nbToAdd--;
936 }
937
938
939 Page<K, V> levelCurrentPage = leafLevel.getCurrentPage();
940 injectInNode( btree, levelCurrentPage, levels, 1 );
941
942
943 break;
944 }
945 else
946 {
947
948
949 int nbToAdd = nbElems - leafLevel.getNbElemsLimit();
950
951 while ( nbToAdd > 0 )
952 {
953
954 Tuple<K, Set<V>> tuple = dataIterator.next();
955
956 injectInLeaf( btree, tuple, leafLevel );
957 leafLevel.incNbAddedElems();
958 nbToAdd--;
959 }
960
961
962 injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );
963
964
965 break;
966 }
967 }
968 }
969
970
971 BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
972 btreeHeader.setNbElems( nbElems );
973
974 return btree;
975 }
976
977
978
979
980
981
982
983 private static <K, V> File flushToDisk( int fileNb, List<Tuple<K, V>> tuples, BTree<K, V> btree )
984 throws IOException
985 {
986
987 Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
988
989 File file = File.createTempFile( "sorted", Integer.toString( fileNb ) );
990 file.deleteOnExit();
991 FileOutputStream fos = new FileOutputStream( file );
992
993
994 for ( Tuple<K, Set<V>> tuple : sortedTuples )
995 {
996
997 byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
998 fos.write( IntSerializer.serialize( bytesKey.length ) );
999 fos.write( bytesKey );
1000
1001
1002 int nbValues = tuple.getValue().size();
1003 fos.write( IntSerializer.serialize( nbValues ) );
1004
1005
1006 for ( V value : tuple.getValue() )
1007 {
1008 byte[] bytesValue = btree.getValueSerializer().serialize( value );
1009
1010
1011 fos.write( IntSerializer.serialize( bytesValue.length ) );
1012 fos.write( bytesValue );
1013 }
1014 }
1015
1016 fos.flush();
1017 fos.close();
1018
1019 return file;
1020 }
1021
1022
1023
1024
1025
1026
1027 private static <K, V> Tuple<K, Set<V>>[] sort( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1028 {
1029 Comparator<Tuple<K, Set<V>>> tupleComparator = new TupleComparator( btree.getKeyComparator(), btree
1030 .getValueComparator() );
1031
1032
1033 Tuple<K, V>[] tuplesArray = ( Tuple<K, V>[] ) tuples.toArray( new Tuple[]
1034 {} );
1035
1036
1037 Map<K, Set<V>> mapTuples = new HashMap<K, Set<V>>();
1038
1039 for ( Tuple<K, V> tuple : tuplesArray )
1040 {
1041
1042 Set<V> foundSet = mapTuples.get( tuple.key );
1043
1044 if ( foundSet != null )
1045 {
1046
1047 foundSet.add( tuple.value );
1048 }
1049 else
1050 {
1051
1052
1053 Set<V> set = new TreeSet<V>();
1054 set.add( tuple.value );
1055 mapTuples.put( tuple.key, set );
1056 }
1057 }
1058
1059
1060 int size = mapTuples.size();
1061 Tuple<K, Set<V>>[] sortedTuples = new Tuple[size];
1062 int pos = 0;
1063
1064
1065 for ( Map.Entry<K, Set<V>> entry : mapTuples.entrySet() )
1066 {
1067 sortedTuples[pos] = new Tuple<K, Set<V>>();
1068 sortedTuples[pos].key = entry.getKey();
1069 sortedTuples[pos].value = entry.getValue();
1070 pos++;
1071 }
1072
1073
1074 Arrays.sort( sortedTuples, tupleComparator );
1075
1076 return sortedTuples;
1077 }
1078
1079
1080
1081
1082
1083 private static <K, V> Iterator<Tuple<K, Set<V>>> createTupleIterator( BTree<K, V> btree, List<Tuple<K, V>> tuples )
1084 {
1085 final Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );
1086
1087 Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1088 {
1089 private int pos = 0;
1090
1091
1092 @Override
1093 public Tuple<K, Set<V>> next()
1094 {
1095
1096 if ( pos < sortedTuples.length )
1097 {
1098 Tuple<K, Set<V>> tuple = sortedTuples[pos];
1099 pos++;
1100
1101 return tuple;
1102 }
1103
1104 return null;
1105 }
1106
1107
1108 @Override
1109 public boolean hasNext()
1110 {
1111 return pos < sortedTuples.length;
1112 }
1113
1114
1115 @Override
1116 public void remove()
1117 {
1118 }
1119 };
1120
1121 return tupleIterator;
1122 }
1123
1124
1125 private static <K, V> Tuple<K, Set<V>> fetchTuple( BTree<K, V> btree, FileInputStream fis )
1126 {
1127 try
1128 {
1129 if ( fis.available() == 0 )
1130 {
1131 return null;
1132 }
1133
1134 Tuple<K, Set<V>> tuple = new Tuple<K, Set<V>>();
1135 tuple.value = new TreeSet<V>();
1136
1137 byte[] intBytes = new byte[4];
1138
1139
1140 fis.read( intBytes );
1141 int keyLength = IntSerializer.deserialize( intBytes );
1142
1143
1144 byte[] keyBytes = new byte[keyLength];
1145 fis.read( keyBytes );
1146 K key = btree.getKeySerializer().fromBytes( keyBytes );
1147 tuple.key = key;
1148
1149
1150 fis.read( intBytes );
1151 int nbValues = IntSerializer.deserialize( intBytes );
1152
1153
1154 for ( int i = 0; i < nbValues; i++ )
1155 {
1156
1157 fis.read( intBytes );
1158 int valueLength = IntSerializer.deserialize( intBytes );
1159
1160
1161 byte[] valueBytes = new byte[valueLength];
1162 fis.read( valueBytes );
1163 V value = btree.getValueSerializer().fromBytes( valueBytes );
1164 tuple.value.add( value );
1165 }
1166
1167 return tuple;
1168 }
1169 catch ( IOException ioe )
1170 {
1171 return null;
1172 }
1173 }
1174
1175
1176
1177
1178
1179
1180 private static <K, V> Iterator<Tuple<K, Set<V>>> createIterator( final BTree<K, V> btree,
1181 final FileInputStream[] streams )
1182 throws FileNotFoundException
1183 {
1184
1185 final int nbFiles = streams.length;
1186
1187
1188 final Tuple<K, Set<V>>[] readTuples = new Tuple[nbFiles];
1189 final TreeMap<Tuple<K, Integer>, Set<V>> candidates =
1190 new TreeMap<Tuple<K, Integer>, Set<V>>(
1191 new TupleComparator<K, Integer>( btree.getKeyComparator(), IntComparator.INSTANCE ) );
1192
1193
1194 for ( int i = 0; i < nbFiles; i++ )
1195 {
1196 while ( true )
1197 {
1198 readTuples[i] = fetchTuple( btree, streams[i] );
1199
1200 if ( readTuples[i] != null )
1201 {
1202 Tuple<K, Integer> candidate = new Tuple<K, Integer>( readTuples[i].key, i, btree.getKeySerializer()
1203 .getComparator() );
1204
1205 if ( !candidates.containsKey( candidate ) )
1206 {
1207 candidates.put( candidate, readTuples[i].getValue() );
1208 break;
1209 }
1210 else
1211 {
1212
1213 Set<V> oldValues = candidates.get( candidate );
1214 oldValues.addAll( readTuples[i].getValue() );
1215 }
1216 }
1217 else
1218 {
1219 break;
1220 }
1221 }
1222 }
1223
1224 Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1225 {
1226 @Override
1227 public Tuple<K, Set<V>> next()
1228 {
1229
1230 Tuple<K, Integer> tupleCandidate = candidates.firstKey();
1231
1232
1233 candidates.remove( tupleCandidate );
1234
1235
1236 Tuple<K, Set<V>> tuple = readTuples[tupleCandidate.value];
1237
1238
1239 while ( true )
1240 {
1241
1242 readTuples[tupleCandidate.value] = fetchTuple( btree, streams[tupleCandidate.value] );
1243
1244 if ( readTuples[tupleCandidate.value] == null )
1245 {
1246
1247 break;
1248 }
1249
1250 if ( readTuples[tupleCandidate.value] != null )
1251 {
1252
1253 Set<V> oldValues = candidates.get( readTuples[tupleCandidate.value] );
1254
1255 if ( oldValues != null )
1256 {
1257
1258 oldValues.addAll( readTuples[tupleCandidate.value].value );
1259 }
1260 else
1261 {
1262 Tuple<K, Integer> newTuple = new Tuple<K, Integer>( readTuples[tupleCandidate.value].key,
1263 tupleCandidate.value );
1264 candidates.put( newTuple, readTuples[tupleCandidate.value].getValue() );
1265
1266
1267 break;
1268 }
1269 }
1270 }
1271
1272
1273 return tuple;
1274 }
1275
1276
1277 @Override
1278 public boolean hasNext()
1279 {
1280
1281 return !candidates.isEmpty();
1282 }
1283
1284
1285 @Override
1286 public void remove()
1287 {
1288 }
1289
1290 };
1291
1292 return tupleIterator;
1293 }
1294
1295
1296
1297
1298
1299
1300 private static <K, V> Iterator<Tuple<K, Set<V>>> createUniqueFileIterator( final BTree<K, V> btree,
1301 final FileInputStream stream )
1302 throws FileNotFoundException
1303 {
1304 Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
1305 {
1306 boolean hasNext = true;
1307
1308
1309 @Override
1310 public Tuple<K, Set<V>> next()
1311 {
1312
1313 Tuple<K, Set<V>> tuple = fetchTuple( btree, stream );
1314
1315
1316 return tuple;
1317 }
1318
1319
1320 @Override
1321 public boolean hasNext()
1322 {
1323
1324 try
1325 {
1326 return stream.available() > 0;
1327 }
1328 catch ( IOException e )
1329 {
1330 return false;
1331 }
1332 }
1333
1334
1335 @Override
1336 public void remove()
1337 {
1338 }
1339
1340 };
1341
1342 return tupleIterator;
1343 }
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357 public static void compact( RecordManager recordManager, BTree<?, ?> btree )
1358 {
1359
1360 }
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373 public static BTree<?, ?> compact( BTree<?, ?> btree ) throws IOException, KeyNotFoundException
1374 {
1375
1376 InMemoryBTreeConfiguration configuration = new InMemoryBTreeConfiguration();
1377 configuration.setName( btree.getName() );
1378 configuration.setPageSize( btree.getPageSize() );
1379 configuration.setKeySerializer( btree.getKeySerializer() );
1380 configuration.setValueSerializer( btree.getValueSerializer() );
1381 configuration.setAllowDuplicates( btree.isAllowDuplicates() );
1382 configuration.setReadTimeOut( btree.getReadTimeOut() );
1383 configuration.setWriteBufferSize( btree.getWriteBufferSize() );
1384
1385 File file = ( ( InMemoryBTree ) btree ).getFile();
1386
1387 if ( file != null )
1388 {
1389 configuration.setFilePath( file.getPath() );
1390 }
1391
1392
1393 InMemoryBTreeBuilder btreeBuilder = new InMemoryBTreeBuilder( configuration );
1394
1395
1396 final TupleCursor cursor = btree.browse();
1397
1398
1399 Iterator<Tuple> tupleItr = new Iterator<Tuple>()
1400 {
1401 @Override
1402 public Tuple next()
1403 {
1404 try
1405 {
1406 return cursor.next();
1407 }
1408 catch ( EndOfFileExceededException e )
1409 {
1410 return null;
1411 }
1412 catch ( IOException e )
1413 {
1414 return null;
1415 }
1416 }
1417
1418
1419 @Override
1420 public boolean hasNext()
1421 {
1422 try
1423 {
1424 return cursor.hasNext();
1425 }
1426 catch ( EndOfFileExceededException e )
1427 {
1428 return false;
1429 }
1430 catch ( IOException e )
1431 {
1432 return false;
1433 }
1434 }
1435
1436
1437 @Override
1438 public void remove()
1439 {
1440 }
1441 };
1442
1443
1444 return btreeBuilder.build( tupleItr );
1445 }
1446 }