bug 10445: Submit rebalancing task in a separate thread for each host

status 10445: resolved fixed
This commit is contained in:
alena 2011-06-24 15:38:09 -07:00
parent 97b562fc2a
commit fce33bcd76
2 changed files with 55 additions and 6 deletions

View File

@ -21,8 +21,12 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
@ -38,6 +42,7 @@ import com.cloud.agent.api.CancelCommand;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.agent.manager.AgentManagerImpl.SimulateStartTask;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Request.Version;
import com.cloud.agent.transport.Response;
@ -47,6 +52,7 @@ import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ClusteredAgentRebalanceService;
import com.cloud.cluster.ManagementServerHost;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.cluster.StackMaid;
import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner;
import com.cloud.cluster.agentlb.HostTransferMapVO;
import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
@ -79,14 +85,13 @@ import com.cloud.utils.nio.Task;
public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService {
final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class);
private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor"));
private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
public final static long STARTUP_DELAY = 5000;
public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login
public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
public long _loadSize = 100;
protected Set<Long> _agentToTransferIds = new HashSet<Long>();
private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
protected Set<Long> _agentToTransferIds = new HashSet<Long>();
@Inject
protected ClusterManager _clusterMgr = null;
@ -121,7 +126,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
ClusteredAgentAttache.initialize(this);
_clusterMgr.registerListener(this);
return super.configure(name, xmlParams);
}
@ -906,7 +911,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
iterator.remove();
rebalanceHost(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner());
try {
_executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner()));
} catch (RejectedExecutionException ex) {
s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution");
continue;
}
} else {
s_logger.debug("Agent " + hostId + " can't be transfered yet as its request queue size is " + attache.getQueueSize() + " and listener queue size is " + attache.getNonRecurringListenersSize());
}
@ -1082,4 +1093,32 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
protected class RebalanceTask implements Runnable {
Long hostId = null;
Long currentOwnerId = null;
Long futureOwnerId = null;
public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) {
this.hostId = hostId;
this.currentOwnerId = currentOwnerId;
this.futureOwnerId = futureOwnerId;
}
@Override
public void run() {
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Rebalancing host id=" + hostId);
}
rebalanceHost(hostId, currentOwnerId, futureOwnerId);
} catch (Exception e) {
s_logger.warn("Unable to rebalance host id=" + hostId, e);
} finally {
StackMaid.current().exitCleanup();
}
}
}
}

View File

@ -32,6 +32,8 @@ import javax.persistence.TableGenerator;
import org.apache.log4j.Logger;
import com.cloud.cluster.agentlb.HostTransferMapVO;
import com.cloud.cluster.agentlb.dao.HostTransferMapDaoImpl;
import com.cloud.host.Host;
import com.cloud.host.Host.Type;
import com.cloud.host.HostTagVO;
@ -50,6 +52,7 @@ import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.JoinBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.JoinBuilder.JoinType;
import com.cloud.utils.db.SearchCriteria.Func;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
@ -96,6 +99,7 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
protected final GenericSearchBuilder<HostVO, Long> HostsInStatusSearch;
protected final GenericSearchBuilder<HostVO, Long> CountRoutingByDc;
protected final SearchBuilder<HostTransferMapVO> HostTransferSearch;
protected final Attribute _statusAttr;
protected final Attribute _msIdAttr;
@ -103,6 +107,7 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
protected final HostDetailsDaoImpl _detailsDao = ComponentLocator.inject(HostDetailsDaoImpl.class);
protected final HostTagsDaoImpl _hostTagsDao = ComponentLocator.inject(HostTagsDaoImpl.class);
protected final HostTransferMapDaoImpl _hostTransferDao = ComponentLocator.inject(HostTransferMapDaoImpl.class);
public HostDaoImpl() {
@ -225,7 +230,11 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
* UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(),
* SearchCriteria.Op.LTEQ); UnmanagedDirectConnectSearch.cp(); UnmanagedDirectConnectSearch.cp();
*/
HostTransferSearch = _hostTransferDao.createSearchBuilder();
HostTransferSearch.and("id", HostTransferSearch.entity().getId(), SearchCriteria.Op.NULL);
UnmanagedDirectConnectSearch.join("hostTransferSearch", HostTransferSearch, HostTransferSearch.entity().getId(), UnmanagedDirectConnectSearch.entity().getId(), JoinType.LEFTOUTER);
UnmanagedDirectConnectSearch.done();
DirectConnectSearch = createSearchBuilder();
DirectConnectSearch.and("resource", DirectConnectSearch.entity().getResource(), SearchCriteria.Op.NNULL);
@ -357,7 +366,8 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
@Override
public List<HostVO> findDirectAgentToLoad(long lastPingSecondsAfter, Long limit) {
SearchCriteria<HostVO> sc = UnmanagedDirectConnectSearch.create();
sc.setParameters("lastPinged", lastPingSecondsAfter);
sc.setParameters("lastPinged", lastPingSecondsAfter);
return search(sc, new Filter(HostVO.class, "clusterId", true, 0L, limit));
}