Refactoring the AsyncJobManager to queue jobs appropriately if there is a need to synchronize execution on an object, e.g. a router. API developers can now call command.synchronizeCommand(String, Long) to force the command to be synchronized on a particular object type [the string arg] with a particular id [the long arg]. When synchronizeCommand() is invoked, an exception maybe thrown by the framework (AsyncCommandQueued exception) to force the business logic to abort. The command will then be queued and invoked at the appropriate time. The synchronizeCommand() is re-entrant and will be a no-op if the command has already been queued and is now ready for execution.

This commit is contained in:
Kris McQueen 2010-09-16 19:04:58 -07:00
parent 83820075b3
commit 3f6a438d92
12 changed files with 219 additions and 66 deletions

View File

@ -28,6 +28,7 @@ import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import javax.persistence.Transient;
import com.cloud.utils.db.GenericDao;
@ -106,7 +107,13 @@ public class AsyncJobVO {
@Column(name=GenericDao.REMOVED_COLUMN)
private Date removed;
@Transient
private SyncQueueItemVO syncSource = null;
@Transient
private boolean fromPreviousSession = false;
public AsyncJobVO() {
}
@ -302,6 +309,22 @@ public class AsyncJobVO {
this.cmdOriginator = cmdOriginator;
}
public SyncQueueItemVO getSyncSource() {
return syncSource;
}
public void setSyncSource(SyncQueueItemVO syncSource) {
this.syncSource = syncSource;
}
public boolean isFromPreviousSession() {
return fromPreviousSession;
}
public void setFromPreviousSession(boolean fromPreviousSession) {
this.fromPreviousSession = fromPreviousSession;
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("AsyncJobVO {id:").append(getId());

View File

@ -1,3 +1,20 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.api;
import java.lang.reflect.Field;

View File

@ -357,6 +357,16 @@ public class ApiServer implements HttpRequestHandler {
Gson gson = GsonHelper.getBuilder().create();
UserContext ctx = UserContext.current();
Long userId = ctx.getUserId();
Account account = (Account)ctx.getAccountObject();
if (userId != null) {
params.put("ctxUserId", userId.toString());
}
if (account != null) {
params.put("ctxAccountId", account.getId().toString());
}
AsyncJobVO job = new AsyncJobVO();
job.setUserId(UserContext.current().getUserId());
job.setCmd(cmdObj.getClass().getName());

View File

@ -1,5 +1,7 @@
package com.cloud.api;
import com.cloud.async.AsyncJobManager;
import com.cloud.async.AsyncJobVO;
import com.cloud.serializer.SerializerHelper;
/**
@ -7,13 +9,24 @@ import com.cloud.serializer.SerializerHelper;
* serialized to the queue (currently the async_job table) and a response will be immediately returned with the
* id of the queue object. The id can be used to query the status/progress of the command using the
* queryAsyncJobResult API command.
*
* TODO: Create commands are often async and yet they need to return the id of object in question, e.g. deployVirtualMachine
* should return a virtual machine id, createPortForwardingServiceRule should return a rule id, and createVolume should return
* a volume id.
*/
public abstract class BaseAsyncCmd extends BaseCmd {
private AsyncJobManager _asyncJobMgr = null;
private AsyncJobVO _job = null;
public String getResponse(long jobId) {
return SerializerHelper.toSerializedString(Long.valueOf(jobId));
}
public void setAsyncJobManager(AsyncJobManager mgr) {
_asyncJobMgr = mgr;
}
public void synchronizeCommand(String syncObjType, long syncObjId) {
_asyncJobMgr.syncAsyncJobExecution(_job, syncObjType, syncObjId);
}
public void setJob(AsyncJobVO job) {
_job = job;
}
}

View File

@ -0,0 +1,35 @@
/**
* Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
*
* This software is licensed under the GNU General Public License v3 or later.
*
* It is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or any later version.
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package com.cloud.async;
import com.cloud.utils.SerialVersionUID;
public class AsyncCommandQueued extends RuntimeException {
private static final long serialVersionUID = SerialVersionUID.AsyncCommandQueued;
private SyncQueueVO _queue = null;
public AsyncCommandQueued(SyncQueueVO queue, String msg) {
super(msg);
_queue = queue;
}
public SyncQueueVO getQueue() {
return _queue;
}
}

View File

@ -33,6 +33,6 @@ public interface AsyncJobManager extends Manager {
public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject);
public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId);
public void syncAsyncJobExecution(long jobId, String syncObjType, long syncObjId);
public void syncAsyncJobExecution(AsyncJobVO job, String syncObjType, long syncObjId);
public void releaseSyncSource(AsyncJobExecutor executor);
}

View File

@ -35,13 +35,16 @@ import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
import com.cloud.api.ApiDispatcher;
import com.cloud.api.BaseCmd;
import com.cloud.api.BaseAsyncCmd;
import com.cloud.async.dao.AsyncJobDao;
import com.cloud.cluster.ClusterManager;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.maid.StackMaid;
import com.cloud.serializer.GsonHelper;
import com.cloud.serializer.SerializerHelper;
import com.cloud.user.Account;
import com.cloud.user.UserContext;
import com.cloud.user.dao.AccountDao;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.component.ComponentLocator;
@ -68,6 +71,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
private AsyncJobExecutorContext _context;
private SyncQueueManager _queueMgr;
private ClusterManager _clusterMgr;
private AccountDao _accountDao;
private AsyncJobDao _jobDao;
private long _jobExpireSeconds = 86400; // 1 day
private ApiDispatcher _dispatcher;
@ -95,37 +99,30 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
public long submitAsyncJob(AsyncJobVO job) {
return submitAsyncJob(job, false);
}
@Override @DB
public long submitAsyncJob(AsyncJobVO job, boolean scheduleJobExecutionInContext) {
if(s_logger.isDebugEnabled())
s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
// AsyncJobExecutor executor = getJobExecutor(job);
// if(executor == null) {
// s_logger.error("Unable to find executor to execute command " + job.getCmd() + " for job-" + job.getId());
// } else {
Transaction txt = Transaction.currentTxn();
try {
txt.start();
job.setInitMsid(getMsid());
_jobDao.persist(job);
txt.commit();
// no sync source originally
// FIXME: sync source for commands? how does new API framework handle that?
// executor.setSyncSource(null);
// executor.setJob(job);
// scheduleExecution(executor, scheduleJobExecutionInContext);
scheduleExecution(job, scheduleJobExecutionInContext);
return job.getId();
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
// }
Transaction txt = Transaction.currentTxn();
try {
txt.start();
job.setInitMsid(getMsid());
_jobDao.persist(job);
txt.commit();
// no sync source originally
job.setSyncSource(null);
scheduleExecution(job, scheduleJobExecutionInContext);
return job.getId();
} catch(Exception e) {
s_logger.error("Unexpected exception: ", e);
txt.rollback();
}
return 0L;
}
@Override @DB
public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) {
if(s_logger.isDebugEnabled())
@ -219,10 +216,19 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
}
@Override
public void syncAsyncJobExecution(long jobId, String syncObjType, long syncObjId) {
if(s_logger.isDebugEnabled())
s_logger.debug("Sync job-" + jobId + " execution on object " + syncObjType + "." + syncObjId);
public void syncAsyncJobExecution(AsyncJobVO job, String syncObjType, long syncObjId) {
// This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router,
// when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this).
// This method will get called every time their business logic executes. The first time it exectues for a job
// there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first
// time the job executes we queue the job, otherwise we just return so that the business logic can execute.
if (job.getSyncSource() != null) {
return;
}
if(s_logger.isDebugEnabled())
s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId);
SyncQueueVO queue = null;
// to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks
@ -230,7 +236,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
Random random = new Random();
for(int i = 0; i < 5; i++) {
queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", jobId);
queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId());
if(queue != null)
break;
@ -239,12 +245,20 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
} catch (InterruptedException e) {
}
}
if(queue != null) {
checkQueue(queue.getId());
if (queue == null) {
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
} else {
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
throw new AsyncCommandQueued(queue, "job-" + job.getId() + " queued");
}
/*
if (queue != null) {
checkQueue(queue.getId());
} else {
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
}
*/
}
@Override @DB
@ -388,7 +402,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
}
private void scheduleExecution(final AsyncJobVO job, boolean executeInContext) {
Runnable runnable = getExecutorRunnable(job);
Runnable runnable = getExecutorRunnable(this, job);
if (executeInContext) {
runnable.run();
} else {
@ -396,7 +410,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
}
}
private Runnable getExecutorRunnable(final AsyncJobVO job) {
private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJobVO job) {
return new Runnable() {
public void run() {
long jobId = 0;
@ -411,38 +425,65 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
}
Class<?> cmdClass = Class.forName(job.getCmd());
BaseCmd cmdObj = (BaseCmd)cmdClass.newInstance();
BaseAsyncCmd cmdObj = (BaseAsyncCmd)cmdClass.newInstance();
cmdObj.setAsyncJobManager(mgr);
cmdObj.setJob(job);
Type mapType = new TypeToken<Map<String, String>>() {}.getType();
Gson gson = GsonHelper.getBuilder().create();
Map<String, String> params = gson.fromJson(job.getCmdInfo(), mapType);
// FIXME: whenever we deserialize, the UserContext needs to be updated
//UserContext.registerContext(userId, accountObject, accountName, accountId, domainId, sessionId, apiServer);
// whenever we deserialize, the UserContext needs to be updated
String userIdStr = params.get("ctxUserId");
String acctIdStr = params.get("ctxAccoutId");
Long userId = null;
Account accountObject = null;
// FIXME: things might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue
// mechanism...
if (userIdStr != null) {
userId = Long.parseLong(userIdStr);
}
if (acctIdStr != null) {
accountObject = _accountDao.findById(Long.parseLong(acctIdStr));
}
UserContext.registerContext(userId, accountObject, null, null, null, null, false);
// dispatch could ultimately queue the job
_dispatcher.dispatch(cmdObj, params);
// serialize this to the async job table
completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponse());
if(s_logger.isDebugEnabled())
// FIXME: things might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue
// mechanism...
if (job.getSyncSource() != null) {
_queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
}
if (s_logger.isDebugEnabled())
s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId);
} catch(Throwable e) {
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
/* FIXME: need to clean up any queue that happened as part of the dispatching
try {
if(executor.getSyncSource() != null) {
_queueMgr.purgeItem(executor.getSyncSource().getId());
checkQueue(executor.getSyncSource().getQueueId());
if (e instanceof AsyncCommandQueued) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("job " + job.getCmd() + " for job-" + jobId + " was queued, processing the queue.");
}
checkQueue(((AsyncCommandQueued)e).getQueue().getId());
} else {
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
//FIXME: need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue
try {
if (job.getSyncSource() != null) {
_queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
}
} catch(Throwable ex) {
s_logger.fatal("Exception on exception, log it for record", ex);
}
} catch(Throwable ex) {
s_logger.fatal("Exception on exception, log it for record", ex);
}
*/
} finally {
StackMaid.current().exitCleanup();
txn.close();
@ -488,6 +529,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
s_logger.debug("Schedule queued job-" + job.getId());
}
job.setFromPreviousSession(fromPreviousSession);
job.setSyncSource(item);
scheduleExecution(job);
} else {
if(s_logger.isDebugEnabled())
@ -635,7 +678,11 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
int expireMinutes = NumbersUtil.parseInt(configs.get("job.expire.minutes"), 24*60);
_jobExpireSeconds = (long)expireMinutes*60;
_accountDao = locator.getDao(AccountDao.class);
if (_accountDao == null) {
throw new ConfigurationException("Unable to get " + AccountDao.class.getName());
}
_jobDao = locator.getDao(AsyncJobDao.class);
if (_jobDao == null) {
throw new ConfigurationException("Unable to get "

View File

@ -46,7 +46,7 @@ public class AssignSecurityGroupExecutor extends BaseAsyncJobExecutor {
AsyncJobVO job = getJob();
ManagementServer managementServer = asyncMgr.getExecutorContext().getManagementServer();
SecurityGroupParam param = gson.fromJson(job.getCmdInfo(), SecurityGroupParam.class);
/*
if(getSyncSource() == null) {
DomainRouterVO router = getRouterSyncSource(param);
if(router == null) {
@ -87,7 +87,8 @@ public class AssignSecurityGroupExecutor extends BaseAsyncJobExecutor {
asyncMgr.completeAsyncJob(getJob().getId(), AsyncJobResult.STATUS_FAILED, BaseCmd.INTERNAL_ERROR,
e.getMessage());
}
}
}
*/
return true;
}

View File

@ -30,6 +30,7 @@ public class AuthorizeNetworkGroupIngressExecutor extends BaseAsyncJobExecutor {
NetworkGroupIngressParam param = gson.fromJson(job.getCmdInfo(), NetworkGroupIngressParam.class);
AccountVO account = param.getAccount();
/*
if (getSyncSource() == null) {
NetworkGroupVO networkGroup = managementServer.findNetworkGroupByName(param.getAccount().getId(), param.getGroupName());
if(networkGroup == null) {
@ -59,6 +60,7 @@ public class AuthorizeNetworkGroupIngressExecutor extends BaseAsyncJobExecutor {
"Failed to authorize network group ingress from group: " + param.getGroupName() + " for account: " + account.getAccountName() + " (id: " + account.getId() + ")");
}
}
*/
return true;
}

View File

@ -44,7 +44,7 @@ public class VolumeOperationExecutor extends BaseAsyncJobExecutor {
Gson gson = GsonHelper.getBuilder().create();
AsyncJobManager asyncMgr = getAsyncJobMgr();
AsyncJobVO job = getJob();
/*
if (getSyncSource() == null) {
VolumeOperationParam param = gson.fromJson(job.getCmdInfo(), VolumeOperationParam.class);
asyncMgr.syncAsyncJobExecution(job.getId(), "Volume", param.getVolumeId());
@ -104,9 +104,10 @@ public class VolumeOperationExecutor extends BaseAsyncJobExecutor {
asyncMgr.getExecutorContext().getManagementServer().saveEvent(param.getUserId(), param.getAccountId(), EventVO.LEVEL_ERROR, eventType, failureDescription, null, param.getEventId());
}
}
return true;
}
*/
return true;
}
public void processAnswer(VolumeOperationListener listener, long agentId, long seq, Answer answer) {
@ -138,12 +139,12 @@ public class VolumeOperationExecutor extends BaseAsyncJobExecutor {
resultObject.setDiskOfferingName(getAsyncJobMgr().getExecutorContext().getManagementServer().findDiskOfferingById(diskOfferingId).getName());
resultObject.setDiskOfferingDisplayText(getAsyncJobMgr().getExecutorContext().getManagementServer().findDiskOfferingById(diskOfferingId).getDisplayText());
}
resultObject.setDomain(getAsyncJobMgr().getExecutorContext().getManagementServer().findDomainIdById(volume.getDomainId()).getName());
// resultObject.setDomain(getAsyncJobMgr().getExecutorContext().getManagementServer().findDomainIdById(volume.getDomainId()).getName());
resultObject.setStorageType("shared"); // NOTE: You can never create a local disk volume but if that changes, we need to change this
if (volume.getPoolId() != null)
resultObject.setStorage(getAsyncJobMgr().getExecutorContext().getManagementServer().findPoolById(volume.getPoolId()).getName());
resultObject.setZoneId(volume.getDataCenterId());
resultObject.setZoneName(getAsyncJobMgr().getExecutorContext().getManagementServer().getDataCenterBy(volume.getDataCenterId()).getName());
// resultObject.setZoneName(getAsyncJobMgr().getExecutorContext().getManagementServer().getDataCenterBy(volume.getDataCenterId()).getName());
return resultObject;
}

View File

@ -1933,6 +1933,9 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager
throw new InvalidParameterValueException("Failed to assign to load balancer " + loadBalancerId + ", the load balancer was not found.");
}
DomainRouterVO syncObject = _routerDao.findByPublicIpAddress(loadBalancer.getIpAddress());
cmd.synchronizeCommand("Router", syncObject.getId());
// Permission check...
Account account = (Account)UserContext.current().getAccountObject();
if (account != null) {

View File

@ -56,4 +56,5 @@ public interface SerialVersionUID {
public static final long InfficientVirtualNetworkCapacityException = Base | 0x1a;
public static final long DiscoveryException = Base | 0x1b;
public static final long CloudAuthenticationException = Base | 0x1c;
public static final long AsyncCommandQueued = Base | 0x1d;
}