public class UnilateralSortMerger<E> extends Object implements Sorter<E>
The UnilateralSortMerger is a full-fledged sorter. It implements a multi-way merge sort. Internally,
the logic is factored into three threads (read, sort, spill) that communicate through a set of blocking queues,
forming a closed loop. Memory is allocated through the MemoryManager
interface, so the component will
not exceed the provided memory limits.

Modifier and Type | Class and Description |
---|---|
protected static class | UnilateralSortMerger.ChannelWithBlockCount |
protected static class | UnilateralSortMerger.CircularElement<E> - Class representing buffers that circulate between the reading, sorting, and spilling threads. |
protected static class | UnilateralSortMerger.CircularQueues<E> - Collection of queues that are used for the communication between the threads. |
protected static class | UnilateralSortMerger.ReadingThread<E> - The thread that consumes the input data and puts it into a buffer that will be sorted. |
protected static class | UnilateralSortMerger.SortingThread<E> - The thread that sorts filled buffers. |
protected class | UnilateralSortMerger.SpillingThread - The thread that handles the spilling of intermediate results and sets up the merging. |
protected static class | UnilateralSortMerger.ThreadBase<E> - Base class for all working threads in this sort-merger. |
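The closed loop described in the class overview can be pictured with a small, self-contained sketch. It is not the class's actual implementation: `Buffer`, `END_MARKER`, `BUFFER_CAPACITY`, and the three queue names are hypothetical stand-ins (for `CircularElement`, the end-of-data marker, and `CircularQueues`), and plain integer lists replace memory segments and spill files.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class CircularPipelineSketch {

    /** Reusable buffer that circulates between the stages (hypothetical). */
    static final class Buffer {
        final List<Integer> records = new ArrayList<>();
    }

    /** Sentinel handed from stage to stage to signal the end of the data. */
    static final Buffer END_MARKER = new Buffer();

    /** Assumed number of records that fit into one buffer. */
    static final int BUFFER_CAPACITY = 4;

    /** Reads the input, sorts it buffer by buffer, and returns the sorted runs. */
    static List<List<Integer>> run(List<Integer> input, int numBuffers) throws InterruptedException {
        BlockingQueue<Buffer> emptyQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Buffer> sortQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Buffer> spillQueue = new LinkedBlockingQueue<>();
        for (int i = 0; i < numBuffers; i++) {
            emptyQueue.add(new Buffer());
        }
        List<List<Integer>> runs = Collections.synchronizedList(new ArrayList<>());

        // Reader: takes empty buffers, fills them, and hands them to the sorter.
        Thread reader = new Thread(() -> {
            try {
                Buffer current = emptyQueue.take();
                for (int value : input) {
                    if (current.records.size() == BUFFER_CAPACITY) {
                        sortQueue.put(current);
                        current = emptyQueue.take(); // blocks until a buffer is recycled
                    }
                    current.records.add(value);
                }
                if (!current.records.isEmpty()) {
                    sortQueue.put(current);
                }
                sortQueue.put(END_MARKER);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Sorter: sorts each filled buffer and hands it to the spiller.
        Thread sorter = new Thread(() -> {
            try {
                Buffer buffer;
                while ((buffer = sortQueue.take()) != END_MARKER) {
                    Collections.sort(buffer.records);
                    spillQueue.put(buffer);
                }
                spillQueue.put(END_MARKER);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Spiller: writes each sorted run out and recycles the buffer.
        Thread spiller = new Thread(() -> {
            try {
                Buffer buffer;
                while ((buffer = spillQueue.take()) != END_MARKER) {
                    runs.add(new ArrayList<>(buffer.records)); // stands in for a spill file
                    buffer.records.clear();
                    emptyQueue.put(buffer); // closes the loop
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start();
        sorter.start();
        spiller.start();
        reader.join();
        sorter.join();
        spiller.join();
        return new ArrayList<>(runs);
    }
}
```

Because the number of buffers is fixed up front and the reader blocks until the spiller returns one, memory use stays bounded regardless of input size, which is the property the overview attributes to the closed loop.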
Modifier and Type | Field and Description |
---|---|
protected boolean | closed - Flag indicating that the sorter was closed. |
protected MutableObjectIterator<E> | iterator - The iterator to be returned by the sort-merger. |
protected IOException | iteratorException - The exception that is set if the iterator cannot be created. |
protected Object | iteratorLock - The monitor that guards the iterator field. |
protected static int | MAX_NUM_WRITE_BUFFERS - The maximum number of buffers to be used by the writers. |
protected MemoryManager | memoryManager - The memory manager through which memory is allocated and released. |
protected static int | MIN_NUM_SORT_MEM_SEGMENTS - The minimum number of segments that are required for the sort to operate. |
protected static int | MIN_NUM_WRITE_BUFFERS - The minimum number of buffers to be used by the writers. |
protected boolean | objectReuseEnabled - Whether to reuse objects during deserialization. |
protected List<MemorySegment> | sortReadMemory - The memory segments used first for sorting and later for reading/pre-fetching during the external merge. |
protected List<MemorySegment> | writeMemory - The memory segments used to stage data to be written. |
Modifier | Constructor and Description |
---|---|
public | UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled) |
public | UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled) |
protected | UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, boolean objectReuseEnabled) |
public | UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) |
protected | UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, boolean objectReuseEnabled) |
Modifier and Type | Method and Description |
---|---|
void | close() - Shuts down all the threads initiated by this sort/merger. |
protected static <T> UnilateralSortMerger.CircularElement<T> | endMarker() - Gets the element that is passed as the marker for the end of data. |
MutableObjectIterator<E> | getIterator() - Gets the iterator over this input. |
protected UnilateralSortMerger.ThreadBase<E> | getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, UnilateralSortMerger.CircularQueues<E> queues, LargeRecordHandler<E> largeRecordHandler, AbstractInvokable parentTask, TypeSerializer<E> serializer, long startSpillingBytes) - Creates the reading thread. |
protected UnilateralSortMerger.ThreadBase<E> | getSortingThread(ExceptionHandler<IOException> exceptionHandler, UnilateralSortMerger.CircularQueues<E> queues, AbstractInvokable parentTask) |
protected UnilateralSortMerger.ThreadBase<E> | getSpillingThread(ExceptionHandler<IOException> exceptionHandler, UnilateralSortMerger.CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles) |
protected void | setResultIterator(MutableObjectIterator<E> iterator) - Sets the result iterator. |
protected void | setResultIteratorException(IOException ioex) - Reports an exception to all threads that are waiting for the result iterator. |
protected static <T> UnilateralSortMerger.CircularElement<T> | spillingMarker() - Gets the element that is passed as the marker signaling the beginning of spilling. |
protected void | startThreads() - Starts all the threads that are used by this sort-merger. |
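The class overview calls this a multi-way merge sort. The merge step over already sorted runs can be sketched with a priority queue, as below. This is a generic k-way merge under stated assumptions, not the code of this class: `MultiWayMergeSketch` and `RunCursor` are hypothetical names, and in-memory integer lists stand in for spilled runs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class MultiWayMergeSketch {

    /** Current head element of one sorted run plus the iterator over its remainder. */
    private static final class RunCursor {
        int head;
        final Iterator<Integer> rest;

        RunCursor(int head, Iterator<Integer> rest) {
            this.head = head;
            this.rest = rest;
        }
    }

    /** Merges any number of individually sorted runs into one sorted list. */
    static List<Integer> merge(List<List<Integer>> sortedRuns) {
        // The heap always exposes the run whose head element is smallest.
        PriorityQueue<RunCursor> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.head, b.head));
        for (List<Integer> run : sortedRuns) {
            Iterator<Integer> it = run.iterator();
            if (it.hasNext()) {
                heap.add(new RunCursor(it.next(), it));
            }
        }

        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            RunCursor smallest = heap.poll();
            merged.add(smallest.head);
            // Advance the run that was just consumed and re-insert it if it has more data.
            if (smallest.rest.hasNext()) {
                smallest.head = smallest.rest.next();
                heap.add(smallest);
            }
        }
        return merged;
    }
}
```

Each output element costs one heap removal and at most one insertion, so merging k runs with n elements in total takes O(n log k) comparisons.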
protected static final int MIN_NUM_WRITE_BUFFERS
protected static final int MAX_NUM_WRITE_BUFFERS
protected static final int MIN_NUM_SORT_MEM_SEGMENTS
protected final List<MemorySegment> sortReadMemory
protected final List<MemorySegment> writeMemory
protected final MemoryManager memoryManager
protected final Object iteratorLock
protected volatile MutableObjectIterator<E> iterator
protected volatile IOException iteratorException
protected volatile boolean closed
protected final boolean objectReuseEnabled
public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled) throws IOException, MemoryAllocationException
IOException
MemoryAllocationException
public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean objectReuseEnabled) throws IOException, MemoryAllocationException
IOException
MemoryAllocationException
public UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException
IOException
protected UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException, MemoryAllocationException
IOException
MemoryAllocationException
protected UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, boolean objectReuseEnabled) throws IOException
IOException
protected void startThreads()
public void close()
The threads are signaled to exit immediately, but depending on their current operation, it may take a while before they actually do. For example, the sorting thread will not finish before the current batch is sorted. This method attempts to wait for the working threads to exit. If it is interrupted while waiting, however, the method returns immediately, and there is no guarantee how long the threads continue to exist and occupy resources afterwards.
close in interface Closeable
close in interface AutoCloseable
See also: Closeable.close()
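The shutdown behavior described for close() can be illustrated with a cooperative-exit sketch. Everything here is an assumption made for illustration: `CloseSketch`, `Worker`, and `requestExit()` are hypothetical names, and the worker merely sorts random arrays to mimic a batch that is finished before the exit flag is checked again.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

final class CloseSketch {

    /** Worker that checks its exit flag only between batches (hypothetical). */
    static final class Worker extends Thread {
        private volatile boolean running = true;

        @Override
        public void run() {
            Random random = new Random(42);
            int[] batch = new int[50_000];
            while (running) {
                for (int i = 0; i < batch.length; i++) {
                    batch[i] = random.nextInt();
                }
                Arrays.sort(batch); // the current batch is always completed
            }
        }

        /** Asks the worker to exit; it does so once the current batch is done. */
        void requestExit() {
            running = false;
        }
    }

    private final List<Worker> workers;
    private volatile boolean closed;

    CloseSketch(List<Worker> workers) {
        this.workers = workers;
        for (Worker worker : workers) {
            worker.start();
        }
    }

    /** Signals all workers to exit and waits for them unless interrupted. */
    void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (Worker worker : workers) {
            worker.requestExit();
        }
        try {
            for (Worker worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            // Return right away; workers may keep running for a while.
            Thread.currentThread().interrupt();
        }
    }
}
```

If the calling thread is not interrupted, every worker has terminated by the time close() returns; if it is interrupted, the sketch gives the same weak guarantee as the description above.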
protected UnilateralSortMerger.ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, UnilateralSortMerger.CircularQueues<E> queues, LargeRecordHandler<E> largeRecordHandler, AbstractInvokable parentTask, TypeSerializer<E> serializer, long startSpillingBytes)
The returned thread is not yet started.
exceptionHandler - The handler for exceptions in the thread.
reader - The reader from which the thread reads.
queues - The queues through which the thread communicates with the other threads.
parentTask - The task at which the thread registers itself (for profiling purposes).
serializer - The serializer used to serialize records.
startSpillingBytes - The number of bytes after which the reader thread will send the notification to start the spilling.
protected UnilateralSortMerger.ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, UnilateralSortMerger.CircularQueues<E> queues, AbstractInvokable parentTask)
protected UnilateralSortMerger.ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, UnilateralSortMerger.CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles)
public MutableObjectIterator<E> getIterator() throws InterruptedException
CloseableInputProvider
getIterator in interface Sorter<E>
getIterator in interface CloseableInputProvider<E>
InterruptedException
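The interplay of iteratorLock, setResultIterator, setResultIteratorException, and a blocking getIterator() suggests a monitor-guarded handoff, which the following sketch illustrates. It is a guess at the pattern rather than the class's code: `ResultHandoffSketch` is a hypothetical name, `java.util.Iterator` replaces MutableObjectIterator, and wrapping the stored exception in `UncheckedIOException` is an assumption, since this page does not say how the failure is surfaced to the caller.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;

final class ResultHandoffSketch<T> {

    /** Monitor guarding the two result fields below. */
    private final Object iteratorLock = new Object();

    private volatile Iterator<T> iterator;
    private volatile IOException iteratorException;

    /** Blocks until either a result iterator or an exception has been published. */
    Iterator<T> getIterator() throws InterruptedException {
        synchronized (iteratorLock) {
            // Loop to guard against spurious wake-ups.
            while (iterator == null && iteratorException == null) {
                iteratorLock.wait();
            }
            if (iteratorException != null) {
                // Assumption: surface the failure as an unchecked exception.
                throw new UncheckedIOException("Result iterator could not be created", iteratorException);
            }
            return iterator;
        }
    }

    /** Publishes the result and wakes every waiting caller. */
    void setResultIterator(Iterator<T> result) {
        synchronized (iteratorLock) {
            if (iteratorException == null) {
                iterator = result;
                iteratorLock.notifyAll();
            }
        }
    }

    /** Publishes a failure and wakes every waiting caller. */
    void setResultIteratorException(IOException failure) {
        synchronized (iteratorLock) {
            if (iterator == null) {
                iteratorException = failure;
                iteratorLock.notifyAll();
            }
        }
    }
}
```

In this sketch whichever of the two setters runs first wins, so callers never see both a result and a failure.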
protected final void setResultIterator(MutableObjectIterator<E> iterator)
iterator - The result iterator to set.
protected final void setResultIteratorException(IOException ioex)
ioex - The exception to be reported to the threads that wait for the result iterator.
protected static <T> UnilateralSortMerger.CircularElement<T> endMarker()
protected static <T> UnilateralSortMerger.CircularElement<T> spillingMarker()
Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.