mirror of
https://github.com/apache/cloudstack.git
synced 2025-11-02 20:02:29 +01:00
Fix serializing async job responses by writing a custom adapter for ResponseObjects. Improve exception handling when calling async commands (the job still needs to complete, but with FAILED status).
This commit is contained in:
parent
2a4ddac41a
commit
62257d4021
110
core/src/com/cloud/api/ResponseObjectTypeAdapter.java
Normal file
110
core/src/com/cloud/api/ResponseObjectTypeAdapter.java
Normal file
@ -0,0 +1,110 @@
|
||||
package com.cloud.api;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonObject;
|
||||
import com.google.gson.JsonSerializationContext;
|
||||
import com.google.gson.JsonSerializer;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
public class ResponseObjectTypeAdapter implements JsonSerializer<ResponseObject> {
|
||||
private static final Logger s_logger = Logger.getLogger(ResponseObjectTypeAdapter.class.getName());
|
||||
private static final GsonBuilder s_gBuilder;
|
||||
static {
|
||||
s_gBuilder = new GsonBuilder().excludeFieldsWithModifiers(Modifier.TRANSIENT);
|
||||
s_gBuilder.registerTypeAdapter(ResponseObject.class, new ResponseObjectTypeAdapter());
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonElement serialize(ResponseObject responseObj, Type typeOfResponseObj, JsonSerializationContext ctx) {
|
||||
JsonObject obj = new JsonObject();
|
||||
|
||||
// Get the declared fields from the response obj, create a new JSON Object, add props to it.
|
||||
// Once that object is done, create a new JSON Object with the response name and the JSON Obj as the name/value pair. Return that as the serialized element.
|
||||
Field[] fields = responseObj.getClass().getDeclaredFields();
|
||||
for (Field field : fields) {
|
||||
if ((field.getModifiers() & Modifier.TRANSIENT) != 0) {
|
||||
continue; // skip transient fields
|
||||
}
|
||||
|
||||
SerializedName serializedName = field.getAnnotation(SerializedName.class);
|
||||
if (serializedName == null) {
|
||||
continue; // skip fields w/o serialized name
|
||||
}
|
||||
|
||||
String propName = field.getName();
|
||||
Method method = getGetMethod(responseObj, propName);
|
||||
if (method != null) {
|
||||
try {
|
||||
Object fieldValue = method.invoke(responseObj);
|
||||
if (fieldValue != null) {
|
||||
if (fieldValue instanceof ResponseObject) {
|
||||
ResponseObject subObj = (ResponseObject)fieldValue;
|
||||
obj.add(serializedName.value(), serialize(subObj, subObj.getClass(), ctx));
|
||||
} else {
|
||||
obj.addProperty(serializedName.value(), fieldValue.toString());
|
||||
}
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
s_logger.error("Illegal argument exception when calling ResponseObject " + responseObj.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (IllegalAccessException e) {
|
||||
s_logger.error("Illegal access exception when calling ResponseObject " + responseObj.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (InvocationTargetException e) {
|
||||
s_logger.error("Invocation target exception when calling ResponseObject " + responseObj.getClass().getName() + " get method for property: " + propName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
JsonObject response = new JsonObject();
|
||||
response.add(responseObj.getResponseName(), obj);
|
||||
return response;
|
||||
}
|
||||
|
||||
private static Method getGetMethod(Object o, String propName) {
|
||||
Method method = null;
|
||||
String methodName = getGetMethodName("get", propName);
|
||||
try {
|
||||
method = o.getClass().getMethod(methodName);
|
||||
} catch (SecurityException e1) {
|
||||
s_logger.error("Security exception in getting ResponseObject " + o.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (NoSuchMethodException e1) {
|
||||
if (s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("ResponseObject " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName + ", will check is-prefixed method to see if it is boolean property");
|
||||
}
|
||||
}
|
||||
|
||||
if( method != null)
|
||||
return method;
|
||||
|
||||
methodName = getGetMethodName("is", propName);
|
||||
try {
|
||||
method = o.getClass().getMethod(methodName);
|
||||
} catch (SecurityException e1) {
|
||||
s_logger.error("Security exception in getting ResponseObject " + o.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (NoSuchMethodException e1) {
|
||||
s_logger.warn("ResponseObject " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName);
|
||||
}
|
||||
return method;
|
||||
}
|
||||
|
||||
private static String getGetMethodName(String prefix, String fieldName) {
|
||||
StringBuffer sb = new StringBuffer(prefix);
|
||||
|
||||
if(fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) {
|
||||
return fieldName;
|
||||
} else {
|
||||
sb.append(fieldName.substring(0, 1).toUpperCase());
|
||||
sb.append(fieldName.substring(1));
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
@ -18,16 +18,18 @@
|
||||
|
||||
package com.cloud.serializer;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.Command;
|
||||
import com.cloud.agent.transport.ArrayTypeAdaptor;
|
||||
import com.cloud.agent.transport.VolListTypeAdaptor;
|
||||
import com.cloud.api.ResponseObject;
|
||||
import com.cloud.api.ResponseObjectTypeAdapter;
|
||||
import com.cloud.storage.VolumeVO;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
|
||||
public class GsonHelper {
|
||||
private static final GsonBuilder s_gBuilder;
|
||||
@ -37,7 +39,8 @@ public class GsonHelper {
|
||||
s_gBuilder.registerTypeAdapter(Command[].class, new ArrayTypeAdaptor<Command>());
|
||||
s_gBuilder.registerTypeAdapter(Answer[].class, new ArrayTypeAdaptor<Answer>());
|
||||
Type listType = new TypeToken<List<VolumeVO>>() {}.getType();
|
||||
s_gBuilder.registerTypeAdapter(listType, new VolListTypeAdaptor());
|
||||
s_gBuilder.registerTypeAdapter(listType, new VolListTypeAdaptor());
|
||||
s_gBuilder.registerTypeAdapter(ResponseObject.class, new ResponseObjectTypeAdapter());
|
||||
}
|
||||
|
||||
public static GsonBuilder getBuilder() {
|
||||
|
||||
@ -28,6 +28,7 @@ import java.util.List;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.api.ResponseObject;
|
||||
import com.cloud.utils.DateUtil;
|
||||
import com.cloud.utils.Pair;
|
||||
import com.google.gson.Gson;
|
||||
@ -42,8 +43,12 @@ public class SerializerHelper {
|
||||
if(result != null) {
|
||||
Class<?> clz = result.getClass();
|
||||
Gson gson = GsonHelper.getBuilder().create();
|
||||
|
||||
return clz.getName() + "/" + gson.toJson(result);
|
||||
|
||||
if (result instanceof ResponseObject) {
|
||||
return clz.getName() + "/" + ((ResponseObject)result).getResponseName() + "/" + gson.toJson(result);
|
||||
} else {
|
||||
return clz.getName() + "/" + gson.toJson(result);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -51,12 +56,27 @@ public class SerializerHelper {
|
||||
public static Object fromSerializedString(String result) {
|
||||
try {
|
||||
if(result != null && !result.isEmpty()) {
|
||||
int seperatorPos = result.indexOf('/');
|
||||
if(seperatorPos < 0)
|
||||
return null;
|
||||
|
||||
String clzName = result.substring(0, seperatorPos);
|
||||
String content = result.substring(seperatorPos + 1);
|
||||
String[] serializedParts = result.split("/");
|
||||
// int seperatorPos = result.indexOf('/');
|
||||
// if(seperatorPos < 0)
|
||||
// return null;
|
||||
|
||||
if (serializedParts.length < 2) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// String clzName = result.substring(0, seperatorPos);
|
||||
// String content = result.substring(seperatorPos + 1);
|
||||
String clzName = serializedParts[0];
|
||||
String nameField = null;
|
||||
String content = null;
|
||||
if (serializedParts.length == 2) {
|
||||
content = serializedParts[1];
|
||||
} else {
|
||||
nameField = serializedParts[1];
|
||||
content = serializedParts[2];
|
||||
}
|
||||
|
||||
Class<?> clz;
|
||||
try {
|
||||
clz = Class.forName(clzName);
|
||||
@ -65,7 +85,11 @@ public class SerializerHelper {
|
||||
}
|
||||
|
||||
Gson gson = GsonHelper.getBuilder().create();
|
||||
return gson.fromJson(content, clz);
|
||||
Object obj = gson.fromJson(content, clz);
|
||||
if (nameField != null) {
|
||||
((ResponseObject)obj).setResponseName(nameField);
|
||||
}
|
||||
return obj;
|
||||
}
|
||||
return null;
|
||||
} catch(RuntimeException e) {
|
||||
|
||||
@ -33,6 +33,8 @@ import com.cloud.agent.manager.AgentManager;
|
||||
import com.cloud.api.BaseCmd.CommandType;
|
||||
import com.cloud.configuration.ConfigurationManager;
|
||||
import com.cloud.consoleproxy.ConsoleProxyManager;
|
||||
import com.cloud.exception.InvalidParameterValueException;
|
||||
import com.cloud.exception.PermissionDeniedException;
|
||||
import com.cloud.network.NetworkManager;
|
||||
import com.cloud.network.security.NetworkGroupManager;
|
||||
import com.cloud.server.ManagementServer;
|
||||
@ -89,6 +91,10 @@ public class ApiDispatcher {
|
||||
setupParameters(cmd, params);
|
||||
|
||||
Implementation impl = cmd.getClass().getAnnotation(Implementation.class);
|
||||
if (impl == null) {
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute create command " + cmd.getClass().getName() + ", no implementation specified.");
|
||||
}
|
||||
|
||||
String methodName = impl.createMethod();
|
||||
Object mgr = _mgmtServer;
|
||||
switch (impl.manager()) {
|
||||
@ -134,16 +140,25 @@ public class ApiDispatcher {
|
||||
return (Long)id;
|
||||
} catch (NoSuchMethodException nsme) {
|
||||
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), nsme);
|
||||
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", unable to find implementation.");
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", unable to find implementation.");
|
||||
} catch (InvocationTargetException ite) {
|
||||
Throwable cause = ite.getCause();
|
||||
if (cause instanceof InvalidParameterValueException) {
|
||||
throw new ServerApiException(BaseCmd.PARAM_ERROR, cause.getMessage());
|
||||
} else if (cause instanceof PermissionDeniedException) {
|
||||
throw new ServerApiException(BaseCmd.ACCOUNT_ERROR, cause.getMessage());
|
||||
}
|
||||
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), ite);
|
||||
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
} catch (IllegalAccessException iae) {
|
||||
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iae);
|
||||
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
} catch (IllegalArgumentException iArgEx) {
|
||||
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iArgEx);
|
||||
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
} catch (Exception ex) {
|
||||
s_logger.error("Unhandled exception invoking method " + methodName + " for command " + cmd.getClass().getSimpleName(), ex);
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
}
|
||||
}
|
||||
|
||||
@ -152,7 +167,7 @@ public class ApiDispatcher {
|
||||
|
||||
Implementation impl = cmd.getClass().getAnnotation(Implementation.class);
|
||||
if (impl == null) {
|
||||
throw new CloudRuntimeException("Unable to execute command " + cmd.getClass().getName() + ", no implementation specified.");
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute command " + cmd.getClass().getName() + ", no implementation specified.");
|
||||
}
|
||||
|
||||
String methodName = impl.method();
|
||||
@ -196,16 +211,25 @@ public class ApiDispatcher {
|
||||
cmd.setResponseObject(result);
|
||||
} catch (NoSuchMethodException nsme) {
|
||||
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), nsme);
|
||||
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", unable to find implementation.");
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", unable to find implementation.");
|
||||
} catch (InvocationTargetException ite) {
|
||||
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), ite);
|
||||
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
Throwable cause = ite.getCause();
|
||||
if (cause instanceof InvalidParameterValueException) {
|
||||
throw new ServerApiException(BaseCmd.PARAM_ERROR, cause.getMessage());
|
||||
} else if (cause instanceof PermissionDeniedException) {
|
||||
throw new ServerApiException(BaseCmd.ACCOUNT_ERROR, cause.getMessage());
|
||||
}
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
} catch (IllegalAccessException iae) {
|
||||
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iae);
|
||||
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
} catch (IllegalArgumentException iArgEx) {
|
||||
s_logger.warn("Exception executing method " + methodName + " for command " + cmd.getClass().getSimpleName(), iArgEx);
|
||||
throw new CloudRuntimeException("Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
} catch (Exception ex) {
|
||||
s_logger.error("Unhandled exception invoking method " + methodName + " for command " + cmd.getClass().getSimpleName(), ex);
|
||||
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Unable to execute method " + methodName + " for command " + cmd.getClass().getSimpleName() + ", internal error in the implementation.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -325,11 +325,6 @@ public class ApiServer implements HttpRequestHandler {
|
||||
|
||||
// This is where the command is either serialized, or directly dispatched
|
||||
response = queueCommand(cmdObj, paramMap);
|
||||
|
||||
// Map<String, Object> validatedParams = cmdObj.validateParams(paramMap, decode);
|
||||
|
||||
// List<Pair<String, Object>> resultValues = cmdObj.execute(validatedParams);
|
||||
// response = cmdObj.buildResponse(resultValues, responseType);
|
||||
} else {
|
||||
s_logger.warn("unknown API command: " + ((command == null) ? "null" : command[0]));
|
||||
response = buildErrorResponse("unknown API command: " + ((command == null) ? "null" : command[0]), responseType);
|
||||
|
||||
@ -28,6 +28,7 @@ import com.cloud.api.ResponseObject;
|
||||
import com.cloud.api.response.AsyncJobResponse;
|
||||
import com.cloud.api.response.ListResponse;
|
||||
import com.cloud.async.AsyncJobVO;
|
||||
import com.cloud.serializer.SerializerHelper;
|
||||
|
||||
@Implementation(method="searchForAsyncJobs")
|
||||
public class ListAsyncJobsCmd extends BaseListCmd {
|
||||
@ -87,7 +88,7 @@ public class ListAsyncJobsCmd extends BaseListCmd {
|
||||
jobResponse.setJobInstanceId(job.getInstanceId());
|
||||
jobResponse.setJobInstanceType(job.getInstanceType());
|
||||
jobResponse.setJobProcStatus(job.getProcessStatus());
|
||||
jobResponse.setJobResult(job.getResult());
|
||||
jobResponse.setJobResult((ResponseObject)SerializerHelper.fromSerializedString(job.getResult()));
|
||||
jobResponse.setJobResultCode(job.getResultCode());
|
||||
jobResponse.setJobStatus(job.getStatus());
|
||||
jobResponse.setUserId(job.getUserId());
|
||||
|
||||
@ -28,6 +28,7 @@ import com.cloud.api.Parameter;
|
||||
import com.cloud.api.ResponseObject;
|
||||
import com.cloud.api.response.AsyncJobResponse;
|
||||
import com.cloud.async.AsyncJobResult;
|
||||
import com.cloud.serializer.SerializerHelper;
|
||||
|
||||
@Implementation(method="queryAsyncJobResult")
|
||||
public class QueryAsyncJobResultCmd extends BaseCmd {
|
||||
@ -68,7 +69,7 @@ public class QueryAsyncJobResultCmd extends BaseCmd {
|
||||
response.setJobStatus(result.getJobStatus());
|
||||
response.setJobProcStatus(result.getProcessStatus());
|
||||
response.setJobResultCode(result.getResultCode());
|
||||
response.setJobResult(result.getResult());
|
||||
response.setJobResult((ResponseObject)SerializerHelper.fromSerializedString(result.getResult()));
|
||||
|
||||
Object resultObject = result.getResultObject();
|
||||
if (resultObject != null) {
|
||||
|
||||
@ -4,14 +4,14 @@ import java.lang.reflect.Modifier;
|
||||
import java.util.List;
|
||||
|
||||
import com.cloud.api.ResponseObject;
|
||||
import com.cloud.serializer.GsonHelper;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
|
||||
public class ApiResponseSerializer {
|
||||
// FIXME: what about XML response?
|
||||
public static String toSerializedString(ResponseObject result) {
|
||||
if (result != null) {
|
||||
Gson gson = new GsonBuilder().excludeFieldsWithModifiers(Modifier.TRANSIENT).create();
|
||||
Gson gson = GsonHelper.getBuilder().excludeFieldsWithModifiers(Modifier.TRANSIENT).create();
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append("{ \"" + result.getResponseName() + "\" : ");
|
||||
|
||||
@ -19,6 +19,7 @@ package com.cloud.api.response;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
import com.cloud.api.ResponseObject;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
public class AsyncJobResponse extends BaseResponse {
|
||||
@ -47,7 +48,7 @@ public class AsyncJobResponse extends BaseResponse {
|
||||
private String jobResultType;
|
||||
|
||||
@SerializedName("jobresult")
|
||||
private String jobResult;
|
||||
private ResponseObject jobResult;
|
||||
|
||||
@SerializedName("jobinstancetype")
|
||||
private String jobInstanceType;
|
||||
@ -122,11 +123,11 @@ public class AsyncJobResponse extends BaseResponse {
|
||||
this.jobResultType = jobResultType;
|
||||
}
|
||||
|
||||
public String getJobResult() {
|
||||
public ResponseObject getJobResult() {
|
||||
return jobResult;
|
||||
}
|
||||
|
||||
public void setJobResult(String jobResult) {
|
||||
public void setJobResult(ResponseObject jobResult) {
|
||||
this.jobResult = jobResult;
|
||||
}
|
||||
|
||||
|
||||
@ -36,6 +36,8 @@ import org.apache.log4j.NDC;
|
||||
|
||||
import com.cloud.api.ApiDispatcher;
|
||||
import com.cloud.api.BaseAsyncCmd;
|
||||
import com.cloud.api.BaseCmd;
|
||||
import com.cloud.api.ServerApiException;
|
||||
import com.cloud.async.dao.AsyncJobDao;
|
||||
import com.cloud.cluster.ClusterManager;
|
||||
import com.cloud.configuration.dao.ConfigurationDao;
|
||||
@ -152,7 +154,6 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
|
||||
|
||||
if (resultObject != null) {
|
||||
job.setResult(SerializerHelper.toSerializedStringOld(resultObject));
|
||||
// job.setResult((String)resultObject);
|
||||
}
|
||||
|
||||
job.setLastUpdated(DateUtil.currentGMTTime());
|
||||
@ -253,14 +254,6 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
|
||||
} else {
|
||||
throw new AsyncCommandQueued(queue, "job-" + job.getId() + " queued");
|
||||
}
|
||||
|
||||
/*
|
||||
if (queue != null) {
|
||||
checkQueue(queue.getId());
|
||||
} else {
|
||||
throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
@Override @DB
|
||||
@ -310,94 +303,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
|
||||
s_logger.trace("Job status: " + jobResult.toString());
|
||||
|
||||
return jobResult;
|
||||
}
|
||||
|
||||
/* old code...remove for new API framework
|
||||
private AsyncJobExecutor getJobExecutor(AsyncJobVO job) {
|
||||
String executorClazzName = "com.cloud.async.executor." + job.getCmd() + "Executor";
|
||||
|
||||
try {
|
||||
Class<?> consoleProxyClazz = Class.forName(executorClazzName);
|
||||
|
||||
AsyncJobExecutor executor = (AsyncJobExecutor)ComponentLocator.inject(consoleProxyClazz);
|
||||
executor.setJob(job);
|
||||
executor.setAsyncJobMgr(this);
|
||||
return executor;
|
||||
} catch (final ClassNotFoundException e) {
|
||||
s_logger.error("Unable to load async-job executor class: " + executorClazzName, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// old code...remove for new API framework
|
||||
private void scheduleExecution(final AsyncJobExecutor executor) {
|
||||
scheduleExecution(executor, false);
|
||||
}
|
||||
|
||||
// old code...remove for new API framework
|
||||
private void scheduleExecution(final AsyncJobExecutor executor, boolean executeInContext) {
|
||||
Runnable runnable = getExecutorRunnable(executor);
|
||||
if(executeInContext)
|
||||
runnable.run();
|
||||
else
|
||||
_executor.submit(runnable);
|
||||
}
|
||||
|
||||
// old code...remove for new API framework
|
||||
private Runnable getExecutorRunnable(final AsyncJobExecutor executor) {
|
||||
return new Runnable() {
|
||||
public void run() {
|
||||
long jobId = 0;
|
||||
BaseAsyncJobExecutor.setCurrentExecutor(executor);
|
||||
|
||||
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
|
||||
try {
|
||||
jobId = executor.getJob().getId();
|
||||
NDC.push("job-" + jobId);
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Executing " + executor.getClass().getName() + " for job-" + jobId);
|
||||
|
||||
if(executor.execute()) {
|
||||
if(s_logger.isTraceEnabled())
|
||||
s_logger.trace("Executing " + executor.getClass().getName() + " returns true for job-" + jobId);
|
||||
|
||||
if(executor.getSyncSource() != null) {
|
||||
_queueMgr.purgeItem(executor.getSyncSource().getId());
|
||||
checkQueue(executor.getSyncSource().getQueueId());
|
||||
}
|
||||
} else {
|
||||
if(s_logger.isTraceEnabled())
|
||||
s_logger.trace("Executing " + executor.getClass().getName() + " returns false for job-" + jobId);
|
||||
}
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Done executing " + executor.getClass().getName() + " for job-" + jobId);
|
||||
|
||||
} catch(Throwable e) {
|
||||
s_logger.error("Unexpected exception while executing " + executor.getClass().getName(), e);
|
||||
|
||||
try {
|
||||
if(executor.getSyncSource() != null) {
|
||||
_queueMgr.purgeItem(executor.getSyncSource().getId());
|
||||
|
||||
checkQueue(executor.getSyncSource().getQueueId());
|
||||
}
|
||||
} catch(Throwable ex) {
|
||||
s_logger.fatal("Exception on exception, log it for record", ex);
|
||||
}
|
||||
} finally {
|
||||
StackMaid.current().exitCleanup();
|
||||
txn.close();
|
||||
NDC.pop();
|
||||
}
|
||||
|
||||
// leave no trace out after execution for security reason
|
||||
BaseAsyncJobExecutor.setCurrentExecutor(null);
|
||||
}
|
||||
};
|
||||
}
|
||||
*/
|
||||
|
||||
private void scheduleExecution(final AsyncJobVO job) {
|
||||
scheduleExecution(job, false);
|
||||
@ -457,8 +363,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
|
||||
// serialize this to the async job table
|
||||
completeAsyncJob(jobId, AsyncJobResult.STATUS_SUCCEEDED, 0, cmdObj.getResponse());
|
||||
|
||||
// FIXME: things might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue
|
||||
// mechanism...
|
||||
// commands might need to be queued as part of synchronization here, so they just have to be re-dispatched from the queue mechanism...
|
||||
if (job.getSyncSource() != null) {
|
||||
_queueMgr.purgeItem(job.getSyncSource().getId());
|
||||
checkQueue(job.getSyncSource().getQueueId());
|
||||
@ -474,9 +379,15 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
|
||||
}
|
||||
checkQueue(((AsyncCommandQueued)e).getQueue().getId());
|
||||
} else {
|
||||
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
|
||||
if (!(e instanceof ServerApiException)) {
|
||||
s_logger.error("Unexpected exception while executing " + job.getCmd(), e);
|
||||
}
|
||||
|
||||
//FIXME: need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue
|
||||
// FIXME: setting resultCode to BaseCmd.INTERNAL_ERROR is not right, usually executors have their exception handling
|
||||
// and we need to preserve that as much as possible here
|
||||
completeAsyncJob(jobId, AsyncJobResult.STATUS_FAILED, BaseCmd.INTERNAL_ERROR, e.getMessage());
|
||||
|
||||
// need to clean up any queue that happened as part of the dispatching and move on to the next item in the queue
|
||||
try {
|
||||
if (job.getSyncSource() != null) {
|
||||
_queueMgr.purgeItem(job.getSyncSource().getId());
|
||||
@ -491,38 +402,9 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
|
||||
txn.close();
|
||||
NDC.pop();
|
||||
}
|
||||
|
||||
// leave no trace out after execution for security reason
|
||||
// BaseAsyncJobExecutor.setCurrentExecutor(null);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/* Old method...remove as part of API refactoring...
|
||||
private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
|
||||
AsyncJobVO job = _jobDao.findById(item.getContentId());
|
||||
if(job != null) {
|
||||
AsyncJobExecutor executor = getJobExecutor(job);
|
||||
if(executor == null) {
|
||||
s_logger.error("Unable to find job exectutor for job-" + job.getId());
|
||||
_queueMgr.purgeItem(item.getId());
|
||||
} else {
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Schedule queued job-" + job.getId());
|
||||
|
||||
executor.setFromPreviousSession(fromPreviousSession);
|
||||
executor.setSyncSource(item);
|
||||
executor.setJob(job);
|
||||
scheduleExecution(executor);
|
||||
}
|
||||
} else {
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Unable to find related job for queue item: " + item.toString());
|
||||
|
||||
_queueMgr.purgeItem(item.getId());
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
|
||||
AsyncJobVO job = _jobDao.findById(item.getContentId());
|
||||
@ -738,4 +620,3 @@ public class AsyncJobManagerImpl implements AsyncJobManager {
|
||||
return _name;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user