diff --git a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index dd3666e5561..724b824942b 100644 --- a/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -191,7 +191,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust scanDirectAgentToLoad(); } - private void scanDirectAgentToLoad() { + protected void scanDirectAgentToLoad() { logger.trace("Begin scanning directly connected hosts"); // for agents that are self-managed, threshold to be considered as disconnected after pingtimeout @@ -212,11 +212,21 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust logger.info("{} is detected down, but we have a forward attache running, disconnect this one before launching the host", host); removeAgent(agentattache, Status.Disconnected); } else { - continue; + logger.debug("Host {} status is {} but has an AgentAttache which is not forForward, try to load directly", host, host.getStatus()); + Status hostStatus = investigate(agentattache); + if (Status.Up == hostStatus) { + /* Got ping response from host, bring it back */ + logger.info("After investigation, Agent for host {} is determined to be up and running", host); + agentStatusTransitTo(host, Event.Ping, _nodeId); + } else { + logger.debug("After investigation, AgentAttache is not null but host status is {}, try to load directly {}", hostStatus, host); + loadDirectlyConnectedHost(host, false); + } } + } else { + logger.debug("AgentAttache is null, loading directly connected {}", host); + loadDirectlyConnectedHost(host, false); } - logger.debug("Loading directly connected {}", host); - loadDirectlyConnectedHost(host, false); } catch (final Throwable e) { logger.warn(" can not load directly 
connected {} due to ", host, e); } @@ -362,20 +372,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return; } if (!result) { - throw new CloudRuntimeException("Failed to propagate agent change request event:" + Event.ShutdownRequested + " to host:" + hostId); + throw new CloudRuntimeException(String.format("Failed to propagate agent change request event: %s to host: %s", Event.ShutdownRequested, hostId)); } } public void notifyNodesInCluster(final AgentAttache attache) { logger.debug("Notifying other nodes of to disconnect"); - final Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)}; + final Command[] cmds = new Command[]{new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)}; _clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds)); } // notifies MS peers to schedule a host scan task immediately, triggered during addHost operation public void notifyNodesInClusterToScheduleHostScanTask() { logger.debug("Notifying other MS nodes to run host scan task"); - final Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()}; + final Command[] cmds = new Command[]{new ScheduleHostScanTaskCommand()}; _clusterMgr.broadcast(0, _gson.toJson(cmds)); } @@ -416,7 +426,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } try { logD(bytes, "Routing to peer"); - Link.write(ch, new ByteBuffer[] {ByteBuffer.wrap(bytes)}, sslEngine); + Link.write(ch, new ByteBuffer[]{ByteBuffer.wrap(bytes)}, sslEngine); return true; } catch (final IOException e) { try { @@ -625,7 +635,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } final Request req = Request.parse(data); final Command[] cmds = req.getCommands(); - final CancelCommand cancel = (CancelCommand)cmds[0]; + final CancelCommand cancel = (CancelCommand) cmds[0]; logD(data, "Cancel request received"); agent.cancel(cancel.getSequence()); final Long current = 
agent._currentSequence; @@ -652,7 +662,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return; } else { if (agent instanceof Routable) { - final Routable cluster = (Routable)agent; + final Routable cluster = (Routable) agent; cluster.routeToAgent(data); } else { agent.send(Request.parse(data)); @@ -669,7 +679,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (mgmtId != -1 && mgmtId != _nodeId) { routeToPeer(Long.toString(mgmtId), data); if (Request.requiresSequentialExecution(data)) { - final AgentAttache attache = (AgentAttache)link.attachment(); + final AgentAttache attache = (AgentAttache) link.attachment(); if (attache != null) { attache.sendNext(Request.getSequence(data)); } @@ -933,7 +943,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (_agentToTransferIds.size() > 0) { logger.debug("Found {} agents to transfer", _agentToTransferIds.size()); // for (Long hostId : _agentToTransferIds) { - for (final Iterator iterator = _agentToTransferIds.iterator(); iterator.hasNext();) { + for (final Iterator iterator = _agentToTransferIds.iterator(); iterator.hasNext(); ) { final Long hostId = iterator.next(); final AgentAttache attache = findAttache(hostId); @@ -1074,7 +1084,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return; } - final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache; + final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) attache; if (success) { @@ -1125,10 +1135,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } synchronized (_agents) { - final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId); + final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache) _agents.get(hostId); if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) { 
handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true); - final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(host); + final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) createAttache(host); if (forwardAttache == null) { logger.warn("Unable to create a forward attache for the host {} as a part of rebalance process", host); return false; @@ -1232,7 +1242,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { // intercepted - final ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; + final ChangeAgentCommand cmd = (ChangeAgentCommand) cmds[0]; logger.debug("Intercepting command for agent change: agent {} event: {}", cmd.getAgentId(), cmd.getEvent()); boolean result = false; @@ -1249,7 +1259,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust answers[0] = new ChangeAgentAnswer(cmd, result); return _gson.toJson(answers); } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { - final TransferAgentCommand cmd = (TransferAgentCommand)cmds[0]; + final TransferAgentCommand cmd = (TransferAgentCommand) cmds[0]; logger.debug("Intercepting command for agent rebalancing: agent {} event: {}", cmd.getAgentId(), cmd.getEvent()); boolean result = false; @@ -1268,7 +1278,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust answers[0] = new Answer(cmd, result, null); return _gson.toJson(answers); } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) { - final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0]; + final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0]; logger.debug("Intercepting command to propagate event {} for host {} ({})", () -> cmd.getEvent().name(), cmd::getHostId, () -> _hostDao.findById(cmd.getHostId())); @@ -1285,10 
+1295,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust answers[0] = new Answer(cmd, result, null); return _gson.toJson(answers); } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) { - final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0]; + final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0]; return handleScheduleHostScanTaskCommand(cmd); } else if (cmds.length == 1 && cmds[0] instanceof BaseShutdownManagementServerHostCommand) { - final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand)cmds[0]; + final BaseShutdownManagementServerHostCommand cmd = (BaseShutdownManagementServerHostCommand) cmds[0]; return handleShutdownManagementServerHostCommand(cmd); } @@ -1323,7 +1333,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust try { shutdownManager.prepareForShutdown(); return "Successfully prepared for shutdown"; - } catch(CloudRuntimeException e) { + } catch (CloudRuntimeException e) { return e.getMessage(); } } @@ -1332,7 +1342,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust try { shutdownManager.triggerShutdown(); return "Successfully triggered shutdown"; - } catch(CloudRuntimeException e) { + } catch (CloudRuntimeException e) { return e.getMessage(); } } @@ -1341,7 +1351,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust try { shutdownManager.cancelShutdown(); return "Successfully prepared for shutdown"; - } catch(CloudRuntimeException e) { + } catch (CloudRuntimeException e) { return e.getMessage(); } } diff --git a/engine/orchestration/src/test/java/com/cloud/agent/manager/ClusteredAgentManagerImplTest.java b/engine/orchestration/src/test/java/com/cloud/agent/manager/ClusteredAgentManagerImplTest.java new file mode 100644 index 00000000000..5e4678f6222 --- /dev/null +++ 
b/engine/orchestration/src/test/java/com/cloud/agent/manager/ClusteredAgentManagerImplTest.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.agent.manager;
+
+import com.cloud.configuration.ManagementServiceConfiguration;
+import com.cloud.ha.HighAvailabilityManagerImpl;
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.resource.ResourceManagerImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ClusteredAgentManagerImpl#scanDirectAgentToLoad()}.
+ *
+ * <p>Every test runs the real scan on a Mockito spy so that the branch taken for each
+ * host (no attache, forward attache, direct attache) is actually executed.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ClusteredAgentManagerImplTest {
+
+    // 0 is also the id a Mockito-mocked AgentAttache reports by default, so host and attache ids agree.
+    private static final long HOST_ID = 0L;
+    private static final long PING_TIMEOUT = 16000L;
+
+    @Mock
+    private HostDao hostDao;
+    @Mock
+    private ManagementServiceConfiguration mgmtServiceConf;
+    @Mock
+    private HostVO host;
+    @Mock
+    private AgentAttache agentAttache;
+
+    private ClusteredAgentManagerImpl agentManager;
+
+    @Before
+    public void setUp() {
+        // A spy (not a plain mock) is required: on a mock the scan itself would be a no-op.
+        agentManager = Mockito.spy(new ClusteredAgentManagerImpl());
+        agentManager._hostDao = hostDao;
+        agentManager.mgmtServiceConf = mgmtServiceConf;
+        when(mgmtServiceConf.getTimeout()).thenReturn(PING_TIMEOUT);
+    }
+
+    /** An empty scan result must neither look up an attache nor load any host. */
+    @Test
+    public void scanDirectAgentToLoadNoHostsTest() {
+        List<HostVO> noHosts = new ArrayList<>();
+        when(hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(noHosts);
+
+        agentManager.scanDirectAgentToLoad();
+
+        verify(agentManager, never()).findAttache(anyLong());
+        verify(agentManager, never()).loadDirectlyConnectedHost(any(), anyBoolean());
+    }
+
+    /** A host without any attache is loaded directly. */
+    @Test
+    public void scanDirectAgentToLoadHostWithoutAttacheTest() {
+        agentManager._resourceMgr = mock(ResourceManagerImpl.class);
+        givenOneHostToLoad();
+        doReturn(Boolean.TRUE).when(agentManager).loadDirectlyConnectedHost(host, false);
+
+        agentManager.scanDirectAgentToLoad();
+
+        verify(agentManager).loadDirectlyConnectedHost(host, false);
+    }
+
+    /** A forward attache is removed with status Disconnected. */
+    @Test
+    public void scanDirectAgentToLoadHostWithForwardAttacheTest() {
+        givenOneHostToLoad();
+        givenAttache(true);
+
+        agentManager.scanDirectAgentToLoad();
+
+        verify(agentManager).removeAgent(agentAttache, Status.Disconnected);
+    }
+
+    /** A direct attache whose investigation reports Up gets a Ping transition instead of a reload. */
+    @Test
+    public void scanDirectAgentToLoadHostWithNonForwardAttacheTest() {
+        agentManager._haMgr = mock(HighAvailabilityManagerImpl.class);
+        givenOneHostToLoad();
+        givenAttache(false);
+        doReturn(Boolean.TRUE).when(agentManager).agentStatusTransitTo(host, Status.Event.Ping, agentManager._nodeId);
+        doReturn(Status.Up).when(agentManager).investigate(agentAttache);
+
+        agentManager.scanDirectAgentToLoad();
+
+        verify(agentManager).investigate(agentAttache);
+        verify(agentManager).agentStatusTransitTo(host, Status.Event.Ping, agentManager._nodeId);
+        verify(agentManager, never()).loadDirectlyConnectedHost(any(), anyBoolean());
+    }
+
+    /** A direct attache whose investigation does not report Up causes the host to be loaded again. */
+    @Test
+    public void scanDirectAgentToLoadHostWithNonForwardAttacheAndDisconnectedTest() {
+        agentManager._haMgr = mock(HighAvailabilityManagerImpl.class);
+        agentManager._resourceMgr = mock(ResourceManagerImpl.class);
+        givenOneHostToLoad();
+        givenAttache(false);
+        doReturn(Boolean.TRUE).when(agentManager).loadDirectlyConnectedHost(host, false);
+
+        agentManager.scanDirectAgentToLoad();
+
+        verify(agentManager).investigate(agentAttache);
+        verify(agentManager).loadDirectlyConnectedHost(host, false);
+    }
+
+    /** Makes the host DAO hand exactly one host (id {@link #HOST_ID}) to the scan. */
+    private void givenOneHostToLoad() {
+        when(host.getId()).thenReturn(HOST_ID);
+        List<HostVO> hosts = new ArrayList<>();
+        hosts.add(host);
+        when(hostDao.findAndUpdateDirectAgentToLoad(anyLong(), anyLong(), anyLong())).thenReturn(hosts);
+    }
+
+    /** Makes the spy return the mocked attache for the host; doReturn avoids invoking the real findAttache. */
+    private void givenAttache(boolean forForward) {
+        when(agentAttache.forForward()).thenReturn(forForward);
+        doReturn(agentAttache).when(agentManager).findAttache(HOST_ID);
+    }
+}