CLOUDSTACK-3190: Async jobs actions now trigger event bus messages to be

published

-AsyncJobManagerImpl to publish async job events when async jobs are
created, updated and completed
This commit is contained in:
Ryan Dietrich 2013-07-05 17:43:34 +05:30 committed by Murali Reddy
parent 06c8e5bfd8
commit 70f33efa72
3 changed files with 78 additions and 0 deletions

View File

@ -52,4 +52,5 @@ public class EventCategory {
public static final EventCategory ALERT_EVENT = new EventCategory("AlertEvent"); public static final EventCategory ALERT_EVENT = new EventCategory("AlertEvent");
public static final EventCategory USAGE_EVENT = new EventCategory("UsageEvent"); public static final EventCategory USAGE_EVENT = new EventCategory("UsageEvent");
public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent"); public static final EventCategory RESOURCE_STATE_CHANGE_EVENT = new EventCategory("ResourceStateEvent");
public static final EventCategory ASYNC_JOB_CHANGE_EVENT = new EventCategory("AsyncJobEvent");
} }

View File

@ -502,6 +502,7 @@ public class ApiServer extends ManagerBase implements HttpRequestHandler, ApiSer
} }
params.put("ctxStartEventId", String.valueOf(startEventId)); params.put("ctxStartEventId", String.valueOf(startEventId));
params.put("cmdEventType", asyncCmd.getEventType().toString());
ctx.setAccountId(asyncCmd.getEntityOwnerId()); ctx.setAccountId(asyncCmd.getEntityOwnerId());

View File

@ -21,6 +21,7 @@ import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
@ -41,14 +42,22 @@ import org.apache.cloudstack.api.BaseAsyncCmd;
import org.apache.cloudstack.api.ServerApiException; import org.apache.cloudstack.api.ServerApiException;
import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd; import org.apache.cloudstack.api.command.user.job.QueryAsyncJobResultCmd;
import org.apache.cloudstack.api.response.ExceptionResponse; import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.NDC; import org.apache.log4j.NDC;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import com.cloud.api.ApiDispatcher; import com.cloud.api.ApiDispatcher;
import com.cloud.api.ApiDBUtils;
import com.cloud.api.ApiGsonHelper; import com.cloud.api.ApiGsonHelper;
import com.cloud.api.ApiSerializerHelper; import com.cloud.api.ApiSerializerHelper;
import com.cloud.async.dao.AsyncJobDao; import com.cloud.async.dao.AsyncJobDao;
import com.cloud.domain.dao.DomainDao;
import com.cloud.domain.Domain;
import com.cloud.domain.DomainVO;
import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener; import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHostVO; import com.cloud.cluster.ManagementServerHostVO;
@ -56,6 +65,9 @@ import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.InvalidParameterValueException; import com.cloud.exception.InvalidParameterValueException;
import com.cloud.exception.PermissionDeniedException; import com.cloud.exception.PermissionDeniedException;
import com.cloud.event.EventCategory;
import com.cloud.event.EventTypes;
import com.cloud.server.ManagementServer;
import com.cloud.user.Account; import com.cloud.user.Account;
import com.cloud.user.AccountManager; import com.cloud.user.AccountManager;
import com.cloud.user.User; import com.cloud.user.User;
@ -94,6 +106,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Inject private AccountDao _accountDao; @Inject private AccountDao _accountDao;
@Inject private AsyncJobDao _jobDao; @Inject private AsyncJobDao _jobDao;
@Inject private ConfigurationDao _configDao; @Inject private ConfigurationDao _configDao;
@Inject private DomainDao _domainDao;
private long _jobExpireSeconds = 86400; // 1 day private long _jobExpireSeconds = 86400; // 1 day
private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs) private long _jobCancelThresholdSeconds = 3600; // 1 hour (for cancelling the jobs blocking other jobs)
@ -123,6 +136,65 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId); return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId);
} }
private void publishOnEventBus(AsyncJobVO job, String jobEvent) {
EventBus eventBus = null;
try {
eventBus = ComponentContext.getComponent(EventBus.class);
} catch(NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}
// Get the event type from the cmdInfo json string
String info = job.getCmdInfo();
String cmdEventType;
if ( info == null ) {
cmdEventType = "unknown";
} else {
String marker = "\"cmdEventType\"";
int begin = info.indexOf(marker);
cmdEventType = info.substring(begin + marker.length() + 2, info.indexOf(",", begin) - 1);
}
// For some reason, the instanceType / instanceId are not abstract, which means we may get null values.
org.apache.cloudstack.framework.events.Event event = new org.apache.cloudstack.framework.events.Event(
ManagementServer.Name,
EventCategory.ASYNC_JOB_CHANGE_EVENT.getName(),
jobEvent,
( job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown" ), null);
User userJobOwner = _accountMgr.getUserIncludingRemoved(job.getUserId());
Account jobOwner = _accountMgr.getAccount(userJobOwner.getAccountId());
Map<String, String> eventDescription = new HashMap<String, String>();
eventDescription.put("command", job.getCmd());
eventDescription.put("user", userJobOwner.getUuid());
eventDescription.put("account", jobOwner.getUuid());
eventDescription.put("processStatus", "" + job.getProcessStatus());
eventDescription.put("resultCode", "" + job.getResultCode());
eventDescription.put("instanceUuid", ApiDBUtils.findJobInstanceUuid(job));
eventDescription.put("instanceType", ( job.getInstanceType() != null ? job.getInstanceType().toString() : "unknown" ) );
eventDescription.put("commandEventType", cmdEventType);
eventDescription.put("jobId", job.getUuid());
// If the event.accountinfo boolean value is set, get the human readable value for the username / domainname
Map<String, String> configs = _configDao.getConfiguration("management-server", new HashMap<String, String>());
if ( Boolean.valueOf(configs.get("event.accountinfo")) ) {
DomainVO domain = _domainDao.findById(jobOwner.getDomainId());
eventDescription.put("username", userJobOwner.getUsername());
eventDescription.put("domainname", domain.getName());
}
event.setDescription(eventDescription);
try {
eventBus.publish(event);
} catch (EventBusException evx) {
String errMsg = "Failed to publish async job event on the the event bus.";
s_logger.warn(errMsg, evx);
throw new CloudRuntimeException(errMsg);
}
}
@Override @Override
public long submitAsyncJob(AsyncJobVO job) { public long submitAsyncJob(AsyncJobVO job) {
return submitAsyncJob(job, false); return submitAsyncJob(job, false);
@ -143,6 +215,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
if(s_logger.isDebugEnabled()) { if(s_logger.isDebugEnabled()) {
s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString()); s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
} }
publishOnEventBus(job, "submit");
return job.getId(); return job.getId();
} catch(Exception e) { } catch(Exception e) {
txt.rollback(); txt.rollback();
@ -177,6 +250,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setStatus(jobStatus); job.setStatus(jobStatus);
job.setResultCode(resultCode); job.setResultCode(resultCode);
publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out
// reset attached object // reset attached object
job.setInstanceType(null); job.setInstanceType(null);
job.setInstanceId(null); job.setInstanceId(null);
@ -220,6 +295,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
} }
job.setLastUpdated(DateUtil.currentGMTTime()); job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job); _jobDao.update(jobId, job);
publishOnEventBus(job, "update");
txt.commit(); txt.commit();
} catch(Exception e) { } catch(Exception e) {
s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e); s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);