Some fixes to agent lb code:

* Schedule agent LB as a TimerTask as it can take time for the management server to start; and it can accept rebalance requests only when it's up and running
* Removed Starting state from mshost as it's not being used anywhere
* Fixed the bug where requests weren't routed properly from the old host owner to the new one.
This commit is contained in:
alena 2011-06-13 17:18:03 -07:00
parent 30617a947c
commit 00a35314bb
6 changed files with 155 additions and 122 deletions

View File

@ -9,6 +9,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import javax.net.ssl.SSLEngine;
@ -16,8 +20,10 @@ import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
import com.cloud.agent.api.Command;
import com.cloud.agent.transport.Request;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.host.Status;
import com.cloud.utils.nio.Link;
@ -26,6 +32,8 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
private static ClusteredAgentManagerImpl s_clusteredAgentMgr;
protected ByteBuffer _buffer = ByteBuffer.allocate(2048);
private boolean _forward = false;
protected final LinkedList<Request> _transferRequests;
protected boolean _transferMode = false;
static public void initialize(ClusteredAgentManagerImpl agentMgr) {
s_clusteredAgentMgr = agentMgr;
@ -34,11 +42,13 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
public ClusteredAgentAttache(AgentManager agentMgr, long id) {
super(agentMgr, id, null, false);
_forward = true;
_transferRequests = new LinkedList<Request>();
}
public ClusteredAgentAttache(AgentManager agentMgr, long id, Link link, boolean maintenance) {
super(agentMgr, id, link, maintenance);
_forward = link == null;
_transferRequests = new LinkedList<Request>();
}
@Override
@ -50,7 +60,22 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
public boolean forForward() {
return _forward;
}
protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException {
if (_transferMode) {
// need to throw some other exception while agent is in rebalancing mode
for (final Command cmd : cmds) {
if (!cmd.allowCaching()) {
throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString() + " because agent is in Rebalancing mode", _id);
}
}
} else {
super.checkAvailability(cmds);
}
}
@Override
public void cancel(long seq) {
if (forForward()) {
@ -104,12 +129,24 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
super.send(req, listener);
return;
}
long seq = req.getSequence();
if (listener != null) {
registerListener(req.getSequence(), listener);
}
if (_transferMode) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(log(seq, "Holding request as the corresponding agent is in transfer mode: "));
}
synchronized (this) {
addRequestToTransfer(req);
return;
}
}
int i = 0;
SocketChannel ch = null;
@ -162,4 +199,42 @@ public class ClusteredAgentAttache extends ConnectedAgentAttache implements Rout
}
throw new AgentUnavailableException("Unable to reach the peer that the agent is connected", _id);
}
public synchronized void setTransferMode(final boolean transfer) {
_transferMode = transfer;
}
public boolean getTransferMode() {
return _transferMode;
}
public Request getRequestToTransfer() {
if (_transferRequests.isEmpty()) {
return null;
} else {
return _transferRequests.pop();
}
}
protected synchronized void addRequestToTransfer(Request req) {
int index = findTransferRequest(req);
assert (index < 0) : "How can we get index again? " + index + ":" + req.toString();
_transferRequests.add(-index - 1, req);
}
protected synchronized int findTransferRequest(Request req) {
return Collections.binarySearch(_transferRequests, req, s_reqComparator);
}
@Override
public void disconnect(final Status state) {
super.disconnect(state);
_transferRequests.clear();
}
public void cleanup(final Status state) {
super.cleanup(state);
_transferRequests.clear();
}
}

View File

@ -728,11 +728,45 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return true;
}
@Override
public void scheduleRebalanceAgents() {
_timer.schedule(new AgentLoadBalancerTask(), 30000);
}
public class AgentLoadBalancerTask extends TimerTask {
protected volatile boolean cancelled = false;
public AgentLoadBalancerTask() {
s_logger.debug("Agent load balancer task created");
}
@Override
public synchronized boolean cancel() {
if (!cancelled) {
cancelled = true;
s_logger.debug("Agent load balancer task cancelled");
return super.cancel();
}
return true;
}
@Override
public synchronized void run() {
if (!cancelled) {
startRebalanceAgents();
if (s_logger.isInfoEnabled()) {
s_logger.info("The agent load balancer task is now being cancelled");
}
cancelled = true;
}
}
}
public void startRebalanceAgents() {
List<ManagementServerHostVO> allMS = _mshostDao.listBy(ManagementServerHost.State.Up, ManagementServerHost.State.Starting);
s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents");
List<ManagementServerHostVO> allMS = _mshostDao.listBy(ManagementServerHost.State.Up);
List<HostVO> allManagedAgents = _hostDao.listManagedRoutingAgents();
int avLoad = 0;
@ -899,9 +933,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
boolean result = true;
if (currentOwnerId == _nodeId) {
_agentToTransferIds.remove(hostId);
if (!startRebalance(hostId)) {
s_logger.debug("Failed to start agent rebalancing");
failStartRebalance(hostId);
failRebalance(hostId);
return false;
}
try {
@ -948,46 +983,43 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
AgentAttache attache = findAttache(hostId);
if (attache == null) {
s_logger.debug("Unable to find attache for the host id=" + hostId + ", assuming that the agent disconnected already");
if (attache == null || !(attache instanceof ClusteredAgentAttache)) {
s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already");
_hostTransferDao.completeAgentTransfer(hostId);
return;
} else if (success) {
s_logger.debug("Management server " + _nodeId + " is completing agent " + hostId + " rebalance");
//1) Get all the requests before removing transfer attache
LinkedList<Request> requests = ((ClusteredDirectAgentAttache) attache).getRequests();
removeAgent(attache, Status.Rebalancing);
}
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
if (success) {
//2) Create forward attache
try {
getAttache(hostId);
//3) forward all the requests to the management server which owns the host now
if (!requests.isEmpty()) {
s_logger.debug("Forwarding requests held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId);
for (Iterator<Request> iter = requests.iterator(); iter.hasNext();) {
Request req = iter.next();
boolean routeResult = routeToPeer(Long.toString(futureOwnerId), req.getBytes());
if (!routeResult) {
logD(req.getBytes(), "Failed to route request to peer");
}
}
//1) Set transfer mode to false - so the agent can start processing requests normally
forwardAttache.setTransferMode(false);
//2) Get all transfer requests and route them to peer
Request requestToTransfer = forwardAttache.getRequestToTransfer();
while (requestToTransfer != null) {
s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " + _nodeId + " to " + futureOwnerId);
boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes());
if (!routeResult) {
logD(requestToTransfer.getBytes(), "Failed to route request to peer");
}
} catch (AgentUnavailableException ex) {
s_logger.warn("Failed to finish host " + hostId + " rebalance: couldn't create forward attache as agent is not available", ex);
failRebalance(hostId);
requestToTransfer = forwardAttache.getRequestToTransfer();
}
} else {
failRebalance(hostId);
}
s_logger.debug("Management server " + _nodeId + " completed agent " + hostId + " rebalance");
_hostTransferDao.completeAgentTransfer(hostId);
}
protected void failRebalance(final long hostId) throws AgentUnavailableException{
reconnect(hostId);
s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId);
_hostTransferDao.completeAgentTransfer(hostId);
reconnect(hostId);
}
@DB
@ -1003,9 +1035,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
_agentToTransferIds.remove(hostId);
removeAgent(attache, Status.Rebalancing);
ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId);
if (forwardAttache == null) {
s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process");
return false;
}
s_logger.debug("Putting agent id=" + hostId + " to transfer mode");
attache.setTransferMode(true);
_agents.put(hostId, attache);
forwardAttache.setTransferMode(true);
_agents.put(hostId, forwardAttache);
} else {
if (attache == null) {
s_logger.warn("Attache for the agent " + hostId + " no longer exists on management server " + _nodeId + ", can't start host rebalancing");

View File

@ -17,13 +17,7 @@
*/
package com.cloud.agent.manager;
import java.util.LinkedList;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
import com.cloud.agent.api.Command;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.exception.AgentUnavailableException;
@ -32,10 +26,8 @@ import com.cloud.resource.ServerResource;
import com.cloud.utils.exception.CloudRuntimeException;
public class ClusteredDirectAgentAttache extends DirectAgentAttache implements Routable {
private final static Logger s_logger = Logger.getLogger(ClusteredDirectAgentAttache.class);
private final ClusteredAgentManagerImpl _mgr;
private final long _nodeId;
private boolean _transferMode = false;
public ClusteredDirectAgentAttache(AgentManager agentMgr, long id, long mgmtId, ServerResource resource, boolean maintenance, ClusteredAgentManagerImpl mgr) {
super(agentMgr, id, resource, maintenance, mgr);
@ -43,25 +35,6 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R
_nodeId = mgmtId;
}
public synchronized void setTransferMode(final boolean transfer) {
_transferMode = transfer;
}
@Override
protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException {
if (_transferMode) {
// need to throw some other exception while agent is in rebalancing mode
for (final Command cmd : cmds) {
if (!cmd.allowCaching()) {
throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString() + " because agent is in Rebalancing mode", _id);
}
}
}
super.checkAvailability(cmds);
}
@Override
public void routeToAgent(byte[] data) throws AgentUnavailableException {
Request req;
@ -93,37 +66,5 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R
return super.processAnswers(seq, response);
}
}
@Override
public void send(Request req, final Listener listener) throws AgentUnavailableException {
checkAvailability(req.getCommands());
if (_transferMode) {
long seq = req.getSequence();
if (s_logger.isDebugEnabled()) {
s_logger.debug(log(seq, "Holding request as the corresponding agent is in transfer mode: "));
}
synchronized (this) {
addRequest(req);
return;
}
} else {
super.send(req, listener);
}
}
public boolean getTransferMode() {
return _transferMode;
}
public LinkedList<Request> getRequests() {
if (_transferMode) {
return _requests;
} else {
return null;
}
}
}

View File

@ -126,9 +126,7 @@ public class ClusterManagerImpl implements ClusterManager {
private String _name;
private String _clusterNodeIP = "127.0.0.1";
private boolean _agentLBEnabled = false;
private State _state = State.Starting;
private final Object stateLock = new Object();
public ClusterManagerImpl() {
clusterPeers = new HashMap<String, ClusterService>();
asyncCalls = new HashMap<String, Listener>();
@ -572,14 +570,6 @@ public class ClusterManagerImpl implements ClusterManager {
Connection conn = getHeartbeatConnection();
_mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
if (_state == State.Starting) {
synchronized (stateLock) {
_mshostDao.update(conn, _mshostId, getCurrentRunId(), State.Up, DateUtil.currentGMTTime());
_state = State.Up;
stateLock.notifyAll();
}
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
}
@ -887,7 +877,7 @@ public class ClusterManagerImpl implements ClusterManager {
mshost.setLastUpdateTime(DateUtil.currentGMTTime());
mshost.setRemoved(null);
mshost.setAlertCount(0);
mshost.setState(ManagementServerHost.State.Starting);
mshost.setState(ManagementServerHost.State.Up);
_mshostDao.persist(mshost);
if (s_logger.isInfoEnabled()) {
@ -907,26 +897,15 @@ public class ClusterManagerImpl implements ClusterManager {
if (s_logger.isInfoEnabled()) {
s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort());
}
// Initiate agent rebalancing
if (_agentLBEnabled) {
s_logger.debug("Management server " + _msId + " is asking other peers to rebalance their agents");
_rebalanceService.startRebalanceAgents();
}
// use seperate thread for heartbeat updates
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
_notificationExecutor.submit(getNotificationTask());
//wait here for heartbeat task to update the host state
try {
synchronized (stateLock) {
while (_state != State.Up) {
stateLock.wait();
}
}
} catch (final InterruptedException e) {
}
//Initiate agent rebalancing after the host is in UP state
if (_agentLBEnabled) {
_rebalanceService.scheduleRebalanceAgents();
}
} catch (Throwable e) {
s_logger.error("Unexpected exception : ", e);

View File

@ -7,7 +7,7 @@ import com.cloud.host.Status.Event;
public interface ClusteredAgentRebalanceService {
public static final int DEFAULT_TRANSFER_CHECK_INTERVAL = 10000;
void startRebalanceAgents();
void scheduleRebalanceAgents();
boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException, OperationTimedoutException;

View File

@ -183,7 +183,7 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
pstmt.setInt(4, servicePort);
pstmt.setString(5, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
pstmt.setLong(6, runid);
pstmt.setString(7, State.Starting.toString());
pstmt.setString(7, ManagementServerHost.State.Up.toString());
pstmt.setLong(8, id);
pstmt.executeUpdate();