Stream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote.http2;
import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import jakarta.servlet.RequestDispatcher;
import org.apache.coyote.ActionCode;
import org.apache.coyote.CloseNowException;
import org.apache.coyote.InputBuffer;
import org.apache.coyote.Request;
import org.apache.coyote.Response;
import org.apache.coyote.http11.HttpOutputBuffer;
import org.apache.coyote.http11.OutputFilter;
import org.apache.coyote.http11.filters.SavedRequestInputFilter;
import org.apache.coyote.http11.filters.VoidOutputFilter;
import org.apache.coyote.http2.HpackDecoder.HeaderEmitter;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.MessageBytes;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.http.parser.Host;
import org.apache.tomcat.util.http.parser.Priority;
import org.apache.tomcat.util.net.ApplicationBufferHandler;
import org.apache.tomcat.util.net.WriteBuffer;
import org.apache.tomcat.util.res.StringManager;
class Stream extends AbstractNonZeroStream implements HeaderEmitter {
private static final Log log = LogFactory.getLog(Stream.class);
private static final StringManager sm = StringManager.getManager(Stream.class);
private static final int HEADER_STATE_START = 0;
private static final int HEADER_STATE_PSEUDO = 1;
private static final int HEADER_STATE_REGULAR = 2;
private static final int HEADER_STATE_TRAILER = 3;
private static final MimeHeaders ACK_HEADERS;
private static final Integer HTTP_UPGRADE_STREAM = Integer.valueOf(1);
private static final Set<String> HTTP_CONNECTION_SPECIFIC_HEADERS = new HashSet<>();
static {
Response response = new Response();
response.setStatus(100);
StreamProcessor.prepareHeaders(null, response, true, null, null);
ACK_HEADERS = response.getMimeHeaders();
HTTP_CONNECTION_SPECIFIC_HEADERS.add("connection");
HTTP_CONNECTION_SPECIFIC_HEADERS.add("proxy-connection");
HTTP_CONNECTION_SPECIFIC_HEADERS.add("keep-alive");
HTTP_CONNECTION_SPECIFIC_HEADERS.add("transfer-encoding");
HTTP_CONNECTION_SPECIFIC_HEADERS.add("upgrade");
}
private volatile long contentLengthReceived = 0;
private final Http2UpgradeHandler handler;
private final WindowAllocationManager allocationManager = new WindowAllocationManager(this);
private final Request coyoteRequest;
private final Response coyoteResponse;
private final StreamInputBuffer inputBuffer;
private final StreamOutputBuffer streamOutputBuffer = new StreamOutputBuffer();
private final Http2OutputBuffer http2OutputBuffer;
private final AtomicBoolean removedFromActiveCount = new AtomicBoolean(false);
// State machine would be too much overhead
private int headerState = HEADER_STATE_START;
private StreamException headerException = null;
private volatile StringBuilder cookieHeader = null;
private volatile boolean hostHeaderSeen = false;
private Object pendingWindowUpdateForStreamLock = new Object();
private int pendingWindowUpdateForStream = 0;
private volatile int urgency = Priority.DEFAULT_URGENCY;
private volatile boolean incremental = Priority.DEFAULT_INCREMENTAL;
private final Object recycledLock = new Object();
private volatile boolean recycled = false;
Stream(Integer identifier, Http2UpgradeHandler handler) {
this(identifier, handler, null);
}
Stream(Integer identifier, Http2UpgradeHandler handler, Request coyoteRequest) {
super(handler.getConnectionId(), identifier);
this.handler = handler;
setWindowSize(handler.getRemoteSettings().getInitialWindowSize());
if (coyoteRequest == null) {
// HTTP/2 new request
this.coyoteRequest = handler.getProtocol().popRequestAndResponse();
this.coyoteResponse = this.coyoteRequest.getResponse();
this.inputBuffer = new StandardStreamInputBuffer();
this.coyoteRequest.setInputBuffer(inputBuffer);
} else {
// HTTP/1.1 upgrade
/*
* Implementation note. The request passed in is always newly created so it is safe to recycle it for re-use
* in the Stream.recyle() method. Need to create a matching, new response.
*/
this.coyoteRequest = coyoteRequest;
this.coyoteResponse = new Response();
this.coyoteRequest.setResponse(coyoteResponse);
this.inputBuffer =
new SavedRequestStreamInputBuffer((SavedRequestInputFilter) this.coyoteRequest.getInputBuffer());
// Headers have been read by this point
state.receivedStartOfHeaders();
if (HTTP_UPGRADE_STREAM.equals(identifier)) {
// Populate coyoteRequest from headers (HTTP/1.1 only)
try {
prepareRequest();
} catch (IllegalArgumentException iae) {
// Something in the headers is invalid
// Set correct return status
coyoteResponse.setStatus(400);
// Set error flag. This triggers error processing rather than
// the normal mapping
coyoteResponse.setError();
}
}
// Request body, if any, has been read and buffered
state.receivedEndOfStream();
}
this.coyoteRequest.setSendfile(handler.hasAsyncIO() && handler.getProtocol().getUseSendfile());
http2OutputBuffer = new Http2OutputBuffer(this.coyoteResponse, streamOutputBuffer);
this.coyoteResponse.setOutputBuffer(http2OutputBuffer);
this.coyoteRequest.setResponse(coyoteResponse);
this.coyoteRequest.protocol().setString("HTTP/2.0");
this.coyoteRequest.setStartTimeNanos(System.nanoTime());
}
private void prepareRequest() {
if (coyoteRequest.scheme().isNull()) {
if (handler.getProtocol().getHttp11Protocol().isSSLEnabled()) {
coyoteRequest.scheme().setString("https");
} else {
coyoteRequest.scheme().setString("http");
}
}
MessageBytes hostValueMB = coyoteRequest.getMimeHeaders().getUniqueValue("host");
if (hostValueMB == null) {
throw new IllegalArgumentException();
}
// This processing expects bytes. Trigger a conversion if required.
hostValueMB.toBytes();
ByteChunk valueBC = hostValueMB.getByteChunk();
byte[] valueB = valueBC.getBytes();
int valueL = valueBC.getLength();
int valueS = valueBC.getStart();
int colonPos = Host.parse(hostValueMB);
if (colonPos != -1) {
int port = 0;
for (int i = colonPos + 1; i < valueL; i++) {
char c = (char) valueB[i + valueS];
if (c < '0' || c > '9') {
throw new IllegalArgumentException();
}
port = port * 10 + c - '0';
}
coyoteRequest.setServerPort(port);
// Only need to copy the host name up to the :
valueL = colonPos;
}
// Extract the host name
char[] hostNameC = new char[valueL];
for (int i = 0; i < valueL; i++) {
hostNameC[i] = (char) valueB[i + valueS];
}
coyoteRequest.serverName().setChars(hostNameC, 0, valueL);
}
final void receiveReset(long errorCode) {
if (log.isTraceEnabled()) {
log.trace(
sm.getString("stream.reset.receive", getConnectionId(), getIdAsString(), Long.toString(errorCode)));
}
// Set the new state first since read and write both check this
state.receivedReset();
// Reads wait internally so need to call a method to break the wait()
inputBuffer.receiveReset();
cancelAllocationRequests();
}
final void cancelAllocationRequests() {
allocationManager.notifyAny();
}
@Override
final void incrementWindowSize(int windowSizeIncrement) throws Http2Exception {
windowAllocationLock.lock();
try {
// If this is zero then any thread that has been trying to write for
// this stream will be waiting. Notify that thread it can continue. Use
// notify all even though only one thread is waiting to be on the safe
// side.
boolean notify = getWindowSize() < 1;
super.incrementWindowSize(windowSizeIncrement);
if (notify && getWindowSize() > 0) {
allocationManager.notifyStream();
}
} finally {
windowAllocationLock.unlock();
}
}
final int reserveWindowSize(int reservation, boolean block) throws IOException {
windowAllocationLock.lock();
try {
long windowSize = getWindowSize();
while (windowSize < 1) {
if (!canWrite()) {
throw new CloseNowException(sm.getString("stream.notWritable", getConnectionId(), getIdAsString()));
}
if (block) {
try {
long writeTimeout = handler.getProtocol().getStreamWriteTimeout();
allocationManager.waitForStream(writeTimeout);
windowSize = getWindowSize();
if (windowSize == 0) {
doStreamCancel(sm.getString("stream.writeTimeout"), Http2Error.ENHANCE_YOUR_CALM);
}
} catch (InterruptedException e) {
// Possible shutdown / rst or similar. Use an IOException to
// signal to the client that further I/O isn't possible for this
// Stream.
throw new IOException(e);
}
} else {
allocationManager.waitForStreamNonBlocking();
return 0;
}
}
int allocation;
if (windowSize < reservation) {
allocation = (int) windowSize;
} else {
allocation = reservation;
}
decrementWindowSize(allocation);
return allocation;
} finally {
windowAllocationLock.unlock();
}
}
void doStreamCancel(String msg, Http2Error error) throws CloseNowException {
StreamException se = new StreamException(msg, error, getIdAsInt());
// Prevent the application making further writes
streamOutputBuffer.closed = true;
// Prevent Tomcat's error handling trying to write
coyoteResponse.setError();
coyoteResponse.setErrorReported();
// Trigger a reset once control returns to Tomcat
streamOutputBuffer.reset = se;
throw new CloseNowException(msg, se);
}
void waitForConnectionAllocation(long timeout) throws InterruptedException {
allocationManager.waitForConnection(timeout);
}
void waitForConnectionAllocationNonBlocking() {
allocationManager.waitForConnectionNonBlocking();
}
void notifyConnection() {
allocationManager.notifyConnection();
}
@Override
public final void emitHeader(String name, String value) throws HpackException {
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.header.debug", getConnectionId(), getIdAsString(), name, value));
}
// Header names must be lower case
if (!name.toLowerCase(Locale.US).equals(name)) {
throw new HpackException(sm.getString("stream.header.case", getConnectionId(), getIdAsString(), name));
}
if (HTTP_CONNECTION_SPECIFIC_HEADERS.contains(name)) {
throw new HpackException(
sm.getString("stream.header.connection", getConnectionId(), getIdAsString(), name));
}
if ("te".equals(name)) {
if (!"trailers".equals(value)) {
throw new HpackException(sm.getString("stream.header.te", getConnectionId(), getIdAsString(), value));
}
}
if (headerException != null) {
// Don't bother processing the header since the stream is going to
// be reset anyway
return;
}
if (name.length() == 0) {
throw new HpackException(sm.getString("stream.header.empty", getConnectionId(), getIdAsString()));
}
boolean pseudoHeader = name.charAt(0) == ':';
if (pseudoHeader && headerState != HEADER_STATE_PSEUDO) {
headerException = new StreamException(
sm.getString("stream.header.unexpectedPseudoHeader", getConnectionId(), getIdAsString(), name),
Http2Error.PROTOCOL_ERROR, getIdAsInt());
// No need for further processing. The stream will be reset.
return;
}
if (headerState == HEADER_STATE_PSEUDO && !pseudoHeader) {
headerState = HEADER_STATE_REGULAR;
}
switch (name) {
case ":method": {
if (coyoteRequest.method().isNull()) {
coyoteRequest.method().setString(value);
if ("HEAD".equals(value)) {
configureVoidOutputFilter();
}
} else {
throw new HpackException(
sm.getString("stream.header.duplicate", getConnectionId(), getIdAsString(), ":method"));
}
break;
}
case ":scheme": {
if (coyoteRequest.scheme().isNull()) {
coyoteRequest.scheme().setString(value);
} else {
throw new HpackException(
sm.getString("stream.header.duplicate", getConnectionId(), getIdAsString(), ":scheme"));
}
break;
}
case ":path": {
if (!coyoteRequest.requestURI().isNull()) {
throw new HpackException(
sm.getString("stream.header.duplicate", getConnectionId(), getIdAsString(), ":path"));
}
if (value.length() == 0) {
throw new HpackException(sm.getString("stream.header.noPath", getConnectionId(), getIdAsString()));
}
int queryStart = value.indexOf('?');
String uri;
if (queryStart == -1) {
uri = value;
} else {
uri = value.substring(0, queryStart);
String query = value.substring(queryStart + 1);
coyoteRequest.queryString().setString(query);
}
// Bug 61120. Set the URI as bytes rather than String so:
// - any path parameters are correctly processed
// - the normalization security checks are performed that prevent
// directory traversal attacks
byte[] uriBytes = uri.getBytes(StandardCharsets.ISO_8859_1);
coyoteRequest.requestURI().setBytes(uriBytes, 0, uriBytes.length);
break;
}
case ":authority": {
if (coyoteRequest.serverName().isNull()) {
parseAuthority(value, false);
} else {
throw new HpackException(
sm.getString("stream.header.duplicate", getConnectionId(), getIdAsString(), ":authority"));
}
break;
}
case "cookie": {
// Cookie headers need to be concatenated into a single header
// See RFC 7540 8.1.2.5
if (cookieHeader == null) {
cookieHeader = new StringBuilder();
} else {
cookieHeader.append("; ");
}
cookieHeader.append(value);
break;
}
case "host": {
if (coyoteRequest.serverName().isNull()) {
// No :authority header. This is first host header. Use it.
hostHeaderSeen = true;
parseAuthority(value, true);
} else if (!hostHeaderSeen) {
// First host header - must be consistent with :authority
hostHeaderSeen = true;
compareAuthority(value);
} else {
// Multiple hosts headers - illegal
throw new HpackException(
sm.getString("stream.header.duplicate", getConnectionId(), getIdAsString(), "host"));
}
break;
}
case "priority": {
try {
Priority p = Priority.parsePriority(new StringReader(value));
setUrgency(p.getUrgency());
setIncremental(p.getIncremental());
} catch (IOException ioe) {
// Not possible with StringReader
}
break;
}
default: {
if (headerState == HEADER_STATE_TRAILER && !handler.getProtocol().isTrailerHeaderAllowed(name)) {
break;
}
if ("expect".equals(name) && "100-continue".equals(value)) {
coyoteRequest.setExpectation(true);
}
if (pseudoHeader) {
headerException = new StreamException(
sm.getString("stream.header.unknownPseudoHeader", getConnectionId(), getIdAsString(), name),
Http2Error.PROTOCOL_ERROR, getIdAsInt());
}
if (headerState == HEADER_STATE_TRAILER) {
// HTTP/2 headers are already always lower case
coyoteRequest.getMimeTrailerFields().addValue(name).setString(value);
} else {
coyoteRequest.getMimeHeaders().addValue(name).setString(value);
}
}
}
}
void configureVoidOutputFilter() {
addOutputFilter(new VoidOutputFilter());
// Prevent further writes by the application
streamOutputBuffer.closed = true;
}
private void parseAuthority(String value, boolean host) throws HpackException {
int i;
try {
i = Host.parse(value);
} catch (IllegalArgumentException iae) {
// Host value invalid
throw new HpackException(sm.getString("stream.header.invalid", getConnectionId(), getIdAsString(),
host ? "host" : ":authority", value));
}
if (i > -1) {
coyoteRequest.serverName().setString(value.substring(0, i));
coyoteRequest.setServerPort(Integer.parseInt(value.substring(i + 1)));
} else {
coyoteRequest.serverName().setString(value);
}
}
private void compareAuthority(String value) throws HpackException {
int i;
try {
i = Host.parse(value);
} catch (IllegalArgumentException iae) {
// Host value invalid
throw new HpackException(
sm.getString("stream.header.invalid", getConnectionId(), getIdAsString(), "host", value));
}
if (i == -1 && (!value.equals(coyoteRequest.serverName().getString()) || coyoteRequest.getServerPort() != -1) ||
i > -1 && ((!value.substring(0, i).equals(coyoteRequest.serverName().getString()) ||
Integer.parseInt(value.substring(i + 1)) != coyoteRequest.getServerPort()))) {
// Host value inconsistent
throw new HpackException(sm.getString("stream.host.inconsistent", getConnectionId(), getIdAsString(), value,
coyoteRequest.serverName().getString(), Integer.toString(coyoteRequest.getServerPort())));
}
}
@Override
public void setHeaderException(StreamException streamException) {
if (headerException == null) {
headerException = streamException;
}
}
@Override
public void validateHeaders() throws StreamException {
if (headerException == null) {
return;
}
handler.getHpackDecoder().setHeaderEmitter(Http2UpgradeHandler.HEADER_SINK);
throw headerException;
}
final boolean receivedEndOfHeaders() throws ConnectionException {
if (coyoteRequest.method().isNull() || coyoteRequest.scheme().isNull() ||
!coyoteRequest.method().equals("CONNECT") && coyoteRequest.requestURI().isNull()) {
throw new ConnectionException(sm.getString("stream.header.required", getConnectionId(), getIdAsString()),
Http2Error.PROTOCOL_ERROR);
}
// Cookie headers need to be concatenated into a single header
// See RFC 7540 8.1.2.5
// Can only do this once the headers are fully received
if (cookieHeader != null) {
coyoteRequest.getMimeHeaders().addValue("cookie").setString(cookieHeader.toString());
}
return headerState == HEADER_STATE_REGULAR || headerState == HEADER_STATE_PSEUDO;
}
final void writeHeaders() throws IOException {
boolean endOfStream = streamOutputBuffer.hasNoBody() && coyoteResponse.getTrailerFields() == null;
handler.writeHeaders(this, coyoteResponse.getMimeHeaders(), endOfStream, Constants.DEFAULT_HEADERS_FRAME_SIZE);
}
final void addOutputFilter(OutputFilter filter) {
http2OutputBuffer.addFilter(filter);
}
final void writeTrailers() throws IOException {
Supplier<Map<String,String>> supplier = coyoteResponse.getTrailerFields();
if (supplier == null) {
// No supplier was set, end of stream will already have been sent
return;
}
/*
* Need a dedicated MimeHeaders for trailers as the MimeHeaders from the response needs to be retained in case
* the access log needs to log header values.
*/
MimeHeaders mimeHeaders = new MimeHeaders();
Map<String,String> headerMap = supplier.get();
if (headerMap == null) {
headerMap = Collections.emptyMap();
}
// Copy the contents of the Map to the MimeHeaders
// TODO: Is there benefit in refactoring this? Is MimeHeaders too
// heavyweight? Can we reduce the copy/conversions?
for (Map.Entry<String,String> headerEntry : headerMap.entrySet()) {
MessageBytes mb = mimeHeaders.addValue(headerEntry.getKey());
mb.setString(headerEntry.getValue());
}
handler.writeHeaders(this, mimeHeaders, true, Constants.DEFAULT_HEADERS_FRAME_SIZE);
}
final void writeAck() throws IOException {
handler.writeHeaders(this, ACK_HEADERS, false, Constants.DEFAULT_HEADERS_ACK_FRAME_SIZE);
}
final void writeEarlyHints() throws IOException {
MimeHeaders headers = coyoteResponse.getMimeHeaders();
String originalStatus = headers.getHeader(":status");
headers.setValue(":status").setString("103");
try {
handler.writeHeaders(this, headers, false, Constants.DEFAULT_HEADERS_FRAME_SIZE);
} finally {
if (originalStatus == null) {
headers.removeHeader(":status");
} else {
headers.setValue(":status").setString(originalStatus);
}
}
}
@Override
final String getConnectionId() {
return handler.getConnectionId();
}
final Request getCoyoteRequest() {
return coyoteRequest;
}
final Response getCoyoteResponse() {
return coyoteResponse;
}
@Override
final ByteBuffer getInputByteBuffer(boolean create) {
return inputBuffer.getInBuffer(create);
}
final void receivedStartOfHeaders(boolean headersEndStream) throws Http2Exception {
if (headerState == HEADER_STATE_START) {
headerState = HEADER_STATE_PSEUDO;
handler.getHpackDecoder().setMaxHeaderCount(handler.getProtocol().getMaxHeaderCount());
handler.getHpackDecoder().setMaxHeaderSize(handler.getProtocol().getMaxHeaderSize());
} else if (headerState == HEADER_STATE_PSEUDO || headerState == HEADER_STATE_REGULAR) {
// Trailer headers MUST include the end of stream flag
if (headersEndStream) {
headerState = HEADER_STATE_TRAILER;
handler.getHpackDecoder().setMaxHeaderCount(handler.getProtocol().getMaxTrailerCount());
handler.getHpackDecoder().setMaxHeaderSize(handler.getProtocol().getMaxTrailerSize());
} else {
throw new ConnectionException(
sm.getString("stream.trailerHeader.noEndOfStream", getConnectionId(), getIdAsString()),
Http2Error.PROTOCOL_ERROR);
}
}
// Parser will catch attempt to send a headers frame after the stream
// has closed.
state.receivedStartOfHeaders();
}
@Override
final void receivedData(int payloadSize) throws Http2Exception {
contentLengthReceived += payloadSize;
long contentLengthHeader = coyoteRequest.getContentLengthLong();
if (contentLengthHeader > -1 && contentLengthReceived > contentLengthHeader) {
throw new ConnectionException(
sm.getString("stream.header.contentLength", getConnectionId(), getIdAsString(),
Long.valueOf(contentLengthHeader), Long.valueOf(contentLengthReceived)),
Http2Error.PROTOCOL_ERROR);
}
}
final void receivedEndOfStream() throws ConnectionException {
if (isContentLengthInconsistent()) {
throw new ConnectionException(
sm.getString("stream.header.contentLength", getConnectionId(), getIdAsString(),
Long.valueOf(coyoteRequest.getContentLengthLong()), Long.valueOf(contentLengthReceived)),
Http2Error.PROTOCOL_ERROR);
}
state.receivedEndOfStream();
inputBuffer.notifyEof();
}
final boolean isContentLengthInconsistent() {
long contentLengthHeader = coyoteRequest.getContentLengthLong();
if (contentLengthHeader > -1 && contentLengthReceived != contentLengthHeader) {
return true;
}
return false;
}
final void sentHeaders() {
state.sentHeaders();
}
final void sentEndOfStream() {
streamOutputBuffer.endOfStreamSent = true;
state.sentEndOfStream();
}
final boolean isReadyForWrite() {
return streamOutputBuffer.isReady();
}
final boolean flush(boolean block) throws IOException {
return streamOutputBuffer.flush(block);
}
final StreamInputBuffer getInputBuffer() {
return inputBuffer;
}
final HttpOutputBuffer getOutputBuffer() {
return http2OutputBuffer;
}
final boolean isActive() {
return state.isActive();
}
final boolean canWrite() {
return state.canWrite();
}
final void closeIfIdle() {
state.closeIfIdle();
}
final boolean isInputFinished() {
return !state.isFrameTypePermitted(FrameType.DATA);
}
final void close(Http2Exception http2Exception) {
if (http2Exception instanceof StreamException) {
try {
StreamException se = (StreamException) http2Exception;
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.reset.send", getConnectionId(), getIdAsString(), se.getError()));
}
// Need to update state atomically with the sending of the RST
// frame else other threads currently working with this stream
// may see the state change and send a RST frame before the RST
// frame triggered by this thread. If that happens the client
// may see out of order RST frames which may hard to follow if
// the client is unaware the RST frames may be received out of
// order.
handler.sendStreamReset(state, se);
cancelAllocationRequests();
inputBuffer.swallowUnread();
} catch (IOException ioe) {
ConnectionException ce =
new ConnectionException(sm.getString("stream.reset.fail", getConnectionId(), getIdAsString()),
Http2Error.PROTOCOL_ERROR, ioe);
handler.closeConnection(ce);
}
} else {
handler.closeConnection(http2Exception);
}
replace();
}
/*
* This method calls the handler to replace this stream with an implementation that uses less memory. This is useful
* because Stream instances are retained for a period after the Stream closes.
*/
final void replace() {
int remaining;
// May be null if stream was closed before any DATA frames were processed.
ByteBuffer inputByteBuffer = getInputByteBuffer(false);
if (inputByteBuffer == null) {
remaining = 0;
} else {
remaining = inputByteBuffer.remaining();
}
handler.replaceStream(this, new RecycledStream(getConnectionId(), getIdentifier(), state, remaining));
}
/*
* This method is called recycle for consistency with the rest of the Tomcat code base. It does not recycle the
* Stream since Stream objects are not re-used. It does recycle the request and response objects and ensures that
* this is only done once.
*
* replace() should have been called before calling this method.
*
* It is important that this method is not called until any concurrent processing for the stream has completed. This
* is currently achieved by:
*
* - only the StreamProcessor calls this method
*
* - the Http2UpgradeHandler does not call this method
*
* - this method is called once the StreamProcessor considers the Stream closed
*
* In theory, the protection against duplicate calls is not required in this method (the code in StreamProcessor
* should be sufficient) but it is implemented as precaution along with the WARN level logging.
*/
final void recycle() {
if (recycled) {
log.warn(sm.getString("stream.recycle.duplicate", getConnectionId(), getIdAsString()));
return;
}
synchronized (recycledLock) {
if (recycled) {
log.warn(sm.getString("stream.recycle.duplicate", getConnectionId(), getIdAsString()));
return;
}
recycled = true;
}
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.recycle.first", getConnectionId(), getIdAsString()));
}
coyoteRequest.recycle();
coyoteResponse.recycle();
handler.getProtocol().pushRequestAndResponse(coyoteRequest);
}
boolean isTrailerFieldsReady() {
// Once EndOfStream has been received, canRead will be false
return !state.canRead();
}
boolean isTrailerFieldsSupported() {
return !streamOutputBuffer.endOfStreamSent;
}
StreamException getResetException() {
return streamOutputBuffer.reset;
}
int getWindowUpdateSizeToWrite(int increment) {
int result;
int threshold = handler.getProtocol().getOverheadWindowUpdateThreshold();
synchronized (pendingWindowUpdateForStreamLock) {
if (increment > threshold) {
result = increment + pendingWindowUpdateForStream;
pendingWindowUpdateForStream = 0;
} else {
pendingWindowUpdateForStream += increment;
if (pendingWindowUpdateForStream > threshold) {
result = pendingWindowUpdateForStream;
pendingWindowUpdateForStream = 0;
} else {
result = 0;
}
}
}
return result;
}
public int getUrgency() {
return urgency;
}
public void setUrgency(int urgency) {
this.urgency = urgency;
}
public boolean getIncremental() {
return incremental;
}
public void setIncremental(boolean incremental) {
this.incremental = incremental;
}
int decrementAndGetActiveRemoteStreamCount() {
/*
* Protect against mis-counting of active streams. This method should only be called once per stream but since
* the count of active streams is used to enforce the maximum concurrent streams limit, make sure each stream is
* only removed from the active count exactly once.
*/
if (removedFromActiveCount.compareAndSet(false, true)) {
return handler.activeRemoteStreamCount.decrementAndGet();
} else {
return handler.activeRemoteStreamCount.get();
}
}
class StreamOutputBuffer implements HttpOutputBuffer, WriteBuffer.Sink {
private final Lock writeLock = new ReentrantLock();
private final ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
private final WriteBuffer writeBuffer = new WriteBuffer(32 * 1024);
// Flag that indicates that data was left over on a previous
// non-blocking write. Once set, this flag stays set until all the data
// has been written.
private boolean dataLeft;
private volatile long written = 0;
private int streamReservation = 0;
private volatile boolean closed = false;
private volatile StreamException reset = null;
private volatile boolean endOfStreamSent = false;
/*
* The write methods share a common lock to ensure that only one thread at a time is able to access the buffer.
* Without this protection, a client that performed concurrent writes could corrupt the buffer.
*/
@Override
public final int doWrite(ByteBuffer chunk) throws IOException {
writeLock.lock();
try {
if (closed) {
throw new IOException(sm.getString("stream.closed", getConnectionId(), getIdAsString()));
}
// chunk is always fully written
int result = chunk.remaining();
if (writeBuffer.isEmpty()) {
int chunkLimit = chunk.limit();
while (chunk.remaining() > 0) {
int thisTime = Math.min(buffer.remaining(), chunk.remaining());
chunk.limit(chunk.position() + thisTime);
buffer.put(chunk);
chunk.limit(chunkLimit);
if (chunk.remaining() > 0 && !buffer.hasRemaining()) {
// Only flush if we have more data to write and the buffer
// is full
if (flush(true, coyoteResponse.getWriteListener() == null)) {
writeBuffer.add(chunk);
dataLeft = true;
break;
}
}
}
} else {
writeBuffer.add(chunk);
}
written += result;
return result;
} finally {
writeLock.unlock();
}
}
final boolean flush(boolean block) throws IOException {
writeLock.lock();
try {
/*
* Need to ensure that there is exactly one call to flush even when there is no data to write. Too few
* calls (i.e. zero) and the end of stream message is not sent for a completed asynchronous write. Too
* many calls and the end of stream message is sent too soon and trailer headers are not sent.
*/
boolean dataInBuffer = buffer.position() > 0;
boolean flushed = false;
if (dataInBuffer) {
dataInBuffer = flush(false, block);
flushed = true;
}
if (dataInBuffer) {
dataLeft = true;
} else {
if (writeBuffer.isEmpty()) {
// Both buffer and writeBuffer are empty.
if (flushed) {
dataLeft = false;
} else {
dataLeft = flush(false, block);
}
} else {
dataLeft = writeBuffer.write(this, block);
}
}
return dataLeft;
} finally {
writeLock.unlock();
}
}
private boolean flush(boolean writeInProgress, boolean block) throws IOException {
writeLock.lock();
try {
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), getIdAsString(),
Integer.toString(buffer.position()), Boolean.toString(writeInProgress),
Boolean.toString(closed)));
}
if (buffer.position() == 0) {
if (closed && !endOfStreamSent) {
// Handling this special case here is simpler than trying
// to modify the following code to handle it.
handler.writeBody(Stream.this, buffer, 0, coyoteResponse.getTrailerFields() == null);
}
// Buffer is empty. Nothing to do.
return false;
}
buffer.flip();
int left = buffer.remaining();
while (left > 0) {
if (streamReservation == 0) {
streamReservation = reserveWindowSize(left, block);
if (streamReservation == 0) {
// Must be non-blocking.
// Note: Can't add to the writeBuffer here as the write
// may originate from the writeBuffer.
buffer.compact();
return true;
}
}
while (streamReservation > 0) {
int connectionReservation = handler.reserveWindowSize(Stream.this, streamReservation, block);
if (connectionReservation == 0) {
// Must be non-blocking.
// Note: Can't add to the writeBuffer here as the write
// may originate from the writeBuffer.
buffer.compact();
return true;
}
// Do the write
handler.writeBody(Stream.this, buffer, connectionReservation, !writeInProgress && closed &&
left == connectionReservation && coyoteResponse.getTrailerFields() == null);
streamReservation -= connectionReservation;
left -= connectionReservation;
}
}
buffer.clear();
return false;
} finally {
writeLock.unlock();
}
}
final boolean isReady() {
writeLock.lock();
try {
// Bug 63682
// Only want to return false if the window size is zero AND we are
// already waiting for an allocation.
if (getWindowSize() > 0 && allocationManager.isWaitingForStream() ||
handler.getWindowSize() > 0 && allocationManager.isWaitingForConnection() || dataLeft) {
return false;
} else {
return true;
}
} finally {
writeLock.unlock();
}
}
@Override
public final long getBytesWritten() {
return written;
}
@Override
public final void end() throws IOException {
if (reset != null) {
throw new CloseNowException(reset);
}
if (!closed) {
closed = true;
flush(true);
writeTrailers();
}
}
/**
* @return <code>true</code> if it is certain that the associated response has no body.
*/
final boolean hasNoBody() {
return ((written == 0) && closed);
}
@Override
public void flush() throws IOException {
/*
* This method should only be called during blocking I/O. All the Servlet API calls that end up here are
* illegal during non-blocking I/O. Servlet 5.4. However, the wording Servlet specification states that the
* behaviour is undefined so we do the best we can which is to perform a flush using blocking I/O or
* non-blocking I/O based depending which is currently in use.
*/
flush(getCoyoteResponse().getWriteListener() == null);
}
@Override
public boolean writeFromBuffer(ByteBuffer src, boolean blocking) throws IOException {
writeLock.lock();
try {
int chunkLimit = src.limit();
while (src.remaining() > 0) {
int thisTime = Math.min(buffer.remaining(), src.remaining());
src.limit(src.position() + thisTime);
buffer.put(src);
src.limit(chunkLimit);
if (flush(false, blocking)) {
return true;
}
}
return false;
} finally {
writeLock.unlock();
}
}
}
abstract class StreamInputBuffer implements InputBuffer {
abstract void receiveReset();
abstract void swallowUnread() throws IOException;
abstract void notifyEof();
abstract ByteBuffer getInBuffer(boolean create);
abstract void onDataAvailable() throws IOException;
abstract boolean isReadyForRead();
abstract boolean isRequestBodyFullyRead();
abstract void insertReplayedBody(ByteChunk body);
protected abstract boolean timeoutRead(long now);
}
class StandardStreamInputBuffer extends StreamInputBuffer {
private final Lock readStateLock = new ReentrantLock();
/*
* Two buffers are required to avoid various multi-threading issues. These issues arise from the fact that the
* Stream (or the Request/Response) used by the application is processed in one thread but the connection is
* processed in another. Therefore it is possible that a request body frame could be received before the
* application is ready to read it. If it isn't buffered, processing of the connection (and hence all streams)
* would block until the application read the data. Hence the incoming data has to be buffered. If only one
* buffer was used then it could become corrupted if the connection thread is trying to add to it at the same
* time as the application is read it. While it should be possible to avoid this corruption by careful use of
* the buffer it would still require the same copies as using two buffers and the behaviour would be less clear.
*
* The buffers are created lazily because they quickly add up to a lot of memory and most requests do not have
* bodies.
*/
// This buffer is used to populate the ByteChunk passed in to the read
// method
private byte[] outBuffer;
// This buffer is the destination for incoming data. It is normally is
// 'write mode'.
private volatile ByteBuffer inBuffer;
private volatile boolean readInterest;
// If readInterest is true, data must be available to read no later than this time.
private volatile long readTimeoutExpiry;
private volatile boolean closed;
private volatile boolean resetReceived;
@Override
public final int doRead(ApplicationBufferHandler applicationBufferHandler) throws IOException {
ensureBuffersExist();
int written = -1;
// It is still possible that the stream has been closed and inBuffer
// set to null between the call to ensureBuffersExist() above and
// the sync below. The checks just before and just inside the sync
// ensure we don't get any NPEs reported.
ByteBuffer tmpInBuffer = inBuffer;
if (tmpInBuffer == null) {
return -1;
}
// Ensure that only one thread accesses inBuffer at a time
synchronized (tmpInBuffer) {
if (inBuffer == null) {
return -1;
}
boolean canRead = false;
while (inBuffer.position() == 0 && (canRead = isActive() && !isInputFinished())) {
// Need to block until some data is written
try {
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.inputBuffer.empty"));
}
long readTimeout = handler.getProtocol().getStreamReadTimeout();
if (readTimeout < 0) {
inBuffer.wait();
} else {
inBuffer.wait(readTimeout);
}
if (resetReceived) {
throw new IOException(sm.getString("stream.inputBuffer.reset"));
}
if (inBuffer.position() == 0 && isActive() && !isInputFinished()) {
String msg = sm.getString("stream.inputBuffer.readTimeout");
StreamException se = new StreamException(msg, Http2Error.ENHANCE_YOUR_CALM, getIdAsInt());
// Trigger a reset once control returns to Tomcat
coyoteResponse.setError();
streamOutputBuffer.reset = se;
throw new CloseNowException(msg, se);
}
} catch (InterruptedException e) {
// Possible shutdown / rst or similar. Use an
// IOException to signal to the client that further I/O
// isn't possible for this Stream.
throw new IOException(e);
}
}
if (inBuffer.position() > 0) {
// Data is available in the inBuffer. Copy it to the
// outBuffer.
inBuffer.flip();
written = inBuffer.remaining();
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.inputBuffer.copy", Integer.toString(written)));
}
inBuffer.get(outBuffer, 0, written);
inBuffer.clear();
} else if (!canRead) {
return -1;
} else {
// Should never happen
throw new IllegalStateException();
}
}
applicationBufferHandler.setByteBuffer(ByteBuffer.wrap(outBuffer, 0, written));
// Increment client-side flow control windows by the number of bytes
// read
handler.writeWindowUpdate(Stream.this, written, true);
return written;
}
@Override
final boolean isReadyForRead() {
ensureBuffersExist();
readStateLock.lock();
try {
if (available() > 0) {
return true;
}
if (resetReceived) {
// Trigger ReadListener.onError()
getCoyoteRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,
new IOException(sm.getString("stream.clientResetRequest")));
coyoteRequest.action(ActionCode.DISPATCH_ERROR, null);
coyoteRequest.action(ActionCode.DISPATCH_EXECUTE, null);
return false;
}
if (!isRequestBodyFullyRead()) {
readInterest = true;
long readTimeout = handler.getProtocol().getStreamReadTimeout();
if (readTimeout > 0) {
readTimeoutExpiry = System.currentTimeMillis() + readTimeout;
} else {
readTimeoutExpiry = Long.MAX_VALUE;
}
}
return false;
} finally {
readStateLock.unlock();
}
}
@Override
final boolean isRequestBodyFullyRead() {
readStateLock.lock();
try {
return (inBuffer == null || inBuffer.position() == 0) && isInputFinished();
} finally {
readStateLock.unlock();
}
}
@Override
public final int available() {
readStateLock.lock();
try {
if (inBuffer == null) {
return 0;
}
return inBuffer.position();
} finally {
readStateLock.unlock();
}
}
/*
* Called after placing some data in the inBuffer.
*/
@Override
final void onDataAvailable() throws IOException {
readStateLock.lock();
try {
if (closed) {
swallowUnread();
} else if (readInterest) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.inputBuffer.dispatch"));
}
readInterest = false;
coyoteRequest.action(ActionCode.DISPATCH_READ, null);
// Always need to dispatch since this thread is processing
// the incoming connection and streams are processed on their
// own.
coyoteRequest.action(ActionCode.DISPATCH_EXECUTE, null);
} else {
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.inputBuffer.signal"));
}
synchronized (inBuffer) {
inBuffer.notifyAll();
}
}
} finally {
readStateLock.unlock();
}
}
@Override
final ByteBuffer getInBuffer(boolean create) {
if (create) {
ensureBuffersExist();
}
return inBuffer;
}
@Override
final void insertReplayedBody(ByteChunk body) {
readStateLock.lock();
try {
inBuffer = ByteBuffer.wrap(body.getBytes(), body.getStart(), body.getLength());
} finally {
readStateLock.unlock();
}
}
private void ensureBuffersExist() {
if (inBuffer == null && !closed) {
// The client must obey Tomcat's window size when sending so
// this is the initial window size set by Tomcat that the client
// uses (i.e. the local setting is required here).
int size = handler.getLocalSettings().getInitialWindowSize();
readStateLock.lock();
try {
if (inBuffer == null && !closed) {
inBuffer = ByteBuffer.allocate(size);
outBuffer = new byte[size];
}
} finally {
readStateLock.unlock();
}
}
}
@Override
final void receiveReset() {
if (inBuffer != null) {
synchronized (inBuffer) {
resetReceived = true;
inBuffer.notifyAll();
}
}
// If a read is in progress, cancel it.
readStateLock.lock();
try {
if (readInterest) {
readInterest = false;
}
} finally {
readStateLock.unlock();
}
// Trigger ReadListener.onError()
getCoyoteRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,
new IOException(sm.getString("stream.clientResetRequest")));
coyoteRequest.action(ActionCode.DISPATCH_ERROR, null);
coyoteRequest.action(ActionCode.DISPATCH_EXECUTE, null);
}
@Override
final void notifyEof() {
if (inBuffer != null) {
synchronized (inBuffer) {
inBuffer.notifyAll();
}
}
}
@Override
final void swallowUnread() throws IOException {
readStateLock.lock();
try {
closed = true;
} finally {
readStateLock.unlock();
}
if (inBuffer != null) {
int unreadByteCount = 0;
synchronized (inBuffer) {
unreadByteCount = inBuffer.position();
if (log.isTraceEnabled()) {
log.trace(sm.getString("stream.inputBuffer.swallowUnread", Integer.valueOf(unreadByteCount)));
}
if (unreadByteCount > 0) {
inBuffer.position(0);
inBuffer.limit(inBuffer.limit() - unreadByteCount);
}
}
// Do this outside of the sync because:
// - it doesn't need to be inside the sync
// - if inside the sync it can trigger a deadlock
// https://markmail.org/message/vbglzkvj6wxlhh3p
if (unreadByteCount > 0) {
handler.onSwallowedDataFramePayload(getIdAsInt(), unreadByteCount);
}
}
}
@Override
protected boolean timeoutRead(long now) {
return readInterest && now > readTimeoutExpiry;
}
}
class SavedRequestStreamInputBuffer extends StreamInputBuffer {
private final SavedRequestInputFilter inputFilter;
SavedRequestStreamInputBuffer(SavedRequestInputFilter inputFilter) {
this.inputFilter = inputFilter;
}
@Override
public int doRead(ApplicationBufferHandler handler) throws IOException {
return inputFilter.doRead(handler);
}
@Override
public int available() {
return inputFilter.available();
}
@Override
void receiveReset() {
// NO-OP
}
@Override
void swallowUnread() throws IOException {
// NO-OP
}
@Override
void notifyEof() {
// NO-OP
}
@Override
ByteBuffer getInBuffer(boolean create) {
return null;
}
@Override
void onDataAvailable() throws IOException {
// NO-OP
}
@Override
boolean isReadyForRead() {
return true;
}
@Override
boolean isRequestBodyFullyRead() {
return inputFilter.isFinished();
}
@Override
void insertReplayedBody(ByteChunk body) {
// NO-OP
}
@Override
protected boolean timeoutRead(long now) {
// Reading from a saved request. Will never time out.
return false;
}
}
}