Agent manager connection handling improvements (#11376)

* Agent manager connection handling improvements

* Fix to send LB check interval in ready command
This commit is contained in:
Suresh Kumar Anaparti 2025-08-05 15:07:02 +05:30 committed by GitHub
parent d7b7bd53ad
commit 4c3f29de1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 106 additions and 78 deletions

View File

@ -475,7 +475,7 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater
return;
}
logger.info("Scheduling a recurring preferred host checker task with lb algorithm '{}' and host.lb.interval={} ms", lbAlgorithm, checkInterval);
logger.info("Scheduling a recurring preferred host checker task with host.lb.interval={} ms", checkInterval);
hostLbCheckExecutor = Executors.newSingleThreadScheduledExecutor((new NamedThreadFactory(name)));
hostLbCheckExecutor.scheduleAtFixedRate(new PreferredHostCheckerTask(), checkInterval, checkInterval,
TimeUnit.MILLISECONDS);

View File

@ -26,10 +26,6 @@ import java.util.List;
public class ReadyCommand extends Command {
private String _details;
public ReadyCommand() {
super();
}
private Long dcId;
private Long hostId;
private String hostUuid;
@ -41,6 +37,10 @@ public class ReadyCommand extends Command {
private Boolean enableHumanReadableSizes;
private String arch;
public ReadyCommand() {
super();
}
public ReadyCommand(Long dcId) {
super();
this.dcId = dcId;
@ -95,7 +95,7 @@ public class ReadyCommand extends Command {
return avoidMsHostList;
}
public void setAvoidMsHostList(List<String> msHostList) {
public void setAvoidMsHostList(List<String> avoidMsHostList) {
this.avoidMsHostList = avoidMsHostList;
}

View File

@ -27,6 +27,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@ -758,15 +759,15 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
}
}
protected AgentAttache notifyMonitorsOfConnection(final AgentAttache attache, final StartupCommand[] cmd, final boolean forRebalance) throws ConnectionException {
protected AgentAttache notifyMonitorsOfConnection(final AgentAttache attache, final StartupCommand[] cmds, final boolean forRebalance) throws ConnectionException {
final long hostId = attache.getId();
final HostVO host = _hostDao.findById(hostId);
for (final Pair<Integer, Listener> monitor : _hostMonitors) {
logger.debug("Sending Connect to listener: {}, for rebalance: {}", monitor.second().getClass().getSimpleName(), forRebalance);
for (int i = 0; i < cmd.length; i++) {
for (StartupCommand cmd : cmds) {
try {
logger.debug("process connection to issue: {} for host: {}, forRebalance: {}, connection transferred: {}", ReflectionToStringBuilderUtils.reflectCollection(cmd[i]), hostId, forRebalance, cmd[i].isConnectionTransferred());
monitor.second().processConnect(host, cmd[i], forRebalance);
logger.debug("process connection to issue: {} for host: {}, forRebalance: {}", ReflectionToStringBuilderUtils.reflectOnlySelectedFields(cmd, "id", "type", "msHostList", "connectionTransferred"), hostId, forRebalance);
monitor.second().processConnect(host, cmd, forRebalance);
} catch (final ConnectionException ce) {
if (ce.isSetupError()) {
logger.warn("Monitor {} says there is an error in the connect process for {} due to {}", monitor.second().getClass().getSimpleName(), hostId, ce.getMessage());
@ -1040,39 +1041,50 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) {
final long hostId = attache.getId();
final HostVO host = _hostDao.findById(hostId);
boolean result = false;
GlobalLock joinLock = getHostJoinLock(hostId);
if (joinLock.lock(60)) {
try {
logger.info("Host {} is disconnecting with event {}",
attache, event);
Status nextStatus;
final HostVO host = _hostDao.findById(hostId);
if (host == null) {
logger.warn("Can't find host with {} ({})", hostId, attache);
nextStatus = Status.Removed;
} else {
nextStatus = getNextStatusOnDisconnection(host, event);
caService.purgeHostCertificate(host);
}
logger.debug("Deregistering link for {} with state {}", attache, nextStatus);
removeAgent(attache, nextStatus);
if (host != null && transitState) {
// update the state for host in DB as per the event
disconnectAgent(host, event, _nodeId);
}
} finally {
joinLock.unlock();
try {
if (!joinLock.lock(60)) {
logger.debug("Unable to acquire lock on host {} to process agent disconnection", Objects.toString(host, String.valueOf(hostId)));
return result;
}
logger.debug("Acquired lock on host {}, to process agent disconnection", Objects.toString(host, String.valueOf(hostId)));
disconnectHostAgent(attache, event, host, transitState, joinLock);
result = true;
} finally {
joinLock.releaseRef();
}
joinLock.releaseRef();
return result;
}
/**
 * Performs the agent disconnection work for a host and then unlocks the supplied join lock.
 *
 * The lock is always unlocked in the {@code finally} clause, whether the body
 * completes normally or throws. This method does not call {@code releaseRef()}.
 *
 * @param attache      the agent attache being deregistered
 * @param event        the status event that triggered the disconnection
 * @param host         the host record for the attache, or {@code null} if it was not found
 * @param transitState whether the host state should also be updated for the event
 *                     (only applied when {@code host} is not {@code null})
 * @param joinLock     the host join lock to unlock once processing ends
 */
private void disconnectHostAgent(final AgentAttache attache, final Status.Event event, final HostVO host, final boolean transitState, final GlobalLock joinLock) {
    try {
        logger.info("Host {} is disconnecting with event {}", attache, event);
        final long hostId = attache.getId();
        final boolean hostFound = host != null;

        final Status statusAfterDisconnect;
        if (hostFound) {
            statusAfterDisconnect = getNextStatusOnDisconnection(host, event);
            caService.purgeHostCertificate(host);
        } else {
            // No host record: the attache is deregistered with the Removed status.
            logger.warn("Can't find host with {} ({})", hostId, attache);
            statusAfterDisconnect = Status.Removed;
        }

        logger.debug("Deregistering link for {} with state {}", attache, statusAfterDisconnect);
        removeAgent(attache, statusAfterDisconnect);

        if (hostFound && transitState) {
            // update the state for host in DB as per the event
            disconnectAgent(host, event, _nodeId);
        }
    } finally {
        joinLock.unlock();
    }
}
protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) {
final long hostId = attache.getId();
HostVO host = _hostDao.findById(hostId);
@ -1341,45 +1353,58 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
return attache;
}
private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startup) throws ConnectionException {
final List<String> agentMSHostList = new ArrayList<>();
String lbAlgorithm = null;
if (startup != null && startup.length > 0) {
final String agentMSHosts = startup[0].getMsHostList();
if (StringUtils.isNotEmpty(agentMSHosts)) {
String[] msHosts = agentMSHosts.split("@");
if (msHosts.length > 1) {
lbAlgorithm = msHosts[1];
}
agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
}
}
ready.setArch(host.getArch().getType());
private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startupCmds) throws ConnectionException {
AgentAttache attache;
GlobalLock joinLock = getHostJoinLock(host.getId());
if (joinLock.lock(60)) {
try {
if (!indirectAgentLB.compareManagementServerList(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) {
final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null);
ready.setMsHostList(newMSList);
final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs();
ready.setAvoidMsHostList(avoidMsList);
ready.setLbAlgorithm(indirectAgentLB.getLBAlgorithmName());
ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId()));
logger.debug("Agent's management server host list is not up to date, sending list update: {}", newMSList);
}
attache = createAttacheForConnect(host, link);
attache = notifyMonitorsOfConnection(attache, startup, false);
} finally {
joinLock.unlock();
try {
if (!joinLock.lock(60)) {
throw new ConnectionException(true, String.format("Unable to acquire lock on host %s, to process agent connection", host));
}
} else {
throw new ConnectionException(true,
String.format("Unable to acquire lock on host %s", host));
logger.debug("Acquired lock on host {}, to process agent connection", host);
attache = connectHostAgent(host, ready, link, startupCmds, joinLock);
} finally {
joinLock.releaseRef();
}
joinLock.releaseRef();
return attache;
}
/**
 * Prepares the ready command for a connecting host agent, creates its attache and
 * notifies the connection monitors, then unlocks the supplied join lock.
 *
 * The management server list reported by the agent is read from the first startup
 * command, in the form {@code "host1,host2@lbAlgorithm"}. If it does not match what
 * {@code indirectAgentLB} expects, the current list and LB algorithm are put on the
 * ready command. The avoid list, LB check interval and host arch are always set.
 *
 * The lock is always unlocked in the {@code finally} clause, whether the body
 * completes normally or throws. This method does not call {@code releaseRef()}.
 *
 * @param host        the host whose agent is connecting
 * @param ready       the ready command to populate
 * @param link        the link used to create the attache
 * @param startupCmds the startup commands received from the agent; may be {@code null} or empty
 * @param joinLock    the host join lock to unlock once processing ends
 * @return the attache returned by {@code notifyMonitorsOfConnection}
 * @throws ConnectionException if creating the attache or notifying the monitors fails
 */
private AgentAttache connectHostAgent(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startupCmds, GlobalLock joinLock) throws ConnectionException {
    AgentAttache attache;
    try {
        final List<String> agentMSHostList = new ArrayList<>();
        String lbAlgorithm = null;
        if (startupCmds != null && startupCmds.length > 0) {
            final String agentMSHosts = startupCmds[0].getMsHostList();
            if (StringUtils.isNotEmpty(agentMSHosts)) {
                String[] msHosts = agentMSHosts.split("@");
                if (msHosts.length > 1) {
                    lbAlgorithm = msHosts[1];
                }
                // String.split drops trailing empty strings, so an input made only of
                // separators (e.g. "@") yields a zero-length array. Guard the index so a
                // malformed list is treated as "no list received" instead of throwing.
                if (msHosts.length > 0) {
                    agentMSHostList.addAll(Arrays.asList(msHosts[0].split(",")));
                }
            }
        }

        if (!indirectAgentLB.compareManagementServerListAndLBAlgorithm(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) {
            final List<String> newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null);
            ready.setMsHostList(newMSList);
            String newLBAlgorithm = indirectAgentLB.getLBAlgorithmName();
            ready.setLbAlgorithm(newLBAlgorithm);
            logger.debug("Agent's management server host list or lb algorithm is not up to date, sending list and algorithm update: {}, {}", newMSList, newLBAlgorithm);
        }

        // These are sent on every ready command, regardless of the list comparison above.
        final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs();
        ready.setAvoidMsHostList(avoidMsList);
        ready.setLbCheckInterval(indirectAgentLB.getLBPreferredHostCheckInterval(host.getClusterId()));
        ready.setArch(host.getArch().getType());

        attache = createAttacheForConnect(host, link);
        attache = notifyMonitorsOfConnection(attache, startupCmds, false);
    } finally {
        joinLock.unlock();
    }
    return attache;
}
@ -1666,7 +1691,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
logger.debug("Not processing {} for agent id={}; can't find the host in the DB", PingRoutingCommand.class.getSimpleName(), cmdHostId);
}
}
if (host!= null && host.getStatus() != Status.Up && gatewayAccessible) {
if (host != null && host.getStatus() != Status.Up && gatewayAccessible) {
requestStartupCommand = true;
}
final List<String> avoidMsList = _mshostDao.listNonUpStateMsIPs();
@ -1821,11 +1846,11 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
return false;
}
private void disconnectInternal(final long hostId, final Status.Event event, final boolean invstigate) {
private void disconnectInternal(final long hostId, final Status.Event event, final boolean investigate) {
final AgentAttache attache = findAttache(hostId);
if (attache != null) {
if (!invstigate) {
if (!investigate) {
disconnectWithoutInvestigation(attache, event);
} else {
disconnectWithInvestigation(attache, event);

View File

@ -54,8 +54,10 @@ public class ConnectedAgentAttache extends AgentAttache {
@Override
public void disconnect(final Status state) {
synchronized (this) {
logger.debug("Processing Disconnect.");
logger.debug("Processing disconnect [id: {}, uuid: {}, name: {}]", _id, _uuid, _name);
if (_link != null) {
logger.debug("Disconnecting from {}, Socket Address: {}", _link.getIpAddress(), _link.getSocketAddress());
_link.close();
_link.terminated();
}

View File

@ -48,13 +48,14 @@ public interface IndirectAgentLB {
List<String> getManagementServerList(Long hostId, Long dcId, List<Long> orderedHostIdList, String lbAlgorithm);
/**
* Compares received management server list against expected list for a host in a zone.
* Compares received management server list against expected list for a host in a zone and LB algorithm.
* @param hostId host id
* @param dcId zone id
* @param receivedMSHosts received management server list
* @return true if mgmtHosts is up to date, false if not
* @param lbAlgorithm received LB algorithm
* @return true if mgmtHosts and LB algorithm are up to date, false if not
*/
boolean compareManagementServerList(Long hostId, Long dcId, List<String> receivedMSHosts, String lbAlgorithm);
boolean compareManagementServerListAndLBAlgorithm(Long hostId, Long dcId, List<String> receivedMSHosts, String lbAlgorithm);
/**
* Returns the configured LB algorithm

View File

@ -149,7 +149,7 @@ public class IndirectAgentLBServiceImpl extends ComponentLifecycleBase implement
}
@Override
public boolean compareManagementServerList(final Long hostId, final Long dcId, final List<String> receivedMSHosts, final String lbAlgorithm) {
public boolean compareManagementServerListAndLBAlgorithm(final Long hostId, final Long dcId, final List<String> receivedMSHosts, final String lbAlgorithm) {
if (receivedMSHosts == null || receivedMSHosts.isEmpty()) {
return false;
}