ReceiverBase.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.transport;

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.management.ObjectName;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.MessageListener;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.jmx.JmxRegistry;
import org.apache.catalina.tribes.util.ExecutorFactory;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

/**
 * Common state and helper logic shared by channel receiver implementations: the bind address and ports, socket
 * option values, the worker executor and its JMX registration.
 * <p>
 * Most properties are plain configuration values that this class only stores; they are exposed through getters so
 * that concrete receivers can apply them to the sockets they create.
 */
public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, RxTaskPool.TaskCreator {

    /** Bit set in {@link #getWorkerThreadOptions()} when {@link #getDirect()} is {@code true}. */
    public static final int OPTION_DIRECT_BUFFER = 0x0004;

    private static final Log log = LogFactory.getLog(ReceiverBase.class);

    /** Static lock: TCP bind attempts of all receivers sharing this class are serialized on it. */
    private static final Object bindLock = new Object();

    protected static final StringManager sm = StringManager.getManager(Constants.Package);

    private MessageListener listener;
    /** Host name or address to bind to; the literal "auto" is replaced by the local host address in getBind(). */
    private String host = "auto";
    /** Resolved form of {@link #host}; lazily populated by {@link #getBind()}. */
    private InetAddress bind;
    private int port = 4000;
    private int udpPort = -1;
    private int securePort = -1;
    private int rxBufSize = Constants.DEFAULT_CLUSTER_MSG_BUFFER_SIZE;
    private int txBufSize = Constants.DEFAULT_CLUSTER_ACK_BUFFER_SIZE;
    private int udpRxBufSize = Constants.DEFAULT_CLUSTER_MSG_BUFFER_SIZE;
    private int udpTxBufSize = Constants.DEFAULT_CLUSTER_ACK_BUFFER_SIZE;

    private volatile boolean listen = false;
    private RxTaskPool pool;
    private boolean direct = true;
    private long tcpSelectorTimeout = 5000;
    // how many times to search for an available socket
    private int autoBind = 100;
    private int maxThreads = 15;
    private int minThreads = 6;
    private int maxTasks = 100;
    private int minTasks = 10;
    private boolean tcpNoDelay = true;
    private boolean soKeepAlive = false;
    private boolean ooBInline = true;
    private boolean soReuseAddress = true;
    private boolean soLingerOn = true;
    private int soLingerTime = 3;
    private int soTrafficClass = 0x04 | 0x08 | 0x010;
    private int timeout = 3000; // 3 seconds
    private boolean useBufferPool = true;
    /** Daemon flag applied to every thread created by {@link TaskThreadFactory}. */
    private boolean daemon = true;
    /** Idle time, in milliseconds, handed to the executor factory in {@link #start()}. */
    private long maxIdleTime = 60000;

    private ExecutorService executor;
    private Channel channel;

    /**
     * the ObjectName of this Receiver.
     */
    private ObjectName oname = null;

    public ReceiverBase() {
    }

    /**
     * Creates the worker executor, unless one has already been supplied through
     * {@link #setExecutor(ExecutorService)}, and registers this receiver with the channel's JMX registry when one
     * is available.
     * <p>
     * A missing channel is tolerated: worker threads are then named without a channel suffix and JMX registration
     * is skipped.
     *
     * @throws IOException declared for subclasses; not thrown by the code in this method
     */
    @Override
    public void start() throws IOException {
        // Work on a snapshot so the null check and the later uses see the same reference.
        final Channel currentChannel = channel;
        if (executor == null) {
            String channelSuffix = "";
            if (currentChannel != null && currentChannel.getName() != null) {
                channelSuffix = "[" + currentChannel.getName() + "]";
            }
            ThreadFactory threadFactory = new TaskThreadFactory("Tribes-Task-Receiver" + channelSuffix + "-");
            executor = ExecutorFactory.newThreadPool(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS,
                    threadFactory);
        }
        // register jmx
        if (currentChannel != null) {
            JmxRegistry jmxRegistry = JmxRegistry.getRegistry(currentChannel);
            if (jmxRegistry != null) {
                this.oname = jmxRegistry.registerJmx(",component=Receiver", this);
            }
        }
    }

    /**
     * Shuts the executor down immediately (pending tasks are discarded), removes the JMX registration made in
     * {@link #start()} and drops the channel reference.
     */
    @Override
    public void stop() {
        if (executor != null) {
            executor.shutdownNow();// ignore left overs
        }
        executor = null;
        if (oname != null) {
            JmxRegistry jmxRegistry = JmxRegistry.getRegistry(channel);
            if (jmxRegistry != null) {
                jmxRegistry.unregisterJmx(oname);
            }
            oname = null;
        }
        channel = null;
    }

    /**
     * @return the listener that receives accepted messages, or {@code null} if none is set
     */
    @Override
    public MessageListener getMessageListener() {
        return listener;
    }

    /**
     * @return the TCP port; updated by {@link #bind(ServerSocket, int, int)} to the port actually bound
     */
    @Override
    public int getPort() {
        return port;
    }

    /**
     * @return the configured receive buffer size
     */
    public int getRxBufSize() {
        return rxBufSize;
    }

    /**
     * @return the configured transmit buffer size
     */
    public int getTxBufSize() {
        return txBufSize;
    }

    /**
     * @param listener the listener that should receive accepted messages
     */
    @Override
    public void setMessageListener(MessageListener listener) {
        this.listener = listener;
    }

    /**
     * @param rxBufSize the receive buffer size to store
     */
    public void setRxBufSize(int rxBufSize) {
        this.rxBufSize = rxBufSize;
    }

    /**
     * @param txBufSize the transmit buffer size to store
     */
    public void setTxBufSize(int txBufSize) {
        this.txBufSize = txBufSize;
    }

    /**
     * Returns the address to bind to, resolving it from the configured host on first use. If the host is the
     * literal {@code "auto"} it is first replaced by the local host's address. A resolution failure is logged and
     * leaves the bind address unset.
     *
     * @return Returns the bind, or {@code null} if the host could not be resolved.
     */
    public InetAddress getBind() {
        if (bind != null) {
            return bind;
        }
        try {
            if ("auto".equals(host)) {
                host = InetAddress.getLocalHost().getHostAddress();
            }
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("receiverBase.start", host));
            }
            bind = InetAddress.getByName(host);
        } catch (IOException ioe) {
            log.error(sm.getString("receiverBase.bind.failed", host), ioe);
        }
        return bind;
    }

    /**
     * Attempts to bind using the provided port and if that fails attempts to bind to each of the ports from portstart
     * to (portstart + retries -1) until either there are no more ports or the bind is successful. The address to bind
     * to is obtained via a call to {@link #getBind()}. On success the bound port is stored via
     * {@link #setPort(int)}.
     * <p>
     * The whole search runs while holding a static lock. If {@code retries} is not positive, no bind is attempted
     * and the method returns normally.
     *
     * @param socket    The socket to bind
     * @param portstart Starting port for bind attempts
     * @param retries   Number of times to attempt to bind (port incremented between attempts)
     *
     * @throws IOException Socket bind error (the failure of the last attempt)
     */
    protected void bind(ServerSocket socket, int portstart, int retries) throws IOException {
        synchronized (bindLock) {
            InetSocketAddress addr = null;
            int candidatePort = portstart;
            int attemptsLeft = retries;
            while (attemptsLeft > 0) {
                try {
                    addr = new InetSocketAddress(getBind(), candidatePort);
                    socket.bind(addr);
                    setPort(candidatePort);
                    log.info(sm.getString("receiverBase.socket.bind", addr));
                    return;
                } catch (IOException x) {
                    attemptsLeft--;
                    if (attemptsLeft <= 0) {
                        log.info(sm.getString("receiverBase.unable.bind", addr));
                        throw x;
                    }
                    candidatePort++;
                }
            }
        }
    }

    /**
     * Same as bind() except it does it for the UDP port: ports are tried in ascending order starting at
     * {@code portstart}, with a short pause between attempts, and the bound port is stored via
     * {@link #setUdpPort(int)}. Unlike {@link #bind(ServerSocket, int, int)} this method does not take the static
     * bind lock.
     *
     * @param socket    The socket to bind
     * @param portstart Starting port for bind attempts
     * @param retries   Number of times to attempt to bind (port incremented between attempts)
     *
     * @return int The retry count: {@code 0} after a successful bind, or the unchanged {@code retries} argument if
     *             it was not positive and no bind was attempted
     *
     * @throws IOException Socket bind error (the failure of the last attempt)
     */
    protected int bindUdp(DatagramSocket socket, int portstart, int retries) throws IOException {
        InetSocketAddress addr = null;
        int candidatePort = portstart;
        int attemptsLeft = retries;
        // Plain loop: one stack frame regardless of how many ports have to be probed.
        while (attemptsLeft > 0) {
            try {
                addr = new InetSocketAddress(getBind(), candidatePort);
                socket.bind(addr);
                setUdpPort(candidatePort);
                log.info(sm.getString("receiverBase.udp.bind", addr));
                return 0;
            } catch (IOException x) {
                attemptsLeft--;
                if (attemptsLeft <= 0) {
                    log.info(sm.getString("receiverBase.unable.bind.udp", addr));
                    throw x;
                }
                candidatePort++;
                try {
                    Thread.sleep(25);
                } catch (InterruptedException ti) {
                    // Keep probing, but preserve the interrupt for the caller.
                    Thread.currentThread().interrupt();
                }
            }
        }
        return attemptsLeft;
    }


    /**
     * Hands a received message to the configured listener, provided a listener is set and it accepts the message.
     *
     * @param data the received message
     */
    @Override
    public void messageDataReceived(ChannelMessage data) {
        // Snapshot: the null check and the calls must act on the same listener instance.
        final MessageListener target = this.listener;
        if (target != null && target.accept(data)) {
            target.messageReceived(data);
        }
    }

    /**
     * @return an option bit mask: {@link #OPTION_DIRECT_BUFFER} when {@link #getDirect()} is {@code true},
     *             otherwise {@code 0}
     */
    public int getWorkerThreadOptions() {
        return getDirect() ? OPTION_DIRECT_BUFFER : 0;
    }


    /**
     * @param bind The bind to set.
     */
    public void setBind(InetAddress bind) {
        this.bind = bind;
    }

    /**
     * @return the "direct" flag that drives {@link #getWorkerThreadOptions()}
     */
    public boolean getDirect() {
        return direct;
    }


    /**
     * @param direct the "direct" flag that drives {@link #getWorkerThreadOptions()}
     */
    public void setDirect(boolean direct) {
        this.direct = direct;
    }


    /**
     * Returns the configured host. {@link #getBind()} is invoked first, so a host of {@code "auto"} has been
     * replaced by the local host address where that lookup succeeded.
     *
     * @return the host string
     */
    public String getAddress() {
        getBind();
        return this.host;
    }

    /**
     * @return same value as {@link #getAddress()}
     */
    @Override
    public String getHost() {
        return getAddress();
    }

    /**
     * @return the stored selector timeout
     */
    public long getSelectorTimeout() {
        return tcpSelectorTimeout;
    }

    /**
     * @return the current value of the listen flag (same as {@link #isListening()})
     */
    public boolean doListen() {
        return listen;
    }

    /**
     * @return same value as {@link #getMessageListener()}
     */
    public MessageListener getListener() {
        return listener;
    }

    /**
     * @return the task pool set through {@link #setPool(RxTaskPool)}, or {@code null}
     */
    public RxTaskPool getTaskPool() {
        return pool;
    }

    /**
     * @return how many times to search for an available socket; never less than 1 once set via the setter
     */
    public int getAutoBind() {
        return autoBind;
    }

    /**
     * @return the maximum thread count passed to the executor factory in {@link #start()}
     */
    public int getMaxThreads() {
        return maxThreads;
    }

    /**
     * @return the minimum thread count passed to the executor factory in {@link #start()}
     */
    public int getMinThreads() {
        return minThreads;
    }

    /**
     * @return the stored tcpNoDelay setting
     */
    public boolean getTcpNoDelay() {
        return tcpNoDelay;
    }

    /**
     * @return the stored soKeepAlive setting
     */
    public boolean getSoKeepAlive() {
        return soKeepAlive;
    }

    /**
     * @return the stored ooBInline setting
     */
    public boolean getOoBInline() {
        return ooBInline;
    }


    /**
     * @return the stored soLingerOn setting
     */
    public boolean getSoLingerOn() {
        return soLingerOn;
    }

    /**
     * @return the stored soLingerTime setting
     */
    public int getSoLingerTime() {
        return soLingerTime;
    }

    /**
     * @return the stored soReuseAddress setting
     */
    public boolean getSoReuseAddress() {
        return soReuseAddress;
    }

    /**
     * @return the stored soTrafficClass bit mask
     */
    public int getSoTrafficClass() {
        return soTrafficClass;
    }

    /**
     * @return the stored timeout (the default of 3000 corresponds to 3 seconds)
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * @return the stored useBufferPool setting
     */
    public boolean getUseBufferPool() {
        return useBufferPool;
    }

    /**
     * @return the stored secure port; {@code -1} by default
     */
    @Override
    public int getSecurePort() {
        return securePort;
    }

    /**
     * @return the stored minTasks setting
     */
    public int getMinTasks() {
        return minTasks;
    }

    /**
     * @return the stored maxTasks setting
     */
    public int getMaxTasks() {
        return maxTasks;
    }

    /**
     * @return the executor in use, or {@code null} before {@link #start()} / after {@link #stop()}
     */
    public ExecutorService getExecutor() {
        return executor;
    }

    /**
     * @return the current value of the listen flag (same as {@link #doListen()})
     */
    public boolean isListening() {
        return listen;
    }

    /**
     * @param selTimeout the selector timeout to store
     */
    public void setSelectorTimeout(long selTimeout) {
        tcpSelectorTimeout = selTimeout;
    }

    /**
     * @param doListen the new value of the listen flag
     */
    public void setListen(boolean doListen) {
        this.listen = doListen;
    }


    /**
     * Sets the host to bind to. An already resolved bind address is not cleared by this call.
     *
     * @param host host name or address, or {@code "auto"} to use the local host address
     */
    public void setAddress(String host) {
        this.host = host;
    }

    /**
     * @param host see {@link #setAddress(String)}
     */
    public void setHost(String host) {
        setAddress(host);
    }

    /**
     * @param listener see {@link #setMessageListener(MessageListener)}
     */
    public void setListener(MessageListener listener) {
        this.listener = listener;
    }

    /**
     * @param pool the task pool to store
     */
    public void setPool(RxTaskPool pool) {
        this.pool = pool;
    }

    /**
     * @param port the TCP port to store
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Sets how many times to search for an available socket.
     *
     * @param autoBind the number of attempts; values of zero or less are replaced by 1
     */
    public void setAutoBind(int autoBind) {
        this.autoBind = (autoBind <= 0) ? 1 : autoBind;
    }

    /**
     * @param maxThreads the maximum thread count used when {@link #start()} creates the executor
     */
    public void setMaxThreads(int maxThreads) {
        this.maxThreads = maxThreads;
    }

    /**
     * @param minThreads the minimum thread count used when {@link #start()} creates the executor
     */
    public void setMinThreads(int minThreads) {
        this.minThreads = minThreads;
    }

    /**
     * @param tcpNoDelay the tcpNoDelay setting to store
     */
    public void setTcpNoDelay(boolean tcpNoDelay) {
        this.tcpNoDelay = tcpNoDelay;
    }

    /**
     * @param soKeepAlive the soKeepAlive setting to store
     */
    public void setSoKeepAlive(boolean soKeepAlive) {
        this.soKeepAlive = soKeepAlive;
    }

    /**
     * @param ooBInline the ooBInline setting to store
     */
    public void setOoBInline(boolean ooBInline) {
        this.ooBInline = ooBInline;
    }


    /**
     * @param soLingerOn the soLingerOn setting to store
     */
    public void setSoLingerOn(boolean soLingerOn) {
        this.soLingerOn = soLingerOn;
    }

    /**
     * @param soLingerTime the soLingerTime setting to store
     */
    public void setSoLingerTime(int soLingerTime) {
        this.soLingerTime = soLingerTime;
    }

    /**
     * @param soReuseAddress the soReuseAddress setting to store
     */
    public void setSoReuseAddress(boolean soReuseAddress) {
        this.soReuseAddress = soReuseAddress;
    }

    /**
     * @param soTrafficClass the soTrafficClass bit mask to store
     */
    public void setSoTrafficClass(int soTrafficClass) {
        this.soTrafficClass = soTrafficClass;
    }

    /**
     * @param timeout the timeout to store
     */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /**
     * @param useBufferPool the useBufferPool setting to store
     */
    public void setUseBufferPool(boolean useBufferPool) {
        this.useBufferPool = useBufferPool;
    }

    /**
     * @param securePort the secure port to store
     */
    public void setSecurePort(int securePort) {
        this.securePort = securePort;
    }

    /**
     * @param minTasks the minTasks setting to store
     */
    public void setMinTasks(int minTasks) {
        this.minTasks = minTasks;
    }

    /**
     * @param maxTasks the maxTasks setting to store
     */
    public void setMaxTasks(int maxTasks) {
        this.maxTasks = maxTasks;
    }

    /**
     * Supplies the executor to use. When set before {@link #start()}, no executor is created there.
     *
     * @param executor the executor to use
     */
    public void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public void heartbeat() {
        // empty operation
    }

    /**
     * @return the UDP port; {@code -1} by default, updated by {@link #bindUdp(DatagramSocket, int, int)}
     */
    @Override
    public int getUdpPort() {
        return udpPort;
    }

    /**
     * @param udpPort the UDP port to store
     */
    public void setUdpPort(int udpPort) {
        this.udpPort = udpPort;
    }

    /**
     * @return the configured UDP receive buffer size
     */
    public int getUdpRxBufSize() {
        return udpRxBufSize;
    }

    /**
     * @param udpRxBufSize the UDP receive buffer size to store
     */
    public void setUdpRxBufSize(int udpRxBufSize) {
        this.udpRxBufSize = udpRxBufSize;
    }

    /**
     * @return the configured UDP transmit buffer size
     */
    public int getUdpTxBufSize() {
        return udpTxBufSize;
    }

    /**
     * @param udpTxBufSize the UDP transmit buffer size to store
     */
    public void setUdpTxBufSize(int udpTxBufSize) {
        this.udpTxBufSize = udpTxBufSize;
    }

    /**
     * @return the channel this receiver belongs to; {@code null} after {@link #stop()}
     */
    @Override
    public Channel getChannel() {
        return channel;
    }

    /**
     * @param channel the channel this receiver belongs to
     */
    @Override
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    // ---------------------------------------------- stats of the thread pool

    /**
     * Returns the executor as a {@link ThreadPoolExecutor} if it is one. The field is read exactly once so that a
     * concurrent {@link #stop()} cannot null it between the type check and the cast.
     *
     * @return the executor as a thread pool, or {@code null} if there is no executor or it is of another type
     */
    private ThreadPoolExecutor threadPoolOrNull() {
        final ExecutorService current = executor;
        if (current instanceof ThreadPoolExecutor) {
            return (ThreadPoolExecutor) current;
        }
        return null;
    }

    /**
     * Return the current number of threads that are managed by the pool.
     *
     * @return the current number of threads that are managed by the pool, or {@code -1} if the executor is not a
     *             {@link ThreadPoolExecutor}
     */
    public int getPoolSize() {
        final ThreadPoolExecutor tpe = threadPoolOrNull();
        return (tpe != null) ? tpe.getPoolSize() : -1;
    }

    /**
     * Return the current number of threads that are in use.
     *
     * @return the current number of threads that are in use, or {@code -1} if the executor is not a
     *             {@link ThreadPoolExecutor}
     */
    public int getActiveCount() {
        final ThreadPoolExecutor tpe = threadPoolOrNull();
        return (tpe != null) ? tpe.getActiveCount() : -1;
    }

    /**
     * Return the total number of tasks that have ever been scheduled for execution by the pool.
     *
     * @return the total number of tasks that have ever been scheduled for execution by the pool, or {@code -1} if
     *             the executor is not a {@link ThreadPoolExecutor}
     */
    public long getTaskCount() {
        final ThreadPoolExecutor tpe = threadPoolOrNull();
        return (tpe != null) ? tpe.getTaskCount() : -1;
    }

    /**
     * Return the total number of tasks that have completed execution by the pool.
     *
     * @return the total number of tasks that have completed execution by the pool, or {@code -1} if the executor
     *             is not a {@link ThreadPoolExecutor}
     */
    public long getCompletedTaskCount() {
        final ThreadPoolExecutor tpe = threadPoolOrNull();
        return (tpe != null) ? tpe.getCompletedTaskCount() : -1;
    }

    // ---------------------------------------------- ThreadFactory Inner Class
    /**
     * Creates the worker threads: named {@code namePrefix + sequenceNumber}, normal priority, placed in the thread
     * group of the thread that constructed the factory. Deliberately a non-static inner class, because the daemon
     * flag is read from the enclosing receiver each time a thread is created.
     */
    class TaskThreadFactory implements ThreadFactory {
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        TaskThreadFactory(String namePrefix) {
            group = Thread.currentThread().getThreadGroup();
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            final String threadName = namePrefix + threadNumber.getAndIncrement();
            final Thread worker = new Thread(group, r, threadName);
            worker.setDaemon(daemon);
            worker.setPriority(Thread.NORM_PRIORITY);
            return worker;
        }
    }

    /**
     * @return whether worker threads created by this receiver are daemon threads
     */
    public boolean isDaemon() {
        return daemon;
    }

    /**
     * @return the idle time, in milliseconds, passed to the executor factory in {@link #start()}
     */
    public long getMaxIdleTime() {
        return maxIdleTime;
    }

    /**
     * @param daemon whether worker threads created from now on are daemon threads
     */
    public void setDaemon(boolean daemon) {
        this.daemon = daemon;
    }

    /**
     * @param maxIdleTime the idle time, in milliseconds, used when {@link #start()} creates the executor
     */
    public void setMaxIdleTime(long maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

}