ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/tribes/group/RpcChannel.java

Path
ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/tribes/group/RpcChannel.java
Status
scanned
Type
file
Name
RpcChannel.java
Extension
.java
Programming language
Java
Mime type
text/plain
File type
ASCII text, with CRLF line terminators
Tag

      
    
Rootfs path

      
    
Size
11574 (11.3 KB)
MD5
bf08e7d8b45947a21969e346ae02a81f
SHA1
321bcd40c6b31077d55552e1eda834ab669b7988
SHA256
2419cbe926c5978c9d2103a398f4fa069dcb62d48858c1f02b7aa57998547f73
SHA512

      
    
SHA1_git
39b41f0e974b883058bc52a27ae7f8b2a85fa294
Is binary

      
    
Is text
True
Is archive

      
    
Is media

      
    
Is legal

      
    
Is manifest

      
    
Is readme

      
    
Is top level

      
    
Is key file

      
    
RpcChannel.java | 11.3 KB |

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.tribes.group; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.ErrorHandler; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.UniqueId; import org.apache.catalina.tribes.util.StringManager; import org.apache.catalina.tribes.util.UUIDGenerator; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; /** * A channel to handle RPC messaging */ public class RpcChannel implements ChannelListener { private static final Log log = LogFactory.getLog(RpcChannel.class); protected static final StringManager sm = StringManager.getManager(RpcChannel.class); public static final int FIRST_REPLY = 1; public static final int MAJORITY_REPLY = 2; public static final int ALL_REPLY = 3; public static final int NO_REPLY = 4; private Channel channel; private RpcCallback callback; private byte[] rpcId; private int replyMessageOptions = 0; private final ConcurrentMap<RpcCollectorKey,RpcCollector> responseMap = new ConcurrentHashMap<>(); /** * Create an RPC channel. You can have several RPC channels attached to a group all separated out by the uniqueness * * @param rpcId - the unique Id for this RPC group * @param channel Channel * @param callback RpcCallback */ public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) { this.channel = channel; this.callback = callback; this.rpcId = rpcId; channel.addChannelListener(this); } /** * Send a message and wait for the response. * * @param destination Member[] - the destination for the message, and the members you request a reply from * @param message Serializable - the message you are sending out * @param rpcOptions int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY * @param channelOptions channel sender options * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned * * @return Response[] - an array of response objects. * * @throws ChannelException Error sending message */ public Response[] send(Member[] destination, Serializable message, int rpcOptions, int channelOptions, long timeout) throws ChannelException { if (destination == null || destination.length == 0) { return new Response[0]; } // avoid deadlock int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK; RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); RpcCollector collector = new RpcCollector(key, rpcOptions, destination.length); try { synchronized (collector) { if (rpcOptions != NO_REPLY) { responseMap.put(key, collector); } RpcMessage rmsg = new RpcMessage(rpcId, key.id, message); channel.send(destination, rmsg, sendOptions); if (rpcOptions != NO_REPLY) { long timeoutExpiry = System.nanoTime() + timeout * 1_000_000; while (!collector.isComplete() && timeout > 0) { collector.wait(timeout); timeout = (timeoutExpiry - System.nanoTime()) / 1_000_000; } } } } catch (InterruptedException ix) { Thread.currentThread().interrupt(); } finally { responseMap.remove(key); } return collector.getResponses(); } @Override public void messageReceived(Serializable msg, Member sender) { RpcMessage rmsg = (RpcMessage) msg; RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid); if (rmsg.reply) { RpcCollector collector = responseMap.get(key); if (collector == null) { if (!(rmsg instanceof RpcMessage.NoRpcChannelReply)) { callback.leftOver(rmsg.message, sender); } } else { synchronized (collector) { // make sure it hasn't been removed if (responseMap.containsKey(key)) { if ((rmsg instanceof RpcMessage.NoRpcChannelReply)) { collector.destcnt--; } else { collector.addResponse(rmsg.message, sender); } if (collector.isComplete()) { collector.notifyAll(); } } else { if (!(rmsg instanceof RpcMessage.NoRpcChannelReply)) { callback.leftOver(rmsg.message, sender); } } } // synchronized } // end if } else { boolean finished = false; final ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback) ? ((ExtendedRpcCallback) callback) : null; boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS); Serializable reply = callback.replyRequest(rmsg.message, sender); ErrorHandler handler = null; final Serializable request = msg; final Serializable response = reply; final Member fsender = sender; if (excallback != null && asyncReply) { handler = new ErrorHandler() { @Override public void handleError(ChannelException x, UniqueId id) { excallback.replyFailed(request, response, fsender, x); } @Override public void handleCompletion(UniqueId id) { excallback.replySucceeded(request, response, fsender); } }; } rmsg.reply = true; rmsg.message = reply; try { if (handler != null) { channel.send(new Member[] { sender }, rmsg, replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler); } else { channel.send(new Member[] { sender }, rmsg, replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); } finished = true; } catch (Exception e) { if (excallback != null && !asyncReply) { excallback.replyFailed(rmsg.message, reply, sender, e); } else { log.error(sm.getString("rpcChannel.replyFailed"), e); } } if (finished && excallback != null && !asyncReply) { excallback.replySucceeded(rmsg.message, reply, sender); } } // end if } public void breakdown() { channel.removeChannelListener(this); } @Override public boolean accept(Serializable msg, Member sender) { if (msg instanceof RpcMessage rmsg) { return Arrays.equals(rmsg.rpcId, rpcId); } else { return false; } } public Channel getChannel() { return channel; } public RpcCallback getCallback() { return callback; } public byte[] getRpcId() { return rpcId; } public void setChannel(Channel channel) { this.channel = channel; } public void setCallback(RpcCallback callback) { this.callback = callback; } public void setRpcId(byte[] rpcId) { this.rpcId = rpcId; } public int getReplyMessageOptions() { return replyMessageOptions; } public void setReplyMessageOptions(int replyMessageOptions) { this.replyMessageOptions = replyMessageOptions; } /** * Class that holds all response. */ public static class RpcCollector { public final ArrayList<Response> responses = new ArrayList<>(); public final RpcCollectorKey key; public final int options; public int destcnt; public RpcCollector(RpcCollectorKey key, int options, int destcnt) { this.key = key; this.options = options; this.destcnt = destcnt; } public void addResponse(Serializable message, Member sender) { Response resp = new Response(sender, message); responses.add(resp); } public boolean isComplete() { if (destcnt <= 0) { return true; } return switch (options) { case ALL_REPLY -> destcnt == responses.size(); case MAJORITY_REPLY -> { float perc = ((float) responses.size()) / ((float) destcnt); yield perc >= 0.50f; } case FIRST_REPLY -> !responses.isEmpty(); default -> false; }; } @Override public int hashCode() { return key.hashCode(); } @Override public boolean equals(Object o) { if (o instanceof RpcCollector r) { return r.key.equals(this.key); } else { return false; } } public Response[] getResponses() { return responses.toArray(new Response[0]); } } public static class RpcCollectorKey { final byte[] id; public RpcCollectorKey(byte[] id) { this.id = id; } @Override public int hashCode() { return id[0] + id[1] + id[2] + id[3]; } @Override public boolean equals(Object o) { if (o instanceof RpcCollectorKey r) { return Arrays.equals(id, r.id); } else { return false; } } } }
Detected license expression
apache-2.0
Detected license expression (SPDX)
Apache-2.0
Percentage of license text
11.67
Copyrights

      
    
Holders

      
    
Authors

      
    
License detections License expression License expression SPDX
apache_2_0-4bde3f57-78aa-4201-96bf-531cba09e7de apache-2.0 Apache-2.0
URL Start line End line
http://www.apache.org/licenses/LICENSE-2.0 9 9