Deal with concurrent state update for VM and Host objects.

This commit is contained in:
Kelven Yang 2014-03-14 10:54:27 -07:00
parent d3973ed081
commit 12f7cbcb76
5 changed files with 79 additions and 94 deletions

View File

@ -102,6 +102,7 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I
s_fsm.addTransition(State.Running, VirtualMachine.Event.StopRequested, State.Stopping);
s_fsm.addTransition(State.Running, VirtualMachine.Event.AgentReportShutdowned, State.Stopped);
s_fsm.addTransition(State.Running, VirtualMachine.Event.AgentReportMigrated, State.Running);
s_fsm.addTransition(State.Running, VirtualMachine.Event.OperationSucceeded, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.MigrationRequested, State.Migrating);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.OperationSucceeded, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.OperationFailed, State.Running);

View File

@ -36,12 +36,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.engine.orchestration.service.NetworkOrchestrationService;
import org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
@ -51,19 +49,9 @@ import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.Outcome;
import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.messagebus.MessageBus;
import org.apache.cloudstack.framework.messagebus.MessageDispatcher;
import org.apache.cloudstack.framework.messagebus.MessageHandler;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.storage.to.VolumeObjectTO;
@ -291,7 +279,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
}
/**
 * Stores the given host allocator list on this instance.
 *
 * @param hostAllocators the allocator list to assign to the {@code hostAllocators} field
 */
public void setHostAllocators(List<HostAllocator> hostAllocators) {
    // The parameter shadows the field of the same name, so the "this." qualifier is
    // required; an unqualified "hostAllocators = hostAllocators" would only assign the
    // parameter to itself and leave the field untouched.
    this.hostAllocators = hostAllocators;
}
protected List<StoragePoolAllocator> _storagePoolAllocators;
@ -3243,9 +3231,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
/**
 * Builds an instance from the supplied name, VM record, state and host identifier.
 *
 * @param name  value stored in the {@code name} field
 * @param vm    value stored in the {@code vm} field
 * @param state value stored in the {@code state} field
 * @param host  value stored in the {@code hostUuid} field
 */
@SuppressWarnings("unchecked")
public AgentVmInfo(String name, VMInstanceVO vm, State state, String host) {
    // name, state and vm shadow the fields of the same name, so each assignment must be
    // qualified with "this."; the unqualified form is a no-op self-assignment of the
    // parameter and would leave the fields unset.
    this.name = name;
    this.state = state;
    this.vm = vm;
    // "host" does not shadow anything, so the field can be assigned directly.
    hostUuid = host;
}
@ -4100,7 +4088,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
List<VmWorkJobVO> pendingWorkJobs = _workJobDao.listPendingWorkJobs(
VirtualMachine.Type.Instance, vmId);
if (pendingWorkJobs.size() == 0 || !_haMgr.hasPendingHaWork(vmId)) {
if (pendingWorkJobs.size() == 0 && !_haMgr.hasPendingHaWork(vmId)) {
// there is no pending operation job
VMInstanceVO vm = _vmDao.findById(vmId);
if (vm != null) {
@ -4407,7 +4395,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
/**
 * Reloads the VM record and reports whether the awaited condition holds.
 *
 * The condition holds when the record's power state equals {@code desiredPowerState}
 * and either no source host id was supplied, or the record's power host id differs
 * from the supplied source host id.
 *
 * @return {@code true} when the condition described above is met, {@code false} otherwise
 */
@Override
public boolean checkCondition() {
    VMInstanceVO instance = _vmDao.findById(vmId);

    // Nothing else matters until the desired power state has been reached.
    if (instance.getPowerState() != desiredPowerState) {
        return false;
    }

    // No source host to compare against: the power state alone decides.
    if (srcHostIdForMigration == null) {
        return true;
    }

    // Compare the ids by value. Using "!=" on boxed ids compares object identity and
    // can report "different" for two ids that hold the same number.
    return !srcHostIdForMigration.equals(instance.getPowerHostId());
}

View File

@ -953,46 +953,49 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
int result = update(ub, sc, null);
assert result <= 1 : "How can this update " + result + " rows? ";
if (status_logger.isDebugEnabled() && result == 0) {
if (result == 0) {
HostVO ho = findById(host.getId());
assert ho != null : "How how how? : " + host.getId();
StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString());
str.append(". Name=").append(host.getName());
str.append("; New=[status=")
.append(newStatus.toString())
.append(":msid=")
.append(newStatus.lostConnection() ? "null" : host.getManagementServerId())
.append(":lastpinged=")
.append(host.getLastPinged())
.append("]");
str.append("; Old=[status=")
.append(oldStatus.toString())
.append(":msid=")
.append(host.getManagementServerId())
.append(":lastpinged=")
.append(oldPingTime)
.append("]");
str.append("; DB=[status=")
.append(vo.getStatus().toString())
.append(":msid=")
.append(vo.getManagementServerId())
.append(":lastpinged=")
.append(vo.getLastPinged())
.append(":old update count=")
.append(oldUpdateCount)
.append("]");
status_logger.debug(str.toString());
} else {
StringBuilder msg = new StringBuilder("Agent status update: [");
msg.append("id = " + host.getId());
msg.append("; name = " + host.getName());
msg.append("; old status = " + oldStatus);
msg.append("; event = " + event);
msg.append("; new status = " + newStatus);
msg.append("; old update count = " + oldUpdateCount);
msg.append("; new update count = " + newUpdateCount + "]");
status_logger.debug(msg.toString());
if (status_logger.isDebugEnabled()) {
StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString());
str.append(". Name=").append(host.getName());
str.append("; New=[status=")
.append(newStatus.toString())
.append(":msid=")
.append(newStatus.lostConnection() ? "null" : host.getManagementServerId())
.append(":lastpinged=")
.append(host.getLastPinged())
.append("]");
str.append("; Old=[status=").append(oldStatus.toString()).append(":msid=").append(host.getManagementServerId()).append(":lastpinged=").append(oldPingTime)
.append("]");
str.append("; DB=[status=")
.append(vo.getStatus().toString())
.append(":msid=")
.append(vo.getManagementServerId())
.append(":lastpinged=")
.append(vo.getLastPinged())
.append(":old update count=")
.append(oldUpdateCount)
.append("]");
status_logger.debug(str.toString());
} else {
StringBuilder msg = new StringBuilder("Agent status update: [");
msg.append("id = " + host.getId());
msg.append("; name = " + host.getName());
msg.append("; old status = " + oldStatus);
msg.append("; event = " + event);
msg.append("; new status = " + newStatus);
msg.append("; old update count = " + oldUpdateCount);
msg.append("; new update count = " + newUpdateCount + "]");
status_logger.debug(msg.toString());
}
if (ho.getState() == newStatus) {
status_logger.debug("Host " + ho.getName() + " state has already been updated to " + newStatus);
return true;
}
}
return result > 0;

View File

@ -452,41 +452,29 @@ public class VMInstanceDaoImpl extends GenericDaoBase<VMInstanceVO, Long> implem
ub.set(vmi, _updateTimeAttr, new Date());
int result = update(vmi, sc);
if (result == 0 && s_logger.isDebugEnabled()) {
if (result == 0) {
VMInstanceVO vo = findByIdIncludingRemoved(vm.getId());
if (vo != null) {
StringBuilder str = new StringBuilder("Unable to update ").append(vo.toString());
str.append(": DB Data={Host=")
.append(vo.getHostId())
.append("; State=")
.append(vo.getState().toString())
.append("; updated=")
.append(vo.getUpdated())
.append("; time=")
.append(vo.getUpdateTime());
str.append("} New Data: {Host=")
.append(vm.getHostId())
.append("; State=")
.append(vm.getState().toString())
.append("; updated=")
.append(vmi.getUpdated())
.append("; time=")
.append(vo.getUpdateTime());
str.append("} Stale Data: {Host=")
.append(oldHostId)
.append("; State=")
.append(oldState)
.append("; updated=")
.append(oldUpdated)
.append("; time=")
.append(oldUpdateDate)
.append("}");
s_logger.debug(str.toString());
if (s_logger.isDebugEnabled()) {
if (vo != null) {
StringBuilder str = new StringBuilder("Unable to update ").append(vo.toString());
str.append(": DB Data={Host=").append(vo.getHostId()).append("; State=").append(vo.getState().toString()).append("; updated=").append(vo.getUpdated())
.append("; time=").append(vo.getUpdateTime());
str.append("} New Data: {Host=").append(vm.getHostId()).append("; State=").append(vm.getState().toString()).append("; updated=").append(vmi.getUpdated())
.append("; time=").append(vo.getUpdateTime());
str.append("} Stale Data: {Host=").append(oldHostId).append("; State=").append(oldState).append("; updated=").append(oldUpdated).append("; time=")
.append(oldUpdateDate).append("}");
s_logger.debug(str.toString());
} else {
s_logger.debug("Unable to update the vm id=" + vm.getId() + "; the vm either doesn't exist or already removed");
} else {
s_logger.debug("Unable to update the vm id=" + vm.getId() + "; the vm either doesn't exist or already removed");
}
}
if (vo != null && vo.getState() == newState) {
// allow for concurrent update if target state has already been matched
s_logger.debug("VM " + vo.getInstanceName() + " state has been already been updated to " + newState);
return true;
}
}
return result > 0;

View File

@ -256,13 +256,17 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
});
for (Long id : wakeupList) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO jobToWakeup = _jobDao.findById(id);
if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(jobToWakeup, false);
}
//
// disable wakeup scheduling now, since all API jobs are currently using block-waiting for sub-jobs
//
/*
for (Long id : wakeupList) {
// TODO, we assume that all jobs in this category is API job only
AsyncJobVO jobToWakeup = _jobDao.findById(id);
if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
scheduleExecution(jobToWakeup, false);
}
*/
_messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
}