Nio2Endpoint.java

/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.tomcat.util.net;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.FileChannel;
import java.nio.channels.NetworkChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.net.ssl.SSLEngine;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.compat.JrePlatform;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.net.Acceptor.AcceptorState;
import org.apache.tomcat.util.net.jsse.JSSESupport;

/**
 * NIO2 endpoint.
 */
public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousSocketChannel> {


    // -------------------------------------------------------------- Constants


    // General-purpose logger for this endpoint; returned by getLog().
    private static final Log log = LogFactory.getLog(Nio2Endpoint.class);
    // Logger named after this class with a ".certificate" suffix; returned by getLogCertificate().
    private static final Log logCertificate = LogFactory.getLog(Nio2Endpoint.class.getName() + ".certificate");
    // Logger named after this class with a ".handshake" suffix.
    private static final Log logHandshake = LogFactory.getLog(Nio2Endpoint.class.getName() + ".handshake");


    // ----------------------------------------------------------------- Fields

    /**
     * Server socket "pointer". Opened in {@link #bind()} and reset to {@code null} by
     * {@link #doCloseServerSocket()} after it has been closed.
     */
    private volatile AsynchronousServerSocketChannel serverSock = null;

    /**
     * Allows detecting if a completion handler completes inline.
     */
    private static ThreadLocal<Boolean> inlineCompletion = new ThreadLocal<>();

    /**
     * Thread group associated with the server socket. Only created in {@link #bind()} when the executor is an
     * {@link ExecutorService}; otherwise it stays {@code null}.
     */
    private AsynchronousChannelGroup threadGroup = null;

    /**
     * Cleared when the endpoint starts and set once the task submitted by {@link #stopInternal()} has finished
     * (successfully or not) closing the remaining connections. Polled by {@link #shutdownExecutor()} before the
     * thread group is shut down.
     */
    private volatile boolean allClosed;

    /**
     * Bytebuffer cache, each channel holds a set of buffers (two, except for SSL holds four)
     */
    private SynchronizedStack<Nio2Channel> nioChannels;

    /*
     * Remote address and System.nanoTime() value recorded for the socket most recently returned by
     * serverSocketAccept(); compared there against the next accepted socket to detect a duplicate accept.
     */
    private SocketAddress previousAcceptedSocketRemoteAddress = null;
    private long previousAcceptedSocketNanoTime = 0;


    // --------------------------------------------------------- Public Methods

    /**
     * Number of keep-alive sockets. This endpoint does not report a separate keep-alive figure.
     *
     * @return Always returns -1.
     */
    public int getKeepAliveCount() {
        // For this connector, only the overall connection count is relevant
        return -1;
    }


    // ----------------------------------------------- Public Lifecycle Methods


    /**
     * Binds the endpoint: ensures an executor exists, builds the asynchronous channel group on top of it when
     * possible, opens and binds the server socket and finally initialises SSL.
     *
     * @throws Exception if the channel group, the server socket or the SSL configuration cannot be set up
     */
    @Override
    public void bind() throws Exception {

        // An executor is required before the channel group can be built
        if (getExecutor() == null) {
            createExecutor();
        }

        if (!(getExecutor() instanceof ExecutorService)) {
            // No channel group can be built on this executor; threadGroup is left untouched
            log.info(sm.getString("endpoint.nio2.executorService"));
        } else {
            threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
        }

        // AsynchronousChannelGroup needs exclusive access to its executor service
        if (!internalExecutor) {
            log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
        }

        // Open, configure and bind the listening socket
        serverSock = AsynchronousServerSocketChannel.open(threadGroup);
        socketProperties.setProperties(serverSock);
        serverSock.bind(new InetSocketAddress(getAddress(), getPortWithOffset()), getAcceptCount());

        // Initialize SSL if needed
        initialiseSsl();
    }


    /**
     * Starts the NIO2 endpoint: resets the lifecycle flags, creates the processor and channel caches when their
     * configured sizes are non-zero, and schedules the first asynchronous accept. Does nothing when the endpoint
     * is already running.
     */
    @Override
    public void startInternal() throws Exception {

        if (running) {
            return;
        }

        allClosed = false;
        running = true;
        paused = false;

        // No processor cache is created when the configured size is zero
        if (socketProperties.getProcessorCache() != 0) {
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
        }

        // Likewise for the channel cache; with SSL enabled, twice the SNI parse limit is passed in
        int actualBufferPool =
                socketProperties.getActualBufferPool(isSSLEnabled() ? getSniParseLimit() * 2 : 0);
        if (actualBufferPool != 0) {
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, actualBufferPool);
        }

        // Create worker collection
        if (getExecutor() == null) {
            createExecutor();
        }

        initializeConnectionLatch();
        startAcceptorThread();
    }

    /**
     * Schedules the acceptor on the executor. No dedicated acceptor thread is started; the acceptor instead
     * issues an asynchronous accept operation when it runs.
     */
    @Override
    protected void startAcceptorThread() {
        if (acceptor == null) {
            // Lazily create the acceptor the first time the endpoint is started
            Nio2Acceptor created = new Nio2Acceptor(this);
            created.setThreadName(getName() + "-Acceptor");
            acceptor = created;
        }
        acceptor.state = AcceptorState.RUNNING;
        getExecutor().execute(acceptor);
    }

    /**
     * Resumes the endpoint and, when it is running, marks the acceptor as running again and re-submits it to
     * the executor so that a new accept operation is issued.
     */
    @Override
    public void resume() {
        super.resume();
        if (!isRunning()) {
            return;
        }
        acceptor.state = AcceptorState.RUNNING;
        getExecutor().execute(acceptor);
    }

    /**
     * Stops the endpoint. The endpoint is paused if it was not already, the acceptor is told to stop, the
     * remaining connections are closed from an executor task and the channel and processor caches are released.
     */
    @Override
    public void stopInternal() {
        if (!paused) {
            pause();
        }
        if (!running) {
            return;
        }

        running = false;
        acceptor.stopMillis(10);

        // Use the executor to avoid binding the main thread if something bad
        // occurs and unbind will also wait for a bit for it to complete
        getExecutor().execute(() -> {
            try {
                // Close all active connections if any remain
                for (SocketWrapperBase<Nio2Channel> wrapper : getConnections()) {
                    wrapper.close();
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
            } finally {
                // Signal shutdownExecutor() that the close pass is over, whatever its outcome
                allClosed = true;
            }
        });

        // Drain the channel cache, freeing every cached channel
        if (nioChannels != null) {
            for (Nio2Channel cached = nioChannels.pop(); cached != null; cached = nioChannels.pop()) {
                cached.free();
            }
            nioChannels = null;
        }

        if (processorCache != null) {
            processorCache.clear();
            processorCache = null;
        }
    }


    /**
     * Deallocate NIO memory pools, and close server socket. The endpoint is stopped first if it is still
     * running; afterwards the server socket is closed, SSL is destroyed, the executor is shut down and the
     * handler, when present, is recycled.
     *
     * @throws Exception if stopping the endpoint or closing the server socket fails
     */
    @Override
    public void unbind() throws Exception {
        if (running) {
            stop();
        }
        doCloseServerSocket();
        destroySsl();
        super.unbind();
        // Unlike other connectors, the thread pool is tied to the server socket
        shutdownExecutor();
        if (getHandler() != null) {
            getHandler().recycle();
        }
    }


    /**
     * Closes the server socket, if one is open, and clears the reference to it.
     *
     * @throws IOException if closing the server socket fails; the reference is left in place in that case
     */
    @Override
    protected void doCloseServerSocket() throws IOException {
        if (serverSock == null) {
            // Nothing to close
            return;
        }
        serverSock.close();
        serverSock = null;
    }


    /**
     * Shuts down the asynchronous channel group (when this endpoint owns its executor) and then delegates to
     * the parent implementation to release the executor itself.
     * <p>
     * The method first waits, up to the executor termination timeout, for the connection close task started by
     * {@link #stopInternal()} to report completion through {@code allClosed}. The channel group is then always
     * shut down, even if the waiting thread was interrupted, and whatever is left of the timeout is used to
     * await termination of the group. If the calling thread was interrupted, its interrupt status is restored
     * before returning.
     */
    @Override
    public void shutdownExecutor() {
        boolean interrupted = false;
        if (threadGroup != null && internalExecutor) {
            long timeout = getExecutorTerminationTimeoutMillis();
            try {
                // Poll in 1ms steps until the connections have been closed or the timeout is used up
                while (timeout > 0 && !allClosed) {
                    timeout -= 1;
                    Thread.sleep(1);
                }
            } catch (InterruptedException e) {
                // Stop waiting, but still shut the group down below
                interrupted = true;
            }
            try {
                threadGroup.shutdownNow();
                if (timeout > 0 && !interrupted) {
                    threadGroup.awaitTermination(timeout, TimeUnit.MILLISECONDS);
                }
            } catch (IOException e) {
                getLog().warn(sm.getString("endpoint.warn.executorShutdown", getName()), e);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            if (!threadGroup.isTerminated()) {
                getLog().warn(sm.getString("endpoint.warn.executorShutdown", getName()));
            }
            threadGroup = null;
        }
        // Mostly to cleanup references
        super.shutdownExecutor();
        if (interrupted) {
            // Restore the interrupt status only after the parent has run, so that it does not cut short
            // any waiting done there
            Thread.currentThread().interrupt();
        }
    }


    // ------------------------------------------------------ Protected Methods

    /**
     * Prepares a newly accepted connection for processing: obtains a channel (cached or new), wraps it,
     * registers the wrapper, applies the socket properties and timeouts and hands the wrapper over for
     * processing.
     *
     * @param socket The socket channel
     * @return <code>true</code> if the socket was correctly configured
     *  and processing may continue, <code>false</code> if the socket needs to be
     *  closed immediately
     */
    @Override
    protected boolean setSocketOptions(AsynchronousSocketChannel socket) {
        Nio2SocketWrapper socketWrapper = null;
        try {
            // Reuse a cached channel when one is available, otherwise build a new one
            Nio2Channel channel = (nioChannels == null) ? null : nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                channel = isSSLEnabled() ? new SecureNio2Channel(bufhandler, this) : new Nio2Channel(bufhandler);
            }

            Nio2SocketWrapper newWrapper = new Nio2SocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            connections.put(socket, newWrapper);
            // Only published once registered: the catch block destroys the raw socket solely when this is null
            socketWrapper = newWrapper;

            // Set socket properties
            socketProperties.setProperties(socket);

            socketWrapper.setReadTimeout(getConnectionTimeout());
            socketWrapper.setWriteTimeout(getConnectionTimeout());
            socketWrapper.setKeepAliveLeft(getMaxKeepAliveRequests());

            // Continue processing on the same thread as the acceptor is async
            return processSocket(socketWrapper, SocketEvent.OPEN_READ, false);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            log.error(sm.getString("endpoint.socketOptionsError"), t);
            if (socketWrapper == null) {
                destroySocket(socket);
            }
            // Tell to close the socket if needed
            return false;
        }
    }


    /**
     * Releases the connection count slot held for the given socket and closes it. A failure to close is only
     * reported at debug level.
     *
     * @param socket The socket channel to close
     */
    @Override
    protected void destroySocket(AsynchronousSocketChannel socket) {
        // The slot is released first, regardless of whether the close succeeds
        countDownConnection();
        try {
            socket.close();
        } catch (IOException closeFailure) {
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug(sm.getString("endpoint.err.close"), closeFailure);
        }
    }


    /**
     * @return the channel cache, or {@code null} when no cache is currently allocated (it is only created in
     *         {@link #startInternal()} for a non-zero pool size and is cleared by {@link #stopInternal()})
     */
    protected SynchronizedStack<Nio2Channel> getNioChannels() {
        return nioChannels;
    }


    /**
     * @return the server socket channel, or {@code null} when it is not open
     */
    @Override
    protected NetworkChannel getServerSocket() {
        return serverSock;
    }


    /**
     * Accepts the next connection, blocking until one is available. On non-Windows platforms the accepted
     * socket is rejected when it has the same remote address as the previous one and arrived less than 1000
     * nanoseconds after it.
     *
     * @return the accepted socket channel
     * @throws Exception if the accept fails or a duplicate accept is detected
     */
    @Override
    protected AsynchronousSocketChannel serverSocketAccept() throws Exception {
        AsynchronousSocketChannel accepted = serverSock.accept().get();

        // Bug does not affect Windows. Skip the check on that platform.
        if (JrePlatform.IS_WINDOWS) {
            return accepted;
        }

        SocketAddress remote = accepted.getRemoteAddress();
        long now = System.nanoTime();
        boolean sameRemote = remote.equals(previousAcceptedSocketRemoteAddress);
        if (sameRemote && now - previousAcceptedSocketNanoTime < 1000) {
            throw new IOException(sm.getString("endpoint.err.duplicateAccept"));
        }

        // Remember this accept for the next comparison
        previousAcceptedSocketRemoteAddress = remote;
        previousAcceptedSocketNanoTime = now;
        return accepted;
    }


    /**
     * @return the general-purpose logger of this endpoint
     */
    @Override
    protected Log getLog() {
        return log;
    }


    /**
     * @return the logger whose name is this class name followed by ".certificate"
     */
    @Override
    protected Log getLogCertificate() {
        return logCertificate;
    }


    /**
     * Creates the processor that will handle the given event for the given socket.
     *
     * @param socketWrapper The wrapper of the socket to process
     * @param event         The event to process
     * @return a new {@code SocketProcessor} for the wrapper and event
     */
    @Override
    protected SocketProcessorBase<Nio2Channel> createSocketProcessor(
            SocketWrapperBase<Nio2Channel> socketWrapper, SocketEvent event) {
        return new SocketProcessor(socketWrapper, event);
    }


    /**
     * Acceptor for the NIO2 endpoint. It is run once on the executor to issue the initial asynchronous accept
     * and then acts as the completion handler of that accept, issuing the next accept itself or re-submitting
     * itself to the executor when it may have to wait for a free connection slot.
     */
    protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
        implements CompletionHandler<AsynchronousSocketChannel, Void> {

        /**
         * Current error delay: reset to zero on every successful accept and updated through
         * {@code handleExceptionWithDelay} on every failed one.
         */
        protected int errorDelay = 0;

        public Nio2Acceptor(AbstractEndpoint<?, AsynchronousSocketChannel> endpoint) {
            super(endpoint);
        }

        /**
         * Issues an asynchronous accept unless the endpoint is paused, in which case the acceptor state is set
         * to {@code PAUSED}. A connection slot is counted (possibly waiting for one) before accepting.
         */
        @Override
        public void run() {
            // The initial accept will be called in a separate utility thread
            if (!isPaused()) {
                //if we have reached max connections, wait
                try {
                    countUpOrAwaitConnection();
                } catch (InterruptedException e) {
                    // Ignore
                }
                // The endpoint may have been paused while waiting for a slot
                if (!isPaused()) {
                    // Note: as a special behavior, the completion handler for accept is
                    // always called in a separate thread.
                    serverSock.accept(null, this);
                } else {
                    state = AcceptorState.PAUSED;
                }
            } else {
                state = AcceptorState.PAUSED;
            }
        }

        /**
         * Signals the Acceptor to stop by marking this acceptor as {@code ENDED}.
         *
         * @param waitMilliseconds Ignored for NIO2.
         *
         */
        @Override
        public void stopMillis(int waitMilliseconds) {
            // Update this instance directly, like the other methods of this class, rather than going
            // through the endpoint's acceptor field
            state = AcceptorState.ENDED;
        }

        /**
         * Called when an accept succeeds. While the endpoint is running and not paused, the next accept is
         * arranged first and the accepted socket is then configured on the current thread; otherwise the
         * socket is destroyed.
         *
         * @param socket     The accepted socket channel
         * @param attachment Unused
         */
        @Override
        public void completed(AsynchronousSocketChannel socket,
                Void attachment) {
            // Successful accept, reset the error delay
            errorDelay = 0;
            // Continue processing the socket on the current thread
            // Configure the socket
            if (isRunning() && !isPaused()) {
                if (getMaxConnections() == -1) {
                    serverSock.accept(null, this);
                } else if (getConnectionCount() < getMaxConnections()) {
                    try {
                        // This will not block
                        countUpOrAwaitConnection();
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                    serverSock.accept(null, this);
                } else {
                    // Accept again on a new thread since countUpOrAwaitConnection may block
                    getExecutor().execute(this);
                }
                if (!setSocketOptions(socket)) {
                    closeSocket(socket);
                }
            } else {
                if (isRunning()) {
                    state = AcceptorState.PAUSED;
                }
                destroySocket(socket);
            }
        }

        /**
         * Called when an accept fails. The connection slot counted for the accept is always released. While
         * the endpoint is running, the next accept is arranged (unless paused), the error delay is applied and
         * the failure is logged.
         *
         * @param t          The cause of the failure
         * @param attachment Unused
         */
        @Override
        public void failed(Throwable t, Void attachment) {
            if (isRunning()) {
                if (!isPaused()) {
                    if (getMaxConnections() == -1) {
                        serverSock.accept(null, this);
                    } else {
                        // Accept again on a new thread since countUpOrAwaitConnection may block
                        getExecutor().execute(this);
                    }
                } else {
                    state = AcceptorState.PAUSED;
                }
                // We didn't get a socket
                countDownConnection();
                // Introduce delay if necessary
                errorDelay = handleExceptionWithDelay(errorDelay);
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            } else {
                // We didn't get a socket
                countDownConnection();
            }
        }

    }

    public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> {

        // Channel cache of the owning endpoint, captured at construction time (may be null)
        private final SynchronizedStack<Nio2Channel> nioChannels;

        // Sendfile transfer currently associated with this wrapper, or null when there is none
        private SendfileData sendfileData = null;

        // Read side: completion handler plus the flags it maintains.
        // readInterest records that a caller wants to be notified once a pending read completes;
        // readNotify records that such a notification has been dispatched and a read may proceed.
        private final CompletionHandler<Integer, ByteBuffer> readCompletionHandler;
        private boolean readInterest = false; // Guarded by readCompletionHandler
        private boolean readNotify = false;

        // Write side: single-buffer and gathering completion handlers plus the flags they maintain.
        // writeInterest and writeNotify mirror the read-side flags.
        private final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler;
        private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler;
        private boolean writeInterest = false; // Guarded by writeCompletionHandler
        private boolean writeNotify = false;

        /**
         * Completion handler driving a sendfile transfer. Each completed socket write either issues another
         * write for what is left in the write buffer, refills the buffer from the file channel, or - once the
         * buffer is drained and no file data is left - finishes the transfer.
         */
        private CompletionHandler<Integer, SendfileData> sendfileHandler
            = new CompletionHandler<>() {

            @Override
            public void completed(Integer nWrite, SendfileData attachment) {
                // A negative write count is handled as a failure (EOF)
                if (nWrite.intValue() < 0) {
                    failed(new EOFException(), attachment);
                    return;
                }
                // Advance the file position by the number of bytes just written to the socket
                attachment.pos += nWrite.intValue();
                ByteBuffer buffer = getSocket().getBufHandler().getWriteBuffer();
                if (!buffer.hasRemaining()) {
                    if (attachment.length <= 0) {
                        // All data has now been written
                        setSendfileData(null);
                        try {
                            attachment.fchannel.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                        if (isInline()) {
                            // Completed inline: only flag completion on the attachment
                            attachment.doneInline = true;
                        } else {
                            // Completed asynchronously: decide what happens to the connection next
                            switch (attachment.keepAliveState) {
                            case NONE: {
                                getEndpoint().processSocket(Nio2SocketWrapper.this,
                                        SocketEvent.DISCONNECT, false);
                                break;
                            }
                            case PIPELINED: {
                                if (!getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, true)) {
                                    close();
                                }
                                break;
                            }
                            case OPEN: {
                                registerReadInterest();
                                break;
                            }
                            }
                        }
                        return;
                    } else {
                        // Buffer drained but file data remains: refill the write buffer from the file
                        getSocket().getBufHandler().configureWriteBufferForWrite();
                        int nRead = -1;
                        try {
                            nRead = attachment.fchannel.read(buffer);
                        } catch (IOException e) {
                            failed(e, attachment);
                            return;
                        }
                        if (nRead > 0) {
                            getSocket().getBufHandler().configureWriteBufferForRead();
                            // Do not send more than the remaining requested length
                            if (attachment.length < buffer.remaining()) {
                                buffer.limit(buffer.limit() - buffer.remaining() + (int) attachment.length);
                            }
                            attachment.length -= nRead;
                        } else {
                            // Nothing could be read although data was still expected
                            failed(new EOFException(), attachment);
                            return;
                        }
                    }
                }
                // Write whatever is in the buffer; this handler is invoked again on completion
                getSocket().write(buffer, toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS, attachment, this);
            }

            @Override
            public void failed(Throwable exc, SendfileData attachment) {
                // Always release the file channel
                try {
                    attachment.fchannel.close();
                } catch (IOException e) {
                    // Ignore
                }
                if (!isInline()) {
                    // Asynchronous failure: dispatch an ERROR event for this socket
                    getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.ERROR, false);
                } else {
                    // Inline failure: record completion and the error on the attachment
                    attachment.doneInline = true;
                    attachment.error = true;
                }
            }
        };

        /**
         * Creates the wrapper for the given channel and builds the read, write and gathering-write completion
         * handlers used for its asynchronous I/O.
         *
         * @param channel  The channel to wrap; its buffer handler becomes this wrapper's buffer handler
         * @param endpoint The endpoint owning the connection; its channel cache is captured here
         */
        public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
            super(channel, endpoint);
            nioChannels = endpoint.getNioChannels();
            socketBufferHandler = channel.getBufHandler();

            // Handler for asynchronous reads
            this.readCompletionHandler = new CompletionHandler<>() {
                @Override
                public void completed(Integer nBytes, ByteBuffer attachment) {
                    if (log.isTraceEnabled()) {
                        log.trace("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]");
                    }
                    boolean notify = false;
                    synchronized (readCompletionHandler) {
                        readNotify = false;
                        if (nBytes.intValue() < 0) {
                            // A negative byte count is handled as a failure (EOF)
                            failed(new EOFException(), attachment);
                        } else {
                            // Only notify when interest was registered and the read did not complete inline
                            if (readInterest && !isInline()) {
                                readNotify = true;
                            } else {
                                // Release here since there will be no
                                // notify/dispatch to do the release.
                                readPending.release();
                            }
                            readInterest = false;
                        }
                        notify = readNotify;
                    }
                    // Dispatch outside of the synchronized block
                    if (notify) {
                        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    // Record the failure on the wrapper as an IOException, wrapping other throwables
                    IOException ioe;
                    if (exc instanceof IOException) {
                        ioe = (IOException) exc;
                    } else {
                        ioe = new IOException(exc);
                    }
                    setError(ioe);
                    if (exc instanceof AsynchronousCloseException) {
                        // Release here since there will be no
                        // notify/dispatch to do the release.
                        readPending.release();
                        // If already closed, don't call onError and close again
                        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.STOP, false);
                    } else if (!getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.ERROR, true)) {
                        // The ERROR event could not be dispatched: close directly
                        close();
                    }
                }
            };

            // Handler for asynchronous single-buffer writes
            this.writeCompletionHandler = new CompletionHandler<>() {
                @Override
                public void completed(Integer nBytes, ByteBuffer attachment) {
                    boolean notify = false;
                    synchronized (writeCompletionHandler) {
                        writeNotify = false;
                        if (nBytes.intValue() < 0) {
                            // A negative byte count is handled as a failed write
                            failed(new EOFException(sm.getString("iob.failedwrite")), attachment);
                        } else if (!nonBlockingWriteBuffer.isEmpty()) {
                            // Continue writing data using a gathering write
                            ByteBuffer[] array = nonBlockingWriteBuffer.toArray(attachment);
                            getSocket().write(array, 0, array.length,
                                    toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS,
                                    array, gatheringWriteCompletionHandler);
                        } else if (attachment.hasRemaining()) {
                            // Regular write
                            getSocket().write(attachment, toTimeout(getWriteTimeout()),
                                    TimeUnit.MILLISECONDS, attachment, writeCompletionHandler);
                        } else {
                            // All data has been written
                            if (writeInterest && !isInline()) {
                                writeNotify = true;
                                // Set extra flag so that write nesting does not cause multiple notifications
                                notify = true;
                            } else {
                                // Release here since there will be no
                                // notify/dispatch to do the release.
                                writePending.release();
                            }
                            writeInterest = false;
                        }
                    }
                    // Dispatch outside of the synchronized block; close if the dispatch is refused
                    if (notify) {
                        if (!endpoint.processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_WRITE, true)) {
                            close();
                        }
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    // Record the failure on the wrapper as an IOException, wrapping other throwables
                    IOException ioe;
                    if (exc instanceof IOException) {
                        ioe = (IOException) exc;
                    } else {
                        ioe = new IOException(exc);
                    }
                    setError(ioe);
                    // The pending write is over, successful or not
                    writePending.release();
                    if (!endpoint.processSocket(Nio2SocketWrapper.this, SocketEvent.ERROR, true)) {
                        close();
                    }
                }
            };

            // Handler for asynchronous gathering writes; shares the lock and flags of the write handler
            gatheringWriteCompletionHandler = new CompletionHandler<>() {
                @Override
                public void completed(Long nBytes, ByteBuffer[] attachment) {
                    boolean notify = false;
                    synchronized (writeCompletionHandler) {
                        writeNotify = false;
                        if (nBytes.longValue() < 0) {
                            // A negative byte count is handled as a failed write
                            failed(new EOFException(sm.getString("iob.failedwrite")), attachment);
                        } else if (!nonBlockingWriteBuffer.isEmpty() || buffersArrayHasRemaining(attachment, 0, attachment.length)) {
                            // Continue writing data using a gathering write
                            ByteBuffer[] array = nonBlockingWriteBuffer.toArray(attachment);
                            getSocket().write(array, 0, array.length,
                                    toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS,
                                    array, gatheringWriteCompletionHandler);
                        } else {
                            // All data has been written
                            if (writeInterest && !isInline()) {
                                writeNotify = true;
                                // Set extra flag so that write nesting does not cause multiple notifications
                                notify = true;
                            } else {
                                // Release here since there will be no
                                // notify/dispatch to do the release.
                                writePending.release();
                            }
                            writeInterest = false;
                        }
                    }
                    // Dispatch outside of the synchronized block; close if the dispatch is refused
                    if (notify) {
                        if (!endpoint.processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_WRITE, true)) {
                            close();
                        }
                    }
                }
                @Override
                public void failed(Throwable exc, ByteBuffer[] attachment) {
                    // Record the failure on the wrapper as an IOException, wrapping other throwables
                    IOException ioe;
                    if (exc instanceof IOException) {
                        ioe = (IOException) exc;
                    } else {
                        ioe = new IOException(exc);
                    }
                    setError(ioe);
                    // The pending write is over, successful or not
                    writePending.release();
                    if (!endpoint.processSocket(Nio2SocketWrapper.this, SocketEvent.ERROR, true)) {
                        close();
                    }
               }
            };

        }


        // Accessors for the sendfile transfer associated with this wrapper; null means no transfer is set.
        public void setSendfileData(SendfileData sf) { this.sendfileData = sf; }
        public SendfileData getSendfileData() { return this.sendfileData; }

        /**
         * Checks whether data can be read without waiting. When it cannot, read interest is registered so that
         * the read completion handler dispatches a notification once a read completes.
         *
         * @return {@code true} if a read notification is outstanding, the read buffer already holds data or a
         *         non-blocking fill produced data; {@code false} otherwise
         * @throws IOException if filling the read buffer fails
         */
        @Override
        public boolean isReadyForRead() throws IOException {
            synchronized (readCompletionHandler) {
                if (readNotify) {
                    // A notification has been sent, it is possible to read at least once
                    return true;
                }
                if (!readPending.tryAcquire()) {
                    // A read is pending, reading is not possible until a notification is sent
                    readInterest = true;
                    return false;
                }
                if (!socketBufferHandler.isReadBufferEmpty()) {
                    // It is possible to read directly from the buffer contents
                    readPending.release();
                    return true;
                }
                // Try to read some data
                if (fillReadBuffer(false) > 0) {
                    return true;
                }
                readInterest = true;
                return false;
            }
        }


        /**
         * Checks whether data can be written without waiting. When it cannot, write interest is registered so
         * that the write completion handlers dispatch a notification once the pending write completes.
         *
         * @return {@code true} if a write notification is outstanding, both write buffers are empty or the
         *         non-blocking flush reported {@code false}; {@code false} otherwise
         */
        @Override
        public boolean isReadyForWrite() {
            synchronized (writeCompletionHandler) {
                if (writeNotify) {
                    // A notification has been sent, it is possible to write at least once
                    return true;
                }
                if (!writePending.tryAcquire()) {
                    // A write is pending, writing is not possible until a notification is sent
                    writeInterest = true;
                    return false;
                }
                if (socketBufferHandler.isWriteBufferEmpty() && nonBlockingWriteBuffer.isEmpty()) {
                    // The buffers are empty, it is possible to write to them
                    writePending.release();
                    return true;
                }
                // Try to flush all data; a true result from the flush is treated as "not ready"
                if (flushNonBlockingInternal(true)) {
                    writeInterest = true;
                    return false;
                }
                return true;
            }
        }


        /**
         * Reads up to {@code len} bytes into {@code b}, starting at {@code off}.
         * <p>
         * {@code populateReadBuffer} is tried first; only when it copies nothing
         * is the socket itself read through {@code fillReadBuffer(block)}.
         *
         * @param block {@code true} to wait for the {@code readPending} permit
         *              and perform a blocking socket read; {@code false} to
         *              return {@code 0} straight away when a read is already in
         *              progress
         * @param b     destination array
         * @param off   offset in {@code b} at which bytes are stored
         * @param len   maximum number of bytes to store
         *
         * @return the number of bytes stored in {@code b}, or the non-positive
         *         value reported by {@code fillReadBuffer} when no data was
         *         obtained
         *
         * @throws IOException if {@code socketBufferHandler} is {@code null}
         *                     (reported as a closed socket), if waiting for the
         *                     permit is interrupted, or if the socket read fails
         */
        @Override
        public int read(boolean block, byte[] b, int off, int len) throws IOException {
            // NOTE(review): presumably rethrows an error recorded earlier on this wrapper - confirm
            checkError();

            if (log.isTraceEnabled()) {
                log.trace("Socket: [" + this + "], block: [" + block + "], length: [" + len + "]");
            }

            if (socketBufferHandler == null) {
                throw new IOException(sm.getString("socket.closed"));
            }

            // Snapshot readNotify while holding the lock used for the read state
            boolean notify = false;
            synchronized (readCompletionHandler) {
                notify = readNotify;
            }

            // No notification outstanding: take the readPending permit, waiting
            // for it only in blocking mode
            if (!notify) {
                if (block) {
                    try {
                        readPending.acquire();
                    } catch (InterruptedException e) {
                        throw new IOException(e);
                    }
                } else {
                    if (!readPending.tryAcquire()) {
                        if (log.isTraceEnabled()) {
                            log.trace("Socket: [" + this + "], Read in progress. Returning [0]");
                        }
                        return 0;
                    }
                }
            }

            // NOTE(review): presumably copies data already held in the socket
            // read buffer into b - confirm against populateReadBuffer
            int nRead = populateReadBuffer(b, off, len);
            if (nRead > 0) {
                // The code that was notified is now reading its data
                synchronized (readCompletionHandler) {
                    readNotify = false;
                }
                // This may be sufficient to complete the request and we
                // don't want to trigger another read since if there is no
                // more data to read and this request takes a while to
                // process the read will timeout triggering an error.
                // NOTE(review): when readNotify was set no permit was acquired
                // above, yet one is released here - presumably the completed
                // read left the permit held; confirm.
                readPending.release();
                return nRead;
            }

            synchronized (readCompletionHandler) {
                // Fill the read buffer as best we can.
                nRead = fillReadBuffer(block);
                // Fill as much of the remaining byte array as possible with the
                // data that was just read
                if (nRead > 0) {
                    socketBufferHandler.configureReadBufferForRead();
                    // Never copy more than the caller asked for
                    nRead = Math.min(nRead, len);
                    socketBufferHandler.getReadBuffer().get(b, off, nRead);
                } else if (nRead == 0 && !block) {
                    // Nothing available yet in non-blocking mode: record read interest
                    readInterest = true;
                }
                if (log.isTraceEnabled()) {
                    log.trace("Socket: [" + this + "], Read: [" + nRead + "]");
                }
                return nRead;
            }
        }


        /**
         * Reads data into the supplied buffer.
         * <p>
         * {@code populateReadBuffer(to)} is tried first. When it copies nothing,
         * a blocking read whose destination has at least as much room as the
         * socket read buffer's capacity is performed directly into {@code to};
         * every other case goes through the socket read buffer.
         *
         * @param block {@code true} to wait for the {@code readPending} permit
         *              and perform a blocking socket read; {@code false} to
         *              return {@code 0} straight away when a read is already in
         *              progress
         * @param to    destination buffer
         *
         * @return the number of bytes placed in {@code to}, or the non-positive
         *         value reported by {@code fillReadBuffer} when no data was
         *         obtained
         *
         * @throws IOException if {@code socketBufferHandler} is {@code null}
         *                     (reported as a closed socket), if waiting for the
         *                     permit is interrupted, or if the socket read fails
         */
        @Override
        public int read(boolean block, ByteBuffer to) throws IOException {
            // NOTE(review): presumably rethrows an error recorded earlier on this wrapper - confirm
            checkError();

            if (socketBufferHandler == null) {
                throw new IOException(sm.getString("socket.closed"));
            }

            // Snapshot readNotify while holding the lock used for the read state
            boolean notify = false;
            synchronized (readCompletionHandler) {
                notify = readNotify;
            }

            // No notification outstanding: take the readPending permit, waiting
            // for it only in blocking mode
            if (!notify) {
                if (block) {
                    try {
                        readPending.acquire();
                    } catch (InterruptedException e) {
                        throw new IOException(e);
                    }
                } else {
                    if (!readPending.tryAcquire()) {
                        if (log.isTraceEnabled()) {
                            log.trace("Socket: [" + this + "], Read in progress. Returning [0]");
                        }
                        return 0;
                    }
                }
            }

            // NOTE(review): presumably copies data already held in the socket
            // read buffer into 'to' - confirm against populateReadBuffer
            int nRead = populateReadBuffer(to);
            if (nRead > 0) {
                // The code that was notified is now reading its data
                synchronized (readCompletionHandler) {
                    readNotify = false;
                }
                // This may be sufficient to complete the request and we
                // don't want to trigger another read since if there is no
                // more data to read and this request takes a while to
                // process the read will timeout triggering an error.
                readPending.release();
                return nRead;
            }

            synchronized (readCompletionHandler) {
                // The socket read buffer capacity is socket.appReadBufSize
                int limit = socketBufferHandler.getReadBuffer().capacity();
                if (block && to.remaining() >= limit) {
                    // Cap the destination at one socket-buffer's worth of data
                    // and read straight into it, bypassing the socket read buffer
                    to.limit(to.position() + limit);
                    nRead = fillReadBuffer(block, to);
                    if (log.isTraceEnabled()) {
                        log.trace("Socket: [" + this + "], Read direct from socket: [" + nRead + "]");
                    }
                } else {
                    // Fill the read buffer as best we can.
                    nRead = fillReadBuffer(block);
                    if (log.isTraceEnabled()) {
                        log.trace("Socket: [" + this + "], Read into buffer: [" + nRead + "]");
                    }
                    // Fill as much of the remaining byte array as possible with the
                    // data that was just read
                    if (nRead > 0) {
                        nRead = populateReadBuffer(to);
                    } else if (nRead == 0 && !block) {
                        // Nothing available yet in non-blocking mode: record read interest
                        readInterest = true;
                    }
                }
                return nRead;
            }
        }


        /**
         * Closes the underlying channel, recycles or frees it while the endpoint
         * is still running, resets this wrapper's buffers and channel reference,
         * and finally closes any sendfile channel that is still open. Failures
         * are passed to {@code ExceptionUtils.handleThrowable} and only logged
         * when debug logging is enabled.
         */
        @Override
        protected void doClose() {
            if (log.isTraceEnabled()) {
                log.trace("Calling [" + getEndpoint() + "].closeSocket([" + this + "])");
            }

            // Step 1: close the socket channel and recycle it if possible
            try {
                getEndpoint().connections.remove(getSocket().getIOChannel());
                if (getSocket().isOpen()) {
                    getSocket().close(true);
                }
                if (getEndpoint().running && (nioChannels == null || !nioChannels.push(getSocket()))) {
                    // No cache available (or the cache refused the channel)
                    getSocket().free();
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (log.isDebugEnabled()) {
                    log.error(sm.getString("endpoint.debug.channelCloseFail"), t);
                }
            } finally {
                // Always detach this wrapper from the buffers and the channel
                socketBufferHandler = SocketBufferHandler.EMPTY;
                nonBlockingWriteBuffer.clear();
                reset(Nio2Channel.CLOSED_NIO2_CHANNEL);
            }

            // Step 2: release the file channel of any sendfile in progress
            try {
                SendfileData sendfile = getSendfileData();
                if (sendfile != null) {
                    if (sendfile.fchannel != null && sendfile.fchannel.isOpen()) {
                        sendfile.fchannel.close();
                    }
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (log.isDebugEnabled()) {
                    log.error(sm.getString("endpoint.sendfile.closeError"), t);
                }
            }
        }

        /**
         * Reports whether asynchronous IO is enabled for this wrapper.
         *
         * @return the value of the owning endpoint's {@code getUseAsyncIO()}
         */
        @Override
        public boolean hasAsyncIO() {
            return getEndpoint().getUseAsyncIO();
        }

        /**
         * Always answers {@code true} for the NIO2 wrapper.
         * NOTE(review): presumably tells the base class to guard read/write
         * operations with semaphores - confirm against SocketWrapperBase.
         *
         * @return {@code true}
         */
        @Override
        public boolean needSemaphores() {
            return true;
        }

        /**
         * Always answers {@code true} for the NIO2 wrapper.
         * NOTE(review): presumably reflects that each NIO2 read/write call is
         * given its own timeout - confirm against SocketWrapperBase.
         *
         * @return {@code true}
         */
        @Override
        public boolean hasPerOperationTimeout() {
            return true;
        }

        /**
         * Factory hook that supplies the NIO2 specific operation state. All
         * arguments are forwarded unchanged to the {@code Nio2OperationState}
         * constructor.
         */
        @Override
        protected <A> OperationState<A> newOperationState(boolean read,
                ByteBuffer[] buffers, int offset, int length,
                BlockingMode block, long timeout, TimeUnit unit, A attachment,
                CompletionCheck check, CompletionHandler<Long, ? super A> handler,
                Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
            OperationState<A> state = new Nio2OperationState<>(
                    read, buffers, offset, length,
                    block, timeout, unit, attachment,
                    check, handler, semaphore, completion);
            return state;
        }

        /**
         * NIO2 implementation of a vectored (scatter/gather) IO operation. The
         * operation is started on the calling thread with the inline flag set,
         * and uses the channel's asynchronous read/write calls with
         * {@code completion} as the completion handler.
         *
         * @param <A> type of the attachment passed through to the handler
         */
        private class Nio2OperationState<A> extends OperationState<A> {
            /** Forwards every argument unchanged to the {@code OperationState} constructor. */
            private Nio2OperationState(boolean read, ByteBuffer[] buffers, int offset, int length,
                    BlockingMode block, long timeout, TimeUnit unit, A attachment,
                    CompletionCheck check, CompletionHandler<Long, ? super A> handler,
                    Semaphore semaphore, VectoredIOCompletionHandler<A> completion) {
                super(read, buffers, offset, length, block,
                    timeout, unit, attachment, check, handler, semaphore, completion);
            }

            /** @return the current value of {@code Nio2Endpoint.isInline()} */
            @Override
            protected boolean isInline() {
                return Nio2Endpoint.isInline();
            }

            /**
             * Sets the matching notify flag, then executes {@link #run()} with
             * the inline flag set. The flag is cleared in a {@code finally}
             * block so it never outlives this call.
             */
            @Override
            protected void start() {
                if (read) {
                    // Disable any regular read notifications caused by registerReadInterest
                    synchronized (readCompletionHandler) {
                        readNotify = true;
                    }
                } else {
                    // Disable any regular write notifications caused by registerWriteInterest
                    synchronized (writeCompletionHandler) {
                        writeNotify = true;
                    }
                }
                startInline();
                try {
                    run();
                } finally {
                    endInline();
                }
            }

            /**
             * Performs the operation. A read first drains data left in the main
             * read buffer; a write first sends data left in the main write
             * buffer. Only then are the caller's buffers passed to the channel.
             */
            @Override
            public void run() {
                if (read) {
                    long nBytes = 0;
                    // If there is still data inside the main read buffer, it needs to be read first
                    if (!socketBufferHandler.isReadBufferEmpty()) {
                        synchronized (readCompletionHandler) {
                            socketBufferHandler.configureReadBufferForRead();
                            // Spread the leftover data over the destination buffers in order
                            for (int i = 0; i < length && !socketBufferHandler.isReadBufferEmpty(); i++) {
                                nBytes += transfer(socketBufferHandler.getReadBuffer(), buffers[offset + i]);
                            }
                        }
                        if (nBytes > 0) {
                            // Leftover data satisfied (part of) the read: report it without touching the socket
                            completion.completed(Long.valueOf(nBytes), this);
                        }
                    }
                    if (nBytes == 0) {
                        // Nothing was buffered, so issue the scatter read on the channel
                        getSocket().read(buffers, offset, length, timeout, unit, this, completion);
                    }
                } else {
                    // If there is still data inside the main write buffer, it needs to be written first
                    if (!socketBufferHandler.isWriteBufferEmpty()) {
                        synchronized (writeCompletionHandler) {
                            socketBufferHandler.configureWriteBufferForRead();
                            // Combine the main write buffer with the non-blocking write buffer
                            ByteBuffer[] array = nonBlockingWriteBuffer.toArray(socketBufferHandler.getWriteBuffer());
                            if (buffersArrayHasRemaining(array, 0, array.length)) {
                                getSocket().write(array, 0, array.length, timeout, unit,
                                        array, new CompletionHandler<Long, ByteBuffer[]>() {
                                            @Override
                                            public void completed(Long nBytes, ByteBuffer[] buffers) {
                                                if (nBytes.longValue() < 0) {
                                                    // Negative count is reported as an EOF failure
                                                    failed(new EOFException(), null);
                                                } else if (buffersArrayHasRemaining(buffers, 0, buffers.length)) {
                                                    // Partial write: send the rest with the same handler
                                                    getSocket().write(buffers, 0, buffers.length, toTimeout(getWriteTimeout()),
                                                            TimeUnit.MILLISECONDS, buffers, this);
                                                } else {
                                                    // Continue until everything is written
                                                    // NOTE(review): process() is inherited; presumably it re-runs
                                                    // this operation so the caller's buffers get written - confirm
                                                    process();
                                                }
                                            }
                                            @Override
                                            public void failed(Throwable exc, ByteBuffer[] buffers) {
                                                // Report the failure against the enclosing operation
                                                completion.failed(exc, Nio2OperationState.this);
                                            }
                                        });
                                // The caller's buffers are not written in this pass
                                return;
                            }
                        }
                    }
                    // No buffered data in the way: gather-write the caller's buffers
                    getSocket().write(buffers, offset, length, timeout, unit, this, completion);
                }
            }
        }

        /*
         * Reads from the socket into the main socket read buffer.
         *
         * Preconditions for every caller:
         * - the readPending semaphore has been acquired
         * - the lock on readCompletionHandler is held
         *
         * The readPending semaphore is released (or its release is arranged)
         * by this method once the read has completed.
         */
        private int fillReadBuffer(boolean block) throws IOException {
            // Switch the buffer to write mode before handing it to the socket read
            socketBufferHandler.configureReadBufferForWrite();
            ByteBuffer target = socketBufferHandler.getReadBuffer();
            return fillReadBuffer(block, target);
        }

        /**
         * Reads from the socket into {@code to}.
         * <p>
         * In blocking mode the read is issued as a {@code Future} and awaited,
         * bounded by {@code getReadTimeout()} when that value is positive. The
         * {@code readPending} permit is always released before returning. In
         * non-blocking mode an asynchronous read is started with
         * {@code readCompletionHandler} while the inline flag is set.
         *
         * @param block {@code true} for a blocking read, {@code false} for an
         *              asynchronous one
         * @param to    buffer that receives the data
         *
         * @return for a blocking read, the value reported by the channel; for a
         *         non-blocking read, {@code to.position()} when the
         *         {@code readPending} permit is available again straight after
         *         the call (taken to mean the read completed inline), otherwise
         *         {@code 0}
         *
         * @throws SocketTimeoutException if a blocking read exceeds the read
         *                                timeout
         * @throws IOException            if the read fails or the wait is
         *                                interrupted
         */
        private int fillReadBuffer(boolean block, ByteBuffer to) throws IOException {
            int nRead = 0;
            Future<Integer> integer = null;
            if (block) {
                try {
                    integer = getSocket().read(to);
                    long timeout = getReadTimeout();
                    if (timeout > 0) {
                        nRead = integer.get(timeout, TimeUnit.MILLISECONDS).intValue();
                    } else {
                        nRead = integer.get().intValue();
                    }
                } catch (ExecutionException e) {
                    // Surface the real IO failure where there is one
                    if (e.getCause() instanceof IOException) {
                        throw (IOException) e.getCause();
                    } else {
                        throw new IOException(e);
                    }
                } catch (InterruptedException e) {
                    throw new IOException(e);
                } catch (TimeoutException e) {
                    // Only Future.get can time out, so the future is non-null here
                    integer.cancel(true);
                    throw new SocketTimeoutException();
                } finally {
                    // Blocking read so need to release here since there will
                    // not be a callback to a completion handler.
                    readPending.release();
                }
            } else {
                startInline();
                try {
                    getSocket().read(to, toTimeout(getReadTimeout()), TimeUnit.MILLISECONDS, to,
                            readCompletionHandler);
                } finally {
                    // Always clear the inline flag, even if the read call itself
                    // throws, so it cannot leak to later work on this thread
                    endInline();
                }
                // NOTE(review): presumably the completion handler releases the
                // permit when the read completed inline - confirm
                if (readPending.availablePermits() == 1) {
                    nRead = to.position();
                }
            }
            return nRead;
        }


        /**
         * {@inheritDoc}
         * <p>
         * NIO2 override: data that does not fit in the main write buffer is
         * queued in the non-blocking write buffer so that it can be sent with a
         * single gathering write.
         */
        @Override
        protected void writeNonBlocking(byte[] buf, int off, int len) throws IOException {
            // Design note - a possible alternative:
            // when non-blocking writes are abused (e.g. a test pushing 1MB in
            // one "non blocking" write), wait for the previous write to finish
            // instead of buffering more. That would also allow autoblocking and
            // could be coordinated with the main CoyoteOutputStream to signal
            // the end of a write.
            // It would use: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
            synchronized (writeCompletionHandler) {
                checkError();
                boolean mainBufferUsable = writeNotify || writePending.tryAcquire();
                if (!mainBufferUsable) {
                    // A completion handler is still pending: queue everything
                    nonBlockingWriteBuffer.add(buf, off, len);
                    return;
                }
                // No pending completion handler, the main buffer can be filled
                socketBufferHandler.configureWriteBufferForWrite();
                int copied = transfer(buf, off, len, socketBufferHandler.getWriteBuffer());
                int leftover = len - copied;
                if (leftover > 0) {
                    // Whatever did not fit is queued behind the main buffer
                    nonBlockingWriteBuffer.add(buf, off + copied, leftover);
                }
                flushNonBlockingInternal(true);
            }
        }


        /**
         * {@inheritDoc}
         * <p>
         * Overridden for NIO2 to enable a gathering write to be used to write
         * all of the remaining data in a single additional write should a
         * non-blocking write leave data in the buffer.
         * <p>
         * This overload does no work of its own; it delegates directly to
         * {@code writeNonBlockingInternal(ByteBuffer)}.
         */
        @Override
        protected void writeNonBlocking(ByteBuffer from) throws IOException {
            writeNonBlockingInternal(from);
        }


        /**
         * {@inheritDoc}
         * <p>
         * NIO2 override: data that does not fit in the main write buffer is
         * queued in the non-blocking write buffer so that it can be sent with a
         * single gathering write.
         */
        @Override
        protected void writeNonBlockingInternal(ByteBuffer from) throws IOException {
            // Design note - a possible alternative:
            // when non-blocking writes are abused (e.g. a test pushing 1MB in
            // one "non blocking" write), wait for the previous write to finish
            // instead of buffering more. That would also allow autoblocking and
            // could be coordinated with the main CoyoteOutputStream to signal
            // the end of a write.
            // It would use: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
            synchronized (writeCompletionHandler) {
                checkError();
                boolean mainBufferUsable = writeNotify || writePending.tryAcquire();
                if (!mainBufferUsable) {
                    // A completion handler is still pending: queue everything
                    nonBlockingWriteBuffer.add(from);
                    return;
                }
                // No pending completion handler, the main buffer can be filled
                socketBufferHandler.configureWriteBufferForWrite();
                transfer(from, socketBufferHandler.getWriteBuffer());
                if (from.hasRemaining()) {
                    // Whatever did not fit is queued behind the main buffer
                    nonBlockingWriteBuffer.add(from);
                }
                flushNonBlockingInternal(true);
            }
        }


        /**
         * Writes the whole of {@code from} to the socket, waiting for each
         * partial write to finish before issuing the next one.
         *
         * @param block Ignored since this method is only called in the
         *              blocking case
         * @param from  data to write; fully consumed on normal return
         *
         * @throws IOException if the write fails, reports a negative byte
         *                     count, is interrupted or exceeds the write timeout
         */
        @Override
        protected void doWrite(boolean block, ByteBuffer from) throws IOException {
            Future<Integer> pendingWrite = null;
            try {
                do {
                    pendingWrite = getSocket().write(from);
                    long writeTimeout = getWriteTimeout();
                    Integer written;
                    if (writeTimeout > 0) {
                        written = pendingWrite.get(writeTimeout, TimeUnit.MILLISECONDS);
                    } else {
                        // No timeout configured: wait indefinitely
                        written = pendingWrite.get();
                    }
                    if (written.intValue() < 0) {
                        throw new EOFException(sm.getString("iob.failedwrite"));
                    }
                } while (from.hasRemaining());
            } catch (ExecutionException e) {
                // Surface the real IO failure where there is one
                Throwable cause = e.getCause();
                if (cause instanceof IOException) {
                    throw (IOException) cause;
                }
                throw new IOException(e);
            } catch (InterruptedException e) {
                throw new IOException(e);
            } catch (TimeoutException e) {
                pendingWrite.cancel(true);
                throw new SocketTimeoutException();
            }
        }


        /**
         * Waits, for at most the write timeout, until no non-blocking write is
         * pending and then performs the inherited blocking flush.
         *
         * @throws SocketTimeoutException if the pending write does not finish
         *                                within the write timeout
         */
        @Override
        protected void flushBlocking() throws IOException {
            checkError();

            // A pending non-blocking write holds the writePending permit, so
            // briefly owning the permit proves that write has completed.
            try {
                boolean previousWriteDone =
                        writePending.tryAcquire(toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS);
                if (!previousWriteDone) {
                    throw new SocketTimeoutException();
                }
                writePending.release();
            } catch (InterruptedException e) {
                // Ignore
            }

            super.flushBlocking();
        }

        /**
         * Checks for a recorded error and then attempts a non-blocking flush
         * without already holding the {@code writePending} permit.
         *
         * @return the result of {@code flushNonBlockingInternal(false)}, i.e.
         *         whether data remains to be written
         */
        @Override
        protected boolean flushNonBlocking() throws IOException {
            checkError();
            // false: this caller does not own the writePending permit
            return flushNonBlockingInternal(false);
        }

        /**
         * Starts an asynchronous write of whatever is buffered, provided a
         * notification is outstanding, the caller already holds the
         * {@code writePending} permit, or the permit can be acquired now.
         * <p>
         * If the non-blocking write buffer holds data, it is sent together with
         * the main write buffer in one gathering write. Otherwise the main write
         * buffer alone is written. If nothing is buffered, a permit acquired by
         * this method is released again and write interest is cleared.
         *
         * @param hasPermit {@code true} if the caller already owns the
         *                  {@code writePending} permit
         *
         * @return the value of {@code hasDataToWrite()} after the attempt
         */
        private boolean flushNonBlockingInternal(boolean hasPermit) {
            synchronized (writeCompletionHandler) {
                if (writeNotify || hasPermit || writePending.tryAcquire()) {
                    // The code that was notified is now writing its data
                    writeNotify = false;
                    socketBufferHandler.configureWriteBufferForRead();
                    if (!nonBlockingWriteBuffer.isEmpty()) {
                        // Gathering write: main buffer plus everything queued behind it
                        ByteBuffer[] array = nonBlockingWriteBuffer.toArray(socketBufferHandler.getWriteBuffer());
                        startInline();
                        try {
                            getSocket().write(array, 0, array.length, toTimeout(getWriteTimeout()),
                                    TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler);
                        } finally {
                            // Never leave the inline flag set if the write call throws
                            endInline();
                        }
                    } else if (socketBufferHandler.getWriteBuffer().hasRemaining()) {
                        // Regular write
                        startInline();
                        try {
                            getSocket().write(socketBufferHandler.getWriteBuffer(), toTimeout(getWriteTimeout()),
                                    TimeUnit.MILLISECONDS, socketBufferHandler.getWriteBuffer(),
                                    writeCompletionHandler);
                        } finally {
                            // Never leave the inline flag set if the write call throws
                            endInline();
                        }
                    } else {
                        // Nothing was written
                        if (!hasPermit) {
                            // Only give back a permit that was taken here
                            writePending.release();
                        }
                        writeInterest = false;
                    }
                }
                return hasDataToWrite();
            }
        }


        /**
         * @return {@code true} if the socket read buffer holds data, a read
         *         notification is outstanding, or an error has been recorded
         */
        @Override
        public boolean hasDataToRead() {
            synchronized (readCompletionHandler) {
                if (!socketBufferHandler.isReadBufferEmpty()) {
                    return true;
                }
                if (readNotify) {
                    return true;
                }
                return getError() != null;
            }
        }


        /**
         * @return {@code true} if either write buffer holds data, a write
         *         notification is outstanding, the {@code writePending} permit
         *         is taken, or an error has been recorded
         */
        @Override
        public boolean hasDataToWrite() {
            synchronized (writeCompletionHandler) {
                if (!socketBufferHandler.isWriteBufferEmpty()) {
                    return true;
                }
                if (!nonBlockingWriteBuffer.isEmpty()) {
                    return true;
                }
                if (writeNotify) {
                    return true;
                }
                if (writePending.availablePermits() == 0) {
                    return true;
                }
                return getError() != null;
            }
        }


        /**
         * @return {@code true} when the {@code readPending} semaphore has no
         *         permit available
         */
        @Override
        public boolean isReadPending() {
            synchronized (readCompletionHandler) {
                int freePermits = readPending.availablePermits();
                return freePermits == 0;
            }
        }


        /**
         * @return {@code true} when the {@code writePending} semaphore has no
         *         permit available
         */
        @Override
        public boolean isWritePending() {
            synchronized (writeCompletionHandler) {
                int freePermits = writePending.availablePermits();
                return freePermits == 0;
            }
        }


        /**
         * Records read interest and, when no read is currently pending, starts
         * a non-blocking read. If that read completes inline, the socket is
         * dispatched for {@code OPEN_READ} processing here because no completion
         * notification will follow.
         */
        @Override
        public void registerReadInterest() {
            synchronized (readCompletionHandler) {
                if (readNotify) {
                    // A notification is already on its way
                    return;
                }
                if (log.isTraceEnabled()) {
                    log.trace(sm.getString("endpoint.debug.registerRead", this));
                }
                readInterest = true;
                if (!readPending.tryAcquire()) {
                    // A read is already pending; its completion will notify
                    return;
                }
                try {
                    boolean completedInline = fillReadBuffer(false) > 0;
                    if (completedInline
                            && !getEndpoint().processSocket(this, SocketEvent.OPEN_READ, true)) {
                        // Dispatch was refused
                        close();
                    }
                } catch (IOException e) {
                    // Will never happen
                    setError(e);
                }
            }
        }


        /**
         * Records write interest and, when no write is currently pending,
         * dispatches the socket for {@code OPEN_WRITE} processing straight away.
         */
        @Override
        public void registerWriteInterest() {
            synchronized (writeCompletionHandler) {
                if (writeNotify) {
                    // A notification is already on its way
                    return;
                }
                if (log.isTraceEnabled()) {
                    log.trace(sm.getString("endpoint.debug.registerWrite", this));
                }
                writeInterest = true;
                boolean noWritePending = writePending.availablePermits() == 1;
                if (noWritePending
                        && !getEndpoint().processSocket(this, SocketEvent.OPEN_WRITE, true)) {
                    // Writing is possible but the dispatch was refused
                    close();
                }
            }
        }


        /**
         * Creates the endpoint specific sendfile descriptor.
         *
         * @param filename passed through to the {@code SendfileData} constructor
         * @param pos      passed through to the {@code SendfileData} constructor
         * @param length   passed through to the {@code SendfileData} constructor
         *
         * @return a new {@code SendfileData} built from the three arguments
         */
        @Override
        public SendfileDataBase createSendfileData(String filename, long pos, long length) {
            return new SendfileData(filename, pos, length);
        }


        /**
         * Starts sending the file described by {@code sendfileData}.
         * <p>
         * The file channel is opened and positioned if it is not already open.
         * One buffer's worth of file data is then read into the socket write
         * buffer and written asynchronously using {@code sendfileHandler}.
         *
         * @param sendfileData the sendfile descriptor; must be a
         *                     {@code SendfileData} instance
         *
         * @return {@code ERROR} if the file cannot be opened, positioned or
         *         read, if end of file is hit on the first read, or if the
         *         inline completion reported an error; {@code DONE} if the
         *         transfer completed inline without error; {@code PENDING}
         *         otherwise
         */
        @Override
        public SendfileState processSendfile(SendfileDataBase sendfileData) {
            SendfileData data = (SendfileData) sendfileData;
            setSendfileData(data);
            // Configure the send file data
            if (data.fchannel == null || !data.fchannel.isOpen()) {
                java.nio.file.Path path = new File(sendfileData.fileName).toPath();
                FileChannel opened = null;
                boolean positioned = false;
                try {
                    opened = FileChannel.open(path, StandardOpenOption.READ);
                    data.fchannel = opened.position(sendfileData.pos);
                    positioned = true;
                } catch (IOException e) {
                    return SendfileState.ERROR;
                } finally {
                    // If positioning failed the channel was never stored in
                    // data.fchannel, so nothing else would ever close it
                    if (!positioned && opened != null) {
                        try {
                            opened.close();
                        } catch (IOException ignored) {
                            // Best effort: the original failure is what matters
                        }
                    }
                }
            }
            getSocket().getBufHandler().configureWriteBufferForWrite();
            ByteBuffer buffer = getSocket().getBufHandler().getWriteBuffer();
            int nRead = -1;
            try {
                nRead = data.fchannel.read(buffer);
            } catch (IOException e1) {
                return SendfileState.ERROR;
            }

            if (nRead >= 0) {
                data.length -= nRead;
                getSocket().getBufHandler().configureWriteBufferForRead();
                startInline();
                try {
                    getSocket().write(buffer, toTimeout(getWriteTimeout()), TimeUnit.MILLISECONDS,
                            data, sendfileHandler);
                } finally {
                    // Never leave the inline flag set if the write call throws
                    endInline();
                }
                if (data.doneInline) {
                    if (data.error) {
                        return SendfileState.ERROR;
                    } else {
                        return SendfileState.DONE;
                    }
                } else {
                    return SendfileState.PENDING;
                }
            } else {
                // End of file before any data could be read
                return SendfileState.ERROR;
            }
        }


        /**
         * Sets {@code remoteAddr} to the textual IP address of the peer when the
         * channel reports an {@code InetSocketAddress}. A failure to obtain the
         * address is silently ignored.
         */
        @Override
        protected void populateRemoteAddr() {
            AsynchronousSocketChannel channel = getSocket().getIOChannel();
            if (channel == null) {
                return;
            }
            SocketAddress peer = null;
            try {
                peer = channel.getRemoteAddress();
            } catch (IOException e) {
                // Ignore
            }
            if (peer instanceof InetSocketAddress) {
                InetSocketAddress inetPeer = (InetSocketAddress) peer;
                remoteAddr = inetPeer.getAddress().getHostAddress();
            }
        }


        /**
         * Sets {@code remoteHost} to the host name of the peer when the channel
         * reports an {@code InetSocketAddress}, and fills in {@code remoteAddr}
         * as well if it has not been set yet. A failure to obtain the address is
         * logged as a warning.
         */
        @Override
        protected void populateRemoteHost() {
            AsynchronousSocketChannel channel = getSocket().getIOChannel();
            if (channel == null) {
                return;
            }
            SocketAddress peer = null;
            try {
                peer = channel.getRemoteAddress();
            } catch (IOException e) {
                log.warn(sm.getString("endpoint.warn.noRemoteHost", getSocket()), e);
            }
            if (!(peer instanceof InetSocketAddress)) {
                return;
            }
            InetSocketAddress inetPeer = (InetSocketAddress) peer;
            remoteHost = inetPeer.getAddress().getHostName();
            if (remoteAddr == null) {
                remoteAddr = inetPeer.getAddress().getHostAddress();
            }
        }


        /**
         * Sets {@code remotePort} to the port of the peer when the channel
         * reports an {@code InetSocketAddress}. A failure to obtain the address
         * is logged as a warning.
         */
        @Override
        protected void populateRemotePort() {
            AsynchronousSocketChannel channel = getSocket().getIOChannel();
            if (channel == null) {
                return;
            }
            SocketAddress peer = null;
            try {
                peer = channel.getRemoteAddress();
            } catch (IOException e) {
                log.warn(sm.getString("endpoint.warn.noRemotePort", getSocket()), e);
            }
            if (peer instanceof InetSocketAddress) {
                InetSocketAddress inetPeer = (InetSocketAddress) peer;
                remotePort = inetPeer.getPort();
            }
        }


        /**
         * Sets {@code localName} to the host name of the local end of the
         * connection when the channel reports an {@code InetSocketAddress}. A
         * failure to obtain the address is logged as a warning.
         */
        @Override
        protected void populateLocalName() {
            AsynchronousSocketChannel channel = getSocket().getIOChannel();
            if (channel == null) {
                return;
            }
            SocketAddress local = null;
            try {
                local = channel.getLocalAddress();
            } catch (IOException e) {
                log.warn(sm.getString("endpoint.warn.noLocalName", getSocket()), e);
            }
            if (local instanceof InetSocketAddress) {
                InetSocketAddress inetLocal = (InetSocketAddress) local;
                localName = inetLocal.getHostName();
            }
        }


        /**
         * Sets {@code localAddr} to the textual IP address of the local end of
         * the connection when the channel reports an {@code InetSocketAddress}.
         * A failure to obtain the address is logged as a warning.
         */
        @Override
        protected void populateLocalAddr() {
            AsynchronousSocketChannel channel = getSocket().getIOChannel();
            if (channel == null) {
                return;
            }
            SocketAddress local = null;
            try {
                local = channel.getLocalAddress();
            } catch (IOException e) {
                log.warn(sm.getString("endpoint.warn.noLocalAddr", getSocket()), e);
            }
            if (local instanceof InetSocketAddress) {
                InetSocketAddress inetLocal = (InetSocketAddress) local;
                localAddr = inetLocal.getAddress().getHostAddress();
            }
        }


        /**
         * Sets {@code localPort} to the port of the local end of the connection
         * when the channel reports an {@code InetSocketAddress}. A failure to
         * obtain the address is logged as a warning.
         */
        @Override
        protected void populateLocalPort() {
            AsynchronousSocketChannel channel = getSocket().getIOChannel();
            if (channel == null) {
                return;
            }
            SocketAddress local = null;
            try {
                local = channel.getLocalAddress();
            } catch (IOException e) {
                log.warn(sm.getString("endpoint.warn.noLocalPort", getSocket()), e);
            }
            if (local instanceof InetSocketAddress) {
                InetSocketAddress inetLocal = (InetSocketAddress) local;
                localPort = inetLocal.getPort();
            }
        }


        /**
         * @return the {@code SSLSupport} of the underlying channel when it is a
         *         {@code SecureNio2Channel}, otherwise {@code null}
         */
        @Override
        public SSLSupport getSslSupport() {
            if (!(getSocket() instanceof SecureNio2Channel)) {
                // Not a TLS channel
                return null;
            }
            SecureNio2Channel secureChannel = (SecureNio2Channel) getSocket();
            return secureChannel.getSSLSupport();
        }


        /**
         * Requests client authentication on the TLS connection. If the engine
         * does not already require it, the requirement is switched on, a new
         * handshake is performed and the resulting session is stored in
         * {@code sslSupport}.
         *
         * @param sslSupport cast to {@code JSSESupport} to receive the new
         *                   session
         *
         * @throws IOException declared for failures raised during the new
         *                     handshake
         */
        @Override
        public void doClientAuth(SSLSupport sslSupport) throws IOException {
            SecureNio2Channel secureChannel = (SecureNio2Channel) getSocket();
            SSLEngine sslEngine = secureChannel.getSslEngine();
            if (sslEngine.getNeedClientAuth()) {
                // Client authentication is already required; nothing to do
                return;
            }
            // Need to re-negotiate SSL connection
            sslEngine.setNeedClientAuth(true);
            secureChannel.rehandshake();
            JSSESupport jsseSupport = (JSSESupport) sslSupport;
            jsseSupport.setSession(sslEngine.getSession());
        }


        /**
         * Passes the given application read buffer handler on to the wrapped
         * channel.
         *
         * @param handler the handler to set on the channel
         */
        @Override
        public void setAppReadBufHandler(ApplicationBufferHandler handler) {
            getSocket().setAppReadBufHandler(handler);
        }
    }

    /**
     * Sets the {@code inlineCompletion} flag to {@code Boolean.TRUE}.
     * NOTE(review): {@code inlineCompletion} is declared outside this section;
     * it is presumably a per-thread holder, making this a per-thread flag -
     * confirm against its declaration.
     */
    public static void startInline() {
        inlineCompletion.set(Boolean.TRUE);
    }

    /**
     * Sets the {@code inlineCompletion} flag to {@code Boolean.FALSE}.
     * NOTE(review): {@code inlineCompletion} is declared outside this section;
     * it is presumably a per-thread holder, making this a per-thread flag -
     * confirm against its declaration.
     */
    public static void endInline() {
        inlineCompletion.set(Boolean.FALSE);
    }

    /**
     * Reports the current value of the {@code inlineCompletion} flag.
     *
     * @return {@code true} only if the flag holds a non-null value that is
     *         {@code true}; an unset ({@code null}) flag is reported as
     *         {@code false}
     */
    public static boolean isInline() {
        Boolean inline = inlineCompletion.get();
        return inline != null && inline.booleanValue();
    }


    // ---------------------------------------------- SocketProcessor Inner Class
    /**
     * This class is the equivalent of the Worker, but it is simply run by an
     * external Executor thread pool. Each run handles one socket / event
     * combination: it drives the TLS handshake if that has not completed yet
     * and then passes the socket to the endpoint's handler.
     * <p>
     * At the end of every run the processor clears its socket wrapper and
     * event references and, while the endpoint is running and
     * {@code processorCache} is non-null, pushes itself onto that cache.
     */
    protected class SocketProcessor extends SocketProcessorBase<Nio2Channel> {

        /**
         * Creates a processor for the given socket wrapper and event; both
         * are passed straight to the superclass constructor.
         *
         * @param socketWrapper the wrapper of the socket to process
         * @param event         the event to process for that socket
         */
        public SocketProcessor(SocketWrapperBase<Nio2Channel> socketWrapper, SocketEvent event) {
            super(socketWrapper, event);
        }

        /**
         * Determines the handshake status of the socket and acts on it:
         * <ul>
         * <li>status {@code 0}: the handler processes the event (or
         *     {@code OPEN_READ} if the event is {@code null}); the socket is
         *     closed if the handler returns {@code CLOSED}, and a new
         *     {@code SocketProcessor} for {@code OPEN_READ} is submitted to
         *     the executor if it returns {@code UPGRADING};</li>
         * <li>status {@code -1}: the handler is invoked with
         *     {@code CONNECT_FAIL} and the socket is closed;</li>
         * <li>any other status: no further action is taken in this
         *     method.</li>
         * </ul>
         * Any {@code Throwable} other than a {@code VirtualMachineError} is
         * logged and the socket is closed.
         */
        @Override
        protected void doRun() {
            // Set when the handler reports UPGRADING; triggers a follow-up
            // processor from the finally block.
            boolean launch = false;
            try {
                // -1 = failed, 0 = complete; any other value returned by
                // handshake() is not acted upon below.
                int handshake = -1;

                try {
                    if (socketWrapper.getSocket().isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socketWrapper.getSocket().handshake();
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                } catch (IOException x) {
                    // An I/O failure while handshaking is handled like a
                    // failed handshake; details are logged at debug level only.
                    handshake = -1;
                    if (logHandshake.isDebugEnabled()) {
                        logHandshake.debug(sm.getString("endpoint.err.handshake",
                                socketWrapper.getRemoteAddr(), Integer.toString(socketWrapper.getRemotePort())), x);
                    }
                }
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;
                    // Process the request from this socket
                    if (event == null) {
                        // No explicit event: process as a read
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket and pool
                        socketWrapper.close();
                    } else if (state == SocketState.UPGRADING) {
                        // Processing continues in a new SocketProcessor that
                        // is submitted from the finally block.
                        launch = true;
                    }
                } else if (handshake == -1 ) {
                    // Handshake failed: notify the handler, then close
                    getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
                    socketWrapper.close();
                }
            } catch (VirtualMachineError vme) {
                // Delegated to ExceptionUtils; the socket is not closed on
                // this path. NOTE(review): handleThrowable presumably
                // rethrows VirtualMachineError - confirm.
                ExceptionUtils.handleThrowable(vme);
            } catch (Throwable t) {
                log.error(sm.getString("endpoint.processing.fail"), t);
                if (socketWrapper != null) {
                    ((Nio2SocketWrapper) socketWrapper).close();
                }
            } finally {
                if (launch) {
                    try {
                        getExecutor().execute(new SocketProcessor(socketWrapper, SocketEvent.OPEN_READ));
                    } catch (NullPointerException npe) {
                        // NOTE(review): presumably the executor can be null
                        // once the endpoint is stopping, hence the error is
                        // only logged while still running - confirm.
                        if (running) {
                            log.error(sm.getString("endpoint.launch.fail"),
                                    npe);
                        }
                    }
                }
                // Drop references so a cached processor does not retain the
                // socket or event of the finished run.
                socketWrapper = null;
                event = null;
                //return to cache
                if (running && processorCache != null) {
                    processorCache.push(this);
                }
            }
        }
    }

    // ----------------------------------------------- SendfileData Inner Class
    /**
     * Sendfile data for this endpoint: extends {@link SendfileDataBase} with a
     * file channel reference and two internal status flags. The private
     * fields are not read or written inside this class itself.
     */
    public static class SendfileData extends SendfileDataBase {

        // NOTE(review): presumably the channel opened on the file being sent;
        // it is assigned outside this class - confirm.
        private FileChannel fchannel;

        // Internal use only; both flags start out false (the Java default).
        private boolean doneInline;
        private boolean error;

        /**
         * Creates the sendfile data; all arguments are passed straight to the
         * {@link SendfileDataBase} constructor.
         *
         * @param filename the first constructor argument (file name)
         * @param pos      the second constructor argument (position)
         * @param length   the third constructor argument (length)
         */
        public SendfileData(String filename, long pos, long length) {
            super(filename, pos, length);
        }
    }
}