// TcpPingInterceptor.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
* Sends a ping to all members. Configure this interceptor with the TcpFailureDetector below it, and the
* TcpFailureDetector will act as the membership guide.
*/
public class TcpPingInterceptor extends ChannelInterceptorBase implements TcpPingInterceptorMBean {

    private static final Log log = LogFactory.getLog(TcpPingInterceptor.class);
    protected static final StringManager sm = StringManager.getManager(TcpPingInterceptor.class);

    /**
     * Fixed payload carried by every ping message. {@link #messageReceived(ChannelMessage)} compares incoming
     * payloads against it byte-for-byte to recognise pings.
     */
    protected static final byte[] TCP_PING_DATA = new byte[] { 79, -89, 115, 72, 121, -33, 67, -55, -97, 111, -119,
            -128, -95, 91, 7, 20, 125, -39, 82, 91, -21, -33, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74, 55, 21,
            -66, -121, 69, 33, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50, 85, -10, -108, -73, 58, -33, 33, 120, -111, 4,
            125, -41, 114, -124, -64, -43 };

    /** Time in milliseconds the dedicated ping thread sleeps before each ping. */
    protected long interval = 1000; // 1 second

    /** When {@code true}, {@link #start(int)} launches a {@link PingThread}; otherwise pings ride on heartbeats. */
    protected boolean useThread = false;

    /** When {@code true}, pings prefer the member list of a {@link StaticMembershipInterceptor} found in the chain. */
    protected boolean staticOnly = false;

    /** Loop flag for the ping thread; cleared by {@link #stop(int)}. */
    protected volatile boolean running = true;

    /**
     * The ping thread currently owned by this interceptor, or {@code null}. Volatile because the ping thread itself
     * reads it (without holding the lock) to find out whether it has been replaced.
     */
    protected volatile PingThread thread = null;

    /** Counter used to give each ping thread a distinct name suffix. */
    protected static final AtomicInteger cnt = new AtomicInteger(0);

    // Written under the start() lock but read without it by sendPing() on the heartbeat/ping thread,
    // hence volatile.
    volatile WeakReference<TcpFailureDetector> failureDetector = null;
    volatile WeakReference<StaticMembershipInterceptor> staticMembers = null;

    /**
     * Starts this interceptor: delegates to the parent, looks up the {@link TcpFailureDetector} and
     * {@link StaticMembershipInterceptor} further down the chain, and launches the ping thread when
     * {@link #getUseThread()} is set and no thread is running yet.
     *
     * @param svc the service flags, passed through unchanged to the parent implementation
     *
     * @throws ChannelException if the parent implementation fails to start
     */
    @Override
    public synchronized void start(int svc) throws ChannelException {
        super.start(svc);
        running = true;

        // Acquire the interceptors to invoke on send ping events. This is done before the ping thread is
        // started so that its very first ping already sees them, and the result is collected in locals so
        // that references left over from an earlier start() are replaced (or cleared) in a single step.
        TcpFailureDetector detector = null;
        StaticMembershipInterceptor statics = null;
        for (ChannelInterceptor next = getNext(); next != null; next = next.getNext()) {
            if (next instanceof TcpFailureDetector) {
                detector = (TcpFailureDetector) next;
            }
            if (next instanceof StaticMembershipInterceptor) {
                statics = (StaticMembershipInterceptor) next;
            }
        }
        failureDetector = detector == null ? null : new WeakReference<>(detector);
        staticMembers = statics == null ? null : new WeakReference<>(statics);

        if (thread == null && useThread) {
            PingThread pinger = new PingThread();
            pinger.setDaemon(true);
            String name = getChannel().getName();
            String channelName = name != null ? "[" + name + "]" : "";
            pinger.setName("TcpPingInterceptor.PingThread" + channelName + "-" + cnt.incrementAndGet());
            // Publish the thread before starting it: its run loop checks that it is still the current one.
            thread = pinger;
            pinger.start();
        }
    }

    /**
     * Stops this interceptor: clears the running flag, interrupts and forgets the ping thread (if any), then
     * delegates to the parent.
     *
     * @param svc the service flags, passed through unchanged to the parent implementation
     *
     * @throws ChannelException if the parent implementation fails to stop
     */
    @Override
    public synchronized void stop(int svc) throws ChannelException {
        running = false;
        PingThread pinger = thread;
        if (pinger != null) {
            // Wake the thread out of its sleep so it can observe that it has been stopped.
            pinger.interrupt();
            thread = null;
        }
        super.stop(svc);
    }

    /**
     * Runs the parent heartbeat and, when no dedicated ping thread is configured, sends a ping as part of it.
     */
    @Override
    public void heartbeat() {
        super.heartbeat();
        if (!getUseThread()) {
            sendPing();
        }
    }

    /**
     * @return the sleep time in milliseconds between pings sent by the ping thread
     */
    @Override
    public long getInterval() {
        return interval;
    }

    /**
     * @param interval the sleep time in milliseconds between pings sent by the ping thread
     */
    public void setInterval(long interval) {
        this.interval = interval;
    }

    /**
     * @param useThread {@code true} to send pings from a dedicated thread created in {@link #start(int)},
     *                      {@code false} to send them from {@link #heartbeat()}
     */
    public void setUseThread(boolean useThread) {
        this.useThread = useThread;
    }

    /**
     * @param staticOnly {@code true} to ping the members reported by a {@link StaticMembershipInterceptor} in the
     *                       chain (when one was found and no failure detector is available)
     */
    public void setStaticOnly(boolean staticOnly) {
        this.staticOnly = staticOnly;
    }

    /**
     * @return {@code true} if pings are sent from a dedicated thread rather than from {@link #heartbeat()}
     */
    @Override
    public boolean getUseThread() {
        return useThread;
    }

    /**
     * @return {@code true} if pings should prefer the static member list
     */
    public boolean getStaticOnly() {
        return staticOnly;
    }

    /**
     * Performs one ping round. If a {@link TcpFailureDetector} was found in the chain, its member check is invoked
     * instead of sending a ping message. Otherwise a ping message is sent to the static members (when
     * {@link #getStaticOnly()} is set and a {@link StaticMembershipInterceptor} was found) or to
     * {@link #getMembers()}.
     */
    protected void sendPing() {
        WeakReference<TcpFailureDetector> detectorRef = failureDetector;
        TcpFailureDetector tcpFailureDetector = detectorRef != null ? detectorRef.get() : null;
        if (tcpFailureDetector != null) {
            // We have a reference to the failure detector
            // Piggy back on it
            tcpFailureDetector.checkMembers(true);
            return;
        }

        WeakReference<StaticMembershipInterceptor> staticRef = staticMembers;
        StaticMembershipInterceptor smi = staticOnly && staticRef != null ? staticRef.get() : null;
        if (smi != null) {
            sendPingMessage(smi.getMembers());
        } else {
            sendPingMessage(getMembers());
        }
    }

    /**
     * Sends a single ping message carrying {@link #TCP_PING_DATA} to the given members. A failure to send is
     * logged as a warning and not propagated.
     *
     * @param members the recipients; nothing is sent when {@code null} or empty
     */
    protected void sendPingMessage(Member[] members) {
        if (members == null || members.length == 0) {
            return;
        }
        ChannelData data = new ChannelData(true);// generates a unique Id
        data.setAddress(getLocalMember(false));
        data.setTimestamp(System.currentTimeMillis());
        // Tag the message with this interceptor's option flag; messageReceived() only inspects messages
        // whose options pass okToProcess().
        data.setOptions(getOptionFlag());
        data.setMessage(new XByteBuffer(TCP_PING_DATA, false));
        try {
            super.sendMessage(members, data, null);
        } catch (ChannelException x) {
            log.warn(sm.getString("tcpPingInterceptor.ping.failed"), x);
        }
    }

    /**
     * Filters out incoming ping messages. A message whose options pass {@code okToProcess} and whose payload
     * equals {@link #TCP_PING_DATA} is consumed here (only trace-logged); every other message is handed to the
     * parent implementation.
     *
     * @param msg the received message
     */
    @Override
    public void messageReceived(ChannelMessage msg) {
        // Messages without the matching option flag are never treated as pings.
        boolean process = true;
        if (okToProcess(msg.getOptions())) {
            // Cheap length comparison first, then the full payload comparison.
            boolean isPing = msg.getMessage().getLength() == TCP_PING_DATA.length &&
                    Arrays.equals(TCP_PING_DATA, msg.getMessage().getBytes());
            process = !isPing;
        }
        if (process) {
            super.messageReceived(msg);
        } else if (log.isTraceEnabled()) {
            log.trace("Received a TCP ping packet:" + msg);
        }
    }

    /**
     * Background thread that sleeps for {@link #interval} milliseconds and then calls {@link #sendPing()}, for as
     * long as the interceptor is running and this thread is still the one referenced by {@link #thread}.
     */
    protected class PingThread extends Thread {
        @Override
        public void run() {
            // The identity check makes a thread that was interrupted by stop() terminate even if a quick
            // start() has already set the shared running flag back to true and created a successor.
            while (running && thread == this) {
                try {
                    sleep(interval);
                    sendPing();
                } catch (InterruptedException ignored) {
                    // Ignore. Probably triggered by a call to stop().
                    // In the highly unlikely event it was a different trigger,
                    // simply ignore it and continue. The loop condition decides whether to exit.
                } catch (Exception x) {
                    log.warn(sm.getString("tcpPingInterceptor.pingFailed.pingThread"), x);
                }
            }
        }
    }
}