From a7c7a331313dfac618c0e8c2c3f1e4dd6c4e5d29 Mon Sep 17 00:00:00 2001 From: Vishesh Date: Fri, 3 Nov 2023 21:26:15 +0530 Subject: [PATCH] Apple base418 agent lock during reconnect (#340) Co-authored-by: Marcus Sorensen --- .../src/main/java/com/cloud/agent/Agent.java | 4 + .../java/com/cloud/agent/api/PingAnswer.java | 13 +- .../cloud/agent/manager/AgentManagerImpl.java | 154 +++++++++++------- test/integration/smoke/test_host_ping.py | 102 ++++++++++++ 4 files changed, 216 insertions(+), 57 deletions(-) create mode 100644 test/integration/smoke/test_host_ping.py diff --git a/agent/src/main/java/com/cloud/agent/Agent.java b/agent/src/main/java/com/cloud/agent/Agent.java index b1ec592b9fe..9e0ee746c03 100644 --- a/agent/src/main/java/com/cloud/agent/Agent.java +++ b/agent/src/main/java/com/cloud/agent/Agent.java @@ -42,6 +42,7 @@ import javax.naming.ConfigurationException; import com.cloud.resource.AgentStatusUpdater; import com.cloud.resource.ResourceStatusUpdater; +import com.cloud.agent.api.PingAnswer; import com.cloud.utils.NumbersUtil; import org.apache.cloudstack.agent.lb.SetupMSListAnswer; import org.apache.cloudstack.agent.lb.SetupMSListCommand; @@ -842,6 +843,9 @@ public class Agent implements HandlerFactory, IAgentControl, AgentStatusUpdater listener.processControlResponse(response, (AgentControlAnswer)answer); } } + } else if (answer instanceof PingAnswer && (((PingAnswer) answer).isSendStartup()) && _reconnectAllowed) { + s_logger.info("Management server requested startup command to reinitialize the agent"); + sendStartup(link); } else { setLastPingResponseTime(); } diff --git a/core/src/main/java/com/cloud/agent/api/PingAnswer.java b/core/src/main/java/com/cloud/agent/api/PingAnswer.java index 35242380739..6353b121583 100644 --- a/core/src/main/java/com/cloud/agent/api/PingAnswer.java +++ b/core/src/main/java/com/cloud/agent/api/PingAnswer.java @@ -22,15 +22,26 @@ package com.cloud.agent.api; public class PingAnswer extends Answer { private PingCommand _command = null; + private boolean sendStartup = false; + protected PingAnswer() { } - public PingAnswer(PingCommand cmd) { + public PingAnswer(PingCommand cmd, boolean sendStartup) { super(cmd); _command = cmd; + this.sendStartup = sendStartup; } public PingCommand getCommand() { return _command; } + + public boolean isSendStartup() { + return sendStartup; + } + + public void setSendStartup(boolean sendStartup) { + this.sendStartup = sendStartup; + } } diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java index abdee769c1a..d8fb69ff1dd 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/AgentManagerImpl.java @@ -40,6 +40,7 @@ import javax.naming.ConfigurationException; import com.cloud.configuration.Config; import com.cloud.utils.NumbersUtil; +import com.cloud.utils.db.GlobalLock; import org.apache.cloudstack.agent.lb.IndirectAgentLB; import org.apache.cloudstack.ca.CAManager; import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService; @@ -799,49 +800,65 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return true; } + protected Status getNextStatusOnDisconnection(Host host, final Status.Event event) { + final Status currentStatus = host.getStatus(); + final Status nextStatus; + if (currentStatus == Status.Down || currentStatus == Status.Alert || currentStatus == Status.Removed) { + if (s_logger.isDebugEnabled()) { + s_logger.debug(String.format("Host %s is already %s", host.getUuid(), currentStatus)); + } + nextStatus = currentStatus; + } else { + try { + nextStatus = currentStatus.getNextStatus(event); + } catch (final NoTransitionException e) { + final String err = String.format("Cannot find next status for %s as current status is %s for agent %s", event, currentStatus, host.getUuid()); + s_logger.debug(err); + throw new CloudRuntimeException(err); + } + + if (s_logger.isDebugEnabled()) { + s_logger.debug(String.format("The next status of agent %s is %s, current status is %s", host.getUuid(), nextStatus, currentStatus)); + } + } + return nextStatus; + } + protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) { final long hostId = attache.getId(); - s_logger.info("Host " + hostId + " is disconnecting with event " + event); - Status nextStatus = null; - final HostVO host = _hostDao.findById(hostId); - if (host == null) { - s_logger.warn("Can't find host with " + hostId); - nextStatus = Status.Removed; - } else { - final Status currentStatus = host.getStatus(); - if (currentStatus == Status.Down || currentStatus == Status.Alert || currentStatus == Status.Removed) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Host " + hostId + " is already " + currentStatus); - } - nextStatus = currentStatus; - } else { - try { - nextStatus = currentStatus.getNextStatus(event); - } catch (final NoTransitionException e) { - final String err = "Cannot find next status for " + event + " as current status is " + currentStatus + " for agent " + hostId; - s_logger.debug(err); - throw new CloudRuntimeException(err); + boolean result = false; + GlobalLock joinLock = getHostJoinLock(hostId); + if (joinLock.lock(60)) { + try { + s_logger.info(String.format("Host %d is disconnecting with event %s", hostId, event)); + Status nextStatus = null; + final HostVO host = _hostDao.findById(hostId); + if (host == null) { + s_logger.warn(String.format("Can't find host with %d", hostId)); + nextStatus = Status.Removed; + } else { + nextStatus = getNextStatusOnDisconnection(host, event); + caService.purgeHostCertificate(host); } if (s_logger.isDebugEnabled()) { - s_logger.debug("The next status of agent " + hostId + "is " + nextStatus + ", current status is " + currentStatus); + s_logger.debug(String.format("Deregistering link for %d with state %s", hostId, nextStatus)); } + + removeAgent(attache, nextStatus); + + if (host != null && transitState) { + // update the state for host in DB as per the event + disconnectAgent(host, event, _nodeId); + } + } finally { + joinLock.unlock(); } - caService.purgeHostCertificate(host); + result = true; } - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Deregistering link for " + hostId + " with state " + nextStatus); - } - - removeAgent(attache, nextStatus); - // update the DB - if (host != null && transitState) { - disconnectAgent(host, event, _nodeId); - } - - return true; + joinLock.releaseRef(); + return result; } protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, Status.Event event) { @@ -1102,27 +1119,24 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return attache; } - private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, final Request request) { - AgentAttache attache = null; - ReadyCommand ready = null; - try { - final List agentMSHostList = new ArrayList<>(); - String lbAlgorithm = null; - if (startup != null && startup.length > 0) { - final String agentMSHosts = startup[0].getMsHostList(); - if (StringUtils.isNotEmpty(agentMSHosts)) { - String[] msHosts = agentMSHosts.split("@"); - if (msHosts.length > 1) { - lbAlgorithm = msHosts[1]; - } - agentMSHostList.addAll(Arrays.asList(msHosts[0].split(","))); + private AgentAttache sendReadyAndGetAttache(HostVO host, ReadyCommand ready, Link link, StartupCommand[] startup) throws ConnectionException { + final List agentMSHostList = new ArrayList<>(); + String lbAlgorithm = null; + if (startup != null && startup.length > 0) { + final String agentMSHosts = startup[0].getMsHostList(); + if (StringUtils.isNotEmpty(agentMSHosts)) { + String[] msHosts = agentMSHosts.split("@"); + if (msHosts.length > 1) { + lbAlgorithm = msHosts[1]; } + agentMSHostList.addAll(Arrays.asList(msHosts[0].split(","))); } + } + AgentAttache attache = null; - final HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup); - if (host != null) { - ready = new ReadyCommand(host.getDataCenterId(), host.getId(), NumbersUtil.enableHumanReadableSizes); - + GlobalLock joinLock = getHostJoinLock(host.getId()); + if (joinLock.lock(60)) { + try { if (!indirectAgentLB.compareManagementServerList(host.getId(), host.getDataCenterId(), agentMSHostList, lbAlgorithm)) { final List newMSList = indirectAgentLB.getManagementServerList(host.getId(), host.getDataCenterId(), null); ready.setMsHostList(newMSList); @@ -1133,6 +1147,24 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl attache = createAttacheForConnect(host, link); attache = notifyMonitorsOfConnection(attache, startup, false); + } finally { + joinLock.unlock(); + } + } else { + throw new ConnectionException(true, "Unable to acquire lock on host " + host.getUuid()); + } + joinLock.releaseRef(); + return attache; + } + + private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, final Request request) { + AgentAttache attache = null; + ReadyCommand ready = null; + try { + final HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup); + if (host != null) { + ready = new ReadyCommand(host.getDataCenterId(), host.getId(), NumbersUtil.enableHumanReadableSizes); + attache = sendReadyAndGetAttache(host, ready, link, startup); } } catch (final Exception e) { s_logger.debug("Failed to handle host connection: ", e); @@ -1312,6 +1344,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl connectAgent(link, cmds, request); } return; + } else if (cmd instanceof StartupCommand) { + connectAgent(link, cmds, request); } final long hostId = attache.getId(); @@ -1366,14 +1400,15 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl handleCommands(attache, request.getSequence(), new Command[] {cmd}); if (cmd instanceof PingCommand) { final long cmdHostId = ((PingCommand)cmd).getHostId(); + boolean requestStartupCommand = false; + final HostVO host = _hostDao.findById(Long.valueOf(cmdHostId)); + boolean gatewayAccessible = true; // if the router is sending a ping, verify the // gateway was pingable if (cmd instanceof PingRoutingCommand) { processPingRoutingCommand((PingRoutingCommand) cmd, hostId); - final boolean gatewayAccessible = ((PingRoutingCommand)cmd).isGatewayAccessible(); - final HostVO host = _hostDao.findById(Long.valueOf(cmdHostId)); - + gatewayAccessible = ((PingRoutingCommand)cmd).isGatewayAccessible(); if (host != null) { if (!gatewayAccessible) { // alert that host lost connection to @@ -1391,7 +1426,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl s_logger.debug("Not processing " + PingRoutingCommand.class.getSimpleName() + " for agent id=" + cmdHostId + "; can't find the host in the DB"); } } - answer = new PingAnswer((PingCommand)cmd); + if (host != null && host.getStatus() != Status.Up && gatewayAccessible) { + requestStartupCommand = true; + } + answer = new PingAnswer((PingCommand)cmd, requestStartupCommand); } else if (cmd instanceof ReadyAnswer) { final HostVO host = _hostDao.findById(attache.getId()); if (host == null) { @@ -1913,4 +1951,8 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl sendCommandToAgents(hostsPerZone, params); } } + + private GlobalLock getHostJoinLock(Long hostId) { + return GlobalLock.getInternLock(String.format("%s-%s", "Host-Join", hostId)); + } } diff --git a/test/integration/smoke/test_host_ping.py b/test/integration/smoke/test_host_ping.py new file mode 100644 index 00000000000..3448f7a9137 --- /dev/null +++ b/test/integration/smoke/test_host_ping.py @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" Check state transition of host from Alert to Up on Ping +""" + +# Import Local Modules +from marvin.cloudstackTestCase import * +from marvin.lib.common import * +from marvin.lib.utils import * +from nose.plugins.attrib import attr + +_multiprocess_shared_ = False + + +class TestHostPing(cloudstackTestCase): + + def setUp(self, handler=logging.StreamHandler()): + self.logger = logging.getLogger('TestHM') + self.stream_handler = handler + self.logger.setLevel(logging.DEBUG) + self.logger.addHandler(self.stream_handler) + self.apiclient = self.testClient.getApiClient() + self.hypervisor = self.testClient.getHypervisorInfo() + self.mgtSvrDetails = self.config.__dict__["mgtSvr"][0].__dict__ + self.dbConnection = self.testClient.getDbConnection() + self.services = self.testClient.getParsedTestDataConfig() + self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests()) + self.pod = get_pod(self.apiclient, self.zone.id) + self.cleanup = [] + + def tearDown(self): + super(TestHostPing, self).tearDown() + + def checkHostStateInCloudstack(self, state, host_id): + try: + listHost = Host.list( + self.apiclient, + type='Routing', + zoneid=self.zone.id, + podid=self.pod.id, + id=host_id + ) + self.assertEqual( + isinstance(listHost, list), + True, + "Check if listHost returns a valid response" + ) + + self.assertEqual( + len(listHost), + 1, + "Check if listHost returns a host" + ) + self.logger.debug(" Host state is %s " % listHost[0].state) + if listHost[0].state == state: + return True, 1 + else: + return False, 1 + except Exception as e: + self.logger.debug("Got exception %s" % e) + return False, 1 + + @attr( + tags=[ + "advanced", + "advancedns", + "smoke", + "basic"], + required_hardware="true") + def test_01_host_ping_on_alert(self): + listHost = Host.list( + self.apiclient, + type='Routing', + zoneid=self.zone.id, + podid=self.pod.id, + ) + for host in listHost: + self.logger.debug('Hypervisor = {}'.format(host.id)) + + hostToTest = listHost[0] + sql_query = "UPDATE host SET status = 'Alert' WHERE uuid = '" + hostToTest.id + "'" + self.dbConnection.execute(sql_query) + + hostUpInCloudstack = wait_until(30, 8, self.checkHostStateInCloudstack, "Up", hostToTest.id) + + if not (hostUpInCloudstack): + raise self.fail("Host is not up %s, in cloudstack so failing test " % (hostToTest.ipaddress)) + return