ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java

Path
ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/tribes/membership/StaticMembershipProvider.java
Status
scanned
Type
file
Name
StaticMembershipProvider.java
Extension
.java
Programming language
Java
Mime type
text/plain
File type
ASCII text, with CRLF line terminators
Tag

      
    
Rootfs path

      
    
Size
15456 (15.1 KB)
MD5
4b4190e9d5a27e506be01292df9ca0be
SHA1
eb89513d5e3d7c164ed52de476130294f1a5edeb
SHA256
3bab1fc888efaf6a343e02188d6f82519bb6bee942bf793665ca5bcc2ee6eb1c
SHA512

      
    
SHA1_git
69b2a9ca152f5902abd799cbef41c402459d919a
Is binary

      
    
Is text
True
Is archive

      
    
Is media

      
    
Is legal

      
    
Is manifest

      
    
Is readme

      
    
Is top level

      
    
Is key file

      
    
StaticMembershipProvider.java | 15.1 KB |

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.tribes.membership; import java.io.Serial; import java.io.Serializable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelException.FaultyMember; import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.Heartbeat; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.Response; import org.apache.catalina.tribes.group.RpcCallback; import org.apache.catalina.tribes.group.RpcChannel; import org.apache.catalina.tribes.util.Arrays; import org.apache.catalina.tribes.util.ExceptionUtils; import org.apache.catalina.tribes.util.StringManager; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; public class StaticMembershipProvider extends MembershipProviderBase implements RpcCallback, ChannelListener, Heartbeat { protected static final StringManager sm = StringManager.getManager(StaticMembershipProvider.class); private static final Log log = LogFactory.getLog(StaticMembershipProvider.class); protected Channel channel; protected RpcChannel rpcChannel; private String membershipName = null; private byte[] membershipId = null; protected ArrayList<StaticMember> staticMembers; protected int sendOptions = Channel.SEND_OPTIONS_ASYNCHRONOUS; protected long expirationTime = 5000; protected int connectTimeout = 500; protected long rpcTimeout = 3000; protected int startLevel = 0; // for ping thread protected boolean useThread = false; protected long pingInterval = 1000; protected volatile boolean running = true; protected PingThread thread = null; @Override public void init(Properties properties) throws Exception { String expirationTimeStr = properties.getProperty("expirationTime"); this.expirationTime = Long.parseLong(expirationTimeStr); String connectTimeoutStr = properties.getProperty("connectTimeout"); this.connectTimeout = Integer.parseInt(connectTimeoutStr); String rpcTimeouStr = properties.getProperty("rpcTimeout"); this.rpcTimeout = Long.parseLong(rpcTimeouStr); this.membershipName = properties.getProperty("membershipName"); this.membershipId = membershipName.getBytes(StandardCharsets.ISO_8859_1); membership = new Membership(service.getLocalMember(true)); this.rpcChannel = new RpcChannel(this.membershipId, channel, this); this.channel.addChannelListener(this); String useThreadStr = properties.getProperty("useThread"); this.useThread = Boolean.parseBoolean(useThreadStr); String pingIntervalStr = properties.getProperty("pingInterval"); this.pingInterval = Long.parseLong(pingIntervalStr); } @Override public void start(int level) throws Exception { if (Channel.MBR_RX_SEQ == (level & Channel.MBR_RX_SEQ)) { // no-op } if (Channel.MBR_TX_SEQ == (level & Channel.MBR_TX_SEQ)) { // no-op } startLevel = (startLevel | level); if (startLevel == (Channel.MBR_RX_SEQ | Channel.MBR_TX_SEQ)) { startMembership(getAliveMembers(staticMembers.toArray(new Member[0]))); running = true; if (thread == null && useThread) { thread = new PingThread(); thread.setDaemon(true); thread.setName("StaticMembership.PingThread[" + this.channel.getName() + "]"); thread.start(); } } } @Override public boolean stop(int level) throws Exception { if (Channel.MBR_RX_SEQ == (level & Channel.MBR_RX_SEQ)) { // no-op } if (Channel.MBR_TX_SEQ == (level & Channel.MBR_TX_SEQ)) { // no-op } startLevel = (startLevel & (~level)); if (startLevel == 0) { running = false; if (thread != null) { thread.interrupt(); thread = null; } if (this.rpcChannel != null) { this.rpcChannel.breakdown(); } if (this.channel != null) { try { stopMembership(this.getMembers()); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // Otherwise ignore } this.channel.removeChannelListener(this); this.channel = null; } this.rpcChannel = null; this.membership.reset(); } return (startLevel == 0); } protected void startMembership(Member[] members) throws ChannelException { if (members.length == 0) { return; } MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_START, service.getLocalMember(true)); Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout); if (resp.length > 0) { for (Response response : resp) { messageReceived(response.getMessage(), response.getSource()); } } else { log.warn(sm.getString("staticMembershipProvider.startMembership.noReplies")); } } protected Member setupMember(Member mbr) { // no-op return mbr; } protected void memberAdded(Member member) { Member mbr = setupMember(member); if (membership.memberAlive(mbr)) { Runnable r = () -> { Thread currentThread = Thread.currentThread(); String name = currentThread.getName(); try { currentThread.setName("StaticMembership-memberAdded"); membershipListener.memberAdded(mbr); } finally { currentThread.setName(name); } }; executor.execute(r); } } protected void memberDisappeared(Member member) { membership.removeMember(member); Runnable r = () -> { Thread currentThread = Thread.currentThread(); String name = currentThread.getName(); try { currentThread.setName("StaticMembership-memberDisappeared"); membershipListener.memberDisappeared(member); } finally { currentThread.setName(name); } }; executor.execute(r); } protected void memberAlive(Member member) { if (!membership.contains(member)) { memberAdded(member); } membership.memberAlive(member); } protected void stopMembership(Member[] members) { if (members.length == 0) { return; } Member localmember = service.getLocalMember(false); localmember.setCommand(Member.SHUTDOWN_PAYLOAD); MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_STOP, localmember); try { channel.send(members, msg, sendOptions); } catch (ChannelException e) { log.error(sm.getString("staticMembershipProvider.stopMembership.sendFailed"), e); } } @Override public void messageReceived(Serializable msg, Member sender) { MemberMessage memMsg = (MemberMessage) msg; Member member = memMsg.getMember(); if (memMsg.getMsgtype() == MemberMessage.MSG_START) { memberAdded(member); } else if (memMsg.getMsgtype() == MemberMessage.MSG_STOP) { memberDisappeared(member); } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) { memberAlive(member); } } @Override public boolean accept(Serializable msg, Member sender) { boolean result = false; if (msg instanceof MemberMessage) { result = Arrays.equals(this.membershipId, ((MemberMessage) msg).getMembershipId()); } return result; } @Override public Serializable replyRequest(Serializable msg, final Member sender) { if (!(msg instanceof MemberMessage memMsg)) { return null; } if (memMsg.getMsgtype() == MemberMessage.MSG_START) { messageReceived(memMsg, sender); memMsg.setMember(service.getLocalMember(true)); return memMsg; } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) { messageReceived(memMsg, sender); memMsg.setMember(service.getLocalMember(true)); return memMsg; } else { // other messages are ignored. if (log.isInfoEnabled()) { log.info(sm.getString("staticMembershipProvider.replyRequest.ignored", memMsg.getTypeDesc())); } return null; } } @Override public void leftOver(Serializable msg, Member sender) { if (!(msg instanceof MemberMessage memMsg)) { return; } if (memMsg.getMsgtype() == MemberMessage.MSG_START) { messageReceived(memMsg, sender); } else if (memMsg.getMsgtype() == MemberMessage.MSG_PING) { messageReceived(memMsg, sender); } else { // other messages are ignored. if (log.isInfoEnabled()) { log.info(sm.getString("staticMembershipProvider.leftOver.ignored", memMsg.getTypeDesc())); } } } @Override public void heartbeat() { try { if (!useThread) { ping(); } } catch (ChannelException e) { log.warn(sm.getString("staticMembershipProvider.heartbeat.failed"), e); } } protected void ping() throws ChannelException { // send ping Member[] members = getAliveMembers(staticMembers.toArray(new Member[0])); if (members.length > 0) { try { MemberMessage msg = new MemberMessage(membershipId, MemberMessage.MSG_PING, service.getLocalMember(true)); Response[] resp = rpcChannel.send(members, msg, RpcChannel.ALL_REPLY, sendOptions, rpcTimeout); for (Response response : resp) { messageReceived(response.getMessage(), response.getSource()); } } catch (ChannelException ce) { // Handle known failed members FaultyMember[] faultyMembers = ce.getFaultyMembers(); for (FaultyMember faultyMember : faultyMembers) { memberDisappeared(faultyMember.getMember()); } throw ce; } } // expire checkExpired(); } protected void checkExpired() { Member[] expired = membership.expire(expirationTime); for (Member member : expired) { membershipListener.memberDisappeared(member); } } public void setChannel(Channel channel) { this.channel = channel; } public void setStaticMembers(ArrayList<StaticMember> staticMembers) { this.staticMembers = staticMembers; } private Member[] getAliveMembers(Member[] members) { List<Member> aliveMembers = new ArrayList<>(); for (Member member : members) { try (Socket socket = new Socket()) { InetAddress ia = InetAddress.getByAddress(member.getHost()); InetSocketAddress addr = new InetSocketAddress(ia, member.getPort()); socket.connect(addr, connectTimeout); aliveMembers.add(member); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // Otherwise ignore } } return aliveMembers.toArray(new Member[0]); } // ------------------------------------------------------------------------------ // member message to send to and from other memberships // ------------------------------------------------------------------------------ public static class MemberMessage implements Serializable { @Serial private static final long serialVersionUID = 1L; public static final int MSG_START = 1; public static final int MSG_STOP = 2; public static final int MSG_PING = 3; private final int msgtype; private final byte[] membershipId; private Member member; public MemberMessage(byte[] membershipId, int msgtype, Member member) { this.membershipId = membershipId; this.msgtype = msgtype; this.member = member; } public int getMsgtype() { return msgtype; } public byte[] getMembershipId() { return membershipId; } public Member getMember() { return member; } public void setMember(Member local) { this.member = local; } @Override public String toString() { return "MemberMessage[" + "name=" + new String(membershipId) + "; type=" + getTypeDesc() + "; member=" + member + ']'; } protected String getTypeDesc() { return switch (msgtype) { case MSG_START -> "MSG_START"; case MSG_STOP -> "MSG_STOP"; case MSG_PING -> "MSG_PING"; default -> "UNKNOWN"; }; } } protected class PingThread extends Thread { @Override public void run() { while (running) { try { sleep(pingInterval); ping(); } catch (InterruptedException ix) { // Ignore } catch (Exception e) { log.warn(sm.getString("staticMembershipProvider.pingThread.failed"), e); } } } } }
Detected license expression
apache-2.0
Detected license expression (SPDX)
Apache-2.0
Percentage of license text
9.14
Copyrights

      
    
Holders

      
    
Authors

      
    
License detections License expression License expression SPDX
apache_2_0-4bde3f57-78aa-4201-96bf-531cba09e7de apache-2.0 Apache-2.0
URL Start line End line
http://www.apache.org/licenses/LICENSE-2.0 9 9