Removed unused agent lb timer

This commit is contained in:
Alena Prokharchyk 2013-11-21 14:11:14 -08:00
parent 7df7abf327
commit e8ec75a2b5

View File

@ -94,7 +94,7 @@ import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.Task;
@Local(value = {AgentManager.class, ClusteredAgentRebalanceService.class})
@Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class })
public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService {
final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class);
private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Cluster-AgentRebalancingExecutor"));
@ -108,7 +108,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
protected HashMap<String, SocketChannel> _peers;
protected HashMap<String, SSLEngine> _sslEngines;
private final Timer _timer = new Timer("ClusteredAgentManager Timer");
private final Timer _agentLbTimer = new Timer("ClusteredAgentManager AgentRebalancing Timer");
boolean _agentLbHappened = false;
@Inject
@ -129,13 +128,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
// Configuration key "agent.lb.enabled" (category "Advanced", declared default "false"):
// switch for agent load balancing between management server nodes.
// NOTE(review): the trailing boolean is presumably the "dynamic" flag of ConfigKey - confirm against ConfigKey.
protected final ConfigKey<Boolean> EnableLB = new ConfigKey<Boolean>(Boolean.class, "agent.lb.enabled", "Advanced", "false",
"Enable agent load balancing between management server nodes", true);
"Enable agent load balancing between management server nodes", true);
// Configuration key "agent.load.threshold" (category "Advanced", declared default "0.7"):
// the computed agent load is compared against this value before a rebalancing task is scheduled.
protected final ConfigKey<Double> ConnectedAgentThreshold = new ConfigKey<Double>(Double.class, "agent.load.threshold", "Advanced", "0.7",
"What percentage of the agents can be held by one management server before load balancing happens", true);
"What percentage of the agents can be held by one management server before load balancing happens", true);
// Configuration key "direct.agent.load.size" (category "Advanced", declared default "16"):
// number of agents to connect to in each round.
protected final ConfigKey<Integer> LoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", "16",
"How many agents to connect to in each round", true);
"How many agents to connect to in each round", true);
// Configuration key "direct.agent.scan.interval" (category "Advanced", declared default "90", global scope):
// interval between scans to load agents.
// NOTE(review): the trailing 1000 looks like a unit multiplier (seconds -> milliseconds) - confirm against ConfigKey.
protected final ConfigKey<Integer> ScanInterval = new ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", "90",
"Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000);
"Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000);
@Override
public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
@ -277,9 +276,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
protected AgentAttache createAttacheForDirectConnect(Host host, ServerResource resource) {
// if (resource instanceof DummySecondaryStorageResource) {
// return new DummyAttache(this, host.getId(), false);
// }
// if (resource instanceof DummySecondaryStorageResource) {
// return new DummyAttache(this, host.getId(), false);
// }
s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId());
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getName(), _nodeId, resource, host.isInMaintenanceStates(), this);
AgentAttache old = null;
@ -329,23 +328,23 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
AgentAttache attache = findAttache(hostId);
if (attache != null) {
//don't process disconnect if the host is being rebalanced
// don't process disconnect if the host is being rebalanced
if (isAgentRebalanceEnabled()) {
HostTransferMapVO transferVO = _hostTransferDao.findById(hostId);
if (transferVO != null) {
if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) {
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id=" + hostId + " as the host is being connected to " +
_nodeId);
_nodeId);
return true;
}
}
}
//don't process disconnect if the disconnect came for the host via delayed cluster notification,
//but the host has already reconnected to the current management server
// don't process disconnect if the disconnect came for the host via delayed cluster notification,
// but the host has already reconnected to the current management server
if (!attache.forForward()) {
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id=" + hostId +
" as the host is directly connected to the current management server " + _nodeId);
" as the host is directly connected to the current management server " + _nodeId);
return true;
}
@ -376,7 +375,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
/**
 * Broadcasts an agent-disconnected notification for the given attache to the cluster.
 *
 * Wraps a single {@code ChangeAgentCommand} carrying the attache id and
 * {@code Event.AgentDisconnected} in a command array, serializes it to JSON with
 * {@code _gson} and hands it to {@code _clusterMgr.broadcast} keyed by the attache id.
 *
 * @param attache the agent attache whose id is sent in the notification
 */
public void notifyNodesInCluster(AgentAttache attache) {
s_logger.debug("Notifying other nodes of to disconnect");
Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)};
Command[] cmds = new Command[] { new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected) };
_clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds));
}
@ -385,23 +384,23 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notifying other MS nodes to run host scan task");
}
Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()};
Command[] cmds = new Command[] { new ScheduleHostScanTaskCommand() };
_clusterMgr.broadcast(0, _gson.toJson(cmds));
}
/**
 * Logs {@code msg} at TRACE level, prefixed with routing details read from the serialized packet.
 *
 * The prefix contains the agent id, the sequence number and the management server id
 * extracted from {@code bytes} through the static {@code Request} accessors, followed by
 * "Req: " or "Resp: " depending on {@code Request.isRequest(bytes)}.
 *
 * @param bytes serialized request/response data the prefix fields are read from
 * @param msg   text appended after the prefix
 */
protected static void logT(byte[] bytes, final String msg) {
s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
/**
 * Logs {@code msg} at DEBUG level, prefixed with routing details read from the serialized packet.
 *
 * The prefix contains the agent id, the sequence number and the management server id
 * extracted from {@code bytes} through the static {@code Request} accessors, followed by
 * "Req: " or "Resp: " depending on {@code Request.isRequest(bytes)}.
 *
 * @param bytes serialized request/response data the prefix fields are read from
 * @param msg   text appended after the prefix
 */
protected static void logD(byte[] bytes, final String msg) {
s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
/**
 * Logs {@code msg} at INFO level, prefixed with routing details read from the serialized packet.
 *
 * The prefix contains the agent id, the sequence number and the management server id
 * extracted from {@code bytes} through the static {@code Request} accessors, followed by
 * "Req: " or "Resp: " depending on {@code Request.isRequest(bytes)}.
 *
 * @param bytes serialized request/response data the prefix fields are read from
 * @param msg   text appended after the prefix
 */
protected static void logI(byte[] bytes, final String msg) {
s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
public boolean routeToPeer(String peer, byte[] bytes) {
@ -426,7 +425,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isDebugEnabled()) {
logD(bytes, "Routing to peer");
}
Link.write(ch, new ByteBuffer[] {ByteBuffer.wrap(bytes)}, sslEngine);
Link.write(ch, new ByteBuffer[] { ByteBuffer.wrap(bytes) }, sslEngine);
return true;
} catch (IOException e) {
try {
@ -568,9 +567,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
_timer.cancel();
_agentLbTimer.cancel();
//cancel all transfer tasks
// cancel all transfer tasks
s_transferExecutor.shutdownNow();
cleanupTransferMap(_nodeId);
@ -620,14 +618,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
Request req = Request.parse(data);
Command[] cmds = req.getCommands();
CancelCommand cancel = (CancelCommand)cmds[0];
CancelCommand cancel = (CancelCommand) cmds[0];
if (s_logger.isDebugEnabled()) {
logD(data, "Cancel request received");
}
agent.cancel(cancel.getSequence());
final Long current = agent._currentSequence;
//if the request is the current request, always have to trigger sending next request in sequence,
//otherwise the agent queue will be blocked
// if the request is the current request, always have to trigger sending next request in
// sequence,
// otherwise the agent queue will be blocked
if (req.executeInSequence() && (current != null && current == Request.getSequence(data))) {
agent.sendNext(Request.getSequence(data));
}
@ -648,7 +647,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return;
} else {
if (agent instanceof Routable) {
Routable cluster = (Routable)agent;
Routable cluster = (Routable) agent;
cluster.routeToAgent(data);
} else {
agent.send(Request.parse(data));
@ -665,7 +664,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (mgmtId != -1 && mgmtId != _nodeId) {
routeToPeer(Long.toString(mgmtId), data);
if (Request.requiresSequentialExecution(data)) {
AgentAttache attache = (AgentAttache)link.attachment();
AgentAttache attache = (AgentAttache) link.attachment();
if (attache != null) {
attache.sendNext(Request.getSequence(data));
} else if (s_logger.isDebugEnabled()) {
@ -727,7 +726,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException,
OperationTimedoutException {
OperationTimedoutException {
boolean result = false;
if (event == Event.RequestAgentRebalance) {
return setToWaitForRebalance(agentId, currentOwnerId, futureOwnerId);
@ -794,7 +793,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug("There are no hosts to rebalance in the system. Current number of active management server nodes in the system is " + allMS.size() +
"; number of managed agents is " + allManagedAgents.size());
"; number of managed agents is " + allManagedAgents.size());
}
return;
}
@ -849,7 +848,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isDebugEnabled()) {
s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode");
}
//just remove the mapping (if exists) as nothing was done on the peer management server yet
// just remove the mapping (if exists) as nothing was done on the peer management
// server yet
_hostTransferDao.remove(transfer.getId());
}
}
@ -934,7 +934,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
synchronized (_agentToTransferIds) {
if (_agentToTransferIds.size() > 0) {
s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
//for (Long hostId : _agentToTransferIds) {
// for (Long hostId : _agentToTransferIds) {
for (Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
Long hostId = iterator.next();
AgentAttache attache = findAttache(hostId);
@ -947,7 +947,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
// no need to do anything with the real attache as we haven't modified it yet
Date cutTime = DateUtil.currentGMTTime();
HostTransferMapVO transferMap =
_hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
_hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
if (transferMap == null) {
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
@ -966,7 +966,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms +
", skipping rebalance for the host");
", skipping rebalance for the host");
iterator.remove();
_hostTransferDao.completeAgentTransfer(hostId);
continue;
@ -983,7 +983,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
} else {
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() +
" and listener queue size is " + attache.getNonRecurringListenersSize());
" and listener queue size is " + attache.getNonRecurringListenersSize());
}
}
} else {
@ -1050,7 +1050,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (result) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
" as a part of rebalance process");
" as a part of rebalance process");
}
result = loadDirectlyConnectedHost(host, true);
} else {
@ -1059,16 +1059,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
} catch (Exception ex) {
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
" as a part of rebalance process due to:", ex);
" as a part of rebalance process due to:", ex);
result = false;
}
if (result) {
s_logger.debug("Successfully loaded directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
" as a part of rebalance process");
" as a part of rebalance process");
} else {
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
" as a part of rebalance process");
" as a part of rebalance process");
}
}
@ -1089,18 +1089,18 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return;
}
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) attache;
if (success) {
//1) Set transfer mode to false - so the agent can start processing requests normally
// 1) Set transfer mode to false - so the agent can start processing requests normally
forwardAttache.setTransferMode(false);
//2) Get all transfer requests and route them to peer
// 2) Get all transfer requests and route them to peer
Request requestToTransfer = forwardAttache.getRequestToTransfer();
while (requestToTransfer != null) {
s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " +
_nodeId + " to " + futureOwnerId);
_nodeId + " to " + futureOwnerId);
boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes());
if (!routeResult) {
logD(requestToTransfer.getBytes(), "Failed to route request to peer");
@ -1138,10 +1138,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
synchronized (_agents) {
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache) _agents.get(hostId);
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true);
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId);
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache) createAttache(hostId);
if (forwardAttache == null) {
s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process");
return false;
@ -1154,7 +1154,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing");
} else {
s_logger.warn("Attache for the agent " + hostId + " has request queue size= " + attache.getQueueSize() + " and listener queue size " +
attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
attache.getNonRecurringListenersSize() + ", can't start host rebalancing");
}
return false;
}
@ -1211,7 +1211,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
// Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
// happens at fixed intervals anyways. So handling any exceptions that may be thrown
s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName() +
", ignoring as regular host scan happens at fixed interval anyways", e);
", ignoring as regular host scan happens at fixed interval anyways", e);
return null;
}
@ -1249,8 +1249,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.error("Excection in gson decoding : ", e);
}
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { // intercepted
ChangeAgentCommand cmd = (ChangeAgentCommand) cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
@ -1271,7 +1271,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
answers[0] = new ChangeAgentAnswer(cmd, result);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
@ -1294,7 +1294,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0];
s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId());
@ -1311,7 +1311,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0];
String response = handleScheduleHostScanTaskCommand(cmd);
return response;
}
@ -1328,14 +1328,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + " in " +
(System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
(System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
}
return jsonReturn;
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + " in " +
(System.currentTimeMillis() - startTick) + " ms, return null result");
(System.currentTimeMillis() - startTick) + " ms, return null result");
}
}
} catch (AgentUnavailableException e) {
@ -1369,7 +1369,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isTraceEnabled()) {
s_logger.trace("Agent rebalance task check, management server id:" + _nodeId);
}
//initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
// initiate agent lb task will be scheduled and executed only once, and only when number of agents
// loaded exceeds _connectedAgentsThreshold
if (!_agentLbHappened) {
QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getManagementServerId(), Op.NNULL);
@ -1385,12 +1386,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
double load = managedHostsCount / allHostsCount;
if (load >= ConnectedAgentThreshold.value()) {
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " +
ConnectedAgentThreshold.value());
ConnectedAgentThreshold.value());
scheduleRebalanceAgents();
_agentLbHappened = true;
} else {
s_logger.debug("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " +
ConnectedAgentThreshold.value());
ConnectedAgentThreshold.value());
}
}
}