Class InPlaceMutableHashTable<T>
java.lang.Object
  org.apache.flink.runtime.operators.hash.AbstractMutableHashTable<T>
    org.apache.flink.runtime.operators.hash.InPlaceMutableHashTable<T>
public class InPlaceMutableHashTable<T> extends AbstractMutableHashTable<T>
This hash table supports updating elements. If the new element has the same size as the old element, then the update is done in place. Otherwise a hole is created at the place of the old record, which will eventually be removed by a compaction.
The memory is divided into three areas:
- Bucket area: contains the bucket heads. Each bucket head is an 8-byte pointer to the first link of a linked list in the record area.
- Record area: contains the actual data in linked list elements. A linked list element starts with an 8-byte pointer to the next element, followed by the record.
- Staging area: a small, temporary storage area for writing updated records. It is needed because there is no way to know how large a record will be before serializing it. Therefore, we can't serialize directly into the record area when doing an update: if the new record turned out to be larger than the old one, it would overwrite some other record that happens to come after the old one in memory. The solution is to serialize to the staging area first, and then copy the result to the place of the original if it has the same size. Otherwise, a new linked list element is allocated at the end of the record area, and the old one is marked as abandoned. This creates "holes" in the record area, so compactions are eventually needed.
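The update path described above can be illustrated with a minimal sketch. This is not the Flink implementation: the class `StagedUpdateSketch`, its fields, and the use of UTF-8 strings as records are all assumptions made for illustration, and the bucket area and linked list pointers are left out.

```java
import java.nio.charset.StandardCharsets;

/** Toy model (hypothetical): records are UTF-8 strings stored back to back. */
class StagedUpdateSketch {
    final byte[] recordArea = new byte[64];
    int appendPosition = 0;   // end of the used part of the record area
    int abandonedBytes = 0;   // total size of the holes awaiting compaction

    /** Serializes a record, appends it to the record area, and returns its offset. */
    int append(String value) {
        return appendBytes(value.getBytes(StandardCharsets.UTF_8));
    }

    private int appendBytes(byte[] serialized) {
        int offset = appendPosition;
        System.arraycopy(serialized, 0, recordArea, offset, serialized.length);
        appendPosition += serialized.length;
        return offset;
    }

    /** Replaces the record at oldOffset and returns the offset of the new version. */
    int update(int oldOffset, int oldLength, String newValue) {
        // Serialize into a separate staging buffer first: the size of the
        // new record is only known after serialization.
        byte[] staging = newValue.getBytes(StandardCharsets.UTF_8);
        if (staging.length == oldLength) {
            // Same size: copy over the old record in place.
            System.arraycopy(staging, 0, recordArea, oldOffset, oldLength);
            return oldOffset;
        }
        // Different size (larger or smaller): abandon the old record,
        // which leaves a hole, and append the new version at the end.
        abandonedBytes += oldLength;
        return appendBytes(staging);
    }

    String read(int offset, int length) {
        return new String(recordArea, offset, length, StandardCharsets.UTF_8);
    }
}
```

In this sketch, updating a 4-byte record with a 6-byte one never touches the neighbouring record, because the larger version is only ever written at the end of the record area.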
Compaction happens by deleting everything in the bucket area and then reinserting all elements. The reinsertion forgets the structure (the linked lists) of the record area, reads it sequentially from the beginning, and inserts all non-abandoned records. Note that insertions never overwrite a record that hasn't yet been read by the reinsertion sweep, because both the insertions and the reads proceed sequentially through the record area, and the insertions never overtake the reading sweep.
Note: we have to abandon the old linked list element even when the updated record has a smaller size than the original, because otherwise we wouldn't know where the next record starts during a reinsertion sweep.
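The reinsertion sweep can be sketched as follows. This is a simplified illustration, not Flink's code: the class `CompactionSketch`, the marker values `END_OF_LIST` and `ABANDONED`, the 4-byte length field after the next pointer, and the choice of bucket from the first payload byte are all assumptions of the sketch. A real table would also unlink an abandoned element from its bucket chain, which is omitted here because only the sweep is shown.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Toy record area (hypothetical layout): [8-byte next][4-byte length][payload]. */
class CompactionSketch {
    static final long END_OF_LIST = -1L;  // assumed marker values
    static final long ABANDONED = -2L;
    static final int HEADER = 8 + 4;      // next pointer + payload length

    final ByteBuffer records = ByteBuffer.allocate(256);
    final long[] buckets = new long[4];   // bucket heads: offsets into records
    int end = 0;                          // end of the used part of the record area

    CompactionSketch() {
        Arrays.fill(buckets, END_OF_LIST);
    }

    /** Appends an element, links it as the new head of its bucket, returns its offset. */
    int insert(byte[] payload) {
        int offset = end;
        end = writeElement(offset, payload);
        return offset;
    }

    /** Marks the element as a hole by overwriting its next pointer. */
    void abandon(int offset) {
        records.putLong(offset, ABANDONED);
    }

    private int writeElement(int offset, byte[] payload) {
        int bucket = payload[0] & (buckets.length - 1);
        records.putLong(offset, buckets[bucket]);   // point to the previous head
        records.putInt(offset + 8, payload.length);
        for (int i = 0; i < payload.length; i++) {
            records.put(offset + HEADER + i, payload[i]);
        }
        buckets[bucket] = offset;
        return offset + HEADER + payload.length;
    }

    void compact() {
        Arrays.fill(buckets, END_OF_LIST);  // delete everything in the bucket area
        int oldEnd = end;
        int read = 0;
        end = 0;                            // the write position restarts at the beginning
        while (read < oldEnd) {
            long next = records.getLong(read);
            int length = records.getInt(read + 8);
            if (next != ABANDONED) {
                byte[] payload = new byte[length];
                for (int i = 0; i < length; i++) {
                    payload[i] = records.get(read + HEADER + i);
                }
                // end <= read holds here, so this write only touches bytes
                // that the sweep has already read.
                end = writeElement(end, payload);
            }
            // The stored length tells the sweep where the next element starts,
            // even when the current element is abandoned.
            read += HEADER + length;
        }
    }
}
```

The sketch also shows why an abandoned element keeps its original extent: the sweep can only find the next element if every slot, live or abandoned, still describes its full size.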
The number of buckets depends on how large the records are. The serializer might be able to tell us the record size; in that case, we calculate the number of buckets upfront and never resize. If the serializer doesn't know the size, then we start with a small number of buckets and resize whenever more elements have been inserted than there are buckets.
The number of memory segments given to the staging area is usually one, because it just needs to hold one record.
Note: For hashing, we can't just take the lower bits of the original value, but have to use a proper hash function from MathUtils because of its avalanche property: changing only some high bits of the original value must not leave the lower bits of the hash unaffected. This matters because when choosing the bucket for a record, we mask only the lower bits (see numBucketsMask). Without such a hash function, lots of collisions would occur when, for example, the hashed value is a bitset and many of the values that actually occur differ only in the higher bits.
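A small sketch of why masking raw values is a problem. The class `BucketHashSketch` and its methods are hypothetical, and the mixing function is the 32-bit finalizer of MurmurHash3, used here as a stand-in; it is not claimed to be the function that MathUtils provides.

```java
import java.util.HashSet;
import java.util.Set;

class BucketHashSketch {
    /** MurmurHash3 32-bit finalizer: spreads changes in any input bit over all output bits. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Bucket choice by masking the raw value. */
    static int bucketNaive(int code, int mask) {
        return code & mask;
    }

    /** Bucket choice by masking the mixed value. */
    static int bucketMixed(int code, int mask) {
        return mix(code) & mask;
    }

    /** Counts how many distinct buckets 64 values that differ only in high bits land in. */
    static int distinctBuckets(boolean mixed) {
        int mask = 1024 - 1;  // plays the role of numBucketsMask for 1024 buckets
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < 64; i++) {
            int code = i << 16;  // only bits 16 and above vary
            used.add(mixed ? bucketMixed(code, mask) : bucketNaive(code, mask));
        }
        return used.size();
    }
}
```

With the naive choice, all 64 values fall into a single bucket, because their lower 10 bits are identical. With the mixed choice, they spread over multiple buckets.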
-
-
Nested Class Summary
Nested Classes
- class InPlaceMutableHashTable.EntryIterator: WARNING: Doing any other operation on the table invalidates the iterator!
- class InPlaceMutableHashTable.HashTableProber<PT>: A prober for accessing the table.
- class InPlaceMutableHashTable.ReduceFacade: A facade for the hash table operations that a reduce operator driver needs.
-
Field Summary
-
Fields inherited from class org.apache.flink.runtime.operators.hash.AbstractMutableHashTable
buildSideComparator, buildSideSerializer, closed, stateLock
-
-
Constructor Summary
Constructors
- InPlaceMutableHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory)
-
Method Summary
All Methods (all are concrete instance methods)
- void abort()
- void close(): Closes the hash table.
- long getCapacity(): Gets the total capacity of this hash table, in bytes.
- InPlaceMutableHashTable.EntryIterator getEntryIterator(): Returns an iterator that can be used to iterate over all the elements in the table.
- List<MemorySegment> getFreeMemory()
- long getOccupancy(): Gets the number of bytes currently occupied in this hash table.
- <PT> InPlaceMutableHashTable.HashTableProber<PT> getProber(TypeComparator<PT> probeTypeComparator, TypePairComparator<PT,T> pairComparator)
- void insert(T record): Inserts the given record into the hash table.
- void insertOrReplaceRecord(T record): Searches the hash table for a record with the given key.
- void open(): Initializes the hash table.
Methods inherited from class org.apache.flink.runtime.operators.hash.AbstractMutableHashTable
getBuildSideComparator, getBuildSideSerializer
-
-
-
-
Constructor Detail
-
InPlaceMutableHashTable
public InPlaceMutableHashTable(TypeSerializer<T> serializer, TypeComparator<T> comparator, List<MemorySegment> memory)
-
-
Method Detail
-
getCapacity
public long getCapacity()
Gets the total capacity of this hash table, in bytes.
- Returns: The hash table's total capacity.
-
getOccupancy
public long getOccupancy()
Gets the number of bytes currently occupied in this hash table.
- Returns: The number of bytes occupied.
-
open
public void open()
Initializes the hash table.
- Specified by: open in class AbstractMutableHashTable<T>
-
close
public void close()
Description copied from class: AbstractMutableHashTable
Closes the hash table. This releases all internal structures, and closes and removes all open files. Calling this method is valid both as a cleanup after the complete inputs were properly processed, and as a cancellation call, which cleans up all resources that are currently held by the hash table. If another process still accesses the hash table after close has been called, no operations will be performed.
- Specified by: close in class AbstractMutableHashTable<T>
-
abort
public void abort()
- Specified by: abort in class AbstractMutableHashTable<T>
-
getFreeMemory
public List<MemorySegment> getFreeMemory()
- Specified by: getFreeMemory in class AbstractMutableHashTable<T>
-
insertOrReplaceRecord
public void insertOrReplaceRecord(T record) throws IOException
Searches the hash table for a record with the given key. If it is found, then it is overwritten with the specified record. Otherwise, the specified record is inserted.
- Specified by: insertOrReplaceRecord in class AbstractMutableHashTable<T>
- Parameters: record - The record to insert or to replace with.
- Throws: IOException - (EOFException specifically, if memory ran out)
-
insert
public void insert(T record) throws IOException
Inserts the given record into the hash table. Note: this method does not check whether a record with the same key is already present.
- Specified by: insert in class AbstractMutableHashTable<T>
- Parameters: record - The record to insert.
- Throws: IOException - (EOFException specifically, if memory ran out)
-
getEntryIterator
public InPlaceMutableHashTable.EntryIterator getEntryIterator()
Returns an iterator that can be used to iterate over all the elements in the table. WARNING: Doing any other operation on the table invalidates the iterator! (Even using getMatchFor of a prober!)
- Specified by: getEntryIterator in class AbstractMutableHashTable<T>
- Returns: the iterator
-
getProber
public <PT> InPlaceMutableHashTable.HashTableProber<PT> getProber(TypeComparator<PT> probeTypeComparator, TypePairComparator<PT,T> pairComparator)
- Specified by: getProber in class AbstractMutableHashTable<T>
-
-