ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/valves/StuckThreadDetectionValve.java

Path
ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/valves/StuckThreadDetectionValve.java
Status
scanned
Type
file
Name
StuckThreadDetectionValve.java
Extension
.java
Programming language
Java
Mime type
text/plain
File type
ASCII text, with CRLF line terminators
Tag

      
    
Rootfs path

      
    
Size
15446 (15.1 KB)
MD5
d56bf6b24540788f8d441260b745e034
SHA1
ed4f75855c1477b6a998c0794cfaf71cfcc459bc
SHA256
7dd7c05da9434d6a87afd47e2964b1a37ff93982b0b6a8e8bb6e3d04c6592895
SHA512

      
    
SHA1_git
727e4f39741dcf99f49dd821d8a79c758b029234
Is binary

      
    
Is text
True
Is archive

      
    
Is media

      
    
Is legal

      
    
Is manifest

      
    
Is readme

      
    
Is top level

      
    
Is key file

      
    
StuckThreadDetectionValve.java | 15.1 KB |

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.valves;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.servlet.ServletException;

import org.apache.catalina.LifecycleException;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;

/**
 * This valve allows to detect requests that take a long time to process, which might indicate that the thread that is
 * processing it is stuck.
 * <p>
 * Each request passing through {@link #invoke(Request, Response)} is registered in a map for the duration of the
 * request. {@link #backgroundProcess()} scans that map, marks and reports requests whose active time has reached the
 * configured threshold, optionally interrupts them, and reports previously stuck requests that have since completed.
 */
public class StuckThreadDetectionValve extends ValveBase {

    /**
     * Logger
     */
    private static final Log log = LogFactory.getLog(StuckThreadDetectionValve.class);

    /**
     * The string manager for this package.
     */
    private static final StringManager sm = StringManager.getManager(Constants.Package);

    /**
     * Keeps count of the number of stuck threads detected
     */
    private final AtomicInteger stuckCount = new AtomicInteger(0);

    /**
     * Keeps count of the number of stuck threads that have been interrupted
     */
    private final AtomicLong interruptedThreadsCount = new AtomicLong();

    /**
     * In seconds. Default 600 (10 minutes).
     */
    private int threshold = 600;

    /**
     * In seconds. Default is -1 to disable interruption.
     */
    private int interruptThreadThreshold = -1;

    /**
     * The only references we keep to actual running Thread objects are in this Map (which is automatically cleaned in
     * invoke()s finally clause). That way, Threads can be GC'ed, even though the Valve still thinks they are stuck
     * (caused by a long monitor interval)
     */
    private final Map<Long,MonitoredThread> activeThreads = new ConcurrentHashMap<>();

    /**
     * Requests that were marked as stuck and have since completed. Filled by the request threads in
     * {@link #invoke(Request, Response)} and drained by {@link #backgroundProcess()}.
     */
    private final Queue<CompletedStuckThread> completedStuckThreadsQueue = new ConcurrentLinkedQueue<>();


    /**
     * Specifies the threshold (in seconds) used when checking for stuck threads. If &lt;=0, the detection is disabled.
     * The default is 600 seconds.
     *
     * @param threshold The new threshold in seconds
     */
    public void setThreshold(int threshold) {
        this.threshold = threshold;
    }


    /**
     * @see #setThreshold(int)
     *
     * @return The current threshold in seconds
     */
    public int getThreshold() {
        return threshold;
    }


    /**
     * @see #setInterruptThreadThreshold(int)
     *
     * @return The current thread interruption threshold in seconds
     */
    public int getInterruptThreadThreshold() {
        return interruptThreadThreshold;
    }


    /**
     * Specifies the threshold (in seconds) before stuck threads are interrupted. If &lt;=0, the interruption is
     * disabled. The default is -1. If &gt;=0, the value must actually be &gt;= threshold.
     *
     * @param interruptThreadThreshold The new thread interruption threshold in seconds
     */
    public void setInterruptThreadThreshold(int interruptThreadThreshold) {
        this.interruptThreadThreshold = interruptThreadThreshold;
    }


    /**
     * Required to enable async support.
     */
    public StuckThreadDetectionValve() {
        super(true);
    }


    @Override
    protected void initInternal() throws LifecycleException {
        super.initInternal();

        if (log.isTraceEnabled()) {
            log.trace("Monitoring stuck threads with threshold = " + threshold + " sec");
        }
    }


    /**
     * Logs, at warn level, that a request has been active for at least the threshold. The current stack trace of the
     * monitored thread is attached to the log entry through a synthetic {@link Throwable}.
     *
     * @param monitoredThread The monitored request/thread that has just been marked as stuck
     * @param activeTime      How long the request has been active, in milliseconds
     * @param numStuckThreads The number of stuck threads after counting this one
     */
    private void notifyStuckThreadDetected(MonitoredThread monitoredThread, long activeTime, int numStuckThreads) {
        if (log.isWarnEnabled()) {
            @SuppressWarnings("deprecation")
            String msg = sm.getString("stuckThreadDetectionValve.notifyStuckThreadDetected",
                    monitoredThread.getThread().getName(), Long.valueOf(activeTime), monitoredThread.getStartTime(),
                    Integer.valueOf(numStuckThreads), monitoredThread.getRequestUri(), Integer.valueOf(threshold),
                    String.valueOf(monitoredThread.getThread().getId()));
            Throwable th = new Throwable();
            th.setStackTrace(monitoredThread.getThread().getStackTrace());
            log.warn(msg, th);
        }
    }


    /**
     * Logs, at warn level, that a request previously reported as stuck has completed.
     *
     * @param thread          Snapshot of the completed thread
     * @param numStuckThreads The number of stuck threads remaining after this one completed
     */
    private void notifyStuckThreadCompleted(CompletedStuckThread thread, int numStuckThreads) {
        if (log.isWarnEnabled()) {
            String msg = sm.getString("stuckThreadDetectionValve.notifyStuckThreadCompleted", thread.getName(),
                    Long.valueOf(thread.getTotalActiveTime()), Integer.valueOf(numStuckThreads),
                    String.valueOf(thread.getId()));
            // Since the "stuck thread notification" is warn, this should also
            // be warn
            log.warn(msg);
        }
    }


    /**
     * Registers the current thread as active for the duration of the request and, once the request has finished,
     * queues it for a "completed" notification if it had been marked as stuck in the meantime.
     *
     * @param request  The request being processed
     * @param response The response being generated
     *
     * @throws IOException      if thrown by the next valve in the pipeline
     * @throws ServletException if thrown by the next valve in the pipeline
     */
    @Override
    public void invoke(Request request, Response response) throws IOException, ServletException {

        if (threshold <= 0) {
            // short-circuit if not monitoring stuck threads
            getNext().invoke(request, response);
            return;
        }

        // Save the thread/runnable
        // Keeping a reference to the thread object here does not prevent
        // GC'ing, as the reference is removed from the Map in the finally clause
        Thread currentThread = Thread.currentThread();
        @SuppressWarnings("deprecation")
        Long key = Long.valueOf(currentThread.getId());
        StringBuffer requestUrl = request.getRequestURL();
        if (request.getQueryString() != null) {
            requestUrl.append('?');
            requestUrl.append(request.getQueryString());
        }
        MonitoredThread monitoredThread =
                new MonitoredThread(currentThread, requestUrl.toString(), interruptThreadThreshold > 0);
        activeThreads.put(key, monitoredThread);

        try {
            getNext().invoke(request, response);
        } finally {
            activeThreads.remove(key);
            if (monitoredThread.markAsDone() == MonitoredThreadState.STUCK) {
                if (monitoredThread.wasInterrupted()) {
                    interruptedThreadsCount.incrementAndGet();
                }
                completedStuckThreadsQueue.add(
                        new CompletedStuckThread(monitoredThread.getThread(), monitoredThread.getActiveTimeInMillis()));
            }
        }
    }


    /**
     * Scans the active requests, reporting (and optionally interrupting) the ones that have exceeded the configured
     * thresholds, then reports the previously stuck requests that have completed since the last run.
     */
    @Override
    public void backgroundProcess() {
        super.backgroundProcess();

        long thresholdInMillis = threshold * 1000L;

        // Check monitored threads, being careful that the request might have
        // completed by the time we examine it
        for (MonitoredThread monitoredThread : activeThreads.values()) {
            long activeTime = monitoredThread.getActiveTimeInMillis();

            if (activeTime >= thresholdInMillis && monitoredThread.markAsStuckIfStillRunning()) {
                int numStuckThreads = stuckCount.incrementAndGet();
                notifyStuckThreadDetected(monitoredThread, activeTime, numStuckThreads);
            }
            if (interruptThreadThreshold > 0 && activeTime >= interruptThreadThreshold * 1000L) {
                monitoredThread.interruptIfStuck(interruptThreadThreshold);
            }
        }
        // Check if any threads previously reported as stuck, have finished.
        for (CompletedStuckThread completedStuckThread = completedStuckThreadsQueue.poll();
                completedStuckThread != null; completedStuckThread = completedStuckThreadsQueue.poll()) {

            int numStuckThreads = stuckCount.decrementAndGet();
            notifyStuckThreadCompleted(completedStuckThread, numStuckThreads);
        }
    }


    /**
     * @return The current value of the stuck thread counter. The counter is incremented when a request is marked as
     *             stuck and decremented when {@link #backgroundProcess()} reports that request as completed.
     */
    public int getStuckThreadCount() {
        return stuckCount.get();
    }


    /**
     * @return The IDs of the threads whose active request is currently marked as stuck; an empty array if none
     */
    @SuppressWarnings("deprecation")
    public long[] getStuckThreadIds() {
        List<Long> idList = new ArrayList<>();
        for (MonitoredThread monitoredThread : activeThreads.values()) {
            if (monitoredThread.isMarkedAsStuck()) {
                idList.add(Long.valueOf(monitoredThread.getThread().getId()));
            }
        }

        long[] result = new long[idList.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = idList.get(i).longValue();
        }
        return result;
    }


    /**
     * @return The names of the threads whose active request is currently marked as stuck; an empty array if none
     */
    public String[] getStuckThreadNames() {
        List<String> nameList = new ArrayList<>();
        for (MonitoredThread monitoredThread : activeThreads.values()) {
            if (monitoredThread.isMarkedAsStuck()) {
                nameList.add(monitoredThread.getThread().getName());
            }
        }
        return nameList.toArray(new String[0]);
    }


    /**
     * @return The number of completed requests that were marked as stuck and interrupted by this valve
     */
    public long getInterruptedThreadsCount() {
        return interruptedThreadsCount.get();
    }


    /**
     * Tracks one in-flight request: the thread serving it, when it started and whether it is running, stuck or done.
     */
    private static class MonitoredThread {

        /**
         * Reference to the thread to get a stack trace from background task
         */
        private final Thread thread;
        private final String requestUri;
        private final long start;

        /**
         * Current state. Transitions are RUNNING -&gt; STUCK (background thread) and anything -&gt; DONE (request
         * thread).
         */
        private final AtomicReference<MonitoredThreadState> state =
                new AtomicReference<>(MonitoredThreadState.RUNNING);

        /**
         * Semaphore to synchronize the stuck thread with the background-process thread. It's not used if the
         * interruption feature is not active.
         */
        private final Semaphore interruptionSemaphore;

        /**
         * Set to true just before the thread is interrupted. Volatile because {@link #markAsDone()} can return without
         * having acquired the semaphore (when the acquire itself is interrupted), in which case the semaphore provides
         * no visibility guarantee for this flag.
         */
        private volatile boolean interrupted;

        MonitoredThread(Thread thread, String requestUri, boolean interruptible) {
            this.thread = thread;
            this.requestUri = requestUri;
            this.start = System.currentTimeMillis();
            if (interruptible) {
                interruptionSemaphore = new Semaphore(1);
            } else {
                interruptionSemaphore = null;
            }
        }

        public Thread getThread() {
            return this.thread;
        }

        public String getRequestUri() {
            return requestUri;
        }

        public long getActiveTimeInMillis() {
            return System.currentTimeMillis() - start;
        }

        public Date getStartTime() {
            return new Date(start);
        }

        /**
         * Atomically moves the state from RUNNING to STUCK.
         *
         * @return {@code true} if this call performed the transition, {@code false} if the request was already marked
         *             as stuck or has already completed
         */
        public boolean markAsStuckIfStillRunning() {
            return this.state.compareAndSet(MonitoredThreadState.RUNNING, MonitoredThreadState.STUCK);
        }

        /**
         * Marks the request as done. Called by the request thread itself.
         *
         * @return The state the request was in immediately before being marked as done
         */
        public MonitoredThreadState markAsDone() {
            MonitoredThreadState threadState = this.state.getAndSet(MonitoredThreadState.DONE);
            if (threadState == MonitoredThreadState.STUCK && interruptionSemaphore != null) {
                try {
                    // use the semaphore to synchronize with the background thread
                    // which might try to interrupt this current thread.
                    // Otherwise, the current thread might be interrupted after
                    // going out from here, maybe already serving a new request
                    this.interruptionSemaphore.acquire();
                } catch (InterruptedException e) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("stuckThreadDetectionValve.interrupted"), e);
                    }
                }
                // no need to release the semaphore, it will be GCed
            }
            // else the request went through before being marked as stuck, no need
            // to sync against the semaphore
            return threadState;
        }

        boolean isMarkedAsStuck() {
            return this.state.get() == MonitoredThreadState.STUCK;
        }

        /**
         * Interrupts the monitored thread if it is still marked as stuck and interruption is enabled for it. Called by
         * the background thread.
         *
         * @param interruptThreadThreshold The interruption threshold in seconds, used for the log message only
         *
         * @return {@code true} if an interruption was attempted, {@code false} if the request is not stuck, is not
         *             interruptible, or the semaphore was not available
         */
        public boolean interruptIfStuck(long interruptThreadThreshold) {
            if (!isMarkedAsStuck() || interruptionSemaphore == null || !this.interruptionSemaphore.tryAcquire()) {
                // if the semaphore is already acquired, it means that the
                // request thread got unstuck before we interrupted it
                return false;
            }
            try {
                if (log.isWarnEnabled()) {
                    @SuppressWarnings("deprecation")
                    String msg = sm.getString("stuckThreadDetectionValve.notifyStuckThreadInterrupted",
                            this.getThread().getName(), Long.valueOf(getActiveTimeInMillis()), this.getStartTime(),
                            this.getRequestUri(), Long.valueOf(interruptThreadThreshold),
                            String.valueOf(this.getThread().getId()));
                    Throwable th = new Throwable();
                    th.setStackTrace(this.getThread().getStackTrace());
                    log.warn(msg, th);
                }
                // Publish the flag before interrupting: the request thread may observe the
                // interrupt and read the flag before this method reaches its finally clause.
                this.interrupted = true;
                this.thread.interrupt();
            } finally {
                this.interruptionSemaphore.release();
            }
            return true;
        }

        public boolean wasInterrupted() {
            return interrupted;
        }
    }


    /**
     * Immutable snapshot of a thread whose stuck request has completed. Only the thread's name and ID are copied, so no
     * reference to the {@link Thread} object is retained.
     */
    private static class CompletedStuckThread {

        private final String threadName;
        private final long threadId;
        private final long totalActiveTime;

        @SuppressWarnings("deprecation")
        CompletedStuckThread(Thread thread, long totalActiveTime) {
            this.threadName = thread.getName();
            this.threadId = thread.getId();
            this.totalActiveTime = totalActiveTime;
        }

        public String getName() {
            return this.threadName;
        }

        public long getId() {
            return this.threadId;
        }

        public long getTotalActiveTime() {
            return this.totalActiveTime;
        }
    }


    /**
     * Lifecycle of a monitored request.
     */
    private enum MonitoredThreadState {
        RUNNING,
        STUCK,
        DONE
    }
}
Detected license expression
apache-2.0
Detected license expression (SPDX)
Apache-2.0
Percentage of license text
9.1
Copyrights

      
    
Holders

      
    
Authors

      
    
License detections License expression License expression SPDX
apache_2_0-4bde3f57-78aa-4201-96bf-531cba09e7de apache-2.0 Apache-2.0
URL Start line End line
http://www.apache.org/licenses/LICENSE-2.0 9 9