Class MutableHashTable<BT,PT>

  • Type Parameters:
    BT - The type of records from the build side that are stored in the hash table.
    PT - The type of records from the probe side that are probed against the hash table.
    All Implemented Interfaces:
    MemorySegmentSource
    Direct Known Subclasses:
    ReOpenableMutableHashTable

    public class MutableHashTable<BT,PT>
    extends Object
    implements MemorySegmentSource
    An implementation of a Hybrid Hash Join. The join starts operating in memory and gradually spills contents to disk when memory is insufficient. It does not need to know a priori how large the input will be.

    The design of this class largely follows the design presented in "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the implementation lacks features such as dynamic role reversal, partition tuning, and histogram-guided partitioning.

    The layout of the buckets inside a memory segment is as follows:

     +----------------------------- Bucket x ----------------------------
     |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
     | next-bucket-in-chain-pointer (8 bytes) | probedFlags (2 bytes) | reserved (2 bytes) |
     |
     |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
     | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
     |
     |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
     | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
     |
     +---------------------------- Bucket x + 1--------------------------
     |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
     | next-bucket-in-chain-pointer (8 bytes) | probedFlags (2 bytes) | reserved (2 bytes) |
     |
     |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
     | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
     |
     |pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
     | ... pointer n-1 (8 bytes) | pointer n (8 bytes)
     +-------------------------------------------------------------------
     | ...
     |
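
The layout above implies fixed byte offsets for the header fields and for the two entry arrays. The following is a minimal sketch of that arithmetic, not the class's actual code: the class name, the constant names, and the bucket size passed in are hypothetical; only the field widths and their order are taken from the diagram.

```java
/** Sketch of the byte offsets implied by the bucket layout diagram (names are hypothetical). */
final class BucketLayoutSketch {
    // Header fields, in the order and with the widths shown in the diagram.
    static final int PARTITION_OFFSET = 0;      // 1 byte
    static final int STATUS_OFFSET = 1;         // 1 byte
    static final int COUNT_OFFSET = 2;          // 2 bytes
    static final int NEXT_BUCKET_OFFSET = 4;    // 8 bytes
    static final int PROBED_FLAGS_OFFSET = 12;  // 2 bytes
    static final int RESERVED_OFFSET = 14;      // 2 bytes
    static final int HEADER_LENGTH = 16;        // sum of all header fields

    static final int HASH_CODE_LENGTH = 4;
    static final int POINTER_LENGTH = 8;

    /** Number of (hashCode, pointer) entries that fit into a bucket of the given size. */
    static int entriesPerBucket(int bucketSizeBytes) {
        return (bucketSizeBytes - HEADER_LENGTH) / (HASH_CODE_LENGTH + POINTER_LENGTH);
    }

    /** Offset of the i-th hash code (0-based) relative to the start of the bucket. */
    static int hashCodeOffset(int i) {
        return HEADER_LENGTH + i * HASH_CODE_LENGTH;
    }

    /** Offset of the i-th pointer (0-based); the pointers start after all n hash codes. */
    static int pointerOffset(int i, int n) {
        return HEADER_LENGTH + n * HASH_CODE_LENGTH + i * POINTER_LENGTH;
    }

    public static void main(String[] args) {
        int bucketSize = 128; // assumed bucket size in bytes, for illustration only
        int n = entriesPerBucket(bucketSize);
        System.out.println("entries per bucket: " + n);
        System.out.println("first pointer at:   " + pointerOffset(0, n));
    }
}
```

Keeping all hash codes together ahead of the pointers means a probe can scan the 4-byte hash codes sequentially and only follow a pointer when a hash code matches.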
     
    • Field Detail

      • buildSideSerializer

        protected final TypeSerializer<BT> buildSideSerializer
        The utilities to serialize the build side data types.
      • probeSideSerializer

        protected final TypeSerializer<PT> probeSideSerializer
        The utilities to serialize the probe side data types.
      • buildSideComparator

        protected final TypeComparator<BT> buildSideComparator
        The utilities to hash and compare the build side data types.
      • availableMemory

        protected final List<MemorySegment> availableMemory
        The free memory segments currently available to the hash join.
      • writeBehindBuffers

        protected final LinkedBlockingQueue<MemorySegment> writeBehindBuffers
        The queue of buffers that can be used for write-behind. Any buffer that is written asynchronously to disk is returned through this queue. Hence, it may sometimes contain more buffers than those dedicated to write-behind.
      • ioManager

        protected final IOManager ioManager
        The I/O manager used to instantiate writers for the spilled partitions.
      • segmentSize

        protected final int segmentSize
        The size of the segments used by the hash join buckets. All segments must be of equal size to ease offset computations.
      • bucketsPerSegmentMask

        protected final int bucketsPerSegmentMask
        The number of hash table buckets in a single memory segment, minus one. Because memory segments can be comparatively large, multiple buckets fit into one memory segment. This variable is a bit mask whose set low-order bits select the position of a bucket within a segment.
      • bucketsPerSegmentBits

        protected final int bucketsPerSegmentBits
        The number of bits that describe the position of a bucket in a memory segment. Computed as log2(bucketsPerSegment).
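
As an illustration of how a mask and a bit count of this kind are typically used together, the sketch below maps a global bucket number to a segment index and a byte offset inside that segment. It is a sketch under stated assumptions, not the class's own code: the class name, the method names, and the `bucketSize` parameter are hypothetical, and both sizes are assumed to be powers of two.

```java
/** Sketch: locating a bucket via bucketsPerSegmentBits and bucketsPerSegmentMask. */
final class BucketAddressingSketch {
    final int bucketSizeBits;         // log2(bucketSize); hypothetical helper field
    final int bucketsPerSegmentBits;  // log2(bucketsPerSegment)
    final int bucketsPerSegmentMask;  // bucketsPerSegment - 1

    BucketAddressingSketch(int segmentSize, int bucketSize) {
        if (Integer.bitCount(segmentSize) != 1 || Integer.bitCount(bucketSize) != 1
                || bucketSize > segmentSize) {
            throw new IllegalArgumentException("sizes must be powers of two, bucket <= segment");
        }
        int bucketsPerSegment = segmentSize / bucketSize;
        this.bucketSizeBits = Integer.numberOfTrailingZeros(bucketSize);
        this.bucketsPerSegmentBits = Integer.numberOfTrailingZeros(bucketsPerSegment);
        this.bucketsPerSegmentMask = bucketsPerSegment - 1;
    }

    /** Index of the memory segment that holds the given bucket. */
    int segmentIndex(int bucket) {
        return bucket >>> bucketsPerSegmentBits;
    }

    /** Byte offset of the given bucket inside its memory segment. */
    int offsetInSegment(int bucket) {
        return (bucket & bucketsPerSegmentMask) << bucketSizeBits;
    }

    public static void main(String[] args) {
        // Assumed sizes, for illustration only: 32 KiB segments, 128-byte buckets.
        BucketAddressingSketch a = new BucketAddressingSketch(32 * 1024, 128);
        System.out.println("segment: " + a.segmentIndex(1000));
        System.out.println("offset:  " + a.offsetInSegment(1000));
    }
}
```

Because every segment has the same size, a shift and a mask replace the division and modulo that would otherwise be needed, which is the offset computation that equal segment sizes are meant to simplify.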
      • partitionsBeingBuilt

        protected final ArrayList<HashPartition<BT,PT>> partitionsBeingBuilt
        The partitions that are built by processing the current partition.
      • currentEnumerator

        protected FileIOChannel.Enumerator currentEnumerator
        The channel enumerator that is used while processing the current partition to create channels for the spill partitions it requires.
      • buckets

        protected MemorySegment[] buckets
        The array of memory segments that contain the buckets which form the actual hash-table of hash-codes and pointers to the elements.
      • numBuckets

        protected int numBuckets
        The number of buckets in the current table. The bucket array is not necessarily fully used: the last segment may hold fewer buckets than would fit into it.
      • writeBehindBuffersAvailable

        protected int writeBehindBuffersAvailable
        The number of buffers in the write-behind queue that are not actually write-behind buffers, but regular buffers that have simply not yet been returned. This is part of an optimization that lets the spilling code proceed without waiting until the partition is completely spilled.
      • currentRecursionDepth

        protected int currentRecursionDepth
        The recursion depth of the partition that is currently processed. The initial table has a recursion depth of 0. Partitions spilled from a table that is built for a partition with recursion depth n have a recursion depth of n+1.
      • closed

        protected AtomicBoolean closed
        Flag indicating that the closing logic has been invoked.
      • keepBuildSidePartitions

        protected boolean keepBuildSidePartitions
        If true, build side partitions are kept for multiple probe steps.
      • furtherPartitioning

        protected boolean furtherPartitioning
    • Method Detail

      • open

        public void open(MutableObjectIterator<BT> buildSide,
                         MutableObjectIterator<PT> probeSide)
                  throws IOException
        Opens the hash join. This method reads the build-side input and constructs the initial hash table, gradually spilling partitions that do not fit into memory.
        Parameters:
        buildSide - Build side input.
        probeSide - Probe side input.
        Throws:
        IOException - Thrown, if an I/O problem occurs while spilling a partition.
      • open

        public void open(MutableObjectIterator<BT> buildSide,
                         MutableObjectIterator<PT> probeSide,
                         boolean buildOuterJoin)
                  throws IOException
        Opens the hash join. This method reads the build-side input and constructs the initial hash table, gradually spilling partitions that do not fit into memory.
        Parameters:
        buildSide - Build side input.
        probeSide - Probe side input.
        buildOuterJoin - Whether to perform an outer join on the build side.
        Throws:
        IOException - Thrown, if an I/O problem occurs while spilling a partition.
      • processUnmatchedBuildIter

        protected boolean processUnmatchedBuildIter()
                                             throws IOException
        Throws:
        IOException
      • getCurrentProbeRecord

        public PT getCurrentProbeRecord()
      • close

        public void close()
        Closes the hash table. This releases all internal structures and closes and removes all open files. Calling this method is valid both as a cleanup after the complete inputs were properly processed, and as a cancellation call, which cleans up all resources that are currently held by the hash join.
      • abort

        public void abort()
      • buildInitialTable

        protected void buildInitialTable(MutableObjectIterator<BT> input)
                                  throws IOException
        Creates the initial hash table. This method sets up the partitions and the hash index, and inserts the data from the given iterator.
        Parameters:
        input - The iterator with the build side data.
        Throws:
        IOException - Thrown, if an element could not be fetched and deserialized from the iterator, or if serialization fails.
      • insertIntoTable

        protected final void insertIntoTable(BT record,
                                             int hashCode)
                                      throws IOException
        Throws:
        IOException
      • getNewInMemoryPartition

        protected HashPartition<BT,PT> getNewInMemoryPartition(int number,
                                                                     int recursionLevel)
        Returns a new in-memory partition object. This method serves as an extension point for ReOpenableMutableHashTable.
      • createPartitions

        protected void createPartitions(int numPartitions,
                                        int recursionLevel)
      • clearPartitions

        protected void clearPartitions()
        This method clears all partitions currently residing (partially) in memory. It releases all memory and deletes all spilled partitions.

        This method is intended for a hard cleanup in the case that the join is aborted.

      • initTable

        protected void initTable(int numBuckets,
                                 byte numPartitions)
      • releaseTable

        protected void releaseTable()
        Releases the table (the array of buckets) and returns the occupied memory segments to the list of free segments.
      • spillPartition

        protected int spillPartition()
                              throws IOException
        Selects a partition and spills it. The number of the spilled partition is returned.
        Returns:
        The number of the spilled partition.
        Throws:
        IOException
      • buildBloomFilterForBucketsInPartition

        protected final void buildBloomFilterForBucketsInPartition(int partNum,
                                                                   HashPartition<BT,PT> partition)
      • nextSegment

        public MemorySegment nextSegment()
        This is the method called by the partitions to request memory to serialize records. It automatically spills partitions, if memory runs out.
        Specified by:
        nextSegment in interface MemorySegmentSource
        Returns:
        The next available memory segment.
      • getNumWriteBehindBuffers

        public static int getNumWriteBehindBuffers(int numBuffers)
        Determines the number of buffers to be used for asynchronous write-behind. It is currently computed as the logarithm of the number of buffers to the base 4, rounded up, minus 2. However, the number of write-behind buffers is capped at six.
        Parameters:
        numBuffers - The number of available buffers.
        Returns:
        The number of write-behind buffers to use.
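
The formula described for getNumWriteBehindBuffers can be sketched as follows. This is a literal reading of the documented description, not the method's actual body, which may round differently. The class name and the helper `ceilLog4` are hypothetical, and the lower clamp to zero is an assumption added here so that very small inputs do not yield a negative count.

```java
/** Sketch of the documented write-behind buffer formula (names are hypothetical). */
final class WriteBehindBuffersSketch {
    static final int MAX_WRITE_BEHIND_BUFFERS = 6; // the documented upper limit

    /** Smallest k such that 4^k >= n, i.e. the base-4 logarithm rounded up. */
    static int ceilLog4(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be positive");
        }
        int k = 0;
        long power = 1; // long avoids overflow for large n
        while (power < n) {
            power *= 4;
            k++;
        }
        return k;
    }

    /** ceil(log4(numBuffers)) - 2, capped at six; clamped to zero by assumption. */
    static int numWriteBehindBuffers(int numBuffers) {
        int candidate = ceilLog4(numBuffers) - 2;
        return Math.max(0, Math.min(MAX_WRITE_BEHIND_BUFFERS, candidate));
    }

    public static void main(String[] args) {
        for (int n : new int[] {16, 64, 1024, 65536, 1 << 20}) {
            System.out.println(n + " buffers -> " + numWriteBehindBuffers(n));
        }
    }
}
```

The logarithmic growth keeps the write-behind pool small relative to the memory available for the hash table itself, while the cap bounds it for very large memory budgets.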
      • getPartitioningFanOutNoEstimates

        public static int getPartitioningFanOutNoEstimates(int numBuffers)
        Gets the number of partitions to be used for an initial hash-table, when no estimates are available.

        The current logic ensures that there are always between 10 and 127 partitions and, within those bounds, roughly one tenth as many partitions as buffers.

        Parameters:
        numBuffers - The number of buffers available.
        Returns:
        The number of partitions to use.
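
One way to express the rule described for getPartitioningFanOutNoEstimates is a clamp of one tenth of the buffer count to the range 10 to 127. The sketch below assumes integer division for the "0.1" factor; the class and constant names are hypothetical.

```java
/** Sketch of the documented fan-out rule when no estimates are available. */
final class FanOutSketch {
    static final int MIN_PARTITIONS = 10;  // documented lower bound
    static final int MAX_PARTITIONS = 127; // documented upper bound

    /** Roughly numBuffers / 10, clamped to [10, 127]. */
    static int partitioningFanOut(int numBuffers) {
        return Math.max(MIN_PARTITIONS, Math.min(numBuffers / 10, MAX_PARTITIONS));
    }

    public static void main(String[] args) {
        for (int n : new int[] {50, 500, 5000}) {
            System.out.println(n + " buffers -> " + partitioningFanOut(n) + " partitions");
        }
    }
}
```

The upper bound of 127 is consistent with the partition number being stored in a single signed byte, as in the bucket header and in the `byte numPartitions` parameters elsewhere in this class.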
      • getInitialTableSize

        public static int getInitialTableSize(int numBuffers,
                                              int bufferSize,
                                              int numPartitions,
                                              int recordLenBytes)
      • assignPartition

        public static byte assignPartition(int bucket,
                                           byte numPartitions)
        Assigns a partition to a bucket.
        Parameters:
        bucket - The bucket to get the partition for.
        numPartitions - The number of partitions.
        Returns:
        The partition for the bucket.
      • hash

        public static int hash(int code,
                               int level)
        The level parameter selects a different hash function at each level of recursive partitioning, so that records which collided into one oversized partition are redistributed and the working set eventually fits into memory.
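
To illustrate why a level-dependent hash matters, the sketch below derives a different hash function per recursion level by rotating the input by a level-dependent amount before mixing the bits. This is an assumption-laden illustration, not the method's actual implementation: the class name, the rotation step of 11 bits, and the use of the MurmurHash3 32-bit finalizer as the mixing step are all choices made here for the example.

```java
/** Sketch of a level-dependent hash function (all constants are illustrative choices). */
final class LevelHashSketch {
    /** Rotates the code by a level-dependent amount, then mixes the bits. */
    static int hash(int code, int level) {
        // Assumed rotation step: 11 bits per recursion level.
        int h = Integer.rotateLeft(code, level * 11);
        // MurmurHash3 32-bit finalizer, used here as a stand-in bit mixer.
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    public static void main(String[] args) {
        int code = 12345;
        for (int level = 0; level < 3; level++) {
            System.out.println("level " + level + ": " + hash(code, level));
        }
    }
}
```

If the same function were reused at every level, all records of a spilled partition would land in the same bucket range again, and recursive partitioning would not reduce the partition size.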