GroupChannel.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ChannelReceiver;
import org.apache.catalina.tribes.ChannelSender;
import org.apache.catalina.tribes.ErrorHandler;
import org.apache.catalina.tribes.Heartbeat;
import org.apache.catalina.tribes.JmxChannel;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.MembershipService;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor;
import org.apache.catalina.tribes.io.BufferPool;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.jmx.JmxRegistry;
import org.apache.catalina.tribes.util.Arrays;
import org.apache.catalina.tribes.util.Logs;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
* The default implementation of a Channel.<br>
* The GroupChannel manages the replication channel. It coordinates messages being sent and received with membership
* announcements. The channel has a chain of interceptors that can modify the message or perform other logic.<br>
* It manages a complete group, both membership and replication.
*/
public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel, JmxChannel, GroupChannelMBean {
/**
* Logger of this class.
*/
private static final Log log = LogFactory.getLog(GroupChannel.class);
/**
* Resolves message keys (for example <code>groupChannel.nullMessage</code>) to the text used in the exceptions and
* log statements of this class.
*/
protected static final StringManager sm = StringManager.getManager(GroupChannel.class);
/**
* Flag to determine if the channel manages its own heartbeat. If set to true, the channel schedules a recurring
* heartbeat task on the <code>utilityExecutor</code>.
*/
protected boolean heartbeat = true;
/**
* If <code>heartbeat == true</code> then how often do we want this heartbeat to run. The value is used both as the
* initial delay and as the delay between runs. The default value is 5000 milliseconds.
*/
protected long heartbeatSleeptime = 5 * 1000;
/**
* Internal heartbeat future. <code>null</code> until the heartbeat task has been scheduled and again after
* <code>stop</code>.
*/
protected ScheduledFuture<?> heartbeatFuture = null;
/**
* Future of the monitor task scheduled by <code>start</code>. The monitor invokes <code>startHeartbeat()</code>
* every 60 seconds so that a heartbeat task that has ended is scheduled again.
*/
protected ScheduledFuture<?> monitorFuture;
/**
* The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br>
* - MembershipService<br>
* - ChannelSender <br>
* - ChannelReceiver<br>
*/
protected final ChannelCoordinator coordinator = new ChannelCoordinator();
/**
* The first interceptor in the interceptor stack. The interceptors are chained in a linked list, so we only need a
* reference to the first one.
*/
protected ChannelInterceptor interceptors = null;
/**
* A list of membership listeners that subscribe to membership announcements
*/
protected final List<MembershipListener> membershipListeners = new CopyOnWriteArrayList<>();
/**
* A list of channel listeners that subscribe to incoming messages
*/
protected final List<ChannelListener> channelListeners = new CopyOnWriteArrayList<>();
/**
* If set to true, the GroupChannel will check on <code>start</code> that no two interceptors use conflicting option
* flags, and fail with a <code>ChannelException</code> if they do.
*/
protected boolean optionCheck = false;
/**
* the name of this channel.
*/
protected String name = null;
/**
* the jmx domain which this channel is registered. Defaults to <code>ClusterChannel</code>.
*/
private String jmxDomain = "ClusterChannel";
/**
* the jmx prefix which will be used with channel ObjectName. Defaults to the empty string.
*/
private String jmxPrefix = "";
/**
* If set to true, this channel is registered with jmx.
*/
private boolean jmxEnabled = true;
/**
* Executor service used to schedule the heartbeat task and its monitor. May be supplied through
* <code>setUtilityExecutor</code>; if it is still <code>null</code> when <code>start</code> runs, the channel
* creates its own single threaded executor.
*/
protected ScheduledExecutorService utilityExecutor = null;
/**
* the ObjectName of this channel as returned by the jmx registration in <code>start</code>; reset to
* <code>null</code> by <code>stop</code>.
*/
private ObjectName oname = null;
/**
 * Builds a new channel and registers the channel itself as the first element of its interceptor stack, linked
 * directly to the {@link ChannelCoordinator}.
 */
public GroupChannel() {
    this.addInterceptor(this);
}
/**
 * Appends an interceptor to the end of the stack, i.e. directly in front of the coordinator.
 *
 * @param interceptor the interceptor to link in; it becomes the head of the stack when the stack is still empty
 */
@Override
public void addInterceptor(ChannelInterceptor interceptor) {
    ChannelInterceptor tail = null;
    if (interceptors == null) {
        // empty stack: the new interceptor is the head and has no predecessor
        interceptors = interceptor;
    } else {
        // walk to the element currently sitting right in front of the coordinator
        tail = interceptors;
        while (tail.getNext() != coordinator) {
            tail = tail.getNext();
        }
        tail.setNext(interceptor);
    }
    interceptor.setNext(coordinator);
    interceptor.setPrevious(tail);
    coordinator.setPrevious(interceptor);
}
/**
 * Propagates a heartbeat: first through the interceptor stack (via the super class), then to every registered
 * membership listener and channel listener that implements {@link Heartbeat}.<br>
 * Applications that disabled the internal heartbeat with <code>channel.setHeartbeat(false)</code> should call this
 * method periodically themselves.
 */
@Override
public void heartbeat() {
    super.heartbeat();
    membershipListeners.stream().filter(Heartbeat.class::isInstance).map(Heartbeat.class::cast)
            .forEach(Heartbeat::heartbeat);
    channelListeners.stream().filter(Heartbeat.class::isInstance).map(Heartbeat.class::cast)
            .forEach(Heartbeat::heartbeat);
}
/**
 * Sends a message without an error handler; delegates to
 * {@link #send(Member[], Serializable, int, ErrorHandler)} with a <code>null</code> handler.
 *
 * @param destination the members to send to
 * @param msg         the message to send
 * @param options     the send option flags
 *
 * @return the unique id of the message that was sent
 *
 * @throws ChannelException if the delegate fails
 */
@Override
public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException {
    return this.send(destination, msg, options, (ErrorHandler) null);
}
/**
 * Wraps the message in a {@link ChannelData} envelope and pushes it through the interceptor stack.
 * <p>
 * A {@link ByteMessage} is sent as its raw bytes and flagged with <code>SEND_OPTIONS_BYTE_MESSAGE</code>; any other
 * message is serialized and that flag is cleared. The buffer holding the bytes is taken from the
 * {@link BufferPool} and handed back to it once this method exits, whether normally or with an exception.
 *
 * @param destination the members to send to; must contain at least one member
 * @param msg         the message to send; must not be <code>null</code>
 * @param options     the send option flags
 * @param handler     optional error handler, attached to the interceptor payload when not <code>null</code>
 *
 * @return the unique id of the message that was sent
 *
 * @throws ChannelException if the message is <code>null</code>, no destination was given, or a runtime or I/O
 *                              error occurs while preparing or sending the message
 */
@Override
public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler)
        throws ChannelException {
    if (msg == null) {
        throw new ChannelException(sm.getString("groupChannel.nullMessage"));
    }
    XByteBuffer pooled = null;
    try {
        if (destination == null || destination.length == 0) {
            throw new ChannelException(sm.getString("groupChannel.noDestination"));
        }
        // passing true makes the envelope generate its unique id
        final ChannelData envelope = new ChannelData(true);
        envelope.setAddress(getLocalMember(false));
        envelope.setTimestamp(System.currentTimeMillis());

        final boolean rawBytes = msg instanceof ByteMessage;
        final byte[] body = rawBytes ? ((ByteMessage) msg).getMessage() : XByteBuffer.serialize(msg);
        envelope.setOptions(
                rawBytes ? (options | SEND_OPTIONS_BYTE_MESSAGE) : (options & (~SEND_OPTIONS_BYTE_MESSAGE)));

        pooled = BufferPool.getBufferPool().getBuffer(body.length + 128, false);
        pooled.append(body, 0, body.length);
        envelope.setMessage(pooled);

        InterceptorPayload payload = null;
        if (handler != null) {
            payload = new InterceptorPayload();
            payload.setErrorHandler(handler);
        }
        getFirstInterceptor().sendMessage(destination, envelope, payload);

        final UniqueId sentId = new UniqueId(envelope.getUniqueId());
        if (Logs.MESSAGES.isTraceEnabled()) {
            Logs.MESSAGES.trace("GroupChannel - Sent msg:" + sentId + " at " +
                    new java.sql.Timestamp(System.currentTimeMillis()) + " to " + Arrays.toNameString(destination));
            Logs.MESSAGES.trace("GroupChannel - Send Message:" + sentId + " is " + msg);
        }
        return sentId;
    } catch (RuntimeException | IOException e) {
        throw new ChannelException(e);
    } finally {
        if (pooled != null) {
            BufferPool.getBufferPool().returnBuffer(pooled);
        }
    }
}
/**
 * Callback from the interceptor stack, invoked by the previous interceptor when a message arrives.<br>
 * The payload is rebuilt (as a {@link ByteMessage} when the message carries the
 * <code>SEND_OPTIONS_BYTE_MESSAGE</code> flag, otherwise by deserialization) and offered to every registered channel
 * listener; each listener that accepts it receives it. If the payload is an {@link RpcMessage} and no
 * {@link RpcChannel} listener accepted it, a "no RPC channel" reply is sent back to the sender.
 * <p>
 * A <code>null</code> message is ignored. A payload that cannot be deserialized is logged and dropped.
 *
 * @param msg ChannelMessage
 *
 * @throws RemoteProcessException if any other exception occurs while processing or delivering the message
 */
@Override
public void messageReceived(ChannelMessage msg) {
    if (msg == null) {
        return;
    }
    try {
        if (Logs.MESSAGES.isTraceEnabled()) {
            Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +
                    new java.sql.Timestamp(System.currentTimeMillis()) + " from " + msg.getAddress().getName());
        }

        Serializable content;
        final boolean rawBytes = (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE;
        if (rawBytes) {
            content = new ByteMessage(msg.getMessage().getBytes());
        } else {
            try {
                content = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0,
                        msg.getMessage().getLength());
            } catch (Exception sx) {
                log.error(sm.getString("groupChannel.unable.deserialize", msg), sx);
                return;
            }
        }

        if (Logs.MESSAGES.isTraceEnabled()) {
            Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " + content);
        }

        final Member sender = msg.getAddress();
        boolean rpcHandled = false;
        boolean delivered = false;
        for (ChannelListener listener : channelListeners) {
            if (listener == null || !listener.accept(content, sender)) {
                continue;
            }
            listener.messageReceived(content, sender);
            delivered = true;
            // an RPC channel that accepted the message takes over the duty of replying
            if (listener instanceof RpcChannel) {
                rpcHandled = true;
            }
        }

        // an RPC request nobody took responsibility for gets an immediate absence reply
        if (content instanceof RpcMessage && !rpcHandled) {
            sendNoRpcChannelReply((RpcMessage) content, sender);
        }

        if (Logs.MESSAGES.isTraceEnabled()) {
            Logs.MESSAGES.trace("GroupChannel delivered[" + delivered + "] id:" + new UniqueId(msg.getUniqueId()));
        }
    } catch (Exception x) {
        // most likely a channel listener failed; log it as a warning before reporting it to the caller
        if (log.isWarnEnabled()) {
            log.warn(sm.getString("groupChannel.receiving.error"), x);
        }
        throw new RemoteProcessException(sm.getString("groupChannel.receiving.error"), x);
    }
}
/**
 * Answers an RPC message with a <code>NoRpcChannelReply</code>.<br>
 * Invoked by the channel when an RPC message arrived and no RPC channel listener accepted it, so that the sender
 * does not have to wait for a timeout. Failures while sending the reply are logged, not propagated.
 *
 * @param msg         RpcMessage - the message that went unanswered
 * @param destination Member - the destination for the reply
 */
protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) {
    // never reply to a reply, otherwise two nodes could bounce replies back and forth
    if (msg instanceof RpcMessage.NoRpcChannelReply) {
        return;
    }
    try {
        final Member[] target = new Member[] { destination };
        send(target, new RpcMessage.NoRpcChannelReply(msg.rpcId, msg.uuid), SEND_OPTIONS_ASYNCHRONOUS);
    } catch (Exception x) {
        log.error(sm.getString("groupChannel.sendFail.noRpcChannelReply"), x);
    }
}
/**
 * Invoked by the interceptor below the channel when a member joins; the event is passed on to every registered
 * membership listener. <code>null</code> entries in the listener list are skipped.
 *
 * @param member Member - the new member
 */
@Override
public void memberAdded(Member member) {
    membershipListeners.forEach(listener -> {
        if (listener != null) {
            listener.memberAdded(member);
        }
    });
}
/**
 * Invoked by the interceptor below the channel when a member is gone; the event is passed on to every registered
 * membership listener. <code>null</code> entries in the listener list are skipped.
 *
 * @param member Member - the member that left or crashed
 */
@Override
public void memberDisappeared(Member member) {
    membershipListeners.forEach(listener -> {
        if (listener != null) {
            listener.memberDisappeared(member);
        }
    });
}
/**
 * Prepares the interceptor stack for use. When the head of the stack is linked directly to the coordinator (no
 * further interceptor was added), a {@link MessageDispatchInterceptor} is appended. Afterwards this channel is
 * set on every interceptor returned by {@link #getInterceptors()} and on the coordinator.
 *
 * @throws ChannelException Cluster error
 */
protected synchronized void setupDefaultStack() throws ChannelException {
    final ChannelInterceptor head = getFirstInterceptor();
    if (head != null && head.getNext() instanceof ChannelCoordinator) {
        addInterceptor(new MessageDispatchInterceptor());
    }
    for (Iterator<ChannelInterceptor> stack = getInterceptors(); stack.hasNext();) {
        stack.next().setChannel(this);
    }
    coordinator.setChannel(this);
}
/**
 * Compares the option flag of every element of the interceptor chain with the flags of all elements behind it and
 * reports every pair whose flags conflict. Two non-zero flags conflict when the bits of one are completely
 * contained in the other. Elements with an option flag of <code>0</code> are ignored.
 *
 * @throws ChannelException if at least one conflicting pair was found; the message lists all of them
 */
protected void checkOptionFlags() throws ChannelException {
    final StringBuilder conflicts = new StringBuilder();
    for (ChannelInterceptor current = interceptors; current != null; current = current.getNext()) {
        final int flag = current.getOptionFlag();
        if (flag == 0) {
            continue;
        }
        for (ChannelInterceptor other = current.getNext(); other != null; other = other.getNext()) {
            final int otherFlag = other.getOptionFlag();
            if (otherFlag == 0) {
                continue;
            }
            final int common = flag & otherFlag;
            if (common == flag || common == otherFlag) {
                conflicts.append('[').append(current.getClass().getName()).append(':').append(flag)
                        .append(" == ").append(other.getClass().getName()).append(':').append(otherFlag)
                        .append("] ");
            }
        }
    }
    if (conflicts.length() > 0) {
        throw new ChannelException(sm.getString("groupChannel.optionFlag.conflict", conflicts.toString()));
    }
}
/**
* Set to <code>true</code> by <code>start</code> when no utility executor had been provided and the channel created
* one itself. <code>stop</code> shuts the executor down only when this flag is set, and then resets it.
*/
protected boolean ownExecutor = false;
/**
 * Starts the channel: prepares the interceptor stack, optionally validates the interceptor option flags,
 * registers the channel with JMX when a registry is available, makes sure a utility executor exists, starts the
 * stack and finally schedules the heartbeat monitor.
 * <p>
 * The monitor runs {@link #startHeartbeat()} immediately and then every 60 seconds. It is only scheduled when no
 * monitor is currently active, so invoking this method again (for example to start further services) does not
 * leave behind an additional monitor task that {@link #stop(int)} could no longer cancel.
 *
 * @param svc the services to start, passed on unchanged to the super class
 *
 * @throws ChannelException if the option flag check or the start of the stack fails
 */
@Override
public synchronized void start(int svc) throws ChannelException {
    setupDefaultStack();
    if (optionCheck) {
        checkOptionFlags();
    }
    // register jmx
    JmxRegistry jmxRegistry = JmxRegistry.getRegistry(this);
    if (jmxRegistry != null) {
        this.oname = jmxRegistry.registerJmx(",component=Channel", this);
    }
    if (utilityExecutor == null) {
        // nobody provided an executor: create a private one and remember that stop() has to shut it down
        log.warn(sm.getString("groupChannel.warn.noUtilityExecutor"));
        utilityExecutor = new ScheduledThreadPoolExecutor(1);
        ownExecutor = true;
    }
    super.start(svc);
    // only one monitor at a time; overwriting a live future would leak the task it refers to
    if (monitorFuture == null || monitorFuture.isDone()) {
        monitorFuture = utilityExecutor.scheduleWithFixedDelay(this::startHeartbeat, 0, 60, TimeUnit.SECONDS);
    }
}
/**
 * Schedules the internal heartbeat task if the heartbeat is enabled and no heartbeat task is currently running.
 * <p>
 * When a previously scheduled task has finished, its outcome is fetched first so that the failure that ended it
 * gets logged; afterwards a new task is scheduled with <code>heartbeatSleeptime</code> as initial delay and as
 * delay between runs. Invoked periodically by the monitor task scheduled in {@link #start(int)}.
 */
protected void startHeartbeat() {
    if (!heartbeat) {
        return;
    }
    if (heartbeatFuture != null) {
        if (!heartbeatFuture.isDone()) {
            // still scheduled, nothing to do
            return;
        }
        // the previous task ended, retrieve the reason and log it
        try {
            heartbeatFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            log.error(sm.getString("groupChannel.unable.sendHeartbeat"), e);
        }
    }
    heartbeatFuture = utilityExecutor.scheduleWithFixedDelay(new HeartbeatRunnable(), heartbeatSleeptime,
            heartbeatSleeptime, TimeUnit.MILLISECONDS);
}
/**
 * Stops the channel: cancels the heartbeat monitor and the heartbeat task, stops the stack, shuts down the
 * utility executor if the channel created it itself, and removes the JMX registration.
 * <p>
 * The JMX registry lookup is null-checked in the same way as in {@link #start(int)}; when no registry is
 * available any more, the stored object name is simply discarded instead of failing with a
 * <code>NullPointerException</code>.
 *
 * @param svc the services to stop, passed on unchanged to the super class
 *
 * @throws ChannelException if stopping the stack fails
 */
@Override
public synchronized void stop(int svc) throws ChannelException {
    if (monitorFuture != null) {
        monitorFuture.cancel(true);
        monitorFuture = null;
    }
    if (heartbeatFuture != null) {
        heartbeatFuture.cancel(true);
        heartbeatFuture = null;
    }
    super.stop(svc);
    if (ownExecutor) {
        // only an executor created by start() is ours to shut down
        utilityExecutor.shutdown();
        utilityExecutor = null;
        ownExecutor = false;
    }
    if (oname != null) {
        JmxRegistry jmxRegistry = JmxRegistry.getRegistry(this);
        if (jmxRegistry != null) {
            jmxRegistry.unregisterJmx(oname);
        }
        oname = null;
    }
}
/**
 * Returns the head of the interceptor stack, which is the starting point for a traversal. Falls back to the
 * coordinator when no interceptor has been registered.
 *
 * @return ChannelInterceptor
 */
public ChannelInterceptor getFirstInterceptor() {
    return interceptors != null ? interceptors : coordinator;
}
/**
 * @return the executor currently used by this channel for its scheduled tasks, or <code>null</code> if none is
 *             set
 */
@Override
public ScheduledExecutorService getUtilityExecutor() {
    return this.utilityExecutor;
}
/**
 * Sets the executor this channel uses for its scheduled tasks.
 *
 * @param utilityExecutor the executor to use
 */
@Override
public void setUtilityExecutor(final ScheduledExecutorService utilityExecutor) {
    this.utilityExecutor = utilityExecutor;
}
/**
 * @return the receiver held by the coordinator
 */
@Override
public ChannelReceiver getChannelReceiver() {
    return this.coordinator.getClusterReceiver();
}
/**
 * @return the sender held by the coordinator
 */
@Override
public ChannelSender getChannelSender() {
    return this.coordinator.getClusterSender();
}
/**
 * @return the membership service held by the coordinator
 */
@Override
public MembershipService getMembershipService() {
    return this.coordinator.getMembershipService();
}
/**
 * Hands the receiver to the coordinator.
 *
 * @param clusterReceiver the receiver to use
 */
@Override
public void setChannelReceiver(final ChannelReceiver clusterReceiver) {
    this.coordinator.setClusterReceiver(clusterReceiver);
}
/**
 * Hands the sender to the coordinator.
 *
 * @param clusterSender the sender to use
 */
@Override
public void setChannelSender(final ChannelSender clusterSender) {
    this.coordinator.setClusterSender(clusterSender);
}
/**
 * Hands the membership service to the coordinator.
 *
 * @param membershipService the membership service to use
 */
@Override
public void setMembershipService(final MembershipService membershipService) {
    this.coordinator.setMembershipService(membershipService);
}
/**
 * Registers a membership listener. A listener that is already registered is silently ignored.
 *
 * @param membershipListener the listener to register
 */
@Override
public void addMembershipListener(MembershipListener membershipListener) {
    if (membershipListeners.contains(membershipListener)) {
        return;
    }
    membershipListeners.add(membershipListener);
}
/**
 * Removes a membership listener from the list of registered listeners.
 *
 * @param membershipListener the listener to remove
 */
@Override
public void removeMembershipListener(final MembershipListener membershipListener) {
    this.membershipListeners.remove(membershipListener);
}
/**
 * Registers a channel listener.
 * <p>
 * The duplicate check also works for a <code>null</code> entry: the listener type in the error message is
 * computed null-safely, so a repeated <code>null</code> registration is reported with the intended
 * <code>IllegalArgumentException</code> rather than a <code>NullPointerException</code>.
 *
 * @param channelListener the listener to register
 *
 * @throws IllegalArgumentException if the listener is already registered
 */
@Override
public void addChannelListener(ChannelListener channelListener) {
    if (this.channelListeners.contains(channelListener)) {
        String listenerType = channelListener == null ? "null" : channelListener.getClass().getName();
        throw new IllegalArgumentException(
                sm.getString("groupChannel.listener.alreadyExist", channelListener, listenerType));
    }
    this.channelListeners.add(channelListener);
}
/**
 * Removes a channel listener from the list of registered listeners.
 *
 * @param channelListener the listener to remove
 */
@Override
public void removeChannelListener(final ChannelListener channelListener) {
    this.channelListeners.remove(channelListener);
}
/**
 * @return an iterator that starts at the interceptor following this channel and ends before the coordinator
 */
@Override
public Iterator<ChannelInterceptor> getInterceptors() {
    return new InterceptorIterator(getNext(), coordinator);
}
/**
 * Enables/disables the option check.<br>
 * When enabled, the GroupChannel performs a conflict check on the option flags of its interceptors during start
 * and throws an error if two interceptors use conflicting flags.
 *
 * @param optionCheck boolean
 */
public void setOptionCheck(final boolean optionCheck) {
    this.optionCheck = optionCheck;
}
/**
 * Configures the delay of the internal heartbeat.<br>
 * Only used when <code>getHeartbeat()==true</code>; the value is read at the moment the heartbeat task is
 * scheduled.
 *
 * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats
 */
public void setHeartbeatSleeptime(final long heartbeatSleeptime) {
    this.heartbeatSleeptime = heartbeatSleeptime;
}
/**
 * Enables or disables the internal heartbeat. When enabled, the channel schedules a task that invokes
 * <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds. The flag is evaluated
 * when the heartbeat task is about to be scheduled; this setter itself neither schedules nor cancels a task.
 *
 * @param heartbeat boolean
 */
@Override
public void setHeartbeat(final boolean heartbeat) {
    this.heartbeat = heartbeat;
}
/**
 * @return <code>true</code> if the interceptor option flags are checked for conflicts during start
 */
@Override
public boolean getOptionCheck() {
    return this.optionCheck;
}
/**
 * @return <code>true</code> if the channel runs its own internal heartbeat
 */
@Override
public boolean getHeartbeat() {
    return this.heartbeat;
}
/**
 * @return the time in milliseconds the internal heartbeat waits between two invocations of
 *             <code>Channel.heartbeat()</code>
 */
@Override
public long getHeartbeatSleeptime() {
    return this.heartbeatSleeptime;
}
/**
 * @return the name of this channel, <code>null</code> if none was set
 */
@Override
public String getName() {
    return this.name;
}
/**
 * Sets the name of this channel.
 *
 * @param name the channel name
 */
@Override
public void setName(final String name) {
    this.name = name;
}
/**
 * @return the value of the JMX enabled flag
 */
@Override
public boolean isJmxEnabled() {
    return this.jmxEnabled;
}
/**
 * Sets the JMX enabled flag.
 *
 * @param jmxEnabled the new value of the flag
 */
@Override
public void setJmxEnabled(final boolean jmxEnabled) {
    this.jmxEnabled = jmxEnabled;
}
/**
 * @return the configured JMX domain
 */
@Override
public String getJmxDomain() {
    return this.jmxDomain;
}
/**
 * Sets the JMX domain.
 *
 * @param jmxDomain the domain name
 */
@Override
public void setJmxDomain(final String jmxDomain) {
    this.jmxDomain = jmxDomain;
}
/**
 * @return the configured JMX prefix
 */
@Override
public String getJmxPrefix() {
    return this.jmxPrefix;
}
/**
 * Sets the JMX prefix.
 *
 * @param jmxPrefix the prefix
 */
@Override
public void setJmxPrefix(final String jmxPrefix) {
    this.jmxPrefix = jmxPrefix;
}
/**
* MBean registration callback. Intentionally does nothing.
*
* @param server the MBean server (unused)
* @param name   the object name (unused)
*
* @return always <code>null</code>
*/
@Override
public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception {
// NOOP
return null;
}
/**
* MBean registration callback. Intentionally does nothing.
*
* @param registrationDone unused
*/
@Override
public void postRegister(Boolean registrationDone) {
// NOOP
}
/**
* MBean deregistration callback. Intentionally does nothing.
*/
@Override
public void preDeregister() throws Exception {
// NOOP
}
/**
* MBean deregistration callback. Asks the <code>JmxRegistry</code> to remove the registry belonging to this
* channel.
*/
@Override
public void postDeregister() {
// NOTE(review): the meaning of the boolean argument is defined by JmxRegistry.removeRegistry - confirm there
JmxRegistry.removeRegistry(this, true);
}
/**
 * Iterator that walks a chain of interceptors by following {@link ChannelInterceptor#getNext()}, beginning with
 * a start element and finishing before a given end element (which is not returned).
 * <p>
 * Note that {@link #next()} returns <code>null</code> instead of throwing when the iteration is exhausted, and
 * that {@link #remove()} silently does nothing.
 */
public static class InterceptorIterator implements Iterator<ChannelInterceptor> {
    /** Element at which the iteration stops; never returned. */
    private final ChannelInterceptor boundary;
    /** Element that the next call to {@link #next()} returns. */
    private ChannelInterceptor cursor;

    /**
     * @param start the first interceptor to return
     * @param end   the interceptor at which to stop (exclusive)
     */
    public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) {
        this.boundary = end;
        this.cursor = start;
    }

    @Override
    public boolean hasNext() {
        return !(cursor == null || cursor == boundary);
    }

    @Override
    public ChannelInterceptor next() {
        if (!hasNext()) {
            return null;
        }
        final ChannelInterceptor current = cursor;
        cursor = current.getNext();
        return current;
    }

    @Override
    public void remove() {
        // removal is not supported; the call is ignored
    }
}
/**
 * Internal heartbeat runnable. Each run invokes {@link GroupChannel#heartbeat()} on the enclosing channel. An
 * instance is scheduled by {@link GroupChannel#startHeartbeat()}.
 */
public class HeartbeatRunnable implements Runnable {
    @Override
    public void run() {
        GroupChannel.this.heartbeat();
    }
}
}