TcpPingInterceptor.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.group.interceptors;

import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

/**
 * Sends a ping to all members.
 * Configure this interceptor with the TcpFailureDetector below it,
 * and the TcpFailureDetector will act as the membership guide.
 */

public class TcpPingInterceptor extends ChannelInterceptorBase implements TcpPingInterceptorMBean {

    private static final Log log = LogFactory.getLog(TcpPingInterceptor.class);
    protected static final StringManager sm = StringManager.getManager(TcpPingInterceptor.class);

    protected static final byte[] TCP_PING_DATA = new byte[] {
        79, -89, 115, 72, 121, -33, 67, -55, -97, 111, -119, -128, -95, 91, 7, 20,
        125, -39, 82, 91, -21, -33, 67, -102, -73, 126, -66, -113, -127, 103, 30, -74,
        55, 21, -66, -121, 69, 33, 76, -88, -65, 10, 77, 19, 83, 56, 21, 50,
        85, -10, -108, -73, 58, -33, 33, 120, -111, 4, 125, -41, 114, -124, -64, -43};

    protected long interval = 1000; //1 second

    protected boolean useThread = false;
    protected boolean staticOnly = false;
    protected volatile boolean running = true;
    protected PingThread thread = null;
    protected static final AtomicInteger cnt = new AtomicInteger(0);

    WeakReference<TcpFailureDetector> failureDetector = null;
    WeakReference<StaticMembershipInterceptor> staticMembers = null;

    @Override
    public synchronized void start(int svc) throws ChannelException {
        super.start(svc);
        running = true;
        if ( thread == null && useThread) {
            thread = new PingThread();
            thread.setDaemon(true);
            String channelName = "";
            if (getChannel().getName() != null) {
                channelName = "[" + getChannel().getName() + "]";
            }
            thread.setName("TcpPingInterceptor.PingThread" + channelName +"-"+cnt.addAndGet(1));
            thread.start();
        }

        //acquire the interceptors to invoke on send ping events
        ChannelInterceptor next = getNext();
        while ( next != null ) {
            if ( next instanceof TcpFailureDetector ) {
                failureDetector = new WeakReference<>((TcpFailureDetector)next);
            }
            if ( next instanceof StaticMembershipInterceptor ) {
                staticMembers = new WeakReference<>((StaticMembershipInterceptor)next);
            }
            next = next.getNext();
        }

    }

    @Override
    public synchronized void stop(int svc) throws ChannelException {
        running = false;
        if (thread != null) {
            thread.interrupt();
            thread = null;
        }
        super.stop(svc);
    }

    @Override
    public void heartbeat() {
        super.heartbeat();
        if (!getUseThread()) {
            sendPing();
        }
    }

    @Override
    public long getInterval() {
        return interval;
    }

    public void setInterval(long interval) {
        this.interval = interval;
    }

    public void setUseThread(boolean useThread) {
        this.useThread = useThread;
    }

    public void setStaticOnly(boolean staticOnly) {
        this.staticOnly = staticOnly;
    }

    @Override
    public boolean getUseThread() {
        return useThread;
    }

    public boolean getStaticOnly() {
        return staticOnly;
    }

    protected void sendPing() {
        TcpFailureDetector tcpFailureDetector =
                failureDetector != null ? failureDetector.get() : null;
        if (tcpFailureDetector != null) {
            // We have a reference to the failure detector
            // Piggy back on it
            tcpFailureDetector.checkMembers(true);
        } else {
            StaticMembershipInterceptor smi =
                    staticOnly && staticMembers != null ? staticMembers.get() : null;
            if (smi != null) {
                sendPingMessage(smi.getMembers());
            } else {
                sendPingMessage(getMembers());
            }
        }
    }

    protected void sendPingMessage(Member[] members) {
        if ( members == null || members.length == 0 ) {
            return;
        }
        ChannelData data = new ChannelData(true);//generates a unique Id
        data.setAddress(getLocalMember(false));
        data.setTimestamp(System.currentTimeMillis());
        data.setOptions(getOptionFlag());
        data.setMessage(new XByteBuffer(TCP_PING_DATA, false));
        try {
            super.sendMessage(members, data, null);
        }catch (ChannelException x) {
            log.warn(sm.getString("tcpPingInterceptor.ping.failed"),x);
        }
    }

    @Override
    public void messageReceived(ChannelMessage msg) {
        //catch incoming
        boolean process = true;
        if ( okToProcess(msg.getOptions()) ) {
            //check to see if it is a ping message, if so, process = false
            process = ( (msg.getMessage().getLength() != TCP_PING_DATA.length) ||
                        (!Arrays.equals(TCP_PING_DATA,msg.getMessage().getBytes()) ) );
        }//end if

        //ignore the message, it doesn't have the flag set
        if ( process ) {
            super.messageReceived(msg);
        } else if ( log.isTraceEnabled() ) {
            log.trace("Received a TCP ping packet:" + msg);
        }
    }//messageReceived

    protected class PingThread extends Thread {
        @Override
        public void run() {
            while (running) {
                try {
                    sleep(interval);
                    sendPing();
                }catch ( InterruptedException ix ) {
                    // Ignore. Probably triggered by a call to stop().
                    // In the highly unlikely event it was a different trigger,
                    // simply ignore it and continue.
                }catch ( Exception x )  {
                    log.warn(sm.getString("tcpPingInterceptor.pingFailed.pingThread"),x);
                }
            }
        }
    }


}