ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/ha/session/DeltaManager.java

Path
ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/ha/session/DeltaManager.java
Status
scanned
Type
file
Name
DeltaManager.java
Extension
.java
Programming language
Java
Mime type
text/plain
File type
ASCII text, with CRLF line terminators
Tag

      
    
Rootfs path

      
    
Size
55261 (54.0 KB)
MD5
9f74636cad6481489ef2ad33466613dc
SHA1
48de7064905ba34773fcb434433b2e606a3614ff
SHA256
6d7bc18d5597f8b97faa32f8fae97d9f2ae54d43688ff23bbf7c63a4e6827035
SHA512

      
    
SHA1_git
f9b26b7e09904e2d5231913baa3917a36579270c
Is binary

      
    
Is text
True
Is archive

      
    
Is media

      
    
Is legal

      
    
Is manifest

      
    
Is readme

      
    
Is top level

      
    
Is key file

      
    
DeltaManager.java | 54.0 KB |

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.ha.session; import java.io.BufferedOutputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.catalina.Engine; import org.apache.catalina.Host; import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleState; import org.apache.catalina.Session; import org.apache.catalina.ha.ClusterManager; import org.apache.catalina.ha.ClusterMessage; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.io.ReplicationStream; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.res.StringManager; /** * The DeltaManager manages replicated sessions by only replicating the deltas in data. For applications written to * handle this, the DeltaManager is the optimal way of replicating data. * <p> * This code is almost identical to StandardManager with a difference in how it persists sessions and some modifications * to it. * <p> * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and reloading depends upon external calls to the * <code>start()</code> and <code>stop()</code> methods of this class at the correct times. */ public class DeltaManager extends ClusterManagerBase { // ---------------------------------------------------- Security Classes public final Log log = LogFactory.getLog(DeltaManager.class); /** * The string manager for this package. */ protected static final StringManager sm = StringManager.getManager(DeltaManager.class); private static final String[] EMPTY_STRING_ARRAY = new String[0]; // ----------------------------------------------------- Instance Variables protected String name = null; private boolean expireSessionsOnShutdown = false; private boolean notifySessionListenersOnReplication = true; private boolean notifyContainerListenersOnReplication = true; private volatile boolean stateTransferred = false; private volatile boolean noContextManagerReceived = false; private int stateTransferTimeout = 60; private boolean sendAllSessions = true; private int sendAllSessionsSize = 1000; private boolean enableStatistics = true; /** * wait time between send session block (default 2 sec) */ private int sendAllSessionsWaitTime = 2 * 1000; private final ArrayList<SessionMessage> receivedMessageQueue = new ArrayList<>(); private boolean receiverQueue = false; private boolean stateTimestampDrop = true; private volatile long stateTransferCreateSendTime; // -------------------------------------------------------- stats attributes private final AtomicLong sessionReplaceCounter = new AtomicLong(0); private final AtomicLong counterReceive_EVT_GET_ALL_SESSIONS = new AtomicLong(0); private final AtomicLong counterReceive_EVT_ALL_SESSION_DATA = new AtomicLong(0); private final AtomicLong counterReceive_EVT_SESSION_CREATED = new AtomicLong(0); private final AtomicLong counterReceive_EVT_SESSION_EXPIRED = new AtomicLong(0); private final AtomicLong counterReceive_EVT_SESSION_ACCESSED = new AtomicLong(0); private final AtomicLong counterReceive_EVT_SESSION_DELTA = new AtomicLong(0); private final AtomicInteger counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE = new AtomicInteger(0); private final AtomicLong counterReceive_EVT_CHANGE_SESSION_ID = new AtomicLong(0); private final AtomicLong counterReceive_EVT_ALL_SESSION_NOCONTEXTMANAGER = new AtomicLong(0); private final AtomicLong counterSend_EVT_GET_ALL_SESSIONS = new AtomicLong(0); private final AtomicLong counterSend_EVT_ALL_SESSION_DATA = new AtomicLong(0); private final AtomicLong counterSend_EVT_SESSION_CREATED = new AtomicLong(0); private final AtomicLong counterSend_EVT_SESSION_DELTA = new AtomicLong(0); private final AtomicLong counterSend_EVT_SESSION_ACCESSED = new AtomicLong(0); private final AtomicLong counterSend_EVT_SESSION_EXPIRED = new AtomicLong(0); private final AtomicInteger counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = new AtomicInteger(0); private final AtomicLong counterSend_EVT_CHANGE_SESSION_ID = new AtomicLong(0); private final AtomicInteger counterNoStateTransferred = new AtomicInteger(0); // ------------------------------------------------------------- Constructor public DeltaManager() { super(); } // ------------------------------------------------------------- Properties @Override public void setName(String name) { this.name = name; } @Override public String getName() { return name; } /** * @return Returns the counterSend_EVT_GET_ALL_SESSIONS. */ public long getCounterSend_EVT_GET_ALL_SESSIONS() { return counterSend_EVT_GET_ALL_SESSIONS.get(); } /** * @return Returns the counterSend_EVT_SESSION_ACCESSED. */ public long getCounterSend_EVT_SESSION_ACCESSED() { return counterSend_EVT_SESSION_ACCESSED.get(); } /** * @return Returns the counterSend_EVT_SESSION_CREATED. */ public long getCounterSend_EVT_SESSION_CREATED() { return counterSend_EVT_SESSION_CREATED.get(); } /** * @return Returns the counterSend_EVT_SESSION_DELTA. */ public long getCounterSend_EVT_SESSION_DELTA() { return counterSend_EVT_SESSION_DELTA.get(); } /** * @return Returns the counterSend_EVT_SESSION_EXPIRED. */ public long getCounterSend_EVT_SESSION_EXPIRED() { return counterSend_EVT_SESSION_EXPIRED.get(); } /** * @return Returns the counterSend_EVT_ALL_SESSION_DATA. */ public long getCounterSend_EVT_ALL_SESSION_DATA() { return counterSend_EVT_ALL_SESSION_DATA.get(); } /** * @return Returns the counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE. */ public int getCounterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE() { return counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.get(); } /** * @return Returns the counterSend_EVT_CHANGE_SESSION_ID. */ public long getCounterSend_EVT_CHANGE_SESSION_ID() { return counterSend_EVT_CHANGE_SESSION_ID.get(); } /** * @return Returns the counterReceive_EVT_ALL_SESSION_DATA. */ public long getCounterReceive_EVT_ALL_SESSION_DATA() { return counterReceive_EVT_ALL_SESSION_DATA.get(); } /** * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS. */ public long getCounterReceive_EVT_GET_ALL_SESSIONS() { return counterReceive_EVT_GET_ALL_SESSIONS.get(); } /** * @return Returns the counterReceive_EVT_SESSION_ACCESSED. */ public long getCounterReceive_EVT_SESSION_ACCESSED() { return counterReceive_EVT_SESSION_ACCESSED.get(); } /** * @return Returns the counterReceive_EVT_SESSION_CREATED. */ public long getCounterReceive_EVT_SESSION_CREATED() { return counterReceive_EVT_SESSION_CREATED.get(); } /** * @return Returns the counterReceive_EVT_SESSION_DELTA. */ public long getCounterReceive_EVT_SESSION_DELTA() { return counterReceive_EVT_SESSION_DELTA.get(); } /** * @return Returns the counterReceive_EVT_SESSION_EXPIRED. */ public long getCounterReceive_EVT_SESSION_EXPIRED() { return counterReceive_EVT_SESSION_EXPIRED.get(); } /** * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE. */ public int getCounterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE() { return counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.get(); } /** * @return Returns the counterReceive_EVT_CHANGE_SESSION_ID. */ public long getCounterReceive_EVT_CHANGE_SESSION_ID() { return counterReceive_EVT_CHANGE_SESSION_ID.get(); } /** * @return Returns the counterReceive_EVT_ALL_SESSION_NOCONTEXTMANAGER. */ public long getCounterReceive_EVT_ALL_SESSION_NOCONTEXTMANAGER() { return counterReceive_EVT_ALL_SESSION_NOCONTEXTMANAGER.get(); } /** * @return Returns the sessionReplaceCounter. */ public long getSessionReplaceCounter() { return sessionReplaceCounter.get(); } /** * @return Returns the counterNoStateTransferred. */ public int getCounterNoStateTransferred() { return counterNoStateTransferred.get(); } public int getReceivedQueueSize() { synchronized (receivedMessageQueue) { return receivedMessageQueue.size(); } } /** * @return Returns the stateTransferTimeout. */ public int getStateTransferTimeout() { return stateTransferTimeout; } /** * @param timeoutAllSession The timeout */ public void setStateTransferTimeout(int timeoutAllSession) { this.stateTransferTimeout = timeoutAllSession; } /** * @return <code>true</code> if the state transfer is complete. */ public boolean getStateTransferred() { return stateTransferred; } /** * Set that state transferred is complete * * @param stateTransferred Flag value */ public void setStateTransferred(boolean stateTransferred) { this.stateTransferred = stateTransferred; } public boolean isNoContextManagerReceived() { return noContextManagerReceived; } public void setNoContextManagerReceived(boolean noContextManagerReceived) { this.noContextManagerReceived = noContextManagerReceived; } /** * @return the sendAllSessionsWaitTime in msec */ public int getSendAllSessionsWaitTime() { return sendAllSessionsWaitTime; } /** * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec. */ public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) { this.sendAllSessionsWaitTime = sendAllSessionsWaitTime; } /** * @return the stateTimestampDrop. */ public boolean isStateTimestampDrop() { return stateTimestampDrop; } /** * @param isTimestampDrop The new flag value */ public void setStateTimestampDrop(boolean isTimestampDrop) { this.stateTimestampDrop = isTimestampDrop; } /** * @return the sendAllSessions. */ public boolean isSendAllSessions() { return sendAllSessions; } /** * @param sendAllSessions The sendAllSessions to set. */ public void setSendAllSessions(boolean sendAllSessions) { this.sendAllSessions = sendAllSessions; } /** * @return the sendAllSessionsSize. */ public int getSendAllSessionsSize() { return sendAllSessionsSize; } /** * @param sendAllSessionsSize The sendAllSessionsSize to set. */ public void setSendAllSessionsSize(int sendAllSessionsSize) { this.sendAllSessionsSize = sendAllSessionsSize; } /** * @return the notifySessionListenersOnReplication. */ public boolean isNotifySessionListenersOnReplication() { return notifySessionListenersOnReplication; } /** * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set. */ public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) { this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication; } public boolean isExpireSessionsOnShutdown() { return expireSessionsOnShutdown; } public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) { this.expireSessionsOnShutdown = expireSessionsOnShutdown; } public boolean isNotifyContainerListenersOnReplication() { return notifyContainerListenersOnReplication; } public void setNotifyContainerListenersOnReplication(boolean notifyContainerListenersOnReplication) { this.notifyContainerListenersOnReplication = notifyContainerListenersOnReplication; } /** * @return the enableStatistics */ public boolean getEnableStatistics() { return this.enableStatistics; } /** * @param enableStatistics the enableStatistics to set */ public void setEnableStatistics(boolean enableStatistics) { this.enableStatistics = enableStatistics; } // --------------------------------------------------------- Public Methods @Override public Session createSession(String sessionId) { return createSession(sessionId, true); } /** * Create new session with check maxActiveSessions and send session creation to other cluster nodes. * * @param sessionId The session id that should be used for the session * @param distribute <code>true</code> to replicate the new session * * @return The session */ public Session createSession(String sessionId, boolean distribute) { DeltaSession session = (DeltaSession) super.createSession(sessionId); if (distribute) { sendCreateSession(session.getId(), session); } if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createSession.newSession", session.getId(), Integer.valueOf(sessions.size()))); } return session; } /** * Send create session event to all backup node * * @param sessionId The session id of the session * @param session The session object */ protected void sendCreateSession(String sessionId, DeltaSession session) { if (cluster.getMembers().length > 0) { SessionMessage msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_CREATED, null, sessionId, sessionId + "-" + System.currentTimeMillis()); if (log.isTraceEnabled()) { log.trace(sm.getString("deltaManager.sendMessage.newSession", name, sessionId)); } msg.setTimestamp(session.getCreationTime()); if (enableStatistics) { counterSend_EVT_SESSION_CREATED.incrementAndGet(); } send(msg); } } /** * Send messages to other backup member (domain or all) * * @param msg Session message */ protected void send(SessionMessage msg) { if (cluster != null) { cluster.send(msg); } } /** * {@inheritDoc} * <p> * Creates new DeltaSession instance. */ @Override public Session createEmptySession() { return new DeltaSession(this); } @Override public String rotateSessionId(Session session) { return rotateSessionId(session, true); } @Override public void changeSessionId(Session session, String newId) { changeSessionId(session, newId, true); } protected String rotateSessionId(Session session, boolean notify) { String orgSessionID = session.getId(); String newId = super.rotateSessionId(session); if (notify) { sendChangeSessionId(session.getId(), orgSessionID); } return newId; } protected void changeSessionId(Session session, String newId, boolean notify) { String orgSessionID = session.getId(); super.changeSessionId(session, newId); if (notify) { sendChangeSessionId(session.getId(), orgSessionID); } } protected void sendChangeSessionId(String newSessionID, String orgSessionID) { if (cluster.getMembers().length > 0) { try { // serialize sessionID byte[] data = serializeSessionId(newSessionID); // notify change sessionID SessionMessage msg = new SessionMessageImpl(getName(), SessionMessage.EVT_CHANGE_SESSION_ID, data, orgSessionID, orgSessionID + "-" + System.currentTimeMillis()); msg.setTimestamp(System.currentTimeMillis()); if (enableStatistics) { counterSend_EVT_CHANGE_SESSION_ID.incrementAndGet(); } send(msg); } catch (IOException ioe) { log.error(sm.getString("deltaManager.unableSerializeSessionID", newSessionID), ioe); } } } /** * serialize sessionID * * @param sessionId Session id to serialize * * @return byte array with serialized session id * * @throws IOException if an input/output error occurs */ protected byte[] serializeSessionId(String sessionId) throws IOException { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeUTF(sessionId); oos.flush(); oos.close(); return bos.toByteArray(); } /** * Load sessionID * * @param data serialized session id * * @return session id * * @throws IOException if an input/output error occurs */ protected String deserializeSessionId(byte[] data) throws IOException { ReplicationStream ois = getReplicationStream(data); String sessionId = ois.readUTF(); ois.close(); return sessionId; } /** * Load sessions from other cluster node. * <p> * FIXME replace currently sessions with same id without notification. * <p> * FIXME SSO handling is not really correct with the session replacement! * * @param data Serialized data * * @exception ClassNotFoundException if a serialized class cannot be found during the reload * @exception IOException if an input/output error occurs */ protected void deserializeSessions(byte[] data) throws ClassNotFoundException, IOException { // Open an input stream to the specified pathname, if any // Load the previously unloaded active sessions try (ObjectInputStream ois = getReplicationStream(data)) { Integer count = (Integer) ois.readObject(); int n = count.intValue(); for (int i = 0; i < n; i++) { DeltaSession session = (DeltaSession) createEmptySession(); session.readObjectData(ois); session.setManager(this); session.setValid(true); session.setPrimarySession(false); // in case the nodes in the cluster are out of // time synch, this will make sure that we have the // correct timestamp, isValid returns true, cause // accessCount=1 session.access(); // make sure that the session gets ready to expire if // needed session.setAccessCount(0); session.resetDeltaRequest(); // FIXME How inform other session id cache like SingleSignOn if (findSession(session.getIdInternal()) != null) { if (enableStatistics) { sessionReplaceCounter.incrementAndGet(); } // FIXME better is to grap this sessions again ! if (log.isWarnEnabled()) { log.warn(sm.getString("deltaManager.loading.existing.session", session.getIdInternal())); } } add(session); if (notifySessionListenersOnReplication) { session.tellNew(); } } } catch (ClassNotFoundException e) { log.error(sm.getString("deltaManager.loading.cnfe", e), e); throw e; } catch (IOException ioe) { log.error(sm.getString("deltaManager.loading.ioe", ioe), ioe); throw ioe; } } /** * Save any currently active sessions in the appropriate persistence mechanism, if any. If persistence is not * supported, this method returns without doing anything. * * @param currentSessions Sessions to serialize * * @return serialized data * * @exception IOException if an input/output error occurs */ protected byte[] serializeSessions(Session[] currentSessions) throws IOException { // Open an output stream to the specified pathname, if any ByteArrayOutputStream fos = new ByteArrayOutputStream(); try (ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(fos))) { oos.writeObject(Integer.valueOf(currentSessions.length)); for (Session currentSession : currentSessions) { ((DeltaSession) currentSession).writeObjectData(oos); } // Flush and close the output stream oos.flush(); } catch (IOException ioe) { log.error(sm.getString("deltaManager.unloading.ioe", ioe), ioe); throw ioe; } // send object data as byte[] return fos.toByteArray(); } /** * Start this component and implement the requirements of * {@link org.apache.catalina.util.LifecycleBase#startInternal()}. * * @exception LifecycleException if this component detects a fatal error that prevents this component from being * used */ @Override protected void startInternal() throws LifecycleException { super.startInternal(); // Load unloaded sessions, if any try { if (cluster == null) { log.error(sm.getString("deltaManager.noCluster", getName())); return; } else { if (log.isInfoEnabled()) { String type = "unknown"; if (cluster.getContainer() instanceof Host) { type = "Host"; } else if (cluster.getContainer() instanceof Engine) { type = "Engine"; } log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName())); } } if (log.isInfoEnabled()) { log.info(sm.getString("deltaManager.startClustering", getName())); } getAllClusterSessions(); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("deltaManager.managerLoad"), t); } setState(LifecycleState.STARTING); } /** * get from first session master the backup from all clustered sessions * * @see #findSessionMasterMember() */ public synchronized void getAllClusterSessions() { if (cluster != null && cluster.getMembers().length > 0) { long beforeSendTime = System.currentTimeMillis(); Member mbr = findSessionMasterMember(); if (mbr == null) { // No domain member found return; } SessionMessage msg = new SessionMessageImpl(this.getName(), SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL", "GET-ALL-" + getName()); msg.setTimestamp(beforeSendTime); // set reference time stateTransferCreateSendTime = beforeSendTime; // request session state if (enableStatistics) { counterSend_EVT_GET_ALL_SESSIONS.incrementAndGet(); } stateTransferred = false; // FIXME This send call block the deploy thread, when sender waitForAck is enabled try { synchronized (receivedMessageQueue) { receiverQueue = true; } cluster.send(msg, mbr, Channel.SEND_OPTIONS_ASYNCHRONOUS); if (log.isInfoEnabled()) { log.info(sm.getString("deltaManager.waitForSessionState", getName(), mbr, Integer.valueOf(getStateTransferTimeout()))); } // FIXME At sender ack mode this method check only the state // transfer and resend is a problem! waitForSendAllSessions(beforeSendTime); } finally { synchronized (receivedMessageQueue) { for (SessionMessage smsg : receivedMessageQueue) { if (!stateTimestampDrop) { messageReceived(smsg, smsg.getAddress()); } else { if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) { // FIXME handle EVT_GET_ALL_SESSIONS later messageReceived(smsg, smsg.getAddress()); } else { if (log.isWarnEnabled()) { log.warn(sm.getString("deltaManager.dropMessage", getName(), smsg.getEventTypeString(), new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp()))); } } } } receivedMessageQueue.clear(); receiverQueue = false; } } } else { if (log.isInfoEnabled()) { log.info(sm.getString("deltaManager.noMembers", getName())); } } } /** * Find the master of the session state * * @return master member of sessions */ protected Member findSessionMasterMember() { Member mbr = null; Member[] mbrs = cluster.getMembers(); if (mbrs.length != 0) { mbr = mbrs[0]; } if (mbr == null && log.isWarnEnabled()) { log.warn(sm.getString("deltaManager.noMasterMember", getName(), "")); } if (mbr != null && log.isTraceEnabled()) { log.trace(sm.getString("deltaManager.foundMasterMember", getName(), mbr)); } return mbr; } /** * Wait that cluster session state is transferred or timeout after 60 Sec With stateTransferTimeout == -1 wait that * backup is transferred (forever mode) * * @param beforeSendTime Start instant of the operation */ protected void waitForSendAllSessions(long beforeSendTime) { long reqStart = System.currentTimeMillis(); long reqNow = reqStart; boolean isTimeout = false; if (getStateTransferTimeout() > 0) { // wait that state is transferred with timeout check do { try { Thread.sleep(100); } catch (Exception ignore) { // Ignore } reqNow = System.currentTimeMillis(); isTimeout = ((reqNow - reqStart) > (1000L * getStateTransferTimeout())); } while ((!getStateTransferred()) && (!isTimeout) && (!isNoContextManagerReceived())); } else { if (getStateTransferTimeout() == -1) { // wait that state is transferred do { try { Thread.sleep(100); } catch (Exception ignore) { // Ignore } } while ((!getStateTransferred()) && (!isNoContextManagerReceived())); reqNow = System.currentTimeMillis(); } } if (isTimeout) { if (enableStatistics) { counterNoStateTransferred.incrementAndGet(); } log.error(sm.getString("deltaManager.noSessionState", getName(), new Date(beforeSendTime), Long.valueOf(reqNow - beforeSendTime))); } else if (isNoContextManagerReceived()) { if (log.isWarnEnabled()) { log.warn(sm.getString("deltaManager.noContextManager", getName(), new Date(beforeSendTime), Long.valueOf(reqNow - beforeSendTime))); } } else { if (log.isInfoEnabled()) { log.info(sm.getString("deltaManager.sessionReceived", getName(), new Date(beforeSendTime), Long.valueOf(reqNow - beforeSendTime))); } } } /** * Stop this component and implement the requirements of * {@link org.apache.catalina.util.LifecycleBase#stopInternal()}. * * @exception LifecycleException if this component detects a fatal error that prevents this component from being * used */ @Override protected void stopInternal() throws LifecycleException { if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.stopped", getName())); } setState(LifecycleState.STOPPING); // Expire all active sessions if (log.isInfoEnabled()) { log.info(sm.getString("deltaManager.expireSessions", getName())); } Session[] sessions = findSessions(); for (Session value : sessions) { DeltaSession session = (DeltaSession) value; if (!session.isValid()) { continue; } try { session.expire(true, isExpireSessionsOnShutdown()); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); } } // Require a new random number generator if we are restarted super.stopInternal(); } // -------------------------------------------------------- Replication // Methods @Override public void messageDataReceived(ClusterMessage cmsg) { if (cmsg instanceof SessionMessage msg) { switch (msg.getEventType()) { case SessionMessage.EVT_GET_ALL_SESSIONS: case SessionMessage.EVT_SESSION_CREATED: case SessionMessage.EVT_SESSION_EXPIRED: case SessionMessage.EVT_SESSION_ACCESSED: case SessionMessage.EVT_SESSION_DELTA: case SessionMessage.EVT_CHANGE_SESSION_ID: synchronized (receivedMessageQueue) { if (receiverQueue) { receivedMessageQueue.add(msg); return; } } break; default: // we didn't queue, do nothing break; } // switch messageReceived(msg, msg.getAddress()); } } @Override public ClusterMessage requestCompleted(String sessionId) { return requestCompleted(sessionId, false); } /** * When the request has been completed, the replication valve will notify the manager, and the manager will decide * whether any replication is needed or not. If there is a need for replication, the manager will create a session * message and that will be replicated. The cluster determines where it gets sent. Session expiration also calls * this method, but with expires == true. * * @param sessionId - the sessionId that just completed. * @param expires - whether this method has been called during session expiration * * @return a SessionMessage to be sent, */ public ClusterMessage requestCompleted(String sessionId, boolean expires) { DeltaSession session; SessionMessage msg = null; try { session = (DeltaSession) findSession(sessionId); if (session == null) { // A parallel request has called session.invalidate() which has // removed the session from the Manager. return null; } if (session.isDirty()) { if (enableStatistics) { counterSend_EVT_SESSION_DELTA.incrementAndGet(); } msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_DELTA, session.getDiff(), sessionId, sessionId + "-" + System.currentTimeMillis()); } } catch (IOException ioe) { log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest", sessionId), ioe); return null; } if (msg == null) { if (!expires && !session.isPrimarySession()) { if (enableStatistics) { counterSend_EVT_SESSION_ACCESSED.incrementAndGet(); } msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary", getName(), sessionId)); } } } else { // log only outside synch block! if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.delta", getName(), sessionId)); } } if (!expires) { session.setPrimarySession(true); } // check to see if we need to send out an access message if (!expires && (msg == null)) { long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated(); if (session.getMaxInactiveInterval() >= 0 && replDelta > (session.getMaxInactiveInterval() * 1000L)) { if (enableStatistics) { counterSend_EVT_SESSION_ACCESSED.incrementAndGet(); } msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_ACCESSED, null, sessionId, sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.access", getName(), sessionId)); } } } // update last replicated time if (msg != null) { session.setLastTimeReplicated(System.currentTimeMillis()); msg.setTimestamp(session.getLastTimeReplicated()); } return msg; } /** * Reset manager statistics */ public synchronized void resetStatistics() { processingTime = 0; expiredSessions.set(0); synchronized (sessionCreationTiming) { sessionCreationTiming.clear(); while (sessionCreationTiming.size() < TIMING_STATS_CACHE_SIZE) { sessionCreationTiming.add(null); } } synchronized (sessionExpirationTiming) { sessionExpirationTiming.clear(); while (sessionExpirationTiming.size() < TIMING_STATS_CACHE_SIZE) { sessionExpirationTiming.add(null); } } rejectedSessions = 0; sessionReplaceCounter.set(0); counterNoStateTransferred.set(0); setMaxActive(getActiveSessions()); counterReceive_EVT_ALL_SESSION_DATA.set(0); counterReceive_EVT_GET_ALL_SESSIONS.set(0); counterReceive_EVT_SESSION_ACCESSED.set(0); counterReceive_EVT_SESSION_CREATED.set(0); counterReceive_EVT_SESSION_DELTA.set(0); counterReceive_EVT_SESSION_EXPIRED.set(0); counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.set(0); counterReceive_EVT_CHANGE_SESSION_ID.set(0); counterSend_EVT_ALL_SESSION_DATA.set(0); counterSend_EVT_GET_ALL_SESSIONS.set(0); counterSend_EVT_SESSION_ACCESSED.set(0); counterSend_EVT_SESSION_CREATED.set(0); counterSend_EVT_SESSION_DELTA.set(0); counterSend_EVT_SESSION_EXPIRED.set(0); counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.set(0); counterSend_EVT_CHANGE_SESSION_ID.set(0); } // -------------------------------------------------------- expire /** * send session expired to other cluster nodes * * @param id session id */ protected void sessionExpired(String id) { if (cluster.getMembers().length > 0) { if (enableStatistics) { counterSend_EVT_SESSION_EXPIRED.incrementAndGet(); } SessionMessage msg = new SessionMessageImpl(getName(), SessionMessage.EVT_SESSION_EXPIRED, null, id, id + "-EXPIRED-MSG"); msg.setTimestamp(System.currentTimeMillis()); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.expire", getName(), id)); } send(msg); } } /** * Expire all find sessions. */ public void expireAllLocalSessions() { Session[] sessions = findSessions(); int expireDirect = 0; int expireIndirect = 0; long timeNow = 0; if (log.isTraceEnabled()) { timeNow = System.currentTimeMillis(); log.trace("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length); } for (Session value : sessions) { if (value instanceof DeltaSession session) { if (session.isPrimarySession()) { if (session.isValid()) { session.expire(); expireDirect++; } else { expireIndirect++; } // end if } // end if } // end if } // for if (log.isTraceEnabled()) { long timeEnd = System.currentTimeMillis(); log.trace("End expire sessions " + getName() + " expire processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect); } } @Override public String[] getInvalidatedSessions() { return EMPTY_STRING_ARRAY; } // -------------------------------------------------------- message receive /** * This method is called by the received thread when a SessionMessage has been received from one of the other nodes * in the cluster. * * @param msg - the message received * @param sender - the sender of the message, this is used if we receive a EVT_GET_ALL_SESSION message, so that we * only reply to the requesting node */ protected void messageReceived(SessionMessage msg, Member sender) { Thread currentThread = Thread.currentThread(); ClassLoader contextLoader = currentThread.getContextClassLoader(); try { ClassLoader[] loaders = getClassLoaders(); currentThread.setContextClassLoader(loaders[0]); if (log.isTraceEnabled()) { log.trace(sm.getString("deltaManager.receiveMessage.eventType", getName(), msg.getEventTypeString(), sender)); } switch (msg.getEventType()) { case SessionMessage.EVT_GET_ALL_SESSIONS: handleGET_ALL_SESSIONS(msg, sender); break; case SessionMessage.EVT_ALL_SESSION_DATA: handleALL_SESSION_DATA(msg, sender); break; case SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE: handleALL_SESSION_TRANSFERCOMPLETE(msg, sender); break; case SessionMessage.EVT_SESSION_CREATED: handleSESSION_CREATED(msg, sender); break; case SessionMessage.EVT_SESSION_EXPIRED: handleSESSION_EXPIRED(msg, sender); break; case SessionMessage.EVT_SESSION_ACCESSED: handleSESSION_ACCESSED(msg, sender); break; case SessionMessage.EVT_SESSION_DELTA: handleSESSION_DELTA(msg, sender); break; case SessionMessage.EVT_CHANGE_SESSION_ID: handleCHANGE_SESSION_ID(msg, sender); break; case SessionMessage.EVT_ALL_SESSION_NOCONTEXTMANAGER: handleALL_SESSION_NOCONTEXTMANAGER(msg, sender); break; default: // we didn't recognize the message type, do nothing break; } // switch } catch (Exception e) { log.error(sm.getString("deltaManager.receiveMessage.error", getName()), e); } finally { currentThread.setContextClassLoader(contextLoader); } } // -------------------------------------------------------- message receiver handler /** * handle receive session state is complete transferred * * @param msg Session message * @param sender Member which sent the message */ protected void handleALL_SESSION_TRANSFERCOMPLETE(SessionMessage msg, Member sender) { if (enableStatistics) { counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE.incrementAndGet(); } if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.transfercomplete", getName(), sender.getHost(), Integer.valueOf(sender.getPort()))); } stateTransferCreateSendTime = msg.getTimestamp(); stateTransferred = true; } /** * handle receive session delta * * @param msg Session message * @param sender Member which sent the message * * @throws IOException IO error with serialization * @throws ClassNotFoundException Serialization error */ protected void handleSESSION_DELTA(SessionMessage msg, Member sender) throws IOException, ClassNotFoundException { if (enableStatistics) { counterReceive_EVT_SESSION_DELTA.incrementAndGet(); } byte[] delta = msg.getSession(); DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session == null) { if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.delta.unknown", getName(), msg.getSessionID())); } } else { if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.delta", getName(), msg.getSessionID())); } session.deserializeAndExecuteDeltaRequest(delta); } } /** * handle receive session is access at other node ( primary session is now false) * * @param msg Session message * @param sender Member which sent the message * * @throws IOException Propagated IO error */ protected void handleSESSION_ACCESSED(SessionMessage msg, Member sender) throws IOException { if (enableStatistics) { counterReceive_EVT_SESSION_ACCESSED.incrementAndGet(); } DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.accessed", getName(), msg.getSessionID())); } session.access(); session.setPrimarySession(false); session.endAccess(); } } /** * handle receive session is expired at other node ( expire session also here) * * @param msg Session message * @param sender Member which sent the message * * @throws IOException Propagated IO error */ protected void handleSESSION_EXPIRED(SessionMessage msg, Member sender) throws IOException { if (enableStatistics) { counterReceive_EVT_SESSION_EXPIRED.incrementAndGet(); } DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.expired", getName(), msg.getSessionID())); } session.expire(notifySessionListenersOnReplication, false); } } /** * handle receive new session is created at other node (create backup - primary false) * * @param msg Session message * @param sender Member which sent the message */ protected void handleSESSION_CREATED(SessionMessage msg, Member sender) { if (enableStatistics) { counterReceive_EVT_SESSION_CREATED.incrementAndGet(); } if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.createNewSession", getName(), msg.getSessionID())); } DeltaSession session = (DeltaSession) createEmptySession(); session.setValid(true); session.setPrimarySession(false); session.setCreationTime(msg.getTimestamp()); // use container maxInactiveInterval so that session will expire correctly // in case of primary transfer session.setMaxInactiveInterval(getContext().getSessionTimeout() * 60, false); session.access(); session.setId(msg.getSessionID(), notifySessionListenersOnReplication); session.endAccess(); } /** * handle receive sessions from other not ( restart ) * * @param msg Session message * @param sender Member which sent the message * * @throws ClassNotFoundException Serialization error * @throws IOException IO error with serialization */ protected void handleALL_SESSION_DATA(SessionMessage msg, Member sender) throws ClassNotFoundException, IOException { if (enableStatistics) { counterReceive_EVT_ALL_SESSION_DATA.incrementAndGet(); } if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataBegin", getName())); } byte[] data = msg.getSession(); deserializeSessions(data); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.allSessionDataAfter", getName())); } // stateTransferred = true; } /** * Handle a get all sessions message from another node. Depending on {@link #sendAllSessions}, sessions are either * sent in a single message or in batches. Sending is complete when this method exits. * * @param msg Session message * @param sender Member which sent the message * * @throws IOException IO error sending messages */ protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException { if (enableStatistics) { counterReceive_EVT_GET_ALL_SESSIONS.incrementAndGet(); } // get a list of all the session from this manager if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName())); } // Write the number of active sessions, followed by the details // get all sessions and serialize without sync Session[] currentSessions = findSessions(); long findSessionTimestamp = System.currentTimeMillis(); if (isSendAllSessions()) { sendSessions(sender, currentSessions, findSessionTimestamp); } else { // send sessions in batches int remain = currentSessions.length; for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) { int len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize(); Session[] sendSessions = new Session[len]; System.arraycopy(currentSessions, i, sendSessions, 0, len); sendSessions(sender, sendSessions, findSessionTimestamp); remain = remain - len; if (getSendAllSessionsWaitTime() > 0 && remain > 0) { try { Thread.sleep(getSendAllSessionsWaitTime()); } catch (Exception ignore) { // Ignore } } } } SessionMessage newmsg = new SessionMessageImpl(name, SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null, "SESSION-STATE-TRANSFERRED", "SESSION-STATE-TRANSFERRED" + getName()); newmsg.setTimestamp(findSessionTimestamp); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.allSessionTransferred", getName())); } if (enableStatistics) { counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE.incrementAndGet(); } cluster.send(newmsg, sender); } /** * handle receive change sessionID at other node * * @param msg Session message * @param sender Member which sent the message * * @throws IOException IO error with serialization */ protected void handleCHANGE_SESSION_ID(SessionMessage msg, Member sender) throws IOException { if (enableStatistics) { counterReceive_EVT_CHANGE_SESSION_ID.incrementAndGet(); } DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); if (session != null) { String newSessionID = deserializeSessionId(msg.getSession()); session.setPrimarySession(false); // change session id changeSessionId(session, newSessionID, notifySessionListenersOnReplication, notifyContainerListenersOnReplication); } } /** * handle receive no context manager. * * @param msg Session message * @param sender Member which sent the message */ protected void handleALL_SESSION_NOCONTEXTMANAGER(SessionMessage msg, Member sender) { if (enableStatistics) { counterReceive_EVT_ALL_SESSION_NOCONTEXTMANAGER.incrementAndGet(); } if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.noContextManager", getName(), sender.getHost(), Integer.valueOf(sender.getPort()))); } noContextManagerReceived = true; } /** * send a block of session to sender * * @param sender Sender member * @param currentSessions Sessions to send * @param sendTimestamp Timestamp * * @throws IOException IO error sending messages */ protected void sendSessions(Member sender, Session[] currentSessions, long sendTimestamp) throws IOException { byte[] data = serializeSessions(currentSessions); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.receiveMessage.unloadingAfter", getName())); } SessionMessage newmsg = new SessionMessageImpl(name, SessionMessage.EVT_ALL_SESSION_DATA, data, "SESSION-STATE", "SESSION-STATE-" + getName()); newmsg.setTimestamp(sendTimestamp); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.allSessionData", getName())); } if (enableStatistics) { counterSend_EVT_ALL_SESSION_DATA.incrementAndGet(); } int sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK; cluster.send(newmsg, sender, sendOptions); } @Override public ClusterManager cloneFromTemplate() { DeltaManager result = new DeltaManager(); clone(result); result.expireSessionsOnShutdown = expireSessionsOnShutdown; result.notifySessionListenersOnReplication = notifySessionListenersOnReplication; result.notifyContainerListenersOnReplication = notifyContainerListenersOnReplication; result.stateTransferTimeout = stateTransferTimeout; result.sendAllSessions = sendAllSessions; result.sendAllSessionsSize = sendAllSessionsSize; result.sendAllSessionsWaitTime = sendAllSessionsWaitTime; result.stateTimestampDrop = stateTimestampDrop; return result; } }
Detected license expression
apache-2.0
Detected license expression (SPDX)
Apache-2.0
Percentage of license text
2.64
Copyrights

      
    
Holders

      
    
Authors

      
    
License detections License expression License expression SPDX
apache_2_0-4bde3f57-78aa-4201-96bf-531cba09e7de apache-2.0 Apache-2.0
URL Start line End line
http://www.apache.org/licenses/LICENSE-2.0 9 9