From 1d4b22d6de7f28de89e592a218343bacf7c0b2b0 Mon Sep 17 00:00:00 2001 From: Alex Huang Date: Mon, 29 Jul 2013 17:28:45 -0700 Subject: [PATCH] Removed AgentMonitor and moved it inside AgentManager --- .../cloud/agent/manager/AgentManagerImpl.java | 257 ++++++++++++---- .../com/cloud/agent/manager/AgentMonitor.java | 284 ------------------ .../agent/manager/AgentMonitorService.java | 2 - .../manager/ClusteredAgentManagerImpl.java | 4 +- .../src/com/cloud/configuration/Config.java | 4 - 5 files changed, 202 insertions(+), 349 deletions(-) delete mode 100755 server/src/com/cloud/agent/manager/AgentMonitor.java diff --git a/server/src/com/cloud/agent/manager/AgentManagerImpl.java b/server/src/com/cloud/agent/manager/AgentManagerImpl.java index 37784318f84..5be7bf929c7 100755 --- a/server/src/com/cloud/agent/manager/AgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/AgentManagerImpl.java @@ -24,7 +24,6 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -41,6 +40,9 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; +import org.apache.cloudstack.config.ConfigDepot; +import org.apache.cloudstack.config.ConfigKey; +import org.apache.cloudstack.config.ConfigValue; import org.apache.cloudstack.context.ServerContexts; import org.apache.cloudstack.utils.identity.ManagementServerNode; @@ -100,6 +102,9 @@ import com.cloud.utils.component.ManagerBase; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; import com.cloud.utils.db.EntityManager; +import com.cloud.utils.db.SearchCriteria.Op; +import com.cloud.utils.db.SearchCriteria2; +import com.cloud.utils.db.SearchCriteriaService; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.HypervisorVersionChangedException; @@ -109,28 +114,15 @@ import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.Link; import com.cloud.utils.nio.NioServer; import com.cloud.utils.nio.Task; +import com.cloud.utils.time.InaccurateClock; /** * Implementation of the Agent Manager. This class controls the connection to the agents. - * - * @config {@table || Param Name | Description | Values | Default || - * || port | port to listen on for agent connection. | Integer | 8250 || - * || workers | # of worker threads | Integer | 5 || || router.ram.size | default ram for router vm in mb | Integer | 128 || - * || router.ip.address | ip address for the router | ip | 10.1.1.1 || - * || wait | Time to wait for control commands to return | seconds | 1800 || - * || domain | domain for domain routers| String | foo.com || - * || alert.wait | time to wait before alerting on a disconnected agent | seconds | 1800 || - * || update.wait | time to wait before alerting on a updating agent | seconds | 600 || - * || ping.interval | ping interval in seconds | seconds | 60 || - * || instance.name | Name of the deployment String | required || - * || start.retry | Number of times to retry start | Number | 2 || - * || ping.timeout | multiplier to ping.interval before announcing an agent has timed out | float | 2.0x || - * || router.stats.interval | interval to report router statistics | seconds | 300s || } **/ @Local(value = { AgentManager.class }) public class AgentManagerImpl extends ManagerBase implements AgentManager, HandlerFactory { - private static final Logger s_logger = Logger.getLogger(AgentManagerImpl.class); - private static final Logger status_logger = Logger.getLogger(Status.class); + protected static final Logger s_logger = Logger.getLogger(AgentManagerImpl.class); + protected static final Logger status_logger = Logger.getLogger(Status.class); protected ConcurrentHashMap _agents = new ConcurrentHashMap(10007); protected List> _hostMonitors = new ArrayList>(17); @@ -140,6 +132,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected int _monitorId = 0; private final Lock _agentStatusLock = new ReentrantLock(); + @Inject protected EntityManager _entityMgr; @@ -166,24 +159,40 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl protected int _retry = 2; - protected int _wait; - protected int _alertWait; + protected ConfigValue _wait; + protected ConfigValue _alertWait; protected long _nodeId = -1; - protected Random _rand = new Random(System.currentTimeMillis()); - - protected int _pingInterval; - protected long _pingTimeout; - @Inject protected AgentMonitorService _monitor; + protected ConfigValue _pingInterval; + protected ConfigValue _pingTimeout; protected ExecutorService _executor; protected ThreadPoolExecutor _connectExecutor; protected ScheduledExecutorService _directAgentExecutor; + protected ScheduledExecutorService _monitorExecutor; protected StateMachine2 _statusStateMachine = Status.getStateMachine(); + private final Map _pingMap = new ConcurrentHashMap(10007); @Inject ResourceManager _resourceMgr; + @Inject + protected ConfigDepot _configDepot; + + protected final ConfigKey Workers = new ConfigKey(Integer.class, "workers", "Advance", AgentManager.class, "5", + "Number of worker threads handling remote agent connections.", false, "5-Max Thread Limit"); + protected final ConfigKey Port = new ConfigKey(Integer.class, "port", "Advance", AgentManager.class, "8250", "Port to listen on for remote agent connections.", false, "Usable port range"); + protected final ConfigKey PingInterval = new ConfigKey(Integer.class, "ping.interval", "Advance", AgentManager.class, "60", "Interval to send application level pings to make sure the connection is still working", false, "Seconds"); + protected final ConfigKey PingTimeout = new ConfigKey(Float.class, "ping.timeout", "Advance", AgentManager.class, "2.5", "Multiplier to ping.interval before announcing an agent has timed out", true, null); + protected final ConfigKey Wait = new ConfigKey(Integer.class, "wait", "Advance", AgentManager.class, "1800", + "Time in seconds to wait for control commands to return", true, "Seconds"); + protected final ConfigKey AlertWait = new ConfigKey(Integer.class, "alert.wait", "Advance", AgentManager.class, "1800", + "Seconds to wait before alerting on a disconnected agent", true, "Seconds"); + protected final ConfigKey DirectAgentLoadSize = new ConfigKey(Integer.class, "direct.agent.load.size", "Advance", AgentManager.class, "16", + "The number of direct agents to load each time", false, null); + protected final ConfigKey DirectAgentPoolSize = new ConfigKey(Integer.class, "direct.agent.pool.size", "Advance", AgentManager.class, "500", + "Default size for DirectAgentPool", false, null); + @Override public boolean configure(final String name, final Map params) throws ConfigurationException { @@ -191,51 +200,47 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl _port = NumbersUtil.parseInt(configs.get("port"), 8250); final int workers = NumbersUtil.parseInt(configs.get("workers"), 5); - String value = configs.get(Config.PingInterval.toString()); - _pingInterval = NumbersUtil.parseInt(value, 60); + _pingInterval = _configDepot.get(PingInterval); - value = configs.get(Config.Wait.toString()); - _wait = NumbersUtil.parseInt(value, 1800); - - value = configs.get(Config.AlertWait.toString()); - _alertWait = NumbersUtil.parseInt(value, 1800); - - value = configs.get(Config.PingTimeout.toString()); - final float multiplier = value != null ? Float.parseFloat(value) : 2.5f; - _pingTimeout = (long) (multiplier * _pingInterval); + _wait = _configDepot.get(Wait); + _alertWait = _configDepot.get(AlertWait); + _pingTimeout = _configDepot.get(PingTimeout); s_logger.info("Ping Timeout is " + _pingTimeout); - value = configs.get(Config.DirectAgentLoadSize.key()); - int threads = NumbersUtil.parseInt(value, 16); + ConfigValue threads = _configDepot.get(DirectAgentLoadSize); _nodeId = ManagementServerNode.getManagementServerId(); s_logger.info("Configuring AgentManagerImpl. management server node id(msid): " + _nodeId); - long lastPing = (System.currentTimeMillis() >> 10) - _pingTimeout; + long lastPing = (System.currentTimeMillis() >> 10) - (long)(_pingTimeout.value() * _pingInterval.value()); _hostDao.markHostsAsDisconnected(_nodeId, lastPing); - // _monitor = ComponentLocator.inject(AgentMonitor.class, _nodeId, _hostDao, _vmDao, _dcDao, _podDao, this, _alertMgr, _pingTimeout); - registerForHostEvents(_monitor, true, true, false); + registerForHostEvents(new BehindOnPingListener(), true, true, false); - _executor = new ThreadPoolExecutor(threads, threads, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory("AgentTaskPool")); + _executor = new ThreadPoolExecutor(threads.value(), threads.value(), 60l, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory("AgentTaskPool")); - _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS, - new LinkedBlockingQueue(), new NamedThreadFactory("AgentConnectTaskPool")); + _connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamedThreadFactory("AgentConnectTaskPool")); //allow core threads to time out even when there are no items in the queue _connectExecutor.allowCoreThreadTimeOut(true); _connection = new NioServer("AgentManager", _port, workers + 10, this); s_logger.info("Listening on " + _port + " with " + workers + " workers"); - value = configs.get(Config.DirectAgentPoolSize.key()); - int size = NumbersUtil.parseInt(value, 500); - _directAgentExecutor = new ScheduledThreadPoolExecutor(size, new NamedThreadFactory("DirectAgent")); - s_logger.debug("Created DirectAgentAttache pool with size: " + size); + + ConfigValue size = _configDepot.get(DirectAgentPoolSize); + _directAgentExecutor = new ScheduledThreadPoolExecutor(size.value(), new NamedThreadFactory("DirectAgent")); + s_logger.debug("Created DirectAgentAttache pool with size: " + size.value()); + + _monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor")); return true; } + protected long getTimeout() { + return (long)(_pingTimeout.value() * _pingInterval.value()); + } + @Override public Task create(Task.Type type, Link link, byte[] data) { return new AgentHandler(type, link, data); @@ -354,7 +359,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl } protected int getPingInterval() { - return _pingInterval; + return _pingInterval.value(); } @Override @@ -389,7 +394,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl } if (timeout <= 0) { - timeout = _wait; + timeout = _wait.value(); } assert noDbTxn() : "I know, I know. Why are we so strict as to not allow txn across an agent call? ... Why are we so cruel ... Why are we such a dictator .... Too bad... Sorry...but NO AGENT COMMANDS WRAPPED WITHIN DB TRANSACTIONS!"; @@ -568,13 +573,13 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl @Override public boolean start() { startDirectlyConnectedHosts(); - if (_monitor != null) { - _monitor.startMonitoring(_pingTimeout); - } + if (_connection != null) { _connection.start(); } + _monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), _pingInterval.value(), _pingInterval.value(), TimeUnit.SECONDS); + return true; } @@ -712,9 +717,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl @Override public boolean stop() { - if (_monitor != null) { - _monitor.signalStop(); - } + if (_connection != null) { _connection.stop(); } @@ -736,6 +739,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl } _connectExecutor.shutdownNow(); + _monitorExecutor.shutdownNow(); return true; } @@ -824,7 +828,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl } else if (determinedState == Status.Disconnected) { s_logger.warn("Agent is disconnected but the host is still up: " + host.getId() + "-" + host.getName()); if (currentStatus == Status.Disconnected) { - if (((System.currentTimeMillis() >> 10) - host.getLastPinged()) > _alertWait) { + if (((System.currentTimeMillis() >> 10) - host.getLastPinged()) > _alertWait.value()) { s_logger.warn("Host " + host.getId() + " has been disconnected pass the time it should be disconnected."); event = Status.Event.WaitedTooLong; } else { @@ -1404,7 +1408,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl attache = createAttacheForDirectConnect(host, resource); StartupAnswer[] answers = new StartupAnswer[cmds.length]; for (int i = 0; i < answers.length; i++) { - answers[i] = new StartupAnswer(cmds[i], attache.getId(), _pingInterval); + answers[i] = new StartupAnswer(cmds[i], attache.getId(), _pingInterval.value()); } attache.process(answers); attache = notifyMonitorsOfConnection(attache, cmds, forRebalance); @@ -1434,4 +1438,143 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl return _directAgentExecutor; } + public Long getAgentPingTime(long agentId) { + return _pingMap.get(agentId); + } + + public void pingBy(long agentId) { + _pingMap.put(agentId, InaccurateClock.getTimeInSeconds()); + } + + protected class MonitorTask implements Runnable { + @Override + public void run() { + s_logger.trace("Agent Monitor is started."); + + try { + List behindAgents = findAgentsBehindOnPing(); + for (Long agentId : behindAgents) { + SearchCriteriaService sc = SearchCriteria2.create(HostVO.class); + sc.addAnd(sc.getEntity().getId(), Op.EQ, agentId); + HostVO h = sc.find(); + if (h != null) { + ResourceState resourceState = h.getResourceState(); + if (resourceState == ResourceState.Disabled || resourceState == ResourceState.Maintenance + || resourceState == ResourceState.ErrorInMaintenance) { + /* + * Host is in non-operation state, so no + * investigation and direct put agent to + * Disconnected + */ + status_logger.debug("Ping timeout but host " + agentId + " is in resource state of " + + resourceState + ", so no investigation"); + disconnectWithoutInvestigation(agentId, Event.ShutdownRequested); + } else { + status_logger.debug("Ping timeout for host " + agentId + ", do invstigation"); + disconnectWithInvestigation(agentId, Event.PingTimeout); + } + } + } + + SearchCriteriaService sc = SearchCriteria2.create(HostVO.class); + sc.addAnd(sc.getEntity().getResourceState(), Op.IN, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance); + List hosts = sc.list(); + + for (HostVO host : hosts) { + long hostId = host.getId(); + DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); + HostPodVO podVO = _podDao.findById(host.getPodId()); + String hostDesc = "name: " + host.getName() + " (id:" + hostId + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName(); + + if (host.getType() != Host.Type.Storage) { +// List vos = _vmDao.listByHostId(hostId); +// List vosMigrating = _vmDao.listVmsMigratingFromHost(hostId); +// if (vos.isEmpty() && vosMigrating.isEmpty()) { +// _alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Migration Complete for host " + hostDesc, "Host [" +// + hostDesc +// + "] is ready for maintenance"); +// _resourceMgr.resourceStateTransitTo(host, ResourceState.Event.InternalEnterMaintenance, _msId); +// } + } + } + } catch (Throwable th) { + s_logger.error("Caught the following exception: ", th); + } + + s_logger.trace("Agent Monitor is leaving the building!"); + } + + protected List findAgentsBehindOnPing() { + List agentsBehind = new ArrayList(); + long cutoffTime = InaccurateClock.getTimeInSeconds() - getTimeout(); + for (Map.Entry entry : _pingMap.entrySet()) { + if (entry.getValue() < cutoffTime) { + agentsBehind.add(entry.getKey()); + } + } + + if (agentsBehind.size() > 0) { + s_logger.info("Found the following agents behind on ping: " + agentsBehind); + } + + return agentsBehind; + } + } + + protected class BehindOnPingListener implements Listener { + @Override + public boolean isRecurring() { + return true; + } + + @Override + public boolean processAnswers(long agentId, long seq, Answer[] answers) { + return false; + } + + @Override + public boolean processCommands(long agentId, long seq, Command[] commands) { + boolean processed = false; + for (Command cmd : commands) { + if (cmd instanceof PingCommand) { + pingBy(agentId); + } + } + return processed; + } + + @Override + public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) { + return null; + } + + @Override + public void processConnect(Host host, StartupCommand cmd, boolean forRebalance) { + if (host.getType().equals(Host.Type.TrafficMonitor) || + host.getType().equals(Host.Type.SecondaryStorage)) { + return; + } + + // NOTE: We don't use pingBy here because we're initiating. + _pingMap.put(host.getId(), InaccurateClock.getTimeInSeconds()); + } + + @Override + public boolean processDisconnect(long agentId, Status state) { + _pingMap.remove(agentId); + return true; + } + + @Override + public boolean processTimeout(long agentId, long seq) { + return true; + } + + @Override + public int getTimeout() { + return -1; + } + + } + } diff --git a/server/src/com/cloud/agent/manager/AgentMonitor.java b/server/src/com/cloud/agent/manager/AgentMonitor.java deleted file mode 100755 index be850318967..00000000000 --- a/server/src/com/cloud/agent/manager/AgentMonitor.java +++ /dev/null @@ -1,284 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.agent.manager; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.inject.Inject; - -import org.apache.log4j.Logger; -import org.springframework.stereotype.Component; - -import com.cloud.agent.api.AgentControlAnswer; -import com.cloud.agent.api.AgentControlCommand; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.Command; -import com.cloud.agent.api.PingCommand; -import com.cloud.agent.api.StartupCommand; -import com.cloud.alert.AlertManager; -import com.cloud.dc.DataCenterVO; -import com.cloud.dc.HostPodVO; -import com.cloud.dc.dao.ClusterDao; -import com.cloud.dc.dao.DataCenterDao; -import com.cloud.dc.dao.HostPodDao; -import com.cloud.host.Host; -import com.cloud.host.HostVO; -import com.cloud.host.Status; -import com.cloud.host.Status.Event; -import com.cloud.host.dao.HostDao; -import com.cloud.resource.ResourceManager; -import com.cloud.resource.ResourceState; -import com.cloud.utils.db.DB; -import com.cloud.utils.db.SearchCriteria.Op; -import com.cloud.utils.db.SearchCriteria2; -import com.cloud.utils.db.SearchCriteriaService; -import com.cloud.utils.time.InaccurateClock; -import com.cloud.vm.VMInstanceVO; -import com.cloud.vm.dao.VMInstanceDao; - -@Component -public class AgentMonitor extends Thread implements AgentMonitorService { - private static Logger s_logger = Logger.getLogger(AgentMonitor.class); - private static Logger status_Logger = Logger.getLogger(Status.class); - private long _pingTimeout = 120; // Default set to 120 seconds - @Inject private HostDao _hostDao; - private boolean _stop; - @Inject - private AgentManagerImpl _agentMgr; - @Inject private VMInstanceDao _vmDao; - @Inject private final DataCenterDao _dcDao = null; - @Inject private final HostPodDao _podDao = null; - @Inject private AlertManager _alertMgr; - private long _msId; - @Inject ClusterDao _clusterDao; - @Inject ResourceManager _resourceMgr; - - // private ConnectionConcierge _concierge; - private final Map _pingMap; - - public AgentMonitor() { - _pingMap = new ConcurrentHashMap(10007); - } - - /** - * Check if the agent is behind on ping - * - * @param agentId - * agent or host id. - * @return null if the agent is not kept here. true if behind; false if not. - */ - @Override - public Boolean isAgentBehindOnPing(long agentId) { - Long pingTime = _pingMap.get(agentId); - if (pingTime == null) { - return null; - } - return pingTime < (InaccurateClock.getTimeInSeconds() - _pingTimeout); - } - - @Override - public Long getAgentPingTime(long agentId) { - return _pingMap.get(agentId); - } - - @Override - public void pingBy(long agentId) { - _pingMap.put(agentId, InaccurateClock.getTimeInSeconds()); - } - - // TODO : use host machine time is not safe in clustering environment - @Override - public void run() { - s_logger.info("Agent Monitor is started."); - - while (!_stop) { - try { - // check every 60 seconds - Thread.sleep(60 * 1000); - } catch (InterruptedException e) { - s_logger.info("Who woke me from my slumber?"); - } - - try { - List behindAgents = findAgentsBehindOnPing(); - for (Long agentId : behindAgents) { - SearchCriteriaService sc = SearchCriteria2.create(HostVO.class); - sc.addAnd(sc.getEntity().getId(), Op.EQ, agentId); - HostVO h = sc.find(); - if (h != null) { - ResourceState resourceState = h.getResourceState(); - if (resourceState == ResourceState.Disabled || resourceState == ResourceState.Maintenance - || resourceState == ResourceState.ErrorInMaintenance) { - /* - * Host is in non-operation state, so no - * investigation and direct put agent to - * Disconnected - */ - status_Logger.debug("Ping timeout but host " + agentId + " is in resource state of " - + resourceState + ", so no investigation"); - _agentMgr.disconnectWithoutInvestigation(agentId, Event.ShutdownRequested); - } else { - status_Logger.debug("Ping timeout for host " + agentId + ", do invstigation"); - _agentMgr.disconnectWithInvestigation(agentId, Event.PingTimeout); - } - } - } - - SearchCriteriaService sc = SearchCriteria2.create(HostVO.class); - sc.addAnd(sc.getEntity().getResourceState(), Op.IN, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance); - List hosts = sc.list(); - - for (HostVO host : hosts) { - long hostId = host.getId(); - DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId()); - HostPodVO podVO = _podDao.findById(host.getPodId()); - String hostDesc = "name: " + host.getName() + " (id:" + hostId + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName(); - - if (host.getType() != Host.Type.Storage) { - List vos = _vmDao.listByHostId(hostId); - List vosMigrating = _vmDao.listVmsMigratingFromHost(hostId); - if (vos.isEmpty() && vosMigrating.isEmpty()) { - _alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Migration Complete for host " + hostDesc, "Host [" + hostDesc + "] is ready for maintenance"); - _resourceMgr.resourceStateTransitTo(host, ResourceState.Event.InternalEnterMaintenance, _msId); - } - } - } - } catch (Throwable th) { - s_logger.error("Caught the following exception: ", th); - } - } - - s_logger.info("Agent Monitor is leaving the building!"); - } - - @Override - public void signalStop() { - _stop = true; - interrupt(); - } - - @Override - public boolean isRecurring() { - return true; - } - - @Override - public boolean processAnswers(long agentId, long seq, Answer[] answers) { - return false; - } - - @Override @DB - public boolean processCommands(long agentId, long seq, Command[] commands) { - boolean processed = false; - for (Command cmd : commands) { - if (cmd instanceof PingCommand) { - pingBy(agentId); - } - } - return processed; - } - - protected List findAgentsBehindOnPing() { - List agentsBehind = new ArrayList(); - long cutoffTime = InaccurateClock.getTimeInSeconds() - _pingTimeout; - for (Map.Entry entry : _pingMap.entrySet()) { - if (entry.getValue() < cutoffTime) { - agentsBehind.add(entry.getKey()); - } - } - - if (agentsBehind.size() > 0) { - s_logger.info("Found the following agents behind on ping: " + agentsBehind); - } - - return agentsBehind; - } - - /** - * @deprecated We're using the in-memory - */ - @Deprecated - protected List findHostsBehindOnPing() { - long time = (System.currentTimeMillis() >> 10) - _pingTimeout; - List hosts = _hostDao.findLostHosts(time); - if (s_logger.isInfoEnabled()) { - s_logger.info("Found " + hosts.size() + " hosts behind on ping. pingTimeout : " + _pingTimeout + - ", mark time : " + time); - } - - for (HostVO host : hosts) { - if (host.getType().equals(Host.Type.ExternalFirewall) || - host.getType().equals(Host.Type.ExternalLoadBalancer) || - host.getType().equals(Host.Type.TrafficMonitor) || - host.getType().equals(Host.Type.SecondaryStorage)) { - continue; - } - - if (host.getManagementServerId() == null || host.getManagementServerId() == _msId) { - if (s_logger.isInfoEnabled()) { - s_logger.info("Asking agent mgr to investgate why host " + host.getId() + - " is behind on ping. last ping time: " + host.getLastPinged()); - } - _agentMgr.disconnectWithInvestigation(host.getId(), Event.PingTimeout); - } - } - - return hosts; - } - - @Override - public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) { - return null; - } - - @Override - public void processConnect(Host host, StartupCommand cmd, boolean forRebalance) { - if (host.getType().equals(Host.Type.TrafficMonitor) || - host.getType().equals(Host.Type.SecondaryStorage)) { - return; - } - - // NOTE: We don't use pingBy here because we're initiating. - _pingMap.put(host.getId(), InaccurateClock.getTimeInSeconds()); - } - - @Override - public boolean processDisconnect(long agentId, Status state) { - _pingMap.remove(agentId); - return true; - } - - @Override - public boolean processTimeout(long agentId, long seq) { - return true; - } - - @Override - public int getTimeout() { - return -1; - } - - @Override - public void startMonitoring(long pingTimeout) { - _pingTimeout = pingTimeout; - start(); - } -} - diff --git a/server/src/com/cloud/agent/manager/AgentMonitorService.java b/server/src/com/cloud/agent/manager/AgentMonitorService.java index 5759e5f6334..4dd2c1ef28e 100644 --- a/server/src/com/cloud/agent/manager/AgentMonitorService.java +++ b/server/src/com/cloud/agent/manager/AgentMonitorService.java @@ -20,9 +20,7 @@ import com.cloud.agent.Listener; public interface AgentMonitorService extends Listener { - public Boolean isAgentBehindOnPing(long agentId); public Long getAgentPingTime(long agentId); public void pingBy(long agentId); public void signalStop(); - public void startMonitoring(long pingTimeout); } diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index a9a6b4a3152..91b0343cef8 100755 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -208,7 +208,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } // for agents that are self-managed, threshold to be considered as disconnected after pingtimeout - long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingTimeout); + long cutSeconds = (System.currentTimeMillis() >> 10) - getTimeout(); List hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize.value().longValue(), _nodeId); List appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId); hosts.addAll(appliances); @@ -720,7 +720,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public void onManagementNodeLeft(List nodeList, long selfNodeId) { for (ManagementServerHostVO vo : nodeList) { s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid()); - long lastPing = (System.currentTimeMillis() >> 10) - _pingTimeout; + long lastPing = (System.currentTimeMillis() >> 10) - getTimeout(); _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing); s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid()); cleanupTransferMap(vo.getMsid()); diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java index ff484be0c1a..1107cf1353d 100755 --- a/server/src/com/cloud/configuration/Config.java +++ b/server/src/com/cloud/configuration/Config.java @@ -165,10 +165,6 @@ public enum Config { IntegrationAPIPort("Advanced", ManagementServer.class, Integer.class, "integration.api.port", null, "Defaul API port", null), InvestigateRetryInterval("Advanced", HighAvailabilityManager.class, Integer.class, "investigate.retry.interval", "60", "Time (in seconds) between VM pings when agent is disconnected", null), MigrateRetryInterval("Advanced", HighAvailabilityManager.class, Integer.class, "migrate.retry.interval", "120", "Time (in seconds) between migration retries", null), - PingInterval("Advanced", AgentManager.class, Integer.class, "ping.interval", "60", "Ping interval in seconds", null), - PingTimeout("Advanced", AgentManager.class, Float.class, "ping.timeout", "2.5", "Multiplier to ping.interval before announcing an agent has timed out", null), - ClusterDeltaSyncInterval("Advanced", AgentManager.class, Integer.class, "sync.interval", "60", "Cluster Delta sync interval in seconds", null), - Port("Advanced", AgentManager.class, Integer.class, "port", "8250", "Port to listen on for agent connection.", null), RouterCpuMHz("Advanced", NetworkManager.class, Integer.class, "router.cpu.mhz", String.valueOf(VpcVirtualNetworkApplianceManager.DEFAULT_ROUTER_CPU_MHZ), "Default CPU speed (MHz) for router VM.", null), RestartRetryInterval("Advanced", HighAvailabilityManager.class, Integer.class, "restart.retry.interval", "600", "Time (in seconds) between retries to restart a vm", null), RouterStatsInterval("Advanced", NetworkManager.class, Integer.class, "router.stats.interval", "300", "Interval (in seconds) to report router statistics.", null),