ReplicationValve.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.ha.tcp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.servlet.ServletException;
import org.apache.catalina.Cluster;
import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.Manager;
import org.apache.catalina.Session;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.ha.CatalinaCluster;
import org.apache.catalina.ha.ClusterManager;
import org.apache.catalina.ha.ClusterMessage;
import org.apache.catalina.ha.ClusterSession;
import org.apache.catalina.ha.ClusterValve;
import org.apache.catalina.ha.session.DeltaManager;
import org.apache.catalina.ha.session.DeltaSession;
import org.apache.catalina.valves.ValveBase;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;
/**
* <p>
* Implementation of a Valve that logs interesting contents from the specified Request (before processing) and the
* corresponding Response (after processing). It is especially useful in debugging problems related to headers and
* cookies.
* </p>
* <p>
* This Valve may be attached to any Container, depending on the granularity of the logging you wish to perform.
* </p>
* <p>
* primaryIndicator=true, then the request attribute <i>org.apache.catalina.ha.tcp.isPrimarySession.</i> is set true,
* when request processing is at sessions primary node.
* </p>
*
* @author Craig R. McClanahan
* @author Peter Rossbach
*/
public class ReplicationValve extends ValveBase implements ClusterValve {
private static final Log log = LogFactory.getLog(ReplicationValve.class);
// ----------------------------------------------------- Instance Variables
/**
* The StringManager for this package.
*/
protected static final StringManager sm = StringManager.getManager(Constants.Package);
private CatalinaCluster cluster = null;
/**
* Filter expression
*/
protected Pattern filter = null;
/**
* crossContext session container
*/
protected final ThreadLocal<ArrayList<DeltaSession>> crossContextSessions = new ThreadLocal<>();
/**
* doProcessingStats (default = off)
*/
protected boolean doProcessingStats = false;
protected LongAdder totalRequestTime = new LongAdder();
protected LongAdder totalSendTime = new LongAdder();
protected LongAdder nrOfRequests = new LongAdder();
protected AtomicLong lastSendTime = new AtomicLong();
protected LongAdder nrOfFilterRequests = new LongAdder();
protected LongAdder nrOfSendRequests = new LongAdder();
protected LongAdder nrOfCrossContextSendRequests = new LongAdder();
/**
* must primary change indicator set
*/
protected boolean primaryIndicator = false;
/**
* Name of primary change indicator as request attribute
*/
protected String primaryIndicatorName = "org.apache.catalina.ha.tcp.isPrimarySession";
// ------------------------------------------------------------- Properties
public ReplicationValve() {
super(true);
}
/**
* @return the cluster.
*/
@Override
public CatalinaCluster getCluster() {
return cluster;
}
/**
* @param cluster The cluster to set.
*/
@Override
public void setCluster(CatalinaCluster cluster) {
this.cluster = cluster;
}
/**
* @return the filter
*/
public String getFilter() {
if (filter == null) {
return null;
}
return filter.toString();
}
/**
* compile filter string to regular expression
*
* @see Pattern#compile(String)
*
* @param filter The filter to set.
*/
public void setFilter(String filter) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ReplicationValve.filter.loading", filter));
}
if (filter == null || filter.length() == 0) {
this.filter = null;
} else {
try {
this.filter = Pattern.compile(filter);
} catch (PatternSyntaxException pse) {
log.error(sm.getString("ReplicationValve.filter.failure", filter), pse);
}
}
}
/**
* @return the primaryIndicator.
*/
public boolean isPrimaryIndicator() {
return primaryIndicator;
}
/**
* @param primaryIndicator The primaryIndicator to set.
*/
public void setPrimaryIndicator(boolean primaryIndicator) {
this.primaryIndicator = primaryIndicator;
}
/**
* @return the primaryIndicatorName.
*/
public String getPrimaryIndicatorName() {
return primaryIndicatorName;
}
/**
* @param primaryIndicatorName The primaryIndicatorName to set.
*/
public void setPrimaryIndicatorName(String primaryIndicatorName) {
this.primaryIndicatorName = primaryIndicatorName;
}
/**
* Calc processing stats
*
* @return <code>true</code> if statistics are enabled
*/
public boolean doStatistics() {
return doProcessingStats;
}
/**
* Set Calc processing stats
*
* @param doProcessingStats New flag value
*
* @see #resetStatistics()
*/
public void setStatistics(boolean doProcessingStats) {
this.doProcessingStats = doProcessingStats;
}
/**
* @return the lastSendTime.
*/
public long getLastSendTime() {
return lastSendTime.longValue();
}
/**
* @return the nrOfRequests.
*/
public long getNrOfRequests() {
return nrOfRequests.longValue();
}
/**
* @return the nrOfFilterRequests.
*/
public long getNrOfFilterRequests() {
return nrOfFilterRequests.longValue();
}
/**
* @return the nrOfCrossContextSendRequests.
*/
public long getNrOfCrossContextSendRequests() {
return nrOfCrossContextSendRequests.longValue();
}
/**
* @return the nrOfSendRequests.
*/
public long getNrOfSendRequests() {
return nrOfSendRequests.longValue();
}
/**
* @return the totalRequestTime.
*/
public long getTotalRequestTime() {
return totalRequestTime.longValue();
}
/**
* @return the totalSendTime.
*/
public long getTotalSendTime() {
return totalSendTime.longValue();
}
// --------------------------------------------------------- Public Methods
/**
* Register all cross context sessions inside endAccess. Use a list with contains check, that the Portlet API can
* include a lot of fragments from same or different applications with session changes.
*
* @param session cross context session
*/
public void registerReplicationSession(DeltaSession session) {
List<DeltaSession> sessions = crossContextSessions.get();
if (sessions != null) {
if (!sessions.contains(session)) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ReplicationValve.crossContext.registerSession", session.getIdInternal(),
session.getManager().getContext().getName()));
}
sessions.add(session);
}
}
}
@Override
public void invoke(Request request, Response response) throws IOException, ServletException {
long totalstart = 0;
// this happens before the request
if (doStatistics()) {
totalstart = System.currentTimeMillis();
}
if (primaryIndicator) {
createPrimaryIndicator(request);
}
Context context = request.getContext();
boolean isCrossContext = context != null && context instanceof StandardContext && context.getCrossContext();
boolean isAsync = request.getAsyncContextInternal() != null;
try {
if (isCrossContext) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ReplicationValve.crossContext.add"));
}
crossContextSessions.set(new ArrayList<>());
}
getNext().invoke(request, response);
if (context != null && cluster != null && context.getManager() instanceof ClusterManager) {
ClusterManager clusterManager = (ClusterManager) context.getManager();
// valve cluster can access manager - other cluster handle replication
// at host level - hopefully!
if (cluster.getManager(clusterManager.getName()) == null) {
return;
}
if (cluster.hasMembers()) {
sendReplicationMessage(request, totalstart, isCrossContext, isAsync, clusterManager);
} else {
resetReplicationRequest(request, isCrossContext);
}
}
} finally {
// Array must be remove: Current master request send endAccess at recycle.
// Don't register this request session again!
if (isCrossContext) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ReplicationValve.crossContext.remove"));
}
crossContextSessions.remove();
}
}
}
/**
* reset the active statistics
*/
public void resetStatistics() {
totalRequestTime.reset();
totalSendTime.reset();
lastSendTime.set(0);
nrOfFilterRequests.reset();
nrOfRequests.reset();
nrOfSendRequests.reset();
nrOfCrossContextSendRequests.reset();
}
@Override
protected void startInternal() throws LifecycleException {
if (cluster == null) {
Cluster containerCluster = getContainer().getCluster();
if (containerCluster instanceof CatalinaCluster) {
setCluster((CatalinaCluster) containerCluster);
} else {
if (log.isWarnEnabled()) {
log.warn(sm.getString("ReplicationValve.nocluster"));
}
}
}
super.startInternal();
}
// --------------------------------------------------------- Protected Methods
protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext, boolean isAsync,
ClusterManager clusterManager) {
// this happens after the request
long start = 0;
if (doStatistics()) {
start = System.currentTimeMillis();
}
try {
// send invalid sessions
sendInvalidSessions(clusterManager);
// send replication
sendSessionReplicationMessage(request, clusterManager);
if (isCrossContext) {
sendCrossContextSession();
}
} catch (Exception x) {
// FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes!
log.error(sm.getString("ReplicationValve.send.failure"), x);
} finally {
if (doStatistics()) {
updateStats(totalstart, start, isAsync);
}
}
}
/**
* Send all changed cross context sessions to backups
*/
protected void sendCrossContextSession() {
List<DeltaSession> sessions = crossContextSessions.get();
if (sessions != null && sessions.size() > 0) {
for (DeltaSession session : sessions) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ReplicationValve.crossContext.sendDelta",
session.getManager().getContext().getName()));
}
sendMessage(session, (ClusterManager) session.getManager());
if (doStatistics()) {
nrOfCrossContextSendRequests.increment();
}
}
}
}
/**
* Fix memory leak for long sessions with many changes, when no backup member exists!
*
* @param request current request after response is generated
* @param isCrossContext check crosscontext threadlocal
*/
protected void resetReplicationRequest(Request request, boolean isCrossContext) {
Session contextSession = request.getSessionInternal(false);
if (contextSession instanceof DeltaSession) {
resetDeltaRequest(contextSession);
((DeltaSession) contextSession).setPrimarySession(true);
}
if (isCrossContext) {
List<DeltaSession> sessions = crossContextSessions.get();
if (sessions != null && sessions.size() > 0) {
Iterator<DeltaSession> iter = sessions.iterator();
for (; iter.hasNext();) {
Session session = iter.next();
resetDeltaRequest(session);
if (session instanceof DeltaSession) {
((DeltaSession) contextSession).setPrimarySession(true);
}
}
}
}
}
/**
* Reset DeltaRequest from session
*
* @param session HttpSession from current request or cross context session
*/
protected void resetDeltaRequest(Session session) {
if (log.isTraceEnabled()) {
log.trace(sm.getString("ReplicationValve.resetDeltaRequest", session.getManager().getContext().getName()));
}
((DeltaSession) session).resetDeltaRequest();
}
/**
* Send Cluster Replication Request
*
* @param request current request
* @param manager session manager
*/
protected void sendSessionReplicationMessage(Request request, ClusterManager manager) {
Session session = request.getSessionInternal(false);
if (session != null) {
String uri = request.getDecodedRequestURI();
// request without session change
if (!isRequestWithoutSessionChange(uri)) {
if (log.isDebugEnabled()) {
log.debug(sm.getString("ReplicationValve.invoke.uri", uri));
}
sendMessage(session, manager);
} else if (doStatistics()) {
nrOfFilterRequests.increment();
}
}
}
/**
* Send message delta message from request session
*
* @param session current session
* @param manager session manager
*/
protected void sendMessage(Session session, ClusterManager manager) {
String id = session.getIdInternal();
if (id != null) {
send(manager, id);
}
}
/**
* send manager requestCompleted message to cluster
*
* @param manager SessionManager
* @param sessionId sessionid from the manager
*
* @see DeltaManager#requestCompleted(String)
* @see SimpleTcpCluster#send(ClusterMessage)
*/
protected void send(ClusterManager manager, String sessionId) {
ClusterMessage msg = manager.requestCompleted(sessionId);
if (msg != null && cluster != null) {
cluster.send(msg);
if (doStatistics()) {
nrOfSendRequests.increment();
}
}
}
/**
* check for session invalidations
*
* @param manager Associated manager
*/
protected void sendInvalidSessions(ClusterManager manager) {
String[] invalidIds = manager.getInvalidatedSessions();
if (invalidIds.length > 0) {
for (String invalidId : invalidIds) {
try {
send(manager, invalidId);
} catch (Exception x) {
log.error(sm.getString("ReplicationValve.send.invalid.failure", invalidId), x);
}
}
}
}
/**
* is request without possible session change
*
* @param uri The request uri
*
* @return True if no session change
*/
protected boolean isRequestWithoutSessionChange(String uri) {
Pattern f = filter;
return f != null && f.matcher(uri).matches();
}
/**
* Protocol cluster replications stats
*
* @param requestTime Request time
* @param clusterTime Cluster time
* @param isAsync if the request was in async mode
*/
protected void updateStats(long requestTime, long clusterTime, boolean isAsync) {
long currentTime = System.currentTimeMillis();
lastSendTime.set(currentTime);
totalSendTime.add(currentTime - clusterTime);
totalRequestTime.add(currentTime - requestTime);
if (!isAsync) {
nrOfRequests.increment();
if (log.isDebugEnabled()) {
if ((nrOfRequests.longValue() % 100) == 0) {
log.debug(sm.getString("ReplicationValve.stats",
new Object[] { Long.valueOf(totalRequestTime.longValue() / nrOfRequests.longValue()),
Long.valueOf(totalSendTime.longValue() / nrOfRequests.longValue()),
Long.valueOf(nrOfRequests.longValue()), Long.valueOf(nrOfSendRequests.longValue()),
Long.valueOf(nrOfCrossContextSendRequests.longValue()),
Long.valueOf(nrOfFilterRequests.longValue()),
Long.valueOf(totalRequestTime.longValue()),
Long.valueOf(totalSendTime.longValue()) }));
}
}
}
}
/**
* Mark Request that processed at primary node with attribute primaryIndicatorName
*
* @param request The Servlet request
*
* @throws IOException IO error finding session
*/
protected void createPrimaryIndicator(Request request) throws IOException {
String id = request.getRequestedSessionId();
if ((id != null) && (id.length() > 0)) {
Manager manager = request.getContext().getManager();
Session session = manager.findSession(id);
if (session instanceof ClusterSession) {
ClusterSession cses = (ClusterSession) session;
if (log.isDebugEnabled()) {
log.debug(sm.getString("ReplicationValve.session.indicator", request.getContext().getName(), id,
primaryIndicatorName, Boolean.valueOf(cses.isPrimarySession())));
}
request.setAttribute(primaryIndicatorName, cses.isPrimarySession() ? Boolean.TRUE : Boolean.FALSE);
} else {
if (log.isDebugEnabled()) {
if (session != null) {
log.debug(sm.getString("ReplicationValve.session.found", request.getContext().getName(), id));
} else {
log.debug(sm.getString("ReplicationValve.session.invalid", request.getContext().getName(), id));
}
}
}
}
}
}