diff --git a/core/src/com/cloud/async/AsyncJobVO.java b/core/src/com/cloud/async/AsyncJobVO.java index 0e5fed8bf0a..eecfb410da5 100644 --- a/core/src/com/cloud/async/AsyncJobVO.java +++ b/core/src/com/cloud/async/AsyncJobVO.java @@ -28,6 +28,7 @@ import javax.persistence.Id; import javax.persistence.Table; import javax.persistence.Temporal; import javax.persistence.TemporalType; +import javax.persistence.Transient; import com.cloud.utils.db.GenericDao; @@ -106,7 +107,13 @@ public class AsyncJobVO { @Column(name=GenericDao.REMOVED_COLUMN) private Date removed; - + + @Transient + private SyncQueueItemVO syncSource = null; + + @Transient + private boolean fromPreviousSession = false; + public AsyncJobVO() { } @@ -302,6 +309,22 @@ public class AsyncJobVO { this.cmdOriginator = cmdOriginator; } + public SyncQueueItemVO getSyncSource() { + return syncSource; + } + + public void setSyncSource(SyncQueueItemVO syncSource) { + this.syncSource = syncSource; + } + + public boolean isFromPreviousSession() { + return fromPreviousSession; + } + + public void setFromPreviousSession(boolean fromPreviousSession) { + this.fromPreviousSession = fromPreviousSession; + } + public String toString() { StringBuffer sb = new StringBuffer(); sb.append("AsyncJobVO {id:").append(getId()); diff --git a/server/src/com/cloud/api/ApiDispatcher.java b/server/src/com/cloud/api/ApiDispatcher.java index 0c9bb35a674..45936301551 100644 --- a/server/src/com/cloud/api/ApiDispatcher.java +++ b/server/src/com/cloud/api/ApiDispatcher.java @@ -1,3 +1,20 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ package com.cloud.api; import java.lang.reflect.Field; diff --git a/server/src/com/cloud/api/ApiServer.java b/server/src/com/cloud/api/ApiServer.java index a23a9d87677..e220c5394e7 100644 --- a/server/src/com/cloud/api/ApiServer.java +++ b/server/src/com/cloud/api/ApiServer.java @@ -357,6 +357,16 @@ public class ApiServer implements HttpRequestHandler { Gson gson = GsonHelper.getBuilder().create(); + UserContext ctx = UserContext.current(); + Long userId = ctx.getUserId(); + Account account = (Account)ctx.getAccountObject(); + if (userId != null) { + params.put("ctxUserId", userId.toString()); + } + if (account != null) { + params.put("ctxAccountId", account.getId().toString()); + } + AsyncJobVO job = new AsyncJobVO(); job.setUserId(UserContext.current().getUserId()); job.setCmd(cmdObj.getClass().getName()); diff --git a/server/src/com/cloud/api/BaseAsyncCmd.java b/server/src/com/cloud/api/BaseAsyncCmd.java index d8ce6a8db1a..b4aa79eee37 100644 --- a/server/src/com/cloud/api/BaseAsyncCmd.java +++ b/server/src/com/cloud/api/BaseAsyncCmd.java @@ -1,5 +1,7 @@ package com.cloud.api; +import com.cloud.async.AsyncJobManager; +import com.cloud.async.AsyncJobVO; import com.cloud.serializer.SerializerHelper; /** @@ -7,13 +9,24 @@ import com.cloud.serializer.SerializerHelper; * serialized to the queue (currently the async_job table) and a response will be immediately returned with the * id of the queue object. The id can be used to query the status/progress of the command using the * queryAsyncJobResult API command. - * - * TODO: Create commands are often async and yet they need to return the id of object in question, e.g. deployVirtualMachine - * should return a virtual machine id, createPortForwardingServiceRule should return a rule id, and createVolume should return - * a volume id. */ public abstract class BaseAsyncCmd extends BaseCmd { + private AsyncJobManager _asyncJobMgr = null; + private AsyncJobVO _job = null; + public String getResponse(long jobId) { return SerializerHelper.toSerializedString(Long.valueOf(jobId)); } + + public void setAsyncJobManager(AsyncJobManager mgr) { + _asyncJobMgr = mgr; + } + + public void synchronizeCommand(String syncObjType, long syncObjId) { + _asyncJobMgr.syncAsyncJobExecution(_job, syncObjType, syncObjId); + } + + public void setJob(AsyncJobVO job) { + _job = job; + } } diff --git a/server/src/com/cloud/async/AsyncCommandQueued.java b/server/src/com/cloud/async/AsyncCommandQueued.java new file mode 100644 index 00000000000..7894fa63907 --- /dev/null +++ b/server/src/com/cloud/async/AsyncCommandQueued.java @@ -0,0 +1,35 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ +package com.cloud.async; + +import com.cloud.utils.SerialVersionUID; + +public class AsyncCommandQueued extends RuntimeException { + private static final long serialVersionUID = SerialVersionUID.AsyncCommandQueued; + + private SyncQueueVO _queue = null; + + public AsyncCommandQueued(SyncQueueVO queue, String msg) { + super(msg); + _queue = queue; + } + + public SyncQueueVO getQueue() { + return _queue; + } +} diff --git a/server/src/com/cloud/async/AsyncJobManager.java b/server/src/com/cloud/async/AsyncJobManager.java index 525b5f703df..a293d764449 100644 --- a/server/src/com/cloud/async/AsyncJobManager.java +++ b/server/src/com/cloud/async/AsyncJobManager.java @@ -33,6 +33,6 @@ public interface AsyncJobManager extends Manager { public void updateAsyncJobStatus(long jobId, int processStatus, Object resultObject); public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId); - public void syncAsyncJobExecution(long jobId, String syncObjType, long syncObjId); + public void syncAsyncJobExecution(AsyncJobVO job, String syncObjType, long syncObjId); public void releaseSyncSource(AsyncJobExecutor executor); } diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index 9121defe4f3..22ce5de66cd 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -35,13 +35,16 @@ import org.apache.log4j.Logger; import org.apache.log4j.NDC; import com.cloud.api.ApiDispatcher; -import com.cloud.api.BaseCmd; +import com.cloud.api.BaseAsyncCmd; import com.cloud.async.dao.AsyncJobDao; import com.cloud.cluster.ClusterManager; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.maid.StackMaid; import com.cloud.serializer.GsonHelper; import com.cloud.serializer.SerializerHelper; +import com.cloud.user.Account; +import com.cloud.user.UserContext; +import com.cloud.user.dao.AccountDao; import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; import com.cloud.utils.component.ComponentLocator; @@ -68,6 +71,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager { private AsyncJobExecutorContext _context; private SyncQueueManager _queueMgr; private ClusterManager _clusterMgr; + private AccountDao _accountDao; private AsyncJobDao _jobDao; private long _jobExpireSeconds = 86400; // 1 day private ApiDispatcher _dispatcher; @@ -95,37 +99,30 @@ public class AsyncJobManagerImpl implements AsyncJobManager { public long submitAsyncJob(AsyncJobVO job) { return submitAsyncJob(job, false); } - + @Override @DB public long submitAsyncJob(AsyncJobVO job, boolean scheduleJobExecutionInContext) { if(s_logger.isDebugEnabled()) s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString()); -// AsyncJobExecutor executor = getJobExecutor(job); -// if(executor == null) { -// s_logger.error("Unable to find executor to execute command " + job.getCmd() + " for job-" + job.getId()); -// } else { - Transaction txt = Transaction.currentTxn(); - try { - txt.start(); - job.setInitMsid(getMsid()); - _jobDao.persist(job); - txt.commit(); - // no sync source originally - // FIXME: sync source for commands? how does new API framework handle that? -// executor.setSyncSource(null); -// executor.setJob(job); -// scheduleExecution(executor, scheduleJobExecutionInContext); - scheduleExecution(job, scheduleJobExecutionInContext); - return job.getId(); - } catch(Exception e) { - s_logger.error("Unexpected exception: ", e); - txt.rollback(); - } -// } + Transaction txt = Transaction.currentTxn(); + try { + txt.start(); + job.setInitMsid(getMsid()); + _jobDao.persist(job); + txt.commit(); + + // no sync source originally + job.setSyncSource(null); + scheduleExecution(job, scheduleJobExecutionInContext); + return job.getId(); + } catch(Exception e) { + s_logger.error("Unexpected exception: ", e); + txt.rollback(); + } return 0L; } - + @Override @DB public void completeAsyncJob(long jobId, int jobStatus, int resultCode, Object resultObject) { if(s_logger.isDebugEnabled()) @@ -219,10 +216,19 @@ public class AsyncJobManagerImpl implements AsyncJobManager { } @Override - public void syncAsyncJobExecution(long jobId, String syncObjType, long syncObjId) { - if(s_logger.isDebugEnabled()) - s_logger.debug("Sync job-" + jobId + " execution on object " + syncObjType + "." + syncObjId); - + public void syncAsyncJobExecution(AsyncJobVO job, String syncObjType, long syncObjId) { + // This method is re-entrant. If an API developer wants to synchronized on an object, e.g. the router, + // when executing business logic, they will call this method (actually a method in BaseAsyncCmd that calls this). + // This method will get called every time their business logic executes. The first time it exectues for a job + // there will be no sync source, but on subsequent execution there will be a sync souce. If this is the first + // time the job executes we queue the job, otherwise we just return so that the business logic can execute. + if (job.getSyncSource() != null) { + return; + } + + if(s_logger.isDebugEnabled()) + s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId); + SyncQueueVO queue = null; // to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks @@ -230,7 +236,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager { Random random = new Random(); for(int i = 0; i < 5; i++) { - queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", jobId); + queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId()); if(queue != null) break; @@ -239,12 +245,20 @@ public class AsyncJobManagerImpl implements AsyncJobManager { } catch (InterruptedException e) { } } - - if(queue != null) { - checkQueue(queue.getId()); + + if (queue == null) { + throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?"); } else { - throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?"); + throw new AsyncCommandQueued(queue, "job-" + job.getId() + " queued"); } + + /* + if (queue != null) { + checkQueue(queue.getId()); + } else { + throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?"); + } + */ } @Override @DB @@ -388,7 +402,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager { } private void scheduleExecution(final AsyncJobVO job, boolean executeInContext) { - Runnable runnable = getExecutorRunnable(job); + Runnable runnable = getExecutorRunnable(this, job); if (executeInContext) { runnable.run(); } else { @@ -396,7 +410,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager { } } - private Runnable getExecutorRunnable(final AsyncJobVO job) { + private Runnable getExecutorRunnable(final AsyncJobManager mgr, final AsyncJobVO job) { return new Runnable() { public void run() { long jobId = 0; @@ -411,38 +425,65 @@ public class AsyncJobManagerImpl implements AsyncJobManager { } Class cmdClass = Class.forName(job.getCmd()); - BaseCmd cmdObj = (BaseCmd)cmdClass.newInstance(); + BaseAsyncCmd cmdObj = (BaseAsyncCmd)cmdClass.newInstance(); + cmdObj.setAsyncJobManager(mgr); + cmdObj.setJob(job); + Type mapType = new TypeToken>() {}.getType(); Gson gson = GsonHelper.getBuilder().create(); Map params = gson.fromJson(job.getCmdInfo(), mapType); - // FIXME: whenever we deserialize, the UserContext needs to be updated - //UserContext.registerContext(userId, accountObject, accountName, accountId, domainId, sessionId, apiServer); + // whenever we deserialize, the UserContext needs to be updated + String userIdStr = params.get("ctxUserId"); + String acctIdStr = params.get("ctxAccoutId"); + Long userId = null; + Account accountObject = null; - // FIXME: things might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue - // mechanism... + if (userIdStr != null) { + userId = Long.parseLong(userIdStr); + } + + if (acctIdStr != null) { + accountObject = _accountDao.findById(Long.parseLong(acctIdStr)); + } + + UserContext.registerContext(userId, accountObject, null, null, null, null, false); + + // dispatch could ultimately queue the job _dispatcher.dispatch(cmdObj, params); // serialize this to the async job table completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponse()); - if(s_logger.isDebugEnabled()) + // FIXME: things might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue + // mechanism... + if (job.getSyncSource() != null) { + _queueMgr.purgeItem(job.getSyncSource().getId()); + checkQueue(job.getSyncSource().getQueueId()); + } + + if (s_logger.isDebugEnabled()) s_logger.debug("Done executing " + job.getCmd() + " for job-" + jobId); } catch(Throwable e) { - s_logger.error("Unexpected exception while executing " + job.getCmd(), e); - - /* FIXME: need to clean up any queue that happened as part of the dispatching - try { - if(executor.getSyncSource() != null) { - _queueMgr.purgeItem(executor.getSyncSource().getId()); - - checkQueue(executor.getSyncSource().getQueueId()); + if (e instanceof AsyncCommandQueued) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("job " + job.getCmd() + " for job-" + jobId + " was queued, processing the queue."); + } + checkQueue(((AsyncCommandQueued)e).getQueue().getId()); + } else { + s_logger.error("Unexpected exception while executing " + job.getCmd(), e); + + //FIXME: need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue + try { + if (job.getSyncSource() != null) { + _queueMgr.purgeItem(job.getSyncSource().getId()); + checkQueue(job.getSyncSource().getQueueId()); + } + } catch(Throwable ex) { + s_logger.fatal("Exception on exception, log it for record", ex); } - } catch(Throwable ex) { - s_logger.fatal("Exception on exception, log it for record", ex); } - */ } finally { StackMaid.current().exitCleanup(); txn.close(); @@ -488,6 +529,8 @@ public class AsyncJobManagerImpl implements AsyncJobManager { s_logger.debug("Schedule queued job-" + job.getId()); } + job.setFromPreviousSession(fromPreviousSession); + job.setSyncSource(item); scheduleExecution(job); } else { if(s_logger.isDebugEnabled()) @@ -635,7 +678,11 @@ public class AsyncJobManagerImpl implements AsyncJobManager { int expireMinutes = NumbersUtil.parseInt(configs.get("job.expire.minutes"), 24*60); _jobExpireSeconds = (long)expireMinutes*60; - + + _accountDao = locator.getDao(AccountDao.class); + if (_accountDao == null) { + throw new ConfigurationException("Unable to get " + AccountDao.class.getName()); + } _jobDao = locator.getDao(AsyncJobDao.class); if (_jobDao == null) { throw new ConfigurationException("Unable to get " diff --git a/server/src/com/cloud/async/executor/AssignSecurityGroupExecutor.java b/server/src/com/cloud/async/executor/AssignSecurityGroupExecutor.java index 9023f4e1f72..f10a6ad3cee 100644 --- a/server/src/com/cloud/async/executor/AssignSecurityGroupExecutor.java +++ b/server/src/com/cloud/async/executor/AssignSecurityGroupExecutor.java @@ -46,7 +46,7 @@ public class AssignSecurityGroupExecutor extends BaseAsyncJobExecutor { AsyncJobVO job = getJob(); ManagementServer managementServer = asyncMgr.getExecutorContext().getManagementServer(); SecurityGroupParam param = gson.fromJson(job.getCmdInfo(), SecurityGroupParam.class); - + /* if(getSyncSource() == null) { DomainRouterVO router = getRouterSyncSource(param); if(router == null) { @@ -87,7 +87,8 @@ public class AssignSecurityGroupExecutor extends BaseAsyncJobExecutor { asyncMgr.completeAsyncJob(getJob().getId(), AsyncJobResult.STATUS_FAILED, BaseCmd.INTERNAL_ERROR, e.getMessage()); } - } + } + */ return true; } diff --git a/server/src/com/cloud/async/executor/AuthorizeNetworkGroupIngressExecutor.java b/server/src/com/cloud/async/executor/AuthorizeNetworkGroupIngressExecutor.java index e2c9b4bd462..a9b070ca62e 100644 --- a/server/src/com/cloud/async/executor/AuthorizeNetworkGroupIngressExecutor.java +++ b/server/src/com/cloud/async/executor/AuthorizeNetworkGroupIngressExecutor.java @@ -30,6 +30,7 @@ public class AuthorizeNetworkGroupIngressExecutor extends BaseAsyncJobExecutor { NetworkGroupIngressParam param = gson.fromJson(job.getCmdInfo(), NetworkGroupIngressParam.class); AccountVO account = param.getAccount(); + /* if (getSyncSource() == null) { NetworkGroupVO networkGroup = managementServer.findNetworkGroupByName(param.getAccount().getId(), param.getGroupName()); if(networkGroup == null) { @@ -59,6 +60,7 @@ public class AuthorizeNetworkGroupIngressExecutor extends BaseAsyncJobExecutor { "Failed to authorize network group ingress from group: " + param.getGroupName() + " for account: " + account.getAccountName() + " (id: " + account.getId() + ")"); } } + */ return true; } diff --git a/server/src/com/cloud/async/executor/VolumeOperationExecutor.java b/server/src/com/cloud/async/executor/VolumeOperationExecutor.java index 9f32126c568..43425158051 100644 --- a/server/src/com/cloud/async/executor/VolumeOperationExecutor.java +++ b/server/src/com/cloud/async/executor/VolumeOperationExecutor.java @@ -44,7 +44,7 @@ public class VolumeOperationExecutor extends BaseAsyncJobExecutor { Gson gson = GsonHelper.getBuilder().create(); AsyncJobManager asyncMgr = getAsyncJobMgr(); AsyncJobVO job = getJob(); - + /* if (getSyncSource() == null) { VolumeOperationParam param = gson.fromJson(job.getCmdInfo(), VolumeOperationParam.class); asyncMgr.syncAsyncJobExecution(job.getId(), "Volume", param.getVolumeId()); @@ -104,9 +104,10 @@ public class VolumeOperationExecutor extends BaseAsyncJobExecutor { asyncMgr.getExecutorContext().getManagementServer().saveEvent(param.getUserId(), param.getAccountId(), EventVO.LEVEL_ERROR, eventType, failureDescription, null, param.getEventId()); } } - return true; } +*/ + return true; } public void processAnswer(VolumeOperationListener listener, long agentId, long seq, Answer answer) { @@ -138,12 +139,12 @@ public class VolumeOperationExecutor extends BaseAsyncJobExecutor { resultObject.setDiskOfferingName(getAsyncJobMgr().getExecutorContext().getManagementServer().findDiskOfferingById(diskOfferingId).getName()); resultObject.setDiskOfferingDisplayText(getAsyncJobMgr().getExecutorContext().getManagementServer().findDiskOfferingById(diskOfferingId).getDisplayText()); } - resultObject.setDomain(getAsyncJobMgr().getExecutorContext().getManagementServer().findDomainIdById(volume.getDomainId()).getName()); +// resultObject.setDomain(getAsyncJobMgr().getExecutorContext().getManagementServer().findDomainIdById(volume.getDomainId()).getName()); resultObject.setStorageType("shared"); // NOTE: You can never create a local disk volume but if that changes, we need to change this if (volume.getPoolId() != null) resultObject.setStorage(getAsyncJobMgr().getExecutorContext().getManagementServer().findPoolById(volume.getPoolId()).getName()); resultObject.setZoneId(volume.getDataCenterId()); - resultObject.setZoneName(getAsyncJobMgr().getExecutorContext().getManagementServer().getDataCenterBy(volume.getDataCenterId()).getName()); +// resultObject.setZoneName(getAsyncJobMgr().getExecutorContext().getManagementServer().getDataCenterBy(volume.getDataCenterId()).getName()); return resultObject; } diff --git a/server/src/com/cloud/network/NetworkManagerImpl.java b/server/src/com/cloud/network/NetworkManagerImpl.java index 46153b31ce6..242f7f1d8ea 100755 --- a/server/src/com/cloud/network/NetworkManagerImpl.java +++ b/server/src/com/cloud/network/NetworkManagerImpl.java @@ -1933,6 +1933,9 @@ public class NetworkManagerImpl implements NetworkManager, VirtualMachineManager throw new InvalidParameterValueException("Failed to assign to load balancer " + loadBalancerId + ", the load balancer was not found."); } + DomainRouterVO syncObject = _routerDao.findByPublicIpAddress(loadBalancer.getIpAddress()); + cmd.synchronizeCommand("Router", syncObject.getId()); + // Permission check... Account account = (Account)UserContext.current().getAccountObject(); if (account != null) { diff --git a/utils/src/com/cloud/utils/SerialVersionUID.java b/utils/src/com/cloud/utils/SerialVersionUID.java index c80e1219da4..6c26f4891e7 100755 --- a/utils/src/com/cloud/utils/SerialVersionUID.java +++ b/utils/src/com/cloud/utils/SerialVersionUID.java @@ -56,4 +56,5 @@ public interface SerialVersionUID { public static final long InfficientVirtualNetworkCapacityException = Base | 0x1a; public static final long DiscoveryException = Base | 0x1b; public static final long CloudAuthenticationException = Base | 0x1c; + public static final long AsyncCommandQueued = Base | 0x1d; }