CLOUDSTACK-5731: Use general instance type to categorize VM work jobs to correctly serialize VM operations

This commit is contained in:
Kelven Yang 2014-01-17 10:41:42 -08:00
parent 2e5e403e3c
commit a9733b5df2
8 changed files with 453 additions and 90 deletions

View File

@ -20,4 +20,5 @@ public interface VmWorkConstants {
public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
public static final String VM_WORK_JOB_PLACEHOLDER = "VmWorkJobPlaceHolder";
}

View File

@ -561,6 +561,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_executor.scheduleAtFixedRate(new TransitionTask(), 5000, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
_executor.scheduleAtFixedRate(new CleanupTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
cancelWorkItems(_nodeId);
// cleanup left over place holder works
_workJobDao.expungeLeftoverWorkJobs(ManagementServerNode.getManagementServerId());
return true;
}
@ -751,7 +754,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
orchestrateStart(vmUuid, params, planToDeploy, planner);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
orchestrateStart(vmUuid, params, planToDeploy, planner);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = startVmThroughJobQueue(vmUuid, params, planToDeploy);
@ -1275,7 +1288,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
orchestrateStop(vmUuid, cleanUpEvenIfUnableToStop);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = stopVmThroughJobQueue(vmUuid, cleanUpEvenIfUnableToStop);
@ -1567,7 +1592,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
orchestrateStorageMigration(vmUuid, destPool);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
orchestrateStorageMigration(vmUuid, destPool);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = migrateVmStorageThroughJobQueue(vmUuid, destPool);
@ -1649,7 +1684,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
orchestrateMigrate(vmUuid, srcHostId, dest);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
orchestrateMigrate(vmUuid, srcHostId, dest);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = migrateVmThroughJobQueue(vmUuid, srcHostId, dest);
@ -1920,7 +1965,19 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
orchestrateMigrateWithStorage(vmUuid, srcHostId, destHostId, volumeToPool);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = migrateVmWithStorageThroughJobQueue(vmUuid, srcHostId, destHostId, volumeToPool);
@ -2163,6 +2220,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
s_logger.trace("VM Operation Thread Running");
try {
_workDao.cleanup(VmOpCleanupWait.value());
// TODO. hard-coded to one hour after job has been completed
Date cutDate = new Date(new Date().getTime() - 3600000);
_workJobDao.expungeCompletedWorkJobs(cutDate);
} catch (Exception e) {
s_logger.error("VM Operations failed due to ", e);
}
@ -2199,7 +2260,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
orchestrateReboot(vmUuid, params);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
orchestrateReboot(vmUuid, params);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = rebootVmThroughJobQueue(vmUuid, params);
@ -3120,7 +3191,16 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateAddVmToNetwork(vm, network, requested);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
return orchestrateAddVmToNetwork(vm, network, requested);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = addVmToNetworkThroughJobQueue(vm, network, requested);
@ -3223,7 +3303,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateRemoveNicFromVm(vm, nic);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
return orchestrateRemoveNicFromVm(vm, nic);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = removeNicFromVmThroughJobQueue(vm, nic);
@ -3462,7 +3552,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
orchestrateMigrateForScale(vmUuid, srcHostId, dest, oldSvcOfferingId);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = migrateVmForScaleThroughJobQueue(vmUuid, srcHostId, dest, oldSvcOfferingId);
@ -3711,7 +3811,17 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
@ -4187,7 +4297,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4240,7 +4350,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setStep(VmWorkJobVO.Step.Prepare);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4293,7 +4403,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setStep(VmWorkJobVO.Step.Prepare);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4343,7 +4453,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4397,7 +4507,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4449,7 +4559,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4501,7 +4611,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4551,7 +4661,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4600,7 +4710,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4649,7 +4759,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4700,7 +4810,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
workJob.setAccountId(account.getId());
workJob.setUserId(user.getId());
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -4856,4 +4966,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
return _jobHandlerProxy.handleVmWorkJob(work);
}
private VmWorkJobVO createPlaceHolderWork(long instanceId) {
VmWorkJobVO workJob = new VmWorkJobVO("");
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
workJob.setCmd("");
workJob.setCmdInfo("");
workJob.setAccountId(0);
workJob.setUserId(0);
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(instanceId);
workJob.setInitMsid(ManagementServerNode.getManagementServerId());
_workJobDao.persist(workJob);
return workJob;
}
}

View File

@ -60,9 +60,6 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
String cmd = job.getCmd();
assert (cmd != null);
if (s_logger.isDebugEnabled())
s_logger.debug("Run VM work job: " + cmd + ", job origin: " + job.getRelated());
Class<?> workClz = null;
try {
workClz = Class.forName(job.getCmd());
@ -80,27 +77,33 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
return;
}
if (_handlers == null || _handlers.isEmpty()) {
s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()
+ ", job origin: " + job.getRelated());
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found");
return;
if (s_logger.isDebugEnabled())
s_logger.debug("Run VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated());
try {
if (_handlers == null || _handlers.isEmpty()) {
s_logger.error("Invalid startup configuration, no work job handler is found. cmd: " + job.getCmd() + ", job info: " + job.getCmdInfo()
+ ", job origin: " + job.getRelated());
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Invalid startup configuration. no job handler is found");
return;
}
VmWorkJobHandler handler = _handlers.get(work.getHandlerName());
if (handler == null) {
s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd()
+ ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated());
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler");
return;
}
CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
_asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
} finally {
if (s_logger.isDebugEnabled())
s_logger.debug("Done with run of VM work job: " + cmd + " for VM " + work.getVmId() + ", job origin: " + job.getRelated());
}
VmWorkJobHandler handler = _handlers.get(work.getHandlerName());
if (handler == null) {
s_logger.error("Unable to find work job handler. handler name: " + work.getHandlerName() + ", job cmd: " + job.getCmd()
+ ", job info: " + job.getCmdInfo() + ", job origin: " + job.getRelated());
_asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to find work job handler");
return;
}
CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
_asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
} catch(Throwable e) {
s_logger.error("Unable to complete " + job + ", job origin:" + job.getRelated(), e);

View File

@ -30,7 +30,9 @@ import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientCapacityException;
import com.cloud.exception.ResourceUnavailableException;
public class AsyncJobExecutionContext {
public class AsyncJobExecutionContext {
private static final Logger s_logger = Logger.getLogger(AsyncJobExecutionContext.class);
private AsyncJob _job;
static private AsyncJobManager s_jobMgr;
@ -112,7 +114,8 @@ public class AsyncJobExecutionContext {
}
//
// check failure exception before we disjoin the worker job
// check failure exception before we disjoin the worker job, work job usually fails with exception
// this will help propogate exception between jobs
// TODO : it is ugly and this will become unnecessary after we switch to full-async mode
//
public void disjoinJob(long joinedJobId) throws InsufficientCapacityException,
@ -120,21 +123,34 @@ public class AsyncJobExecutionContext {
assert (_job != null);
AsyncJobJoinMapVO record = s_joinMapDao.getJoinRecord(_job.getId(), joinedJobId);
if (record.getJoinStatus() == JobInfo.Status.FAILED && record.getJoinResult() != null) {
Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
if (exception != null && exception instanceof Exception) {
if (exception instanceof InsufficientCapacityException)
throw (InsufficientCapacityException)exception;
else if (exception instanceof ConcurrentOperationException)
throw (ConcurrentOperationException)exception;
else if (exception instanceof ResourceUnavailableException)
throw (ResourceUnavailableException)exception;
else
throw new RuntimeException((Exception)exception);
s_jobMgr.disjoinJob(_job.getId(), joinedJobId);
if (record.getJoinStatus() == JobInfo.Status.FAILED) {
if (record.getJoinResult() != null) {
Object exception = JobSerializerHelper.fromObjectSerializedString(record.getJoinResult());
if (exception != null && exception instanceof Exception) {
if (exception instanceof InsufficientCapacityException) {
s_logger.error("Job " + joinedJobId + " failed with InsufficientCapacityException");
throw (InsufficientCapacityException)exception;
}
else if (exception instanceof ConcurrentOperationException) {
s_logger.error("Job " + joinedJobId + " failed with ConcurrentOperationException");
throw (ConcurrentOperationException)exception;
}
else if (exception instanceof ResourceUnavailableException) {
s_logger.error("Job " + joinedJobId + " failed with ResourceUnavailableException");
throw (ResourceUnavailableException)exception;
}
else {
s_logger.error("Job " + joinedJobId + " failed with exception");
throw new RuntimeException((Exception)exception);
}
}
} else {
s_logger.error("Job " + joinedJobId + " failed without providing an error object");
throw new RuntimeException("Job " + joinedJobId + " failed without providing an error object");
}
}
s_jobMgr.disjoinJob(_job.getId(), joinedJobId);
}
public void completeJoin(JobInfo.Status joinStatus, String joinResult) {
@ -151,6 +167,8 @@ public class AsyncJobExecutionContext {
public static AsyncJobExecutionContext getCurrentExecutionContext() {
AsyncJobExecutionContext context = s_currentExectionContext.get();
if (context == null) {
// TODO, this has security implicitions
s_logger.warn("Job is executed without a context, setup psudo job for the executing thread");
context = registerPseudoExecutionContext(CallContext.current().getCallingAccountId(),
CallContext.current().getCallingUserId());
}

View File

@ -35,4 +35,6 @@ public interface VmWorkJobDao extends GenericDao<VmWorkJobVO, Long> {
void updateStep(long workJobId, Step step);
void expungeCompletedWorkJobs(Date cutDate);
void expungeLeftoverWorkJobs(long msid);
}

View File

@ -16,8 +16,11 @@
// under the License.
package org.apache.cloudstack.framework.jobs.dao;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import javax.annotation.PostConstruct;
@ -31,13 +34,16 @@ import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.vm.VirtualMachine;
public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implements VmWorkJobDao {
protected SearchBuilder<VmWorkJobVO> PendingWorkJobSearch;
protected SearchBuilder<VmWorkJobVO> PendingWorkJobByCommandSearch;
protected SearchBuilder<VmWorkJobVO> ExpungeWorkJobSearch;
public VmWorkJobDaoImpl() {
}
@ -48,7 +54,6 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ);
PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ);
PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ);
PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ);
PendingWorkJobSearch.done();
PendingWorkJobByCommandSearch = createSearchBuilder();
@ -58,11 +63,6 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ);
PendingWorkJobByCommandSearch.done();
ExpungeWorkJobSearch = createSearchBuilder();
ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT);
ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
ExpungeWorkJobSearch.done();
}
@Override
@ -115,11 +115,80 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
}
@Override
public void expungeCompletedWorkJobs(Date cutDate) {
SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
sc.setParameters("lastUpdated", cutDate);
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
public void expungeCompletedWorkJobs(final Date cutDate) {
// current DAO machenism does not support following usage
/*
SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
sc.setParameters("lastUpdated",cutDate);
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
expunge(sc);
expunge(sc);
*/
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(
"DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?)");
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.execute();
} catch (SQLException e) {
} catch (Throwable e) {
}
try {
pstmt = txn.prepareAutoCloseStatement(
"DELETE FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?");
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.execute();
} catch (SQLException e) {
} catch (Throwable e) {
}
}
});
}
@Override
public void expungeLeftoverWorkJobs(final long msid) {
// current DAO machenism does not support following usage
/*
SearchCriteria<VmWorkJobVO> sc = ExpungePlaceHolderWorkJobSearch.create();
sc.setParameters("dispatcher", "VmWorkJobPlaceHolder");
sc.setParameters("msid", msid);
expunge(sc);
*/
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(
"DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?)");
pstmt.setLong(1, msid);
pstmt.execute();
} catch (SQLException e) {
} catch (Throwable e) {
}
try {
pstmt = txn.prepareAutoCloseStatement(
"DELETE FROM async_job WHERE (job_dispatcher='VmWorkJobPlaceHolder' OR job_dispatcher='VmWorkJobDispatcher') AND job_init_msid=?");
pstmt.setLong(1, msid);
pstmt.execute();
} catch (SQLException e) {
} catch (Throwable e) {
}
}
});
}
}

View File

@ -59,6 +59,7 @@ import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.Outcome;
import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
@ -73,6 +74,7 @@ import org.apache.cloudstack.storage.datastore.db.TemplateDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
import org.apache.cloudstack.storage.image.datastore.ImageStoreEntity;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
@ -327,6 +329,9 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
@Inject
protected AsyncJobManager _jobMgr;
@Inject
protected VmWorkJobDao _workJobDao;
VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
// TODO
@ -911,8 +916,19 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(userVm.getId());
}
try {
return orchestrateResizeVolume(volume.getId(), currentSize, newSize,
newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<Volume> outcome = resizeVolumeThroughJobQueue(userVm.getId(), volume.getId(), currentSize, newSize,
newDiskOffering != null ? cmd.getNewDiskOfferingId() : null, shrinkOk);
@ -1102,7 +1118,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(command.getVirtualMachineId());
}
try {
return orchestrateAttachVolumeToVM(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<Volume> outcome = attachVolumeToVmThroughJobQueue(command.getVirtualMachineId(), command.getId(), command.getDeviceId());
@ -1405,7 +1432,16 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateDetachVolumeFromVM(vmId, volumeId);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vmId);
}
try {
return orchestrateDetachVolumeFromVM(vmId, volumeId);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<Volume> outcome = detachVolumeFromVmThroughJobQueue(vmId, volumeId);
@ -1571,7 +1607,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
return orchestrateMigrateVolume(vol.getId(), destPool.getId(), liveMigrateVolume);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<Volume> outcome = migrateVolumeThroughJobQueue(vm.getId(), vol.getId(), destPool.getId(), liveMigrateVolume);
@ -1662,7 +1709,18 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vm.getId());
}
try {
return orchestrateTakeVolumeSnapshot(volumeId, policyId, snapshotId, account, quiescevm);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<Snapshot> outcome = takeVolumeSnapshotThroughJobQueue(vm.getId(), volumeId, policyId, snapshotId, account.getId(), quiescevm);
@ -2190,7 +2248,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -2237,7 +2295,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -2281,7 +2339,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -2326,7 +2384,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -2371,7 +2429,7 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -2428,4 +2486,23 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
return _jobHandlerProxy.handleVmWorkJob(work);
}
private VmWorkJobVO createPlaceHolderWork(long instanceId) {
VmWorkJobVO workJob = new VmWorkJobVO("");
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
workJob.setCmd("");
workJob.setCmdInfo("");
workJob.setAccountId(0);
workJob.setUserId(0);
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(instanceId);
workJob.setInitMsid(ManagementServerNode.getManagementServerId());
_workJobDao.persist(workJob);
return workJob;
}
}

View File

@ -41,10 +41,12 @@ import org.apache.cloudstack.framework.jobs.AsyncJob;
import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.Outcome;
import org.apache.cloudstack.framework.jobs.dao.VmWorkJobDao;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.jobs.JobInfo;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.event.ActionEvent;
import com.cloud.event.EventTypes;
@ -124,6 +126,9 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
@Inject
AsyncJobManager _jobMgr;
@Inject
VmWorkJobDao _workJobDao;
VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
int _vmSnapshotMax;
@ -364,7 +369,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vmId);
}
try {
return orchestrateCreateVMSnapshot(vmId, vmSnapshotId, quiescevm);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VMSnapshot> outcome = createVMSnapshotThroughJobQueue(vmId, vmSnapshotId, quiescevm);
@ -452,7 +467,16 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateDeleteVMSnapshot(vmSnapshotId);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vmSnapshot.getVmId());
}
try {
return orchestrateDeleteVMSnapshot(vmSnapshotId);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VMSnapshot> outcome = deleteVMSnapshotThroughJobQueue(vmSnapshot.getVmId(), vmSnapshotId);
@ -558,7 +582,18 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateRevertToVMSnapshot(vmSnapshotId);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vmSnapshotVo.getVmId());
}
try {
return orchestrateRevertToVMSnapshot(vmSnapshotId);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VMSnapshot> outcome = revertToVMSnapshotThroughJobQueue(vmSnapshotVo.getVmId(), vmSnapshotId);
@ -684,7 +719,17 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
if (!VmJobEnabled.value() || jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) {
// avoid re-entrance
return orchestrateDeleteAllVMSnapshots(vmId, type);
VmWorkJobVO placeHolder = null;
if (VmJobEnabled.value()) {
placeHolder = createPlaceHolderWork(vmId);
}
try {
return orchestrateDeleteAllVMSnapshots(vmId, type);
} finally {
if (VmJobEnabled.value())
_workJobDao.expunge(placeHolder.getId());
}
} else {
Outcome<VirtualMachine> outcome = deleteAllVMSnapshotsThroughJobQueue(vmId, type);
@ -828,7 +873,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -872,7 +917,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -916,7 +961,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -960,7 +1005,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
workJob.setAccountId(callingAccount.getId());
workJob.setUserId(callingUser.getId());
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(vm.getType());
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(vm.getId());
workJob.setRelated(AsyncJobExecutionContext.getOriginJobContextId());
@ -1009,4 +1054,23 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
return _jobHandlerProxy.handleVmWorkJob(work);
}
private VmWorkJobVO createPlaceHolderWork(long instanceId) {
VmWorkJobVO workJob = new VmWorkJobVO("");
workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER);
workJob.setCmd("");
workJob.setCmdInfo("");
workJob.setAccountId(0);
workJob.setUserId(0);
workJob.setStep(VmWorkJobVO.Step.Starting);
workJob.setVmType(VirtualMachine.Type.Instance);
workJob.setVmInstanceId(instanceId);
workJob.setInitMsid(ManagementServerNode.getManagementServerId());
_workJobDao.persist(workJob);
return workJob;
}
}