001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020 021package org.apache.directory.mavibot.btree; 022 023 024import java.io.File; 025import java.io.FileInputStream; 026import java.io.FileNotFoundException; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Comparator; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Map; 037import java.util.Set; 038import java.util.TreeMap; 039import java.util.TreeSet; 040 041import org.apache.directory.mavibot.btree.comparator.IntComparator; 042import org.apache.directory.mavibot.btree.exception.EndOfFileExceededException; 043import org.apache.directory.mavibot.btree.exception.KeyNotFoundException; 044import org.apache.directory.mavibot.btree.serializer.IntSerializer; 045 046 047/** 048 * A class used to bulk load a BTree. It will allow the load of N elements in 049 * a given BTree without to have to inject one by one, saving a lot of time. 050 * The second advantage is that the btree will be dense (the leaves will be 051 * complete, except the last one). 
 * This class can also be used to compact a BTree.
 *
 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
 */
public class BulkLoader<K, V>
{
    /** Utility class : never instantiated. */
    private BulkLoader()
    {
    };

    /** The two kinds of levels a BTree is made of : leaves at the bottom, nodes above. */
    static enum LevelEnum
    {
        LEAF,
        NODE
    }

    /**
     * A private class used to store the temporary sorted file. It's used by
     * the bulkLoader
     */
    private static class SortedFile
    {
        /** the file that contains the values */
        private File file;

        /** The number of stored values */
        private int nbValues;


        /** A constructor for this class */
        /*No Qualifier*/SortedFile( File file, int nbValues )
        {
            this.file = file;
            this.nbValues = nbValues;
        }
    }


    /**
     * Process the data, and creates files to store them sorted, if necessary. When all
     * the data fit in a single chunk, they are kept in the in-memory tuples list instead.
     *
     * @param btree the btree the data will be loaded into (used for its serializers)
     * @param iterator the source of tuples to read
     * @param sortedFiles the list the temporary sorted files are added to
     * @param tuples the in-memory buffer; after the call it holds the last chunk read,
     *        or nothing when the data were spilled to disk
     * @param chunkSize the maximum number of elements kept in memory at each iteration
     * @return the total number of elements read, counting distinct keys only
     * @throws IOException if flushing a sorted chunk to disk fails
     */
    private static <K, V> int readElements( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, List<File> sortedFiles,
        List<Tuple<K, V>> tuples, int chunkSize ) throws IOException
    {
        int nbRead = 0;
        int nbIteration = 0;
        int nbElems = 0;
        boolean inMemory = true;
        Set<K> keys = new HashSet<K>();

        while ( true )
        {
            nbIteration++;
            tuples.clear();
            keys.clear();

            // Read up to chunkSize elements. Note that nbRead counts distinct keys only,
            // while the tuples list keeps every read tuple, duplicates included.
            while ( iterator.hasNext() && ( nbRead < chunkSize ) )
            {
                Tuple<K, V> tuple = iterator.next();
                tuples.add( tuple );

                if ( !keys.contains( tuple.getKey() ) )
                {
                    keys.add( tuple.getKey() );
                    nbRead++;
                }
            }

            if ( nbRead < chunkSize )
            {
                if ( nbIteration != 1 )
                {
                    // Flush the sorted data on disk and exit
                    inMemory = false;

                    sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
                }

                // Update the number of read elements
                nbElems += nbRead;

                break;
            }
            else
            {
                if ( !iterator.hasNext() )
                {
                    // special case : we have exactly chunkSize elements in the incoming data
                    if ( nbIteration > 1 )
                    {
                        // Flush the sorted data on disk and exit
                        inMemory = false;
                        sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
                    }

                    // We have read all the data in one round trip, let's get out, no need
                    // to store the data on disk

                    // Update the number of read elements
                    nbElems += nbRead;

                    break;
                }

                // We have read chunkSize elements, we have to sort them on disk
                nbElems += nbRead;
                nbRead = 0;
                sortedFiles.add( flushToDisk( nbIteration, tuples, btree ) );
            }
        }

        if ( !inMemory )
        {
            // The data went to disk : the in-memory buffer is no longer relevant
            tuples.clear();
        }

        return nbElems;
    }


    /**
     * Read all the sorted files, and inject them into one single big file containing all the
     * sorted and merged elements.
     *
     * @throws IOException if writing or reopening the merged file fails
     */
    private static <K, V> Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> processFiles( BTree<K, V> btree,
        Iterator<Tuple<K, Set<V>>> dataIterator ) throws IOException
    {
        File file = File.createTempFile( "sortedUnique", "data" );
        file.deleteOnExit();
        // NOTE(review): fos is not closed if a write below throws - consider try/finally
        FileOutputStream fos = new FileOutputStream( file );

        // Number of read elements
        int nbReads = 0;

        // Flush the tuples on disk
        while ( dataIterator.hasNext() )
        {
            nbReads++;

            // grab a tuple
            Tuple<K, Set<V>> tuple = dataIterator.next();

            // Serialize the key, length-prefixed
            byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
            fos.write( IntSerializer.serialize( bytesKey.length ) );
            fos.write( bytesKey );

            // Serialize the number of values
            int nbValues = tuple.getValue().size();
            fos.write( IntSerializer.serialize( nbValues ) );

            // Serialize the values, each length-prefixed
            for ( V value : tuple.getValue() )
            {
                byte[] bytesValue =
btree.getValueSerializer().serialize( value );

                // Serialize the value
                fos.write( IntSerializer.serialize( bytesValue.length ) );
                fos.write( bytesValue );
            }
        }

        fos.flush();
        fos.close();

        // Reopen the merged file, and wrap it with an iterator over the unique keys
        FileInputStream fis = new FileInputStream( file );
        Iterator<Tuple<K, Set<V>>> uniqueIterator = createUniqueFileIterator( btree, fis );
        SortedFile sortedFile = new SortedFile( file, nbReads );

        Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = new Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile>(
            uniqueIterator, sortedFile );

        return result;
    }


    /**
     * Bulk Load data into a persisted BTree
     *
     * @param btree The persisted BTree in which we want to load the data
     * @param iterator The iterator over the data to bulkload
     * @param chunkSize The number of elements we may store in memory at each iteration
     * @return the loaded BTree, or null when the iterator is null
     * @throws IOException If there is a problem while processing the data
     */
    public static <K, V> BTree<K, V> load( BTree<K, V> btree, Iterator<Tuple<K, V>> iterator, int chunkSize )
        throws IOException
    {
        if ( btree == null )
        {
            throw new RuntimeException( "Invalid BTree : it's null" );
        }

        if ( iterator == null )
        {
            // Nothing to do...
            return null;
        }

        // Iterate through the elements by chunk
        boolean inMemory = true;

        // The list of files we will use to store the sorted chunks
        List<File> sortedFiles = new ArrayList<File>();

        // An array of chunkSize tuples max
        List<Tuple<K, V>> tuples = new ArrayList<Tuple<K, V>>( chunkSize );

        // Now, start to read all the tuples to sort them. We may use intermediate files
        // for that purpose if we hit the threshold.
        int nbElems = readElements( btree, iterator, sortedFiles, tuples, chunkSize );

        // If the tuple list is empty, we have to process the load based on files, not in memory
        if ( nbElems > 0 )
        {
            inMemory = tuples.size() > 0;
        }

        // Now that we have processed all the data, we can start storing them in the btree
        Iterator<Tuple<K, Set<V>>> dataIterator = null;
        FileInputStream[] streams = null;
        BTree<K, V> resultBTree = null;

        if ( inMemory )
        {
            // Here, we have all the data in memory, no need to merge files
            // We will build a simple iterator over the data
            dataIterator = createTupleIterator( btree, tuples );
            resultBTree = bulkLoad( btree, dataIterator, nbElems );
        }
        else
        {
            // We first have to build an iterator over the files
            int nbFiles = sortedFiles.size();
            streams = new FileInputStream[nbFiles];

            for ( int i = 0; i < nbFiles; i++ )
            {
                streams[i] = new FileInputStream( sortedFiles.get( i ) );
            }

            dataIterator = createIterator( btree, streams );

            // Process the files, and construct one single file with an iterator
            Tuple<Iterator<Tuple<K, Set<V>>>, SortedFile> result = processFiles( btree, dataIterator );
            resultBTree = bulkLoad( btree, result.key, result.value.nbValues );
            result.value.file.delete();
        }

        // Ok, we have an iterator over sorted elements, we can now load them in the
        // target btree.
        // Now, close the FileInputStream, and delete them if we have some
        // NOTE(review): if bulkLoad or processFiles throws above, these streams are
        // never closed - consider a try/finally
        if ( !inMemory )
        {
            int nbFiles = sortedFiles.size();

            for ( int i = 0; i < nbFiles; i++ )
            {
                streams[i].close();
                sortedFiles.get( i ).delete();
            }
        }

        return resultBTree;
    }


    /**
     * Creates a node or leaf LevelInfo based on the number of elements in the lower level. We can store
     * up to PageSize + 1 references to pages in a node.
     */
    /* no qualifier*/static <K, V> LevelInfo<K, V> computeLevel( BTree<K, V> btree, int nbElems, LevelEnum levelType )
    {
        int pageSize = btree.getPageSize();
        int incrementNode = 0;

        // A node holds one more page reference than it holds keys
        if ( levelType == LevelEnum.NODE )
        {
            incrementNode = 1;
        }

        LevelInfo<K, V> level = new LevelInfo<K, V>();
        level.setType( ( levelType == LevelEnum.NODE ) );
        level.setNbElems( nbElems );
        level.setNbPages( nbElems / ( pageSize + incrementNode ) );
        level.setLevelNumber( 0 );
        level.setNbAddedElems( 0 );
        level.setCurrentPos( 0 );

        // Create the first level page
        if ( nbElems <= pageSize + incrementNode )
        {
            // A single page is enough for this level
            if ( nbElems % ( pageSize + incrementNode ) != 0 )
            {
                level.setNbPages( 1 );
            }

            level.setNbElemsLimit( nbElems );

            if ( level.isNode() )
            {
                level.setCurrentPage( BTreeFactory.createNode( btree, 0L, nbElems - 1 ) );
            }
            else
            {
                level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbElems ) );
            }
        }
        else
        {
            // The number of elements that do not fill a complete page
            int remaining = nbElems % ( pageSize + incrementNode );

            if ( remaining == 0 )
            {
                // Every page of this level will be full
                level.setNbElemsLimit( nbElems );

                if ( level.isNode() )
                {
                    level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
                }
                else
                {
                    level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
                }
            }
            else
            {
                level.incNbPages();

                if ( remaining < ( pageSize / 2 ) + incrementNode )
                {
                    // The trailing elements will be spread across the last two pages
                    level.setNbElemsLimit( nbElems - remaining - ( pageSize + incrementNode ) );

                    if ( level.getNbElemsLimit() > 0 )
                    {
                        if ( level.isNode() )
                        {
                            level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
                        }
                        else
                        {
                            level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
                        }
                    }
                    else
                    {
                        if ( level.isNode() )
                        {
                            level
                                .setCurrentPage( BTreeFactory.createNode( btree, 0L, ( pageSize / 2 ) + remaining - 1 )
);
                        }
                        else
                        {
                            level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, ( pageSize / 2 ) + remaining ) );
                        }
                    }
                }
                else
                {
                    // The remaining elements fit in one last, partially filled page
                    level.setNbElemsLimit( nbElems - remaining );

                    if ( level.isNode() )
                    {
                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
                    }
                    else
                    {
                        level.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, pageSize ) );
                    }
                }
            }
        }

        return level;
    }


    /**
     * Compute the number of pages necessary to store all the elements per level. The resulting list is
     * reversed ( ie the leaves are on the left, the root page on the right ).
     */
    /* No Qualifier */static <K, V> List<LevelInfo<K, V>> computeLevels( BTree<K, V> btree, int nbElems )
    {
        List<LevelInfo<K, V>> levelList = new ArrayList<LevelInfo<K, V>>();

        // Compute the leaves info
        LevelInfo<K, V> leafLevel = computeLevel( btree, nbElems, LevelEnum.LEAF );

        levelList.add( leafLevel );
        int nbPages = leafLevel.getNbPages();
        int levelNumber = 1;

        // Each upper level stores one reference per page of the level below,
        // until a single (root) page is enough
        while ( nbPages > 1 )
        {
            // Compute the Nodes info
            LevelInfo<K, V> nodeLevel = computeLevel( btree, nbPages, LevelEnum.NODE );
            nodeLevel.setLevelNumber( levelNumber++ );
            levelList.add( nodeLevel );
            nbPages = nodeLevel.getNbPages();
        }

        return levelList;
    }


    /**
     * Inject a tuple into a leaf
     */
    private static <K, V> void injectInLeaf( BTree<K, V> btree, Tuple<K, Set<V>> tuple, LevelInfo<K, V> leafLevel )
    {
        PersistedLeaf<K, V> leaf = ( PersistedLeaf<K, V> ) leafLevel.getCurrentPage();

        KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
        leaf.setKey( leafLevel.getCurrentPos(), keyHolder );

        // Sub-btrees store no values, only keys
        if ( btree.getType() != BTreeTypeEnum.PERSISTED_SUB )
        {
            ValueHolder<V> valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue().toArray() );
            leaf.setValue(
leafLevel.getCurrentPos(), valueHolder );
        }

        leafLevel.incCurrentPos();
    }


    /**
     * Compute the number of elements the next leaf page should hold, given how
     * many elements remain to be added at this level.
     */
    private static <K, V> int computeNbElemsLeaf( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
    {
        int pageSize = btree.getPageSize();
        int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();

        if ( remaining < pageSize )
        {
            return remaining;
        }
        else if ( remaining == pageSize )
        {
            return pageSize;
        }
        else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
        {
            return pageSize;
        }
        else
        {
            return remaining - pageSize / 2;
        }
    }


    /**
     * Compute the number of elements the next node page should hold, given how
     * many remain at this level (mirrors computeNbElemsLeaf with the extra
     * page reference a node can hold).
     */
    // NOTE(review): this is the only non-static helper in the class and it is not
    // called from this file - it probably should be static like computeNbElemsLeaf
    /* No qualifier */int computeNbElemsNode( BTree<K, V> btree, LevelInfo<K, V> levelInfo )
    {
        int pageSize = btree.getPageSize();
        int remaining = levelInfo.getNbElems() - levelInfo.getNbAddedElems();

        if ( remaining < pageSize + 1 )
        {
            return remaining;
        }
        else if ( remaining == pageSize + 1 )
        {
            return pageSize + 1;
        }
        else if ( remaining > levelInfo.getNbElems() - levelInfo.getNbElemsLimit() )
        {
            return pageSize + 1;
        }
        else
        {
            return remaining - pageSize / 2;
        }
    }


    /**
     * Inject a page reference into the root page.
     */
    private static <K, V> void injectInRoot( BTree<K, V> btree, Page<K, V> page, PageHolder<K, V> pageHolder,
        LevelInfo<K, V> level ) throws IOException
    {
        PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();

        // The very first page reference in a node is not associated with a key
        if ( ( level.getCurrentPos() == 0 ) && ( node.getPage( 0 ) == null ) )
        {
            node.setPageHolder( 0, pageHolder );
            level.incNbAddedElems();
        }
        else
        {
            // Inject the pageHolder and the page leftmost key
            node.setPageHolder( level.getCurrentPos() + 1, pageHolder );
            KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
            node.setKey( level.getCurrentPos(), keyHolder );
            level.incCurrentPos();
            level.incNbAddedElems();

            // Check that we haven't added the last element. If so,
            // we have to write the page on disk and update the btree
            if ( level.getNbAddedElems() == level.getNbElems() )
            {
                PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
                    btree, node, 0L );
                ( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
            }
        }

        return;
    }


    /**
     * Inject a page reference into a Node. This method will recurse if needed.
     */
    private static <K, V> void injectInNode( BTree<K, V> btree, Page<K, V> page, List<LevelInfo<K, V>> levels,
        int levelIndex )
        throws IOException
    {
        int pageSize = btree.getPageSize();
        LevelInfo<K, V> level = levels.get( levelIndex );
        PersistedNode<K, V> node = ( PersistedNode<K, V> ) level.getCurrentPage();

        // We first have to write the page on disk
        PageHolder<K, V> pageHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage( btree, page, 0L );

        // First deal with a node that has less than PageSize elements at this level.
        // It will become the root node.
        if ( level.getNbElems() <= pageSize + 1 )
        {
            injectInRoot( btree, page, pageHolder, level );

            return;
        }

        // Now, we have some parent node. We process the 3 different use case :
        // o Full node before the limit
        // o Node over the limit but with at least N/2 elements
        // o Node over the limit but with elements spread into 2 nodes
        if ( level.getNbAddedElems() < level.getNbElemsLimit() )
        {
            // Ok, we haven't yet reached the incomplete pages (if any).
            // Let's add the page reference into the node
            // There is one special case : when we are adding the very first page
            // reference into a node. In this case, we don't store the key
            if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
            {
                node.setPageHolder( 0, pageHolder );
            }
            else
            {
                // Inject the pageHolder and the page leftmost key
                node.setPageHolder( level.getCurrentPos(), pageHolder );
                KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), page.getLeftMostKey() );
                node.setKey( level.getCurrentPos() - 1, keyHolder );
            }

            // Now, increment this level nb of added elements
            level.incCurrentPos();
            level.incNbAddedElems();

            // Check that we haven't added the last element. If so,
            // we have to write the page on disk and update the parent's node
            if ( level.getNbAddedElems() == level.getNbElems() )
            {
                //PageHolder<K, V> rootHolder = ( ( PersistedBTree<K, V> ) btree ).getRecordManager().writePage(
                //    btree, node, 0L );
                //( ( PersistedBTree<K, V> ) btree ).setRootPage( rootHolder.getValue() );
                injectInNode( btree, node, levels, levelIndex + 1 );

                return;
            }
            else
            {
                // Check that we haven't completed the current node, and that this is not the root node.
                if ( ( level.getCurrentPos() == pageSize + 1 ) && ( level.getLevelNumber() < levels.size() - 1 ) )
                {
                    // yes. We have to write the node on disk, update its parent
                    // and create a new current node
                    injectInNode( btree, node, levels, levelIndex + 1 );

                    // The page is full, we have to create a new one, with a size depending on the remaining elements
                    if ( level.getNbAddedElems() < level.getNbElemsLimit() )
                    {
                        // We haven't reached the limit, create a new full node
                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize ) );
                    }
                    else if ( level.getNbElems() - level.getNbAddedElems() <= pageSize )
                    {
                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L,
                            level.getNbElems() - level.getNbAddedElems() - 1 ) );
                    }
                    else
                    {
                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, ( level.getNbElems() - 1 )
                            - ( level.getNbAddedElems() + 1 ) - pageSize / 2 ) );
                    }

                    level.setCurrentPos( 0 );
                }
            }

            return;
        }
        else
        {
            // We are dealing with the last page or the last two pages
            // We can have either one single pages which can contain up to pageSize-1 elements
            // or with two pages, the first one containing ( nbElems - limit ) - pageSize/2 elements
            // and the second one will contain pageSize/2 elements.
            if ( level.getNbElems() - level.getNbElemsLimit() > pageSize )
            {
                // As the remaining elements are above a page size, they will be spread across
                // two pages. We have two cases here, depending on the page we are filling
                if ( level.getNbElems() - level.getNbAddedElems() <= pageSize / 2 + 1 )
                {
                    // As we have less than PageSize/2 elements to write, we are on the second page
                    if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
                    {
                        node.setPageHolder( 0, pageHolder );
                    }
                    else
                    {
                        // Inject the pageHolder and the page leftmost key
                        node.setPageHolder( level.getCurrentPos(), pageHolder );
                        KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
                            page.getLeftMostKey() );
                        node.setKey( level.getCurrentPos() - 1, keyHolder );
                    }

                    // Now, increment this level nb of added elements
                    level.incCurrentPos();
                    level.incNbAddedElems();

                    // Check if we are done with the page
                    if ( level.getNbAddedElems() == level.getNbElems() )
                    {
                        // Yes, we have to update the parent
                        injectInNode( btree, node, levels, levelIndex + 1 );
                    }
                }
                else
                {
                    // This is the first page
                    if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
                    {
                        // First element of the page
                        node.setPageHolder( 0, pageHolder );
                    }
                    else
                    {
                        // Any other following elements
                        // Inject the pageHolder and the page leftmost key
                        node.setPageHolder( level.getCurrentPos(), pageHolder );
                        KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
                            page.getLeftMostKey() );
                        node.setKey( level.getCurrentPos() - 1, keyHolder );
                    }

                    // Now, increment this level nb of added elements
                    level.incCurrentPos();
                    level.incNbAddedElems();

                    // Check if we are done with the page
                    if ( level.getCurrentPos() == node.getNbElems() + 1 )
                    {
                        // Yes, we have to update the parent
                        injectInNode( btree, node, levels, levelIndex + 1 );

                        // And create a new one
                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
                        level.setCurrentPos( 0
);
                    }
                }
            }
            else
            {
                // Two cases : we don't have anything else to write, or this is a single page
                if ( level.getNbAddedElems() == level.getNbElems() )
                {
                    // We are done with the page
                    injectInNode( btree, node, levels, levelIndex + 1 );
                }
                else
                {
                    // We have some more elements to add in the page
                    // This is the first page
                    if ( ( level.getCurrentPos() == 0 ) && ( node.getKey( 0 ) == null ) )
                    {
                        // First element of the page
                        node.setPageHolder( 0, pageHolder );
                    }
                    else
                    {
                        // Any other following elements
                        // Inject the pageHolder and the page leftmost key
                        node.setPageHolder( level.getCurrentPos(), pageHolder );
                        KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(),
                            page.getLeftMostKey() );
                        node.setKey( level.getCurrentPos() - 1, keyHolder );
                    }

                    // Now, increment this level nb of added elements
                    level.incCurrentPos();
                    level.incNbAddedElems();

                    // Check if we are done with the page
                    if ( level.getCurrentPos() == node.getNbElems() + 1 )
                    {
                        // Yes, we have to update the parent
                        injectInNode( btree, node, levels, levelIndex + 1 );

                        // And create a new one
                        level.setCurrentPage( BTreeFactory.createNode( btree, 0L, pageSize / 2 ) );
                        level.setCurrentPos( 0 );
                    }
                }

                return;
            }
        }
    }


    /**
     * Load all the elements into the single root page of the target BTree
     * (used when everything fits in one page).
     */
    private static <K, V> BTree<K, V> bulkLoadSinglePage( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator,
        int nbElems ) throws IOException
    {
        // Use the root page
        Page<K, V> rootPage = btree.getRootPage();

        // Initialize the root page
        ( ( AbstractPage<K, V> ) rootPage ).setNbElems( nbElems );
        KeyHolder<K>[] keys = new KeyHolder[nbElems];
        ValueHolder<V>[] values = new ValueHolder[nbElems];

        switch ( btree.getType() )
        {
            case IN_MEMORY:
                ( ( InMemoryLeaf<K, V> ) rootPage ).values = values;
                break;

            default:
                ( (
PersistedLeaf<K, V> ) rootPage ).values = values;
        }

        // We first have to inject data into the page
        int pos = 0;

        while ( dataIterator.hasNext() )
        {
            Tuple<K, Set<V>> tuple = dataIterator.next();

            // Store the current element in the rootPage
            KeyHolder<K> keyHolder = new PersistedKeyHolder<K>( btree.getKeySerializer(), tuple.getKey() );
            keys[pos] = keyHolder;

            switch ( btree.getType() )
            {
                case IN_MEMORY:
                    ValueHolder<V> valueHolder = new InMemoryValueHolder<V>( btree, ( V[] ) tuple.getValue()
                        .toArray() );
                    ( ( InMemoryLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
                    break;

                default:
                    valueHolder = new PersistedValueHolder<V>( btree, ( V[] ) tuple.getValue()
                        .toArray() );
                    ( ( PersistedLeaf<K, V> ) rootPage ).values[pos] = valueHolder;
            }

            pos++;
        }

        // Update the rootPage
        ( ( AbstractPage<K, V> ) rootPage ).setKeys( keys );

        // Update the btree with the nb of added elements, and write it
        BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
        btreeHeader.setNbElems( nbElems );

        return btree;
    }


    /**
     * Construct the target BTree from the sorted data. We will use the nb of elements
     * to determine the structure of the BTree, as it must be balanced
     */
    private static <K, V> BTree<K, V> bulkLoad( BTree<K, V> btree, Iterator<Tuple<K, Set<V>>> dataIterator, int nbElems )
        throws IOException
    {
        int pageSize = btree.getPageSize();

        // Special case : we can store all the elements in a single page
        if ( nbElems <= pageSize )
        {
            return bulkLoadSinglePage( btree, dataIterator, nbElems );
        }

        // Ok, we will need more than one page to store the elements, which
        // means we also will need more than one level.
        // First, compute the needed number of levels.
        List<LevelInfo<K, V>> levels = computeLevels( btree, nbElems );

        // Now, let's fill the levels
        LevelInfo<K, V> leafLevel = levels.get( 0 );

        while ( dataIterator.hasNext() )
        {
            // let's fill pages up to the point all the complete pages have been filled
            if ( leafLevel.getNbAddedElems() < leafLevel.getNbElemsLimit() )
            {
                // grab a tuple
                Tuple<K, Set<V>> tuple = dataIterator.next();

                injectInLeaf( btree, tuple, leafLevel );
                leafLevel.incNbAddedElems();

                // The page is completed, update the parent's node and create a new current page
                if ( leafLevel.getCurrentPos() == pageSize )
                {
                    injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );

                    // The page is full, we have to create a new one
                    leafLevel.setCurrentPage( BTreeFactory
                        .createLeaf( btree, 0L, computeNbElemsLeaf( btree, leafLevel ) ) );
                    leafLevel.setCurrentPos( 0 );
                }
            }
            else
            {
                // We have to deal with uncompleted pages now (if we have any)
                if ( leafLevel.getNbAddedElems() == nbElems )
                {
                    // First use case : we have injected all the elements in the btree : get out
                    break;
                }

                if ( nbElems - leafLevel.getNbElemsLimit() > pageSize )
                {
                    // Second use case : the number of elements after the limit does not
                    // fit in a page, that means we have to split it into
                    // two pages

                    // First page will contain nbElems - leafLevel.nbElemsLimit - PageSize/2 elements
                    int nbToAdd = nbElems - leafLevel.getNbElemsLimit() - pageSize / 2;

                    while ( nbToAdd > 0 )
                    {
                        // grab a tuple
                        Tuple<K, Set<V>> tuple = dataIterator.next();

                        injectInLeaf( btree, tuple, leafLevel );
                        leafLevel.incNbAddedElems();
                        nbToAdd--;
                    }

                    // Now inject the page into the node
                    Page<K, V> currentPage = leafLevel.getCurrentPage();
                    injectInNode( btree, currentPage, levels, 1 );

                    // Create a new page for the remaining elements
                    nbToAdd = pageSize / 2;
                    leafLevel.setCurrentPage( BTreeFactory.createLeaf( btree, 0L, nbToAdd ) );
                    leafLevel.setCurrentPos( 0 );

                    while ( nbToAdd > 0 )
                    {
                        // grab a tuple
                        Tuple<K, Set<V>> tuple = dataIterator.next();

                        injectInLeaf( btree, tuple, leafLevel );
                        leafLevel.incNbAddedElems();
                        nbToAdd--;
                    }

                    // And update the parent node
                    Page<K, V> levelCurrentPage = leafLevel.getCurrentPage();
                    injectInNode( btree, levelCurrentPage, levels, 1 );

                    // We are done
                    break;
                }
                else
                {
                    // Third use case : we can push all the elements in the last page.
                    // Let's do it
                    int nbToAdd = nbElems - leafLevel.getNbElemsLimit();

                    while ( nbToAdd > 0 )
                    {
                        // grab a tuple
                        Tuple<K, Set<V>> tuple = dataIterator.next();

                        injectInLeaf( btree, tuple, leafLevel );
                        leafLevel.incNbAddedElems();
                        nbToAdd--;
                    }

                    // Now inject the page into the node
                    injectInNode( btree, leafLevel.getCurrentPage(), levels, 1 );

                    // and we are done
                    break;
                }
            }
        }

        // Update the btree with the nb of added elements, and write it
        BTreeHeader<K, V> btreeHeader = ( ( AbstractBTree<K, V> ) btree ).getBtreeHeader();
        btreeHeader.setNbElems( nbElems );

        return btree;
    }


    /**
     * Flush a list of tuples to disk after having sorted them. In the process, we may have to gather the values
     * for the tuples having the same keys.
     * @throws IOException if writing the temporary file fails
     */
    private static <K, V> File flushToDisk( int fileNb, List<Tuple<K, V>> tuples, BTree<K, V> btree )
        throws IOException
    {
        // Sort the tuples.
        Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );

        File file = File.createTempFile( "sorted", Integer.toString( fileNb ) );
        file.deleteOnExit();
        // NOTE(review): fos is not closed if a write below throws - consider try/finally
        FileOutputStream fos = new FileOutputStream( file );

        // Flush the tuples on disk, using the same length-prefixed layout
        // that fetchTuple and processFiles read back
        for ( Tuple<K, Set<V>> tuple : sortedTuples )
        {
            // Serialize the key
            byte[] bytesKey = btree.getKeySerializer().serialize( tuple.key );
            fos.write( IntSerializer.serialize( bytesKey.length ) );
            fos.write( bytesKey );

            // Serialize the number of values
            int nbValues = tuple.getValue().size();
            fos.write( IntSerializer.serialize( nbValues ) );

            // Serialize the values
            for ( V value : tuple.getValue() )
            {
                byte[] bytesValue = btree.getValueSerializer().serialize( value );

                // Serialize the value
                fos.write( IntSerializer.serialize( bytesValue.length ) );
                fos.write( bytesValue );
            }
        }

        fos.flush();
        fos.close();

        return file;
    }


    /**
     * Sort a list of tuples, eliminating the duplicate keys and storing the values in a set when we
     * have a duplicate key
     */
    private static <K, V> Tuple<K, Set<V>>[] sort( BTree<K, V> btree, List<Tuple<K, V>> tuples )
    {
        Comparator<Tuple<K, Set<V>>> tupleComparator = new TupleComparator( btree.getKeyComparator(), btree
            .getValueComparator() );

        // Sort the list
        Tuple<K, V>[] tuplesArray = ( Tuple<K, V>[] ) tuples.toArray( new Tuple[]
            {} );

        // First, eliminate the equals keys. We use a map for that
        Map<K, Set<V>> mapTuples = new HashMap<K, Set<V>>();

        for ( Tuple<K, V> tuple : tuplesArray )
        {
            // Is the key present in the map ?
            Set<V> foundSet = mapTuples.get( tuple.key );

            if ( foundSet != null )
            {
                // We already have had such a key, add the value to the existing key
                foundSet.add( tuple.value );
            }
            else
            {
                // No such key present in the map : create a new set to store the values,
                // and add it in the map associated with the new key
                Set<V> set = new TreeSet<V>();
                set.add( tuple.value );
                mapTuples.put( tuple.key, set );
            }
        }

        // Now, sort the map, by extracting all the key/values from the map
        int size = mapTuples.size();
        Tuple<K, Set<V>>[] sortedTuples = new Tuple[size];
        int pos = 0;

        // We create an array containing all the elements
        for ( Map.Entry<K, Set<V>> entry : mapTuples.entrySet() )
        {
            sortedTuples[pos] = new Tuple<K, Set<V>>();
            sortedTuples[pos].key = entry.getKey();
            sortedTuples[pos].value = entry.getValue();
            pos++;
        }

        // And we sort the array
        Arrays.sort( sortedTuples, tupleComparator );

        return sortedTuples;
    }


    /**
     * Build an iterator over an array of sorted tuples, in memory
     */
    private static <K, V> Iterator<Tuple<K, Set<V>>> createTupleIterator( BTree<K, V> btree, List<Tuple<K, V>> tuples )
    {
        final Tuple<K, Set<V>>[] sortedTuples = sort( btree, tuples );

        Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>()
        {
            // Position of the next tuple to return
            private int pos = 0;


            @Override
            public Tuple<K, Set<V>> next()
            {
                // Return the current tuple, if any
                if ( pos < sortedTuples.length )
                {
                    Tuple<K, Set<V>> tuple = sortedTuples[pos];
                    pos++;

                    return tuple;
                }

                // NOTE(review): the Iterator contract says next() past the end should
                // throw NoSuchElementException; callers here rely on hasNext() instead
                return null;
            }


            @Override
            public boolean hasNext()
            {
                return pos < sortedTuples.length;
            }


            @Override
            public void remove()
            {
            }
        };

        return tupleIterator;
    }
private static <K, V> Tuple<K, Set<V>> fetchTuple( BTree<K, V> btree, FileInputStream fis ) 1126 { 1127 try 1128 { 1129 if ( fis.available() == 0 ) 1130 { 1131 return null; 1132 } 1133 1134 Tuple<K, Set<V>> tuple = new Tuple<K, Set<V>>(); 1135 tuple.value = new TreeSet<V>(); 1136 1137 byte[] intBytes = new byte[4]; 1138 1139 // Read the key length 1140 fis.read( intBytes ); 1141 int keyLength = IntSerializer.deserialize( intBytes ); 1142 1143 // Read the key 1144 byte[] keyBytes = new byte[keyLength]; 1145 fis.read( keyBytes ); 1146 K key = btree.getKeySerializer().fromBytes( keyBytes ); 1147 tuple.key = key; 1148 1149 // get the number of values 1150 fis.read( intBytes ); 1151 int nbValues = IntSerializer.deserialize( intBytes ); 1152 1153 // Serialize the values 1154 for ( int i = 0; i < nbValues; i++ ) 1155 { 1156 // Read the value length 1157 fis.read( intBytes ); 1158 int valueLength = IntSerializer.deserialize( intBytes ); 1159 1160 // Read the value 1161 byte[] valueBytes = new byte[valueLength]; 1162 fis.read( valueBytes ); 1163 V value = btree.getValueSerializer().fromBytes( valueBytes ); 1164 tuple.value.add( value ); 1165 } 1166 1167 return tuple; 1168 } 1169 catch ( IOException ioe ) 1170 { 1171 return null; 1172 } 1173 } 1174 1175 1176 /** 1177 * Build an iterator over an array of sorted tuples, from files on the disk 1178 * @throws FileNotFoundException 1179 */ 1180 private static <K, V> Iterator<Tuple<K, Set<V>>> createIterator( final BTree<K, V> btree, 1181 final FileInputStream[] streams ) 1182 throws FileNotFoundException 1183 { 1184 // The number of files we have to read from 1185 final int nbFiles = streams.length; 1186 1187 // We will read only one element at a time from each file 1188 final Tuple<K, Set<V>>[] readTuples = new Tuple[nbFiles]; 1189 final TreeMap<Tuple<K, Integer>, Set<V>> candidates = 1190 new TreeMap<Tuple<K, Integer>, Set<V>>( 1191 new TupleComparator<K, Integer>( btree.getKeyComparator(), IntComparator.INSTANCE ) ); 1192 1193 
// Read the tuple from each files 1194 for ( int i = 0; i < nbFiles; i++ ) 1195 { 1196 while ( true ) 1197 { 1198 readTuples[i] = fetchTuple( btree, streams[i] ); 1199 1200 if ( readTuples[i] != null ) 1201 { 1202 Tuple<K, Integer> candidate = new Tuple<K, Integer>( readTuples[i].key, i, btree.getKeySerializer() 1203 .getComparator() ); 1204 1205 if ( !candidates.containsKey( candidate ) ) 1206 { 1207 candidates.put( candidate, readTuples[i].getValue() ); 1208 break; 1209 } 1210 else 1211 { 1212 // We have to merge the pulled tuple with the existing one, and read one more tuple 1213 Set<V> oldValues = candidates.get( candidate ); 1214 oldValues.addAll( readTuples[i].getValue() ); 1215 } 1216 } 1217 else 1218 { 1219 break; 1220 } 1221 } 1222 } 1223 1224 Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>() 1225 { 1226 @Override 1227 public Tuple<K, Set<V>> next() 1228 { 1229 // Get the first candidate 1230 Tuple<K, Integer> tupleCandidate = candidates.firstKey(); 1231 1232 // Remove it from the set 1233 candidates.remove( tupleCandidate ); 1234 1235 // Get the the next tuple from the stream we just got the tuple from 1236 Tuple<K, Set<V>> tuple = readTuples[tupleCandidate.value]; 1237 1238 // fetch the next tuple from the file we just read teh candidate from 1239 while ( true ) 1240 { 1241 // fetch it from the disk and store it into its reader 1242 readTuples[tupleCandidate.value] = fetchTuple( btree, streams[tupleCandidate.value] ); 1243 1244 if ( readTuples[tupleCandidate.value] == null ) 1245 { 1246 // No more tuple for this file 1247 break; 1248 } 1249 1250 if ( readTuples[tupleCandidate.value] != null ) 1251 { 1252 // And store it into the candidate set 1253 Set<V> oldValues = candidates.get( readTuples[tupleCandidate.value] ); 1254 1255 if ( oldValues != null ) 1256 { 1257 // We already have another element with the same key, merge them 1258 oldValues.addAll( readTuples[tupleCandidate.value].value ); 1259 } 1260 else 1261 { 1262 Tuple<K, 
Integer> newTuple = new Tuple<K, Integer>( readTuples[tupleCandidate.value].key, 1263 tupleCandidate.value ); 1264 candidates.put( newTuple, readTuples[tupleCandidate.value].getValue() ); 1265 1266 // and exit the loop 1267 break; 1268 } 1269 } 1270 } 1271 1272 // We can now return the found value 1273 return tuple; 1274 } 1275 1276 1277 @Override 1278 public boolean hasNext() 1279 { 1280 // Check that we have at least one element to read 1281 return !candidates.isEmpty(); 1282 } 1283 1284 1285 @Override 1286 public void remove() 1287 { 1288 } 1289 1290 }; 1291 1292 return tupleIterator; 1293 } 1294 1295 1296 /** 1297 * Build an iterator over an array of sorted tuples, from files on the disk 1298 * @throws FileNotFoundException 1299 */ 1300 private static <K, V> Iterator<Tuple<K, Set<V>>> createUniqueFileIterator( final BTree<K, V> btree, 1301 final FileInputStream stream ) 1302 throws FileNotFoundException 1303 { 1304 Iterator<Tuple<K, Set<V>>> tupleIterator = new Iterator<Tuple<K, Set<V>>>() 1305 { 1306 boolean hasNext = true; 1307 1308 1309 @Override 1310 public Tuple<K, Set<V>> next() 1311 { 1312 // Get the tuple from the stream 1313 Tuple<K, Set<V>> tuple = fetchTuple( btree, stream ); 1314 1315 // We can now return the found value 1316 return tuple; 1317 } 1318 1319 1320 @Override 1321 public boolean hasNext() 1322 { 1323 // Check that we have at least one element to read 1324 try 1325 { 1326 return stream.available() > 0; 1327 } 1328 catch ( IOException e ) 1329 { 1330 return false; 1331 } 1332 } 1333 1334 1335 @Override 1336 public void remove() 1337 { 1338 } 1339 1340 }; 1341 1342 return tupleIterator; 1343 } 1344 1345 1346 /** 1347 * Compact a given persisted BTree, making it dense. All the values will be stored 1348 * in newly created pages, each one of them containing as much elements 1349 * as it's size. 1350 * </br> 1351 * The RecordManager will be stopped and restarted, do not use this method 1352 * on a running BTree. 
1353 * 1354 * @param recordManager The associated recordManager 1355 * @param btree The BTree to compact 1356 */ 1357 public static void compact( RecordManager recordManager, BTree<?, ?> btree ) 1358 { 1359 1360 } 1361 1362 1363 /** 1364 * Compact a given in-memory BTree, making it dense. All the values will be stored 1365 * in newly created pages, each one of them containing as much elements 1366 * as it's size. 1367 * </br> 1368 * 1369 * @param btree The BTree to compact 1370 * @throws KeyNotFoundException 1371 * @throws IOException 1372 */ 1373 public static BTree<?, ?> compact( BTree<?, ?> btree ) throws IOException, KeyNotFoundException 1374 { 1375 // First, create a new BTree which will contain all the elements 1376 InMemoryBTreeConfiguration configuration = new InMemoryBTreeConfiguration(); 1377 configuration.setName( btree.getName() ); 1378 configuration.setPageSize( btree.getPageSize() ); 1379 configuration.setKeySerializer( btree.getKeySerializer() ); 1380 configuration.setValueSerializer( btree.getValueSerializer() ); 1381 configuration.setAllowDuplicates( btree.isAllowDuplicates() ); 1382 configuration.setReadTimeOut( btree.getReadTimeOut() ); 1383 configuration.setWriteBufferSize( btree.getWriteBufferSize() ); 1384 1385 File file = ( ( InMemoryBTree ) btree ).getFile(); 1386 1387 if ( file != null ) 1388 { 1389 configuration.setFilePath( file.getPath() ); 1390 } 1391 1392 // Create a new Btree Builder 1393 InMemoryBTreeBuilder btreeBuilder = new InMemoryBTreeBuilder( configuration ); 1394 1395 // Create a cursor over the existing btree 1396 final TupleCursor cursor = btree.browse(); 1397 1398 // Create an iterator that will iterate the existing btree 1399 Iterator<Tuple> tupleItr = new Iterator<Tuple>() 1400 { 1401 @Override 1402 public Tuple next() 1403 { 1404 try 1405 { 1406 return cursor.next(); 1407 } 1408 catch ( EndOfFileExceededException e ) 1409 { 1410 return null; 1411 } 1412 catch ( IOException e ) 1413 { 1414 return null; 1415 } 1416 } 1417 
1418 1419 @Override 1420 public boolean hasNext() 1421 { 1422 try 1423 { 1424 return cursor.hasNext(); 1425 } 1426 catch ( EndOfFileExceededException e ) 1427 { 1428 return false; 1429 } 1430 catch ( IOException e ) 1431 { 1432 return false; 1433 } 1434 } 1435 1436 1437 @Override 1438 public void remove() 1439 { 1440 } 1441 }; 1442 1443 // And finally, compact the btree 1444 return btreeBuilder.build( tupleItr ); 1445 } 1446}