ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/valves/PersistentValve.java

Path
ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/valves/PersistentValve.java
Status
scanned
Type
file
Name
PersistentValve.java
Extension
.java
Programming language
Java
Mime type
text/plain
File type
ASCII text, with CRLF line terminators
Tag

      
    
Rootfs path

      
    
Size
18639 (18.2 KB)
MD5
1a15aad1da49b942f30a068611cc22fd
SHA1
7f641c5efa56009516ec60b5c5fbe5b2b8f9b0fa
SHA256
855668691305fcc97932a047cba5e9328327919d72aa195a1780ade4f2658499
SHA512

      
    
SHA1_git
8e3e39af66f4b5d7f1624ab62958d14dc250edec
Is binary

      
    
Is text
True
Is archive

      
    
Is media

      
    
Is legal

      
    
Is manifest

      
    
Is readme

      
    
Is top level

      
    
Is key file

      
    
PersistentValve.java | 18.2 KB |

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.valves; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; import jakarta.servlet.ServletException; import jakarta.servlet.http.HttpServletResponse; import org.apache.catalina.Container; import org.apache.catalina.Context; import org.apache.catalina.Engine; import org.apache.catalina.Host; import org.apache.catalina.Manager; import org.apache.catalina.Session; import org.apache.catalina.Store; import org.apache.catalina.StoreManager; import org.apache.catalina.connector.Request; import org.apache.catalina.connector.Response; /** * Valve that implements per-request session persistence. It is intended to be used with non-sticky load-balancers and a * PersistentManager. The Valve works by loading the session from the Store at the start of the request, the request * then updates the session as required and the Valve saves the session to the Store at the end of the request. * <p> * To avoid conflicts and/or errors when updating the session store, each session must only be accessed by no more than * one concurrent request. The {@code filter} field can be used to define requests (e.g. those for static resources) * that do not need access to the session and can Requests for resources that do not need to access the session and can * bypass the session load/save functionality provided by this Valve. * <p> * The Valve uses a per session {@code Semaphore} to ensure that each session is accessed by no more than one request at * a time within a single Tomcat instance. The behaviour if multiple requests try to access the session concurrently can * be controlled by the {@code semaphoreFairness}, {@code semaphoreBlockOnAcquire} and {@code * semaphoreAcquireUninterruptibly} fields. If a request fails to obtain the Semaphore, the response is generated by the * {@link #onSemaphoreNotAcquired(Request, Response)} method which, by default, returns a {@code 429} status code. * <p> * The per session Semaphores only provide limited protection against concurrent requests within a single Tomcat * instance. If multiple requests access the same session concurrently across different Tomcat instances, update * conflicts and/or session data loss and/or errors are very likely. * <p> * <b>USAGE CONSTRAINTS</b>: * <ul> * <li>This Valve must only be used with a PersistentManager</li> * <li>The client must ensure that no more than one concurrent request accesses a session at any time across all Tomcat * instances</li> * </ul> */ public class PersistentValve extends ValveBase { // Saves a couple of calls to getClassLoader() on every request. Under high // load these calls took just long enough to appear as a hot spot (although // a very minor one) in a profiler. private static final ClassLoader MY_CLASSLOADER = PersistentValve.class.getClassLoader(); private volatile boolean clBindRequired; protected Pattern filter = null; private final ConcurrentMap<String,UsageCountingSemaphore> sessionToSemaphoreMap = new ConcurrentHashMap<>(); private boolean semaphoreFairness = true; private boolean semaphoreBlockOnAcquire = true; private boolean semaphoreAcquireUninterruptibly = true; public PersistentValve() { super(true); } @Override public void setContainer(Container container) { super.setContainer(container); clBindRequired = container instanceof Engine || container instanceof Host; } @Override public void invoke(Request request, Response response) throws IOException, ServletException { // request without session if (isRequestWithoutSession(request.getDecodedRequestURI())) { if (containerLog.isTraceEnabled()) { containerLog.trace(sm.getString("persistentValve.requestIgnore", request.getDecodedRequestURI())); } getNext().invoke(request, response); return; } else if (containerLog.isTraceEnabled()) { containerLog.trace(sm.getString("persistentValve.requestProcess", request.getDecodedRequestURI())); } // Select the Context to be used for this Request Context context = request.getContext(); if (context == null) { response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, sm.getString("standardHost.noContext")); return; } String sessionId = request.getRequestedSessionId(); UsageCountingSemaphore semaphore = null; boolean mustReleaseSemaphore = true; try { // Acquire the per session semaphore if (sessionId != null) { semaphore = sessionToSemaphoreMap.compute(sessionId, (k, v) -> v == null ? new UsageCountingSemaphore(semaphoreFairness) : v.incrementUsageCount()); if (semaphoreBlockOnAcquire) { if (semaphoreAcquireUninterruptibly) { semaphore.acquireUninterruptibly(); } else { try { semaphore.acquire(); } catch (InterruptedException e) { mustReleaseSemaphore = false; onSemaphoreNotAcquired(request, response); if (containerLog.isDebugEnabled()) { containerLog.debug(sm.getString("persistentValve.acquireInterrupted", request.getDecodedRequestURI())); } return; } } } else { if (!semaphore.tryAcquire()) { onSemaphoreNotAcquired(request, response); if (containerLog.isDebugEnabled()) { containerLog.debug( sm.getString("persistentValve.acquireFailed", request.getDecodedRequestURI())); } return; } } } // Update the session last access time for our session (if any) Manager manager = context.getManager(); if (sessionId != null && manager instanceof StoreManager) { Store store = ((StoreManager) manager).getStore(); if (store != null) { Session session = null; try { session = store.load(sessionId); } catch (Exception e) { containerLog.error(sm.getString("persistentValve.sessionLoadFail", sessionId)); } if (session != null) { if (!session.isValid() || isSessionStale(session, System.currentTimeMillis())) { if (containerLog.isTraceEnabled()) { containerLog.trace("session swapped in is invalid or expired"); } session.expire(); store.remove(sessionId); } else { session.setManager(manager); // session.setId(sessionId); Only if new ??? manager.add(session); // ((StandardSession)session).activate(); session.access(); session.endAccess(); } } } } if (containerLog.isTraceEnabled()) { containerLog.trace("sessionId: " + sessionId); } // Ask the next valve to process the request. getNext().invoke(request, response); // If still processing async, don't try to store the session if (!request.isAsync()) { // Read the sessionid after the response. // HttpSession hsess = hreq.getSession(false); Session hsess; try { hsess = request.getSessionInternal(false); } catch (Exception e) { hsess = null; } String newsessionId = null; if (hsess != null) { newsessionId = hsess.getIdInternal(); } if (containerLog.isTraceEnabled()) { containerLog.trace("newsessionId: " + newsessionId); } if (newsessionId != null) { try { bind(context); /* store the session and remove it from the manager */ if (manager instanceof StoreManager) { Session session = manager.findSession(newsessionId); Store store = ((StoreManager) manager).getStore(); boolean stored = false; if (session != null) { if (store != null && session.isValid() && !isSessionStale(session, System.currentTimeMillis())) { store.save(session); ((StoreManager) manager).removeSuper(session); session.recycle(); stored = true; } } if (!stored) { if (containerLog.isTraceEnabled()) { containerLog.trace( "newsessionId store: " + store + " session: " + session + " valid: " + (session == null ? "N/A" : Boolean.toString(session.isValid())) + " stale: " + isSessionStale(session, System.currentTimeMillis())); } } } else { if (containerLog.isTraceEnabled()) { containerLog.trace("newsessionId Manager: " + manager); } } } finally { unbind(context); } } } } finally { if (semaphore != null) { if (mustReleaseSemaphore) { semaphore.release(); } sessionToSemaphoreMap.computeIfPresent(sessionId, (k, v) -> v.decrementAndGetUsageCount() == 0 ? null : v); } } } /** * Handle the case where a semaphore cannot be obtained. The default behaviour is to return a 429 (too many * requests) status code. * * @param request The request that will not be processed * @param response The response that will be used for this request * * @throws IOException If an I/O error occurs while working with the request or response */ protected void onSemaphoreNotAcquired(Request request, Response response) throws IOException { response.sendError(429); } /** * Indicate whether the session has been idle for longer than its expiration date as of the supplied time. * * @param session The session to check * @param timeNow The current time to check for * * @return <code>true</code> if the session is past its expiration */ protected boolean isSessionStale(Session session, long timeNow) { if (session != null) { int maxInactiveInterval = session.getMaxInactiveInterval(); if (maxInactiveInterval > 0) { int timeIdle = (int) (session.getIdleTimeInternal() / 1000L); return timeIdle >= maxInactiveInterval; } } return false; } private void bind(Context context) { if (clBindRequired) { context.bind(MY_CLASSLOADER); } } private void unbind(Context context) { if (clBindRequired) { context.unbind(MY_CLASSLOADER); } } protected boolean isRequestWithoutSession(String uri) { Pattern f = filter; return f != null && f.matcher(uri).matches(); } public String getFilter() { if (filter == null) { return null; } return filter.toString(); } public void setFilter(String filter) { if (filter == null || filter.isEmpty()) { this.filter = null; } else { try { this.filter = Pattern.compile(filter); } catch (PatternSyntaxException pse) { container.getLogger().error(sm.getString("persistentValve.filter.failure", filter), pse); } } } /** * If multiple threads attempt to acquire the same per session Semaphore, will permits be granted in the same order * they were requested? * * @return {@code true} if fairness is enabled, otherwise {@code false} */ public boolean isSemaphoreFairness() { return semaphoreFairness; } /** * Configure whether the per session Semaphores will handle granting of permits in the same order they were * requested if multiple threads attempt to acquire the same Semaphore. * * @param semaphoreFairness {@code true} if permits should be granted in the same order they are requested, * otherwise {@code false} */ public void setSemaphoreFairness(boolean semaphoreFairness) { this.semaphoreFairness = semaphoreFairness; } /** * If a thread attempts to acquire the per session Semaphore while it is being used by another request, should the * thread block to wait for the Semaphore or should the request be rejected? * * @return {@code true} if the thread should block, otherwise {@code false} to reject the concurrent request */ public boolean isSemaphoreBlockOnAcquire() { return semaphoreBlockOnAcquire; } /** * Configure whether a thread should block and wait for the per session Semaphore or reject the request if the * Semaphore is being used by another request. * * @param semaphoreBlockOnAcquire {@code true} to block, otherwise {@code false} */ public void setSemaphoreBlockOnAcquire(boolean semaphoreBlockOnAcquire) { this.semaphoreBlockOnAcquire = semaphoreBlockOnAcquire; } /** * If a thread is blocking to acquire a per session Semaphore, can that thread be interrupted? * * @return {@code true} if the thread can <b>not</b> be interrupted, otherwise {@code false}. */ public boolean isSemaphoreAcquireUninterruptibly() { return semaphoreAcquireUninterruptibly; } /** * Configure whether a thread blocking to acquire a per session Semaphore can be interrupted. * * @param semaphoreAcquireUninterruptibly {@code true} if the thread can <b>not</b> be interrupted, otherwise * {@code false}. */ public void setSemaphoreAcquireUninterruptibly(boolean semaphoreAcquireUninterruptibly) { this.semaphoreAcquireUninterruptibly = semaphoreAcquireUninterruptibly; } /* * The PersistentValve uses a per session semaphore to ensure that only one request accesses a session at a time. To * limit the size of the session ID to Semaphore map, the Semaphores are created when required and destroyed (made * eligible for GC) as soon as they are not required. Tracking usage in a thread-safe way requires a usage counter * that does not block. The Semaphore's internal tracking can't be used because the only way to increment usage is * via the acquire methods and they block. Therefore, this class was created which uses a separate AtomicLong long * to track usage. */ private static class UsageCountingSemaphore { private final AtomicLong usageCount = new AtomicLong(1); private final Semaphore semaphore; private UsageCountingSemaphore(boolean fairness) { semaphore = new Semaphore(1, fairness); } private UsageCountingSemaphore incrementUsageCount() { usageCount.incrementAndGet(); return this; } private long decrementAndGetUsageCount() { return usageCount.decrementAndGet(); } private void acquire() throws InterruptedException { semaphore.acquire(); } private void acquireUninterruptibly() { semaphore.acquireUninterruptibly(); } private boolean tryAcquire() { return semaphore.tryAcquire(); } private void release() { semaphore.release(); } } }
Detected license expression
apache-2.0
Detected license expression (SPDX)
Apache-2.0
Percentage of license text
7.22
Copyrights

      
    
Holders

      
    
Authors

      
    
License detections License expression License expression SPDX
apache_2_0-4bde3f57-78aa-4201-96bf-531cba09e7de apache-2.0 Apache-2.0
URL Start line End line
http://www.apache.org/licenses/LICENSE-2.0 9 9