WsSession.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.websocket;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.Principal;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import javax.naming.NamingException;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCode;
import javax.websocket.CloseReason.CloseCodes;
import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
import javax.websocket.MessageHandler;
import javax.websocket.MessageHandler.Partial;
import javax.websocket.MessageHandler.Whole;
import javax.websocket.PongMessage;
import javax.websocket.RemoteEndpoint;
import javax.websocket.SendResult;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.InstanceManager;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.websocket.pojo.PojoEndpointServer;
import org.apache.tomcat.websocket.server.DefaultServerEndpointConfigurator;

public class WsSession implements Session {

    private final Log log = LogFactory.getLog(WsSession.class); // must not be static
    private static final StringManager sm = StringManager.getManager(WsSession.class);

    // An ellipsis is a single character that looks like three periods in a row
    // and is used to indicate a continuation.
    private static final byte[] ELLIPSIS_BYTES = "\u2026".getBytes(StandardCharsets.UTF_8);
    // An ellipsis is three bytes in UTF-8
    private static final int ELLIPSIS_BYTES_LEN = ELLIPSIS_BYTES.length;

    private static final boolean SEC_CONFIGURATOR_USES_IMPL_DEFAULT;

    private static AtomicLong ids = new AtomicLong(0);

    static {
        // Use fake end point and path. They are never used, they just need to
        // be sufficient to pass the validation tests.
        ServerEndpointConfig.Builder builder = ServerEndpointConfig.Builder.create(Object.class, "/");
        ServerEndpointConfig sec = builder.build();
        SEC_CONFIGURATOR_USES_IMPL_DEFAULT = sec.getConfigurator().getClass()
                .equals(DefaultServerEndpointConfigurator.class);
    }

    private final Endpoint localEndpoint;
    private final WsRemoteEndpointImplBase wsRemoteEndpoint;
    private final RemoteEndpoint.Async remoteEndpointAsync;
    private final RemoteEndpoint.Basic remoteEndpointBasic;
    private final ClassLoader applicationClassLoader;
    private final WsWebSocketContainer webSocketContainer;
    private final URI requestUri;
    private final Map<String, List<String>> requestParameterMap;
    private final String queryString;
    private final Principal userPrincipal;
    private final EndpointConfig endpointConfig;

    private final List<Extension> negotiatedExtensions;
    private final String subProtocol;
    private final Map<String, String> pathParameters;
    private final boolean secure;
    private final String httpSessionId;
    private final String id;

    // Expected to handle message types of <String> only
    private volatile MessageHandler textMessageHandler = null;
    // Expected to handle message types of <ByteBuffer> only
    private volatile MessageHandler binaryMessageHandler = null;
    private volatile MessageHandler.Whole<PongMessage> pongMessageHandler = null;
    private AtomicReference<State> state = new AtomicReference<>(State.OPEN);
    private final Map<String, Object> userProperties = new ConcurrentHashMap<>();
    private volatile int maxBinaryMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
    private volatile int maxTextMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
    private volatile long maxIdleTimeout = 0;
    private volatile long lastActiveRead = System.currentTimeMillis();
    private volatile long lastActiveWrite = System.currentTimeMillis();
    private Map<FutureToSendHandler, FutureToSendHandler> futures = new ConcurrentHashMap<>();
    private volatile Long sessionCloseTimeoutExpiry;


    /**
     * Creates a new WebSocket session for communication between the provided client and remote end points. The result
     * of {@link Thread#getContextClassLoader()} at the time this constructor is called will be used when calling
     * {@link Endpoint#onClose(Session, CloseReason)}.
     *
     * @param clientEndpointHolder The end point managed by this code
     * @param wsRemoteEndpoint     The other / remote end point
     * @param wsWebSocketContainer The container that created this session
     * @param negotiatedExtensions The agreed extensions to use for this session
     * @param subProtocol          The agreed sub-protocol to use for this session
     * @param pathParameters       The path parameters associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param secure               Was this session initiated over a secure connection?
     * @param clientEndpointConfig The configuration information for the client end point
     *
     * @throws DeploymentException if an invalid encode is specified
     */
    public WsSession(ClientEndpointHolder clientEndpointHolder, WsRemoteEndpointImplBase wsRemoteEndpoint,
            WsWebSocketContainer wsWebSocketContainer, List<Extension> negotiatedExtensions, String subProtocol,
            Map<String, String> pathParameters, boolean secure, ClientEndpointConfig clientEndpointConfig)
            throws DeploymentException {
        this.wsRemoteEndpoint = wsRemoteEndpoint;
        this.wsRemoteEndpoint.setSession(this);
        this.remoteEndpointAsync = new WsRemoteEndpointAsync(wsRemoteEndpoint);
        this.remoteEndpointBasic = new WsRemoteEndpointBasic(wsRemoteEndpoint);
        this.webSocketContainer = wsWebSocketContainer;
        applicationClassLoader = Thread.currentThread().getContextClassLoader();
        wsRemoteEndpoint.setSendTimeout(wsWebSocketContainer.getDefaultAsyncSendTimeout());
        this.maxBinaryMessageBufferSize = webSocketContainer.getDefaultMaxBinaryMessageBufferSize();
        this.maxTextMessageBufferSize = webSocketContainer.getDefaultMaxTextMessageBufferSize();
        this.maxIdleTimeout = webSocketContainer.getDefaultMaxSessionIdleTimeout();
        this.requestUri = null;
        this.requestParameterMap = Collections.emptyMap();
        this.queryString = null;
        this.userPrincipal = null;
        this.httpSessionId = null;
        this.negotiatedExtensions = negotiatedExtensions;
        if (subProtocol == null) {
            this.subProtocol = "";
        } else {
            this.subProtocol = subProtocol;
        }
        this.pathParameters = pathParameters;
        this.secure = secure;
        this.wsRemoteEndpoint.setEncoders(clientEndpointConfig);
        this.endpointConfig = clientEndpointConfig;

        this.userProperties.putAll(endpointConfig.getUserProperties());
        this.id = Long.toHexString(ids.getAndIncrement());

        this.localEndpoint = clientEndpointHolder.getInstance(getInstanceManager());

        if (log.isTraceEnabled()) {
            log.trace(sm.getString("wsSession.created", id));
        }
    }


    /**
     * Creates a new WebSocket session for communication between the provided server and remote end points. The result
     * of {@link Thread#getContextClassLoader()} at the time this constructor is called will be used when calling
     * {@link Endpoint#onClose(Session, CloseReason)}.
     *
     * @param wsRemoteEndpoint     The other / remote end point
     * @param wsWebSocketContainer The container that created this session
     * @param requestUri           The URI used to connect to this end point or <code>null</code> if this is a client
     *                                 session
     * @param requestParameterMap  The parameters associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param queryString          The query string associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param userPrincipal        The principal associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param httpSessionId        The HTTP session ID associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param negotiatedExtensions The agreed extensions to use for this session
     * @param subProtocol          The agreed sub-protocol to use for this session
     * @param pathParameters       The path parameters associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param secure               Was this session initiated over a secure connection?
     * @param serverEndpointConfig The configuration information for the server end point
     *
     * @throws DeploymentException if an invalid encode is specified
     */
    public WsSession(WsRemoteEndpointImplBase wsRemoteEndpoint, WsWebSocketContainer wsWebSocketContainer,
            URI requestUri, Map<String, List<String>> requestParameterMap, String queryString, Principal userPrincipal,
            String httpSessionId, List<Extension> negotiatedExtensions, String subProtocol,
            Map<String, String> pathParameters, boolean secure, ServerEndpointConfig serverEndpointConfig)
            throws DeploymentException {

        this.wsRemoteEndpoint = wsRemoteEndpoint;
        this.wsRemoteEndpoint.setSession(this);
        this.remoteEndpointAsync = new WsRemoteEndpointAsync(wsRemoteEndpoint);
        this.remoteEndpointBasic = new WsRemoteEndpointBasic(wsRemoteEndpoint);
        this.webSocketContainer = wsWebSocketContainer;
        applicationClassLoader = Thread.currentThread().getContextClassLoader();
        wsRemoteEndpoint.setSendTimeout(wsWebSocketContainer.getDefaultAsyncSendTimeout());
        this.maxBinaryMessageBufferSize = webSocketContainer.getDefaultMaxBinaryMessageBufferSize();
        this.maxTextMessageBufferSize = webSocketContainer.getDefaultMaxTextMessageBufferSize();
        this.maxIdleTimeout = webSocketContainer.getDefaultMaxSessionIdleTimeout();
        this.requestUri = requestUri;
        if (requestParameterMap == null) {
            this.requestParameterMap = Collections.emptyMap();
        } else {
            this.requestParameterMap = requestParameterMap;
        }
        this.queryString = queryString;
        this.userPrincipal = userPrincipal;
        this.httpSessionId = httpSessionId;
        this.negotiatedExtensions = negotiatedExtensions;
        if (subProtocol == null) {
            this.subProtocol = "";
        } else {
            this.subProtocol = subProtocol;
        }
        this.pathParameters = pathParameters;
        this.secure = secure;
        this.wsRemoteEndpoint.setEncoders(serverEndpointConfig);
        this.endpointConfig = serverEndpointConfig;

        this.userProperties.putAll(endpointConfig.getUserProperties());
        this.id = Long.toHexString(ids.getAndIncrement());

        InstanceManager instanceManager = getInstanceManager();
        Configurator configurator = serverEndpointConfig.getConfigurator();
        Class<?> clazz = serverEndpointConfig.getEndpointClass();

        Object endpointInstance;
        try {
            if (instanceManager == null || !isDefaultConfigurator(configurator)) {
                endpointInstance = configurator.getEndpointInstance(clazz);
                if (instanceManager != null) {
                    try {
                        instanceManager.newInstance(endpointInstance);
                    } catch (ReflectiveOperationException | NamingException e) {
                        throw new DeploymentException(sm.getString("wsSession.instanceNew"), e);
                    }
                }
            } else {
                endpointInstance = instanceManager.newInstance(clazz);
            }
        } catch (ReflectiveOperationException | NamingException e) {
            throw new DeploymentException(sm.getString("wsSession.instanceCreateFailed"), e);
        }

        if (endpointInstance instanceof Endpoint) {
            this.localEndpoint = (Endpoint) endpointInstance;
        } else {
            this.localEndpoint = new PojoEndpointServer(pathParameters, endpointInstance);
        }

        if (log.isTraceEnabled()) {
            log.trace(sm.getString("wsSession.created", id));
        }
    }


    private boolean isDefaultConfigurator(Configurator configurator) {
        if (configurator.getClass().equals(DefaultServerEndpointConfigurator.class)) {
            return true;
        }
        if (SEC_CONFIGURATOR_USES_IMPL_DEFAULT &&
                configurator.getClass().equals(ServerEndpointConfig.Configurator.class)) {
            return true;
        }
        return false;
    }


    /**
     * Creates a new WebSocket session for communication between the two provided end points. The result of
     * {@link Thread#getContextClassLoader()} at the time this constructor is called will be used when calling
     * {@link Endpoint#onClose(Session, CloseReason)}.
     *
     * @param localEndpoint        The end point managed by this code
     * @param wsRemoteEndpoint     The other / remote endpoint
     * @param wsWebSocketContainer The container that created this session
     * @param requestUri           The URI used to connect to this endpoint or <code>null</code> is this is a client
     *                                 session
     * @param requestParameterMap  The parameters associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param queryString          The query string associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param userPrincipal        The principal associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param httpSessionId        The HTTP session ID associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param negotiatedExtensions The agreed extensions to use for this session
     * @param subProtocol          The agreed subprotocol to use for this session
     * @param pathParameters       The path parameters associated with the request that initiated this session or
     *                                 <code>null</code> if this is a client session
     * @param secure               Was this session initiated over a secure connection?
     * @param endpointConfig       The configuration information for the endpoint
     *
     * @throws DeploymentException if an invalid encode is specified
     *
     * @deprecated Unused. This will be removed in Tomcat 10.1
     */
    @Deprecated
    public WsSession(Endpoint localEndpoint, WsRemoteEndpointImplBase wsRemoteEndpoint,
            WsWebSocketContainer wsWebSocketContainer, URI requestUri, Map<String, List<String>> requestParameterMap,
            String queryString, Principal userPrincipal, String httpSessionId, List<Extension> negotiatedExtensions,
            String subProtocol, Map<String, String> pathParameters, boolean secure, EndpointConfig endpointConfig)
            throws DeploymentException {
        this.localEndpoint = localEndpoint;
        this.wsRemoteEndpoint = wsRemoteEndpoint;
        this.wsRemoteEndpoint.setSession(this);
        this.remoteEndpointAsync = new WsRemoteEndpointAsync(wsRemoteEndpoint);
        this.remoteEndpointBasic = new WsRemoteEndpointBasic(wsRemoteEndpoint);
        this.webSocketContainer = wsWebSocketContainer;
        applicationClassLoader = Thread.currentThread().getContextClassLoader();
        wsRemoteEndpoint.setSendTimeout(wsWebSocketContainer.getDefaultAsyncSendTimeout());
        this.maxBinaryMessageBufferSize = webSocketContainer.getDefaultMaxBinaryMessageBufferSize();
        this.maxTextMessageBufferSize = webSocketContainer.getDefaultMaxTextMessageBufferSize();
        this.maxIdleTimeout = webSocketContainer.getDefaultMaxSessionIdleTimeout();
        this.requestUri = requestUri;
        if (requestParameterMap == null) {
            this.requestParameterMap = Collections.emptyMap();
        } else {
            this.requestParameterMap = requestParameterMap;
        }
        this.queryString = queryString;
        this.userPrincipal = userPrincipal;
        this.httpSessionId = httpSessionId;
        this.negotiatedExtensions = negotiatedExtensions;
        if (subProtocol == null) {
            this.subProtocol = "";
        } else {
            this.subProtocol = subProtocol;
        }
        this.pathParameters = pathParameters;
        this.secure = secure;
        this.wsRemoteEndpoint.setEncoders(endpointConfig);
        this.endpointConfig = endpointConfig;

        this.userProperties.putAll(endpointConfig.getUserProperties());
        this.id = Long.toHexString(ids.getAndIncrement());

        InstanceManager instanceManager = getInstanceManager();
        if (instanceManager != null) {
            try {
                instanceManager.newInstance(localEndpoint);
            } catch (Exception e) {
                throw new DeploymentException(sm.getString("wsSession.instanceNew"), e);
            }
        }

        if (log.isDebugEnabled()) {
            log.debug(sm.getString("wsSession.created", id));
        }
    }


    public InstanceManager getInstanceManager() {
        return webSocketContainer.getInstanceManager(applicationClassLoader);
    }


    @Override
    public WebSocketContainer getContainer() {
        checkState();
        return webSocketContainer;
    }


    @Override
    public void addMessageHandler(MessageHandler listener) {
        Class<?> target = Util.getMessageType(listener);
        doAddMessageHandler(target, listener);
    }


    @Override
    public <T> void addMessageHandler(Class<T> clazz, Partial<T> handler) throws IllegalStateException {
        doAddMessageHandler(clazz, handler);
    }


    @Override
    public <T> void addMessageHandler(Class<T> clazz, Whole<T> handler) throws IllegalStateException {
        doAddMessageHandler(clazz, handler);
    }


    @SuppressWarnings("unchecked")
    private void doAddMessageHandler(Class<?> target, MessageHandler listener) {
        checkState();

        // Message handlers that require decoders may map to text messages,
        // binary messages, both or neither.

        // The frame processing code expects binary message handlers to
        // accept ByteBuffer

        // Use the POJO message handler wrappers as they are designed to wrap
        // arbitrary objects with MessageHandlers and can wrap MessageHandlers
        // just as easily.

        Set<MessageHandlerResult> mhResults = Util.getMessageHandlers(target, listener, endpointConfig, this);

        for (MessageHandlerResult mhResult : mhResults) {
            switch (mhResult.getType()) {
                case TEXT: {
                    if (textMessageHandler != null) {
                        throw new IllegalStateException(sm.getString("wsSession.duplicateHandlerText"));
                    }
                    textMessageHandler = mhResult.getHandler();
                    break;
                }
                case BINARY: {
                    if (binaryMessageHandler != null) {
                        throw new IllegalStateException(sm.getString("wsSession.duplicateHandlerBinary"));
                    }
                    binaryMessageHandler = mhResult.getHandler();
                    break;
                }
                case PONG: {
                    if (pongMessageHandler != null) {
                        throw new IllegalStateException(sm.getString("wsSession.duplicateHandlerPong"));
                    }
                    MessageHandler handler = mhResult.getHandler();
                    if (handler instanceof MessageHandler.Whole<?>) {
                        pongMessageHandler = (MessageHandler.Whole<PongMessage>) handler;
                    } else {
                        throw new IllegalStateException(sm.getString("wsSession.invalidHandlerTypePong"));
                    }

                    break;
                }
                default: {
                    throw new IllegalArgumentException(
                            sm.getString("wsSession.unknownHandlerType", listener, mhResult.getType()));
                }
            }
        }
    }


    @Override
    public Set<MessageHandler> getMessageHandlers() {
        checkState();
        Set<MessageHandler> result = new HashSet<>();
        if (binaryMessageHandler != null) {
            result.add(binaryMessageHandler);
        }
        if (textMessageHandler != null) {
            result.add(textMessageHandler);
        }
        if (pongMessageHandler != null) {
            result.add(pongMessageHandler);
        }
        return result;
    }


    @Override
    public void removeMessageHandler(MessageHandler listener) {
        checkState();
        if (listener == null) {
            return;
        }

        MessageHandler wrapped = null;

        if (listener instanceof WrappedMessageHandler) {
            wrapped = ((WrappedMessageHandler) listener).getWrappedHandler();
        }

        if (wrapped == null) {
            wrapped = listener;
        }

        boolean removed = false;
        if (wrapped.equals(textMessageHandler) || listener.equals(textMessageHandler)) {
            textMessageHandler = null;
            removed = true;
        }

        if (wrapped.equals(binaryMessageHandler) || listener.equals(binaryMessageHandler)) {
            binaryMessageHandler = null;
            removed = true;
        }

        if (wrapped.equals(pongMessageHandler) || listener.equals(pongMessageHandler)) {
            pongMessageHandler = null;
            removed = true;
        }

        if (!removed) {
            // ISE for now. Could swallow this silently / log this if the ISE
            // becomes a problem
            throw new IllegalStateException(sm.getString("wsSession.removeHandlerFailed", listener));
        }
    }


    @Override
    public String getProtocolVersion() {
        checkState();
        return Constants.WS_VERSION_HEADER_VALUE;
    }


    @Override
    public String getNegotiatedSubprotocol() {
        checkState();
        return subProtocol;
    }


    @Override
    public List<Extension> getNegotiatedExtensions() {
        checkState();
        return negotiatedExtensions;
    }


    @Override
    public boolean isSecure() {
        checkState();
        return secure;
    }


    @Override
    public boolean isOpen() {
        return state.get() == State.OPEN || state.get() == State.OUTPUT_CLOSING || state.get() == State.CLOSING;
    }


    public boolean isClosed() {
        return state.get() == State.CLOSED;
    }


    @Override
    public long getMaxIdleTimeout() {
        checkState();
        return maxIdleTimeout;
    }


    @Override
    public void setMaxIdleTimeout(long timeout) {
        checkState();
        this.maxIdleTimeout = timeout;
    }


    @Override
    public void setMaxBinaryMessageBufferSize(int max) {
        checkState();
        this.maxBinaryMessageBufferSize = max;
    }


    @Override
    public int getMaxBinaryMessageBufferSize() {
        checkState();
        return maxBinaryMessageBufferSize;
    }


    @Override
    public void setMaxTextMessageBufferSize(int max) {
        checkState();
        this.maxTextMessageBufferSize = max;
    }


    @Override
    public int getMaxTextMessageBufferSize() {
        checkState();
        return maxTextMessageBufferSize;
    }


    @Override
    public Set<Session> getOpenSessions() {
        checkState();
        return webSocketContainer.getOpenSessions(getSessionMapKey());
    }


    @Override
    public RemoteEndpoint.Async getAsyncRemote() {
        checkState();
        return remoteEndpointAsync;
    }


    @Override
    public RemoteEndpoint.Basic getBasicRemote() {
        checkState();
        return remoteEndpointBasic;
    }


    @Override
    public void close() throws IOException {
        close(new CloseReason(CloseCodes.NORMAL_CLOSURE, ""));
    }


    @Override
    public void close(CloseReason closeReason) throws IOException {
        doClose(closeReason, closeReason);
    }


    /**
     * WebSocket 1.0. Section 2.1.5. Need internal close method as spec requires that the local endpoint receives a 1006
     * on timeout.
     *
     * @param closeReasonMessage The close reason to pass to the remote endpoint
     * @param closeReasonLocal   The close reason to pass to the local endpoint
     */
    public void doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal) {
        doClose(closeReasonMessage, closeReasonLocal, false);
    }


    /**
     * WebSocket 1.0. Section 2.1.5. Need internal close method as spec requires that the local endpoint receives a 1006
     * on timeout.
     *
     * @param closeReasonMessage The close reason to pass to the remote endpoint
     * @param closeReasonLocal   The close reason to pass to the local endpoint
     * @param closeSocket        Should the socket be closed immediately rather than waiting for the server to respond
     */
    public void doClose(CloseReason closeReasonMessage, CloseReason closeReasonLocal, boolean closeSocket) {

        if (!state.compareAndSet(State.OPEN, State.OUTPUT_CLOSING)) {
            // Close process has already been started. Don't start it again.
            return;
        }

        if (log.isTraceEnabled()) {
            log.trace(sm.getString("wsSession.doClose", id));
        }

        // Flush any batched messages not yet sent.
        try {
            wsRemoteEndpoint.setBatchingAllowed(false);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            log.warn(sm.getString("wsSession.flushFailOnClose"), t);
            fireEndpointOnError(t);
        }

        // Send the close message to the remote endpoint.
        sendCloseMessage(closeReasonMessage);
        fireEndpointOnClose(closeReasonLocal);
        if (!state.compareAndSet(State.OUTPUT_CLOSING, State.OUTPUT_CLOSED) || closeSocket) {
            /*
             * A close message was received in another thread or this is handling an error condition. Either way, no
             * further close message is expected to be received. Mark the session as fully closed...
             */
            state.set(State.CLOSED);
            // ... and close the network connection.
            closeConnection();
        } else {
            /*
             * Set close timeout. If the client fails to send a close message response within the timeout, the session
             * and the connection will be closed when the timeout expires.
             */
            sessionCloseTimeoutExpiry =
                    Long.valueOf(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(getSessionCloseTimeout()));
        }

        // Fail any uncompleted messages.
        IOException ioe = new IOException(sm.getString("wsSession.messageFailed"));
        SendResult sr = new SendResult(ioe);
        for (FutureToSendHandler f2sh : futures.keySet()) {
            f2sh.onResult(sr);
        }
    }


    /**
     * Called when a close message is received. Should only ever happen once. Also called after a protocol error when
     * the ProtocolHandler needs to force the closing of the connection.
     *
     * @param closeReason The reason contained within the received close message.
     */
    public void onClose(CloseReason closeReason) {
        if (log.isTraceEnabled()) {
            log.trace(sm.getString("wsSession.onClose.entry", closeReason, getId(), state));
        }
        if (state.compareAndSet(State.OPEN, State.CLOSING)) {
            // Standard close.

            // Flush any batched messages not yet sent.
            try {
                wsRemoteEndpoint.setBatchingAllowed(false);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.warn(sm.getString("wsSession.flushFailOnClose"), t);
                fireEndpointOnError(t);
            }

            // Send the close message response to the remote endpoint.
            sendCloseMessage(closeReason);
            fireEndpointOnClose(closeReason);

            // Mark the session as fully closed.
            state.set(State.CLOSED);

            // Close the network connection.
            closeConnection();
        } else if (state.compareAndSet(State.OUTPUT_CLOSING, State.CLOSING)) {
            /*
             * The local endpoint sent a close message the the same time as the remote endpoint. The local close is
             * still being processed. Update the state so the the local close process will also close the network
             * connection once it has finished sending a close message.
             */
        } else if (state.compareAndSet(State.OUTPUT_CLOSED, State.CLOSED)) {
            /*
             * The local endpoint sent the first close message. The remote endpoint has now responded with its own close
             * message so mark the session as fully closed and close the network connection.
             */
            closeConnection();
        }
        // CLOSING and CLOSED are NO-OPs
    }


    private void closeConnection() {
        /*
         * Close the network connection.
         */
        wsRemoteEndpoint.close();
        /*
         * Don't unregister the session until the connection is fully closed since webSocketContainer is responsible for
         * tracking the session close timeout.
         */
        webSocketContainer.unregisterSession(getSessionMapKey(), this);
    }


    /*
     * Returns the session close timeout in milliseconds
     */
    protected long getSessionCloseTimeout() {
        long result = 0;
        Object obj = userProperties.get(Constants.SESSION_CLOSE_TIMEOUT_PROPERTY);
        if (obj instanceof Long) {
            result = ((Long) obj).intValue();
        }
        if (result <= 0) {
            result = Constants.DEFAULT_SESSION_CLOSE_TIMEOUT;
        }
        return result;
    }


    /*
     * Returns the session close timeout in milliseconds
     */
    private long getAbnormalSessionCloseSendTimeout() {
        long result = 0;
        Object obj = userProperties.get(Constants.ABNORMAL_SESSION_CLOSE_SEND_TIMEOUT_PROPERTY);
        if (obj instanceof Long) {
            result = ((Long) obj).longValue();
        }
        if (result <= 0) {
            result = Constants.DEFAULT_ABNORMAL_SESSION_CLOSE_SEND_TIMEOUT;
        }
        return result;
    }


    protected void checkCloseTimeout() {
        // Skip the check if no session close timeout has been set.
        if (sessionCloseTimeoutExpiry != null) {
            // Check if the timeout has expired.
            if (System.nanoTime() - sessionCloseTimeoutExpiry.longValue() > 0) {
                // Check if the session has been closed in another thread while the timeout was being processed.
                if (state.compareAndSet(State.OUTPUT_CLOSED, State.CLOSED)) {
                    closeConnection();
                }
            }
        }
    }


    private void fireEndpointOnClose(CloseReason closeReason) {

        // Fire the onClose event
        Throwable throwable = null;
        InstanceManager instanceManager = getInstanceManager();
        Thread t = Thread.currentThread();
        ClassLoader cl = t.getContextClassLoader();
        t.setContextClassLoader(applicationClassLoader);
        try {
            localEndpoint.onClose(this, closeReason);
        } catch (Throwable t1) {
            ExceptionUtils.handleThrowable(t1);
            throwable = t1;
        } finally {
            if (instanceManager != null) {
                try {
                    instanceManager.destroyInstance(localEndpoint);
                } catch (Throwable t2) {
                    ExceptionUtils.handleThrowable(t2);
                    if (throwable == null) {
                        throwable = t2;
                    }
                }
            }
            t.setContextClassLoader(cl);
        }

        if (throwable != null) {
            fireEndpointOnError(throwable);
        }
    }


    private void fireEndpointOnError(Throwable throwable) {

        // Fire the onError event
        Thread t = Thread.currentThread();
        ClassLoader cl = t.getContextClassLoader();
        t.setContextClassLoader(applicationClassLoader);
        try {
            localEndpoint.onError(this, throwable);
        } finally {
            t.setContextClassLoader(cl);
        }
    }


    private void sendCloseMessage(CloseReason closeReason) {
        // 125 is maximum size for the payload of a control message
        ByteBuffer msg = ByteBuffer.allocate(125);
        CloseCode closeCode = closeReason.getCloseCode();
        // CLOSED_ABNORMALLY should not be put on the wire
        if (closeCode == CloseCodes.CLOSED_ABNORMALLY) {
            // PROTOCOL_ERROR is probably better than GOING_AWAY here
            msg.putShort((short) CloseCodes.PROTOCOL_ERROR.getCode());
        } else {
            msg.putShort((short) closeCode.getCode());
        }

        String reason = closeReason.getReasonPhrase();
        if (reason != null && reason.length() > 0) {
            appendCloseReasonWithTruncation(msg, reason);
        }
        msg.flip();
        try {
            if (closeCode == CloseCodes.NORMAL_CLOSURE) {
                wsRemoteEndpoint.sendMessageBlock(Constants.OPCODE_CLOSE, msg, true);
            } else {
                wsRemoteEndpoint.sendMessageBlock(Constants.OPCODE_CLOSE, msg, true,
                        getAbnormalSessionCloseSendTimeout());
            }
        } catch (IOException | IllegalStateException e) {
            // Failed to send close message. Close the socket and let the caller
            // deal with the Exception
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("wsSession.sendCloseFail", id), e);
            }
            closeConnection();
            // Failure to send a close message is not unexpected in the case of
            // an abnormal closure (usually triggered by a failure to read/write
            // from/to the client. In this case do not trigger the endpoint's
            // error handling
            if (closeCode != CloseCodes.CLOSED_ABNORMALLY) {
                localEndpoint.onError(this, e);
            }
        }
    }


    private Object getSessionMapKey() {
        if (endpointConfig instanceof ServerEndpointConfig) {
            // Server
            return ((ServerEndpointConfig) endpointConfig).getPath();
        } else {
            // Client
            return localEndpoint;
        }
    }

    /**
     * Use protected so unit tests can access this method directly.
     *
     * @param msg    The message
     * @param reason The reason
     */
    protected static void appendCloseReasonWithTruncation(ByteBuffer msg, String reason) {
        // Once the close code has been added there are a maximum of 123 bytes
        // left for the reason phrase. If it is truncated then care needs to be
        // taken to ensure the bytes are not truncated in the middle of a
        // multi-byte UTF-8 character.
        byte[] reasonBytes = reason.getBytes(StandardCharsets.UTF_8);

        if (reasonBytes.length <= 123) {
            // No need to truncate
            msg.put(reasonBytes);
        } else {
            // Need to truncate
            int remaining = 123 - ELLIPSIS_BYTES_LEN;
            int pos = 0;
            byte[] bytesNext = reason.substring(pos, pos + 1).getBytes(StandardCharsets.UTF_8);
            while (remaining >= bytesNext.length) {
                msg.put(bytesNext);
                remaining -= bytesNext.length;
                pos++;
                bytesNext = reason.substring(pos, pos + 1).getBytes(StandardCharsets.UTF_8);
            }
            msg.put(ELLIPSIS_BYTES);
        }
    }


    /**
     * Make the session aware of a {@link FutureToSendHandler} that will need to be forcibly closed if the session
     * closes before the {@link FutureToSendHandler} completes.
     *
     * @param f2sh The handler
     */
    protected void registerFuture(FutureToSendHandler f2sh) {
        // Ideally, this code should sync on stateLock so that the correct
        // action is taken based on the current state of the connection.
        // However, a sync on stateLock can't be used here as it will create the
        // possibility of a dead-lock. See BZ 61183.
        // Therefore, a slightly less efficient approach is used.

        // Always register the future.
        futures.put(f2sh, f2sh);

        if (isOpen()) {
            // The session is open. The future has been registered with the open
            // session. Normal processing continues.
            return;
        }

        // The session is closing / closed. The future may or may not have been registered
        // in time for it to be processed during session closure.

        if (f2sh.isDone()) {
            // The future has completed. It is not known if the future was
            // completed normally by the I/O layer or in error by doClose(). It
            // doesn't matter which. There is nothing more to do here.
            return;
        }

        // The session is closing / closed. The Future had not completed when last checked.
        // There is a small timing window that means the Future may have been
        // completed since the last check. There is also the possibility that
        // the Future was not registered in time to be cleaned up during session
        // close.
        // Attempt to complete the Future with an error result as this ensures
        // that the Future completes and any client code waiting on it does not
        // hang. It is slightly inefficient since the Future may have been
        // completed in another thread or another thread may be about to
        // complete the Future but knowing if this is the case requires the sync
        // on stateLock (see above).
        // Note: If multiple attempts are made to complete the Future, the
        // second and subsequent attempts are ignored.

        IOException ioe = new IOException(sm.getString("wsSession.messageFailed"));
        SendResult sr = new SendResult(ioe);
        f2sh.onResult(sr);
    }


    /**
     * Remove a {@link FutureToSendHandler} from the set of tracked instances.
     *
     * @param f2sh The handler
     */
    protected void unregisterFuture(FutureToSendHandler f2sh) {
        futures.remove(f2sh);
    }


    @Override
    public URI getRequestURI() {
        checkState();
        return requestUri;
    }


    @Override
    public Map<String, List<String>> getRequestParameterMap() {
        checkState();
        return requestParameterMap;
    }


    @Override
    public String getQueryString() {
        checkState();
        return queryString;
    }


    @Override
    public Principal getUserPrincipal() {
        checkState();
        return getUserPrincipalInternal();
    }


    public Principal getUserPrincipalInternal() {
        return userPrincipal;
    }


    @Override
    public Map<String, String> getPathParameters() {
        checkState();
        return pathParameters;
    }


    @Override
    public String getId() {
        return id;
    }


    @Override
    public Map<String, Object> getUserProperties() {
        checkState();
        return userProperties;
    }


    public Endpoint getLocal() {
        return localEndpoint;
    }


    public String getHttpSessionId() {
        return httpSessionId;
    }


    protected MessageHandler getTextMessageHandler() {
        return textMessageHandler;
    }


    protected MessageHandler getBinaryMessageHandler() {
        return binaryMessageHandler;
    }


    protected MessageHandler.Whole<PongMessage> getPongMessageHandler() {
        return pongMessageHandler;
    }


    protected void updateLastActiveRead() {
        lastActiveRead = System.currentTimeMillis();
    }


    protected void updateLastActiveWrite() {
        lastActiveWrite = System.currentTimeMillis();
    }


    protected void checkExpiration() {
        // Local copies to ensure consistent behaviour during method execution
        long timeout = maxIdleTimeout;
        long timeoutRead = getMaxIdleTimeoutRead();
        long timeoutWrite = getMaxIdleTimeoutWrite();

        long currentTime = System.currentTimeMillis();
        String key = null;

        if (timeoutRead > 0 && (currentTime - lastActiveRead) > timeoutRead) {
            key = "wsSession.timeoutRead";
        } else if (timeoutWrite > 0 && (currentTime - lastActiveWrite) > timeoutWrite) {
            key = "wsSession.timeoutWrite";
        } else if (timeout > 0 && (currentTime - lastActiveRead) > timeout &&
                (currentTime - lastActiveWrite) > timeout) {
            key = "wsSession.timeout";
        }

        if (key != null) {
            String msg = sm.getString(key, getId());
            if (log.isDebugEnabled()) {
                log.debug(msg);
            }
            doClose(new CloseReason(CloseCodes.GOING_AWAY, msg), new CloseReason(CloseCodes.CLOSED_ABNORMALLY, msg));
        }
    }


    private long getMaxIdleTimeoutRead() {
        Object timeout = userProperties.get(Constants.READ_IDLE_TIMEOUT_MS);
        if (timeout instanceof Long) {
            return ((Long) timeout).longValue();
        }
        return 0;
    }


    private long getMaxIdleTimeoutWrite() {
        Object timeout = userProperties.get(Constants.WRITE_IDLE_TIMEOUT_MS);
        if (timeout instanceof Long) {
            return ((Long) timeout).longValue();
        }
        return 0;
    }


    private void checkState() {
        if (isClosed()) {
            /*
             * As per RFC 6455, a WebSocket connection is considered to be closed once a peer has sent and received a
             * WebSocket close frame.
             */
            throw new IllegalStateException(sm.getString("wsSession.closed", id));
        }
    }

    private enum State {
        OPEN,
        OUTPUT_CLOSING,
        OUTPUT_CLOSED,
        CLOSING,
        CLOSED
    }


    private WsFrameBase wsFrame;

    void setWsFrame(WsFrameBase wsFrame) {
        this.wsFrame = wsFrame;
    }


    /**
     * Suspends the reading of the incoming messages.
     */
    public void suspend() {
        wsFrame.suspend();
    }


    /**
     * Resumes the reading of the incoming messages.
     */
    public void resume() {
        wsFrame.resume();
    }
}