Channel.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes;
import java.io.Serializable;
import java.util.StringJoiner;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
* A channel is a representation of a group of nodes all participating in some sort of communication with each other.
* <p>
* The channel is the main API class for Tribes, this is essentially the only class that an application needs to be
* aware of. Through the channel the application can:
* <ul>
* <li>send messages</li>
* <li>receive message (by registering a <code>ChannelListener</code></li>
* <li>get all members of the group <code>getMembers()</code></li>
* <li>receive notifications of members added and members disappeared by registering a
* <code>MembershipListener</code></li>
* </ul>
* The channel has 5 major components:
* <ul>
* <li>Data receiver, with a built in thread pool to receive messages from other peers</li>
* <li>Data sender, an implementation for sending data using NIO or java.io</li>
* <li>Membership listener,listens for membership broadcasts</li>
* <li>Membership broadcaster, broadcasts membership pings.</li>
* <li>Channel interceptors, the ability to manipulate messages as they are sent or arrive</li>
* </ul>
* The channel layout is:
*
* <pre>
* <code>
* ChannelListener_1..ChannelListener_N MembershipListener_1..MembershipListener_N [Application Layer]
* \ \ / /
* \ \ / /
* \ \ / /
* \ \ / /
* \ \ / /
* \ \ / /
* ---------------------------------------
* |
* |
* Channel
* |
* ChannelInterceptor_1
* | [Channel stack]
* ChannelInterceptor_N
* |
* Coordinator (implements MessageListener,MembershipListener,ChannelInterceptor)
* --------------------
* / | \
* / | \
* / | \
* / | \
* / | \
* MembershipService ChannelSender ChannelReceiver [IO layer]
* </code>
* </pre>
*
* @see org.apache.catalina.tribes.group.GroupChannel example usage
*/
public interface Channel {
/**
* Start and stop sequences can be controlled by these constants. This allows you to start separate components of
* the channel.
* <p>
* DEFAULT - starts or stops all components in the channel
*
* @see #start(int)
* @see #stop(int)
*/
int DEFAULT = 15;
/**
* Start and stop sequences can be controlled by these constants. This allows you to start separate components of
* the channel.
* <p>
* SND_RX_SEQ - starts or stops the data receiver. Start means opening a server socket in case of a TCP
* implementation
*
* @see #start(int)
* @see #stop(int)
*/
int SND_RX_SEQ = 1;
/**
* Start and stop sequences can be controlled by these constants. This allows you to start separate components of
* the channel.
* <p>
* SND_TX_SEQ - starts or stops the data sender. This should not open any sockets, as sockets are opened on demand
* when a message is being sent
*
* @see #start(int)
* @see #stop(int)
*/
int SND_TX_SEQ = 2;
/**
* Start and stop sequences can be controlled by these constants. This allows you to start separate components of
* the channel.
* <p>
* MBR_RX_SEQ - starts or stops the membership listener. In a multicast implementation this will open a datagram
* socket and join a group and listen for membership messages members joining
*
* @see #start(int)
* @see #stop(int)
*/
int MBR_RX_SEQ = 4;
/**
* Start and stop sequences can be controlled by these constants. This allows you to start separate components of
* the channel.
* <p>
* MBR_TX_SEQ - starts or stops the membership broadcaster. In a multicast implementation this will open a datagram
* socket and join a group and broadcast the local member information
*
* @see #start(int)
* @see #stop(int)
*/
int MBR_TX_SEQ = 8;
/**
* Send options, when a message is sent, it can have an option flag to trigger certain behavior. Most flags are used
* to trigger channel interceptors as the message passes through the channel stack.
* <p>
* However, there are five default flags that every channel implementation must implement.
* <p>
* SEND_OPTIONS_BYTE_MESSAGE - The message is a pure byte message and no marshaling or unmarshaling will be
* performed.
*
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
int SEND_OPTIONS_BYTE_MESSAGE = 0x0001;
/**
* Send options, when a message is sent, it can have an option flag to trigger certain behavior. Most flags are used
* to trigger channel interceptors as the message passes through the channel stack.
* <p>
* However, there are five default flags that every channel implementation must implement
* <p>
* SEND_OPTIONS_USE_ACK - Message is sent and an ACK is received when the message has been received by the
* recipient. If no ack is received, the message is not considered successful.
*
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
int SEND_OPTIONS_USE_ACK = 0x0002;
/**
* Send options, when a message is sent, it can have an option flag to trigger certain behavior. Most flags are used
* to trigger channel interceptors as the message passes through the channel stack.
* <p>
* However, there are five default flags that every channel implementation must implement
* <p>
* SEND_OPTIONS_SYNCHRONIZED_ACK - Message is sent and an ACK is received when the message has been received and
* processed by the recipient. If no ack is received, the message is not considered successful
*
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
int SEND_OPTIONS_SYNCHRONIZED_ACK = 0x0004;
/**
* Send options, when a message is sent, it can have an option flag to trigger certain behavior. Most flags are used
* to trigger channel interceptors as the message passes through the channel stack.
* <p>
* However, there are five default flags that every channel implementation must implement
* <p>
* SEND_OPTIONS_ASYNCHRONOUS - Message will be placed on a queue and sent by a separate thread. If the queue is
* full, behaviour depends on {@link MessageDispatchInterceptor#isAlwaysSend()}
*
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
int SEND_OPTIONS_ASYNCHRONOUS = 0x0008;
/**
* Send options, when a message is sent, it can have an option flag to trigger certain behavior. Most flags are used
* to trigger channel interceptors as the message passes through the channel stack.
* <p>
* However, there are five default flags that every channel implementation must implement
* <p>
* SEND_OPTIONS_SECURE - Message is sent over an encrypted channel
*
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
int SEND_OPTIONS_SECURE = 0x0010;
/**
* Send options. When a message is sent with this flag on the system sends the message using UDP instead of TCP.
*
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
int SEND_OPTIONS_UDP = 0x0020;
/**
* Send options. When a message is sent with this flag on the system sends a UDP message on the Multicast address
* instead of UDP or TCP to individual addresses.
*
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
int SEND_OPTIONS_MULTICAST = 0x0040;
/**
* Send options, when a message is sent, it can have an option flag to trigger certain behavior. Most flags are used
* to trigger channel interceptors as the message passes through the channel stack.
* <p>
* However, there are five default flags that every channel implementation must implement
* <p>
* SEND_OPTIONS_DEFAULT - the default sending options, just a helper variable. The default is
* <code>SEND_OPTIONS_USE_ACK</code>
*
* @see #SEND_OPTIONS_USE_ACK
* @see #send(Member[], Serializable , int)
* @see #send(Member[], Serializable, int, ErrorHandler)
*/
int SEND_OPTIONS_DEFAULT = SEND_OPTIONS_USE_ACK;
/**
* Adds an interceptor to the stack for message processing. Interceptors are ordered in the way they are added.
*
* <pre>
* <code>
* channel.addInterceptor(A);
* channel.addInterceptor(C);
* channel.addInterceptor(B);
* </code>
* </pre>
*
* Will result in an interceptor stack like this: <code>A -> C -> B</code>
* <p>
* The complete stack will look like this: <code>Channel -> A -> C -> B -> ChannelCoordinator</code>
*
* @param interceptor ChannelInterceptorBase
*/
void addInterceptor(ChannelInterceptor interceptor);
/**
* Starts up the channel. This can be called multiple times for individual services to start The svc parameter can
* be the logical or value of any constants.
*
* @param svc one of:
* <ul>
* <li>DEFAULT - will start all services</li>
* <li>MBR_RX_SEQ - starts the membership receiver</li>
* <li>MBR_TX_SEQ - starts the membership broadcaster</li>
* <li>SND_TX_SEQ - starts the replication transmitter</li>
* <li>SND_RX_SEQ - starts the replication receiver</li>
* </ul>
* <b>Note:</b> In order for the membership broadcaster to transmit the correct information, it has
* to be started after the replication receiver.
*
* @throws ChannelException if a startup error occurs or the service is already started or an error occurs.
*/
void start(int svc) throws ChannelException;
/**
* Shuts down the channel. This can be called multiple times for individual services to shutdown The svc parameter
* can be the logical or value of any constants
*
* @param svc one of:
* <ul>
* <li>DEFAULT - will shutdown all services</li>
* <li>MBR_RX_SEQ - stops the membership receiver</li>
* <li>MBR_TX_SEQ - stops the membership broadcaster</li>
* <li>SND_TX_SEQ - stops the replication transmitter</li>
* <li>SND_RX_SEQ - stops the replication receiver</li>
* </ul>
*
* @throws ChannelException if a startup error occurs or the service is already stopped or an error occurs.
*/
void stop(int svc) throws ChannelException;
/**
* Send a message to one or more members in the cluster
*
* @param destination Member[] - the destinations, cannot be null or zero length, the reason for that is that a
* membership change can occur and at that time the application is uncertain what group the
* message actually got sent to.
* @param msg Serializable - the message to send, has to be serializable, or a <code>ByteMessage</code> to
* send a pure byte array
* @param options int - sender options, see class documentation for each interceptor that is configured in order
* to trigger interceptors
*
* @return a unique Id that identifies the message that is sent
*
* @throws ChannelException if a serialization error happens.
*
* @see ByteMessage
* @see #SEND_OPTIONS_USE_ACK
* @see #SEND_OPTIONS_ASYNCHRONOUS
* @see #SEND_OPTIONS_SYNCHRONIZED_ACK
*/
UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException;
/**
* Send a message to one or more members in the cluster
*
* @param destination Member[] - the destinations, null or zero length means all
* @param msg ClusterMessage - the message to send
* @param options int - sender options, see class documentation
* @param handler ErrorHandler - handle errors through a callback, rather than throw it
*
* @return a unique Id that identifies the message that is sent
*
* @exception ChannelException - if a serialization error happens.
*/
UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException;
/**
* Sends a heart beat through the interceptor stacks. Use this method to alert interceptors and other components to
* clean up garbage, timed out messages etc.
* <p>
* If you application has a background thread, then you can save one thread, by configuring your channel to not use
* an internal heartbeat thread and invoking this method.
*
* @see #setHeartbeat(boolean)
*/
void heartbeat();
/**
* Enables or disables internal heartbeat.
*
* @param enable boolean - default value is implementation specific
*
* @see #heartbeat()
*/
void setHeartbeat(boolean enable);
/**
* Add a membership listener, will get notified when a new member joins, leaves or crashes.
* <p>
* If the membership listener implements the Heartbeat interface the <code>heartbeat()</code> method will be invoked
* when the heartbeat runs on the channel
*
* @param listener MembershipListener
*
* @see MembershipListener
*/
void addMembershipListener(MembershipListener listener);
/**
* Add a channel listener, this is a callback object when messages are received.
* <p>
* If the channel listener implements the Heartbeat interface the <code>heartbeat()</code> method will be invoked
* when the heartbeat runs on the channel
*
* @param listener ChannelListener
*
* @see ChannelListener
* @see Heartbeat
*/
void addChannelListener(ChannelListener listener);
/**
* Remove a membership listener, listeners are removed based on {@link Object#hashCode()} and
* {@link Object#equals(Object)}.
*
* @param listener MembershipListener
*
* @see MembershipListener
*/
void removeMembershipListener(MembershipListener listener);
/**
* Remove a channel listener, listeners are removed based on {@link Object#hashCode()} and
* {@link Object#equals(Object)}.
*
* @param listener ChannelListener
*
* @see ChannelListener
*/
void removeChannelListener(ChannelListener listener);
/**
* Returns true if there are any members in the group. This call is the same as
* <code>getMembers().length > 0</code>
*
* @return boolean - true if there are any members automatically discovered
*/
boolean hasMembers();
/**
* Get all current group members.
*
* @return all members or empty array, never null
*/
Member[] getMembers();
/**
* Return the member that represents this node. This is also the data that gets broadcasted through the membership
* broadcaster component
*
* @param incAlive - optimization, true if you want it to calculate alive time since the membership service started.
*
* @return Member
*/
Member getLocalMember(boolean incAlive);
/**
* Returns the member from the membership service with complete and recent data. Some implementations might
* serialize and send membership information along with a message, and instead of sending complete membership
* details, only send the primary identifier for the member but not the payload or other information. When such
* message is received the application can retrieve the cached member through this call. In most cases, this is not
* necessary.
*
* @param mbr Member
*
* @return Member
*/
Member getMember(Member mbr);
/**
* Return the name of this channel.
*
* @return channel name
*/
String getName();
/**
* Set the name of this channel
*
* @param name The new channel name
*/
void setName(String name);
/**
* Return executor that can be used for utility tasks.
*
* @return the executor
*/
ScheduledExecutorService getUtilityExecutor();
/**
* Set the executor that can be used for utility tasks.
*
* @param utilityExecutor the executor
*/
void setUtilityExecutor(ScheduledExecutorService utilityExecutor);
/**
* Translates the name of an option to its integer value. Valid option names are "asynchronous" (alias "async"),
* "byte_message" (alias "byte"), "multicast", "secure", "synchronized_ack" (alias "sync"), "udp", "use_ack"
*
* @param opt The name of the option
*
* @return the int value of the passed option name
*/
static int getSendOptionValue(String opt) {
switch (opt) {
case "asynchronous":
case "async":
return SEND_OPTIONS_ASYNCHRONOUS;
case "byte_message":
case "byte":
return SEND_OPTIONS_BYTE_MESSAGE;
case "multicast":
return SEND_OPTIONS_MULTICAST;
case "secure":
return SEND_OPTIONS_SECURE;
case "synchronized_ack":
case "sync":
return SEND_OPTIONS_SYNCHRONIZED_ACK;
case "udp":
return SEND_OPTIONS_UDP;
case "use_ack":
return SEND_OPTIONS_USE_ACK;
}
throw new IllegalArgumentException(String.format("[%s] is not a valid option", opt));
}
/**
* Translates a comma separated list of option names to their bitwise-ORd value
*
* @param input A comma separated list of options, e.g. "async, multicast"
*
* @return a bitwise ORd value of the passed option names
*/
static int parseSendOptions(String input) {
try {
return Integer.parseInt(input);
} catch (NumberFormatException nfe) {
final Log log = LogFactory.getLog(Channel.class);
log.trace(String.format("Failed to parse [%s] as integer, channelSendOptions possibly set by name(s)",
input));
}
String[] options = input.split("\\s*,\\s*");
int result = 0;
for (String opt : options) {
result |= getSendOptionValue(opt);
}
return result;
}
/**
* Translates an integer value of SendOptions to its human-friendly comma separated value list for use in JMX and
* such.
*
* @param input the int value of SendOptions
*
* @return the human-friendly string representation in a reverse order (i.e. the last option will be shown first)
*/
static String getSendOptionsAsString(int input) {
// allOptionNames must be in order of the bits of the available options
final String[] allOptionNames =
new String[] { "byte", "use_ack", "sync", "async", "secure", "udp", "multicast" };
StringJoiner names = new StringJoiner(", ");
for (int bit = allOptionNames.length - 1; bit >= 0; bit--) {
// if the bit is set then add the name to the result
if (((1 << bit) & input) > 0) {
names.add(allOptionNames[bit]);
}
}
return names.toString();
}
}