MessageDispatchInterceptor.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.group.interceptors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.ErrorHandler;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.util.ExecutorFactory;
import org.apache.catalina.tribes.util.StringManager;
import org.apache.catalina.tribes.util.TcclThreadFactory;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
/**
* The message dispatcher is a way to enable asynchronous communication
* through a channel. The dispatcher will look for the
* <code>Channel.SEND_OPTIONS_ASYNCHRONOUS</code> flag to be set, if it is, it
* will queue the message for delivery and immediately return to the sender.
*/
public class MessageDispatchInterceptor extends ChannelInterceptorBase
implements MessageDispatchInterceptorMBean {
private static final Log log = LogFactory.getLog(MessageDispatchInterceptor.class);
protected static final StringManager sm =
StringManager.getManager(MessageDispatchInterceptor.class);
protected long maxQueueSize = 1024*1024*64; //64 MiB
protected volatile boolean run = false;
protected boolean useDeepClone = true;
protected boolean alwaysSend = true;
protected final AtomicLong currentSize = new AtomicLong(0);
protected ExecutorService executor = null;
protected int maxThreads = 10;
protected int maxSpareThreads = 2;
protected long keepAliveTime = 5000;
public MessageDispatchInterceptor() {
setOptionFlag(Channel.SEND_OPTIONS_ASYNCHRONOUS);
}
@Override
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
throws ChannelException {
boolean async = (msg.getOptions() &
Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS;
if (async && run) {
if ((getCurrentSize()+msg.getMessage().getLength()) > maxQueueSize) {
if (alwaysSend) {
super.sendMessage(destination,msg,payload);
return;
} else {
throw new ChannelException(sm.getString("messageDispatchInterceptor.queue.full",
Long.toString(maxQueueSize), Long.toString(getCurrentSize())));
}
}
//add to queue
if (useDeepClone) {
msg = (ChannelMessage)msg.deepclone();
}
if (!addToQueue(msg, destination, payload)) {
throw new ChannelException(
sm.getString("messageDispatchInterceptor.unableAdd.queue"));
}
addAndGetCurrentSize(msg.getMessage().getLength());
} else {
super.sendMessage(destination, msg, payload);
}
}
public boolean addToQueue(final ChannelMessage msg, final Member[] destination,
final InterceptorPayload payload) {
Runnable r = new Runnable() {
@Override
public void run() {
sendAsyncData(msg, destination, payload);
}
};
executor.execute(r);
return true;
}
public void startQueue() {
if (run) {
return;
}
String channelName = "";
if (getChannel().getName() != null) {
channelName = "[" + getChannel().getName() + "]";
}
executor = ExecutorFactory.newThreadPool(maxSpareThreads, maxThreads, keepAliveTime,
TimeUnit.MILLISECONDS,
new TcclThreadFactory("MessageDispatchInterceptor.MessageDispatchThread" + channelName));
run = true;
}
public void stopQueue() {
run = false;
executor.shutdownNow();
setAndGetCurrentSize(0);
}
@Override
public void setOptionFlag(int flag) {
if ( flag != Channel.SEND_OPTIONS_ASYNCHRONOUS ) {
log.warn(sm.getString("messageDispatchInterceptor.warning.optionflag"));
}
super.setOptionFlag(flag);
}
public void setMaxQueueSize(long maxQueueSize) {
this.maxQueueSize = maxQueueSize;
}
public void setUseDeepClone(boolean useDeepClone) {
this.useDeepClone = useDeepClone;
}
@Override
public long getMaxQueueSize() {
return maxQueueSize;
}
public boolean getUseDeepClone() {
return useDeepClone;
}
@Override
public long getCurrentSize() {
return currentSize.get();
}
public long addAndGetCurrentSize(long inc) {
return currentSize.addAndGet(inc);
}
public long setAndGetCurrentSize(long value) {
currentSize.set(value);
return value;
}
@Override
public long getKeepAliveTime() {
return keepAliveTime;
}
@Override
public int getMaxSpareThreads() {
return maxSpareThreads;
}
@Override
public int getMaxThreads() {
return maxThreads;
}
public void setKeepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}
public void setMaxSpareThreads(int maxSpareThreads) {
this.maxSpareThreads = maxSpareThreads;
}
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
}
@Override
public boolean isAlwaysSend() {
return alwaysSend;
}
@Override
public void setAlwaysSend(boolean alwaysSend) {
this.alwaysSend = alwaysSend;
}
@Override
public void start(int svc) throws ChannelException {
//start the thread
if (!run ) {
synchronized (this) {
// only start with the sender
if ( !run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ) ) {
startQueue();
}
}
}
super.start(svc);
}
@Override
public void stop(int svc) throws ChannelException {
//stop the thread
if (run) {
synchronized (this) {
if ( run && ((svc & Channel.SND_TX_SEQ)==Channel.SND_TX_SEQ)) {
stopQueue();
}
}
}
super.stop(svc);
}
protected void sendAsyncData(ChannelMessage msg, Member[] destination,
InterceptorPayload payload) {
ErrorHandler handler = null;
if (payload != null) {
handler = payload.getErrorHandler();
}
try {
super.sendMessage(destination, msg, null);
try {
if (handler != null) {
handler.handleCompletion(new UniqueId(msg.getUniqueId()));
}
} catch ( Exception ex ) {
log.error(sm.getString("messageDispatchInterceptor.completeMessage.failed"),ex);
}
} catch ( Exception x ) {
ChannelException cx = null;
if (x instanceof ChannelException) {
cx = (ChannelException) x;
} else {
cx = new ChannelException(x);
}
if (log.isDebugEnabled()) {
log.debug(sm.getString("messageDispatchInterceptor.AsyncMessage.failed"),x);
}
try {
if (handler != null) {
handler.handleError(cx, new UniqueId(msg.getUniqueId()));
}
} catch ( Exception ex ) {
log.error(sm.getString("messageDispatchInterceptor.errorMessage.failed"),ex);
}
} finally {
addAndGetCurrentSize(-msg.getMessage().getLength());
}
}
// ---------------------------------------------- stats of the thread pool
/**
* Return the current number of threads that are managed by the pool.
* @return the current number of threads that are managed by the pool
*/
@Override
public int getPoolSize() {
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) executor).getPoolSize();
} else {
return -1;
}
}
/**
* Return the current number of threads that are in use.
* @return the current number of threads that are in use
*/
@Override
public int getActiveCount() {
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) executor).getActiveCount();
} else {
return -1;
}
}
/**
* Return the total number of tasks that have ever been scheduled for execution by the pool.
* @return the total number of tasks that have ever been scheduled for execution by the pool
*/
@Override
public long getTaskCount() {
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) executor).getTaskCount();
} else {
return -1;
}
}
/**
* Return the total number of tasks that have completed execution by the pool.
* @return the total number of tasks that have completed execution by the pool
*/
@Override
public long getCompletedTaskCount() {
if (executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) executor).getCompletedTaskCount();
} else {
return -1;
}
}
}