ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/ha/tcp/ReplicationValve.java

Path
ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/ha/tcp/ReplicationValve.java
Status
scanned
Type
file
Name
ReplicationValve.java
Extension
.java
Programming language
Java
Mime type
text/plain
File type
ASCII text, with CRLF line terminators
Tag

      
    
Rootfs path

      
    
Size
20220 (19.7 KB)
MD5
5c220d93732431bee9f6635f15e5614a
SHA1
63a99022218205a0955fb6cff2dea8aae529d0b7
SHA256
af387a7af96cba0f9a9d76b74b22b558319532c05a53134c356d729c81d56948
SHA512

      
    
SHA1_git
afd2e5498f17f830a62da3776dcfb9d3f5b092f9
Is binary

      
    
Is text
True
Is archive

      
    
Is media

      
    
Is legal

      
    
Is manifest

      
    
Is readme

      
    
Is top level

      
    
Is key file

      
    
ReplicationValve.java | 19.7 KB |

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.ha.tcp; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; import jakarta.servlet.ServletException; import org.apache.catalina.Cluster; import org.apache.catalina.Context; import org.apache.catalina.LifecycleException; import org.apache.catalina.Manager; import org.apache.catalina.Session; import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; import org.apache.catalina.core.StandardContext; import org.apache.catalina.ha.CatalinaCluster; import org.apache.catalina.ha.ClusterManager; import org.apache.catalina.ha.ClusterMessage; import org.apache.catalina.ha.ClusterSession; import org.apache.catalina.ha.ClusterValve; import org.apache.catalina.ha.session.DeltaManager; import org.apache.catalina.ha.session.DeltaSession; import org.apache.catalina.valves.ValveBase; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.res.StringManager; /** * Implementation of a Valve that logs interesting contents from the specified Request (before processing) and the * corresponding Response (after processing). It is especially useful in debugging problems related to headers and * cookies. * <p> * This Valve may be attached to any Container, depending on the granularity of the logging you wish to perform. * <p> * primaryIndicator=true, then the request attribute <i>org.apache.catalina.ha.tcp.isPrimarySession.</i> is set true, * when request processing is at sessions primary node. */ public class ReplicationValve extends ValveBase implements ClusterValve { private static final Log log = LogFactory.getLog(ReplicationValve.class); // ----------------------------------------------------- Instance Variables /** * The StringManager for this package. */ protected static final StringManager sm = StringManager.getManager(Constants.Package); private CatalinaCluster cluster = null; /** * Filter expression */ protected Pattern filter = null; /** * crossContext session container */ protected final ThreadLocal<ArrayList<DeltaSession>> crossContextSessions = new ThreadLocal<>(); /** * doProcessingStats (default = off) */ protected boolean doProcessingStats = false; protected LongAdder totalRequestTime = new LongAdder(); protected LongAdder totalSendTime = new LongAdder(); protected LongAdder nrOfRequests = new LongAdder(); protected AtomicLong lastSendTime = new AtomicLong(); protected LongAdder nrOfFilterRequests = new LongAdder(); protected LongAdder nrOfSendRequests = new LongAdder(); protected LongAdder nrOfCrossContextSendRequests = new LongAdder(); /** * Must set primary change indicator. */ protected boolean primaryIndicator = false; /** * Name of primary change indicator as request attribute */ protected String primaryIndicatorName = "org.apache.catalina.ha.tcp.isPrimarySession"; // ------------------------------------------------------------- Properties public ReplicationValve() { super(true); } /** * @return the cluster. */ @Override public CatalinaCluster getCluster() { return cluster; } /** * @param cluster The cluster to set. */ @Override public void setCluster(CatalinaCluster cluster) { this.cluster = cluster; } /** * @return the filter */ public String getFilter() { if (filter == null) { return null; } return filter.toString(); } /** * compile filter string to regular expression * * @see Pattern#compile(String) * * @param filter The filter to set. */ public void setFilter(String filter) { if (log.isTraceEnabled()) { log.trace(sm.getString("ReplicationValve.filter.loading", filter)); } if (filter == null || filter.isEmpty()) { this.filter = null; } else { try { this.filter = Pattern.compile(filter); } catch (PatternSyntaxException pse) { log.error(sm.getString("ReplicationValve.filter.failure", filter), pse); } } } /** * @return the primaryIndicator. */ public boolean isPrimaryIndicator() { return primaryIndicator; } /** * @param primaryIndicator The primaryIndicator to set. */ public void setPrimaryIndicator(boolean primaryIndicator) { this.primaryIndicator = primaryIndicator; } /** * @return the primaryIndicatorName. */ public String getPrimaryIndicatorName() { return primaryIndicatorName; } /** * @param primaryIndicatorName The primaryIndicatorName to set. */ public void setPrimaryIndicatorName(String primaryIndicatorName) { this.primaryIndicatorName = primaryIndicatorName; } /** * Calc processing stats * * @return <code>true</code> if statistics are enabled */ public boolean doStatistics() { return doProcessingStats; } /** * Set Calc processing stats * * @param doProcessingStats New flag value * * @see #resetStatistics() */ public void setStatistics(boolean doProcessingStats) { this.doProcessingStats = doProcessingStats; } /** * @return the lastSendTime. */ public long getLastSendTime() { return lastSendTime.longValue(); } /** * @return the nrOfRequests. */ public long getNrOfRequests() { return nrOfRequests.longValue(); } /** * @return the nrOfFilterRequests. */ public long getNrOfFilterRequests() { return nrOfFilterRequests.longValue(); } /** * @return the nrOfCrossContextSendRequests. */ public long getNrOfCrossContextSendRequests() { return nrOfCrossContextSendRequests.longValue(); } /** * @return the nrOfSendRequests. */ public long getNrOfSendRequests() { return nrOfSendRequests.longValue(); } /** * @return the totalRequestTime. */ public long getTotalRequestTime() { return totalRequestTime.longValue(); } /** * @return the totalSendTime. */ public long getTotalSendTime() { return totalSendTime.longValue(); } // --------------------------------------------------------- Public Methods /** * Register all cross context sessions inside endAccess. Use a list with contains check, that the Portlet API can * include a lot of fragments from same or different applications with session changes. * * @param session cross context session */ public void registerReplicationSession(DeltaSession session) { List<DeltaSession> sessions = crossContextSessions.get(); if (sessions != null) { if (!sessions.contains(session)) { if (log.isTraceEnabled()) { log.trace(sm.getString("ReplicationValve.crossContext.registerSession", session.getIdInternal(), session.getManager().getContext().getName())); } sessions.add(session); } } } @Override public void invoke(Request request, Response response) throws IOException, ServletException { long totalstart = 0; // this happens before the request if (doStatistics()) { totalstart = System.currentTimeMillis(); } if (primaryIndicator) { createPrimaryIndicator(request); } Context context = request.getContext(); boolean isCrossContext = context instanceof StandardContext && context.getCrossContext(); boolean isAsync = request.getAsyncContextInternal() != null; try { if (isCrossContext) { if (log.isTraceEnabled()) { log.trace(sm.getString("ReplicationValve.crossContext.add")); } crossContextSessions.set(new ArrayList<>()); } getNext().invoke(request, response); if (context != null && cluster != null && context.getManager() instanceof ClusterManager clusterManager) { // valve cluster can access manager - other cluster handle replication // at host level - hopefully! if (cluster.getManager(clusterManager.getName()) == null) { return; } if (cluster.hasMembers()) { sendReplicationMessage(request, totalstart, isCrossContext, isAsync, clusterManager); } else { resetReplicationRequest(request, isCrossContext); } } } finally { // Array must be removed: Current master request send endAccess at recycle. // Don't register this request session again! if (isCrossContext) { if (log.isTraceEnabled()) { log.trace(sm.getString("ReplicationValve.crossContext.remove")); } crossContextSessions.remove(); } } } /** * reset the active statistics */ public void resetStatistics() { totalRequestTime.reset(); totalSendTime.reset(); lastSendTime.set(0); nrOfFilterRequests.reset(); nrOfRequests.reset(); nrOfSendRequests.reset(); nrOfCrossContextSendRequests.reset(); } @Override protected void startInternal() throws LifecycleException { if (cluster == null) { Cluster containerCluster = getContainer().getCluster(); if (containerCluster instanceof CatalinaCluster) { setCluster((CatalinaCluster) containerCluster); } else { if (log.isWarnEnabled()) { log.warn(sm.getString("ReplicationValve.nocluster")); } } } super.startInternal(); } // --------------------------------------------------------- Protected Methods protected void sendReplicationMessage(Request request, long totalstart, boolean isCrossContext, boolean isAsync, ClusterManager clusterManager) { // this happens after the request long start = 0; if (doStatistics()) { start = System.currentTimeMillis(); } try { // send invalid sessions sendInvalidSessions(clusterManager); // send replication sendSessionReplicationMessage(request, clusterManager); if (isCrossContext) { sendCrossContextSession(); } } catch (Exception e) { // FIXME we have a lot of sends, but the trouble with one node stops the correct replication to other nodes! log.error(sm.getString("ReplicationValve.send.failure"), e); } finally { if (doStatistics()) { updateStats(totalstart, start, isAsync); } } } /** * Send all changed cross context sessions to backups */ protected void sendCrossContextSession() { List<DeltaSession> sessions = crossContextSessions.get(); if (sessions != null && !sessions.isEmpty()) { for (DeltaSession session : sessions) { if (log.isTraceEnabled()) { log.trace(sm.getString("ReplicationValve.crossContext.sendDelta", session.getManager().getContext().getName())); } sendMessage(session, (ClusterManager) session.getManager()); if (doStatistics()) { nrOfCrossContextSendRequests.increment(); } } } } /** * Fix memory leak for long sessions with many changes, when no backup member exists! * * @param request current request after response is generated * @param isCrossContext check crosscontext threadlocal */ protected void resetReplicationRequest(Request request, boolean isCrossContext) { Session contextSession = request.getSessionInternal(false); if (contextSession instanceof DeltaSession) { resetDeltaRequest(contextSession); ((DeltaSession) contextSession).setPrimarySession(true); } if (isCrossContext) { List<DeltaSession> sessions = crossContextSessions.get(); if (sessions != null) { for (DeltaSession session : sessions) { resetDeltaRequest(session); session.setPrimarySession(true); } } } } /** * Reset DeltaRequest from session * * @param session HttpSession from current request or cross context session */ protected void resetDeltaRequest(Session session) { if (log.isTraceEnabled()) { log.trace(sm.getString("ReplicationValve.resetDeltaRequest", session.getManager().getContext().getName())); } ((DeltaSession) session).resetDeltaRequest(); } /** * Send Cluster Replication Request * * @param request current request * @param manager session manager */ protected void sendSessionReplicationMessage(Request request, ClusterManager manager) { Session session = request.getSessionInternal(false); if (session != null) { String uri = request.getDecodedRequestURI(); // request without session change if (!isRequestWithoutSessionChange(uri)) { if (log.isDebugEnabled()) { log.debug(sm.getString("ReplicationValve.invoke.uri", uri)); } sendMessage(session, manager); } else if (doStatistics()) { nrOfFilterRequests.increment(); } } } /** * Send message delta message from request session * * @param session current session * @param manager session manager */ protected void sendMessage(Session session, ClusterManager manager) { String id = session.getIdInternal(); if (id != null) { send(manager, id); } } /** * send manager requestCompleted message to cluster * * @param manager SessionManager * @param sessionId sessionid from the manager * * @see DeltaManager#requestCompleted(String) * @see SimpleTcpCluster#send(ClusterMessage) */ protected void send(ClusterManager manager, String sessionId) { ClusterMessage msg = manager.requestCompleted(sessionId); if (msg != null && cluster != null) { cluster.send(msg); if (doStatistics()) { nrOfSendRequests.increment(); } } } /** * check for session invalidations * * @param manager Associated manager */ protected void sendInvalidSessions(ClusterManager manager) { String[] invalidIds = manager.getInvalidatedSessions(); for (String invalidId : invalidIds) { try { send(manager, invalidId); } catch (Exception e) { log.error(sm.getString("ReplicationValve.send.invalid.failure", invalidId), e); } } } /** * is request without possible session change * * @param uri The request uri * * @return True if no session change */ protected boolean isRequestWithoutSessionChange(String uri) { Pattern f = filter; return f != null && f.matcher(uri).matches(); } /** * Protocol cluster replications stats * * @param requestTime Request time * @param clusterTime Cluster time * @param isAsync if the request was in async mode */ protected void updateStats(long requestTime, long clusterTime, boolean isAsync) { long currentTime = System.currentTimeMillis(); lastSendTime.set(currentTime); totalSendTime.add(currentTime - clusterTime); totalRequestTime.add(currentTime - requestTime); if (!isAsync) { nrOfRequests.increment(); if (log.isDebugEnabled()) { if ((nrOfRequests.longValue() % 100) == 0) { log.debug(sm.getString("ReplicationValve.stats", Long.valueOf(totalRequestTime.longValue() / nrOfRequests.longValue()), Long.valueOf(totalSendTime.longValue() / nrOfRequests.longValue()), Long.valueOf(nrOfRequests.longValue()), Long.valueOf(nrOfSendRequests.longValue()), Long.valueOf(nrOfCrossContextSendRequests.longValue()), Long.valueOf(nrOfFilterRequests.longValue()), Long.valueOf(totalRequestTime.longValue()), Long.valueOf(totalSendTime.longValue()))); } } } } /** * Mark Request that processed at primary node with attribute primaryIndicatorName * * @param request The Servlet request * * @throws IOException IO error finding session */ protected void createPrimaryIndicator(Request request) throws IOException { String id = request.getRequestedSessionId(); if ((id != null) && (!id.isEmpty())) { Manager manager = request.getContext().getManager(); Session session = manager.findSession(id); if (session instanceof ClusterSession cses) { if (log.isDebugEnabled()) { log.debug(sm.getString("ReplicationValve.session.indicator", request.getContext().getName(), id, primaryIndicatorName, Boolean.valueOf(cses.isPrimarySession()))); } request.setAttribute(primaryIndicatorName, cses.isPrimarySession() ? Boolean.TRUE : Boolean.FALSE); } else { if (log.isDebugEnabled()) { if (session != null) { log.debug(sm.getString("ReplicationValve.session.found", request.getContext().getName(), id)); } else { log.debug(sm.getString("ReplicationValve.session.invalid", request.getContext().getName(), id)); } } } } } }
Detected license expression
apache-2.0
Detected license expression (SPDX)
Apache-2.0
Percentage of license text
7.32
Copyrights

      
    
Holders

      
    
Authors

      
    
License detections License expression License expression SPDX
apache_2_0-4bde3f57-78aa-4201-96bf-531cba09e7de apache-2.0 Apache-2.0
URL Start line End line
http://www.apache.org/licenses/LICENSE-2.0 9 9