// TcpSender.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.ha.backend;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.StringTokenizer;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;
/**
 * {@link Sender} implementation that delivers messages to the configured proxies over TCP sockets.
 * <p>
 * Each message is written as an HTTP/1.0 {@code POST} request to the URL returned by
 * {@link HeartbeatListener#getProxyURL()}. One connection per proxy is kept open between calls and is
 * re-established on the next {@link #send(String)} after any failure.
 */
public class TcpSender implements Sender {

    // NOTE(review): the logger is created for HeartbeatListener rather than TcpSender. It is left
    // unchanged so that existing logging configuration keeps applying - confirm this is intended.
    private static final Log log = LogFactory.getLog(HeartbeatListener.class);
    private static final StringManager sm = StringManager.getManager(TcpSender.class);

    /**
     * Listener configuration supplied to {@link #init(HeartbeatListener)}.
     */
    HeartbeatListener config = null;

    /**
     * Proxies.
     */
    protected Proxy[] proxies = null;

    /**
     * Active connections, indexed like {@link #proxies}. A {@code null} entry means "not connected".
     */
    protected Socket[] connections = null;

    /**
     * Readers wrapping the input stream of the connection with the same index.
     */
    protected BufferedReader[] connectionReaders = null;

    /**
     * Writers wrapping the output stream of the connection with the same index.
     */
    protected BufferedWriter[] connectionWriters = null;


    /**
     * Parses the comma separated {@code host:port} proxy list of the configuration and allocates the
     * (initially empty) connection tables. No connection is opened here.
     *
     * @param config The listener providing the proxy list, proxy URL and optional local host
     *
     * @throws Exception if an entry has no host part before the {@code ':'} or its host cannot be
     *                       resolved; a non numeric port surfaces as a {@link NumberFormatException}
     */
    @Override
    public void init(HeartbeatListener config) throws Exception {
        this.config = config;
        StringTokenizer tok = new StringTokenizer(config.getProxyList(), ",");
        proxies = new Proxy[tok.countTokens()];
        int i = 0;
        while (tok.hasMoreTokens()) {
            String token = tok.nextToken().trim();
            int pos = token.indexOf(':');
            if (pos <= 0) {
                throw new Exception(sm.getString("tcpSender.invalidProxyList"));
            }
            proxies[i] = new Proxy();
            proxies[i].port = Integer.parseInt(token.substring(pos + 1));
            try {
                proxies[i].address = InetAddress.getByName(token.substring(0, pos));
            } catch (Exception e) {
                // Keep the underlying resolution failure as the cause so it is not lost.
                throw new Exception(sm.getString("tcpSender.invalidProxyList"), e);
            }
            i++;
        }
        connections = new Socket[proxies.length];
        connectionReaders = new BufferedReader[proxies.length];
        connectionWriters = new BufferedWriter[proxies.length];
    }


    /**
     * Sends the message to every proxy in turn. A failure to connect to, write to or read from one
     * proxy is logged, that proxy's connection is closed, and processing continues with the next
     * proxy.
     *
     * @param mess The message body to post
     *
     * @return {@code -1} if {@link #init(HeartbeatListener)} has not set up the connection tables,
     *             {@code 0} otherwise (including when individual proxies failed)
     *
     * @throws Exception declared by the {@link Sender} contract
     */
    @Override
    public int send(String mess) throws Exception {
        if (connections == null) {
            log.error(sm.getString("tcpSender.notInitialized"));
            return -1;
        }
        String requestLine = "POST " + config.getProxyURL() + " HTTP/1.0";

        for (int i = 0; i < connections.length; i++) {
            if (connections[i] == null) {
                openConnection(i);
            }
            if (connections[i] == null) {
                continue; // try next proxy in the list
            }

            // NOTE(review): the writer uses the platform default charset while Content-Length is the
            // character count; the two only agree for single-byte content - confirm messages are ASCII.
            BufferedWriter writer = connectionWriters[i];
            try {
                writer.write(requestLine);
                writer.write("\r\n");
                writer.write("Content-Length: " + mess.length() + "\r\n");
                writer.write("User-Agent: HeartbeatListener/1.0\r\n");
                writer.write("Connection: Keep-Alive\r\n");
                writer.write("\r\n");
                writer.write(mess);
                writer.write("\r\n");
                writer.flush();
            } catch (Exception ex) {
                log.error(sm.getString("tcpSender.sendFailed"), ex);
                close(i);
            }
            if (connections[i] == null) {
                continue; // try next proxy in the list
            }

            /* Read httpd answer */
            try {
                readResponse(i);
            } catch (Exception ex) {
                // An I/O error or an unparsable response leaves the stream in an unknown position, so
                // the connection must not be reused. Drop it and carry on with the remaining proxies
                // instead of aborting the whole send.
                log.error(sm.getString("tcpSender.responseError"), ex);
                close(i);
            }
        }
        return 0;
    }


    /**
     * Opens the connection to proxy {@code i} and creates its reader and writer. When the
     * configuration provides a host, the socket is first bound to that local address (ephemeral
     * port). On failure the error is logged and the slot is left closed.
     *
     * @param i The index of the proxy to connect to
     */
    private void openConnection(int i) {
        try {
            if (config.getHost() != null) {
                connections[i] = new Socket();
                InetAddress addr = InetAddress.getByName(config.getHost());
                InetSocketAddress addrs = new InetSocketAddress(addr, 0);
                connections[i].setReuseAddress(true);
                connections[i].bind(addrs);
                addrs = new InetSocketAddress(proxies[i].address, proxies[i].port);
                connections[i].connect(addrs);
            } else {
                connections[i] = new Socket(proxies[i].address, proxies[i].port);
            }
            connectionReaders[i] = new BufferedReader(new InputStreamReader(connections[i].getInputStream()));
            connectionWriters[i] = new BufferedWriter(new OutputStreamWriter(connections[i].getOutputStream()));
        } catch (Exception ex) {
            log.error(sm.getString("tcpSender.connectionFailed"), ex);
            close(i);
        }
    }


    /**
     * Reads and discards the response of proxy {@code i}: status line, headers and, if a positive
     * {@code Content-Length} was announced, the body. The connection is closed when the stream ends
     * before a status line, the status is not 200, or the body ends early.
     *
     * @param i The index of the connection to read from
     *
     * @throws IOException           if reading from the connection fails
     * @throws NumberFormatException if the status code or the {@code Content-Length} is not a number
     */
    private void readResponse(int i) throws IOException {
        BufferedReader reader = connectionReaders[i];

        String statusLine = reader.readLine();
        if (statusLine == null) {
            log.error(sm.getString("tcpSender.responseError"));
            close(i);
            return;
        }
        int status = parseStatusCode(statusLine);
        if (status != 200) {
            log.error(sm.getString("tcpSender.responseErrorCode", Integer.valueOf(status)));
            close(i);
            return;
        }

        // read all the headers.
        int contentLength = 0;
        String header = reader.readLine();
        while (header != null && !header.isEmpty()) {
            int colon = header.indexOf(':');
            // Lines without a "name:" part cannot be Content-Length, so they are simply skipped.
            if (colon > 0) {
                String headerName = header.substring(0, colon).trim();
                String headerValue = header.substring(colon + 1).trim();
                if ("content-length".equalsIgnoreCase(headerName)) {
                    contentLength = Integer.parseInt(headerValue);
                }
            }
            header = reader.readLine();
        }

        // Drain the body so the next response on this kept-alive connection starts at a status line.
        char[] buf = new char[512];
        while (contentLength > 0) {
            int n = reader.read(buf, 0, Math.min(contentLength, buf.length));
            if (n <= 0) {
                log.error(sm.getString("tcpSender.readError"));
                close(i);
                return;
            }
            contentLength -= n;
        }
    }


    /**
     * Extracts the numeric status code, i.e. the token following the first space of the status line.
     * A status line without a reason phrase (no second space) is accepted.
     *
     * @param statusLine The first line of the response
     *
     * @return the status code
     *
     * @throws NumberFormatException if the token is not a number
     */
    private static int parseStatusCode(String statusLine) {
        int start = statusLine.indexOf(' ') + 1;
        int end = statusLine.indexOf(' ', start);
        if (end < 0) {
            end = statusLine.length();
        }
        return Integer.parseInt(statusLine.substring(start, end));
    }


    /**
     * Close connection.
     * <p>
     * Closes the reader, writer and socket of the slot and resets all three entries to {@code null}
     * so that the next {@link #send(String)} reconnects.
     *
     * @param i The index of the connection that will be closed
     */
    protected void close(int i) {
        try {
            if (connectionReaders[i] != null) {
                connectionReaders[i].close();
            }
        } catch (IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
        connectionReaders[i] = null;
        try {
            if (connectionWriters[i] != null) {
                connectionWriters[i].close();
            }
        } catch (IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
        connectionWriters[i] = null;
        try {
            if (connections[i] != null) {
                connections[i].close();
            }
        } catch (IOException ignored) {
            // Best effort: the connection is being discarded anyway.
        }
        connections[i] = null;
    }
}