T - the type of the elements in the queue.
@Internal public class FutureCompletingBlockingQueue<T> extends Object
A custom implementation of a blocking queue combined with a CompletableFuture that is used in the hand-over of data from a producing thread to a consuming thread. This FutureCompletingBlockingQueue has the following features:
Rather than letting consumers block on the take() method, or have them repeatedly call the poll() method, this queue offers a CompletableFuture (obtained via the getAvailabilityFuture() method) that gets completed whenever the queue is non-empty. A consumer can thus subscribe to asynchronous notifications for availability by adding a handler to the obtained CompletableFuture.
The future may also be completed by an explicit call to notifyAvailable(). That way the consumer may be notified of a situation/condition without adding an element to the queue.
Availability is reset when a call to poll() (or take()) finds an empty queue or results in an empty queue (takes the last element).
Note that this model generally assumes that false positives are okay, meaning that the availability future may complete even though no data is available in the queue. The consumer is responsible for polling data and obtaining another future to wait on. This is similar to the way that Java's monitors and Conditions can have spurious wakeups of waiting threads and commonly need to be used in a loop that rechecks the waiting condition.
The queue supports gracefully waking up producing threads that are blocked due to the queue
capacity limits, without interrupting the thread. This is done via the wakeUpPuttingThread(int)
method.
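The wake-up behaviour described above can be illustrated with a minimal sketch. This is not the Flink implementation: the class name WakeablePutSketch, the single shared Condition, and the numThreads constructor parameter are assumptions made for the example. Only the contract is taken from the description, namely that a producer blocked on a full queue returns false from put instead of being interrupted.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified model of a bounded queue with one wake-up flag per producer. */
class WakeablePutSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    // Assumption: one shared condition for all producers keeps the sketch short.
    private final Condition spaceOrWakeUp = lock.newCondition();
    private final Queue<T> queue = new ArrayDeque<>();
    private final int capacity;
    private final boolean[] wakeUpFlags;

    WakeablePutSketch(int capacity, int numThreads) {
        this.capacity = capacity;
        this.wakeUpFlags = new boolean[numThreads];
    }

    /** Returns true if the element was added, false if the producer was woken up instead. */
    boolean put(int threadIndex, T element) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                if (wakeUpFlags[threadIndex]) {
                    wakeUpFlags[threadIndex] = false; // consume the wake-up request
                    return false;
                }
                spaceOrWakeUp.await();
            }
            queue.add(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Asks the given producer to return from put() without interrupting its thread. */
    void wakeUpPuttingThread(int threadIndex) {
        lock.lock();
        try {
            wakeUpFlags[threadIndex] = true;
            spaceOrWakeUp.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Removes the head element (or returns null) and lets waiting producers recheck capacity. */
    T poll() {
        lock.lock();
        try {
            T element = queue.poll();
            spaceOrWakeUp.signalAll();
            return element;
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch the flag is only checked when the queue is full, so a wake-up request issued while there is still capacity stays pending until the producer would next block.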
Modifier and Type | Field and Description |
---|---|
static CompletableFuture<Void> | AVAILABLE A constant future that is complete, indicating availability. |
Constructor and Description |
---|
FutureCompletingBlockingQueue() |
FutureCompletingBlockingQueue(int capacity) |
Modifier and Type | Method and Description |
---|---|
CompletableFuture<Void> | getAvailabilityFuture() Returns the availability future. |
boolean | isEmpty() Checks whether the queue is empty. |
void | notifyAvailable() Makes sure the availability future is complete, if it is not complete already. |
T | peek() Get the first element from the queue without removing it. |
T | poll() Get and remove the first element from the queue. |
boolean | put(int threadIndex, T element) Put an element into the queue. |
int | remainingCapacity() Checks the remaining capacity in the queue. |
int | size() Gets the size of the queue. |
T | take() Warning: This is a dangerous method and should only be used for testing convenience. |
void | wakeUpPuttingThread(int threadIndex) Gracefully wakes up the thread with the given threadIndex if it is blocked in adding an element. |
public static final CompletableFuture<Void> AVAILABLE
public FutureCompletingBlockingQueue()
public FutureCompletingBlockingQueue(int capacity)
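public CompletableFuture<Void> getAvailabilityFuture()
Returns the availability future. If the queue is non-empty, this future is already complete. Otherwise, the obtained future completes the next time the queue becomes non-empty or a notification happens via notifyAvailable().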
It is important to note that a completed future is no guarantee that the next call to poll() will return a non-null element. If there are concurrent consumers, another consumer may have taken the available element. Or there was no element in the first place, because the future was completed through a call to notifyAvailable().
For that reason, it is important to call this method again (to obtain a new future) every time poll() has returned null and you want to wait for data.
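The re-subscription pattern described above might look like the following sketch. AvailabilityQueueSketch is a hypothetical, unbounded stand-in written for this example, not the Flink class; it models only how the availability future is completed and reset. ConsumerSketch.drain is likewise an assumed helper name.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical, unbounded stand-in that models only the availability future. */
class AvailabilityQueueSketch<T> {
    private static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final Queue<T> queue = new ArrayDeque<>();
    private CompletableFuture<Void> current = new CompletableFuture<>();

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return current;
    }

    synchronized void notifyAvailable() {
        if (current != AVAILABLE) {
            CompletableFuture<Void> pending = current;
            current = AVAILABLE;
            pending.complete(null); // runs handlers registered on the old future
        }
    }

    synchronized void put(T element) {
        queue.add(element);
        notifyAvailable();
    }

    synchronized T poll() {
        T element = queue.poll();
        if (queue.isEmpty() && current == AVAILABLE) {
            current = new CompletableFuture<>(); // reset availability
        }
        return element;
    }
}

class ConsumerSketch {
    /** Drains the queue, then re-subscribes on a freshly obtained future. */
    static <T> void drain(AvailabilityQueueSketch<T> queue, Consumer<T> handler) {
        T element;
        while ((element = queue.poll()) != null) {
            handler.accept(element);
        }
        // poll() returned null, so obtain a new future; a previously obtained one
        // may already be complete and would not signal the next element.
        queue.getAvailabilityFuture().thenRun(() -> drain(queue, handler));
    }
}
```

A call to notifyAvailable() on an empty queue triggers drain, which finds nothing, and the consumer simply subscribes again: this is the false positive the model tolerates. In this sketch the handler runs on whichever thread completes the future; a real consumer might prefer thenRunAsync with its own executor.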
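public void notifyAvailable()
Makes sure the availability future is complete, if it is not complete already. All futures returned by previous calls to getAvailabilityFuture() are guaranteed to be completed.
All subsequent calls to getAvailabilityFuture() will return a completed future, until the point that the availability is reset via a call to poll() that finds or leaves the queue empty.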
public boolean put(int threadIndex, T element) throws InterruptedException
Put an element into the queue. The thread blocks if the queue is full.
threadIndex - the index of the thread.
element - the element to put.
InterruptedException - when the thread is interrupted.
@VisibleForTesting public T take() throws InterruptedException
Get and remove the first element from the queue. The call blocks if the queue is empty.
The problem with this method is that it may loop internally until an element is available and
that way eagerly reset the availability future. If a consumer thread is blocked in taking an
element, it will receive availability notifications from notifyAvailable()
and
immediately reset them by calling poll()
and finding the queue empty.
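InterruptedException - when the thread is interrupted.
public T poll()
Get and remove the first element from the queue. Returns null if the queue is empty. If this call finds the queue empty or takes the last element, availability is reset: the next call to getAvailabilityFuture() will then return a non-complete future that completes only the next time that the queue becomes non-empty or the notifyAvailable() method is called.
public T peek()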
public int size()
public boolean isEmpty()
public int remainingCapacity()
public void wakeUpPuttingThread(int threadIndex)
Gracefully wakes up the thread with the given threadIndex if it is blocked in adding an element to the queue. If the thread is blocked in put(int, Object), it will immediately return from the method with a return value of false.
If this method is called, the next time the thread with the given index is about to be blocked in adding an element, it may immediately wake up and return.
threadIndex - The number identifying the thread.
Copyright © 2014–2024 The Apache Software Foundation. All rights reserved.