ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/tribes/group/GroupChannel.java

Path
ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/tribes/group/GroupChannel.java
Status
scanned
Type
file
Name
GroupChannel.java
Extension
.java
Programming language
Java
Mime type
text/plain
File type
ASCII text, with CRLF line terminators
Tag

      
    
Rootfs path

      
    
Size
26294 (25.7 KB)
MD5
2ba57bb0fd40fa2529e839d4591ba28c
SHA1
2d535d02251321e995dfa75521b5d9247d9c7615
SHA256
b4f6bb3a6e1abcfe1f5a6e019bff76d0462eb1c014d67242cb418b5cacf30b86
SHA512

      
    
SHA1_git
ce9394324c4846053555ba463e5e5e3920779448
Is binary

      
    
Is text
True
Is archive

      
    
Is media

      
    
Is legal

      
    
Is manifest

      
    
Is readme

      
    
Is top level

      
    
Is key file

      
    
GroupChannel.java | 25.7 KB |

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.tribes.group; import java.io.IOException; import java.io.Serializable; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.catalina.tribes.ByteMessage; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelInterceptor; import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.ChannelSender; import org.apache.catalina.tribes.ErrorHandler; import org.apache.catalina.tribes.Heartbeat; import org.apache.catalina.tribes.JmxChannel; import org.apache.catalina.tribes.ManagedChannel; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.MembershipService; import org.apache.catalina.tribes.RemoteProcessException; import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.group.interceptors.MessageDispatchInterceptor; import org.apache.catalina.tribes.io.BufferPool; import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.jmx.JmxRegistry; import org.apache.catalina.tribes.util.Arrays; import org.apache.catalina.tribes.util.Logs; import org.apache.catalina.tribes.util.StringManager; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** * The default implementation of a Channel.<br> * The GroupChannel manages the replication channel. It coordinates message being sent and received with membership * announcements. The channel has a chain of interceptors that can modify the message or perform other logic.<br> * It manages a complete group, both membership and replication. */ public class GroupChannel extends ChannelInterceptorBase implements ManagedChannel, JmxChannel, GroupChannelMBean { private static final Log log = LogFactory.getLog(GroupChannel.class); protected static final StringManager sm = StringManager.getManager(GroupChannel.class); /** * Flag to determine if the channel manages its own heartbeat If set to true, the channel will start a local thread * for the heart beat. */ protected boolean heartbeat = true; /** * If <code>heartbeat == true</code> then how often do we want this heartbeat to run. The default value is 5000 * milliseconds. */ protected long heartbeatSleeptime = 5 * 1000; /** * Internal heartbeat future */ protected ScheduledFuture<?> heartbeatFuture = null; protected ScheduledFuture<?> monitorFuture; /** * The <code>ChannelCoordinator</code> coordinates the bottom layer components:<br> * - MembershipService<br> * - ChannelSender <br> * - ChannelReceiver<br> */ protected final ChannelCoordinator coordinator = new ChannelCoordinator(); /** * The first interceptor in the interceptor stack. The interceptors are chained in a linked list, so we only need a * reference to the first one */ protected ChannelInterceptor interceptors = null; /** * A list of membership listeners that subscribe to membership announcements */ protected final List<MembershipListener> membershipListeners = new CopyOnWriteArrayList<>(); /** * A list of channel listeners that subscribe to incoming messages */ protected final List<ChannelListener> channelListeners = new CopyOnWriteArrayList<>(); /** * If set to true, the GroupChannel will check to make sure that */ protected boolean optionCheck = false; /** * the name of this channel. */ protected String name = null; /** * the jmx domain which this channel is registered. */ private String jmxDomain = "ClusterChannel"; /** * the jmx prefix which will be used with channel ObjectName. */ private String jmxPrefix = ""; /** * If set to true, this channel is registered with jmx. */ private boolean jmxEnabled = true; /** * Executor service. */ protected ScheduledExecutorService utilityExecutor = null; /** * the ObjectName of this channel. */ private ObjectName oname = null; /** * Creates a GroupChannel. This constructor will also add the first interceptor in the GroupChannel.<br> * The first interceptor is always the channel itself. */ public GroupChannel() { addInterceptor(this); } @Override public void addInterceptor(ChannelInterceptor interceptor) { if (interceptors == null) { interceptors = interceptor; interceptors.setNext(coordinator); interceptors.setPrevious(null); coordinator.setPrevious(interceptors); } else { ChannelInterceptor last = interceptors; while (last.getNext() != coordinator) { last = last.getNext(); } last.setNext(interceptor); interceptor.setNext(coordinator); interceptor.setPrevious(last); coordinator.setPrevious(interceptor); } } /** * Sends a heartbeat through the interceptor stack.<br> * Invoke this method from the application on a periodic basis if you have turned off internal heartbeats * <code>channel.setHeartbeat(false)</code> */ @Override public void heartbeat() { super.heartbeat(); for (MembershipListener listener : membershipListeners) { if (listener instanceof Heartbeat) { ((Heartbeat) listener).heartbeat(); } } for (ChannelListener listener : channelListeners) { if (listener instanceof Heartbeat) { ((Heartbeat) listener).heartbeat(); } } } @Override public UniqueId send(Member[] destination, Serializable msg, int options) throws ChannelException { return send(destination, msg, options, null); } @Override public UniqueId send(Member[] destination, Serializable msg, int options, ErrorHandler handler) throws ChannelException { if (msg == null) { throw new ChannelException(sm.getString("groupChannel.nullMessage")); } XByteBuffer buffer = null; try { if (destination == null || destination.length == 0) { throw new ChannelException(sm.getString("groupChannel.noDestination")); } ChannelData data = new ChannelData(true);// generates a unique Id data.setAddress(getLocalMember(false)); data.setTimestamp(System.currentTimeMillis()); byte[] b; if (msg instanceof ByteMessage) { b = ((ByteMessage) msg).getMessage(); options = options | SEND_OPTIONS_BYTE_MESSAGE; } else { b = XByteBuffer.serialize(msg); options = options & (~SEND_OPTIONS_BYTE_MESSAGE); } data.setOptions(options); // XByteBuffer buffer = new XByteBuffer(b.length+128,false); buffer = BufferPool.getBufferPool().getBuffer(b.length + 128, false); buffer.append(b, 0, b.length); data.setMessage(buffer); InterceptorPayload payload = null; if (handler != null) { payload = new InterceptorPayload(); payload.setErrorHandler(handler); } getFirstInterceptor().sendMessage(destination, data, payload); if (Logs.MESSAGES.isTraceEnabled()) { Logs.MESSAGES.trace("GroupChannel - Sent msg:" + new UniqueId(data.getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()) + " to " + Arrays.toNameString(destination)); Logs.MESSAGES.trace("GroupChannel - Send Message:" + new UniqueId(data.getUniqueId()) + " is " + msg); } return new UniqueId(data.getUniqueId()); } catch (RuntimeException | IOException e) { throw new ChannelException(e); } finally { if (buffer != null) { BufferPool.getBufferPool().returnBuffer(buffer); } } } /** * Callback from the interceptor stack. <br> * When a message is received from a remote node, this method will be invoked by the previous interceptor.<br> * This method can also be used to send a message to other components within the same application, but it's an * extreme case, and you're probably better off doing that logic between the applications itself. * * @param msg ChannelMessage */ @Override public void messageReceived(ChannelMessage msg) { if (msg == null) { return; } try { if (Logs.MESSAGES.isTraceEnabled()) { Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis()) + " from " + msg.getAddress().getName()); } Serializable fwd; if ((msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE) { fwd = new ByteMessage(msg.getMessage().getBytes()); } else { try { fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength()); } catch (Exception e) { log.error(sm.getString("groupChannel.unable.deserialize", msg), e); return; } } if (Logs.MESSAGES.isTraceEnabled()) { Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " + fwd); } // get the actual member with the correct alive time Member source = msg.getAddress(); boolean rx = false; boolean delivered = false; for (ChannelListener channelListener : channelListeners) { if (channelListener != null && channelListener.accept(fwd, source)) { channelListener.messageReceived(fwd, source); delivered = true; // if the message was accepted by an RPC channel, that channel // is responsible for returning the reply, otherwise we send an absence reply if (channelListener instanceof RpcChannel) { rx = true; } } } // for if ((!rx) && (fwd instanceof RpcMessage)) { // if we have a message that requires a response, // but none was given, send back an immediate one sendNoRpcChannelReply((RpcMessage) fwd, source); } if (Logs.MESSAGES.isTraceEnabled()) { Logs.MESSAGES.trace("GroupChannel delivered[" + delivered + "] id:" + new UniqueId(msg.getUniqueId())); } } catch (Exception e) { // this could be the channel listener throwing an exception, we should log it // as a warning. if (log.isWarnEnabled()) { log.warn(sm.getString("groupChannel.receiving.error"), e); } throw new RemoteProcessException(sm.getString("groupChannel.receiving.error"), e); } } /** * Sends a <code>NoRpcChannelReply</code> message to a member<br> * This method gets invoked by the channel if an RPC message comes in and no channel listener accepts the message. * This avoids timeout * * @param msg RpcMessage * @param destination Member - the destination for the reply */ protected void sendNoRpcChannelReply(RpcMessage msg, Member destination) { try { // avoid circular loop if (msg instanceof RpcMessage.NoRpcChannelReply) { return; } RpcMessage.NoRpcChannelReply reply = new RpcMessage.NoRpcChannelReply(msg.rpcId, msg.uuid); send(new Member[] { destination }, reply, SEND_OPTIONS_ASYNCHRONOUS); } catch (Exception e) { log.error(sm.getString("groupChannel.sendFail.noRpcChannelReply"), e); } } /** * memberAdded gets invoked by the interceptor below the channel and the channel will broadcast it to the membership * listeners * * @param member Member - the new member */ @Override public void memberAdded(Member member) { // notify upwards for (MembershipListener membershipListener : membershipListeners) { if (membershipListener != null) { membershipListener.memberAdded(member); } } } /** * memberDisappeared gets invoked by the interceptor below the channel and the channel will broadcast it to the * membership listeners * * @param member Member - the member that left or crashed */ @Override public void memberDisappeared(Member member) { // notify upwards for (MembershipListener membershipListener : membershipListeners) { if (membershipListener != null) { membershipListener.memberDisappeared(member); } } } /** * Sets up the default implementation interceptor stack if no interceptors have been added * * @throws ChannelException Cluster error */ protected synchronized void setupDefaultStack() throws ChannelException { if (getFirstInterceptor() != null && ((getFirstInterceptor().getNext() instanceof ChannelCoordinator))) { addInterceptor(new MessageDispatchInterceptor()); } Iterator<ChannelInterceptor> interceptors = getInterceptors(); while (interceptors.hasNext()) { ChannelInterceptor channelInterceptor = interceptors.next(); channelInterceptor.setChannel(this); } coordinator.setChannel(this); } /** * Validates the option flags that each interceptor is using and reports an error if two interceptor share the same * flag. * * @throws ChannelException Error with option flag */ protected void checkOptionFlags() throws ChannelException { StringBuilder conflicts = new StringBuilder(); ChannelInterceptor first = interceptors; while (first != null) { int flag = first.getOptionFlag(); if (flag != 0) { ChannelInterceptor next = first.getNext(); while (next != null) { int nflag = next.getOptionFlag(); if (nflag != 0 && (((flag & nflag) == flag) || ((flag & nflag) == nflag))) { conflicts.append('['); conflicts.append(first.getClass().getName()); conflicts.append(':'); conflicts.append(flag); conflicts.append(" == "); conflicts.append(next.getClass().getName()); conflicts.append(':'); conflicts.append(nflag); conflicts.append("] "); } // end if next = next.getNext(); } // while } // end if first = first.getNext(); } // while if (!conflicts.isEmpty()) { throw new ChannelException(sm.getString("groupChannel.optionFlag.conflict", conflicts.toString())); } } protected boolean ownExecutor = false; @Override public synchronized void start(int svc) throws ChannelException { setupDefaultStack(); if (optionCheck) { checkOptionFlags(); } // register jmx JmxRegistry jmxRegistry = JmxRegistry.getRegistry(this); if (jmxRegistry != null) { this.oname = jmxRegistry.registerJmx(",component=Channel", this); } if (utilityExecutor == null) { log.warn(sm.getString("groupChannel.warn.noUtilityExecutor")); utilityExecutor = new ScheduledThreadPoolExecutor(1); ownExecutor = true; } super.start(svc); monitorFuture = utilityExecutor.scheduleWithFixedDelay(this::startHeartbeat, 0, 60, TimeUnit.SECONDS); } protected void startHeartbeat() { if (heartbeat && (heartbeatFuture == null || heartbeatFuture.isDone())) { if (heartbeatFuture != null && heartbeatFuture.isDone()) { // There was an error executing the scheduled task, get it and log it try { heartbeatFuture.get(); } catch (InterruptedException | ExecutionException e) { log.error(sm.getString("groupChannel.unable.sendHeartbeat"), e); } } heartbeatFuture = utilityExecutor.scheduleWithFixedDelay(new HeartbeatRunnable(), heartbeatSleeptime, heartbeatSleeptime, TimeUnit.MILLISECONDS); } } @Override public synchronized void stop(int svc) throws ChannelException { if (monitorFuture != null) { monitorFuture.cancel(true); monitorFuture = null; } if (heartbeatFuture != null) { heartbeatFuture.cancel(true); heartbeatFuture = null; } super.stop(svc); if (ownExecutor) { utilityExecutor.shutdown(); utilityExecutor = null; ownExecutor = false; } if (oname != null) { JmxRegistry.getRegistry(this).unregisterJmx(oname); oname = null; } } /** * Returns the first interceptor of the stack. Useful for traversal. * * @return ChannelInterceptor */ public ChannelInterceptor getFirstInterceptor() { return Objects.requireNonNullElse(interceptors, coordinator); } @Override public ScheduledExecutorService getUtilityExecutor() { return utilityExecutor; } @Override public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) { this.utilityExecutor = utilityExecutor; } @Override public ChannelReceiver getChannelReceiver() { return coordinator.getClusterReceiver(); } @Override public ChannelSender getChannelSender() { return coordinator.getClusterSender(); } @Override public MembershipService getMembershipService() { return coordinator.getMembershipService(); } @Override public void setChannelReceiver(ChannelReceiver clusterReceiver) { coordinator.setClusterReceiver(clusterReceiver); } @Override public void setChannelSender(ChannelSender clusterSender) { coordinator.setClusterSender(clusterSender); } @Override public void setMembershipService(MembershipService membershipService) { coordinator.setMembershipService(membershipService); } @Override public void addMembershipListener(MembershipListener membershipListener) { if (!this.membershipListeners.contains(membershipListener)) { this.membershipListeners.add(membershipListener); } } @Override public void removeMembershipListener(MembershipListener membershipListener) { membershipListeners.remove(membershipListener); } @Override public void addChannelListener(ChannelListener channelListener) { if (!this.channelListeners.contains(channelListener)) { this.channelListeners.add(channelListener); } else { throw new IllegalArgumentException(sm.getString("groupChannel.listener.alreadyExist", channelListener, channelListener.getClass().getName())); } } @Override public void removeChannelListener(ChannelListener channelListener) { channelListeners.remove(channelListener); } @Override public Iterator<ChannelInterceptor> getInterceptors() { return new InterceptorIterator(this.getNext(), this.coordinator); } /** * Enables/disables the option check<br> * Setting this to true, will make the GroupChannel perform a conflict check on the interceptors. If two * interceptors are using the same option flag and throw an error upon start. * * @param optionCheck boolean */ public void setOptionCheck(boolean optionCheck) { this.optionCheck = optionCheck; } /** * Configure local heartbeat sleep time<br> * Only used when <code>getHeartbeat()==true</code> * * @param heartbeatSleeptime long - time in milliseconds to sleep between heartbeats */ public void setHeartbeatSleeptime(long heartbeatSleeptime) { this.heartbeatSleeptime = heartbeatSleeptime; } /** * Enables or disables local heartbeat. if <code>setHeartbeat(true)</code> is invoked then the channel will start an * internal thread to invoke <code>Channel.heartbeat()</code> every <code>getHeartbeatSleeptime</code> milliseconds * * @param heartbeat boolean */ @Override public void setHeartbeat(boolean heartbeat) { this.heartbeat = heartbeat; } @Override public boolean getOptionCheck() { return optionCheck; } @Override public boolean getHeartbeat() { return heartbeat; } /** * @return the sleep time in milliseconds that the internal heartbeat will sleep in between invocations of * <code>Channel.heartbeat()</code> */ @Override public long getHeartbeatSleeptime() { return heartbeatSleeptime; } @Override public String getName() { return name; } @Override public void setName(String name) { this.name = name; } @Override public boolean isJmxEnabled() { return jmxEnabled; } @Override public void setJmxEnabled(boolean jmxEnabled) { this.jmxEnabled = jmxEnabled; } @Override public String getJmxDomain() { return jmxDomain; } @Override public void setJmxDomain(String jmxDomain) { this.jmxDomain = jmxDomain; } @Override public String getJmxPrefix() { return jmxPrefix; } @Override public void setJmxPrefix(String jmxPrefix) { this.jmxPrefix = jmxPrefix; } @Override public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { // NOOP return null; } @Override public void postRegister(Boolean registrationDone) { // NOOP } @Override public void preDeregister() throws Exception { // NOOP } @Override public void postDeregister() { JmxRegistry.removeRegistry(this, true); } /** * An iterator to loop through the interceptors in a channel. */ public static class InterceptorIterator implements Iterator<ChannelInterceptor> { private final ChannelInterceptor end; private ChannelInterceptor start; public InterceptorIterator(ChannelInterceptor start, ChannelInterceptor end) { this.end = end; this.start = start; } @Override public boolean hasNext() { return start != null && start != end; } @Override public ChannelInterceptor next() { ChannelInterceptor result = null; if (hasNext()) { result = start; start = start.getNext(); } return result; } @Override public void remove() { // empty operation } } /** * <p> * Title: Internal heartbeat runnable * </p> * <p> * Description: if <code>Channel.getHeartbeat()==true</code> then a thread of this class is created * </p> */ public class HeartbeatRunnable implements Runnable { @Override public void run() { heartbeat(); } } }
Detected license expression
apache-2.0
Detected license expression (SPDX)
Apache-2.0
Percentage of license text
5.26
Copyrights

      
    
Holders

      
    
Authors

      
    
License detections License expression License expression SPDX
apache_2_0-4bde3f57-78aa-4201-96bf-531cba09e7de apache-2.0 Apache-2.0
URL Start line End line
http://www.apache.org/licenses/LICENSE-2.0 9 9