From a9733b5df23a0d0f996726eca93f8309d17a920a Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Fri, 17 Jan 2014 10:41:42 -0800 Subject: [PATCH] CLOUDSTACK-5731: Use general instance type to categorize VM work jobs to correctly serialize VM operations --- .../src/com/cloud/vm/VmWorkConstants.java | 1 + .../cloud/vm/VirtualMachineManagerImpl.java | 171 +++++++++++++++--- .../src/com/cloud/vm/VmWorkJobDispatcher.java | 49 ++--- .../jobs/AsyncJobExecutionContext.java | 48 +++-- .../framework/jobs/dao/VmWorkJobDao.java | 2 + .../framework/jobs/dao/VmWorkJobDaoImpl.java | 93 ++++++++-- .../cloud/storage/VolumeApiServiceImpl.java | 99 ++++++++-- .../vm/snapshot/VMSnapshotManagerImpl.java | 80 +++++++- 8 files changed, 453 insertions(+), 90 deletions(-) diff --git a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java index 20e40b7f84b..4627cfe078b 100644 --- a/engine/components-api/src/com/cloud/vm/VmWorkConstants.java +++ b/engine/components-api/src/com/cloud/vm/VmWorkConstants.java @@ -20,4 +20,5 @@ public interface VmWorkConstants { public static final String VM_WORK_QUEUE = "VmWorkJobQueue"; public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher"; public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher"; + public static final String VM_WORK_JOB_PLACEHOLDER = "VmWorkJobPlaceHolder"; } diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java index 89a02585a60..06805e1391b 100755 --- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java @@ -561,6 +561,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac _executor.scheduleAtFixedRate(new TransitionTask(), 5000, VmJobStateReportInterval.value(), TimeUnit.SECONDS); _executor.scheduleAtFixedRate(new CleanupTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS); cancelWorkItems(_nodeId); + + // cleanup left over place holder works + _workJobDao.expungeLeftoverWorkJobs(ManagementServerNode.getManagementServerId()); return true; } @@ -751,7 +754,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateStart(vmUuid, params, planToDeploy, planner); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateStart(vmUuid, params, planToDeploy, planner); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy); @@ -1275,7 +1288,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop); @@ -1567,7 +1592,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateStorageMigration(vmUuid, destPool); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateStorageMigration(vmUuid, destPool); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool); @@ -1649,7 +1684,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateMigrate(vmUuid, srcHostId, dest); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateMigrate(vmUuid, srcHostId, dest); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest); @@ -1920,7 +1965,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool); @@ -2163,6 +2220,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac s_logger.trace("VM Operation Thread Running"); try { _workDao.cleanup(VmOpCleanupWait.value()); + + // TODO. hard-coded to one hour after job has been completed + Date cutDate = new Date(new Date().getTime() - 3600000); + _workJobDao.expungeCompletedWorkJobs(cutDate); } catch (Exception e) { s_logger.error("VM Operations failed due to ", e); } @@ -2199,7 +2260,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateReboot(vmUuid, params); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateReboot(vmUuid, params); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = rebootVmThroughJobQueue(vmUuid, params); @@ -3120,7 +3191,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateAddVmToNetwork(vm, network, requested); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateAddVmToNetwork(vm, network, requested); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = addVmToNetworkThroughJobQueue(vm, network, requested); @@ -3223,7 +3303,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateRemoveNicFromVm(vm, nic); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateRemoveNicFromVm(vm, nic); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = removeNicFromVmThroughJobQueue(vm, nic); @@ -3462,7 +3552,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId); @@ -3711,7 +3811,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + VirtualMachine vm = _vmDao.findByUuid(vmUuid); + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost); @@ -4187,7 +4297,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4240,7 +4350,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setStep(VmWorkJobVO.Step.Prepare); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4293,7 +4403,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); workJob.setStep(VmWorkJobVO.Step.Prepare); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4343,7 +4453,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4397,7 +4507,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4449,7 +4559,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4501,7 +4611,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4551,7 +4661,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4600,7 +4710,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4649,7 +4759,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4700,7 +4810,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setAccountId(account.getId()); workJob.setUserId(user.getId()); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -4856,4 +4966,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac public Pair handleVmWorkJob(VmWork work) throws Exception { return _jobHandlerProxy.handleVmWorkJob(work); } + + private VmWorkJobVO createPlaceHolderWork(long instanceId) { + VmWorkJobVO workJob = new VmWorkJobVO(""); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); + workJob.setCmd(""); + workJob.setCmdInfo(""); + + workJob.setAccountId(0); + workJob.setUserId(0); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(instanceId); + workJob.setInitMsid(ManagementServerNode.getManagementServerId()); + + _workJobDao.persist(workJob); + + return workJob; + } } diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java index 285c8a2831e..31b2d9ca2d2 100644 --- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java +++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java @@ -60,9 +60,6 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch String cmd = job.getCmd(); assert (cmd != null); - if (s_logger.isDebugEnabled()) - s_logger.debug("Run VM work job: " + cmd + ", job origin: " + job.getRelated()); - Class workClz = null; try { workClz = Class.forName(job.getCmd()); @@ -80,27 +77,33 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch return; } - if (_handlers == null || _handlers.isEmpty()) { - s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo() - + ", job origin: " + job.getRelated()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found"); - return; + if (s_logger.isDebugEnabled()) + s_logger.debug("Run VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated()); + try { + if (_handlers == null || _handlers.isEmpty()) { + s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo() + + ", job origin: " + job.getRelated()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found"); + return; + } + + VmWorkJobHandler handler = _handlers.get(work.getHandlerName()); + + if (handler == null) { + s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd() + + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated()); + _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler"); + return; + } + + CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated()); + + Pair result = handler.handleVmWorkJob(work); + _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second()); + } finally { + if (s_logger.isDebugEnabled()) + s_logger.debug("Done with run of VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated()); } - - VmWorkJobHandler handler = _handlers.get(work.getHandlerName()); - - if (handler == null) { - s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd() - + ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated()); - _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler"); - return; - } - - CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated()); - - Pair result = handler.handleVmWorkJob(work); - _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second()); - } catch(Throwable e) { s_logger.error("Unable to complete " + job + ", job origin:" + job.getRelated(), e); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java index f558e013712..20125f43e66 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/AsyncJobExecutionContext.java @@ -30,7 +30,9 @@ import com.cloud.exception.ConcurrentOperationException; import com.cloud.exception.InsufficientCapacityException; import com.cloud.exception.ResourceUnavailableException; -public class AsyncJobExecutionContext { +public class AsyncJobExecutionContext { + private static final Logger s_logger = Logger.getLogger(AsyncJobExecutionContext.class); + private AsyncJob _job; static private AsyncJobManager s_jobMgr; @@ -112,7 +114,8 @@ public class AsyncJobExecutionContext { } // - // check failure exception before we disjoin the worker job + // check failure exception before we disjoin the worker job, work job usually fails with exception + // this will help propogate exception between jobs // TODO : it is ugly and this will become unnecessary after we switch to full-async mode // public void disjoinJob(long joinedJobId) throws InsufficientCapacityException, @@ -120,21 +123,34 @@ public class AsyncJobExecutionContext { assert (_job != null); AsyncJobJoinMapVO record = s_joinMapDao.getJoinRecord(_job.getId(), joinedJobId); - if (record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) { - Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult()); - if (exception != null && exception instanceof Exception) { - if (exception instanceof InsufficientCapacityException) - throw (InsufficientCapacityException)exception; - else if (exception instanceof ConcurrentOperationException) - throw (ConcurrentOperationException)exception; - else if (exception instanceof ResourceUnavailableException) - throw (ResourceUnavailableException)exception; - else - throw new RuntimeException((Exception)exception); + s_jobMgr.disjoinJob(_job.getId(), joinedJobId); + + if (record.getJoinStatus() == JobInfo.Status.FAILED) { + if (record.getJoinResult() != null) { + Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult()); + if (exception != null && exception instanceof Exception) { + if (exception instanceof InsufficientCapacityException) { + s_logger.error("Job " + joinedJobId + " failed with InsufficientCapacityException"); + throw (InsufficientCapacityException)exception; + } + else if (exception instanceof ConcurrentOperationException) { + s_logger.error("Job " + joinedJobId + " failed with ConcurrentOperationException"); + throw (ConcurrentOperationException)exception; + } + else if (exception instanceof ResourceUnavailableException) { + s_logger.error("Job " + joinedJobId + " failed with ResourceUnavailableException"); + throw (ResourceUnavailableException)exception; + } + else { + s_logger.error("Job " + joinedJobId + " failed with exception"); + throw new RuntimeException((Exception)exception); + } + } + } else { + s_logger.error("Job " + joinedJobId + " failed without providing an error object"); + throw new RuntimeException("Job " + joinedJobId + " failed without providing an error object"); } } - - s_jobMgr.disjoinJob(_job.getId(), joinedJobId); } public void completeJoin(JobInfo.Status joinStatus, String joinResult) { @@ -151,6 +167,8 @@ public class AsyncJobExecutionContext { public static AsyncJobExecutionContext getCurrentExecutionContext() { AsyncJobExecutionContext context = s_currentExectionContext.get(); if (context == null) { + // TODO, this has security implicitions + s_logger.warn("Job is executed without a context, setup psudo job for the executing thread"); context = registerPseudoExecutionContext(CallContext.current().getCallingAccountId(), CallContext.current().getCallingUserId()); } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java index a3dbddf0eb6..44e39e40291 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java @@ -35,4 +35,6 @@ public interface VmWorkJobDao extends GenericDao { void updateStep(long workJobId, Step step); void expungeCompletedWorkJobs(Date cutDate); + + void expungeLeftoverWorkJobs(long msid); } diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java index 5e0ffb68c39..cf3e17392b5 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java @@ -16,8 +16,11 @@ // under the License. package org.apache.cloudstack.framework.jobs.dao; +import java.sql.PreparedStatement; +import java.sql.SQLException; import java.util.Date; import java.util.List; +import java.util.TimeZone; import javax.annotation.PostConstruct; @@ -31,13 +34,16 @@ import com.cloud.utils.db.GenericDaoBase; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.SearchCriteria.Op; +import com.cloud.utils.db.Transaction; +import com.cloud.utils.db.TransactionCallbackNoReturn; +import com.cloud.utils.db.TransactionLegacy; +import com.cloud.utils.db.TransactionStatus; import com.cloud.vm.VirtualMachine; public class VmWorkJobDaoImpl extends GenericDaoBase implements VmWorkJobDao { protected SearchBuilder PendingWorkJobSearch; protected SearchBuilder PendingWorkJobByCommandSearch; - protected SearchBuilder ExpungeWorkJobSearch; public VmWorkJobDaoImpl() { } @@ -48,7 +54,6 @@ public class VmWorkJobDaoImpl extends GenericDaoBase implemen PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ); PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ); PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ); - PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ); PendingWorkJobSearch.done(); PendingWorkJobByCommandSearch = createSearchBuilder(); @@ -58,11 +63,6 @@ public class VmWorkJobDaoImpl extends GenericDaoBase implemen PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ); PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ); PendingWorkJobByCommandSearch.done(); - - ExpungeWorkJobSearch = createSearchBuilder(); - ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT); - ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ); - ExpungeWorkJobSearch.done(); } @Override @@ -115,11 +115,80 @@ public class VmWorkJobDaoImpl extends GenericDaoBase implemen } @Override - public void expungeCompletedWorkJobs(Date cutDate) { - SearchCriteria sc = ExpungeWorkJobSearch.create(); - sc.setParameters("lastUpdated", cutDate); - sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); + public void expungeCompletedWorkJobs(final Date cutDate) { + // current DAO machenism does not support following usage + /* + SearchCriteria sc = ExpungeWorkJobSearch.create(); + sc.setParameters("lastUpdated",cutDate); + sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); - expunge(sc); + expunge(sc); + */ + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + TransactionLegacy txn = TransactionLegacy.currentTxn(); + + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement( + "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?)"); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + + pstmt.execute(); + } catch (SQLException e) { + } catch (Throwable e) { + } + + try { + pstmt = txn.prepareAutoCloseStatement( + "DELETE FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?"); + pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate)); + + pstmt.execute(); + } catch (SQLException e) { + } catch (Throwable e) { + } + } + }); + } + + @Override + public void expungeLeftoverWorkJobs(final long msid) { + // current DAO machenism does not support following usage + /* + SearchCriteria sc = ExpungePlaceHolderWorkJobSearch.create(); + sc.setParameters("dispatcher", "VmWorkJobPlaceHolder"); + sc.setParameters("msid", msid); + + expunge(sc); + */ + Transaction.execute(new TransactionCallbackNoReturn() { + @Override + public void doInTransactionWithoutResult(TransactionStatus status) { + TransactionLegacy txn = TransactionLegacy.currentTxn(); + + PreparedStatement pstmt = null; + try { + pstmt = txn.prepareAutoCloseStatement( + "DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?)"); + pstmt.setLong(1, msid); + + pstmt.execute(); + } catch (SQLException e) { + } catch (Throwable e) { + } + + try { + pstmt = txn.prepareAutoCloseStatement( + "DELETE FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?"); + pstmt.setLong(1, msid); + + pstmt.execute(); + } catch (SQLException e) { + } catch (Throwable e) { + } + } + }); } } diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java index 8d475dd2cee..fb35e2334ac 100644 --- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java +++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java @@ -59,6 +59,7 @@ import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.jobs.Outcome; +import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; @@ -73,6 +74,7 @@ import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao; import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao; import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO; import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import com.cloud.agent.AgentManager; import com.cloud.agent.api.Answer; @@ -327,6 +329,9 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic @Inject protected AsyncJobManager _jobMgr; + @Inject + protected VmWorkJobDao _workJobDao; + VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this); // TODO @@ -911,8 +916,19 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateResizeVolume(volume.getId(), currentSize, newSize, - newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(userVm.getId()); + } + try { + return orchestrateResizeVolume(volume.getId(), currentSize, newSize, + newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize, newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk); @@ -1102,7 +1118,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(command.getVirtualMachineId()); + } + try { + return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId()); @@ -1405,7 +1432,16 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateDetachVolumeFromVM(vmId, volumeId); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmId); + } + try { + return orchestrateDetachVolumeFromVM(vmId, volumeId); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId); @@ -1571,7 +1607,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume); @@ -1662,7 +1709,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vm.getId()); + } + try { + return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = takeVolumeSnapshotThroughJobQueue(vm.getId(), volumeId, policyId, snapshotId, account.getId(), quiescevm); @@ -2190,7 +2248,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2237,7 +2295,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2281,7 +2339,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2326,7 +2384,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2371,7 +2429,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -2428,4 +2486,23 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic public Pair handleVmWorkJob(VmWork work) throws Exception { return _jobHandlerProxy.handleVmWorkJob(work); } + + private VmWorkJobVO createPlaceHolderWork(long instanceId) { + VmWorkJobVO workJob = new VmWorkJobVO(""); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); + workJob.setCmd(""); + workJob.setCmdInfo(""); + + workJob.setAccountId(0); + workJob.setUserId(0); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(instanceId); + workJob.setInitMsid(ManagementServerNode.getManagementServerId()); + + _workJobDao.persist(workJob); + + return workJob; + } } diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java index 7d6e0ec71c1..1e3926dfb39 100644 --- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java +++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java @@ -41,10 +41,12 @@ import org.apache.cloudstack.framework.jobs.AsyncJob; import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext; import org.apache.cloudstack.framework.jobs.AsyncJobManager; import org.apache.cloudstack.framework.jobs.Outcome; +import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao; import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO; import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl; import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO; import org.apache.cloudstack.jobs.JobInfo; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import com.cloud.event.ActionEvent; import com.cloud.event.EventTypes; @@ -124,6 +126,9 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana @Inject AsyncJobManager _jobMgr; + @Inject + VmWorkJobDao _workJobDao; + VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this); int _vmSnapshotMax; @@ -364,7 +369,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmId); + } + try { + return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm); @@ -452,7 +467,16 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateDeleteVMSnapshot(vmSnapshotId); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmSnapshot.getVmId()); + } + try { + return orchestrateDeleteVMSnapshot(vmSnapshotId); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } } else { Outcome outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId); @@ -558,7 +582,18 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateRevertToVMSnapshot(vmSnapshotId); + + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmSnapshotVo.getVmId()); + } + try { + return orchestrateRevertToVMSnapshot(vmSnapshotId); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId); @@ -684,7 +719,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext(); if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance - return orchestrateDeleteAllVMSnapshots(vmId, type); + VmWorkJobVO placeHolder = null; + if (VmJobEnabled.value()) { + placeHolder = createPlaceHolderWork(vmId); + } + try { + return orchestrateDeleteAllVMSnapshots(vmId, type); + } finally { + if (VmJobEnabled.value()) + _workJobDao.expunge(placeHolder.getId()); + } + } else { Outcome outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type); @@ -828,7 +873,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -872,7 +917,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -916,7 +961,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -960,7 +1005,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana workJob.setAccountId(callingAccount.getId()); workJob.setUserId(callingUser.getId()); workJob.setStep(VmWorkJobVO.Step.Starting); - workJob.setVmType(vm.getType()); + workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(vm.getId()); workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId()); @@ -1009,4 +1054,23 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana public Pair handleVmWorkJob(VmWork work) throws Exception { return _jobHandlerProxy.handleVmWorkJob(work); } + + private VmWorkJobVO createPlaceHolderWork(long instanceId) { + VmWorkJobVO workJob = new VmWorkJobVO(""); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); + workJob.setCmd(""); + workJob.setCmdInfo(""); + + workJob.setAccountId(0); + workJob.setUserId(0); + workJob.setStep(VmWorkJobVO.Step.Starting); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(instanceId); + workJob.setInitMsid(ManagementServerNode.getManagementServerId()); + + _workJobDao.persist(workJob); + + return workJob; + } }