bug 13439: full sync at management server restart, disabling hourly sync

This commit is contained in:
abhi 2012-02-06 15:01:44 +05:30
parent b0aaf75c3b
commit 59c7150af7
9 changed files with 71 additions and 100 deletions

View File

@ -25,12 +25,7 @@ import com.cloud.vm.VirtualMachine.State;
public class ClusterSyncAnswer extends Answer {
private long _clusterId;
private HashMap<String, Pair<String, State>> _newStates;
private HashMap<String, Pair<String, State>> _allStates;
private int _type = -1; // 0 for full, 1 for delta
private boolean _isExecuted=false;
public static final int FULL_SYNC=0;
public static final int DELTA_SYNC=1;
// this is here because a cron command answer is being sent twice
// AgentAttache.processAnswers
@ -47,19 +42,9 @@ public class ClusterSyncAnswer extends Answer {
public ClusterSyncAnswer(long clusterId, HashMap<String, Pair<String, State>> newStates){
_clusterId = clusterId;
_newStates = newStates;
_allStates = null;
_type = DELTA_SYNC;
result = true;
}
public ClusterSyncAnswer(long clusterId, HashMap<String, Pair<String, State>> newStates, HashMap<String, Pair<String, State>> allStates){
_clusterId = clusterId;
_newStates = newStates;
_allStates = allStates;
_type = FULL_SYNC;
result = true;
}
public long getClusterId() {
return _clusterId;
}
@ -68,15 +53,4 @@ public class ClusterSyncAnswer extends Answer {
return _newStates;
}
public HashMap<String, Pair<String, State>> getAllStates() {
return _allStates;
}
public boolean isFull(){
return _type==0;
}
public boolean isDelta(){
return _type==1;
}
}

View File

@ -20,19 +20,15 @@ package com.cloud.agent.api;
public class ClusterSyncCommand extends Command implements CronCommand {
int _interval;
int _skipSteps; // skip this many steps for full sync
int _steps;
long _clusterId;
public ClusterSyncCommand() {
}
public ClusterSyncCommand(int interval, int skipSteps, long clusterId){
public ClusterSyncCommand(int interval, long clusterId){
_interval = interval;
_skipSteps = skipSteps;
_clusterId = clusterId;
_steps=0;
}
@Override
@ -40,19 +36,6 @@ public class ClusterSyncCommand extends Command implements CronCommand {
return _interval;
}
public int getSkipSteps(){
return _skipSteps;
}
public void incrStep(){
_steps++;
if (_steps>=_skipSteps)_steps=0;
}
public boolean isRightStep(){
return (_steps==0);
}
public long getClusterId() {
return _clusterId;
}

View File

@ -6626,16 +6626,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
return new Answer(cmd);
}
HashMap<String, Pair<String, State>> newStates = deltaClusterSync(conn);
cmd.incrStep();
if (cmd.isRightStep()){
// do full sync
HashMap<String, Pair<String, State>> allStates=fullClusterSync(conn);
return new ClusterSyncAnswer(cmd.getClusterId(), newStates, allStates);
}
else {
cmd.incrStep();
return new ClusterSyncAnswer(cmd.getClusterId(), newStates);
}
return new ClusterSyncAnswer(cmd.getClusterId(), newStates);
}

View File

@ -160,7 +160,6 @@ public enum Config {
PingInterval("Advanced", AgentManager.class, Integer.class, "ping.interval", "60", "Ping interval in seconds", null),
PingTimeout("Advanced", AgentManager.class, Float.class, "ping.timeout", "2.5", "Multiplier to ping.interval before announcing an agent has timed out", null),
ClusterDeltaSyncInterval("Advanced", AgentManager.class, Integer.class, "sync.interval", "60", "Cluster Delta sync interval in seconds", null),
ClusterFullSyncSkipSteps("Advanced", AgentManager.class, Integer.class, "skip.steps", "60", "Cluster full sync skip steps count", null),
Port("Advanced", AgentManager.class, Integer.class, "port", "8250", "Port to listen on for agent connection.", null),
RouterCpuMHz("Advanced", NetworkManager.class, Integer.class, "router.cpu.mhz", String.valueOf(VirtualNetworkApplianceManager.DEFAULT_ROUTER_CPU_MHZ), "Default CPU speed (MHz) for router VM.", null),
RestartRetryInterval("Advanced", HighAvailabilityManager.class, Integer.class, "restart.retry.interval", "600", "Time (in seconds) between retries to restart a vm", null),

View File

@ -66,5 +66,7 @@ public interface HostDao extends GenericDao<HostVO, Long>, StateDao<Status, Stat
List<HostVO> findAndUpdateApplianceToLoad(long lastPingSecondsAfter, long managementServerId);
boolean updateResourceState(ResourceState oldState, ResourceState.Event event, ResourceState newState, Host vo);
boolean updateResourceState(ResourceState oldState, ResourceState.Event event, ResourceState newState, Host vo);
HostVO findByGuid(String guid);
}

View File

@ -319,6 +319,14 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
List<HostVO> hosts = listBy(sc);
return hosts.size();
}
@Override
public HostVO findByGuid(String guid) {
SearchCriteria<HostVO> sc = GuidSearch.create("guid", guid);
return findOneBy(sc);
}
@Override @DB
public List<HostVO> findAndUpdateDirectAgentToLoad(long lastPingSecondsAfter, Long limit, long managementServerId) {
Transaction txn = Transaction.currentTxn();

View File

@ -1706,43 +1706,62 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
}
public void fullSync(final long clusterId, Map<String, Pair<String, State>> newStates, boolean init) {
public void fullSync(final long clusterId, Map<String, Pair<String, State>> newStates) {
if (newStates==null)return;
Map<Long, AgentVmInfo> infos = convertToInfos(newStates);
Set<VMInstanceVO> set_vms = Collections.synchronizedSet(new HashSet<VMInstanceVO>());
set_vms.addAll(_vmDao.listByClusterId(clusterId));
set_vms.addAll(_vmDao.listStartingByClusterId(clusterId));
set_vms.addAll(_vmDao.listLHByClusterId(clusterId));
for (VMInstanceVO vm : set_vms) {
if (vm.isRemoved() || vm.getState() == State.Destroyed || vm.getState() == State.Expunging) continue;
AgentVmInfo info = infos.remove(vm.getId());
if (init){ // mark the VMs real state on initial sync
VMInstanceVO castedVm = null;
if (info == null && vm.getState() == State.Running) { // only work on VMs which were supposed to be running earlier
info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped);
castedVm = info.guru.findById(vm.getId());
try {
Host host = _resourceMgr.findHostByGuid(info.getHostUuid());
long hostId = host == null ? (vm.getHostId() == null ? vm.getLastHostId() : vm.getHostId()) : host.getId();
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType());
Command command = compareState(hostId, castedVm, info, true, hvGuru.trackVmHostChange());
if (command != null){
Answer answer = _agentMgr.send(hostId, command);
if (!answer.getResult()) {
s_logger.warn("Failed to update state of the VM due to " + answer.getDetails());
}
}
} catch (Exception e) {
s_logger.warn("Unable to update state of the VM due to exception " + e.getMessage());
e.printStackTrace();
}
VMInstanceVO castedVm = null;
if ((info == null && (vm.getState() == State.Running || vm.getState() == State.Starting))
|| (info != null && (info.state == State.Running && vm.getState() == State.Starting)))
{
s_logger.info("Found vm " + vm.getInstanceName() + " in inconsistent state. " + vm.getState() + " on CS while " + (info == null ? "Stopped" : "Running") + " on agent");
info = new AgentVmInfo(vm.getInstanceName(), getVmGuru(vm), vm, State.Stopped);
vm.setState(State.Running); // set it as running and let HA take care of it
_vmDao.persist(vm);
castedVm = info.guru.findById(vm.getId());
try {
Host host = _hostDao.findByGuid(info.getHostUuid());
long hostId = host == null ? (vm.getHostId() == null ? vm.getLastHostId() : vm.getHostId()) : host.getId();
HypervisorGuru hvGuru = _hvGuruMgr.getGuru(castedVm.getHypervisorType());
Command command = compareState(hostId, castedVm, info, true, hvGuru.trackVmHostChange());
if (command != null){
Answer answer = _agentMgr.send(hostId, command);
if (!answer.getResult()) {
s_logger.warn("Failed to update state of the VM due to " + answer.getDetails());
}
}
} catch (Exception e) {
s_logger.warn("Unable to update state of the VM due to exception " + e.getMessage());
e.printStackTrace();
}
}
}
}
else
// host id can change
if (info != null && vm.getState() == State.Running){
// check for host id changes
Host host = _hostDao.findByGuid(info.getHostUuid());
if (host != null && (vm.getHostId() == null || host.getId() != vm.getHostId())){
s_logger.info("Found vm " + vm.getInstanceName() + " with inconsistent host in db, new host is " + host.getId());
try {
stateTransitTo(vm, VirtualMachine.Event.AgentReportMigrated, host.getId());
} catch (NoTransitionException e) {
s_logger.warn(e.getMessage());
}
}
}
}
for (final AgentVmInfo left : infos.values()) {
if (VirtualMachineName.isValidVmName(left.name)) continue; // if the vm follows cloudstack naming ignore it for stopping
try {
Host host = _resourceMgr.findHostByGuid(left.getHostUuid());
Host host = _hostDao.findByGuid(left.getHostUuid());
if (host != null){
s_logger.warn("Stopping a VM which we do not have any record of " + left.name);
Answer answer = _agentMgr.send(host.getId(), cleanup(left.name));
@ -1754,10 +1773,11 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
s_logger.warn("Unable to stop a VM due to " + e.getMessage());
}
}
}
protected Map<Long, AgentVmInfo> convertToInfos(final Map<String, Pair<String, State>> newStates) {
final HashMap<Long, AgentVmInfo> map = new HashMap<Long, AgentVmInfo>();
if (newStates == null) {
@ -2088,12 +2108,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
if (answer instanceof ClusterSyncAnswer) {
ClusterSyncAnswer hs = (ClusterSyncAnswer) answer;
if (!hs.isExceuted()){
if (hs.isFull()) {
deltaSync(hs.getNewStates());
fullSync(hs.getClusterId(), hs.getAllStates(), false);
} else if (hs.isDelta()){
deltaSync(hs.getNewStates());
}
deltaSync(hs.getNewStates());
hs.setExecuted();
}
} else if (!answer.getResult()) {
@ -2171,12 +2186,11 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
StartupRoutingCommand startup = (StartupRoutingCommand) cmd;
HashMap<String, Pair<String, State>> allStates = startup.getClusterVMStateChanges();
if (allStates != null){
this.fullSync(clusterId, allStates, true);
this.fullSync(clusterId, allStates);
}
// initiate the cron job
ClusterSyncCommand syncCmd = new ClusterSyncCommand(Integer.parseInt(Config.ClusterDeltaSyncInterval.getDefaultValue()),
Integer.parseInt(Config.ClusterFullSyncSkipSteps.getDefaultValue()), clusterId);
ClusterSyncCommand syncCmd = new ClusterSyncCommand(Integer.parseInt(Config.ClusterDeltaSyncInterval.getDefaultValue()), clusterId);
try {
long seq_no = _agentMgr.send(agentId, new Commands(syncCmd), this);
s_logger.debug("Cluster VM sync started with jobid " + seq_no);

View File

@ -84,7 +84,7 @@ public interface VMInstanceDao extends GenericDao<VMInstanceVO, Long>, StateDao<
public Long countAllocatedVirtualRoutersForAccount(long accountId);
List<VMInstanceVO> listByClusterId(long clusterId); // this does not pull up VMs which are starting
List<VMInstanceVO> listStartingByClusterId(long clusterId); // get all the VMs even starting one on this cluster
List<VMInstanceVO> listLHByClusterId(long clusterId); // get all the VMs even starting one on this cluster
List<VMInstanceVO> listVmsMigratingFromHost(Long hostId);

View File

@ -59,7 +59,7 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
public static final Logger s_logger = Logger.getLogger(VMInstanceDaoImpl.class);
protected final SearchBuilder<VMInstanceVO> VMClusterSearch;
protected final SearchBuilder<VMInstanceVO> StartingVMClusterSearch;
protected final SearchBuilder<VMInstanceVO> LHVMClusterSearch;
protected final SearchBuilder<VMInstanceVO> IdStatesSearch;
protected final SearchBuilder<VMInstanceVO> AllFieldsSearch;
protected final SearchBuilder<VMInstanceVO> ZoneTemplateNonExpungedSearch;
@ -102,11 +102,11 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
VMClusterSearch.done();
StartingVMClusterSearch = createSearchBuilder();
LHVMClusterSearch = createSearchBuilder();
SearchBuilder<HostVO> hostSearch1 = _hostDao.createSearchBuilder();
StartingVMClusterSearch.join("hostSearch1", hostSearch1, hostSearch1.entity().getId(), StartingVMClusterSearch.entity().getHostId(), JoinType.INNER);
LHVMClusterSearch.join("hostSearch1", hostSearch1, hostSearch1.entity().getId(), LHVMClusterSearch.entity().getLastHostId(), JoinType.INNER);
hostSearch1.and("clusterId", hostSearch1.entity().getClusterId(), SearchCriteria.Op.EQ);
StartingVMClusterSearch.done();
LHVMClusterSearch.done();
AllFieldsSearch = createSearchBuilder();
@ -227,8 +227,8 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
@Override
public List<VMInstanceVO> listStartingByClusterId(long clusterId) {
SearchCriteria<VMInstanceVO> sc = StartingVMClusterSearch.create();
public List<VMInstanceVO> listLHByClusterId(long clusterId) {
SearchCriteria<VMInstanceVO> sc = LHVMClusterSearch.create();
sc.setJoinParameters("hostSearch1", "clusterId", clusterId);
return listBy(sc);
}