WindowAllocationManager.java

/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.coyote.http2;

import java.util.concurrent.TimeUnit;

import org.apache.coyote.ActionCode;
import org.apache.coyote.Response;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;

/**
 * Tracks whether the stream is waiting for an allocation to the stream flow control window, to the connection flow
 * control window or not waiting for an allocation and only issues allocation notifications when the stream is known to
 * be waiting for the notification.
 * <p>
 * It is possible for a stream to be waiting for a connection allocation when a stream allocation is made. Therefore
 * this class tracks the type of allocation that the stream is waiting for to ensure that notifications are correctly
 * triggered.
 * <p>
 * With the implementation at the time of writing, it is not possible for a stream to receive an unexpected connection
 * notification as these are only issues to streams in the backlog and a stream must be waiting for a connection
 * allocation in order to be placed on the backlog. However, as a precaution, this class protects against unexpected
 * connection notifications.
 * <p>
 * It is important for asynchronous processing not to notify unless a notification is expected else a dispatch will be
 * performed unnecessarily which may lead to unexpected results.
 * <p>
 * A previous implementation used separate locks for the stream and connection notifications. However, correct handling
 * of allocation waiting requires holding the stream lock when making the decision to wait. Therefore both allocations
 * need to wait on the Stream.
 */
class WindowAllocationManager {

    private static final Log log = LogFactory.getLog(WindowAllocationManager.class);
    private static final StringManager sm = StringManager.getManager(WindowAllocationManager.class);

    private static final int NONE = 0;
    private static final int STREAM = 1;
    private static final int CONNECTION = 2;

    private final Stream stream;

    private int waitingFor = NONE;

    WindowAllocationManager(Stream stream) {
        this.stream = stream;
    }

    void waitForStream(long timeout) throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("windowAllocationManager.waitFor.stream", stream.getConnectionId(),
                    stream.getIdAsString(), Long.toString(timeout)));
        }

        waitFor(STREAM, timeout);
    }


    void waitForConnection(long timeout) throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("windowAllocationManager.waitFor.connection", stream.getConnectionId(),
                    stream.getIdAsString(), Integer.toString(stream.getConnectionAllocationRequested()),
                    Long.toString(timeout)));
        }

        waitFor(CONNECTION, timeout);
    }


    void waitForStreamNonBlocking() {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.stream", stream.getConnectionId(),
                    stream.getIdAsString()));
        }

        waitForNonBlocking(STREAM);
    }


    void waitForConnectionNonBlocking() {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("windowAllocationManager.waitForNonBlocking.connection", stream.getConnectionId(),
                    stream.getIdAsString()));
        }

        waitForNonBlocking(CONNECTION);
    }


    void notifyStream() {
        notify(STREAM);
    }


    void notifyConnection() {
        notify(CONNECTION);
    }


    void notifyAny() {
        notify(STREAM | CONNECTION);
    }


    boolean isWaitingForStream() {
        return isWaitingFor(STREAM);
    }


    boolean isWaitingForConnection() {
        return isWaitingFor(CONNECTION);
    }


    private boolean isWaitingFor(int waitTarget) {
        stream.windowAllocationLock.lock();
        try {
            return (waitingFor & waitTarget) > 0;
        } finally {
            stream.windowAllocationLock.unlock();
        }
    }


    private void waitFor(int waitTarget, final long timeout) throws InterruptedException {
        stream.windowAllocationLock.lock();
        try {
            if (waitingFor != NONE) {
                throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
                        stream.getConnectionId(), stream.getIdAsString()));
            }

            waitingFor = waitTarget;
            long startNanos = -1;

            // Loop to handle spurious wake-ups
            do {
                if (timeout < 0) {
                    stream.windowAllocationAvailable.await();
                } else {
                    long timeoutRemaining;
                    if (startNanos == -1) {
                        startNanos = System.nanoTime();
                        timeoutRemaining = timeout;
                    } else {
                        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                        if (elapsedMillis == 0) {
                            elapsedMillis = 1;
                        }
                        timeoutRemaining = timeout - elapsedMillis;
                        if (timeoutRemaining <= 0) {
                            return;
                        }
                    }
                    stream.windowAllocationAvailable.await(timeoutRemaining, TimeUnit.MILLISECONDS);
                }
            } while (waitingFor != NONE);
        } finally {
            stream.windowAllocationLock.unlock();
        }
    }


    private void waitForNonBlocking(int waitTarget) {
        stream.windowAllocationLock.lock();
        try {
            if (waitingFor == NONE) {
                waitingFor = waitTarget;
            } else if (waitingFor == waitTarget) {
                // NO-OP
                // Non-blocking post-processing may attempt to flush
            } else {
                throw new IllegalStateException(sm.getString("windowAllocationManager.waitFor.ise",
                        stream.getConnectionId(), stream.getIdAsString()));
            }
        } finally {
            stream.windowAllocationLock.unlock();
        }
    }


    private void notify(int notifyTarget) {

        stream.windowAllocationLock.lock();
        try {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("windowAllocationManager.notify", stream.getConnectionId(),
                        stream.getIdAsString(), Integer.toString(waitingFor), Integer.toString(notifyTarget)));
            }

            if ((notifyTarget & waitingFor) > NONE) {
                // Reset this here so multiple notifies (possible with a
                // backlog containing multiple streams and small window updates)
                // are handled correctly (only the first should trigger a call
                // to stream.notify(). Additional notify() calls may trigger
                // unexpected timeouts.
                waitingFor = NONE;
                Response response = stream.getCoyoteResponse();
                if (response != null) {
                    if (response.getWriteListener() == null) {
                        // Blocking, so use notify to release StreamOutputBuffer
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("windowAllocationManager.notified", stream.getConnectionId(),
                                    stream.getIdAsString()));
                        }
                        stream.windowAllocationAvailable.signal();
                    } else {
                        // Non-blocking so dispatch
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("windowAllocationManager.dispatched", stream.getConnectionId(),
                                    stream.getIdAsString()));
                        }
                        response.action(ActionCode.DISPATCH_WRITE, null);
                        // Need to explicitly execute dispatches on the StreamProcessor
                        // as this thread is being processed by an UpgradeProcessor
                        // which won't see this dispatch
                        response.action(ActionCode.DISPATCH_EXECUTE, null);
                    }
                }
            }
        } finally {
            stream.windowAllocationLock.unlock();
        }
    }
}