bug 10406: fixed concurrency problem happening during Agent Transfer Monitor task

status 10406: resolved fixed
This commit is contained in:
alena 2011-06-22 13:45:16 -07:00
parent 38ebac207b
commit b8e15a833b

View File

@ -16,7 +16,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -867,49 +866,55 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isTraceEnabled()) {
s_logger.trace("Clustered agent transfer scan check, management server id:" + _nodeId);
}
if (_agentToTransferIds.size() > 0) {
s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
for (Long hostId : _agentToTransferIds) {
AgentAttache attache = findAttache(hostId);
// if the thread:
// 1) timed out waiting for the host to reconnect
// 2) recipient management server is not active any more
// remove the host from re-balance list and delete from op_host_transfer DB
// no need to do anything with the real attache as we haven't modified it yet
Date cutTime = DateUtil.currentGMTTime();
if (_hostTransferDao.isNotActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut))) {
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
failStartRebalance(hostId);
return;
}
HostTransferMapVO transferMap = _hostTransferDao.findByIdAndCurrentOwnerId(hostId, _nodeId);
if (transferMap == null) {
s_logger.debug("Can't transfer host id=" + hostId + "; record for the host no longer exists in op_host_transfer table");
failStartRebalance(hostId);
return;
synchronized (_agentToTransferIds) {
if (_agentToTransferIds.size() > 0) {
s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
//for (Long hostId : _agentToTransferIds) {
for (Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
Long hostId = iterator.next();
AgentAttache attache = findAttache(hostId);
// if the thread:
// 1) timed out waiting for the host to reconnect
// 2) recipient management server is not active any more
// remove the host from re-balance list and delete from op_host_transfer DB
// no need to do anything with the real attache as we haven't modified it yet
Date cutTime = DateUtil.currentGMTTime();
if (_hostTransferDao.isNotActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut))) {
s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, skipping rebalance for the host");
iterator.remove();
_hostTransferDao.completeAgentTransfer(hostId);
continue;
}
HostTransferMapVO transferMap = _hostTransferDao.findByIdAndCurrentOwnerId(hostId, _nodeId);
if (transferMap == null) {
s_logger.debug("Can't transfer host id=" + hostId + "; record for the host no longer exists in op_host_transfer table");
iterator.remove();
_hostTransferDao.completeAgentTransfer(hostId);
continue;
}
ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host");
iterator.remove();
_hostTransferDao.completeAgentTransfer(hostId);
continue;
}
if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
iterator.remove();
rebalanceHost(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner());
} else {
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
}
}
ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms + ", skipping rebalance for the host");
failStartRebalance(hostId);
return;
}
if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
rebalanceHost(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner());
} else {
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
} else {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Found no agents to be transfered by the management server " + _nodeId);
}
}
} else {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Found no agents to be transfered by the management server " + _nodeId);
}
}
@ -933,7 +938,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
boolean result = true;
if (currentOwnerId == _nodeId) {
_agentToTransferIds.remove(hostId);
if (!startRebalance(hostId)) {
s_logger.debug("Failed to start agent rebalancing");
failRebalance(hostId);
@ -952,9 +956,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
if (result) {
s_logger.debug("Got host id=" + hostId + " from management server " + futureOwnerId);
s_logger.debug("Successfully transfered host id=" + hostId + " to management server " + futureOwnerId);
finishRebalance(hostId, futureOwnerId, Event.RebalanceCompleted);
} else {
s_logger.debug("Failed to transfer host id=" + hostId + " to management server " + futureOwnerId);
finishRebalance(hostId, futureOwnerId, Event.RebalanceFailed);
}
@ -1034,7 +1039,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
synchronized (_agents) {
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
_agentToTransferIds.remove(hostId);
removeAgent(attache, Status.Rebalancing);
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId);
if (forwardAttache == null) {
@ -1066,11 +1070,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return true;
}
protected void failStartRebalance(final long hostId) {
_agentToTransferIds.remove(hostId);
_hostTransferDao.completeAgentTransfer(hostId);
}
protected void cleanupTransferMap() {
List<HostTransferMapVO> hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(_nodeId);