ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/tribes/transport/ReceiverBase.java

Path
ttomcat-1778514358873.zip-extract/apache-tomcat-11.0.18-src/java/org/apache/catalina/tribes/transport/ReceiverBase.java
Status
scanned
Type
file
Name
ReceiverBase.java
Extension
.java
Programming language
Java
Mime type
text/plain
File type
ASCII text, with CRLF line terminators
Tag

      
    
Rootfs path

      
    
Size
17326 (16.9 KB)
MD5
eff363e7ff5512493b64a49263dc88b4
SHA1
83bf2df190adee8e5bc0ed6d90f699868b849038
SHA256
705b8842652695afe1ddf64d3b74bd9d09639fc9ee70bc805056cc599768ce0e
SHA512

      
    
SHA1_git
c9dd5e6ebe3f6452b060cc98af9577942198e570
Is binary

      
    
Is text
True
Is archive

      
    
Is media

      
    
Is legal

      
    
Is manifest

      
    
Is readme

      
    
Is top level

      
    
Is key file

      
    
ReceiverBase.java | 16.9 KB |

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.catalina.tribes.transport; import java.io.IOException; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.management.ObjectName; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.MessageListener; import org.apache.catalina.tribes.io.ListenCallback; import org.apache.catalina.tribes.jmx.JmxRegistry; import org.apache.catalina.tribes.util.ExecutorFactory; import org.apache.catalina.tribes.util.StringManager; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, RxTaskPool.TaskCreator { public static final int OPTION_DIRECT_BUFFER = 0x0004; private static final Log log = LogFactory.getLog(ReceiverBase.class); private static final Object bindLock = new Object(); protected static final StringManager sm = StringManager.getManager(Constants.Package); private MessageListener listener; private String host = "auto"; private InetAddress bind; private int port = 4000; private int udpPort = -1; private int securePort = -1; private int rxBufSize = Constants.DEFAULT_CLUSTER_MSG_BUFFER_SIZE; private int txBufSize = Constants.DEFAULT_CLUSTER_ACK_BUFFER_SIZE; private int udpRxBufSize = Constants.DEFAULT_CLUSTER_MSG_BUFFER_SIZE; private int udpTxBufSize = Constants.DEFAULT_CLUSTER_ACK_BUFFER_SIZE; private volatile boolean listen = false; private RxTaskPool pool; private boolean direct = true; private long tcpSelectorTimeout = 5000; // how many times to search for an available socket private int autoBind = 100; private int maxThreads = 15; private int minThreads = 6; private int maxTasks = 100; private int minTasks = 10; private boolean tcpNoDelay = true; private boolean soKeepAlive = false; private boolean ooBInline = true; private boolean soReuseAddress = true; private boolean soLingerOn = true; private int soLingerTime = 3; private int soTrafficClass = 0x04 | 0x08 | 0x010; private int timeout = 3000; // 3 seconds private boolean useBufferPool = true; private boolean daemon = true; private long maxIdleTime = 60000; private ExecutorService executor; private Channel channel; /** * the ObjectName of this Receiver. */ private ObjectName oname = null; public ReceiverBase() { } @Override public void start() throws IOException { if (executor == null) { String channelName = ""; if (channel.getName() != null) { channelName = "[" + channel.getName() + "]"; } TaskThreadFactory tf = new TaskThreadFactory("Tribes-Task-Receiver" + channelName + "-"); executor = ExecutorFactory.newThreadPool(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS, tf); } // register jmx JmxRegistry jmxRegistry = JmxRegistry.getRegistry(channel); if (jmxRegistry != null) { this.oname = jmxRegistry.registerJmx(",component=Receiver", this); } } @Override public void stop() { if (executor != null) { executor.shutdownNow();// ignore left overs } executor = null; if (oname != null) { JmxRegistry jmxRegistry = JmxRegistry.getRegistry(channel); if (jmxRegistry != null) { jmxRegistry.unregisterJmx(oname); } oname = null; } channel = null; } @Override public MessageListener getMessageListener() { return listener; } @Override public int getPort() { return port; } public int getRxBufSize() { return rxBufSize; } public int getTxBufSize() { return txBufSize; } @Override public void setMessageListener(MessageListener listener) { this.listener = listener; } public void setRxBufSize(int rxBufSize) { this.rxBufSize = rxBufSize; } public void setTxBufSize(int txBufSize) { this.txBufSize = txBufSize; } /** * @return Returns the bind. */ public InetAddress getBind() { if (bind == null) { try { if ("auto".equals(host)) { host = InetAddress.getLocalHost().getHostAddress(); } if (log.isDebugEnabled()) { log.debug(sm.getString("receiverBase.start", host)); } bind = InetAddress.getByName(host); } catch (IOException ioe) { log.error(sm.getString("receiverBase.bind.failed", host), ioe); } } return bind; } /** * Attempts to bind using the provided port and if that fails attempts to bind to each of the ports from portstart * to (portstart + retries -1) until either there are no more ports or the bind is successful. The address to bind * to is obtained via a call to {@link #getBind()}. * * @param socket The socket to bind * @param portstart Starting port for bind attempts * @param retries Number of times to attempt to bind (port incremented between attempts) * * @throws IOException Socket bind error */ protected void bind(ServerSocket socket, int portstart, int retries) throws IOException { synchronized (bindLock) { InetSocketAddress addr = null; int port = portstart; while (retries > 0) { try { addr = new InetSocketAddress(getBind(), port); socket.bind(addr); setPort(port); log.info(sm.getString("receiverBase.socket.bind", addr)); retries = 0; } catch (IOException ioe) { retries--; if (retries <= 0) { log.info(sm.getString("receiverBase.unable.bind", addr)); throw ioe; } port++; } } } } /** * Same as bind() except it does it for the UDP port * * @param socket The socket to bind * @param portstart Starting port for bind attempts * @param retries Number of times to attempt to bind (port incremented between attempts) * * @return int The retry count * * @throws IOException Socket bind error */ protected int bindUdp(DatagramSocket socket, int portstart, int retries) throws IOException { InetSocketAddress addr = null; while (retries > 0) { try { addr = new InetSocketAddress(getBind(), portstart); socket.bind(addr); setUdpPort(portstart); log.info(sm.getString("receiverBase.udp.bind", addr)); return 0; } catch (IOException ioe) { retries--; if (retries <= 0) { log.info(sm.getString("receiverBase.unable.bind.udp", addr)); throw ioe; } portstart++; try { Thread.sleep(25); } catch (InterruptedException ti) { Thread.currentThread().interrupt(); } retries = bindUdp(socket, portstart, retries); } } return retries; } @Override public void messageDataReceived(ChannelMessage data) { if (this.listener != null) { if (listener.accept(data)) { listener.messageReceived(data); } } } public int getWorkerThreadOptions() { int options = 0; if (getDirect()) { options = options | OPTION_DIRECT_BUFFER; } return options; } /** * @param bind The bind to set. */ public void setBind(InetAddress bind) { this.bind = bind; } public boolean getDirect() { return direct; } public void setDirect(boolean direct) { this.direct = direct; } public String getAddress() { getBind(); return this.host; } @Override public String getHost() { return getAddress(); } public long getSelectorTimeout() { return tcpSelectorTimeout; } public boolean doListen() { return listen; } public MessageListener getListener() { return listener; } public RxTaskPool getTaskPool() { return pool; } public int getAutoBind() { return autoBind; } public int getMaxThreads() { return maxThreads; } public int getMinThreads() { return minThreads; } public boolean getTcpNoDelay() { return tcpNoDelay; } public boolean getSoKeepAlive() { return soKeepAlive; } public boolean getOoBInline() { return ooBInline; } public boolean getSoLingerOn() { return soLingerOn; } public int getSoLingerTime() { return soLingerTime; } public boolean getSoReuseAddress() { return soReuseAddress; } public int getSoTrafficClass() { return soTrafficClass; } public int getTimeout() { return timeout; } public boolean getUseBufferPool() { return useBufferPool; } @Override public int getSecurePort() { return securePort; } public int getMinTasks() { return minTasks; } public int getMaxTasks() { return maxTasks; } public ExecutorService getExecutor() { return executor; } public boolean isListening() { return listen; } public void setSelectorTimeout(long selTimeout) { tcpSelectorTimeout = selTimeout; } public void setListen(boolean doListen) { this.listen = doListen; } public void setAddress(String host) { this.host = host; } public void setHost(String host) { setAddress(host); } public void setListener(MessageListener listener) { this.listener = listener; } public void setPool(RxTaskPool pool) { this.pool = pool; } public void setPort(int port) { this.port = port; } public void setAutoBind(int autoBind) { this.autoBind = autoBind; if (this.autoBind <= 0) { this.autoBind = 1; } } public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; } public void setMinThreads(int minThreads) { this.minThreads = minThreads; } public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } public void setSoKeepAlive(boolean soKeepAlive) { this.soKeepAlive = soKeepAlive; } public void setOoBInline(boolean ooBInline) { this.ooBInline = ooBInline; } public void setSoLingerOn(boolean soLingerOn) { this.soLingerOn = soLingerOn; } public void setSoLingerTime(int soLingerTime) { this.soLingerTime = soLingerTime; } public void setSoReuseAddress(boolean soReuseAddress) { this.soReuseAddress = soReuseAddress; } public void setSoTrafficClass(int soTrafficClass) { this.soTrafficClass = soTrafficClass; } public void setTimeout(int timeout) { this.timeout = timeout; } public void setUseBufferPool(boolean useBufferPool) { this.useBufferPool = useBufferPool; } public void setSecurePort(int securePort) { this.securePort = securePort; } public void setMinTasks(int minTasks) { this.minTasks = minTasks; } public void setMaxTasks(int maxTasks) { this.maxTasks = maxTasks; } public void setExecutor(ExecutorService executor) { this.executor = executor; } @Override public void heartbeat() { // empty operation } @Override public int getUdpPort() { return udpPort; } public void setUdpPort(int udpPort) { this.udpPort = udpPort; } public int getUdpRxBufSize() { return udpRxBufSize; } public void setUdpRxBufSize(int udpRxBufSize) { this.udpRxBufSize = udpRxBufSize; } public int getUdpTxBufSize() { return udpTxBufSize; } public void setUdpTxBufSize(int udpTxBufSize) { this.udpTxBufSize = udpTxBufSize; } @Override public Channel getChannel() { return channel; } @Override public void setChannel(Channel channel) { this.channel = channel; } // ---------------------------------------------- stats of the thread pool /** * Return the current number of threads that are managed by the pool. * * @return the current number of threads that are managed by the pool */ public int getPoolSize() { if (executor instanceof ThreadPoolExecutor) { return ((ThreadPoolExecutor) executor).getPoolSize(); } else { return -1; } } /** * Return the current number of threads that are in use. * * @return the current number of threads that are in use */ public int getActiveCount() { if (executor instanceof ThreadPoolExecutor) { return ((ThreadPoolExecutor) executor).getActiveCount(); } else { return -1; } } /** * Return the total number of tasks that have ever been scheduled for execution by the pool. * * @return the total number of tasks that have ever been scheduled for execution by the pool */ public long getTaskCount() { if (executor instanceof ThreadPoolExecutor) { return ((ThreadPoolExecutor) executor).getTaskCount(); } else { return -1; } } /** * Return the total number of tasks that have completed execution by the pool. * * @return the total number of tasks that have completed execution by the pool */ public long getCompletedTaskCount() { if (executor instanceof ThreadPoolExecutor) { return ((ThreadPoolExecutor) executor).getCompletedTaskCount(); } else { return -1; } } // ---------------------------------------------- ThreadFactory Inner Class class TaskThreadFactory implements ThreadFactory { final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; TaskThreadFactory(String namePrefix) { group = Thread.currentThread().getThreadGroup(); this.namePrefix = namePrefix; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement()); t.setDaemon(daemon); t.setPriority(Thread.NORM_PRIORITY); return t; } } public boolean isDaemon() { return daemon; } public long getMaxIdleTime() { return maxIdleTime; } public void setDaemon(boolean daemon) { this.daemon = daemon; } public void setMaxIdleTime(long maxIdleTime) { this.maxIdleTime = maxIdleTime; } }
Detected license expression
apache-2.0
Detected license expression (SPDX)
Apache-2.0
Percentage of license text
7.66
Copyrights

      
    
Holders

      
    
Authors

      
    
License detections License expression License expression SPDX
apache_2_0-4bde3f57-78aa-4201-96bf-531cba09e7de apache-2.0 Apache-2.0
URL Start line End line
http://www.apache.org/licenses/LICENSE-2.0 9 9