Fixes problems in routing between management servers

This commit is contained in:
Alex Huang 2011-06-05 16:06:54 -07:00
parent a0ce3da191
commit 019cc78976
4 changed files with 14 additions and 13 deletions

View File

@ -53,7 +53,7 @@ public class RequestTest extends TestCase {
cmd2.addPortConfig("abc", "24", true, "eth0");
cmd2.addPortConfig("127.0.0.1", "44", false, "eth1");
Request sreq = new Request(2, 3, new Command[] { cmd1, cmd2, cmd3 }, true, true);
sreq.setSequence(1);
sreq.setSequence(892403717);
Logger logger = Logger.getLogger(GsonHelper.class);
Level level = logger.getLevel();
@ -82,7 +82,7 @@ public class RequestTest extends TestCase {
byte[] bytes = sreq.getBytes();
assert Request.getSequence(bytes) == 1;
assert Request.getSequence(bytes) == 892403717;
assert Request.getManagementServerId(bytes) == 3;
assert Request.getAgentId(bytes) == 2;
assert Request.getViaAgentId(bytes) == 2;

View File

@ -329,9 +329,8 @@ public abstract class AgentAttache {
public void send(Request req, final Listener listener) throws AgentUnavailableException {
checkAvailability(req.getCommands());
long seq = getNextSequence();
req.setSequence(seq);
long seq = req.getSequence();
if (listener != null) {
registerListener(seq, listener);
} else if (s_logger.isDebugEnabled()) {
@ -376,9 +375,8 @@ public abstract class AgentAttache {
public Answer[] send(Request req, int wait) throws AgentUnavailableException, OperationTimedoutException {
SynchronousListener sl = new SynchronousListener(null);
long seq = getNextSequence();
req.setSequence(seq);
long seq = req.getSequence();
send(req, sl);
try {

View File

@ -804,6 +804,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true);
req.setSequence(agent.getNextSequence());
Answer[] answers = agent.send(req, timeout);
notifyAnswersToMonitors(hostId, req.getSequence(), answers);
commands.setAnswers(answers);
@ -818,6 +819,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
try {
Request req = new Request(hostId, _nodeId, new CheckHealthCommand(), true);
req.setSequence(agent.getNextSequence());
Answer[] answers = agent.send(req, 50 * 1000);
if (answers != null && answers[0] != null) {
Status status = answers[0].getResult() ? Status.Up : Status.Down;
@ -863,6 +865,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
return -1;
}
Request req = new Request(hostId, _nodeId, cmds, commands.stopOnError(), true);
req.setSequence(agent.getNextSequence());
agent.send(req, listener);
return req.getSequence();
}

View File

@ -85,7 +85,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
public long _loadSize = 100;
protected Set<Long> _agentToTransferIds = new HashSet<Long>();
private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
@Inject
protected ClusterManager _clusterMgr = null;
@ -596,7 +596,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
final byte[] data = task.getData();
Version ver = Request.getVersion(data);
if (ver.ordinal() != Version.v1.ordinal()) {
if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) {
s_logger.warn("Wrong version for clustered agent request");
super.doTask(task);
return;
@ -718,7 +718,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return setToWaitForRebalance(agentId);
} else if (event == Event.StartAgentRebalance) {
return rebalanceHost(agentId);
}
}
return true;
}
@ -907,7 +907,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
if (result) {
s_logger.debug("Got host id=" + hostId + " from management server " + map.getFutureOwner());
}
}
} else {
s_logger.warn("Unable to find agent " + hostId + " on management server " + _nodeId);
@ -935,10 +935,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (map.getInitialOwner() != _nodeId) {
s_logger.warn("Why finish rebalance called not by initial host owner???");
return false;
}
}
boolean success = (event == Event.RebalanceCompleted) ? true : false;
if (s_logger.isDebugEnabled()) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Finishing rebalancing for the agent " + hostId + " with result " + success);
}