Class FutureCompletingBlockingQueue<T>
- java.lang.Object
-
- org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue<T>
-
- Type Parameters:
T - the type of the elements in the queue.
@Internal public class FutureCompletingBlockingQueue<T> extends Object
A custom blocking queue implementation combined with a CompletableFuture, used to hand over data from a producing thread to a consuming thread. This FutureCompletingBlockingQueue has the following features:
Consumer Notifications
Rather than letting consumers block on the take() method, or have them repeatedly call the poll() method, this queue offers a CompletableFuture (obtained via the getAvailabilityFuture() method) that gets completed whenever the queue is non-empty. A consumer can thus subscribe to asynchronous notifications for availability by adding a handler to the obtained CompletableFuture.
The future may also be completed by an explicit call to notifyAvailable(). That way the consumer may be notified of a situation or condition without adding an element to the queue.
Availability is reset when a call to poll() (or take()) finds an empty queue or results in an empty queue (takes the last element).
Note that this model generally assumes that false positives are okay, meaning that the availability future may complete even though no data is available in the queue. The consumer is responsible for polling data and obtaining another future to wait on. This is similar to the way that Java's monitors and Conditions are subject to spurious wakeups of waiting threads and commonly need to be used in a loop that rechecks the waiting condition.
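The consumer-side contract described above can be sketched with plain JDK classes. AvailabilitySketch below is a hypothetical, simplified stand-in and not the Flink implementation: it is unbounded, and its add method and drain helper are names invented for illustration. Only the availability behaviour is modelled (complete when non-empty or notified, reset when poll() finds or leaves the queue empty).

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in that models only the availability semantics. */
final class AvailabilitySketch<T> {
    private final Object lock = new Object();
    private final Queue<T> queue = new ArrayDeque<>();
    private CompletableFuture<Void> availability = new CompletableFuture<>();

    /** Already complete if the queue is non-empty or a notification is pending. */
    CompletableFuture<Void> getAvailabilityFuture() {
        synchronized (lock) {
            return availability;
        }
    }

    /** Assumption: unbounded add for brevity; completes the current future. */
    void add(T element) {
        synchronized (lock) {
            queue.add(element);
            availability.complete(null);
        }
    }

    /** Completes the current future without adding an element. */
    void notifyAvailable() {
        synchronized (lock) {
            availability.complete(null);
        }
    }

    /** Returns null if empty; resets availability when the queue is (now) empty. */
    T poll() {
        synchronized (lock) {
            T next = queue.poll();
            if (queue.isEmpty() && availability.isDone()) {
                availability = new CompletableFuture<>();
            }
            return next;
        }
    }

    /** Consumer step: take everything that is there, then obtain a new future to wait on. */
    static <T> CompletableFuture<Void> drain(AvailabilitySketch<T> q, List<T> sink) {
        T next;
        while ((next = q.poll()) != null) {
            sink.add(next);
        }
        return q.getAvailabilityFuture();
    }
}
```

A consumer would typically attach a handler (for example with thenRun) to the future returned by drain and call drain again from that handler. Because false positives are allowed, the handler has to tolerate poll() returning null.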
Producer Wakeup
The queue supports gracefully waking up producing threads that are blocked due to the queue capacity limits, without interrupting the thread. This is done via the wakeUpPuttingThread(int) method.
-
-
Field Summary
static CompletableFuture<Void> AVAILABLE - A constant future that is complete, indicating availability.
-
Constructor Summary
FutureCompletingBlockingQueue()
FutureCompletingBlockingQueue(int capacity)
-
Method Summary
CompletableFuture<Void> getAvailabilityFuture() - Returns the availability future.
boolean isEmpty() - Checks whether the queue is empty.
void notifyAvailable() - Makes sure the availability future is complete, if it is not complete already.
T peek() - Get the first element from the queue without removing it.
T poll() - Get and remove the first element from the queue.
boolean put(int threadIndex, T element) - Put an element into the queue.
int remainingCapacity() - Checks the remaining capacity in the queue.
int size() - Gets the size of the queue.
T take() - Warning: This is a dangerous method and should only be used for testing convenience.
void wakeUpPuttingThread(int threadIndex) - Gracefully wakes up the thread with the given threadIndex if it is blocked in adding an element to the queue.
-
-
-
Field Detail
-
AVAILABLE
public static final CompletableFuture<Void> AVAILABLE
A constant future that is complete, indicating availability. Using this constant in cases that are guaranteed to be available helps short-circuit some checks and avoid volatile memory operations.
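One way such a constant can short-circuit checks is an identity comparison before falling back to isDone(). The sketch below uses only the JDK; the class AvailableConstantSketch and the helper isAvailable are hypothetical names, and the AVAILABLE field here is a local stand-in for the constant documented above, not the Flink field itself.

```java
import java.util.concurrent.CompletableFuture;

final class AvailableConstantSketch {
    /** Stand-in for the documented constant: one shared, already-complete future. */
    static final CompletableFuture<Void> AVAILABLE = CompletableFuture.completedFuture(null);

    /**
     * Checks the reference first. Only futures other than the shared constant
     * need the isDone() call, which reads the future's internal state.
     */
    static boolean isAvailable(CompletableFuture<?> future) {
        return future == AVAILABLE || future.isDone();
    }
}
```

Callers that know data is available can return AVAILABLE directly instead of allocating a new completed future, so the identity check succeeds on the common path.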
-
-
Method Detail
-
getAvailabilityFuture
public CompletableFuture<Void> getAvailabilityFuture()
Returns the availability future. If the queue is non-empty, then this future will already be complete. Otherwise the obtained future is guaranteed to get completed the next time the queue becomes non-empty, or a notification happens via notifyAvailable().
Note that a completed future does not guarantee that the next call to poll() will return a non-null element. If there are concurrent consumers, another consumer may have taken the available element. Or there may have been no element in the first place, because the future was completed through a call to notifyAvailable().
For that reason, it is important to call this method again (to obtain a new future) every time poll() has returned null and you want to wait for data.
-
notifyAvailable
public void notifyAvailable()
Makes sure the availability future is complete, if it is not complete already. All futures returned by previous calls to getAvailabilityFuture() are guaranteed to be completed.
All subsequent calls to getAvailabilityFuture() will return a completed future, until the availability is reset by a call to poll() that leaves the queue empty.
-
put
public boolean put(int threadIndex, T element) throws InterruptedException
Put an element into the queue. The thread blocks if the queue is full.
- Parameters:
threadIndex - the index of the thread.
element - the element to put.
- Returns:
- true if the element has been successfully put into the queue, false otherwise.
- Throws:
InterruptedException - when the thread is interrupted.
-
take
@VisibleForTesting public T take() throws InterruptedException
Warning: This is a dangerous method and should only be used for testing convenience. A method that blocks until availability does not go together well with the concept of asynchronous notifications and non-blocking polling.
Get and remove the first element from the queue. The call blocks if the queue is empty. The problem with this method is that it may loop internally until an element is available and that way eagerly reset the availability future. If a consumer thread is blocked in taking an element, it will receive availability notifications from notifyAvailable() and immediately reset them by calling poll() and finding the queue empty.
- Returns:
- the first element in the queue.
- Throws:
InterruptedException - when the thread is interrupted.
-
poll
public T poll()
Get and remove the first element from the queue. Null is returned if the queue is empty. If this makes the queue empty (takes the last element) or finds the queue already empty, then this resets the availability notifications. The next call to getAvailabilityFuture() will then return a non-complete future that completes only the next time that the queue becomes non-empty or the notifyAvailable() method is called.
- Returns:
- the first element from the queue, or null if the queue is empty.
-
peek
public T peek()
Get the first element from the queue without removing it.
- Returns:
- the first element in the queue, or null if the queue is empty.
-
size
public int size()
Gets the size of the queue.
-
isEmpty
public boolean isEmpty()
Checks whether the queue is empty.
-
remainingCapacity
public int remainingCapacity()
Checks the remaining capacity in the queue. That is the difference between the maximum capacity and the current number of elements in the queue.
-
wakeUpPuttingThread
public void wakeUpPuttingThread(int threadIndex)
Gracefully wakes up the thread with the given threadIndex if it is blocked in adding an element to the queue. If the thread is blocked in put(int, Object), it will immediately return from the method with a return value of false.
If this method is called, the next time the thread with the given index is about to be blocked in adding an element, it may immediately wake up and return.
- Parameters:
threadIndex - The number identifying the thread.
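The wake-up behaviour can be illustrated with a per-thread flag and Condition guarded by a single lock. WakeUpSketch below is a hypothetical, simplified stand-in written against the JDK only. It is not the Flink implementation, and in this sketch a pending wake-up always makes the next blocking put return false, whereas the documentation above only says it may.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in that models only the producer wake-up. */
final class WakeUpSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Queue<T> queue = new ArrayDeque<>();
    private final Map<Integer, Condition> conditions = new HashMap<>();
    private final Map<Integer, Boolean> wakeUpFlags = new HashMap<>();
    private final int capacity;

    WakeUpSketch(int capacity) {
        this.capacity = capacity;
    }

    boolean put(int threadIndex, T element) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                // Read and clear the flag; a pending request means "give up, do not block".
                if (Boolean.TRUE.equals(wakeUpFlags.put(threadIndex, false))) {
                    return false;
                }
                conditionFor(threadIndex).await();
            }
            queue.add(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    void wakeUpPuttingThread(int threadIndex) {
        lock.lock();
        try {
            wakeUpFlags.put(threadIndex, true);
            conditionFor(threadIndex).signal(); // no interrupt involved
        } finally {
            lock.unlock();
        }
    }

    T poll() {
        lock.lock();
        try {
            T next = queue.poll();
            // Let blocked producers recheck the capacity condition.
            for (Condition c : conditions.values()) {
                c.signal();
            }
            return next;
        } finally {
            lock.unlock();
        }
    }

    // Must be called with the lock held.
    private Condition conditionFor(int threadIndex) {
        return conditions.computeIfAbsent(threadIndex, i -> lock.newCondition());
    }

    /** Demo: the wake-up request arrives before the producer would block. */
    static boolean putAfterWakeUp() {
        WakeUpSketch<String> q = new WakeUpSketch<>(1);
        try {
            q.put(0, "a");            // fills the queue
            q.wakeUpPuttingThread(0); // request is remembered in the flag
            return q.put(0, "b");     // queue is full and the flag is set
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Callers of put should treat a false return value as "the element was not added" and decide whether to retry or drop it.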
-
-