WsRemoteEndpointImplServer.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.websocket.server;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.http.WebConnection;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import org.apache.coyote.http11.upgrade.UpgradeInfo;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.websocket.Constants;
import org.apache.tomcat.websocket.Transformation;
import org.apache.tomcat.websocket.WsRemoteEndpointImplBase;
/**
* This is the server side {@link javax.websocket.RemoteEndpoint} implementation - i.e. what the server uses to send
* data to the client.
*/
public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
private static final StringManager sm = StringManager.getManager(WsRemoteEndpointImplServer.class);
private final Log log = LogFactory.getLog(WsRemoteEndpointImplServer.class); // must not be static
private final SocketWrapperBase<?> socketWrapper;
private final UpgradeInfo upgradeInfo;
private final WebConnection connection;
private final WsWriteTimeout wsWriteTimeout;
private volatile SendHandler handler = null;
private volatile ByteBuffer[] buffers = null;
private volatile long timeoutExpiry = -1;
public WsRemoteEndpointImplServer(SocketWrapperBase<?> socketWrapper, UpgradeInfo upgradeInfo,
WsServerContainer serverContainer, WebConnection connection) {
this.socketWrapper = socketWrapper;
this.upgradeInfo = upgradeInfo;
this.connection = connection;
this.wsWriteTimeout = serverContainer.getTimeout();
}
@Override
protected final boolean isMasked() {
return false;
}
/**
* {@inheritDoc}
* <p>
* The close message is a special case. It needs to be blocking else implementing the clean-up that follows the
* sending of the close message gets a lot more complicated. On the server, this creates additional complications as
* a dead-lock may occur in the following scenario:
* <ol>
* <li>Application thread writes message using non-blocking</li>
* <li>Write does not complete (write logic holds message pending lock)</li>
* <li>Socket is added to poller (or equivalent) for write
* <li>Client sends close message</li>
* <li>Container processes received close message and tries to send close message in response</li>
* <li>Container holds socket lock and is blocked waiting for message pending lock</li>
* <li>Poller fires write possible event for socket</li>
* <li>Container tries to process write possible event but is blocked waiting for socket lock</li>
* <li>Processing of the WebSocket connection is dead-locked until the original message write times out</li>
* </ol>
* The purpose of this method is to break the above dead-lock. It does this by returning control of the processor to
* the socket wrapper and releasing the socket lock while waiting for the pending message write to complete.
* Normally, that would be a terrible idea as it creates the possibility that the processor is returned to the pool
* more than once under various error conditions. In this instance it is safe because these are upgrade processors
* (isUpgrade() returns {@code true}) and upgrade processors are never pooled.
* <p>
* TODO: Despite the complications it creates, it would be worth exploring the possibility of processing a received
* close frame in a non-blocking manner.
*/
@Override
protected boolean acquireMessagePartInProgressSemaphore(byte opCode, long timeoutExpiry)
throws InterruptedException {
/*
* Special handling is required only when all of the following are true:
* - A close message is being sent
* - This thread currently holds the socketWrapper lock (i.e. the thread is current processing a socket event)
*
* Special handling is only possible if the socketWrapper lock is a ReentrantLock (it will be by default)
*/
if (socketWrapper.getLock() instanceof ReentrantLock) {
ReentrantLock reentrantLock = (ReentrantLock) socketWrapper.getLock();
if (opCode == Constants.OPCODE_CLOSE && reentrantLock.isHeldByCurrentThread()) {
int socketWrapperLockCount = reentrantLock.getHoldCount();
while (!messagePartInProgress.tryAcquire()) {
if (timeoutExpiry < System.currentTimeMillis()) {
return false;
}
try {
// Release control of the processor
socketWrapper.setCurrentProcessor(connection);
// Release the per socket lock(s)
for (int i = 0; i < socketWrapperLockCount; i++) {
socketWrapper.getLock().unlock();
}
// Provide opportunity for another thread to obtain the socketWrapper lock
Thread.yield();
} finally {
// Re-obtain the per socket lock(s)
for (int i = 0; i < socketWrapperLockCount; i++) {
socketWrapper.getLock().lock();
}
// Re-take control of the processor
socketWrapper.takeCurrentProcessor();
}
}
return true;
}
}
// Skip special handling
return super.acquireMessagePartInProgressSemaphore(opCode, timeoutExpiry);
}
@Override
protected void doWrite(final SendHandler handler, final long blockingWriteTimeoutExpiry, ByteBuffer... buffers) {
if (socketWrapper.hasAsyncIO()) {
final boolean block = (blockingWriteTimeoutExpiry != -1);
long timeout = -1;
if (block) {
timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
if (timeout <= 0) {
SendResult sr = new SendResult(new SocketTimeoutException());
handler.onResult(sr);
return;
}
} else {
this.handler = handler;
timeout = getSendTimeout();
if (timeout > 0) {
// Register with timeout thread
timeoutExpiry = timeout + System.currentTimeMillis();
wsWriteTimeout.register(this);
}
}
socketWrapper.write(block ? BlockingMode.BLOCK : BlockingMode.SEMI_BLOCK, timeout, TimeUnit.MILLISECONDS,
null, SocketWrapperBase.COMPLETE_WRITE_WITH_COMPLETION, new CompletionHandler<Long, Void>() {
@Override
public void completed(Long result, Void attachment) {
if (block) {
long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
if (timeout <= 0) {
failed(new SocketTimeoutException(), null);
} else {
handler.onResult(SENDRESULT_OK);
}
} else {
wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
clearHandler(null, true);
}
}
@Override
public void failed(Throwable exc, Void attachment) {
if (block) {
SendResult sr = new SendResult(exc);
handler.onResult(sr);
} else {
wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
clearHandler(exc, true);
close();
}
}
}, buffers);
} else {
if (blockingWriteTimeoutExpiry == -1) {
this.handler = handler;
this.buffers = buffers;
// This is definitely the same thread that triggered the write so a
// dispatch will be required.
onWritePossible(true);
} else {
// Blocking
try {
for (ByteBuffer buffer : buffers) {
long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
if (timeout <= 0) {
SendResult sr = new SendResult(new SocketTimeoutException());
handler.onResult(sr);
return;
}
socketWrapper.setWriteTimeout(timeout);
socketWrapper.write(true, buffer);
}
long timeout = blockingWriteTimeoutExpiry - System.currentTimeMillis();
if (timeout <= 0) {
SendResult sr = new SendResult(new SocketTimeoutException());
handler.onResult(sr);
return;
}
socketWrapper.setWriteTimeout(timeout);
socketWrapper.flush(true);
handler.onResult(SENDRESULT_OK);
} catch (IOException e) {
SendResult sr = new SendResult(e);
handler.onResult(sr);
}
}
}
}
@Override
protected void updateStats(long payloadLength) {
upgradeInfo.addMsgsSent(1);
upgradeInfo.addBytesSent(payloadLength);
}
public void onWritePossible(boolean useDispatch) {
// Note: Unused for async IO
ByteBuffer[] buffers = this.buffers;
if (buffers == null) {
// Servlet 3.1 will call the write listener once even if nothing
// was written
return;
}
boolean complete = false;
try {
socketWrapper.flush(false);
// If this is false there will be a call back when it is true
while (socketWrapper.isReadyForWrite()) {
complete = true;
for (ByteBuffer buffer : buffers) {
if (buffer.hasRemaining()) {
complete = false;
socketWrapper.write(false, buffer);
break;
}
}
if (complete) {
socketWrapper.flush(false);
complete = socketWrapper.isReadyForWrite();
if (complete) {
wsWriteTimeout.unregister(this);
clearHandler(null, useDispatch);
}
break;
}
}
} catch (IOException | IllegalStateException e) {
wsWriteTimeout.unregister(this);
clearHandler(e, useDispatch);
close();
}
if (!complete) {
// Async write is in progress
long timeout = getSendTimeout();
if (timeout > 0) {
// Register with timeout thread
timeoutExpiry = timeout + System.currentTimeMillis();
wsWriteTimeout.register(this);
}
}
}
@Override
protected void doClose() {
if (handler != null) {
// close() can be triggered by a wide range of scenarios. It is far
// simpler just to always use a dispatch than it is to try and track
// whether or not this method was called by the same thread that
// triggered the write
clearHandler(new EOFException(), true);
}
try {
socketWrapper.close();
} catch (Exception e) {
if (log.isInfoEnabled()) {
log.info(sm.getString("wsRemoteEndpointServer.closeFailed"), e);
}
}
wsWriteTimeout.unregister(this);
}
protected long getTimeoutExpiry() {
return timeoutExpiry;
}
/*
* Currently this is only called from the background thread so we could just call clearHandler() with useDispatch ==
* false but the method parameter was added in case other callers started to use this method to make sure that those
* callers think through what the correct value of useDispatch is for them.
*/
protected void onTimeout(boolean useDispatch) {
if (handler != null) {
clearHandler(new SocketTimeoutException(), useDispatch);
}
close();
}
@Override
protected void setTransformation(Transformation transformation) {
// Overridden purely so it is visible to other classes in this package
super.setTransformation(transformation);
}
/**
* @param t The throwable associated with any error that occurred
* @param useDispatch Should {@link SendHandler#onResult(SendResult)} be called from a new thread, keeping in mind
* the requirements of {@link javax.websocket.RemoteEndpoint.Async}
*/
void clearHandler(Throwable t, boolean useDispatch) {
// Setting the result marks this (partial) message as
// complete which means the next one may be sent which
// could update the value of the handler. Therefore, keep a
// local copy before signalling the end of the (partial)
// message.
SendHandler sh = handler;
handler = null;
buffers = null;
if (sh != null) {
if (useDispatch) {
OnResultRunnable r = new OnResultRunnable(sh, t);
try {
socketWrapper.execute(r);
} catch (RejectedExecutionException ree) {
// Can't use the executor so call the runnable directly.
// This may not be strictly specification compliant in all
// cases but during shutdown only close messages are going
// to be sent so there should not be the issue of nested
// calls leading to stack overflow as described in bug
// 55715. The issues with nested calls was the reason for
// the separate thread requirement in the specification.
r.run();
}
} else {
if (t == null) {
sh.onResult(new SendResult());
} else {
sh.onResult(new SendResult(t));
}
}
}
}
@Override
protected Lock getLock() {
return socketWrapper.getLock();
}
private static class OnResultRunnable implements Runnable {
private final SendHandler sh;
private final Throwable t;
private OnResultRunnable(SendHandler sh, Throwable t) {
this.sh = sh;
this.t = t;
}
@Override
public void run() {
if (t == null) {
sh.onResult(new SendResult());
} else {
sh.onResult(new SendResult(t));
}
}
}
}