Class MutableHashTable<BT,PT>
- java.lang.Object
  - org.apache.flink.runtime.operators.hash.MutableHashTable<BT,PT>
- Type Parameters:
BT
- The type of records from the build side that are stored in the hash table.
PT
- The type of records from the probe side that are stored in the hash table.
- All Implemented Interfaces:
MemorySegmentSource
- Direct Known Subclasses:
ReOpenableMutableHashTable
public class MutableHashTable<BT,PT> extends Object implements MemorySegmentSource
An implementation of a hybrid hash join. The join starts operating in memory and gradually spills contents to disk when memory is insufficient. It does not need to know a priori how large the input will be.
The design of this class largely follows the design presented in "Hash joins and hash teams in Microsoft SQL Server" by Goetz Graefe et al. In its current state, the implementation lacks features such as dynamic role reversal, partition tuning, and histogram-guided partitioning.
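The general idea of a hybrid hash join can be illustrated with a small toy model. The sketch below is not this class's implementation: it joins plain int keys, uses lists in place of disk files, and the class name, fan-out, recursion limit, and partitioning function are all hypothetical choices made for illustration. It keeps build-side partitions in memory while a record budget allows, spills the largest partition once the budget is exceeded, defers probe records of spilled partitions, and then processes each spilled partition recursively.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a hybrid hash join over int keys. All names and constants are hypothetical. */
final class ToyHybridHashJoin {
    private static final int FAN_OUT = 4;   // assumed number of partitions per level
    private static final int MAX_LEVEL = 3; // assumed recursion limit; the last level never spills

    /** Counts matching (build, probe) pairs, holding at most `budget` build keys in memory per level. */
    static long join(List<Integer> build, List<Integer> probe, int budget) {
        return join(build, probe, budget, 0);
    }

    private static long join(List<Integer> build, List<Integer> probe, int budget, int level) {
        List<List<Integer>> inMemory = new ArrayList<>();
        List<List<Integer>> spilledBuild = new ArrayList<>();
        List<List<Integer>> spilledProbe = new ArrayList<>();
        boolean[] spilled = new boolean[FAN_OUT];
        for (int i = 0; i < FAN_OUT; i++) {
            inMemory.add(new ArrayList<>());
            spilledBuild.add(new ArrayList<>());
            spilledProbe.add(new ArrayList<>());
        }

        // Build phase: insert into memory, spilling the largest partition when over budget.
        int held = 0;
        for (int key : build) {
            int p = partition(key, level);
            if (spilled[p]) {
                spilledBuild.get(p).add(key);
                continue;
            }
            inMemory.get(p).add(key);
            held++;
            if (held > budget && level < MAX_LEVEL) {
                int victim = 0;
                for (int i = 1; i < FAN_OUT; i++) {
                    if (inMemory.get(i).size() > inMemory.get(victim).size()) {
                        victim = i;
                    }
                }
                spilledBuild.get(victim).addAll(inMemory.get(victim));
                held -= inMemory.get(victim).size();
                inMemory.get(victim).clear();
                spilled[victim] = true;
            }
        }

        // Index the in-memory build keys: key -> number of occurrences.
        Map<Integer, Integer> table = new HashMap<>();
        for (List<Integer> part : inMemory) {
            for (int key : part) {
                table.merge(key, 1, Integer::sum);
            }
        }

        // Probe phase: match against memory, or defer the record if its partition was spilled.
        long matches = 0;
        for (int key : probe) {
            int p = partition(key, level);
            if (spilled[p]) {
                spilledProbe.get(p).add(key);
            } else {
                matches += table.getOrDefault(key, 0);
            }
        }

        // Recurse into spilled partitions, using a different partitioning per level.
        for (int p = 0; p < FAN_OUT; p++) {
            if (spilled[p]) {
                matches += join(spilledBuild.get(p), spilledProbe.get(p), budget, level + 1);
            }
        }
        return matches;
    }

    /** Hypothetical level-dependent partitioning function. */
    private static int partition(int key, int level) {
        int mixed = Integer.rotateLeft(key * 0x9E3779B9, level * 11);
        return Math.floorMod(mixed, FAN_OUT);
    }
}
```

The result does not depend on the budget: a small budget only forces more partitions through the spill path.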
The layout of the buckets inside a memory segment is as follows:
+----------------------------- Bucket x ----------------------------
|Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
| next-bucket-in-chain-pointer (8 bytes) | probedFlags (2 bytes) | reserved (2 bytes) |
|
|hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
| ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
|
|pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
| ... pointer n-1 (8 bytes) | pointer n (8 bytes)
|
+---------------------------- Bucket x + 1--------------------------
|Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
| next-bucket-in-chain-pointer (8 bytes) | probedFlags (2 bytes) | reserved (2 bytes) |
|
|hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
| ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
|
|pointer 1 (8 bytes) | pointer 2 (8 bytes) | pointer 3 (8 bytes) |
| ... pointer n-1 (8 bytes) | pointer n (8 bytes)
+-------------------------------------------------------------------
| ...
|
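The byte offsets implied by this layout can be worked out as follows. This is a sketch under stated assumptions: the header fields are taken in the order shown (16 bytes in total), the pointer area is assumed to start directly after the n hash codes, and the bucket size of 128 bytes used in the usage note is an assumed value, not one given in this documentation. The class and method names are hypothetical.

```java
/** Offsets derived from the bucket layout; names and the bucket size are assumptions. */
final class BucketLayout {
    static final int PARTITION_OFFSET = 0;     // 1 byte
    static final int STATUS_OFFSET = 1;        // 1 byte
    static final int COUNT_OFFSET = 2;         // 2 bytes
    static final int NEXT_POINTER_OFFSET = 4;  // 8 bytes
    static final int PROBED_FLAGS_OFFSET = 12; // 2 bytes
    static final int RESERVED_OFFSET = 14;     // 2 bytes
    static final int HEADER_LENGTH = 16;       // sum of the header fields above
    static final int HASH_CODE_LENGTH = 4;
    static final int POINTER_LENGTH = 8;

    /** Number of (hashCode, pointer) entries that fit into one bucket. */
    static int entriesPerBucket(int bucketSize) {
        return (bucketSize - HEADER_LENGTH) / (HASH_CODE_LENGTH + POINTER_LENGTH);
    }

    /** Offset of the i-th hash code (0-based) relative to the bucket start. */
    static int hashCodeOffset(int i) {
        return HEADER_LENGTH + i * HASH_CODE_LENGTH;
    }

    /** Offset of the i-th pointer (0-based), assuming pointers follow all hash codes. */
    static int pointerOffset(int bucketSize, int i) {
        return HEADER_LENGTH + entriesPerBucket(bucketSize) * HASH_CODE_LENGTH + i * POINTER_LENGTH;
    }
}
```

With an assumed bucket size of 128 bytes, each entry costs 12 bytes, so 9 entries fit after the 16-byte header and 4 bytes remain unused.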
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | MutableHashTable.HashBucketIterator<BT,PT> |
static class | MutableHashTable.ProbeIterator<PT> |
static class | MutableHashTable.UnmatchedBuildIterator<BT,PT> | Iterates over all in-memory elements that were not probed during the probe phase.
-
Field Summary
Fields
Modifier and Type | Field | Description
protected List<MemorySegment> | availableMemory | The free memory segments currently available to the hash join.
protected MemorySegment[] | buckets | The array of memory segments that contain the buckets which form the actual hash-table of hash-codes and pointers to the elements.
protected int | bucketsPerSegmentBits | The number of bits that describe the position of a bucket in a memory segment.
protected int | bucketsPerSegmentMask | The number of hash table buckets in a single memory segment - 1.
protected TypeComparator<BT> | buildSideComparator | The utilities to hash and compare the build side data types.
protected TypeSerializer<BT> | buildSideSerializer | The utilities to serialize the build side data types.
protected AtomicBoolean | closed | Flag indicating that the closing logic has been invoked.
protected FileIOChannel.Enumerator | currentEnumerator | The channel enumerator that is used while processing the current partition to create channels for the spill partitions it requires.
protected int | currentRecursionDepth | The recursion depth of the partition that is currently processed.
protected boolean | furtherPartitioning |
protected IOManager | ioManager | The I/O manager used to instantiate writers for the spilled partitions.
protected boolean | keepBuildSidePartitions | If true, build side partitions are kept for multiple probe steps.
protected int | numBuckets | The number of buckets in the current table.
protected ArrayList<HashPartition<BT,PT>> | partitionsBeingBuilt | The partitions that are built by processing the current partition.
protected MutableHashTable.ProbeIterator<PT> | probeIterator | Iterator over the elements from the probe side.
protected TypeSerializer<PT> | probeSideSerializer | The utilities to serialize the probe side data types.
protected int | segmentSize | The size of the segments used by the hash join buckets.
protected LinkedBlockingQueue<MemorySegment> | writeBehindBuffers | The queue of buffers that can be used for write-behind.
protected int | writeBehindBuffersAvailable | The number of buffers in the write-behind queue that are not actual write-behind buffers but regular buffers that have not yet been returned.
-
Constructor Summary
Constructors
Constructor | Description
MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT,BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager) |
MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT,BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager, boolean useBloomFilters) |
MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT,BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager, int avgRecordLen, boolean useBloomFilters) |
-
Method Summary
Methods
Modifier and Type | Method | Description
void | abort() |
static byte | assignPartition(int bucket, byte numPartitions) | Assigns a partition to a bucket.
protected void | buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT,PT> partition) |
protected void | buildInitialTable(MutableObjectIterator<BT> input) | Creates the initial hash table.
protected void | buildTableFromSpilledPartition(HashPartition<BT,PT> p) |
protected void | clearPartitions() | This method clears all partitions currently residing (partially) in memory.
void | close() | Closes the hash table.
protected void | createPartitions(int numPartitions, int recursionLevel) |
MutableObjectIterator<BT> | getBuildSideIterator() |
PT | getCurrentProbeRecord() |
List<MemorySegment> | getFreedMemory() |
static int | getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes) |
MutableHashTable.HashBucketIterator<BT,PT> | getMatchesFor(PT record) |
protected HashPartition<BT,PT> | getNewInMemoryPartition(int number, int recursionLevel) | Returns a new inMemoryPartition object.
static int | getNumWriteBehindBuffers(int numBuffers) | Determines the number of buffers to be used for asynchronous write behind.
static int | getPartitioningFanOutNoEstimates(int numBuffers) | Gets the number of partitions to be used for an initial hash-table, when no estimates are available.
TypeComparator<PT> | getProbeSideComparator() |
static int | hash(int code, int level) | The level parameter is needed so that we can have different hash functions when we recursively apply the partitioning, so that the working set eventually fits into memory.
protected void | initTable(int numBuckets, byte numPartitions) |
protected void | insertIntoTable(BT record, int hashCode) |
boolean | nextRecord() |
MemorySegment | nextSegment() | This is the method called by the partitions to request memory to serialize records.
void | open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) | Opens the hash join.
void | open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide, boolean buildOuterJoin) | Opens the hash join.
protected boolean | prepareNextPartition() |
protected boolean | processProbeIter() |
protected boolean | processUnmatchedBuildIter() |
protected void | releaseTable() | Releases the table (the array of buckets) and returns the occupied memory segments to the list of free segments.
protected int | spillPartition() | Selects a partition and spills it.
-
-
-
Field Detail
-
buildSideSerializer
protected final TypeSerializer<BT> buildSideSerializer
The utilities to serialize the build side data types.
-
probeSideSerializer
protected final TypeSerializer<PT> probeSideSerializer
The utilities to serialize the probe side data types.
-
buildSideComparator
protected final TypeComparator<BT> buildSideComparator
The utilities to hash and compare the build side data types.
-
availableMemory
protected final List<MemorySegment> availableMemory
The free memory segments currently available to the hash join.
-
writeBehindBuffers
protected final LinkedBlockingQueue<MemorySegment> writeBehindBuffers
The queue of buffers that can be used for write-behind. Any buffer that is written asynchronously to disk is returned through this queue. Hence, it may sometimes contain more buffers than were set aside for write-behind (see writeBehindBuffersAvailable).
-
ioManager
protected final IOManager ioManager
The I/O manager used to instantiate writers for the spilled partitions.
-
segmentSize
protected final int segmentSize
The size of the segments used by the hash join buckets. All segments must be of equal size to ease offset computations.
-
bucketsPerSegmentMask
protected final int bucketsPerSegmentMask
The number of hash table buckets in a single memory segment - 1. Because memory segments can be comparatively large, we fit multiple buckets into one memory segment. This variable is a mask that is 1 in the lower bits that define the number of a bucket in a segment.
-
bucketsPerSegmentBits
protected final int bucketsPerSegmentBits
The number of bits that describe the position of a bucket in a memory segment. Computed as log2(bucketsPerSegment).
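As an illustration of how bucketsPerSegmentBits and bucketsPerSegmentMask can be used together, the sketch below locates a bucket by its global index: the upper bits select the memory segment and the masked lower bits select the bucket within it. It assumes that both the segment size and the bucket size are powers of two; the class name, method names, and the sizes used in the usage note are hypothetical.

```java
/** Hypothetical helper showing bit/mask based bucket addressing. */
final class BucketAddressing {
    /** log2(bucketsPerSegment); assumes segmentSize / bucketSize is a power of two. */
    static int bucketsPerSegmentBits(int segmentSize, int bucketSize) {
        return Integer.numberOfTrailingZeros(segmentSize / bucketSize);
    }

    /** bucketsPerSegment - 1, i.e. all ones in the lower bits. */
    static int bucketsPerSegmentMask(int segmentSize, int bucketSize) {
        return segmentSize / bucketSize - 1;
    }

    /** Index into the array of memory segments that holds the bucket. */
    static int segmentIndex(int bucket, int bits) {
        return bucket >>> bits;
    }

    /** Byte offset of the bucket inside its memory segment. */
    static int offsetInSegment(int bucket, int mask, int bucketSize) {
        return (bucket & mask) * bucketSize;
    }
}
```

For example, with assumed 32768-byte segments and 128-byte buckets there are 256 buckets per segment, so the bit count is 8 and the mask is 255; bucket 1000 then lives in segment 3 at byte offset 29696.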
-
partitionsBeingBuilt
protected final ArrayList<HashPartition<BT,PT>> partitionsBeingBuilt
The partitions that are built by processing the current partition.
-
probeIterator
protected MutableHashTable.ProbeIterator<PT> probeIterator
Iterator over the elements from the probe side.
-
currentEnumerator
protected FileIOChannel.Enumerator currentEnumerator
The channel enumerator that is used while processing the current partition to create channels for the spill partitions it requires.
-
buckets
protected MemorySegment[] buckets
The array of memory segments that contain the buckets which form the actual hash-table of hash-codes and pointers to the elements.
-
numBuckets
protected int numBuckets
The number of buckets in the current table. The bucket array is not necessarily fully used, because the last segment may hold fewer buckets than would fit into it.
-
writeBehindBuffersAvailable
protected int writeBehindBuffersAvailable
The number of buffers in the write-behind queue that are not actual write-behind buffers but regular buffers that have not yet been returned. This is part of an optimization that lets the spilling code proceed without waiting until the partition is completely spilled.
-
currentRecursionDepth
protected int currentRecursionDepth
The recursion depth of the partition that is currently processed. The initial table has a recursion depth of 0. Partitions spilled from a table that is built for a partition with recursion depth n have a recursion depth of n+1.
-
closed
protected AtomicBoolean closed
Flag indicating that the closing logic has been invoked.
-
keepBuildSidePartitions
protected boolean keepBuildSidePartitions
If true, build side partitions are kept for multiple probe steps.
-
furtherPartitioning
protected boolean furtherPartitioning
-
-
Constructor Detail
-
MutableHashTable
public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT,BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager)
-
MutableHashTable
public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT,BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager, boolean useBloomFilters)
-
MutableHashTable
public MutableHashTable(TypeSerializer<BT> buildSideSerializer, TypeSerializer<PT> probeSideSerializer, TypeComparator<BT> buildSideComparator, TypeComparator<PT> probeSideComparator, TypePairComparator<PT,BT> comparator, List<MemorySegment> memorySegments, IOManager ioManager, int avgRecordLen, boolean useBloomFilters)
-
-
Method Detail
-
open
public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide) throws IOException
Opens the hash join. This method reads the build-side input and constructs the initial hash table, gradually spilling partitions that do not fit into memory.
- Parameters:
buildSide
- Build side input.
probeSide
- Probe side input.
- Throws:
IOException
- Thrown, if an I/O problem occurs while spilling a partition.
-
open
public void open(MutableObjectIterator<BT> buildSide, MutableObjectIterator<PT> probeSide, boolean buildOuterJoin) throws IOException
Opens the hash join. This method reads the build-side input and constructs the initial hash table, gradually spilling partitions that do not fit into memory.
- Parameters:
buildSide
- Build side input.
probeSide
- Probe side input.
buildOuterJoin
- Whether to perform an outer join on the build side.
- Throws:
IOException
- Thrown, if an I/O problem occurs while spilling a partition.
-
processProbeIter
protected boolean processProbeIter() throws IOException
- Throws:
IOException
-
processUnmatchedBuildIter
protected boolean processUnmatchedBuildIter() throws IOException
- Throws:
IOException
-
prepareNextPartition
protected boolean prepareNextPartition() throws IOException
- Throws:
IOException
-
nextRecord
public boolean nextRecord() throws IOException
- Throws:
IOException
-
getMatchesFor
public MutableHashTable.HashBucketIterator<BT,PT> getMatchesFor(PT record) throws IOException
- Throws:
IOException
-
getCurrentProbeRecord
public PT getCurrentProbeRecord()
-
getBuildSideIterator
public MutableObjectIterator<BT> getBuildSideIterator()
-
close
public void close()
Closes the hash table. This releases all internal structures and closes and removes all open files. Calling this method is valid both as a cleanup after the complete inputs were properly processed and as a cancellation call, which cleans up all resources that are currently held by the hash join.
-
abort
public void abort()
-
getFreedMemory
public List<MemorySegment> getFreedMemory()
-
buildInitialTable
protected void buildInitialTable(MutableObjectIterator<BT> input) throws IOException
Creates the initial hash table. This method sets up the partitions and the hash index, and inserts the data from the given iterator.
- Parameters:
input
- The iterator with the build side data.
- Throws:
IOException
- Thrown, if an element could not be fetched and deserialized from the iterator, or if serialization fails.
-
buildTableFromSpilledPartition
protected void buildTableFromSpilledPartition(HashPartition<BT,PT> p) throws IOException
- Throws:
IOException
-
insertIntoTable
protected final void insertIntoTable(BT record, int hashCode) throws IOException
- Throws:
IOException
-
getNewInMemoryPartition
protected HashPartition<BT,PT> getNewInMemoryPartition(int number, int recursionLevel)
Returns a new inMemoryPartition object. This is required as a plug for ReOpenableMutableHashTable.
-
createPartitions
protected void createPartitions(int numPartitions, int recursionLevel)
-
clearPartitions
protected void clearPartitions()
This method clears all partitions currently residing (partially) in memory. It releases all memory and deletes all spilled partitions. This method is intended for a hard cleanup in the case that the join is aborted.
-
initTable
protected void initTable(int numBuckets, byte numPartitions)
-
releaseTable
protected void releaseTable()
Releases the table (the array of buckets) and returns the occupied memory segments to the list of free segments.
-
spillPartition
protected int spillPartition() throws IOException
Selects a partition and spills it. The number of the spilled partition is returned.
- Returns:
- The number of the spilled partition.
- Throws:
IOException
-
buildBloomFilterForBucketsInPartition
protected final void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT,PT> partition)
-
nextSegment
public MemorySegment nextSegment()
This is the method called by the partitions to request memory to serialize records. It automatically spills partitions if memory runs out.
- Specified by:
nextSegment
in interface MemorySegmentSource
- Returns:
- The next available memory segment.
-
getNumWriteBehindBuffers
public static int getNumWriteBehindBuffers(int numBuffers)
Determines the number of buffers to be used for asynchronous write-behind. It is currently computed as the base-4 logarithm of the number of buffers, rounded up, minus 2. The number of write-behind buffers is, however, capped at six.
- Parameters:
numBuffers
- The number of available buffers.
- Returns:
- The number of write-behind buffers to use.
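The rule described for getNumWriteBehindBuffers can be sketched as follows. This follows the prose literally and is not taken from the method's source, so the actual rounding may differ. The class and method names are hypothetical, the logarithm is computed with integers to avoid floating-point rounding at exact powers of four, and clamping negative results to zero is an added assumption.

```java
/** Hypothetical re-statement of the documented write-behind rule. */
final class WriteBehindEstimate {
    static int numWriteBehindBuffers(int numBuffers) {
        // Smallest k with 4^k >= numBuffers, i.e. ceil(log4(numBuffers)).
        int log4Ceil = 0;
        long power = 1;
        while (power < numBuffers) {
            power *= 4;
            log4Ceil++;
        }
        int result = log4Ceil - 2;
        // Upper limit of six as documented; lower limit of zero is an assumption.
        return Math.max(0, Math.min(6, result));
    }
}
```

Under these assumptions, 64 buffers yield 1 write-behind buffer, 1000 buffers yield 3, and 1,000,000 buffers hit the cap of 6.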
-
getPartitioningFanOutNoEstimates
public static int getPartitioningFanOutNoEstimates(int numBuffers)
Gets the number of partitions to be used for an initial hash-table, when no estimates are available. The current logic ensures that there are always between 10 and 127 partitions, and otherwise roughly one tenth of the number of buffers.
- Parameters:
numBuffers
- The number of buffers available.
- Returns:
- The number of partitions to use.
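The documented bounds amount to clamping one tenth of the buffer count into the range from 10 to 127. A minimal sketch of that rule, with hypothetical class and method names and integer division as an assumed rounding choice:

```java
/** Hypothetical re-statement of the documented fan-out rule. */
final class FanOutEstimate {
    static int partitioningFanOut(int numBuffers) {
        // About one tenth of the buffers, but never fewer than 10 or more than 127 partitions.
        return Math.max(10, Math.min(numBuffers / 10, 127));
    }
}
```

With this sketch, 50 buffers give 10 partitions, 500 buffers give 50, and 5000 buffers give 127.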
-
getInitialTableSize
public static int getInitialTableSize(int numBuffers, int bufferSize, int numPartitions, int recordLenBytes)
-
assignPartition
public static byte assignPartition(int bucket, byte numPartitions)
Assigns a partition to a bucket.
- Parameters:
bucket
- The bucket to get the partition for.
numPartitions
- The number of partitions.
- Returns:
- The partition for the bucket.
-
hash
public static int hash(int code, int level)
The level parameter is needed so that we can have different hash functions when we recursively apply the partitioning, so that the working set eventually fits into memory.
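One way to obtain a different hash function per recursion level is to rotate the input by a level-dependent amount before mixing it. The sketch below is an illustration of that idea only and makes no claim about how this method is implemented: the class name, the rotation distance of 11 bits per level, and the choice of the MurmurHash3 32-bit finalizer as the mixing step are all assumptions.

```java
/** Hypothetical level-dependent hash: rotate by level, then mix. */
final class LevelHash {
    static int hash(int code, int level) {
        // A different rotation per level feeds different bit patterns into the mixer.
        int h = Integer.rotateLeft(code, level * 11);
        // MurmurHash3 32-bit finalizer, used here as a generic bit mixer.
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }
}
```

Records that collided into the same partition at level n are spread differently at level n+1, because the partition is then derived from a different hash value. That is what allows an oversized partition to be split further.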
-
getProbeSideComparator
public TypeComparator<PT> getProbeSideComparator()
-
-