bug 6969: First step of now displaying pending async jobs for listXXXCommands. There is a lot more cleanup and fixing to do but all commands acting against VirtualMachines now work.

This commit is contained in:
will 2010-11-30 19:11:55 -08:00
parent 5b72f25552
commit c67d26cec4
22 changed files with 234 additions and 49 deletions

View File

@ -88,6 +88,7 @@ public class ApiConstants {
public static final String IS_RECURSIVE = "isrecursive";
public static final String ISO_FILTER = "isofilter";
public static final String JOB_ID = "jobid";
public static final String JOB_STATUS = "jobstatus";
public static final String LASTNAME = "lastname";
public static final String LEVEL = "level";
public static final String LUN = "lun";

View File

@ -18,6 +18,7 @@
package com.cloud.api;
import com.cloud.api.response.AsyncJobResponse;
import com.cloud.async.AsyncJob;
/**
* A base command for supporting asynchronous API calls. When an API command is received, the command will be
@ -76,4 +77,19 @@ public abstract class BaseAsyncCmd extends BaseCmd {
public void setStartEventId(Long startEventId) {
this.startEventId = startEventId;
}
/**
* Async commands that want to be tracked as part of the listXXX commands need to
* provide implementations of the two following methods, getInstanceId() and getInstanceType()
*
* getObjectId() should return the id of the object the async command is executing on
* getObjectType() should return a type from the AsyncJobVO.Type enumeration
*/
public Long getInstanceId() {
return null;
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.None;
}
}

View File

@ -1,5 +1,7 @@
package com.cloud.api;
import com.cloud.async.AsyncJob;
public abstract class BaseListCmd extends BaseCmd {
@ -61,4 +63,8 @@ public abstract class BaseListCmd extends BaseCmd {
}
return startIndex;
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.None;
}
}

View File

@ -24,4 +24,33 @@ public interface ResponseObject {
* @param name
*/
void setObjectName(String name);
/**
* Returns the object Id
*/
Long getObjectId();
/**
* Returns the job id
* @return
*/
Long getJobId();
/**
* Sets the job id
* @param jobId
*/
void setJobId(Long jobId);
/**
* Returns the job status
* @return
*/
Integer getJobStatus();
/**
*
* @param jobStatus
*/
void setJobStatus(Integer jobStatus);
}

View File

@ -29,6 +29,7 @@ import com.cloud.api.Implementation;
import com.cloud.api.Parameter;
import com.cloud.api.ServerApiException;
import com.cloud.api.response.UserVmResponse;
import com.cloud.async.AsyncJob;
import com.cloud.event.EventTypes;
import com.cloud.exception.InsufficientStorageCapacityException;
import com.cloud.exception.ResourceAllocationException;
@ -191,6 +192,10 @@ public class DeployVMCmd extends BaseAsyncCmd {
return "deploying Vm";
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.VirtualMachine;
}
@Override
public void execute(){
try {

View File

@ -26,6 +26,7 @@ import com.cloud.api.Implementation;
import com.cloud.api.Parameter;
import com.cloud.api.ServerApiException;
import com.cloud.api.response.UserVmResponse;
import com.cloud.async.AsyncJob;
import com.cloud.event.EventTypes;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.ResourceUnavailableException;
@ -81,6 +82,14 @@ public class DestroyVMCmd extends BaseAsyncCmd {
public String getEventDescription() {
return "destroying vm: " + getId();
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.VirtualMachine;
}
public Long getInstanceId() {
return getId();
}
@Override
public void execute() throws ResourceUnavailableException, ConcurrentOperationException{

View File

@ -28,6 +28,7 @@ import com.cloud.api.Implementation;
import com.cloud.api.Parameter;
import com.cloud.api.response.ListResponse;
import com.cloud.api.response.UserVmResponse;
import com.cloud.async.AsyncJob;
import com.cloud.uservm.UserVm;
@Implementation(description="List the virtual machines owned by the account.", responseObject=UserVmResponse.class)
@ -126,6 +127,10 @@ public class ListVMsCmd extends BaseListCmd {
return s_name;
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.VirtualMachine;
}
@Override
public void execute(){
List<? extends UserVm> result = _mgr.searchForUserVMs(this);

View File

@ -26,6 +26,7 @@ import com.cloud.api.Implementation;
import com.cloud.api.Parameter;
import com.cloud.api.ServerApiException;
import com.cloud.api.response.UserVmResponse;
import com.cloud.async.AsyncJob;
import com.cloud.event.EventTypes;
import com.cloud.user.Account;
import com.cloud.uservm.UserVm;
@ -79,6 +80,14 @@ public class RebootVMCmd extends BaseAsyncCmd {
return "rebooting user vm: " + getId();
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.VirtualMachine;
}
public Long getInstanceId() {
return getId();
}
@Override
public void execute(){
UserVm result = _userVmService.rebootVirtualMachine(this);

View File

@ -28,6 +28,7 @@ import com.cloud.api.Implementation;
import com.cloud.api.Parameter;
import com.cloud.api.ServerApiException;
import com.cloud.api.response.UserVmResponse;
import com.cloud.async.AsyncJob;
import com.cloud.event.EventTypes;
import com.cloud.user.Account;
import com.cloud.uservm.UserVm;
@ -95,6 +96,14 @@ public class ResetVMPasswordCmd extends BaseAsyncCmd {
public String getEventDescription() {
return "resetting password for vm: " + getId();
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.VirtualMachine;
}
public Long getInstanceId() {
return getId();
}
Random _rand = new Random(System.currentTimeMillis());
@Override

View File

@ -26,6 +26,7 @@ import com.cloud.api.Implementation;
import com.cloud.api.Parameter;
import com.cloud.api.ServerApiException;
import com.cloud.api.response.UserVmResponse;
import com.cloud.async.AsyncJob;
import com.cloud.event.EventTypes;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.exception.InsufficientCapacityException;
@ -89,6 +90,14 @@ public class StartVMCmd extends BaseAsyncCmd {
return "starting user vm: " + getId();
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.VirtualMachine;
}
public Long getInstanceId() {
return getId();
}
@Override
public void execute() throws ResourceUnavailableException, InsufficientCapacityException{
try {

View File

@ -26,6 +26,7 @@ import com.cloud.api.Implementation;
import com.cloud.api.Parameter;
import com.cloud.api.ServerApiException;
import com.cloud.api.response.UserVmResponse;
import com.cloud.async.AsyncJob;
import com.cloud.event.EventTypes;
import com.cloud.exception.ConcurrentOperationException;
import com.cloud.user.Account;
@ -84,6 +85,14 @@ public class StopVMCmd extends BaseAsyncCmd {
public String getEventDescription() {
return "stopping user vm: " + getId();
}
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.VirtualMachine;
}
public Long getInstanceId() {
return getId();
}
@Override
public void execute() throws ServerApiException, ConcurrentOperationException{

View File

@ -1,11 +1,14 @@
package com.cloud.api.response;
import com.cloud.api.ApiConstants;
import com.cloud.api.ResponseObject;
import com.cloud.serializer.Param;
import com.google.gson.annotations.SerializedName;
public class BaseResponse implements ResponseObject {
private transient String responseName;
private transient String objectName;
@Override
public String getResponseName() {
return responseName;
@ -25,5 +28,32 @@ public class BaseResponse implements ResponseObject {
public void setObjectName(String objectName) {
this.objectName = objectName;
}
public Long getObjectId() {
return null;
}
// For use by list commands with pending async jobs
@SerializedName(ApiConstants.JOB_ID) @Param(description="the ID of the latest async job acting on this object")
private Long jobId;
@SerializedName(ApiConstants.JOB_STATUS) @Param(description="the current status of the latest async job acting on this object")
private Integer jobStatus;
public Long getJobId() {
return jobId;
}
public void setJobId(Long jobId) {
this.jobId = jobId;
}
public Integer getJobStatus() {
return jobStatus;
}
public void setJobStatus(Integer jobStatus) {
this.jobStatus = jobStatus;
}
}

View File

@ -28,12 +28,6 @@ public class DomainRouterResponse extends BaseResponse {
@SerializedName(ApiConstants.ID) @Param(description="the id of the router")
private Long id;
@SerializedName(ApiConstants.JOB_ID) @Param(description="the job ID associated with the router. This is only displayed if the router listed is part of a currently running asynchronous job.")
private Long jobId;
@SerializedName("jobstatus") @Param(description="the job status associated with the router. This is only displayed if the router listed is part of a currently running asynchronous job.")
private Integer jobStatus;
@SerializedName(ApiConstants.ZONE_ID) @Param(description="the Zone ID for the router")
private Long zoneId;
@ -117,22 +111,6 @@ public class DomainRouterResponse extends BaseResponse {
this.id = id;
}
public Long getJobId() {
return jobId;
}
public void setJobId(Long jobId) {
this.jobId = jobId;
}
public Integer getJobStatus() {
return jobStatus;
}
public void setJobStatus(Integer jobStatus) {
this.jobStatus = jobStatus;
}
public Long getZoneId() {
return zoneId;
}

View File

@ -141,6 +141,10 @@ public class UserVmResponse extends BaseResponse {
@SerializedName("jobstatus") @Param(description="shows the current pending asynchronous job status")
private Integer jobStatus;
public Long getObjectId() {
return getId();
}
public Long getId() {
return id;
}

View File

@ -20,6 +20,15 @@ package com.cloud.async;
import java.util.Date;
public interface AsyncJob {
public enum Type {
None,
VirtualMachine,
Router,
Volume,
ConsoleProxy,
Snapshot
}
Long getId();
long getUserId();
long getAccountId();
@ -38,7 +47,7 @@ public interface AsyncJob {
Date getLastUpdated();
Date getLastPolled();
Date getRemoved();
String getInstanceType();
Type getInstanceType();
Long getInstanceId();
String getSessionKey();
String getCmdOriginator();

View File

@ -22,6 +22,8 @@ import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
@ -81,9 +83,10 @@ public class AsyncJobVO implements AsyncJob {
@Column(name="job_result", length=65535)
private String result;
@Enumerated(value=EnumType.STRING)
@Column(name="instance_type", length=64)
private String instanceType;
private Type instanceType;
@Column(name="instance_id", length=64)
private Long instanceId;
@ -296,11 +299,11 @@ public class AsyncJobVO implements AsyncJob {
}
@Override
public String getInstanceType() {
public Type getInstanceType() {
return instanceType;
}
public void setInstanceType(String instanceType) {
public void setInstanceType(Type instanceType) {
this.instanceType = instanceType;
}

View File

@ -606,12 +606,6 @@ public class ApiResponseHelper implements ResponseGenerator {
DomainRouterResponse routerResponse = new DomainRouterResponse();
routerResponse.setId(router.getId());
AsyncJobVO asyncJob = ApiDBUtils.findInstancePendingAsyncJob("domain_router", router.getId());
if (asyncJob != null) {
routerResponse.setJobId(asyncJob.getId());
routerResponse.setJobStatus(asyncJob.getStatus());
}
routerResponse.setZoneId(router.getDataCenterId());
routerResponse.setZoneName(ApiDBUtils.findZoneById(router.getDataCenterId()).getName());
routerResponse.setDns1(router.getDns1());
@ -1266,12 +1260,6 @@ public class ApiResponseHelper implements ResponseGenerator {
routerResponse.setDomainName(ApiDBUtils.findDomainById(accountTemp.getDomainId()).getName());
}
AsyncJobVO asyncJob = ApiDBUtils.findInstancePendingAsyncJob("domain_router", router.getId());
if (asyncJob != null) {
routerResponse.setJobId(asyncJob.getId());
routerResponse.setJobStatus(asyncJob.getStatus());
}
List<? extends Nic> nics = ApiDBUtils.getNics(router);
for (Nic singleNic : nics) {
Long configId = singleNic.getNetworkId();
@ -1912,7 +1900,7 @@ public class ApiResponseHelper implements ResponseGenerator {
jobResponse.setCreated(job.getCreated());
jobResponse.setId(job.getId());
jobResponse.setJobInstanceId(job.getInstanceId());
jobResponse.setJobInstanceType(job.getInstanceType());
jobResponse.setJobInstanceType(job.getInstanceType().toString());
jobResponse.setJobProcStatus(job.getProcessStatus());
jobResponse.setJobResult((ResponseObject)ApiSerializerHelper.fromSerializedString(job.getResult()));
jobResponse.setJobResultCode(job.getResultCode());

View File

@ -80,6 +80,8 @@ import org.apache.http.protocol.ResponseServer;
import org.apache.log4j.Logger;
import com.cloud.api.response.ApiResponseSerializer;
import com.cloud.api.response.ListResponse;
import com.cloud.async.AsyncJob;
import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobVO;
import com.cloud.configuration.ConfigurationVO;
@ -363,6 +365,9 @@ public class ApiServer implements HttpRequestHandler {
}
private String queueCommand(BaseCmd cmdObj, Map<String, String> params) {
UserContext ctx = UserContext.current();
Long userId = ctx.getUserId();
Account account = ctx.getAccount();
if (cmdObj instanceof BaseAsyncCmd) {
Long objectId = null;
if (cmdObj instanceof BaseAsyncCreateCmd) {
@ -375,10 +380,10 @@ public class ApiServer implements HttpRequestHandler {
}
BaseAsyncCmd asyncCmd = (BaseAsyncCmd)cmdObj;
if (objectId != null) {
objectId = asyncCmd.getInstanceId();
}
UserContext ctx = UserContext.current();
Long userId = ctx.getUserId();
Account account = ctx.getAccount();
if (userId != null) {
params.put("ctxUserId", userId.toString());
}
@ -395,6 +400,8 @@ public class ApiServer implements HttpRequestHandler {
}
AsyncJobVO job = new AsyncJobVO();
job.setInstanceId(asyncCmd.getInstanceId());
job.setInstanceType(asyncCmd.getInstanceType());
job.setUserId(userId);
if (account != null) {
job.setAccountId(ctx.getAccount().getId());
@ -414,8 +421,36 @@ public class ApiServer implements HttpRequestHandler {
return ApiResponseSerializer.toSerializedString(asyncCmd.getResponse(jobId), asyncCmd.getResponseType());
} else {
_dispatcher.dispatch(cmdObj, params);
// if the command is of the listXXXCommand, we will need to also return the
// the job id and status if possible
if (cmdObj instanceof BaseListCmd) {
buildAsyncListResponse((BaseListCmd)cmdObj, account);
}
return ApiResponseSerializer.toSerializedString((ResponseObject)cmdObj.getResponseObject(), cmdObj.getResponseType());
}
}
private void buildAsyncListResponse(BaseListCmd command, Account account) {
List<ResponseObject> responses = ((ListResponse)command.getResponseObject()).getResponses();
if (responses != null && responses.size() > 0) {
List<? extends AsyncJob> jobs = _asyncMgr.findInstancePendingAsyncJobs(command.getInstanceType(), account.getId());
if (jobs.size() == 0) {
return;
}
// Using maps might possibly be more efficient if the set is large enough but for now, we'll just n squared
// comparison of two lists. Either way, there shouldn't be too many async jobs active for the account.
for (AsyncJob job : jobs) {
if (job.getInstanceId() == null) continue;
for (ResponseObject response : responses) {
if (job.getInstanceId() == response.getObjectId()) {
response.setJobId(job.getId());
response.setJobStatus(job.getStatus());
}
}
}
}
}
private void buildAuditTrail(StringBuffer auditTrailSb, String command, String result) {

View File

@ -18,16 +18,20 @@
package com.cloud.async;
import java.util.List;
import com.cloud.api.commands.QueryAsyncJobResultCmd;
import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.PermissionDeniedException;
import com.cloud.user.UserAccount;
import com.cloud.utils.component.Manager;
public interface AsyncJobManager extends Manager {
public AsyncJobExecutorContext getExecutorContext();
public AsyncJobVO getAsyncJob(long jobId);
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
public List<? extends AsyncJob> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, long accountId);
public long submitAsyncJob(AsyncJobVO job);
public long submitAsyncJob(AsyncJobVO job, boolean scheduleJobExecutionInContext);

View File

@ -99,6 +99,11 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
@Override
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
return _jobDao.findInstancePendingAsyncJob(instanceType, instanceId);
}
@Override
public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, long accountId) {
return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId);
}
@Override
@ -111,7 +116,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
Transaction txt = Transaction.currentTxn();
try {
txt.start();
job.setInitMsid(getMsid());
job.setInitMsid(getMsid());
_jobDao.persist(job);
txt.commit();
@ -210,7 +215,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
txt.start();
AsyncJobVO job = _jobDao.createForUpdate();
job.setInstanceType(instanceType);
//job.setInstanceType(instanceType);
job.setInstanceId(instanceId);
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);

View File

@ -21,10 +21,12 @@ package com.cloud.async.dao;
import java.util.Date;
import java.util.List;
import com.cloud.async.AsyncJob;
import com.cloud.async.AsyncJobVO;
import com.cloud.utils.db.GenericDao;
public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, long accountId);
List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
}

View File

@ -25,6 +25,7 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.async.AsyncJob;
import com.cloud.async.AsyncJobResult;
import com.cloud.async.AsyncJobVO;
import com.cloud.utils.db.Filter;
@ -36,7 +37,8 @@ import com.cloud.utils.db.SearchCriteria;
public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements AsyncJobDao {
private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName());
private SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;
private SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;
private SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;
private SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;
public AsyncJobDaoImpl() {
@ -49,6 +51,15 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
SearchCriteria.Op.EQ);
pendingAsyncJobSearch.done();
pendingAsyncJobsSearch = createSearchBuilder();
pendingAsyncJobsSearch.and("instanceType", pendingAsyncJobsSearch.entity().getInstanceType(),
SearchCriteria.Op.EQ);
pendingAsyncJobsSearch.and("accountId", pendingAsyncJobsSearch.entity().getAccountId(),
SearchCriteria.Op.EQ);
pendingAsyncJobsSearch.and("status", pendingAsyncJobsSearch.entity().getStatus(),
SearchCriteria.Op.EQ);
pendingAsyncJobsSearch.done();
expiringAsyncJobSearch = createSearchBuilder();
expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(),
SearchCriteria.Op.LTEQ);
@ -72,6 +83,15 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
return null;
}
public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, long accountId) {
SearchCriteria<AsyncJobVO> sc = pendingAsyncJobsSearch.create();
sc.setParameters("instanceType", instanceType);
sc.setParameters("accountId", accountId);
sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
return listBy(sc);
}
public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
sc.setParameters("created", cutTime);