Class FutureCompletingBlockingQueue<T>

  • Type Parameters:
    T - the type of the elements in the queue.

    @Internal
    public class FutureCompletingBlockingQueue<T>
    extends Object
    A custom implementation of blocking queue in combination with a CompletableFuture that is used in the hand-over of data from a producing thread to a consuming thread. This FutureCompletingBlockingQueue has the following features:

    Consumer Notifications

    Rather than letting consumers block on the take() method, or have them poll the poll() method, this queue offers a CompletableFuture, obtained via the getAvailabilityFuture() method) that gets completed whenever the queue is non-empty. A consumer can thus subscribe to asynchronous notifications for availability by adding a handler to the obtained CompletableFuture.

    The future may also be completed by an explicit call to notifyAvailable(). That way the consumer may be notified of a situation/condition without adding an element to the queue.

    Availability is reset when a call to poll() (or take() finds an empty queue or results in an empty queue (takes the last element).

    Note that this model generally assumes that false positives are okay, meaning that the availability future completes despite there being no data availabile in the queue. The consumer is responsible for polling data and obtaining another future to wait on. This is similar to the way that Java's Monitors and Conditions can have the spurious wakeup of the waiting threads and commonly need to be used in loop with the waiting condition.

    Producer Wakeup

    The queue supports gracefully waking up producing threads that are blocked due to the queue capacity limits, without interrupting the thread. This is done via the wakeUpPuttingThread(int) method.

    • Field Detail

      • AVAILABLE

        public static final CompletableFuture<Void> AVAILABLE
        A constant future that is complete, indicating availability. Using this constant in cases that are guaranteed available helps short-circuiting some checks and avoiding volatile memory operations.
    • Constructor Detail

      • FutureCompletingBlockingQueue

        public FutureCompletingBlockingQueue()
      • FutureCompletingBlockingQueue

        public FutureCompletingBlockingQueue​(int capacity)
    • Method Detail

      • getAvailabilityFuture

        public CompletableFuture<Void> getAvailabilityFuture()
        Returns the availability future. If the queue is non-empty, then this future will already be complete. Otherwise the obtained future is guaranteed to get completed the next time the queue becomes non-empty, or a notification happens via notifyAvailable().

        It is important that a completed future is no guarantee that the next call to poll() will return a non-null element. If there are concurrent consumer, another consumer may have taken the available element. Or there was no element in the first place, because the future was completed through a call to notifyAvailable().

        For that reason, it is important to call this method (to obtain a new future) every time again after poll() returned null and you want to wait for data.

      • notifyAvailable

        public void notifyAvailable()
        Makes sure the availability future is complete, if it is not complete already. All futures returned by previous calls to getAvailabilityFuture() are guaranteed to be completed.

        All future calls to the method will return a completed future, until the point that the availability is reset via calls to poll() that leave the queue empty.

      • put

        public boolean put​(int threadIndex,
                           T element)
                    throws InterruptedException
        Put an element into the queue. The thread blocks if the queue is full.
        Parameters:
        threadIndex - the index of the thread.
        element - the element to put.
        Returns:
        true if the element has been successfully put into the queue, false otherwise.
        Throws:
        InterruptedException - when the thread is interrupted.
      • take

        @VisibleForTesting
        public T take()
               throws InterruptedException
        Warning: This is a dangerous method and should only be used for testing convenience. A method that blocks until availability does not go together well with the concept of asynchronous notifications and non-blocking polling.

        Get and remove the first element from the queue. The call blocks if the queue is empty. The problem with this method is that it may loop internally until an element is available and that way eagerly reset the availability future. If a consumer thread is blocked in taking an element, it will receive availability notifications from notifyAvailable() and immediately reset them by calling poll() and finding the queue empty.

        Returns:
        the first element in the queue.
        Throws:
        InterruptedException - when the thread is interrupted.
      • poll

        public T poll()
        Get and remove the first element from the queue. Null is returned if the queue is empty. If this makes the queue empty (takes the last element) or finds the queue already empty, then this resets the availability notifications. The next call to getAvailabilityFuture() will then return a non-complete future that completes only the next time that the queue becomes non-empty or the notifyAvailable() method is called.
        Returns:
        the first element from the queue, or Null if the queue is empty.
      • peek

        public T peek()
        Get the first element from the queue without removing it.
        Returns:
        the first element in the queue, or Null if the queue is empty.
      • size

        public int size()
        Gets the size of the queue.
      • isEmpty

        public boolean isEmpty()
        Checks whether the queue is empty.
      • remainingCapacity

        public int remainingCapacity()
        Checks the remaining capacity in the queue. That is the difference between the maximum capacity and the current number of elements in the queue.
      • wakeUpPuttingThread

        public void wakeUpPuttingThread​(int threadIndex)
        Gracefully wakes up the thread with the given threadIndex if it is blocked in adding an element. to the queue. If the thread is blocked in put(int, Object) it will immediately return from the method with a return value of false.

        If this method is called, the next time the thread with the given index is about to be blocked in adding an element, it may immediately wake up and return.

        Parameters:
        threadIndex - The number identifying the thread.