Http2AsyncParser.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote.http2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.WebConnection;
import org.apache.coyote.ProtocolException;
import org.apache.tomcat.util.net.SocketEvent;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
import org.apache.tomcat.util.net.SocketWrapperBase.CompletionCheck;
import org.apache.tomcat.util.net.SocketWrapperBase.CompletionHandlerCall;
import org.apache.tomcat.util.net.SocketWrapperBase.CompletionState;
class Http2AsyncParser extends Http2Parser {
private final SocketWrapperBase<?> socketWrapper;
private final Http2AsyncUpgradeHandler upgradeHandler;
private volatile Throwable error = null;
Http2AsyncParser(String connectionId, Input input, Output output, SocketWrapperBase<?> socketWrapper,
Http2AsyncUpgradeHandler upgradeHandler) {
super(connectionId, input, output);
this.socketWrapper = socketWrapper;
socketWrapper.getSocketBufferHandler().expand(input.getMaxFrameSize());
this.upgradeHandler = upgradeHandler;
}
@Override
void readConnectionPreface(WebConnection webConnection, Stream stream) throws Http2Exception {
byte[] prefaceData = new byte[CLIENT_PREFACE_START.length];
ByteBuffer preface = ByteBuffer.wrap(prefaceData);
ByteBuffer header = ByteBuffer.allocate(9);
ByteBuffer framePayload = ByteBuffer.allocate(input.getMaxFrameSize());
PrefaceCompletionHandler handler =
new PrefaceCompletionHandler(webConnection, stream, prefaceData, preface, header, framePayload);
socketWrapper.read(BlockingMode.NON_BLOCK, socketWrapper.getReadTimeout(), TimeUnit.MILLISECONDS, null, handler,
handler, preface, header, framePayload);
}
private class PrefaceCompletionHandler extends FrameCompletionHandler {
private final WebConnection webConnection;
private final Stream stream;
private final byte[] prefaceData;
private volatile boolean prefaceValidated = false;
private PrefaceCompletionHandler(WebConnection webConnection, Stream stream, byte[] prefaceData,
ByteBuffer... buffers) {
super(FrameType.SETTINGS, buffers);
this.webConnection = webConnection;
this.stream = stream;
this.prefaceData = prefaceData;
}
@Override
public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length) {
if (offset != 0 || length != 3) {
try {
throw new IllegalArgumentException(sm.getString("http2Parser.invalidBuffers"));
} catch (IllegalArgumentException e) {
error = e;
return CompletionHandlerCall.DONE;
}
}
if (!prefaceValidated) {
if (buffers[0].hasRemaining()) {
// The preface must be fully read before being validated
return CompletionHandlerCall.CONTINUE;
}
// Validate preface content
for (int i = 0; i < CLIENT_PREFACE_START.length; i++) {
if (CLIENT_PREFACE_START[i] != prefaceData[i]) {
error = new ProtocolException(sm.getString("http2Parser.preface.invalid"));
return CompletionHandlerCall.DONE;
}
}
prefaceValidated = true;
}
return validate(state, buffers[1], buffers[2]);
}
@Override
public void completed(Long result, Void attachment) {
if (streamException || error == null) {
ByteBuffer payload = buffers[2];
payload.flip();
try {
if (streamException) {
swallowPayload(streamId, frameTypeId, payloadSize, false, payload);
} else {
readSettingsFrame(flags, payloadSize, payload);
}
} catch (RuntimeException | IOException | Http2Exception e) {
error = e;
}
// Any extra frame is not processed yet, so put back any leftover data
if (payload.hasRemaining()) {
socketWrapper.unRead(payload);
}
// Finish processing the connection
upgradeHandler.processConnectionCallback(webConnection, stream);
} else {
upgradeHandler
.closeConnection(new ConnectionException(error.getMessage(), Http2Error.PROTOCOL_ERROR, error));
}
// Continue reading frames
upgradeHandler.upgradeDispatch(SocketEvent.OPEN_READ);
}
}
@Override
protected boolean readFrame(boolean block, FrameType expected) throws IOException, Http2Exception {
handleAsyncException();
ByteBuffer header = ByteBuffer.allocate(9);
ByteBuffer framePayload = ByteBuffer.allocate(input.getMaxFrameSize());
FrameCompletionHandler handler = new FrameCompletionHandler(expected, header, framePayload);
CompletionState state = socketWrapper.read(block ? BlockingMode.BLOCK : BlockingMode.NON_BLOCK,
block ? socketWrapper.getReadTimeout() : 0, TimeUnit.MILLISECONDS, null, handler, handler, header,
framePayload);
if (state == CompletionState.ERROR || state == CompletionState.INLINE) {
handleAsyncException();
return true;
} else {
return false;
}
}
private void handleAsyncException() throws IOException, Http2Exception {
if (error != null) {
Throwable error = this.error;
this.error = null;
if (error instanceof Http2Exception) {
throw (Http2Exception) error;
} else if (error instanceof IOException) {
throw (IOException) error;
} else if (error instanceof RuntimeException) {
throw (RuntimeException) error;
} else {
throw new RuntimeException(error);
}
}
}
private class FrameCompletionHandler implements CompletionCheck, CompletionHandler<Long,Void> {
private final FrameType expected;
protected final ByteBuffer[] buffers;
private volatile boolean parsedFrameHeader = false;
private volatile boolean validated = false;
private volatile CompletionState state = null;
protected volatile int payloadSize;
protected volatile int frameTypeId;
protected volatile FrameType frameType;
protected volatile int flags;
protected volatile int streamId;
protected volatile boolean streamException = false;
private FrameCompletionHandler(FrameType expected, ByteBuffer... buffers) {
this.expected = expected;
this.buffers = buffers;
}
@Override
public CompletionHandlerCall callHandler(CompletionState state, ByteBuffer[] buffers, int offset, int length) {
if (offset != 0 || length != 2) {
try {
throw new IllegalArgumentException(sm.getString("http2Parser.invalidBuffers"));
} catch (IllegalArgumentException e) {
error = e;
return CompletionHandlerCall.DONE;
}
}
return validate(state, buffers[0], buffers[1]);
}
protected CompletionHandlerCall validate(CompletionState state, ByteBuffer frameHeaderBuffer,
ByteBuffer payload) {
if (!parsedFrameHeader) {
// The first buffer should be 9 bytes long
if (frameHeaderBuffer.position() < 9) {
return CompletionHandlerCall.CONTINUE;
}
parsedFrameHeader = true;
payloadSize = ByteUtil.getThreeBytes(frameHeaderBuffer, 0);
frameTypeId = ByteUtil.getOneByte(frameHeaderBuffer, 3);
frameType = FrameType.valueOf(frameTypeId);
flags = ByteUtil.getOneByte(frameHeaderBuffer, 4);
streamId = ByteUtil.get31Bits(frameHeaderBuffer, 5);
}
this.state = state;
if (!validated) {
validated = true;
try {
validateFrame(expected, frameType, streamId, flags, payloadSize);
} catch (StreamException e) {
error = e;
streamException = true;
} catch (Http2Exception e) {
error = e;
// The problem will be handled later, consider the frame read is done
return CompletionHandlerCall.DONE;
}
}
if (payload.position() < payloadSize) {
return CompletionHandlerCall.CONTINUE;
}
return CompletionHandlerCall.DONE;
}
@Override
public void completed(Long result, Void attachment) {
if (streamException || error == null) {
ByteBuffer payload = buffers[1];
payload.flip();
try {
boolean continueParsing;
do {
continueParsing = false;
if (streamException) {
swallowPayload(streamId, frameTypeId, payloadSize, false, payload);
} else {
switch (frameType) {
case DATA:
readDataFrame(streamId, flags, payloadSize, payload);
break;
case HEADERS:
readHeadersFrame(streamId, flags, payloadSize, payload);
break;
case PRIORITY:
readPriorityFrame(streamId, payload);
break;
case RST:
readRstFrame(streamId, payload);
break;
case SETTINGS:
readSettingsFrame(flags, payloadSize, payload);
break;
case PUSH_PROMISE:
readPushPromiseFrame(streamId, flags, payloadSize, payload);
break;
case PING:
readPingFrame(flags, payload);
break;
case GOAWAY:
readGoawayFrame(payloadSize, payload);
break;
case WINDOW_UPDATE:
readWindowUpdateFrame(streamId, payload);
break;
case CONTINUATION:
readContinuationFrame(streamId, flags, payloadSize, payload);
break;
case PRIORITY_UPDATE:
readPriorityUpdateFrame(payloadSize, payload);
break;
case UNKNOWN:
readUnknownFrame(streamId, frameTypeId, flags, payloadSize, payload);
}
}
if (!upgradeHandler.isOverheadLimitExceeded()) {
// See if there is a new 9 byte header and continue parsing if possible
if (payload.remaining() >= 9) {
int position = payload.position();
payloadSize = ByteUtil.getThreeBytes(payload, position);
frameTypeId = ByteUtil.getOneByte(payload, position + 3);
frameType = FrameType.valueOf(frameTypeId);
flags = ByteUtil.getOneByte(payload, position + 4);
streamId = ByteUtil.get31Bits(payload, position + 5);
streamException = false;
if (payload.remaining() - 9 >= payloadSize) {
continueParsing = true;
// Now go over frame header
payload.position(payload.position() + 9);
try {
validateFrame(null, frameType, streamId, flags, payloadSize);
} catch (StreamException e) {
error = e;
streamException = true;
} catch (Http2Exception e) {
error = e;
continueParsing = false;
}
}
}
}
} while (continueParsing);
} catch (RuntimeException | IOException | Http2Exception e) {
error = e;
} finally {
if (payload.hasRemaining()) {
socketWrapper.unRead(payload);
}
}
}
if (state == CompletionState.DONE) {
// The call was not completed inline, so must start reading new frames
// or process the stream exception
upgradeHandler.upgradeDispatch(SocketEvent.OPEN_READ);
}
}
@Override
public void failed(Throwable e, Void attachment) {
// Always a fatal IO error
error = e;
if (log.isDebugEnabled()) {
log.debug(sm.getString("http2Parser.error", connectionId, Integer.valueOf(streamId), frameType), e);
}
if (state == null || state == CompletionState.DONE) {
upgradeHandler.upgradeDispatch(SocketEvent.ERROR);
}
}
}
}