BioSender.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.transport.bio;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.transport.AbstractSender;
import org.apache.catalina.tribes.transport.Constants;
import org.apache.catalina.tribes.transport.SenderState;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
* Send cluster messages with only one socket. Ack and keep Alive Handling is supported
*
* @author Peter Rossbach
*
* @since 5.5.16
*
* @deprecated This will be removed in Tomcat 10
*/
@Deprecated
public class BioSender extends AbstractSender {
private static final Log log = LogFactory.getLog(BioSender.class);
/**
* The string manager for this package.
*/
protected static final StringManager sm = StringManager.getManager(BioSender.class);
// ----------------------------------------------------- Instance Variables
/**
* current sender socket
*/
private Socket socket = null;
private OutputStream soOut = null;
private InputStream soIn = null;
protected final XByteBuffer ackbuf = new XByteBuffer(Constants.ACK_COMMAND.length, true);
// ------------------------------------------------------------- Constructor
public BioSender() {
// NO-OP
}
// --------------------------------------------------------- Public Methods
/**
* Connect other cluster member receiver
*
* @see org.apache.catalina.tribes.transport.DataSender#connect()
*/
@Override
public void connect() throws IOException {
openSocket();
}
/**
* disconnect and close socket
*
* @see org.apache.catalina.tribes.transport.DataSender#disconnect()
*/
@Override
public void disconnect() {
boolean connect = isConnected();
closeSocket();
if (connect) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("bioSender.disconnect", getAddress().getHostAddress(),
Integer.valueOf(getPort()), Long.valueOf(0)));
}
}
}
/**
* Send message.
*
* @param data The data to send
* @param waitForAck Wait for an ack
*
* @throws IOException An IO error occurred sending the message
*/
public void sendMessage(byte[] data, boolean waitForAck) throws IOException {
IOException exception = null;
setAttempt(0);
try {
// first try with existing connection
pushMessage(data, false, waitForAck);
} catch (IOException x) {
SenderState.getSenderState(getDestination()).setSuspect();
exception = x;
if (log.isTraceEnabled()) {
log.trace(
sm.getString("bioSender.send.again", getAddress().getHostAddress(), Integer.valueOf(getPort())),
x);
}
while (getAttempt() < getMaxRetryAttempts()) {
try {
setAttempt(getAttempt() + 1);
// second try with fresh connection
pushMessage(data, true, waitForAck);
exception = null;
} catch (IOException xx) {
exception = xx;
closeSocket();
}
}
} finally {
setRequestCount(getRequestCount() + 1);
keepalive();
if (exception != null) {
throw exception;
}
}
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder("DataSender[(");
buf.append(super.toString()).append(')');
buf.append(getAddress()).append(':').append(getPort()).append(']');
return buf.toString();
}
// --------------------------------------------------------- Protected Methods
/**
* Open real socket and set time out when waitForAck is enabled is socket open return directly.
*
* @throws IOException Error opening socket
*/
protected void openSocket() throws IOException {
if (isConnected()) {
return;
}
try {
socket = new Socket();
InetSocketAddress sockaddr = new InetSocketAddress(getAddress(), getPort());
socket.connect(sockaddr, (int) getTimeout());
socket.setSendBufferSize(getTxBufSize());
socket.setReceiveBufferSize(getRxBufSize());
socket.setSoTimeout((int) getTimeout());
socket.setTcpNoDelay(getTcpNoDelay());
socket.setKeepAlive(getSoKeepAlive());
socket.setReuseAddress(getSoReuseAddress());
socket.setOOBInline(getOoBInline());
socket.setSoLinger(getSoLingerOn(), getSoLingerTime());
socket.setTrafficClass(getSoTrafficClass());
setConnected(true);
soOut = socket.getOutputStream();
soIn = socket.getInputStream();
setRequestCount(0);
setConnectTime(System.currentTimeMillis());
if (log.isDebugEnabled()) {
log.debug(sm.getString("bioSender.openSocket", getAddress().getHostAddress(),
Integer.valueOf(getPort()), Long.valueOf(0)));
}
} catch (IOException ex1) {
SenderState.getSenderState(getDestination()).setSuspect();
if (log.isDebugEnabled()) {
log.debug(sm.getString("bioSender.openSocket.failure", getAddress().getHostAddress(),
Integer.valueOf(getPort()), Long.valueOf(0)), ex1);
}
throw ex1;
}
}
/**
* Close socket.
*
* @see #disconnect()
*/
protected void closeSocket() {
if (isConnected()) {
if (socket != null) {
try {
socket.close();
} catch (IOException x) {
// Ignore
} finally {
socket = null;
soOut = null;
soIn = null;
}
}
setRequestCount(0);
setConnected(false);
if (log.isDebugEnabled()) {
log.debug(sm.getString("bioSender.closeSocket", getAddress().getHostAddress(),
Integer.valueOf(getPort()), Long.valueOf(0)));
}
}
}
/**
* Push messages with only one socket at a time Wait for ack is needed and make auto retry when write message is
* failed. After sending error close and reopen socket again. After successful sending update stats WARNING:
* Subclasses must be very careful that only one thread call this pushMessage at once!!!
*
* @see #closeSocket()
* @see #openSocket()
* @see #sendMessage(byte[], boolean)
*
* @param data Data to send
* @param reconnect Do a reconnect (close socket then reopen)
* @param waitForAck Wait for an acknowledgement
*
* @throws IOException IO error writing data
*
* @since 5.5.10
*/
protected void pushMessage(byte[] data, boolean reconnect, boolean waitForAck) throws IOException {
keepalive();
if (reconnect) {
closeSocket();
}
if (!isConnected()) {
openSocket();
}
soOut.write(data);
soOut.flush();
if (waitForAck) {
waitForAck();
}
SenderState.getSenderState(getDestination()).setReady();
}
/**
* Wait for Acknowledgement from other server. FIXME Please, not wait only for three characters, better control that
* the wait ack message is correct.
*
* @throws IOException An IO error occurred
*/
protected void waitForAck() throws java.io.IOException {
try {
boolean ackReceived = false;
boolean failAckReceived = false;
ackbuf.clear();
int bytesRead = 0;
int i = soIn.read();
while ((i != -1) && (bytesRead < Constants.ACK_COMMAND.length)) {
bytesRead++;
byte d = (byte) i;
ackbuf.append(d);
if (ackbuf.doesPackageExist()) {
byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
ackReceived = Arrays.equals(ackcmd, org.apache.catalina.tribes.transport.Constants.ACK_DATA);
failAckReceived =
Arrays.equals(ackcmd, org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
ackReceived = ackReceived || failAckReceived;
break;
}
i = soIn.read();
}
if (!ackReceived) {
if (i == -1) {
throw new IOException(
sm.getString("bioSender.ack.eof", getAddress(), Integer.valueOf(socket.getLocalPort())));
} else {
throw new IOException(
sm.getString("bioSender.ack.wrong", getAddress(), Integer.valueOf(socket.getLocalPort())));
}
} else if (failAckReceived && getThrowOnFailedAck()) {
throw new RemoteProcessException(sm.getString("bioSender.fail.AckReceived"));
}
} catch (IOException x) {
String errmsg = sm.getString("bioSender.ack.missing", getAddress(), Integer.valueOf(socket.getLocalPort()),
Long.valueOf(getTimeout()));
if (SenderState.getSenderState(getDestination()).isReady()) {
SenderState.getSenderState(getDestination()).setSuspect();
if (log.isWarnEnabled()) {
log.warn(errmsg, x);
}
} else {
if (log.isDebugEnabled()) {
log.debug(errmsg, x);
}
}
throw x;
} finally {
ackbuf.clear();
}
}
}