Http2AsyncUpgradeHandler.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote.http2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.http.WebConnection;
import org.apache.coyote.Adapter;
import org.apache.coyote.ProtocolException;
import org.apache.coyote.Request;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.SendfileState;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
import org.apache.tomcat.util.net.SocketWrapperBase.CompletionState;
public class Http2AsyncUpgradeHandler extends Http2UpgradeHandler {
private static final ByteBuffer[] BYTEBUFFER_ARRAY = new ByteBuffer[0];
// Ensures headers are generated and then written for one thread at a time.
// Because of the compression used, headers need to be written to the
// network in the same order they are generated.
private final Lock headerWriteLock = new ReentrantLock();
// Ensures thread triggers the stream reset is the first to send a RST frame
private final Lock sendResetLock = new ReentrantLock();
private final AtomicReference<Throwable> error = new AtomicReference<>();
private final AtomicReference<IOException> applicationIOE = new AtomicReference<>();
public Http2AsyncUpgradeHandler(Http2Protocol protocol, Adapter adapter, Request coyoteRequest) {
super(protocol, adapter, coyoteRequest);
}
private final CompletionHandler<Long,Void> errorCompletion = new CompletionHandler<Long,Void>() {
@Override
public void completed(Long result, Void attachment) {
}
@Override
public void failed(Throwable t, Void attachment) {
error.set(t);
}
};
private final CompletionHandler<Long,Void> applicationErrorCompletion = new CompletionHandler<Long,Void>() {
@Override
public void completed(Long result, Void attachment) {
}
@Override
public void failed(Throwable t, Void attachment) {
if (t instanceof IOException) {
applicationIOE.set((IOException) t);
}
error.set(t);
}
};
@Override
protected Http2Parser getParser(String connectionId) {
return new Http2AsyncParser(connectionId, this, this, socketWrapper, this);
}
@Override
protected PingManager getPingManager() {
return new AsyncPingManager();
}
@Override
public boolean hasAsyncIO() {
return true;
}
@Override
protected void processConnection(WebConnection webConnection, Stream stream) {
// The end of the processing will instead be an async callback
}
void processConnectionCallback(WebConnection webConnection, Stream stream) {
super.processConnection(webConnection, stream);
}
@Override
protected void writeSettings() {
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion,
ByteBuffer.wrap(localSettings.getSettingsFrameForPending()),
ByteBuffer.wrap(createWindowUpdateForSettings()));
Throwable err = error.get();
if (err != null) {
String msg = sm.getString("upgradeHandler.sendPrefaceFail", connectionId);
if (log.isDebugEnabled()) {
log.debug(msg);
}
throw new ProtocolException(msg, err);
}
}
@Override
void sendStreamReset(StreamStateMachine state, StreamException se) throws IOException {
if (log.isTraceEnabled()) {
log.trace(sm.getString("upgradeHandler.rst.debug", connectionId, Integer.toString(se.getStreamId()),
se.getError(), se.getMessage()));
}
// Write a RST frame
byte[] rstFrame = new byte[13];
// Length
ByteUtil.setThreeBytes(rstFrame, 0, 4);
// Type
rstFrame[3] = FrameType.RST.getIdByte();
// No flags
// Stream ID
ByteUtil.set31Bits(rstFrame, 5, se.getStreamId());
// Payload
ByteUtil.setFourBytes(rstFrame, 9, se.getError().getCode());
// Need to update state atomically with the sending of the RST
// frame else other threads currently working with this stream
// may see the state change and send a RST frame before the RST
// frame triggered by this thread. If that happens the client
// may see out of order RST frames which may hard to follow if
// the client is unaware the RST frames may be received out of
// order.
sendResetLock.lock();
try {
if (state != null) {
boolean active = state.isActive();
state.sendReset();
if (active) {
decrementActiveRemoteStreamCount(getStream(se.getStreamId()));
}
}
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion, ByteBuffer.wrap(rstFrame));
} finally {
sendResetLock.unlock();
}
handleAsyncException();
}
@Override
protected void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] debugMsg) throws IOException {
byte[] fixedPayload = new byte[8];
ByteUtil.set31Bits(fixedPayload, 0, maxStreamId);
ByteUtil.setFourBytes(fixedPayload, 4, errorCode);
int len = 8;
if (debugMsg != null) {
len += debugMsg.length;
}
byte[] payloadLength = new byte[3];
ByteUtil.setThreeBytes(payloadLength, 0, len);
if (debugMsg != null) {
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion, ByteBuffer.wrap(payloadLength),
ByteBuffer.wrap(GOAWAY), ByteBuffer.wrap(fixedPayload), ByteBuffer.wrap(debugMsg));
} else {
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion, ByteBuffer.wrap(payloadLength),
ByteBuffer.wrap(GOAWAY), ByteBuffer.wrap(fixedPayload));
}
handleAsyncException();
}
@Override
void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize)
throws IOException {
headerWriteLock.lock();
try {
AsyncHeaderFrameBuffers headerFrameBuffers = (AsyncHeaderFrameBuffers) doWriteHeaders(stream,
pushedStreamId, mimeHeaders, endOfStream, payloadSize);
if (headerFrameBuffers != null) {
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion,
headerFrameBuffers.bufs.toArray(BYTEBUFFER_ARRAY));
handleAsyncException();
}
} finally {
headerWriteLock.unlock();
}
if (endOfStream) {
sentEndOfStream(stream);
}
}
@Override
protected HeaderFrameBuffers getHeaderFrameBuffers(int initialPayloadSize) {
return new AsyncHeaderFrameBuffers(initialPayloadSize);
}
@Override
void writeBody(Stream stream, ByteBuffer data, int len, boolean finished) throws IOException {
if (log.isTraceEnabled()) {
log.trace(sm.getString("upgradeHandler.writeBody", connectionId, stream.getIdAsString(),
Integer.toString(len), Boolean.valueOf(finished)));
}
reduceOverheadCount(FrameType.DATA);
// Need to check this now since sending end of stream will change this.
boolean writable = stream.canWrite();
byte[] header = new byte[9];
ByteUtil.setThreeBytes(header, 0, len);
header[3] = FrameType.DATA.getIdByte();
if (finished) {
header[4] = FLAG_END_OF_STREAM;
sentEndOfStream(stream);
}
if (writable) {
ByteUtil.set31Bits(header, 5, stream.getIdAsInt());
int orgLimit = data.limit();
data.limit(data.position() + len);
socketWrapper.write(BlockingMode.BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, applicationErrorCompletion, ByteBuffer.wrap(header), data);
data.limit(orgLimit);
handleAsyncException();
}
}
@Override
void writeWindowUpdate(AbstractNonZeroStream stream, int increment, boolean applicationInitiated)
throws IOException {
if (log.isTraceEnabled()) {
log.trace(sm.getString("upgradeHandler.windowUpdateConnection", getConnectionId(),
Integer.valueOf(increment)));
}
// Build window update frame for stream 0
byte[] frame = new byte[13];
ByteUtil.setThreeBytes(frame, 0, 4);
frame[3] = FrameType.WINDOW_UPDATE.getIdByte();
ByteUtil.set31Bits(frame, 9, increment);
boolean neetToWriteConnectionUpdate = true;
// No need to send update from closed stream
if (stream instanceof Stream && ((Stream) stream).canWrite()) {
int streamIncrement = ((Stream) stream).getWindowUpdateSizeToWrite(increment);
if (streamIncrement > 0) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("upgradeHandler.windowUpdateStream", getConnectionId(), getIdAsString(),
Integer.valueOf(streamIncrement)));
}
byte[] frame2 = new byte[13];
ByteUtil.setThreeBytes(frame2, 0, 4);
frame2[3] = FrameType.WINDOW_UPDATE.getIdByte();
ByteUtil.set31Bits(frame2, 9, streamIncrement);
ByteUtil.set31Bits(frame2, 5, stream.getIdAsInt());
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion, ByteBuffer.wrap(frame),
ByteBuffer.wrap(frame2));
neetToWriteConnectionUpdate = false;
}
}
if (neetToWriteConnectionUpdate) {
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion, ByteBuffer.wrap(frame));
}
handleAsyncException();
}
@Override
public void settingsEnd(boolean ack) throws IOException {
if (ack) {
if (!localSettings.ack()) {
// Ack was unexpected
log.warn(sm.getString("upgradeHandler.unexpectedAck", connectionId, getIdAsString()));
}
} else {
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion, ByteBuffer.wrap(SETTINGS_ACK));
}
handleAsyncException();
}
private void handleAsyncException() throws IOException {
IOException ioe = applicationIOE.getAndSet(null);
if (ioe != null) {
handleAppInitiatedIOException(ioe);
} else {
Throwable err = this.error.getAndSet(null);
if (err != null) {
if (err instanceof IOException) {
throw (IOException) err;
} else {
throw new IOException(err);
}
}
}
}
@Override
protected SendfileState processSendfile(SendfileData sendfile) {
if (sendfile != null) {
try {
try (FileChannel channel = FileChannel.open(sendfile.path, StandardOpenOption.READ)) {
sendfile.mappedBuffer = channel.map(MapMode.READ_ONLY, sendfile.pos, sendfile.end - sendfile.pos);
}
// Reserve as much as possible right away
int reservation = (sendfile.end - sendfile.pos > Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int) (sendfile.end - sendfile.pos);
sendfile.streamReservation = sendfile.stream.reserveWindowSize(reservation, true);
sendfile.connectionReservation = reserveWindowSize(sendfile.stream, sendfile.streamReservation, true);
} catch (IOException e) {
return SendfileState.ERROR;
}
if (log.isTraceEnabled()) {
log.trace(sm.getString("upgradeHandler.sendfile.reservation", connectionId,
sendfile.stream.getIdAsString(), Integer.valueOf(sendfile.connectionReservation),
Integer.valueOf(sendfile.streamReservation)));
}
// connectionReservation will always be smaller than or the same as
// streamReservation
int frameSize = Integer.min(getMaxFrameSize(), sendfile.connectionReservation);
boolean finished =
(frameSize == sendfile.left) && sendfile.stream.getCoyoteResponse().getTrailerFields() == null;
// Need to check this now since sending end of stream will change this.
boolean writable = sendfile.stream.canWrite();
byte[] header = new byte[9];
ByteUtil.setThreeBytes(header, 0, frameSize);
header[3] = FrameType.DATA.getIdByte();
if (finished) {
header[4] = FLAG_END_OF_STREAM;
sentEndOfStream(sendfile.stream);
}
if (writable) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("upgradeHandler.writeBody", connectionId, sendfile.stream.getIdAsString(),
Integer.toString(frameSize), Boolean.valueOf(finished)));
}
ByteUtil.set31Bits(header, 5, sendfile.stream.getIdAsInt());
sendfile.mappedBuffer.limit(sendfile.mappedBuffer.position() + frameSize);
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS,
sendfile, SocketWrapperBase.COMPLETE_WRITE_WITH_COMPLETION, new SendfileCompletionHandler(),
ByteBuffer.wrap(header), sendfile.mappedBuffer);
try {
handleAsyncException();
} catch (IOException e) {
return SendfileState.ERROR;
}
}
return SendfileState.PENDING;
} else {
return SendfileState.DONE;
}
}
protected class SendfileCompletionHandler implements CompletionHandler<Long,SendfileData> {
@Override
public void completed(Long nBytes, SendfileData sendfile) {
CompletionState completionState = null;
long bytesWritten = nBytes.longValue() - 9;
/*
* Loop for in-line writes only. Avoids a possible stack-overflow of chained completion handlers with a long
* series of in-line writes.
*/
do {
sendfile.left -= bytesWritten;
if (sendfile.left == 0) {
try {
sendfile.stream.getOutputBuffer().end();
} catch (IOException e) {
failed(e, sendfile);
}
return;
}
sendfile.streamReservation -= bytesWritten;
sendfile.connectionReservation -= bytesWritten;
sendfile.pos += bytesWritten;
try {
if (sendfile.connectionReservation == 0) {
if (sendfile.streamReservation == 0) {
int reservation = (sendfile.end - sendfile.pos > Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int) (sendfile.end - sendfile.pos);
sendfile.streamReservation = sendfile.stream.reserveWindowSize(reservation, true);
}
sendfile.connectionReservation =
reserveWindowSize(sendfile.stream, sendfile.streamReservation, true);
}
} catch (IOException e) {
failed(e, sendfile);
return;
}
if (log.isTraceEnabled()) {
log.trace(sm.getString("upgradeHandler.sendfile.reservation", connectionId,
sendfile.stream.getIdAsString(), Integer.valueOf(sendfile.connectionReservation),
Integer.valueOf(sendfile.streamReservation)));
}
// connectionReservation will always be smaller than or the same as
// streamReservation
int frameSize = Integer.min(getMaxFrameSize(), sendfile.connectionReservation);
boolean finished =
(frameSize == sendfile.left) && sendfile.stream.getCoyoteResponse().getTrailerFields() == null;
// Need to check this now since sending end of stream will change this.
boolean writable = sendfile.stream.canWrite();
byte[] header = new byte[9];
ByteUtil.setThreeBytes(header, 0, frameSize);
header[3] = FrameType.DATA.getIdByte();
if (finished) {
header[4] = FLAG_END_OF_STREAM;
sentEndOfStream(sendfile.stream);
}
if (writable) {
if (log.isTraceEnabled()) {
log.trace(
sm.getString("upgradeHandler.writeBody", connectionId, sendfile.stream.getIdAsString(),
Integer.toString(frameSize), Boolean.valueOf(finished)));
}
ByteUtil.set31Bits(header, 5, sendfile.stream.getIdAsInt());
sendfile.mappedBuffer.limit(sendfile.mappedBuffer.position() + frameSize);
// Note: Completion handler not called in the write
// completes in-line. The wrote will continue via the
// surrounding loop.
completionState = socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(),
TimeUnit.MILLISECONDS, sendfile, SocketWrapperBase.COMPLETE_WRITE, this,
ByteBuffer.wrap(header), sendfile.mappedBuffer);
try {
handleAsyncException();
} catch (IOException e) {
failed(e, sendfile);
return;
}
}
// Update bytesWritten for start of next loop iteration
bytesWritten = frameSize;
} while (completionState == CompletionState.INLINE);
}
@Override
public void failed(Throwable t, SendfileData sendfile) {
applicationErrorCompletion.failed(t, null);
}
}
protected class AsyncPingManager extends PingManager {
@Override
public void sendPing(boolean force) throws IOException {
if (initiateDisabled) {
return;
}
long now = System.nanoTime();
if (force || now - lastPingNanoTime > pingIntervalNano) {
lastPingNanoTime = now;
byte[] payload = new byte[8];
int sentSequence = ++sequence;
PingRecord pingRecord = new PingRecord(sentSequence, now);
inflightPings.add(pingRecord);
ByteUtil.set31Bits(payload, 4, sentSequence);
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion, ByteBuffer.wrap(PING),
ByteBuffer.wrap(payload));
handleAsyncException();
}
}
@Override
public void receivePing(byte[] payload, boolean ack) throws IOException {
if (ack) {
super.receivePing(payload, ack);
} else {
// Client originated ping. Echo it back.
socketWrapper.write(BlockingMode.SEMI_BLOCK, protocol.getWriteTimeout(), TimeUnit.MILLISECONDS, null,
SocketWrapperBase.COMPLETE_WRITE, errorCompletion, ByteBuffer.wrap(PING_ACK),
ByteBuffer.wrap(payload));
handleAsyncException();
}
}
}
private static class AsyncHeaderFrameBuffers implements HeaderFrameBuffers {
int payloadSize;
private byte[] header;
private ByteBuffer payload;
private final List<ByteBuffer> bufs = new ArrayList<>();
AsyncHeaderFrameBuffers(int initialPayloadSize) {
this.payloadSize = initialPayloadSize;
}
@Override
public void startFrame() {
header = new byte[9];
payload = ByteBuffer.allocate(payloadSize);
}
@Override
public void endFrame() throws IOException {
bufs.add(ByteBuffer.wrap(header));
bufs.add(payload);
}
@Override
public void endHeaders() throws IOException {
}
@Override
public byte[] getHeader() {
return header;
}
@Override
public ByteBuffer getPayload() {
return payload;
}
@Override
public void expandPayload() {
payloadSize = payloadSize * 2;
payload = ByteBuffer.allocate(payloadSize);
}
}
}