View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.directory.server.core.partition.impl.btree.jdbm;
21  
22  
23  import java.io.IOException;
24  import java.util.Comparator;
25  import java.util.Map;
26  
27  import jdbm.RecordManager;
28  import jdbm.btree.BTree;
29  import jdbm.helper.Serializer;
30  import jdbm.helper.Tuple;
31  import jdbm.helper.TupleBrowser;
32  
33  import org.apache.directory.api.ldap.model.cursor.Cursor;
34  import org.apache.directory.api.ldap.model.cursor.CursorException;
35  import org.apache.directory.api.ldap.model.cursor.EmptyCursor;
36  import org.apache.directory.api.ldap.model.cursor.SingletonCursor;
37  import org.apache.directory.api.ldap.model.exception.LdapException;
38  import org.apache.directory.api.ldap.model.exception.LdapOtherException;
39  import org.apache.directory.api.ldap.model.schema.SchemaManager;
40  import org.apache.directory.api.ldap.model.schema.comparators.SerializableComparator;
41  import org.apache.directory.api.util.Strings;
42  import org.apache.directory.api.util.SynchronizedLRUMap;
43  import org.apache.directory.server.core.api.partition.PartitionTxn;
44  import org.apache.directory.server.core.avltree.ArrayMarshaller;
45  import org.apache.directory.server.core.avltree.ArrayTree;
46  import org.apache.directory.server.core.avltree.ArrayTreeCursor;
47  import org.apache.directory.server.core.avltree.Marshaller;
48  import org.apache.directory.server.i18n.I18n;
49  import org.apache.directory.server.xdbm.AbstractTable;
50  import org.apache.directory.server.xdbm.KeyTupleArrayCursor;
51  import org.slf4j.Logger;
52  import org.slf4j.LoggerFactory;
53  
54  
55  /**
56   * A jdbm Btree wrapper that enables duplicate sorted keys using collections.
57   *
58   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
59   */
60  public class JdbmTable<K, V> extends AbstractTable<K, V>
61  {
62      /** A logger for this class */
63      private static final Logger LOG = LoggerFactory.getLogger( JdbmTable.class );
64  
65      /** the JDBM record manager for the file this table is managed in */
66      private final RecordManager recMan;
67  
68      /** the wrappedCursor JDBM btree used in this Table */
69      private BTree<K, V> bt;
70  
71      /** the limit at which we start using btree redirection for duplicates */
72      private int numDupLimit = JdbmIndex.DEFAULT_DUPLICATE_LIMIT;
73  
74      /** a cache of duplicate BTrees */
75      private final Map<Long, BTree<K, V>> duplicateBtrees;
76  
77      /** A value serializer */
78      private final Serializer valueSerializer;
79  
80      /** A marshaller used to serialize/deserialize values stored in the Table */
81      Marshaller<ArrayTree<V>> marshaller;
82  
83  
84      // ------------------------------------------------------------------------
85      // C O N S T R U C T O R
86      // ------------------------------------------------------------------------
87  
88      /**
89       * Creates a Jdbm BTree based tuple Table abstraction that enables 
90       * duplicates.
91       *
92       * @param schemaManager The server schemaManager
93       * @param name the name of the table
94       * @param numDupLimit the size limit of duplicates before switching to BTrees for values instead of AvlTrees
95       * @param manager the record manager to be used for this table
96       * @param keyComparator a key comparator
97       * @param valueComparator a value comparator
98       * @param keySerializer a serializer to use for the keys instead of using
99       * default Java serialization which could be very expensive
100      * @param valueSerializer a serializer to use for the values instead of
101      * using default Java serialization which could be very expensive
102      * @throws IOException if the table's file cannot be created
103      */
104     @SuppressWarnings("unchecked")
105     public JdbmTable( SchemaManager schemaManager, String name, int numDupLimit, RecordManager manager,
106         Comparator<K> keyComparator, Comparator<V> valueComparator,
107         Serializer keySerializer, Serializer valueSerializer )
108         throws IOException
109     {
110         super( schemaManager, name, keyComparator, valueComparator );
111 
112         if ( valueComparator == null )
113         {
114             throw new IllegalArgumentException( I18n.err( I18n.ERR_592 ) );
115         }
116 
117         // TODO make the size of the duplicate btree cache configurable via constructor
118         duplicateBtrees = new SynchronizedLRUMap( 100 );
119 
120         if ( valueSerializer != null )
121         {
122             marshaller = new ArrayMarshaller<>( valueComparator,
123                 new MarshallerSerializerBridge<V>( valueSerializer ) );
124         }
125         else
126         {
127             marshaller = new ArrayMarshaller<>( valueComparator );
128         }
129 
130         this.numDupLimit = numDupLimit;
131         this.recMan = manager;
132         this.valueSerializer = valueSerializer;
133         this.allowsDuplicates = true;
134         long recId = recMan.getNamedObject( name );
135 
136         if ( recId == 0 ) // Create new main BTree
137         {
138             // we do not use the value serializer in the btree since duplicates will use
139             // either BTreeRedirect objects or AvlTree objects whose marshalling is
140             // explicitly managed by this code.  Value serialization is delegated to these
141             // marshallers.
142 
143             bt = new BTree<>( recMan, keyComparator, keySerializer, null );
144             recId = bt.getRecordId();
145             recMan.setNamedObject( name, recId );
146         }
147         else
148         // Load existing BTree
149         {
150             bt = new BTree<K, V>().load( recMan, recId );
151             ( ( SerializableComparator<K> ) bt.getComparator() ).setSchemaManager( schemaManager );
152             
153             count = bt.size();
154         }
155     }
156 
157 
158     /**
159      * Creates a Jdbm BTree based tuple Table abstraction without duplicates 
160      * enabled using a simple key comparator.
161      *
162      * @param schemaManager The server schemaManager
163      * @param name the name of the table
164      * @param manager the record manager to be used for this table
165      * @param keyComparator a tuple comparator
166      * @param keySerializer a serializer to use for the keys instead of using
167      * default Java serialization which could be very expensive
168      * @param valueSerializer a serializer to use for the values instead of
169      * using default Java serialization which could be very expensive
170      * @throws IOException if the table's file cannot be created
171      */
172     public JdbmTable( SchemaManager schemaManager, String name, RecordManager manager, Comparator<K> keyComparator,
173         Serializer keySerializer, Serializer valueSerializer )
174         throws IOException
175     {
176         super( schemaManager, name, keyComparator, null );
177 
178         this.duplicateBtrees = null;
179         this.numDupLimit = Integer.MAX_VALUE;
180         this.recMan = manager;
181 
182         this.valueSerializer = valueSerializer;
183 
184         this.allowsDuplicates = false;
185 
186         long recId = recMan.getNamedObject( name );
187 
188         if ( recId != 0 )
189         {
190             bt = new BTree<K, V>().load( recMan, recId );
191             ( ( SerializableComparator<K> ) bt.getComparator() ).setSchemaManager( schemaManager );
192             bt.setValueSerializer( valueSerializer );
193             
194             count = bt.size();
195         }
196         else
197         {
198             bt = new BTree<>( recMan, keyComparator, keySerializer, valueSerializer );
199             recId = bt.getRecordId();
200             recMan.setNamedObject( name, recId );
201         }
202     }
203 
204 
205     // ------------------------------------------------------------------------
206     // Count Overloads
207     // ------------------------------------------------------------------------
208     /**
209      * {@inheritDoc}
210      */
211     @Override
212     public long count( PartitionTxn transaction, K key ) throws LdapException
213     {
214         if ( key == null )
215         {
216             return 0L;
217         }
218 
219         try
220         {
221             if ( !allowsDuplicates )
222             {
223                 if ( null == bt.find( key ) )
224                 {
225                     return 0L;
226                 }
227                 else
228                 {
229                     return 1L;
230                 }
231             }
232 
233             DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
234 
235             if ( values.isArrayTree() )
236             {
237                 return values.getArrayTree().size();
238             }
239 
240             return getBTree( values.getBTreeRedirect() ).size();
241         }
242         catch ( IOException ioe )
243         {
244             throw new LdapOtherException( ioe.getMessage() );
245         }
246     }
247 
248 
249     // ------------------------------------------------------------------------
250     // get/has/put/remove Methods and Overloads
251     // ------------------------------------------------------------------------
252 
253     /**
254      * {@inheritDoc}
255      */
256     @SuppressWarnings("unchecked")
257     @Override
258     public V get( PartitionTxn transaction, K key ) throws LdapException
259     {
260         if ( key == null )
261         {
262             return null;
263         }
264 
265         try
266         {
267             if ( !allowsDuplicates )
268             {
269                 return bt.find( key );
270             }
271 
272             DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
273 
274             if ( values.isArrayTree() )
275             {
276                 ArrayTree<V> set = values.getArrayTree();
277 
278                 if ( set.getFirst() == null )
279                 {
280                     return null;
281                 }
282 
283                 return set.getFirst();
284             }
285 
286             // Handle values if they are stored in another BTree
287             BTree tree = getBTree( values.getBTreeRedirect() );
288 
289             jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
290             TupleBrowser<K, V> browser = tree.browse();
291             browser.getNext( tuple );
292 
293             return ( V ) tuple.getKey();
294         }
295         catch ( IOException ioe )
296         {
297             throw new LdapOtherException( ioe.getMessage() );
298         }
299 
300     }
301 
302 
303     /**
304      * {@inheritDoc}
305      */
306     @Override
307     public boolean hasGreaterOrEqual( PartitionTxn transaction, K key, V val ) throws LdapException
308     {
309         if ( key == null )
310         {
311             return false;
312         }
313 
314         if ( !allowsDuplicates )
315         {
316             throw new UnsupportedOperationException( I18n.err( I18n.ERR_593 ) );
317         }
318 
319         try
320         {
321             DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
322 
323             if ( values.isArrayTree() )
324             {
325                 ArrayTree<V> set = values.getArrayTree();
326                 V result = set.findGreaterOrEqual( val );
327                 return result != null;
328             }
329 
330             // last option is to try a btree with BTreeRedirects
331             BTree<K, V> tree = getBTree( values.getBTreeRedirect() );
332 
333             return tree.size() != 0 && btreeHas( tree, val, true );
334         }
335         catch ( IOException ioe )
336         {
337             throw new LdapOtherException( ioe.getMessage() );
338         }
339     }
340 
341 
342     /**
343      * {@inheritDoc}
344      */
345     @Override
346     public boolean hasLessOrEqual( PartitionTxn transaction, K key, V val ) throws LdapException
347     {
348         if ( key == null )
349         {
350             return false;
351         }
352 
353         if ( !allowsDuplicates )
354         {
355             throw new UnsupportedOperationException( I18n.err( I18n.ERR_593 ) );
356         }
357 
358         try
359         {
360             DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
361 
362             if ( values.isArrayTree() )
363             {
364                 ArrayTree<V> set = values.getArrayTree();
365                 V result = set.findLessOrEqual( val );
366                 return result != null;
367             }
368 
369             // last option is to try a btree with BTreeRedirects
370             BTree<K, V> tree = getBTree( values.getBTreeRedirect() );
371 
372             return tree.size() != 0 && btreeHas( tree, val, false );
373         }
374         catch ( IOException ioe )
375         {
376             throw new LdapOtherException( ioe.getMessage() );
377         }
378     }
379 
380 
381     /**
382      * {@inheritDoc}
383      */
384     @Override
385     @SuppressWarnings("unchecked")
386     public boolean hasGreaterOrEqual( PartitionTxn transaction, K key ) throws LdapException
387     {
388         try
389         {
390             // See if we can find the border between keys greater than and less
391             // than in the set of keys.  This will be the spot we search from.
392             jdbm.helper.Tuple tuple = bt.findGreaterOrEqual( key );
393     
394             // Test for equality first since it satisfies both greater/less than
395             if ( null != tuple && keyComparator.compare( ( K ) tuple.getKey(), key ) == 0 )
396             {
397                 return true;
398             }
399     
400             // Greater searches are easy and quick thanks to findGreaterOrEqual
401             // A null return above means there were no equal or greater keys
402             return ( null != tuple );
403         }
404         catch ( IOException ioe )
405         {
406             throw new LdapOtherException( ioe.getMessage() );
407         }
408     }
409 
410 
411     /**
412      * {@inheritDoc}
413      */
414     @SuppressWarnings("unchecked")
415     @Override
416     public boolean hasLessOrEqual( PartitionTxn transaction, K key ) throws LdapException
417     {
418         try
419         {
420             // Can only find greater than or equal to with JDBM so we find that
421             // and work backwards to see if we can find one less than the key
422             Tuple<K, V> tuple = bt.findGreaterOrEqual( key );
423     
424             // Test for equality first since it satisfies equal to condition
425             if ( null != tuple && keyComparator.compare( tuple.getKey(), key ) == 0 )
426             {
427                 return true;
428             }
429     
430             if ( null == tuple )
431             {
432                 /*
433                  * Jdbm failed to find a key greater than or equal to the argument
434                  * which means all the keys in the table are less than the
435                  * supplied key argument.  We can hence return true if the table
436                  * contains any Tuples.
437                  */
438                 return count > 0;
439             }
440             else
441             {
442                 /*
443                  * We have the next tuple whose key is the next greater than the
444                  * key argument supplied.  We use this key to advance a browser to
445                  * that tuple and scan down to lesser Tuples until we hit one
446                  * that is less than the key argument supplied.  Usually this will
447                  * be the previous tuple if it exists.
448                  */
449                 TupleBrowser browser = bt.browse( tuple.getKey() );
450     
451                 if ( browser.getPrevious( tuple ) )
452                 {
453                     return true;
454                 }
455             }
456     
457             return false;
458         }
459         catch ( IOException ioe )
460         {
461             throw new LdapOtherException( ioe.getMessage() );
462         }
463     }
464 
465 
466     /**
467      * {@inheritDoc}
468      */
469     @SuppressWarnings("unchecked")
470     public boolean has( PartitionTxn transaction, K key, V value ) throws LdapException
471     {
472         if ( key == null )
473         {
474             return false;
475         }
476 
477         try
478         {
479             if ( !allowsDuplicates )
480             {
481                 V stored = bt.find( key );
482                 return null != stored && stored.equals( value );
483             }
484 
485             DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
486 
487             if ( values.isArrayTree() )
488             {
489                 return values.getArrayTree().find( value ) != null;
490             }
491 
492             return getBTree( values.getBTreeRedirect() ).find( value ) != null;
493         }
494         catch ( IOException ioe )
495         {
496             throw new LdapOtherException( ioe.getMessage() );
497         }
498     }
499 
500 
501     /**
502      * {@inheritDoc}
503      */
504     @Override
505     public boolean has( PartitionTxn transaction, K key ) throws LdapException
506     {
507         try
508         {
509             return key != null && bt.find( key ) != null;
510         }
511         catch ( IOException ioe )
512         {
513             throw new LdapException( ioe );
514         }
515     }
516 
517 
518     /**
519      * {@inheritDoc}
520      */
521     @Override
522     @SuppressWarnings("unchecked")
523     public synchronized void put( PartitionTxn transaction, K key, V value ) throws LdapException
524     {
525         try
526         {
527             if ( LOG.isDebugEnabled() )
528             {
529                 LOG.debug( "---> Add {} = {}", name, key );
530             }
531 
532             if ( ( value == null ) || ( key == null ) )
533             {
534                 throw new IllegalArgumentException( I18n.err( I18n.ERR_594 ) );
535             }
536 
537             V replaced;
538 
539             if ( !allowsDuplicates )
540             {
541                 replaced = ( V ) bt.insert( key, value, true );
542 
543                 if ( null == replaced )
544                 {
545                     count++;
546                 }
547 
548                 if ( LOG.isDebugEnabled() )
549                 {
550                     LOG.debug( "<--- Add ONE {} = {}", name, key );
551                 }
552 
553                 return;
554             }
555 
556             DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
557 
558             if ( values.isArrayTree() )
559             {
560                 ArrayTree<V> set = values.getArrayTree();
561                 replaced = set.insert( value );
562 
563                 if ( replaced != null )// if the value already present returns the same value
564                 {
565                     return;
566                 }
567 
568                 if ( set.size() > numDupLimit )
569                 {
570                     BTree tree = convertToBTree( set );
571                     BTreeRedirecterver/core/partition/impl/btree/jdbm/BTreeRedirect.html#BTreeRedirect">BTreeRedirect redirect = new BTreeRedirect( tree.getRecordId() );
572                     bt.insert( key, ( V ) BTreeRedirectMarshaller.INSTANCE.serialize( redirect ), true );
573 
574                     if ( LOG.isDebugEnabled() )
575                     {
576                         LOG.debug( "<--- Add new BTREE {} = {}", name, key );
577                     }
578                 }
579                 else
580                 {
581                     bt.insert( key, ( V ) marshaller.serialize( set ), true );
582 
583                     if ( LOG.isDebugEnabled() )
584                     {
585                         LOG.debug( "<--- Add AVL {} = {}", name, key );
586                     }
587                 }
588 
589                 count++;
590 
591                 return;
592             }
593 
594             BTree tree = getBTree( values.getBTreeRedirect() );
595             replaced = ( V ) tree.insert( value, Strings.EMPTY_BYTES, true );
596 
597             if ( replaced == null )
598             {
599                 count++;
600             }
601 
602             if ( LOG.isDebugEnabled() )
603             {
604                 LOG.debug( "<--- Add BTREE {} = {}", name, key );
605             }
606         }
607         catch ( IOException | CursorException | LdapException e )
608         {
609             LOG.error( I18n.err( I18n.ERR_131, key, name ), e );
610             throw new LdapOtherException( e.getMessage(), e );
611         }
612     }
613 
614 
615     /**
616      * {@inheritDoc}
617      */
618     @SuppressWarnings("unchecked")
619     @Override
620     public synchronized void remove( PartitionTxn transaction, K key, V value ) throws LdapException
621     {
622         try
623         {
624             if ( LOG.isDebugEnabled() )
625             {
626                 LOG.debug( "---> Remove {} = {}, {}", name, key, value );
627             }
628 
629             if ( key == null )
630             {
631                 if ( LOG.isDebugEnabled() )
632                 {
633                     LOG.debug( "<--- Remove NULL key {}", name );
634                 }
635 
636                 return;
637             }
638 
639             if ( !allowsDuplicates )
640             {
641                 V oldValue = bt.find( key );
642 
643                 // Remove the value only if it is the same as value.
644                 if ( ( oldValue != null ) && oldValue.equals( value ) )
645                 {
646                     bt.remove( key );
647                     count--;
648 
649                     if ( LOG.isDebugEnabled() )
650                     {
651                         LOG.debug( "<--- Remove ONE {} = {}, {}", name, key, value );
652                     }
653                 }
654 
655                 return;
656             }
657 
658             DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
659 
660             if ( values.isArrayTree() )
661             {
662                 ArrayTree<V> set = values.getArrayTree();
663 
664                 // If removal succeeds then remove if set is empty else replace it
665                 if ( set.remove( value ) != null )
666                 {
667                     if ( set.isEmpty() )
668                     {
669                         bt.remove( key );
670                     }
671                     else
672                     {
673                         bt.insert( key, ( V ) marshaller.serialize( set ), true );
674                     }
675 
676                     count--;
677 
678                     if ( LOG.isDebugEnabled() )
679                     {
680                         LOG.debug( "<--- Remove AVL {} = {}, {}", name, key, value );
681                     }
682                 }
683 
684                 return;
685             }
686 
687             // if the number of duplicates falls below the numDupLimit value
688             BTree tree = getBTree( values.getBTreeRedirect() );
689 
690             if ( tree.find( value ) != null && tree.remove( value ) != null )
691             {
692                 /*
693                  * If we drop below the duplicate limit then we revert from using
694                  * a Jdbm BTree to using an in memory AvlTree.
695                  */
696                 if ( tree.size() <= numDupLimit )
697                 {
698                     ArrayTree<V> avlTree = convertToArrayTree( tree );
699                     bt.insert( key, ( V ) marshaller.serialize( avlTree ), true );
700                     recMan.delete( tree.getRecordId() );
701                 }
702 
703                 count--;
704 
705                 if ( LOG.isDebugEnabled() )
706                 {
707                     LOG.debug( "<--- Remove BTREE {} = {}, {}", name, key, value );
708                 }
709             }
710         }
711         catch ( Exception e )
712         {
713             LOG.error( I18n.err( I18n.ERR_132, key, value, name ), e );
714         }
715     }
716 
717 
718     /**
719      * {@inheritDoc}
720      */
721     @Override
722     public synchronized void remove( PartitionTxn transaction, K key ) throws LdapException
723     {
724         try
725         {
726             if ( LOG.isDebugEnabled() )
727             {
728                 LOG.debug( "---> Remove {} = {}", name, key );
729             }
730 
731             if ( key == null )
732             {
733                 return;
734             }
735 
736             Object returned = bt.remove( key );
737 
738             if ( null == returned )
739             {
740                 if ( LOG.isDebugEnabled() )
741                 {
742                     LOG.debug( "<--- Remove AVL {} = {} (not found)", name, key );
743                 }
744 
745                 return;
746             }
747 
748             if ( !allowsDuplicates )
749             {
750                 this.count--;
751 
752                 if ( LOG.isDebugEnabled() )
753                 {
754                     LOG.debug( "<--- Remove ONE {} = {}", name, key );
755                 }
756 
757                 return;
758             }
759 
760             byte[] serialized = ( byte[] ) returned;
761 
762             if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
763             {
764                 BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
765                 this.count -= tree.size();
766 
767                 if ( LOG.isDebugEnabled() )
768                 {
769                     LOG.debug( "<--- Remove BTree {} = {}", name, key );
770                 }
771 
772                 recMan.delete( tree.getRecordId() );
773                 duplicateBtrees.remove( tree.getRecordId() );
774             }
775             else
776             {
777                 ArrayTree<V> set = marshaller.deserialize( serialized );
778                 this.count -= set.size();
779 
780                 if ( LOG.isDebugEnabled() )
781                 {
782                     LOG.debug( "<--- Remove AVL {} = {}", name, key );
783                 }
784             }
785         }
786         catch ( Exception e )
787         {
788             LOG.error( I18n.err( I18n.ERR_133, key, name ), e );
789         }
790     }
791 
792 
793     /**
794      * {@inheritDoc}
795      */
796     @Override
797     public Cursor<org.apache.directory.api.ldap.model.cursor.Tuple<K, V>> cursor()
798     {
799         if ( allowsDuplicates )
800         {
801             return new DupsCursor<>( this );
802         }
803 
804         return new NoDupsCursor<>( this );
805     }
806 
807 
808     /**
809      * {@inheritDoc}
810      */
811     @Override
812     public Cursor<org.apache.directory.api.ldap.model.cursor.Tuple<K, V>> cursor( PartitionTxn partitionTxn, K key ) throws LdapException
813     {
814         if ( key == null )
815         {
816             return new EmptyCursor<>();
817         }
818 
819         try
820         { 
821             V raw = bt.find( key );
822     
823             if ( null == raw )
824             {
825                 return new EmptyCursor<>();
826             }
827     
828             if ( !allowsDuplicates )
829             {
830                 return new SingletonCursor<>(
831                     new org.apache.directory.api.ldap.model.cursor.Tuple<K, V>( key, raw ) );
832             }
833     
834             byte[] serialized = ( byte[] ) raw;
835     
836             if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
837             {
838                 BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
839                 return new KeyTupleBTreeCursor<>( tree, key, valueComparator );
840             }
841     
842             ArrayTree<V> set = marshaller.deserialize( serialized );
843     
844             return new KeyTupleArrayCursor<>( set, key );
845         }
846         catch ( IOException ioe )
847         {
848             throw new LdapOtherException( ioe.getMessage(), ioe );
849         }
850     }
851 
852 
853     /**
854      * {@inheritDoc}
855      */
856     @SuppressWarnings("unchecked")
857     @Override
858     public Cursor<V> valueCursor( PartitionTxn transaction, K key ) throws LdapException
859     {
860         if ( key == null )
861         {
862             return new EmptyCursor<>();
863         }
864 
865         try
866         {
867             V raw = bt.find( key );
868     
869             if ( null == raw )
870             {
871                 return new EmptyCursor<>();
872             }
873     
874             if ( !allowsDuplicates )
875             {
876                 return new SingletonCursor<>( raw );
877             }
878     
879             byte[] serialized = ( byte[] ) raw;
880     
881             if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
882             {
883                 BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
884                 return new KeyBTreeCursor<>( tree, valueComparator );
885             }
886     
887             return new ArrayTreeCursor<>( marshaller.deserialize( serialized ) );
888         }
889         catch ( IOException ioe )
890         {
891             throw new LdapOtherException( ioe.getMessage(), ioe );
892         }
893     }
894 
895 
896     // ------------------------------------------------------------------------
897     // Maintenance Operations 
898     // ------------------------------------------------------------------------
899     /**
900      * {@inheritDoc}
901      */
902     @Override
903     public synchronized void close( PartitionTxn transaction ) throws LdapException
904     {
905         // Nothing to do
906     }
907 
908 
909     public Marshaller<ArrayTree<V>> getMarshaller()
910     {
911         return marshaller;
912     }
913 
914 
915     // ------------------------------------------------------------------------
916     // Private/Package Utility Methods 
917     // ------------------------------------------------------------------------
918 
919     /**
920      * Added to check that we actually switch from one data structure to the 
921      * B+Tree structure on disk for duplicates that go beyond the threshold.
922      */
923     boolean isKeyUsingBTree( K key ) throws Exception
924     {
925         if ( key == null )
926         {
927             throw new IllegalArgumentException( "key is null" );
928         }
929 
930         if ( !allowsDuplicates )
931         {
932             return false;
933         }
934 
935         DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
936 
937         return values.isBTreeRedirect();
938     }
939 
940 
941     DupsContainer<V> getDupsContainer( byte[] serialized ) throws LdapException
942     {
943         if ( serialized == null )
944         {
945             return new DupsContainer<>( new ArrayTree<V>( valueComparator ) );
946         }
947 
948         if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
949         {
950             try
951             {
952                 return new DupsContainer<>( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
953             }
954             catch ( IOException ioe )
955             {
956                 throw new LdapOtherException( ioe.getMessage() );
957             }
958 
959         }
960 
961         try
962         {
963             return new DupsContainer<>( marshaller.deserialize( serialized ) );
964         }
965         catch ( IOException ioe )
966         {
967             throw new LdapOtherException( ioe.getMessage() );
968         }
969     }
970 
971 
972     /**
973      * Returns the main BTree used by this table.
974      *
975      * @return the main JDBM BTree used by this table
976      */
977     BTree getBTree()
978     {
979         return bt;
980     }
981 
982 
983     BTree getBTree( BTreeRedirect redirect ) throws IOException
984     {
985         if ( duplicateBtrees.containsKey( redirect.getRecId() ) )
986         {
987             return duplicateBtrees.get( redirect.getRecId() );
988         }
989 
990         BTree<K, V> tree = new BTree<K, V>().load( recMan, redirect.getRecId() );
991         ( ( SerializableComparator<K> ) tree.getComparator() ).setSchemaManager( schemaManager );
992         duplicateBtrees.put( redirect.getRecId(), tree );
993 
994         return tree;
995     }
996 
997 
998     @SuppressWarnings("unchecked")
999     private boolean btreeHas( BTree tree, V key, boolean isGreaterThan ) throws IOException
1000     {
1001         jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
1002 
1003         TupleBrowser browser = tree.browse( key );
1004 
1005         if ( isGreaterThan )
1006         {
1007             return browser.getNext( tuple );
1008         }
1009         else
1010         {
1011             if ( browser.getPrevious( tuple ) )
1012             {
1013                 return true;
1014             }
1015             else
1016             {
1017                 /*
1018                  * getPrevious() above fails which means the browser has is
1019                  * before the first Tuple of the btree.  A call to getNext()
1020                  * should work every time.
1021                  */
1022                 browser.getNext( tuple );
1023 
1024                 /*
1025                  * Since the browser is positioned now on the Tuple with the
1026                  * smallest key we just need to check if it equals this key
1027                  * which is the only chance for returning true.
1028                  */
1029                 V firstKey = ( V ) tuple.getKey();
1030 
1031                 return valueComparator.compare( key, firstKey ) == 0;
1032             }
1033         }
1034     }
1035 
1036 
1037     @SuppressWarnings("unchecked")
1038     private ArrayTree<V> convertToArrayTree( BTree bTree ) throws IOException
1039     {
1040         ArrayTree<V> avlTree = new ArrayTree<>( valueComparator );
1041         TupleBrowser browser = bTree.browse();
1042         jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
1043 
1044         while ( browser.getNext( tuple ) )
1045         {
1046             avlTree.insert( ( V ) tuple.getKey() );
1047         }
1048 
1049         return avlTree;
1050     }
1051 
1052 
1053     private BTree<V, K> convertToBTree( ArrayTree<V> arrayTree ) throws IOException, CursorException, LdapException
1054     {
1055         BTree<V, K> bTree;
1056 
1057         if ( valueSerializer != null )
1058         {
1059             bTree = new BTree<>( recMan, valueComparator, valueSerializer, null );
1060         }
1061         else
1062         {
1063             bTree = new BTree<>( recMan, valueComparator );
1064         }
1065 
1066         try ( Cursor<V> keys = new ArrayTreeCursor<>( arrayTree ) )
1067         {
1068             keys.beforeFirst();
1069     
1070             while ( keys.next() )
1071             {
1072                 bTree.insert( keys.get(), ( K ) Strings.EMPTY_BYTES, true );
1073             }
1074         }
1075 
1076         return bTree;
1077     }
1078 }