bug 10588: code to sync VM state at cluster level and dettached from ping command

This commit is contained in:
Abhinandan Prateek 2011-09-27 13:18:21 +05:30
parent ccd47c1b21
commit 83c6cf3db0
4 changed files with 292 additions and 340 deletions

View File

@ -90,6 +90,8 @@ import com.cloud.agent.api.GetVmStatsCommand;
import com.cloud.agent.api.GetVncPortAnswer;
import com.cloud.agent.api.GetVncPortCommand;
import com.cloud.agent.api.HostStatsEntry;
import com.cloud.agent.api.ClusterSyncAnswer;
import com.cloud.agent.api.ClusterSyncCommand;
import com.cloud.agent.api.MaintainAnswer;
import com.cloud.agent.api.MaintainCommand;
import com.cloud.agent.api.ManageSnapshotAnswer;
@ -255,7 +257,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
protected long _dcId;
protected String _pod;
protected String _cluster;
protected HashMap<String, State> _vms = new HashMap<String, State>(71);
protected static final XenServerPoolVms _vms = new XenServerPoolVms();
protected String _privateNetworkName;
protected String _linkLocalPrivateNetworkName;
protected String _publicNetworkName;
@ -505,6 +507,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
return execute((SetFirewallRulesCommand)cmd);
} else if (cmd instanceof BumpUpPriorityCommand) {
return execute((BumpUpPriorityCommand)cmd);
} else if (cmd instanceof ClusterSyncCommand) {
return execute((ClusterSyncCommand)cmd);
} else {
return Answer.createUnsupportedCommandAnswer(cmd);
}
@ -1081,9 +1085,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
}
synchronized (_vms) {
_vms.put(vmName, State.Starting);
}
_vms.put(_cluster, _name, vmName, State.Starting);
Host host = Host.getByUuid(conn, _host.uuid);
vm = createVmFromTemplate(conn, vmSpec, host);
@ -1164,9 +1167,9 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
} finally {
synchronized (_vms) {
if (state != State.Stopped) {
_vms.put(vmName, state);
_vms.put(_cluster, _name, vmName, state);
} else {
_vms.remove(vmName);
_vms.remove(_cluster, _name, vmName);
}
}
}
@ -2044,13 +2047,12 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
return state == null ? State.Unknown : state;
}
protected HashMap<String, State> getAllVms(Connection conn) {
final HashMap<String, State> vmStates = new HashMap<String, State>();
Set<VM> vms = null;
protected HashMap<String, Pair<String, State>> getAllVms(Connection conn) {
final HashMap<String, Pair<String, State>> vmStates = new HashMap<String, Pair<String, State>>();
Map<VM, VM.Record> vm_map = null;
for (int i = 0; i < 2; i++) {
try {
Host host = Host.getByUuid(conn, _host.uuid);
vms = host.getResidentVMs(conn);
vm_map = VM.getAllRecords(conn); //USE THIS TO GET ALL VMS FROM A CLUSTER
break;
} catch (final Throwable e) {
s_logger.warn("Unable to get vms", e);
@ -2061,29 +2063,11 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
}
if (vms == null) {
if (vm_map == null) {
return null;
}
for (VM vm : vms) {
VM.Record record = null;
for (int i = 0; i < 2; i++) {
try {
record = vm.getRecord(conn);
break;
} catch (XenAPIException e1) {
s_logger.debug("VM.getRecord failed on host:" + _host.uuid + " due to " + e1.toString());
} catch (XmlRpcException e1) {
s_logger.debug("VM.getRecord failed on host:" + _host.uuid + " due to " + e1.getMessage());
}
try {
Thread.sleep(1000);
} catch (final InterruptedException ex) {
}
}
if (record == null) {
continue;
}
for (VM.Record record: vm_map.values()) {
if (record.isControlDomain || record.isASnapshot || record.isATemplate) {
continue; // Skip DOM0
}
@ -2093,7 +2077,23 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
if (s_logger.isTraceEnabled()) {
s_logger.trace("VM " + record.nameLabel + ": powerstate = " + ps + "; vm state=" + state.toString());
}
vmStates.put(record.nameLabel, state);
Host host = record.residentOn;
String host_uuid = null;
if( ! isRefNull(host) ) {
try {
host_uuid = host.getUuid(conn);
} catch (BadServerResponse e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (XenAPIException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (XmlRpcException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
vmStates.put(record.nameLabel, new Pair<String, State>(host_uuid, state));
}
}
return vmStates;
@ -2146,7 +2146,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
Integer vncPort = null;
if (state == State.Running) {
synchronized (_vms) {
_vms.put(vmName, State.Running);
_vms.put(_cluster, _name, vmName, State.Running);
}
}
@ -2168,9 +2168,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
for (NicTO nic : nics) {
getNetwork(conn, nic);
}
synchronized (_vms) {
_vms.put(vm.getName(), State.Migrating);
}
_vms.put(_cluster, _name, vm.getName(), State.Migrating);
return new PrepareForMigrationAnswer(cmd);
} catch (Exception e) {
@ -2404,10 +2402,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
final String vmName = cmd.getVmName();
State state = null;
synchronized (_vms) {
state = _vms.get(vmName);
_vms.put(vmName, State.Stopping);
}
state = _vms.getState(_cluster, vmName);
_vms.put(_cluster, _name, vmName, State.Stopping);
try {
Set<VM> vms = VM.getByNameLabel(conn, vmName);
@ -2473,9 +2469,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
s_logger.warn(msg, e);
return new MigrateAnswer(cmd, false, msg, null);
} finally {
synchronized (_vms) {
_vms.put(vmName, state);
}
_vms.put(_cluster, _name, vmName, state);
}
}
@ -2525,105 +2519,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
throw new CloudRuntimeException("Com'on no control domain? What the crap?!#@!##$@");
}
protected HashMap<String, State> deltaSync(Connection conn) {
HashMap<String, State> newStates;
HashMap<String, State> oldStates = null;
final HashMap<String, State> changes = new HashMap<String, State>();
newStates = getAllVms(conn);
if (newStates == null) {
s_logger.debug("Unable to get the vm states so no state sync at this point.");
return null;
}
synchronized (_vms) {
oldStates = new HashMap<String, State>(_vms.size());
oldStates.putAll(_vms);
for (final Map.Entry<String, State> entry : newStates.entrySet()) {
final String vm = entry.getKey();
State newState = entry.getValue();
final State oldState = oldStates.remove(vm);
if (newState == State.Stopped && oldState != State.Stopping && oldState != null && oldState != State.Stopped) {
newState = getRealPowerState(conn, vm);
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("VM " + vm + ": xen has state " + newState + " and we have state " + (oldState != null ? oldState.toString() : "null"));
}
if (vm.startsWith("migrating")) {
s_logger.debug("Migrating from xen detected. Skipping");
continue;
}
if (oldState == null) {
_vms.put(vm, newState);
s_logger.debug("Detecting a new state but couldn't find a old state so adding it to the changes: " + vm);
changes.put(vm, newState);
} else if (oldState == State.Starting) {
if (newState == State.Running) {
_vms.put(vm, newState);
} else if (newState == State.Stopped) {
s_logger.debug("Ignoring vm " + vm + " because of a lag in starting the vm.");
}
} else if (oldState == State.Migrating) {
if (newState == State.Running) {
s_logger.debug("Detected that an migrating VM is now running: " + vm);
_vms.put(vm, newState);
}
} else if (oldState == State.Stopping) {
if (newState == State.Stopped) {
_vms.put(vm, newState);
} else if (newState == State.Running) {
s_logger.debug("Ignoring vm " + vm + " because of a lag in stopping the vm. ");
}
} else if (oldState != newState) {
_vms.put(vm, newState);
if (newState == State.Stopped) {
/*
* if (_vmsKilled.remove(vm)) { s_logger.debug("VM " + vm + " has been killed for storage. ");
* newState = State.Error; }
*/
}
changes.put(vm, newState);
}
}
for (final Map.Entry<String, State> entry : oldStates.entrySet()) {
final String vm = entry.getKey();
final State oldState = entry.getValue();
if (s_logger.isTraceEnabled()) {
s_logger.trace("VM " + vm + " is now missing from xen so reporting stopped");
}
if (oldState == State.Stopping) {
s_logger.debug("Ignoring VM " + vm + " in transition state stopping.");
_vms.remove(vm);
} else if (oldState == State.Starting) {
s_logger.debug("Ignoring VM " + vm + " in transition state starting.");
} else if (oldState == State.Stopped) {
_vms.remove(vm);
} else if (oldState == State.Migrating) {
s_logger.debug("Ignoring VM " + vm + " in migrating state.");
} else {
State newState = State.Stopped;
changes.put(entry.getKey(), newState);
}
}
}
return changes;
}
protected void fullSync(StartupRoutingCommand cmd, Connection conn) {
synchronized (_vms) {
_vms.clear();
}
protected void fullHostSync(StartupRoutingCommand cmd, Connection conn) {
try {
final HashMap<String, VmState> vmStates = new HashMap<String, VmState>();
Host lhost = Host.getByUuid(conn, _host.uuid);
@ -2639,18 +2535,14 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
Host host = record.residentOn;
String host_uuid = null;
if( ! isRefNull(host) ) {
host_uuid = host.getUuid(conn);
if( host_uuid.equals(_host.uuid)) {
synchronized (_vms) {
_vms.put(vm_name, state);
}
}
host_uuid = host.getUuid(conn);
VmState vm_state = new StartupRoutingCommand.VmState(state, host_uuid);
vmStates.put(vm_name, vm_state);
_vms.put(_cluster, host_uuid, vm_name, state);
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("VM " + vm_name + ": powerstate = " + ps + "; vm state=" + state.toString());
}
VmState vm_state = new StartupRoutingCommand.VmState(state, host_uuid);
vmStates.put(vm_name, vm_state);
}
}
cmd.setChanges(vmStates);
} catch (final Throwable e) {
@ -2730,9 +2622,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
@Override
public RebootAnswer execute(RebootCommand cmd) {
Connection conn = getConnection();
synchronized (_vms) {
_vms.put(cmd.getVmName(), State.Starting);
}
_vms.put(_cluster, _name, cmd.getVmName(), State.Starting);
try {
Set<VM> vms = null;
try {
@ -2755,9 +2645,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
return new RebootAnswer(cmd, "reboot succeeded", null, null);
} finally {
synchronized (_vms) {
_vms.put(cmd.getVmName(), State.Running);
}
_vms.put(_cluster, _name, cmd.getVmName(), State.Running);
}
}
@ -3219,9 +3107,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
if (vms.size() == 0) {
s_logger.info("VM does not exist on XenServer" + _host.uuid);
synchronized (_vms) {
_vms.remove(vmName);
}
_vms.remove(_cluster, _name, vmName);
return new StopAnswer(cmd, "VM does not exist", 0 , 0L, 0L);
}
Long bytesSent = 0L;
@ -3241,11 +3127,8 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
return new StopAnswer(cmd, msg);
}
State state = null;
synchronized (_vms) {
state = _vms.get(vmName);
_vms.put(vmName, State.Stopping);
}
State state = _vms.getState(_cluster, vmName);
_vms.put(_cluster, _name, vmName, State.Stopping);
try {
if (vmr.powerState == VmPowerState.RUNNING) {
@ -3306,9 +3189,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
String msg = "VM destroy failed in Stop " + vmName + " Command due to " + e.getMessage();
s_logger.warn(msg, e);
} finally {
synchronized (_vms) {
_vms.put(vmName, state);
}
_vms.put(_cluster, _name, vmName, state);
}
}
}
@ -3874,19 +3755,15 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
}
Connection conn = getConnection();
HashMap<String, State> newStates = deltaSync(conn);
if (newStates == null) {
s_logger.warn("Unable to get current status from sync");
return null;
}
if (!_canBridgeFirewall && !_isOvs) {
return new PingRoutingCommand(getType(), id, newStates);
} else if (_isOvs) {
/**if (!_canBridgeFirewall && !_isOvs) {
return new PingRoutingCommand(getType(), id, null);
} else**/
if (_isOvs) {
List<Pair<String, Long>>ovsStates = ovsFullSyncStates();
return new PingRoutingWithOvsCommand(getType(), id, newStates, ovsStates);
return new PingRoutingWithOvsCommand(getType(), id, null, ovsStates);
}else {
HashMap<String, Pair<Long, Long>> nwGrpStates = syncNetworkGroups(conn, id);
return new PingRoutingWithNwGroupsCommand(getType(), id, newStates, nwGrpStates);
return new PingRoutingWithNwGroupsCommand(getType(), id, null, nwGrpStates);
}
} catch (Exception e) {
s_logger.warn("Unable to get current status", e);
@ -4114,9 +3991,9 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
StartupRoutingCommand cmd = new StartupRoutingCommand();
fillHostInfo(conn, cmd);
fullSync(cmd, conn);
fullHostSync(cmd, conn);
cmd.setHypervisorType(HypervisorType.XenServer);
cmd.setCluster(_cluster);
cmd.setCluster(_cluster);
cmd.setPoolSync(true);
StartupStorageCommand sscmd = initializeLocalSR(conn);
@ -5053,7 +4930,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
value = (String) params.get("migratewait");
_migratewait = NumbersUtil.parseInt(value, 3600);
if (_pod == null) {
throw new ConfigurationException("Unable to get the pod");
}
@ -6702,4 +6579,157 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
return new SetFirewallRulesAnswer(cmd, true, results);
}
protected Answer execute(final ClusterSyncCommand cmd) {
Connection conn = getConnection();
HashMap<String, Pair<String, State>> newStates;
int sync_type;
if (cmd.isRightStep()){
// do full sync
newStates=fullClusterSync(conn);
sync_type = ClusterSyncAnswer.FULL_SYNC;
}
else {
// do delta sync
newStates = deltaClusterSync(conn);
if (newStates == null) {
s_logger.warn("Unable to get current status from sync");
}
sync_type = ClusterSyncAnswer.DELTA_SYNC;
}
cmd.incrStep();
return new ClusterSyncAnswer(cmd.getClusterId(), newStates, sync_type);
}
protected HashMap<String, Pair<String, State>> fullClusterSync(Connection conn) {
_vms.clear(_cluster);
try {
Host lhost = Host.getByUuid(conn, _host.uuid);
Map<VM, VM.Record> vm_map = VM.getAllRecords(conn); //USE THIS TO GET ALL VMS FROM A CLUSTER
for (VM.Record record: vm_map.values()) {
if (record.isControlDomain || record.isASnapshot || record.isATemplate) {
continue; // Skip DOM0
}
String vm_name = record.nameLabel;
VmPowerState ps = record.powerState;
final State state = convertToState(ps);
Host host = record.residentOn;
String host_uuid = null;
if( ! isRefNull(host) ) {
host_uuid = host.getUuid(conn);
_vms.put(_cluster, host_uuid, vm_name, state);
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("VM " + vm_name + ": powerstate = " + ps + "; vm state=" + state.toString());
}
}
} catch (final Throwable e) {
String msg = "Unable to get vms through host " + _host.uuid + " due to to " + e.toString();
s_logger.warn(msg, e);
throw new CloudRuntimeException(msg);
}
return _vms.getClusterVmState(_cluster);
}
protected HashMap<String, Pair<String, State>> deltaClusterSync(Connection conn) {
HashMap<String, Pair<String, State>> newStates;
HashMap<String, Pair<String, State>> oldStates = null;
final HashMap<String, Pair<String, State>> changes = new HashMap<String, Pair<String, State>>();
newStates = getAllVms(conn);
if (newStates == null) {
s_logger.warn("Unable to get the vm states so no state sync at this point.");
return null;
}
synchronized (_vms) {
oldStates = new HashMap<String, Pair<String, State>>(_vms.size(_cluster));
oldStates.putAll(_vms.getClusterVmState(_cluster));
for (final Map.Entry<String, Pair<String, State>> entry : newStates.entrySet()) {
final String vm = entry.getKey();
State newState = entry.getValue().second();
String host_uuid = entry.getValue().first();
final Pair<String, State> oldState = oldStates.remove(vm);
if (newState == State.Stopped && oldState != null && oldState.second() != State.Stopping && oldState.second() != State.Stopped) {
newState = getRealPowerState(conn, vm);
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("VM " + vm + ": xen has state " + newState + " and we have state " + (oldState != null ? oldState.toString() : "null"));
}
if (vm.startsWith("migrating")) {
s_logger.warn("Migrating from xen detected. Skipping");
continue;
}
if (oldState == null) {
_vms.put(_cluster, host_uuid, vm, newState);
s_logger.warn("Detecting a new state but couldn't find a old state so adding it to the changes: " + vm);
changes.put(vm, new Pair<String, State>(host_uuid, newState));
} else if (oldState.second() == State.Starting) {
if (newState == State.Running) {
_vms.put(_cluster, host_uuid, vm, newState);
} else if (newState == State.Stopped) {
s_logger.warn("Ignoring vm " + vm + " because of a lag in starting the vm.");
}
} else if (oldState.second() == State.Migrating) {
if (newState == State.Running) {
s_logger.debug("Detected that an migrating VM is now running: " + vm);
_vms.put(_cluster, host_uuid, vm, newState);
}
} else if (oldState.second() == State.Stopping) {
if (newState == State.Stopped) {
_vms.put(_cluster, host_uuid, vm, newState);
} else if (newState == State.Running) {
s_logger.warn("Ignoring vm " + vm + " because of a lag in stopping the vm. ");
}
} else if (oldState.second() != newState) {
_vms.put(_cluster, host_uuid, vm, newState);
if (newState == State.Stopped) {
/*
* if (_vmsKilled.remove(vm)) { s_logger.debug("VM " + vm + " has been killed for storage. ");
* newState = State.Error; }
*/
}
changes.put(vm, new Pair<String, State>(host_uuid, newState));
}
}
for (final Map.Entry<String, Pair<String, State>> entry : oldStates.entrySet()) {
final String vm = entry.getKey();
final State oldState = entry.getValue().second();
String host_uuid = entry.getValue().first();
if (s_logger.isTraceEnabled()) {
s_logger.trace("VM " + vm + " is now missing from xen so reporting stopped");
}
if (oldState == State.Stopping) {
s_logger.warn("Ignoring VM " + vm + " in transition state stopping.");
_vms.remove(_cluster, host_uuid, vm);
} else if (oldState == State.Starting) {
s_logger.warn("Ignoring VM " + vm + " in transition state starting.");
} else if (oldState == State.Stopped) {
_vms.remove(_cluster, host_uuid, vm);
} else if (oldState == State.Migrating) {
s_logger.warn("Ignoring VM " + vm + " in migrating state.");
} else {
State newState = State.Stopped;
changes.put(vm, new Pair<String, State>(host_uuid, newState));
}
}
}
return changes;
}
}

View File

@ -27,7 +27,6 @@ public class XenServerPoolVms {
}
public void clear(String clusterId){
s_logger.debug("Clearing vm stated for cluster=" + clusterId);
HashMap<String, Pair<String, State>> _vms= getClusterVmState(clusterId);
synchronized (_vms) {
_vms.clear();
@ -35,14 +34,12 @@ public class XenServerPoolVms {
}
public State getState(String clusterId, String name){
s_logger.debug("Getting state for vmname=" + name + ", for clusterid=" + clusterId + ", " + this);
HashMap<String, Pair<String, State>> vms = getClusterVmState(clusterId);
Pair<String, State> pv = vms.get(name);
return pv.second();
}
public void put(String clusterId, String hostUuid, String name, State state){
s_logger.debug("Adding hostUuid=" + hostUuid + ", vm=" + name + ", state=" + state + ", " + this);
HashMap<String, Pair<String, State>> vms= getClusterVmState(clusterId);
synchronized (vms) {
vms.put(name, new Pair<String, State>(hostUuid, state));
@ -75,7 +72,7 @@ public class XenServerPoolVms {
@Override
public String toString(){
StringBuilder sbuf = new StringBuilder("PoolVms");
StringBuilder sbuf = new StringBuilder("PoolVms=");
for (HashMap<String/* vm name */, Pair<String/* host uuid */, State/* vm state */>> clusterVM: _cluster_vms.values()){
for (String vmname: clusterVM.keySet()){
sbuf.append(vmname).append("-").append(clusterVM.get(vmname).second()).append(",");

View File

@ -58,6 +58,8 @@ import com.cloud.agent.api.StartupRoutingCommand.VmState;
import com.cloud.agent.api.StopAnswer;
import com.cloud.agent.api.StopCommand;
import com.cloud.agent.api.to.VirtualMachineTO;
import com.cloud.agent.api.ClusterSyncAnswer;
import com.cloud.agent.api.ClusterSyncCommand;
import com.cloud.agent.manager.Commands;
import com.cloud.agent.manager.allocator.HostAllocator;
import com.cloud.alert.AlertManager;
@ -233,6 +235,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
protected long _cancelWait;
protected long _opWaitInterval;
protected int _lockStateRetry;
private static HashMap<String, Commands> _syncCommandMap= new HashMap<String, Commands>();
@Override
public <T extends VMInstanceVO> void registerGuru(VirtualMachine.Type type, VirtualMachineGuru<T> guru) {
@ -1500,7 +1503,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
return new StopCommand(vmName);
}
public Commands deltaSync(long hostId, Map<String, State> newStates) {
public Commands deltaSync(Map<String, Pair<String, State>> newStates) {
Map<Long, AgentVmInfo> states = convertDeltaToInfos(newStates);
Commands commands = new Commands(OnError.Continue);
@ -1512,7 +1515,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
Command command = null;
if (vm != null) {
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType());
command = compareState(hostId, vm, info, false, hvGuru.trackVmHostChange());
command = compareState(vm.hostId, vm, info, false, hvGuru.trackVmHostChange());
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cleaning up a VM that is no longer found: " + info.name);
@ -1529,7 +1532,44 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
}
protected Map<Long, AgentVmInfo> convertDeltaToInfos(final Map<String, State> states) {
public Commands fullSync(final long clusterId, Map<String, Pair<String, State>> newStates) {
Commands commands = new Commands(OnError.Continue);
Map<Long, AgentVmInfo> infos = convertToInfos(newStates);
long hId = 0;
final List<VMInstanceVO> vms = _vmDao.listByClusterId(clusterId);
for (VMInstanceVO vm : vms) {
AgentVmInfo info = infos.remove(vm.getId());
VMInstanceVO castedVm = null;
if (info == null) {
info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped);
castedVm = info.guru.findById(vm.getId());
hId = vm.getHostId() == null ? vm.getLastHostId() : vm.getHostId();
} else {
castedVm = info.vm;
String host_guid = info.getHostUuid();
Host host = _hostDao.findByGuid(host_guid);
if ( host == null ) {
infos.put(vm.getId(), info);
continue;
}
hId = host.getId();
}
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType());
Command command = compareState(hId, castedVm, info, true, hvGuru.trackVmHostChange());
if (command != null) {
commands.addCommand(command);
}
}
for (final AgentVmInfo left : infos.values()) {
s_logger.warn("Stopping a VM that we have no record of: " + left.name);
commands.addCommand(cleanup(left.name));
}
return commands;
}
protected Map<Long, AgentVmInfo> convertDeltaToInfos(final Map<String, Pair<String, State>> states) {
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
if (states == null) {
@ -1538,20 +1578,20 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
Collection<VirtualMachineGuru<? extends VMInstanceVO>> vmGurus = _vmGurus.values();
for (Map.Entry<String, State> entry : states.entrySet()) {
for (Map.Entry<String, Pair<String, State>> entry : states.entrySet()) {
for (VirtualMachineGuru<? extends VMInstanceVO> vmGuru : vmGurus) {
String name = entry.getKey();
VMInstanceVO vm = vmGuru.findByName(name);
if (vm != null) {
map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue()));
map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue().second()));
break;
}
Long id = vmGuru.convertToId(name);
if (id != null) {
map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue()));
map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue().second()));
break;
}
}
@ -1560,29 +1600,26 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
return map;
}
protected Map<Long, AgentVmInfo> convertToInfos(final Map<String, VmState> states) {
protected Map<Long, AgentVmInfo> convertToInfos(final Map<String, Pair<String, State>> newStates) {
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
if (states == null) {
if (newStates == null) {
return map;
}
Collection<VirtualMachineGuru<? extends VMInstanceVO>> vmGurus = _vmGurus.values();
for (Map.Entry<String, VmState> entry : states.entrySet()) {
for (Map.Entry<String, Pair<String, State>> entry : newStates.entrySet()) {
for (VirtualMachineGuru<? extends VMInstanceVO> vmGuru : vmGurus) {
String name = entry.getKey();
VMInstanceVO vm = vmGuru.findByName(name);
if (vm != null) {
map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue().getState(), entry.getValue().getHost() ));
if (vm != null) {
map.put(vm.getId(), new AgentVmInfo(entry.getKey(), vmGuru, vm, entry.getValue().second(), entry.getValue().first() ));
break;
}
Long id = vmGuru.convertToId(name);
if (id != null) {
map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue().getState(), entry.getValue().getHost() ));
map.put(id, new AgentVmInfo(entry.getKey(), vmGuru, null,entry.getValue().second(), entry.getValue().first() ));
break;
}
}
@ -1605,9 +1642,9 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
Command command = null;
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM " + serverName + ": server state = " + serverState + " and agent state = " + agentState);
s_logger.debug("VM " + serverName + ": cs state = " + serverState + " and realState = " + agentState);
}
if (agentState == State.Error) {
agentState = State.Stopped;
@ -1816,94 +1853,6 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
}
public Commands fullSync(final long hostId, StartupRoutingCommand startup) {
Commands commands = new Commands(OnError.Continue);
Map<Long, AgentVmInfo> infos = convertToInfos(startup.getVmStates());
if( startup.isPoolSync()) {
long hId = 0;
Host host = _hostDao.findById(hostId);
long clusterId= host.getClusterId();
final List<? extends VMInstanceVO> vms = _vmDao.listByClusterId(clusterId);
s_logger.debug("Found " + vms.size() + " VMs for cluster " + clusterId);
for (VMInstanceVO vm : vms) {
AgentVmInfo info = infos.remove(vm.getId());
VMInstanceVO castedVm = null;
if (info == null) {
info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped);
hId = 0;
castedVm = info.guru.findById(vm.getId());
} else {
castedVm = info.vm;
String host_guid = info.getHost();
host = _hostDao.findByGuid(host_guid);
if ( host == null ) {
infos.put(vm.getId(), info);
continue;
}
hId = host.getId();
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType());
Command command = compareState(hId, castedVm, info, true, hvGuru.trackVmHostChange());
if (command != null) {
commands.addCommand(command);
}
}
}
for (final AgentVmInfo left : infos.values()) {
s_logger.warn("Stopping a VM that we have no record of: " + left.name);
commands.addCommand(cleanup(left.name));
}
} else {
final List<? extends VMInstanceVO> vms = _vmDao.listByHostId(hostId);
s_logger.debug("Found " + vms.size() + " VMs for host " + hostId);
for (VMInstanceVO vm : vms) {
AgentVmInfo info = infos.remove(vm.getId());
VMInstanceVO castedVm = null;
if (info == null) {
info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped);
castedVm = info.guru.findById(vm.getId());
} else {
castedVm = info.vm;
}
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType());
Command command = compareState(hostId, castedVm, info, true, hvGuru.trackVmHostChange());
if (command != null) {
commands.addCommand(command);
}
}
for (final AgentVmInfo left : infos.values()) {
boolean found = false;
for (VirtualMachineGuru<? extends VMInstanceVO> vmGuru : _vmGurus.values()) {
VMInstanceVO vm = vmGuru.findByName(left.name);
if (vm != null) {
found = true;
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(vm.getHypervisorType());
if(hvGuru.trackVmHostChange()) {
Command command = compareState(hostId, vm, left, true, true);
if (command != null) {
commands.addCommand(command);
}
} else {
s_logger.warn("Stopping a VM, VM " + left.name + " migrate from Host " + vm.getHostId() + " to Host " + hostId );
commands.addCommand(cleanup(left.name));
}
break;
}
}
if ( ! found ) {
s_logger.warn("Stopping a VM that we have no record of: " + left.name);
commands.addCommand(cleanup(left.name));
}
}
}
return commands;
}
@Override
public boolean isRecurring() {
return false;
@ -1912,7 +1861,16 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
@Override
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
for (final Answer answer : answers) {
if (!answer.getResult()) {
if (answer instanceof ClusterSyncAnswer){
ClusterSyncAnswer hs = (ClusterSyncAnswer)answer;
if (hs.isFull()){
fullSync(hs.getClusterId(), hs.getNewStates());
}
else {
deltaSync(hs.getNewStates());
}
}
else if (!answer.getResult()) {
s_logger.warn("Cleanup failed due to " + answer.getDetails());
} else {
if (s_logger.isDebugEnabled()) {
@ -1935,24 +1893,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
@Override
public boolean processCommands(long agentId, long seq, Command[] cmds) {
boolean processed = false;
for (Command cmd : cmds) {
if (cmd instanceof PingRoutingCommand) {
PingRoutingCommand ping = (PingRoutingCommand) cmd;
if (ping.getNewStates().size() > 0) {
Commands commands = deltaSync(agentId, ping.getNewStates());
if (commands.size() > 0) {
try {
_agentMgr.send(agentId, commands, this);
} catch (final AgentUnavailableException e) {
s_logger.warn("Agent is now unavailable", e);
}
}
}
processed = true;
}
}
return processed;
return false;
}
@Override
@ -1982,32 +1923,16 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
}
long agentId = agent.getId();
StartupRoutingCommand startup = (StartupRoutingCommand) cmd;
Commands commands = fullSync(agentId, startup);
if (commands.size() > 0) {
s_logger.debug("Sending clean commands to the agent");
long clusterId = agent.getClusterId();
//check if the cluster already has a host sync command set
if (_syncCommandMap.get(clusterId +"") == null){
// send the cluster sync command if this is the first host in the cluster
Commands syncCmd = new Commands(new ClusterSyncCommand(60, 20, clusterId));
try {
boolean error = false;
Answer[] answers = _agentMgr.send(agentId, commands);
for (Answer answer : answers) {
if (!answer.getResult()) {
s_logger.warn("Unable to stop a VM due to " + answer.getDetails());
error = true;
}
}
if (error) {
throw new ConnectionException(true, "Unable to stop VMs");
}
} catch (final AgentUnavailableException e) {
s_logger.warn("Agent is unavailable now", e);
throw new ConnectionException(true, "Unable to sync", e);
} catch (final OperationTimedoutException e) {
s_logger.warn("Agent is unavailable now", e);
throw new ConnectionException(true, "Unable to sync", e);
long seq_no=_agentMgr.send(agentId, syncCmd, this);
_syncCommandMap.put(clusterId +"", syncCmd);
} catch (AgentUnavailableException e1) {
e1.printStackTrace();
}
}
}
@ -2048,7 +1973,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
protected class AgentVmInfo {
public String name;
public State state;
public String host;
public String hostUuid;
public VMInstanceVO vm;
public VirtualMachineGuru<VMInstanceVO> guru;
@ -2058,15 +1983,15 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
this.state = state;
this.vm = vm;
this.guru = (VirtualMachineGuru<VMInstanceVO>) guru;
this.host = host;
this.hostUuid = host;
}
public AgentVmInfo(String name, VirtualMachineGuru<? extends VMInstanceVO> guru, VMInstanceVO vm, State state) {
this(name, guru, vm, state, null);
}
public String getHost() {
return host;
public String getHostUuid() {
return hostUuid;
}
}

View File

@ -76,7 +76,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
VMClusterSearch = createSearchBuilder();
SearchBuilder<HostVO> hostSearch = _hostDao.createSearchBuilder();
VMClusterSearch.join("hostSearch", hostSearch, hostSearch.entity().getId(), VMClusterSearch.entity().getHostId(), JoinType.INNER);
VMClusterSearch.join("hostSearch", hostSearch, hostSearch.entity().getId(), VMClusterSearch.entity().getLastHostId(), JoinType.INNER);
hostSearch.and("clusterId", hostSearch.entity().getClusterId(), SearchCriteria.Op.EQ);
VMClusterSearch.done();