mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
Removed AgentMonitor and moved it inside AgentManager
This commit is contained in:
parent
d0a3a69170
commit
1d4b22d6de
@ -24,7 +24,6 @@ import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
@ -41,6 +40,9 @@ import javax.naming.ConfigurationException;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import org.apache.cloudstack.config.ConfigDepot;
|
||||
import org.apache.cloudstack.config.ConfigKey;
|
||||
import org.apache.cloudstack.config.ConfigValue;
|
||||
import org.apache.cloudstack.context.ServerContexts;
|
||||
import org.apache.cloudstack.utils.identity.ManagementServerNode;
|
||||
|
||||
@ -100,6 +102,9 @@ import com.cloud.utils.component.ManagerBase;
|
||||
import com.cloud.utils.concurrency.NamedThreadFactory;
|
||||
import com.cloud.utils.db.DB;
|
||||
import com.cloud.utils.db.EntityManager;
|
||||
import com.cloud.utils.db.SearchCriteria.Op;
|
||||
import com.cloud.utils.db.SearchCriteria2;
|
||||
import com.cloud.utils.db.SearchCriteriaService;
|
||||
import com.cloud.utils.db.Transaction;
|
||||
import com.cloud.utils.exception.CloudRuntimeException;
|
||||
import com.cloud.utils.exception.HypervisorVersionChangedException;
|
||||
@ -109,28 +114,15 @@ import com.cloud.utils.nio.HandlerFactory;
|
||||
import com.cloud.utils.nio.Link;
|
||||
import com.cloud.utils.nio.NioServer;
|
||||
import com.cloud.utils.nio.Task;
|
||||
import com.cloud.utils.time.InaccurateClock;
|
||||
|
||||
/**
|
||||
* Implementation of the Agent Manager. This class controls the connection to the agents.
|
||||
*
|
||||
* @config {@table || Param Name | Description | Values | Default ||
|
||||
* || port | port to listen on for agent connection. | Integer | 8250 ||
|
||||
* || workers | # of worker threads | Integer | 5 || || router.ram.size | default ram for router vm in mb | Integer | 128 ||
|
||||
* || router.ip.address | ip address for the router | ip | 10.1.1.1 ||
|
||||
* || wait | Time to wait for control commands to return | seconds | 1800 ||
|
||||
* || domain | domain for domain routers| String | foo.com ||
|
||||
* || alert.wait | time to wait before alerting on a disconnected agent | seconds | 1800 ||
|
||||
* || update.wait | time to wait before alerting on a updating agent | seconds | 600 ||
|
||||
* || ping.interval | ping interval in seconds | seconds | 60 ||
|
||||
* || instance.name | Name of the deployment | String | required ||
|
||||
* || start.retry | Number of times to retry start | Number | 2 ||
|
||||
* || ping.timeout | multiplier to ping.interval before announcing an agent has timed out | float | 2.5x ||
|
||||
* || router.stats.interval | interval to report router statistics | seconds | 300s || }
|
||||
**/
|
||||
@Local(value = { AgentManager.class })
|
||||
public class AgentManagerImpl extends ManagerBase implements AgentManager, HandlerFactory {
|
||||
private static final Logger s_logger = Logger.getLogger(AgentManagerImpl.class);
|
||||
private static final Logger status_logger = Logger.getLogger(Status.class);
|
||||
protected static final Logger s_logger = Logger.getLogger(AgentManagerImpl.class);
|
||||
protected static final Logger status_logger = Logger.getLogger(Status.class);
|
||||
|
||||
protected ConcurrentHashMap<Long, AgentAttache> _agents = new ConcurrentHashMap<Long, AgentAttache>(10007);
|
||||
protected List<Pair<Integer, Listener>> _hostMonitors = new ArrayList<Pair<Integer, Listener>>(17);
|
||||
@ -140,6 +132,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
protected int _monitorId = 0;
|
||||
private final Lock _agentStatusLock = new ReentrantLock();
|
||||
|
||||
|
||||
@Inject
|
||||
protected EntityManager _entityMgr;
|
||||
|
||||
@ -166,24 +159,40 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
|
||||
protected int _retry = 2;
|
||||
|
||||
protected int _wait;
|
||||
protected int _alertWait;
|
||||
protected ConfigValue<Integer> _wait;
|
||||
protected ConfigValue<Integer> _alertWait;
|
||||
protected long _nodeId = -1;
|
||||
|
||||
protected Random _rand = new Random(System.currentTimeMillis());
|
||||
|
||||
protected int _pingInterval;
|
||||
protected long _pingTimeout;
|
||||
@Inject protected AgentMonitorService _monitor;
|
||||
protected ConfigValue<Integer> _pingInterval;
|
||||
protected ConfigValue<Float> _pingTimeout;
|
||||
|
||||
protected ExecutorService _executor;
|
||||
protected ThreadPoolExecutor _connectExecutor;
|
||||
protected ScheduledExecutorService _directAgentExecutor;
|
||||
protected ScheduledExecutorService _monitorExecutor;
|
||||
|
||||
protected StateMachine2<Status, Status.Event, Host> _statusStateMachine = Status.getStateMachine();
|
||||
private final Map<Long, Long> _pingMap = new ConcurrentHashMap<Long, Long>(10007);
|
||||
|
||||
@Inject ResourceManager _resourceMgr;
|
||||
|
||||
@Inject
|
||||
protected ConfigDepot _configDepot;
|
||||
|
||||
protected final ConfigKey<Integer> Workers = new ConfigKey<Integer>(Integer.class, "workers", "Advance", AgentManager.class, "5",
|
||||
"Number of worker threads handling remote agent connections.", false, "5-Max Thread Limit");
|
||||
protected final ConfigKey<Integer> Port = new ConfigKey<Integer>(Integer.class, "port", "Advance", AgentManager.class, "8250", "Port to listen on for remote agent connections.", false, "Usable port range");
|
||||
protected final ConfigKey<Integer> PingInterval = new ConfigKey<Integer>(Integer.class, "ping.interval", "Advance", AgentManager.class, "60", "Interval to send application level pings to make sure the connection is still working", false, "Seconds");
|
||||
protected final ConfigKey<Float> PingTimeout = new ConfigKey<Float>(Float.class, "ping.timeout", "Advance", AgentManager.class, "2.5", "Multiplier to ping.interval before announcing an agent has timed out", true, null);
|
||||
protected final ConfigKey<Integer> Wait = new ConfigKey<Integer>(Integer.class, "wait", "Advance", AgentManager.class, "1800",
|
||||
"Time in seconds to wait for control commands to return", true, "Seconds");
|
||||
protected final ConfigKey<Integer> AlertWait = new ConfigKey<Integer>(Integer.class, "alert.wait", "Advance", AgentManager.class, "1800",
|
||||
"Seconds to wait before alerting on a disconnected agent", true, "Seconds");
|
||||
protected final ConfigKey<Integer> DirectAgentLoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advance", AgentManager.class, "16",
|
||||
"The number of direct agents to load each time", false, null);
|
||||
protected final ConfigKey<Integer> DirectAgentPoolSize = new ConfigKey<Integer>(Integer.class, "direct.agent.pool.size", "Advance", AgentManager.class, "500",
|
||||
"Default size for DirectAgentPool", false, null);
|
||||
|
||||
@Override
|
||||
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
|
||||
|
||||
@ -191,51 +200,47 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
_port = NumbersUtil.parseInt(configs.get("port"), 8250);
|
||||
final int workers = NumbersUtil.parseInt(configs.get("workers"), 5);
|
||||
|
||||
String value = configs.get(Config.PingInterval.toString());
|
||||
_pingInterval = NumbersUtil.parseInt(value, 60);
|
||||
_pingInterval = _configDepot.get(PingInterval);
|
||||
|
||||
value = configs.get(Config.Wait.toString());
|
||||
_wait = NumbersUtil.parseInt(value, 1800);
|
||||
|
||||
value = configs.get(Config.AlertWait.toString());
|
||||
_alertWait = NumbersUtil.parseInt(value, 1800);
|
||||
|
||||
value = configs.get(Config.PingTimeout.toString());
|
||||
final float multiplier = value != null ? Float.parseFloat(value) : 2.5f;
|
||||
_pingTimeout = (long) (multiplier * _pingInterval);
|
||||
_wait = _configDepot.get(Wait);
|
||||
_alertWait = _configDepot.get(AlertWait);
|
||||
_pingTimeout = _configDepot.get(PingTimeout);
|
||||
|
||||
s_logger.info("Ping Timeout is " + _pingTimeout);
|
||||
|
||||
value = configs.get(Config.DirectAgentLoadSize.key());
|
||||
int threads = NumbersUtil.parseInt(value, 16);
|
||||
ConfigValue<Integer> threads = _configDepot.get(DirectAgentLoadSize);
|
||||
|
||||
_nodeId = ManagementServerNode.getManagementServerId();
|
||||
s_logger.info("Configuring AgentManagerImpl. management server node id(msid): " + _nodeId);
|
||||
|
||||
long lastPing = (System.currentTimeMillis() >> 10) - _pingTimeout;
|
||||
long lastPing = (System.currentTimeMillis() >> 10) - (long)(_pingTimeout.value() * _pingInterval.value());
|
||||
_hostDao.markHostsAsDisconnected(_nodeId, lastPing);
|
||||
|
||||
// _monitor = ComponentLocator.inject(AgentMonitor.class, _nodeId, _hostDao, _vmDao, _dcDao, _podDao, this, _alertMgr, _pingTimeout);
|
||||
registerForHostEvents(_monitor, true, true, false);
|
||||
registerForHostEvents(new BehindOnPingListener(), true, true, false);
|
||||
|
||||
_executor = new ThreadPoolExecutor(threads, threads, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentTaskPool"));
|
||||
_executor = new ThreadPoolExecutor(threads.value(), threads.value(), 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentTaskPool"));
|
||||
|
||||
_connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS,
|
||||
new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentConnectTaskPool"));
|
||||
_connectExecutor = new ThreadPoolExecutor(100, 500, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("AgentConnectTaskPool"));
|
||||
//allow core threads to time out even when there are no items in the queue
|
||||
_connectExecutor.allowCoreThreadTimeOut(true);
|
||||
|
||||
_connection = new NioServer("AgentManager", _port, workers + 10, this);
|
||||
s_logger.info("Listening on " + _port + " with " + workers + " workers");
|
||||
|
||||
value = configs.get(Config.DirectAgentPoolSize.key());
|
||||
int size = NumbersUtil.parseInt(value, 500);
|
||||
_directAgentExecutor = new ScheduledThreadPoolExecutor(size, new NamedThreadFactory("DirectAgent"));
|
||||
s_logger.debug("Created DirectAgentAttache pool with size: " + size);
|
||||
|
||||
ConfigValue<Integer> size = _configDepot.get(DirectAgentPoolSize);
|
||||
_directAgentExecutor = new ScheduledThreadPoolExecutor(size.value(), new NamedThreadFactory("DirectAgent"));
|
||||
s_logger.debug("Created DirectAgentAttache pool with size: " + size.value());
|
||||
|
||||
_monitorExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AgentMonitor"));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * Computes the effective agent ping timeout.
 *
 * The result is the ping.timeout multiplier times the ping.interval value
 * (declared in seconds), truncated to a long. Both values are read from
 * their ConfigValue holders on every call rather than cached.
 *
 * @return ping.timeout * ping.interval, truncated to a long
 */
protected long getTimeout() {
|
||||
return (long)(_pingTimeout.value() * _pingInterval.value());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Task create(Task.Type type, Link link, byte[] data) {
|
||||
return new AgentHandler(type, link, data);
|
||||
@ -354,7 +359,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
}
|
||||
|
||||
protected int getPingInterval() {
|
||||
return _pingInterval;
|
||||
return _pingInterval.value();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -389,7 +394,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
}
|
||||
|
||||
if (timeout <= 0) {
|
||||
timeout = _wait;
|
||||
timeout = _wait.value();
|
||||
}
|
||||
assert noDbTxn() : "I know, I know. Why are we so strict as to not allow txn across an agent call? ... Why are we so cruel ... Why are we such a dictator .... Too bad... Sorry...but NO AGENT COMMANDS WRAPPED WITHIN DB TRANSACTIONS!";
|
||||
|
||||
@ -568,13 +573,13 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
@Override
|
||||
public boolean start() {
|
||||
startDirectlyConnectedHosts();
|
||||
if (_monitor != null) {
|
||||
_monitor.startMonitoring(_pingTimeout);
|
||||
}
|
||||
|
||||
if (_connection != null) {
|
||||
_connection.start();
|
||||
}
|
||||
|
||||
_monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), _pingInterval.value(), _pingInterval.value(), TimeUnit.SECONDS);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -712,9 +717,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
|
||||
@Override
|
||||
public boolean stop() {
|
||||
if (_monitor != null) {
|
||||
_monitor.signalStop();
|
||||
}
|
||||
|
||||
if (_connection != null) {
|
||||
_connection.stop();
|
||||
}
|
||||
@ -736,6 +739,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
}
|
||||
|
||||
_connectExecutor.shutdownNow();
|
||||
_monitorExecutor.shutdownNow();
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -824,7 +828,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
} else if (determinedState == Status.Disconnected) {
|
||||
s_logger.warn("Agent is disconnected but the host is still up: " + host.getId() + "-" + host.getName());
|
||||
if (currentStatus == Status.Disconnected) {
|
||||
if (((System.currentTimeMillis() >> 10) - host.getLastPinged()) > _alertWait) {
|
||||
if (((System.currentTimeMillis() >> 10) - host.getLastPinged()) > _alertWait.value()) {
|
||||
s_logger.warn("Host " + host.getId() + " has been disconnected pass the time it should be disconnected.");
|
||||
event = Status.Event.WaitedTooLong;
|
||||
} else {
|
||||
@ -1404,7 +1408,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
attache = createAttacheForDirectConnect(host, resource);
|
||||
StartupAnswer[] answers = new StartupAnswer[cmds.length];
|
||||
for (int i = 0; i < answers.length; i++) {
|
||||
answers[i] = new StartupAnswer(cmds[i], attache.getId(), _pingInterval);
|
||||
answers[i] = new StartupAnswer(cmds[i], attache.getId(), _pingInterval.value());
|
||||
}
|
||||
attache.process(answers);
|
||||
attache = notifyMonitorsOfConnection(attache, cmds, forRebalance);
|
||||
@ -1434,4 +1438,143 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
|
||||
return _directAgentExecutor;
|
||||
}
|
||||
|
||||
/**
 * Looks up the last ping time recorded for an agent in the in-memory ping map.
 *
 * @param agentId agent or host id.
 * @return the value stored in _pingMap for this id (written from
 *         InaccurateClock.getTimeInSeconds()), or null if the map has no
 *         entry for the agent.
 */
public Long getAgentPingTime(long agentId) {
|
||||
return _pingMap.get(agentId);
|
||||
}
|
||||
|
||||
/**
 * Records that the given agent has just pinged by storing the current
 * InaccurateClock time (in seconds) in the in-memory ping map, replacing
 * any previous entry for that agent.
 *
 * @param agentId agent or host id.
 */
public void pingBy(long agentId) {
|
||||
_pingMap.put(agentId, InaccurateClock.getTimeInSeconds());
|
||||
}
|
||||
|
||||
/**
 * Periodic task that looks for agents whose last recorded ping is older than
 * the ping timeout and asks the agent manager to disconnect them.
 *
 * start() schedules one instance on _monitorExecutor with a fixed delay of
 * ping.interval seconds.
 */
protected class MonitorTask implements Runnable {
|
||||
/**
 * Runs one monitoring pass.
 *
 * For every agent returned by findAgentsBehindOnPing() the host row is
 * looked up by id. If it exists and its resource state is Disabled,
 * Maintenance or ErrorInMaintenance, the agent is disconnected without
 * investigation (Event.ShutdownRequested); otherwise it is disconnected
 * with investigation (Event.PingTimeout). Agents with no host row are
 * ignored.
 *
 * Any Throwable is caught and logged here; an exception escaping a task
 * scheduled with scheduleWithFixedDelay would suppress all later runs.
 */
@Override
|
||||
public void run() {
|
||||
s_logger.trace("Agent Monitor is started.");
|
||||
|
||||
try {
|
||||
List<Long> behindAgents = findAgentsBehindOnPing();
|
||||
for (Long agentId : behindAgents) {
|
||||
SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
|
||||
sc.addAnd(sc.getEntity().getId(), Op.EQ, agentId);
|
||||
HostVO h = sc.find();
|
||||
if (h != null) {
|
||||
ResourceState resourceState = h.getResourceState();
|
||||
if (resourceState == ResourceState.Disabled || resourceState == ResourceState.Maintenance
|
||||
|| resourceState == ResourceState.ErrorInMaintenance) {
|
||||
/*
|
||||
* Host is in a non-operational state, so skip the
|
||||
* investigation and put the agent directly into
|
||||
* Disconnected
|
||||
*/
|
||||
status_logger.debug("Ping timeout but host " + agentId + " is in resource state of "
|
||||
+ resourceState + ", so no investigation");
|
||||
disconnectWithoutInvestigation(agentId, Event.ShutdownRequested);
|
||||
} else {
|
||||
status_logger.debug("Ping timeout for host " + agentId + ", do invstigation");
|
||||
disconnectWithInvestigation(agentId, Event.PingTimeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Second pass: hosts that are preparing for maintenance or failed to enter it.
// NOTE(review): the body that sent the alert and moved the host into maintenance
// is commented out below, so this loop currently only builds hostDesc and has no
// other visible effect.
SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
|
||||
sc.addAnd(sc.getEntity().getResourceState(), Op.IN, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
|
||||
List<HostVO> hosts = sc.list();
|
||||
|
||||
for (HostVO host : hosts) {
|
||||
long hostId = host.getId();
|
||||
// NOTE(review): dcVO and podVO are dereferenced below without a null check;
// confirm findById cannot return null for these ids.
DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId());
|
||||
HostPodVO podVO = _podDao.findById(host.getPodId());
|
||||
String hostDesc = "name: " + host.getName() + " (id:" + hostId + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName();
|
||||
|
||||
if (host.getType() != Host.Type.Storage) {
|
||||
// List<VMInstanceVO> vos = _vmDao.listByHostId(hostId);
|
||||
// List<VMInstanceVO> vosMigrating = _vmDao.listVmsMigratingFromHost(hostId);
|
||||
// if (vos.isEmpty() && vosMigrating.isEmpty()) {
|
||||
// _alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Migration Complete for host " + hostDesc, "Host ["
|
||||
// + hostDesc
|
||||
// + "] is ready for maintenance");
|
||||
// _resourceMgr.resourceStateTransitTo(host, ResourceState.Event.InternalEnterMaintenance, _msId);
|
||||
// }
|
||||
}
|
||||
}
|
||||
} catch (Throwable th) {
|
||||
s_logger.error("Caught the following exception: ", th);
|
||||
}
|
||||
|
||||
s_logger.trace("Agent Monitor is leaving the building!");
|
||||
}
|
||||
|
||||
/**
 * Collects the ids of all agents in the in-memory ping map whose last
 * recorded ping time is strictly older than (now - getTimeout()), where
 * "now" is InaccurateClock.getTimeInSeconds().
 *
 * @return the ids of the agents behind on ping; empty if there are none.
 */
protected List<Long> findAgentsBehindOnPing() {
|
||||
List<Long> agentsBehind = new ArrayList<Long>();
|
||||
// getTimeout() here is the enclosing manager's method (ping.timeout * ping.interval).
long cutoffTime = InaccurateClock.getTimeInSeconds() - getTimeout();
|
||||
for (Map.Entry<Long, Long> entry : _pingMap.entrySet()) {
|
||||
if (entry.getValue() < cutoffTime) {
|
||||
agentsBehind.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
if (agentsBehind.size() > 0) {
|
||||
s_logger.info("Found the following agents behind on ping: " + agentsBehind);
|
||||
}
|
||||
|
||||
return agentsBehind;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Host-event listener that keeps the in-memory ping map up to date.
 *
 * It records a timestamp when a host connects or sends a PingCommand and
 * removes the entry when the agent disconnects. configure() registers one
 * instance through registerForHostEvents.
 */
protected class BehindOnPingListener implements Listener {
|
||||
@Override
|
||||
public boolean isRecurring() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Answers are not inspected by this listener; always returns false. */
@Override
|
||||
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
 * Refreshes the agent's ping time once for every PingCommand in the batch.
 *
 * @return always false; the local flag is never set to true, even when a
 *         ping was recorded.
 */
@Override
|
||||
public boolean processCommands(long agentId, long seq, Command[] commands) {
|
||||
boolean processed = false;
|
||||
for (Command cmd : commands) {
|
||||
if (cmd instanceof PingCommand) {
|
||||
pingBy(agentId);
|
||||
}
|
||||
}
|
||||
return processed;
|
||||
}
|
||||
|
||||
/** Control commands are not handled by this listener; always returns null. */
@Override
|
||||
public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
 * Seeds the ping map with the current time for a newly connected host so
 * that it is tracked by the monitor. Hosts of type TrafficMonitor and
 * SecondaryStorage are not tracked.
 */
@Override
|
||||
public void processConnect(Host host, StartupCommand cmd, boolean forRebalance) {
|
||||
if (host.getType().equals(Host.Type.TrafficMonitor) ||
|
||||
host.getType().equals(Host.Type.SecondaryStorage)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// NOTE: We don't use pingBy here because we're initiating.
|
||||
_pingMap.put(host.getId(), InaccurateClock.getTimeInSeconds());
|
||||
}
|
||||
|
||||
/** Stops tracking the agent by removing its ping map entry; always returns true. */
@Override
|
||||
public boolean processDisconnect(long agentId, Status state) {
|
||||
_pingMap.remove(agentId);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processTimeout(long agentId, long seq) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * Listener timeout; always -1. This is distinct from the enclosing
 * manager's getTimeout(), which returns the ping timeout.
 */
@Override
|
||||
public int getTimeout() {
|
||||
return -1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1,284 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package com.cloud.agent.manager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import com.cloud.agent.api.AgentControlAnswer;
|
||||
import com.cloud.agent.api.AgentControlCommand;
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.Command;
|
||||
import com.cloud.agent.api.PingCommand;
|
||||
import com.cloud.agent.api.StartupCommand;
|
||||
import com.cloud.alert.AlertManager;
|
||||
import com.cloud.dc.DataCenterVO;
|
||||
import com.cloud.dc.HostPodVO;
|
||||
import com.cloud.dc.dao.ClusterDao;
|
||||
import com.cloud.dc.dao.DataCenterDao;
|
||||
import com.cloud.dc.dao.HostPodDao;
|
||||
import com.cloud.host.Host;
|
||||
import com.cloud.host.HostVO;
|
||||
import com.cloud.host.Status;
|
||||
import com.cloud.host.Status.Event;
|
||||
import com.cloud.host.dao.HostDao;
|
||||
import com.cloud.resource.ResourceManager;
|
||||
import com.cloud.resource.ResourceState;
|
||||
import com.cloud.utils.db.DB;
|
||||
import com.cloud.utils.db.SearchCriteria.Op;
|
||||
import com.cloud.utils.db.SearchCriteria2;
|
||||
import com.cloud.utils.db.SearchCriteriaService;
|
||||
import com.cloud.utils.time.InaccurateClock;
|
||||
import com.cloud.vm.VMInstanceVO;
|
||||
import com.cloud.vm.dao.VMInstanceDao;
|
||||
|
||||
@Component
|
||||
public class AgentMonitor extends Thread implements AgentMonitorService {
|
||||
private static Logger s_logger = Logger.getLogger(AgentMonitor.class);
|
||||
private static Logger status_Logger = Logger.getLogger(Status.class);
|
||||
private long _pingTimeout = 120; // Default set to 120 seconds
|
||||
@Inject private HostDao _hostDao;
|
||||
private boolean _stop;
|
||||
@Inject
|
||||
private AgentManagerImpl _agentMgr;
|
||||
@Inject private VMInstanceDao _vmDao;
|
||||
@Inject private final DataCenterDao _dcDao = null;
|
||||
@Inject private final HostPodDao _podDao = null;
|
||||
@Inject private AlertManager _alertMgr;
|
||||
private long _msId;
|
||||
@Inject ClusterDao _clusterDao;
|
||||
@Inject ResourceManager _resourceMgr;
|
||||
|
||||
// private ConnectionConcierge _concierge;
|
||||
private final Map<Long, Long> _pingMap;
|
||||
|
||||
public AgentMonitor() {
|
||||
_pingMap = new ConcurrentHashMap<Long, Long>(10007);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the agent is behind on ping
|
||||
*
|
||||
* @param agentId
|
||||
* agent or host id.
|
||||
* @return null if the agent is not kept here. true if behind; false if not.
|
||||
*/
|
||||
@Override
|
||||
public Boolean isAgentBehindOnPing(long agentId) {
|
||||
Long pingTime = _pingMap.get(agentId);
|
||||
if (pingTime == null) {
|
||||
return null;
|
||||
}
|
||||
return pingTime < (InaccurateClock.getTimeInSeconds() - _pingTimeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getAgentPingTime(long agentId) {
|
||||
return _pingMap.get(agentId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pingBy(long agentId) {
|
||||
_pingMap.put(agentId, InaccurateClock.getTimeInSeconds());
|
||||
}
|
||||
|
||||
// TODO: using the host machine's time is not safe in a clustered environment
|
||||
@Override
|
||||
public void run() {
|
||||
s_logger.info("Agent Monitor is started.");
|
||||
|
||||
while (!_stop) {
|
||||
try {
|
||||
// check every 60 seconds
|
||||
Thread.sleep(60 * 1000);
|
||||
} catch (InterruptedException e) {
|
||||
s_logger.info("Who woke me from my slumber?");
|
||||
}
|
||||
|
||||
try {
|
||||
List<Long> behindAgents = findAgentsBehindOnPing();
|
||||
for (Long agentId : behindAgents) {
|
||||
SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
|
||||
sc.addAnd(sc.getEntity().getId(), Op.EQ, agentId);
|
||||
HostVO h = sc.find();
|
||||
if (h != null) {
|
||||
ResourceState resourceState = h.getResourceState();
|
||||
if (resourceState == ResourceState.Disabled || resourceState == ResourceState.Maintenance
|
||||
|| resourceState == ResourceState.ErrorInMaintenance) {
|
||||
/*
|
||||
* Host is in a non-operational state, so skip the
|
||||
* investigation and put the agent directly into
|
||||
* Disconnected
|
||||
*/
|
||||
status_Logger.debug("Ping timeout but host " + agentId + " is in resource state of "
|
||||
+ resourceState + ", so no investigation");
|
||||
_agentMgr.disconnectWithoutInvestigation(agentId, Event.ShutdownRequested);
|
||||
} else {
|
||||
status_Logger.debug("Ping timeout for host " + agentId + ", do invstigation");
|
||||
_agentMgr.disconnectWithInvestigation(agentId, Event.PingTimeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
|
||||
sc.addAnd(sc.getEntity().getResourceState(), Op.IN, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
|
||||
List<HostVO> hosts = sc.list();
|
||||
|
||||
for (HostVO host : hosts) {
|
||||
long hostId = host.getId();
|
||||
DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId());
|
||||
HostPodVO podVO = _podDao.findById(host.getPodId());
|
||||
String hostDesc = "name: " + host.getName() + " (id:" + hostId + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName();
|
||||
|
||||
if (host.getType() != Host.Type.Storage) {
|
||||
List<VMInstanceVO> vos = _vmDao.listByHostId(hostId);
|
||||
List<VMInstanceVO> vosMigrating = _vmDao.listVmsMigratingFromHost(hostId);
|
||||
if (vos.isEmpty() && vosMigrating.isEmpty()) {
|
||||
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Migration Complete for host " + hostDesc, "Host [" + hostDesc + "] is ready for maintenance");
|
||||
_resourceMgr.resourceStateTransitTo(host, ResourceState.Event.InternalEnterMaintenance, _msId);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable th) {
|
||||
s_logger.error("Caught the following exception: ", th);
|
||||
}
|
||||
}
|
||||
|
||||
s_logger.info("Agent Monitor is leaving the building!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void signalStop() {
|
||||
_stop = true;
|
||||
interrupt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRecurring() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override @DB
|
||||
public boolean processCommands(long agentId, long seq, Command[] commands) {
|
||||
boolean processed = false;
|
||||
for (Command cmd : commands) {
|
||||
if (cmd instanceof PingCommand) {
|
||||
pingBy(agentId);
|
||||
}
|
||||
}
|
||||
return processed;
|
||||
}
|
||||
|
||||
protected List<Long> findAgentsBehindOnPing() {
|
||||
List<Long> agentsBehind = new ArrayList<Long>();
|
||||
long cutoffTime = InaccurateClock.getTimeInSeconds() - _pingTimeout;
|
||||
for (Map.Entry<Long, Long> entry : _pingMap.entrySet()) {
|
||||
if (entry.getValue() < cutoffTime) {
|
||||
agentsBehind.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
if (agentsBehind.size() > 0) {
|
||||
s_logger.info("Found the following agents behind on ping: " + agentsBehind);
|
||||
}
|
||||
|
||||
return agentsBehind;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated We're using the in-memory ping map instead; see {@link #findAgentsBehindOnPing()}.
|
||||
*/
|
||||
@Deprecated
|
||||
protected List<HostVO> findHostsBehindOnPing() {
|
||||
long time = (System.currentTimeMillis() >> 10) - _pingTimeout;
|
||||
List<HostVO> hosts = _hostDao.findLostHosts(time);
|
||||
if (s_logger.isInfoEnabled()) {
|
||||
s_logger.info("Found " + hosts.size() + " hosts behind on ping. pingTimeout : " + _pingTimeout +
|
||||
", mark time : " + time);
|
||||
}
|
||||
|
||||
for (HostVO host : hosts) {
|
||||
if (host.getType().equals(Host.Type.ExternalFirewall) ||
|
||||
host.getType().equals(Host.Type.ExternalLoadBalancer) ||
|
||||
host.getType().equals(Host.Type.TrafficMonitor) ||
|
||||
host.getType().equals(Host.Type.SecondaryStorage)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (host.getManagementServerId() == null || host.getManagementServerId() == _msId) {
|
||||
if (s_logger.isInfoEnabled()) {
|
||||
s_logger.info("Asking agent mgr to investgate why host " + host.getId() +
|
||||
" is behind on ping. last ping time: " + host.getLastPinged());
|
||||
}
|
||||
_agentMgr.disconnectWithInvestigation(host.getId(), Event.PingTimeout);
|
||||
}
|
||||
}
|
||||
|
||||
return hosts;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processConnect(Host host, StartupCommand cmd, boolean forRebalance) {
|
||||
if (host.getType().equals(Host.Type.TrafficMonitor) ||
|
||||
host.getType().equals(Host.Type.SecondaryStorage)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// NOTE: We don't use pingBy here because we're initiating.
|
||||
_pingMap.put(host.getId(), InaccurateClock.getTimeInSeconds());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processDisconnect(long agentId, Status state) {
|
||||
_pingMap.remove(agentId);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processTimeout(long agentId, long seq) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTimeout() {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startMonitoring(long pingTimeout) {
|
||||
_pingTimeout = pingTimeout;
|
||||
start();
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,9 +20,7 @@ import com.cloud.agent.Listener;
|
||||
|
||||
public interface AgentMonitorService extends Listener {
|
||||
|
||||
public Boolean isAgentBehindOnPing(long agentId);
|
||||
public Long getAgentPingTime(long agentId);
|
||||
public void pingBy(long agentId);
|
||||
public void signalStop();
|
||||
public void startMonitoring(long pingTimeout);
|
||||
}
|
||||
|
||||
@ -208,7 +208,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
||||
}
|
||||
|
||||
// For self-managed agents: the cutoff time (now minus the ping timeout) before which an agent is considered disconnected
|
||||
long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingTimeout);
|
||||
long cutSeconds = (System.currentTimeMillis() >> 10) - getTimeout();
|
||||
List<HostVO> hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize.value().longValue(), _nodeId);
|
||||
List<HostVO> appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId);
|
||||
hosts.addAll(appliances);
|
||||
@ -720,7 +720,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
|
||||
public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
|
||||
for (ManagementServerHostVO vo : nodeList) {
|
||||
s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid());
|
||||
long lastPing = (System.currentTimeMillis() >> 10) - _pingTimeout;
|
||||
long lastPing = (System.currentTimeMillis() >> 10) - getTimeout();
|
||||
_hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing);
|
||||
s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid());
|
||||
cleanupTransferMap(vo.getMsid());
|
||||
|
||||
@ -165,10 +165,6 @@ public enum Config {
|
||||
IntegrationAPIPort("Advanced", ManagementServer.class, Integer.class, "integration.api.port", null, "Defaul API port", null),
|
||||
InvestigateRetryInterval("Advanced", HighAvailabilityManager.class, Integer.class, "investigate.retry.interval", "60", "Time (in seconds) between VM pings when agent is disconnected", null),
|
||||
MigrateRetryInterval("Advanced", HighAvailabilityManager.class, Integer.class, "migrate.retry.interval", "120", "Time (in seconds) between migration retries", null),
|
||||
PingInterval("Advanced", AgentManager.class, Integer.class, "ping.interval", "60", "Ping interval in seconds", null),
|
||||
PingTimeout("Advanced", AgentManager.class, Float.class, "ping.timeout", "2.5", "Multiplier to ping.interval before announcing an agent has timed out", null),
|
||||
ClusterDeltaSyncInterval("Advanced", AgentManager.class, Integer.class, "sync.interval", "60", "Cluster Delta sync interval in seconds", null),
|
||||
Port("Advanced", AgentManager.class, Integer.class, "port", "8250", "Port to listen on for agent connection.", null),
|
||||
RouterCpuMHz("Advanced", NetworkManager.class, Integer.class, "router.cpu.mhz", String.valueOf(VpcVirtualNetworkApplianceManager.DEFAULT_ROUTER_CPU_MHZ), "Default CPU speed (MHz) for router VM.", null),
|
||||
RestartRetryInterval("Advanced", HighAvailabilityManager.class, Integer.class, "restart.retry.interval", "600", "Time (in seconds) between retries to restart a vm", null),
|
||||
RouterStatsInterval("Advanced", NetworkManager.class, Integer.class, "router.stats.interval", "300", "Interval (in seconds) to report router statistics.", null),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user