BioSender.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.transport.bio;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;

import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.transport.AbstractSender;
import org.apache.catalina.tribes.transport.Constants;
import org.apache.catalina.tribes.transport.SenderState;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

/**
 * Blocking-IO sender that delivers cluster messages over a single socket.
 * Optionally waits for an acknowledgement after each message and re-sends on
 * a fresh connection when a write or acknowledgement fails.
 * <p>
 * The class performs no synchronization of its own; see the warning on
 * {@link #pushMessage(byte[], boolean, boolean)}.
 *
 * @author Peter Rossbach
 * @since 5.5.16
 * @deprecated This will be removed in Tomcat 10
 */
@Deprecated
public class BioSender extends AbstractSender {

    private static final Log log = LogFactory.getLog(BioSender.class);

    /**
     * The string manager for this package.
     */
    protected static final StringManager sm = StringManager.getManager(BioSender.class);

    // ----------------------------------------------------- Instance Variables

    /**
     * Current sender socket, or <code>null</code> when no connection is open.
     */
    private Socket socket = null;

    /**
     * Output stream of {@link #socket}; assigned together with it.
     */
    private OutputStream soOut = null;

    /**
     * Input stream of {@link #socket}; the acknowledgement is read from here.
     */
    private InputStream soIn = null;

    /**
     * Reusable buffer in which the acknowledgement bytes are collected. It is
     * cleared before and after every {@link #waitForAck()} call.
     */
    protected final XByteBuffer ackbuf =
            new XByteBuffer(Constants.ACK_COMMAND.length, true);


    // ------------------------------------------------------------- Constructor

    public BioSender() {
        // NO-OP
    }


    // --------------------------------------------------------- Public Methods

    /**
     * Connect to the receiver of the other cluster member. Does nothing when
     * the sender is already connected.
     *
     * @throws IOException the socket could not be opened
     * @see org.apache.catalina.tribes.transport.DataSender#connect()
     */
    @Override
    public void connect() throws IOException {
        openSocket();
    }


    /**
     * Disconnect and close the socket. A debug message is logged only when the
     * sender was connected before the call.
     *
     * @see org.apache.catalina.tribes.transport.DataSender#disconnect()
     */
    @Override
    public void disconnect() {
        boolean wasConnected = isConnected();
        closeSocket();
        if (wasConnected && log.isDebugEnabled()) {
            log.debug(sm.getString("bioSender.disconnect", getAddress().getHostAddress(),
                    Integer.valueOf(getPort()), Long.valueOf(0)));
        }
    }

    /**
     * Send a message. The first attempt uses the existing connection (opening
     * one if needed). If that attempt fails with an {@link IOException} the
     * destination is marked suspect and the message is re-sent over a fresh
     * connection, up to {@link #getMaxRetryAttempts()} times, stopping at the
     * first attempt that succeeds.
     * <p>
     * The request count is incremented and {@link #keepalive()} is invoked
     * whether or not the message could be delivered.
     *
     * @param data The data to send
     * @param waitForAck Wait for an ack
     * @throws IOException An IO error occurred sending the message; this is
     *         the error raised by the last failed attempt
     */
    public void sendMessage(byte[] data, boolean waitForAck) throws IOException {
        IOException exception = null;
        setAttempt(0);
        try {
            // first try with existing connection
            pushMessage(data, false, waitForAck);
        } catch (IOException x) {
            SenderState.getSenderState(getDestination()).setSuspect();
            exception = x;
            if (log.isTraceEnabled()) {
                log.trace(sm.getString("bioSender.send.again", getAddress().getHostAddress(),
                        Integer.valueOf(getPort())), x);
            }
            while (getAttempt() < getMaxRetryAttempts()) {
                try {
                    setAttempt(getAttempt() + 1);
                    // retry with a fresh connection
                    pushMessage(data, true, waitForAck);
                    exception = null;
                    // Delivered: stop here, otherwise the remaining attempts
                    // would send the same message again.
                    break;
                } catch (IOException xx) {
                    exception = xx;
                    closeSocket();
                }
            }
        } finally {
            setRequestCount(getRequestCount() + 1);
            keepalive();
            if (exception != null) {
                throw exception;
            }
        }
    }


    @Override
    public String toString() {
        StringBuilder buf = new StringBuilder("DataSender[(");
        buf.append(super.toString()).append(')');
        buf.append(getAddress()).append(':').append(getPort()).append(']');
        return buf.toString();
    }

    // --------------------------------------------------------- Protected Methods

    /**
     * Open the real socket, apply the configured socket options and obtain its
     * streams. Returns directly when the sender is already connected.
     * <p>
     * The sender is only marked connected once the socket is fully set up. If
     * any step fails the half-opened socket is closed again so that no file
     * descriptor is leaked, and the sender stays disconnected.
     *
     * @throws IOException Error opening socket
     */
    protected void openSocket() throws IOException {
        if (isConnected()) {
            return;
        }
        Socket newSocket = new Socket();
        boolean opened = false;
        try {
            InetSocketAddress sockaddr = new InetSocketAddress(getAddress(), getPort());
            newSocket.connect(sockaddr, (int) getTimeout());
            newSocket.setSendBufferSize(getTxBufSize());
            newSocket.setReceiveBufferSize(getRxBufSize());
            newSocket.setSoTimeout((int) getTimeout());
            newSocket.setTcpNoDelay(getTcpNoDelay());
            newSocket.setKeepAlive(getSoKeepAlive());
            newSocket.setReuseAddress(getSoReuseAddress());
            newSocket.setOOBInline(getOoBInline());
            newSocket.setSoLinger(getSoLingerOn(), getSoLingerTime());
            newSocket.setTrafficClass(getSoTrafficClass());
            OutputStream out = newSocket.getOutputStream();
            InputStream in = newSocket.getInputStream();

            // Publish the connection only after every step above succeeded.
            socket = newSocket;
            soOut = out;
            soIn = in;
            setConnected(true);
            opened = true;

            setRequestCount(0);
            setConnectTime(System.currentTimeMillis());
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("bioSender.openSocket", getAddress().getHostAddress(),
                        Integer.valueOf(getPort()), Long.valueOf(0)));
            }
        } catch (IOException ex1) {
            SenderState.getSenderState(getDestination()).setSuspect();
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("bioSender.openSocket.failure", getAddress().getHostAddress(),
                        Integer.valueOf(getPort()), Long.valueOf(0)), ex1);
            }
            throw ex1;
        } finally {
            if (!opened) {
                // Covers IOExceptions as well as unchecked failures (for
                // example an illegal buffer size or timeout value).
                try {
                    newSocket.close();
                } catch (IOException ignored) {
                    // Best effort: the original failure is what gets reported.
                }
            }
        }
    }

    /**
     * Close the socket and reset the connection state. Does nothing when the
     * sender is not connected. Errors raised while closing are ignored.
     *
     * @see #disconnect()
     */
    protected void closeSocket() {
        if (!isConnected()) {
            return;
        }
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Ignore: the connection is being discarded anyway.
            } finally {
                socket = null;
                soOut = null;
                soIn = null;
            }
        }
        setRequestCount(0);
        setConnected(false);
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("bioSender.closeSocket", getAddress().getHostAddress(),
                    Integer.valueOf(getPort()), Long.valueOf(0)));
        }
    }

    /**
     * Push a message over the single socket.
     * <p>
     * Invokes {@link #keepalive()} first, optionally closes the current
     * connection, opens a connection if none is open, writes and flushes the
     * data and, if requested, waits for the acknowledgement. On success the
     * destination's {@link SenderState} is set to ready.
     * <p>
     * WARNING: Subclasses must be very careful that only one thread calls this
     * method at once!
     *
     * @see #closeSocket()
     * @see #openSocket()
     * @see #sendMessage(byte[], boolean)
     *
     * @param data Data to send
     * @param reconnect Do a reconnect (close socket then reopen)
     * @param waitForAck Wait for an acknowledgement
     * @throws IOException IO error writing data
     * @since 5.5.10
     */
    protected void pushMessage(byte[] data, boolean reconnect, boolean waitForAck) throws IOException {
        keepalive();
        if (reconnect) {
            closeSocket();
        }
        if (!isConnected()) {
            openSocket();
        }
        soOut.write(data);
        soOut.flush();
        if (waitForAck) {
            waitForAck();
        }
        SenderState.getSenderState(getDestination()).setReady();
    }

    /**
     * Wait for the acknowledgement from the other server.
     * <p>
     * Reads at most <code>Constants.ACK_COMMAND.length</code> bytes, one at a
     * time, until {@link #ackbuf} holds a complete package. The package must
     * equal either <code>Constants.ACK_DATA</code> or
     * <code>Constants.FAIL_ACK_DATA</code>. Reading stops as soon as the byte
     * limit is reached, so a malformed acknowledgement is reported without
     * blocking on a further read.
     * <p>
     * Any {@link IOException} is logged (at warn level and marking the
     * destination suspect if it was ready, at debug level otherwise) and then
     * re-thrown.
     *
     * @throws IOException An IO error occurred, the stream ended before the
     *         acknowledgement was complete, or the acknowledgement was wrong
     * @throws RemoteProcessException a fail-ack was received and
     *         {@link #getThrowOnFailedAck()} is <code>true</code>
     */
    protected void waitForAck() throws java.io.IOException {
        try {
            boolean ackReceived = false;
            boolean failAckReceived = false;
            ackbuf.clear();
            // Last value returned by read(); -1 signals end of stream.
            int i = 0;
            for (int bytesRead = 0; bytesRead < Constants.ACK_COMMAND.length; bytesRead++) {
                i = soIn.read();
                if (i == -1) {
                    break;
                }
                ackbuf.append((byte) i);
                if (ackbuf.doesPackageExist()) {
                    byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
                    failAckReceived = Arrays.equals(ackcmd, Constants.FAIL_ACK_DATA);
                    // A fail-ack still counts as an answer from the peer.
                    ackReceived = failAckReceived || Arrays.equals(ackcmd, Constants.ACK_DATA);
                    break;
                }
            }
            if (!ackReceived) {
                if (i == -1) {
                    throw new IOException(sm.getString("bioSender.ack.eof", getAddress(),
                            Integer.valueOf(socket.getLocalPort())));
                } else {
                    throw new IOException(sm.getString("bioSender.ack.wrong", getAddress(),
                            Integer.valueOf(socket.getLocalPort())));
                }
            } else if (failAckReceived && getThrowOnFailedAck()) {
                throw new RemoteProcessException(sm.getString("bioSender.fail.AckReceived"));
            }
        } catch (IOException x) {
            String errmsg = sm.getString("bioSender.ack.missing", getAddress(),
                    Integer.valueOf(socket.getLocalPort()), Long.valueOf(getTimeout()));
            if (SenderState.getSenderState(getDestination()).isReady()) {
                SenderState.getSenderState(getDestination()).setSuspect();
                if (log.isWarnEnabled()) {
                    log.warn(errmsg, x);
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug(errmsg, x);
                }
            }
            throw x;
        } finally {
            ackbuf.clear();
        }
    }
}