KVM agent connet:

* send StartupAnswer right after StartupCommand is recieved
* if post processor going wrong, send out readycommand with error message to agent, then agent will exit
This commit is contained in:
Edison Xu 2012-10-29 15:29:36 -07:00 committed by Alena Prokharchyk
parent c10eeb6036
commit b101dc7279
3 changed files with 85 additions and 85 deletions

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
@ -48,6 +47,7 @@ import com.cloud.agent.api.MaintainAnswer;
import com.cloud.agent.api.MaintainCommand;
import com.cloud.agent.api.ModifySshKeysCommand;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.ReadyCommand;
import com.cloud.agent.api.ShutdownCommand;
import com.cloud.agent.api.StartupAnswer;
import com.cloud.agent.api.StartupCommand;
@ -491,6 +491,10 @@ public class Agent implements HandlerFactory, IAgentControl {
cancelTasks();
_reconnectAllowed = false;
answer = new Answer(cmd, true, null);
} else if (cmd instanceof ReadyCommand && ((ReadyCommand)cmd).getDetails() != null) {
s_logger.debug("Not ready to connect to mgt server: " + ((ReadyCommand)cmd).getDetails());
System.exit(1);
return;
} else if (cmd instanceof MaintainCommand) {
s_logger.debug("Received maintainCommand" );
cancelTasks();
@ -513,6 +517,9 @@ public class Agent implements HandlerFactory, IAgentControl {
}
} else {
if (cmd instanceof ReadyCommand) {
processReadyCommand((ReadyCommand)cmd);
}
_inProgress.incrementAndGet();
try {
answer = _resource.executeRequest(cmd);
@ -576,6 +583,19 @@ public class Agent implements HandlerFactory, IAgentControl {
setLastPingResponseTime();
}
}
public void processReadyCommand(Command cmd) {
final ReadyCommand ready = (ReadyCommand) cmd;
s_logger.info("Proccess agent ready command, agent id = " + ready.getHostId());
if (ready.getHostId() != null) {
setId(ready.getHostId());
}
s_logger.info("Ready command is processed: agent id = " + getId());
}
public void processOtherTask(Task task) {
final Object obj = task.get();
@ -601,6 +621,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to send request: " + request.toString());
}
} else if (obj instanceof Request) {
final Request req = (Request) obj;
final Command command = req.getCommand();

View File

@ -23,12 +23,18 @@ public class ReadyCommand extends Command {
}
private Long dcId;
private Long hostId;
public ReadyCommand(Long dcId) {
super();
this.dcId = dcId;
}
public ReadyCommand(Long dcId, Long hostId) {
this(dcId);
this.hostId = hostId;
}
public void setDetails(String details) {
_details = details;
}
@ -46,4 +52,7 @@ public class ReadyCommand extends Command {
return true;
}
public Long getHostId() {
return hostId;
}
}

View File

@ -634,7 +634,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
Long dcId = host.getDataCenterId();
ReadyCommand ready = new ReadyCommand(dcId);
ReadyCommand ready = new ReadyCommand(dcId, host.getId());
Answer answer = easySend(hostId, ready);
if (answer == null || !answer.getResult()) {
// this is tricky part for secondary storage
@ -1096,91 +1096,37 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
return attache;
}
//TODO: handle mycloud specific
private AgentAttache handleConnectedAgent(final Link link, final StartupCommand[] startup, Request request) {
AgentAttache attache = null;
StartupAnswer[] answers = new StartupAnswer[startup.length];
try {
HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup);
ReadyCommand ready = null;
try {
HostVO host = _resourceMgr.createHostVOForConnectedAgent(startup);
if (host != null) {
ready = new ReadyCommand(host.getDataCenterId(), host.getId());
attache = createAttacheForConnect(host, link);
attache = notifyMonitorsOfConnection(attache, startup, false);
}
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], attache.getId(), getPingInterval());
break;
}
}
}catch (ConnectionException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
} catch (IllegalArgumentException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
} catch (CloudRuntimeException e) {
Command cmd;
for (int i = 0; i < startup.length; i++) {
cmd = startup[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer(startup[i], e.toString());
break;
}
}
}
Response response = null;
if (attache != null) {
response = new Response(request, answers[0], _nodeId, attache.getId());
} else {
response = new Response(request, answers[0], _nodeId, -1);
} catch (Exception e) {
s_logger.debug("Failed to handle host connection: " + e.toString());
ready = new ReadyCommand(null);
ready.setDetails(e.toString());
} finally {
if (ready == null) {
ready = new ReadyCommand(null);
}
}
try {
link.send(response.toBytes());
} catch (ClosedChannelException e) {
s_logger.debug("Failed to send startupanswer: " + e.toString());
return null;
}
if (attache == null) {
return null;
}
try {
attache = notifyMonitorsOfConnection(attache, startup, false);
return attache;
} catch (ConnectionException e) {
ReadyCommand ready = new ReadyCommand(null);
ready.setDetails(e.toString());
try {
if (attache == null) {
final Request readyRequest = new Request(-1, -1, ready, false);
link.send(readyRequest.getBytes());
} else {
easySend(attache.getId(), ready);
} catch (Exception e1) {
s_logger.debug("Failed to send readycommand, due to " + e.toString());
}
return null;
} catch (CloudRuntimeException e) {
ReadyCommand ready = new ReadyCommand(null);
ready.setDetails(e.toString());
try {
easySend(attache.getId(), ready);
} catch (Exception e1) {
s_logger.debug("Failed to send readycommand, due to " + e.toString());
}
return null;
} catch (Exception e) {
s_logger.debug("Failed to send ready command:" + e.toString());
}
return attache;
}
protected class SimulateStartTask implements Runnable {
@ -1233,6 +1179,7 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
for (int i = 0; i < _cmds.length; i++) {
startups[i] = (StartupCommand) _cmds[i];
}
AgentAttache attache = handleConnectedAgent(_link, startups, _request);
if (attache == null) {
s_logger.warn("Unable to create attache for agent: " + _request);
@ -1241,6 +1188,23 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
}
protected void connectAgent(Link link, final Command[] cmds, final Request request) {
//send startupanswer to agent in the very beginning, so agent can move on without waiting for the answer for an undetermined time, if we put this logic into another thread pool.
StartupAnswer[] answers = new StartupAnswer[cmds.length];
Command cmd;
for (int i = 0; i < cmds.length; i++) {
cmd = cmds[i];
if ((cmd instanceof StartupRoutingCommand) || (cmd instanceof StartupProxyCommand) || (cmd instanceof StartupSecondaryStorageCommand) || (cmd instanceof StartupStorageCommand)) {
answers[i] = new StartupAnswer((StartupCommand)cmds[i], 0, getPingInterval());
break;
}
}
Response response = null;
response = new Response(request, answers[0], _nodeId, -1);
try {
link.send(response.toBytes());
} catch (ClosedChannelException e) {
s_logger.debug("Failed to send startupanswer: " + e.toString());
}
_connectExecutor.execute(new HandleAgentConnectTask(link, cmds, request));
}
@ -1327,17 +1291,23 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory, Manager {
if (cmd instanceof PingRoutingCommand) {
boolean gatewayAccessible = ((PingRoutingCommand) cmd).isGatewayAccessible();
HostVO host = _hostDao.findById(Long.valueOf(cmdHostId));
if (!gatewayAccessible) {
// alert that host lost connection to
// gateway (cannot ping the default route)
DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId());
HostPodVO podVO = _podDao.findById(host.getPodId());
String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName();
if (host != null) {
if (!gatewayAccessible) {
// alert that host lost connection to
// gateway (cannot ping the default route)
DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId());
HostPodVO podVO = _podDao.findById(host.getPodId());
String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName();
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId(), "Host lost connection to gateway, " + hostDesc, "Host [" + hostDesc
+ "] lost connection to gateway (default route) and is possibly having network connection issues.");
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId(), "Host lost connection to gateway, " + hostDesc, "Host [" + hostDesc
+ "] lost connection to gateway (default route) and is possibly having network connection issues.");
} else {
_alertMgr.clearAlert(AlertManager.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId());
}
} else {
_alertMgr.clearAlert(AlertManager.ALERT_TYPE_ROUTING, host.getDataCenterId(), host.getPodId());
s_logger.debug("Not processing " + PingRoutingCommand.class.getSimpleName() +
" for agent id=" + cmdHostId + "; can't find the host in the DB");
}
}
answer = new PingAnswer((PingCommand) cmd);