Imporve socketChannel closing in NioConnection (#10895)

This commit is contained in:
Harikrishna 2025-05-23 13:13:04 +05:30 committed by GitHub
parent f496ed6eaf
commit 99863c2fa5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 48 additions and 24 deletions

View File

@ -34,11 +34,11 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -80,7 +80,7 @@ public abstract class NioConnection implements Callable<Boolean> {
protected ExecutorService _executor; protected ExecutorService _executor;
protected ExecutorService _sslHandshakeExecutor; protected ExecutorService _sslHandshakeExecutor;
protected CAService caService; protected CAService caService;
protected Set<SocketChannel> socketChannels = new HashSet<>(); protected Set<SocketChannel> socketChannels = ConcurrentHashMap.newKeySet();
protected Integer sslHandshakeTimeout = null; protected Integer sslHandshakeTimeout = null;
private final int factoryMaxNewConnectionsCount; private final int factoryMaxNewConnectionsCount;
protected boolean blockNewConnections; protected boolean blockNewConnections;
@ -219,7 +219,7 @@ public abstract class NioConnection implements Callable<Boolean> {
return true; return true;
} }
abstract void init() throws IOException; protected abstract void init() throws IOException;
abstract void registerLink(InetSocketAddress saddr, Link link); abstract void registerLink(InetSocketAddress saddr, Link link);
@ -489,16 +489,47 @@ public abstract class NioConnection implements Callable<Boolean> {
} }
protected void closeConnection(final SelectionKey key) { protected void closeConnection(final SelectionKey key) {
if (key != null) { if (key == null) {
final SocketChannel channel = (SocketChannel)key.channel(); return;
key.cancel(); }
SocketChannel channel = null;
try {
// 1. Check type and handle potential CancelledKeyException
if (key.isValid() && key.channel() instanceof SocketChannel) {
channel = (SocketChannel) key.channel();
}
} catch (CancelledKeyException e) {
logger.trace("Key already cancelled when trying to get channel in closeConnection.");
}
// 2. Cancel the key (safe to call even if already cancelled)
key.cancel();
if (channel == null) {
logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key);
return;
}
// 3. Try to close the channel if we obtained it
if (channel != null) {
closeChannel(channel);
} else {
logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key);
}
}
private void closeChannel(SocketChannel channel) {
if (channel != null && channel.isOpen()) {
try { try {
if (channel != null) { logger.debug("Closing socket " + channel.socket());
logger.debug("Closing socket {}", channel.socket()); channel.close();
channel.close(); } catch (IOException ignore) {
} logger.warn(String.format("[ignored] Exception closing channel: %s, due to %s", channel, ignore.getMessage()));
} catch (final IOException ignore) { } catch (Exception e) {
logger.info("[ignored] channel"); logger.warn(String.format("Unexpected exception in closing channel %s", channel), e);
} finally {
socketChannels.remove(channel);
} }
} }
} }
@ -530,14 +561,7 @@ public abstract class NioConnection implements Callable<Boolean> {
/* Release the resource used by the instance */ /* Release the resource used by the instance */
public void cleanUp() throws IOException { public void cleanUp() throws IOException {
for (SocketChannel channel : socketChannels) { for (SocketChannel channel : socketChannels) {
if (channel != null && channel.isOpen()) { closeChannel(channel);
try {
logger.info("Closing connection: {}", channel.getRemoteAddress());
channel.close();
} catch (IOException e) {
logger.warn("Unable to close connection due to {}", e.getMessage());
}
}
} }
if (_selector != null) { if (_selector != null) {
_selector.close(); _selector.close();

View File

@ -25,7 +25,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.cloudstack.framework.ca.CAService; import org.apache.cloudstack.framework.ca.CAService;
@ -34,15 +34,15 @@ public class NioServer extends NioConnection {
protected InetSocketAddress localAddress; protected InetSocketAddress localAddress;
private ServerSocketChannel serverSocket; private ServerSocketChannel serverSocket;
protected WeakHashMap<InetSocketAddress, Link> links; protected ConcurrentHashMap<InetSocketAddress, Link> links;
public NioServer(final String name, final int port, final int workers, final HandlerFactory factory, public NioServer(final String name, final int port, final int workers, final HandlerFactory factory,
final CAService caService, final Integer sslHandShakeTimeout) { final CAService caService, final Integer sslHandShakeTimeout) {
super(name, port, workers,factory); super(name, port, workers, factory);
setCAService(caService); setCAService(caService);
setSslHandshakeTimeout(sslHandShakeTimeout); setSslHandshakeTimeout(sslHandShakeTimeout);
localAddress = null; localAddress = null;
links = new WeakHashMap<>(1024); links = new ConcurrentHashMap<>(1024);
} }
public int getPort() { public int getPort() {