mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
more gson crap
This commit is contained in:
parent
f8b859230b
commit
2b650acc17
@ -25,15 +25,14 @@ import java.util.List;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.Command;
|
||||
import com.cloud.agent.api.SecStorageFirewallCfgCommand.PortConfig;
|
||||
import com.cloud.exception.UnsupportedVersionException;
|
||||
import com.cloud.serializer.GsonHelper;
|
||||
import com.cloud.utils.NumbersUtil;
|
||||
import com.cloud.utils.Pair;
|
||||
import com.cloud.utils.exception.CloudRuntimeException;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.JsonArray;
|
||||
import com.google.gson.JsonDeserializationContext;
|
||||
import com.google.gson.JsonDeserializer;
|
||||
@ -42,7 +41,6 @@ import com.google.gson.JsonNull;
|
||||
import com.google.gson.JsonParseException;
|
||||
import com.google.gson.JsonSerializationContext;
|
||||
import com.google.gson.JsonSerializer;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
|
||||
/**
|
||||
* Request is a simple wrapper around command and answer to add sequencing,
|
||||
@ -64,6 +62,9 @@ import com.google.gson.reflect.TypeToken;
|
||||
public class Request {
|
||||
private static final Logger s_logger = Logger.getLogger(Request.class);
|
||||
|
||||
protected static final Gson s_gson = GsonHelper.getGson();
|
||||
protected static final Gson s_gogger = GsonHelper.getGsonLogger();
|
||||
|
||||
public enum Version {
|
||||
v1, // using gson to marshall
|
||||
v2, // now using gson as marshalled.
|
||||
@ -86,33 +87,6 @@ public class Request {
|
||||
protected static final short FLAG_FROM_SERVER = 0x20;
|
||||
protected static final short FLAG_CONTROL = 0x40;
|
||||
|
||||
protected static final Gson s_gson;
|
||||
protected static final Gson s_gogger;
|
||||
|
||||
static {
|
||||
GsonBuilder gsonBuilder = new GsonBuilder();
|
||||
s_gson = setDefaultGsonConfig(gsonBuilder);
|
||||
GsonBuilder loggerBuilder = new GsonBuilder();
|
||||
loggerBuilder.disableHtmlEscaping();
|
||||
loggerBuilder.setExclusionStrategies(new LoggingExclusionStrategy(s_logger));
|
||||
s_gogger = setDefaultGsonConfig(loggerBuilder);
|
||||
s_logger.info("Default Builder inited.");
|
||||
}
|
||||
|
||||
public static Gson setDefaultGsonConfig(GsonBuilder builder) {
|
||||
ArrayTypeAdaptor<Command> cmdAdaptor = new ArrayTypeAdaptor<Command>();
|
||||
builder.registerTypeAdapter(Command[].class, cmdAdaptor);
|
||||
ArrayTypeAdaptor<Answer> ansAdaptor = new ArrayTypeAdaptor<Answer>();
|
||||
builder.registerTypeAdapter(Answer[].class, ansAdaptor);
|
||||
builder.registerTypeAdapter(new TypeToken<List<PortConfig>>() {
|
||||
}.getType(), new PortConfigListTypeAdaptor());
|
||||
builder.registerTypeAdapter(new TypeToken<Pair<Long, Long>>() {
|
||||
}.getType(), new NwGroupsCommandTypeAdaptor());
|
||||
Gson gson = builder.create();
|
||||
cmdAdaptor.initGson(gson);
|
||||
ansAdaptor.initGson(gson);
|
||||
return gson;
|
||||
}
|
||||
|
||||
protected Version _ver;
|
||||
protected long _session;
|
||||
|
||||
@ -18,21 +18,61 @@
|
||||
|
||||
package com.cloud.serializer;
|
||||
|
||||
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.Command;
|
||||
import com.cloud.agent.api.SecStorageFirewallCfgCommand.PortConfig;
|
||||
import com.cloud.agent.transport.ArrayTypeAdaptor;
|
||||
import com.cloud.agent.transport.LoggingExclusionStrategy;
|
||||
import com.cloud.agent.transport.Request.NwGroupsCommandTypeAdaptor;
|
||||
import com.cloud.agent.transport.Request.PortConfigListTypeAdaptor;
|
||||
import com.cloud.utils.Pair;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
|
||||
public class GsonHelper {
|
||||
private static final GsonBuilder s_gBuilder;
|
||||
static {
|
||||
s_gBuilder = new GsonBuilder();
|
||||
s_gBuilder.setVersion(1.3);
|
||||
s_gBuilder.registerTypeAdapter(Command[].class, new ArrayTypeAdaptor<Command>());
|
||||
s_gBuilder.registerTypeAdapter(Answer[].class, new ArrayTypeAdaptor<Answer>());
|
||||
}
|
||||
|
||||
public static GsonBuilder getBuilder() {
|
||||
return s_gBuilder;
|
||||
}
|
||||
public class GsonHelper {
|
||||
private static final Logger s_logger = Logger.getLogger(GsonHelper.class);
|
||||
|
||||
protected static final Gson s_gson;
|
||||
protected static final Gson s_gogger;
|
||||
|
||||
static {
|
||||
GsonBuilder gsonBuilder = new GsonBuilder();
|
||||
s_gson = setDefaultGsonConfig(gsonBuilder);
|
||||
GsonBuilder loggerBuilder = new GsonBuilder();
|
||||
loggerBuilder.disableHtmlEscaping();
|
||||
loggerBuilder.setExclusionStrategies(new LoggingExclusionStrategy(s_logger));
|
||||
s_gogger = setDefaultGsonConfig(loggerBuilder);
|
||||
s_logger.info("Default Builder inited.");
|
||||
}
|
||||
|
||||
static Gson setDefaultGsonConfig(GsonBuilder builder) {
|
||||
builder.setVersion(1.5);
|
||||
ArrayTypeAdaptor<Command> cmdAdaptor = new ArrayTypeAdaptor<Command>();
|
||||
builder.registerTypeAdapter(Command[].class, cmdAdaptor);
|
||||
ArrayTypeAdaptor<Answer> ansAdaptor = new ArrayTypeAdaptor<Answer>();
|
||||
builder.registerTypeAdapter(Answer[].class, ansAdaptor);
|
||||
builder.registerTypeAdapter(new TypeToken<List<PortConfig>>() {
|
||||
}.getType(), new PortConfigListTypeAdaptor());
|
||||
builder.registerTypeAdapter(new TypeToken<Pair<Long, Long>>() {
|
||||
}.getType(), new NwGroupsCommandTypeAdaptor());
|
||||
Gson gson = builder.create();
|
||||
cmdAdaptor.initGson(gson);
|
||||
ansAdaptor.initGson(gson);
|
||||
return gson;
|
||||
}
|
||||
|
||||
public final static Gson getGson() {
|
||||
return s_gson;
|
||||
}
|
||||
|
||||
public final static Gson getGsonLogger() {
|
||||
return s_gogger;
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,25 +38,25 @@ import com.google.gson.Gson;
|
||||
public class SerializerHelper {
|
||||
public static final Logger s_logger = Logger.getLogger(SerializerHelper.class.getName());
|
||||
public static String token = "/";
|
||||
|
||||
public static String toSerializedStringOld(Object result) {
|
||||
if(result != null) {
|
||||
Class<?> clz = result.getClass();
|
||||
Gson gson = GsonHelper.getBuilder().create();
|
||||
return clz.getName() + token + gson.toJson(result);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Object fromSerializedString(String result) {
|
||||
try {
|
||||
if(result != null && !result.isEmpty()) {
|
||||
|
||||
String[] serializedParts = result.split(token);
|
||||
public static String toSerializedStringOld(Object result) {
|
||||
if(result != null) {
|
||||
Class<?> clz = result.getClass();
|
||||
Gson gson = GsonHelper.getGson();
|
||||
return clz.getName() + token + gson.toJson(result);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
if (serializedParts.length < 2) {
|
||||
return null;
|
||||
}
|
||||
public static Object fromSerializedString(String result) {
|
||||
try {
|
||||
if(result != null && !result.isEmpty()) {
|
||||
|
||||
String[] serializedParts = result.split(token);
|
||||
|
||||
if (serializedParts.length < 2) {
|
||||
return null;
|
||||
}
|
||||
String clzName = serializedParts[0];
|
||||
String nameField = null;
|
||||
String content = null;
|
||||
@ -68,117 +68,123 @@ public class SerializerHelper {
|
||||
content = result.substring(index + nameField.length() + 2);
|
||||
}
|
||||
|
||||
Class<?> clz;
|
||||
try {
|
||||
clz = Class.forName(clzName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Gson gson = GsonHelper.getBuilder().create();
|
||||
Object obj = gson.fromJson(content, clz);
|
||||
return obj;
|
||||
}
|
||||
return null;
|
||||
} catch(RuntimeException e) {
|
||||
s_logger.error("Caught runtime exception when doing GSON deserialization on: " + result);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Pair<String, Object>> toPairList(Object o, String name) {
|
||||
List<Pair<String, Object>> l = new ArrayList<Pair<String, Object>>();
|
||||
return appendPairList(l, o, name);
|
||||
}
|
||||
|
||||
public static List<Pair<String, Object>> appendPairList(List<Pair<String, Object>> l, Object o, String name) {
|
||||
if(o != null) {
|
||||
Class<?> clz = o.getClass();
|
||||
|
||||
if(clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) {
|
||||
l.add(new Pair<String, Object>(name, o.toString()));
|
||||
return l;
|
||||
}
|
||||
|
||||
for(Field f : clz.getDeclaredFields()) {
|
||||
if((f.getModifiers() & Modifier.STATIC) != 0)
|
||||
continue;
|
||||
|
||||
Param param = f.getAnnotation(Param.class);
|
||||
if(param == null)
|
||||
continue;
|
||||
|
||||
String propName = f.getName();
|
||||
if(!param.propName().isEmpty())
|
||||
propName = param.propName();
|
||||
|
||||
String paramName = param.name();
|
||||
if(paramName.isEmpty())
|
||||
paramName = propName;
|
||||
|
||||
Method method = getGetMethod(o, propName);
|
||||
if(method != null) {
|
||||
try {
|
||||
Object fieldValue = method.invoke(o);
|
||||
if(fieldValue != null) {
|
||||
if (f.getType() == Date.class) {
|
||||
l.add(new Pair<String, Object>(paramName, DateUtil.getOutputString((Date)fieldValue)));
|
||||
} else {
|
||||
l.add(new Pair<String, Object>(paramName, fieldValue.toString()));
|
||||
}
|
||||
}
|
||||
//else
|
||||
// l.add(new Pair<String, Object>(paramName, ""));
|
||||
} catch (IllegalArgumentException e) {
|
||||
s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
|
||||
} catch (IllegalAccessException e) {
|
||||
s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (InvocationTargetException e) {
|
||||
s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return l;
|
||||
}
|
||||
|
||||
private static Method getGetMethod(Object o, String propName) {
|
||||
Method method = null;
|
||||
String methodName = getGetMethodName("get", propName);
|
||||
try {
|
||||
method = o.getClass().getMethod(methodName);
|
||||
} catch (SecurityException e1) {
|
||||
s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (NoSuchMethodException e1) {
|
||||
if(s_logger.isTraceEnabled())
|
||||
s_logger.trace("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName + ", will check is-prefixed method to see if it is boolean property");
|
||||
}
|
||||
|
||||
if(method != null)
|
||||
return method;
|
||||
|
||||
methodName = getGetMethodName("is", propName);
|
||||
try {
|
||||
method = o.getClass().getMethod(methodName);
|
||||
} catch (SecurityException e1) {
|
||||
s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (NoSuchMethodException e1) {
|
||||
s_logger.warn("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName);
|
||||
}
|
||||
return method;
|
||||
}
|
||||
|
||||
private static String getGetMethodName(String prefix, String fieldName) {
|
||||
StringBuffer sb = new StringBuffer(prefix);
|
||||
|
||||
if(fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) {
|
||||
return fieldName;
|
||||
} else {
|
||||
sb.append(fieldName.substring(0, 1).toUpperCase());
|
||||
sb.append(fieldName.substring(1));
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
Class<?> clz;
|
||||
try {
|
||||
clz = Class.forName(clzName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Gson gson = GsonHelper.getGson();
|
||||
Object obj = gson.fromJson(content, clz);
|
||||
return obj;
|
||||
}
|
||||
return null;
|
||||
} catch(RuntimeException e) {
|
||||
s_logger.error("Caught runtime exception when doing GSON deserialization on: " + result);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Pair<String, Object>> toPairList(Object o, String name) {
|
||||
List<Pair<String, Object>> l = new ArrayList<Pair<String, Object>>();
|
||||
return appendPairList(l, o, name);
|
||||
}
|
||||
|
||||
public static List<Pair<String, Object>> appendPairList(List<Pair<String, Object>> l, Object o, String name) {
|
||||
if(o != null) {
|
||||
Class<?> clz = o.getClass();
|
||||
|
||||
if(clz.isPrimitive() || clz.getSuperclass() == Number.class || clz == String.class || clz == Date.class) {
|
||||
l.add(new Pair<String, Object>(name, o.toString()));
|
||||
return l;
|
||||
}
|
||||
|
||||
for(Field f : clz.getDeclaredFields()) {
|
||||
if((f.getModifiers() & Modifier.STATIC) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Param param = f.getAnnotation(Param.class);
|
||||
if(param == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String propName = f.getName();
|
||||
if(!param.propName().isEmpty()) {
|
||||
propName = param.propName();
|
||||
}
|
||||
|
||||
String paramName = param.name();
|
||||
if(paramName.isEmpty()) {
|
||||
paramName = propName;
|
||||
}
|
||||
|
||||
Method method = getGetMethod(o, propName);
|
||||
if(method != null) {
|
||||
try {
|
||||
Object fieldValue = method.invoke(o);
|
||||
if(fieldValue != null) {
|
||||
if (f.getType() == Date.class) {
|
||||
l.add(new Pair<String, Object>(paramName, DateUtil.getOutputString((Date)fieldValue)));
|
||||
} else {
|
||||
l.add(new Pair<String, Object>(paramName, fieldValue.toString()));
|
||||
}
|
||||
}
|
||||
//else
|
||||
// l.add(new Pair<String, Object>(paramName, ""));
|
||||
} catch (IllegalArgumentException e) {
|
||||
s_logger.error("Illegal argument exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
|
||||
} catch (IllegalAccessException e) {
|
||||
s_logger.error("Illegal access exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (InvocationTargetException e) {
|
||||
s_logger.error("Invocation target exception when calling POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return l;
|
||||
}
|
||||
|
||||
private static Method getGetMethod(Object o, String propName) {
|
||||
Method method = null;
|
||||
String methodName = getGetMethodName("get", propName);
|
||||
try {
|
||||
method = o.getClass().getMethod(methodName);
|
||||
} catch (SecurityException e1) {
|
||||
s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (NoSuchMethodException e1) {
|
||||
if(s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName + ", will check is-prefixed method to see if it is boolean property");
|
||||
}
|
||||
}
|
||||
|
||||
if(method != null) {
|
||||
return method;
|
||||
}
|
||||
|
||||
methodName = getGetMethodName("is", propName);
|
||||
try {
|
||||
method = o.getClass().getMethod(methodName);
|
||||
} catch (SecurityException e1) {
|
||||
s_logger.error("Security exception in getting POJO " + o.getClass().getName() + " get method for property: " + propName);
|
||||
} catch (NoSuchMethodException e1) {
|
||||
s_logger.warn("POJO " + o.getClass().getName() + " does not have " + methodName + "() method for property: " + propName);
|
||||
}
|
||||
return method;
|
||||
}
|
||||
|
||||
private static String getGetMethodName(String prefix, String fieldName) {
|
||||
StringBuffer sb = new StringBuffer(prefix);
|
||||
|
||||
if(fieldName.length() >= prefix.length() && fieldName.substring(0, prefix.length()).equals(prefix)) {
|
||||
return fieldName;
|
||||
} else {
|
||||
sb.append(fieldName.substring(0, 1).toUpperCase());
|
||||
sb.append(fieldName.substring(1));
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -41,19 +41,20 @@ public class RequestTest extends TestCase {
|
||||
GetHostStatsCommand cmd3 = new GetHostStatsCommand("hostguid", "hostname", 101);
|
||||
cmd2.addPortConfig("abc", "24", true, "eth0");
|
||||
cmd2.addPortConfig("127.0.0.1", "44", false, "eth1");
|
||||
Request sreq = new Request(1, 2, 3, new Command[] { cmd1, cmd2, cmd3 }, true, true);
|
||||
Request sreq = new Request(2, 3, new Command[] { cmd1, cmd2, cmd3 }, true, true);
|
||||
sreq.setSequence(1);
|
||||
|
||||
Logger logger = Logger.getLogger(Request.class);
|
||||
Level level = logger.getLevel();
|
||||
|
||||
logger.setLevel(Level.DEBUG);
|
||||
sreq.log(1, "Debug");
|
||||
sreq.log("Debug", true);
|
||||
|
||||
logger.setLevel(Level.TRACE);
|
||||
sreq.log(1, "Trace");
|
||||
sreq.log("Trace", true);
|
||||
|
||||
logger.setLevel(Level.INFO);
|
||||
sreq.log(1, "Info");
|
||||
sreq.log("Info", true);
|
||||
|
||||
logger.setLevel(level);
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@ -18,293 +18,311 @@
|
||||
|
||||
package com.cloud.cluster;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URLDecoder;
|
||||
|
||||
import org.apache.commons.httpclient.HttpStatus;
|
||||
import org.apache.http.HttpEntityEnclosingRequest;
|
||||
import org.apache.http.HttpException;
|
||||
import org.apache.http.HttpRequest;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.entity.BasicHttpEntity;
|
||||
import org.apache.http.protocol.HttpContext;
|
||||
import org.apache.http.protocol.HttpRequestHandler;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.agent.Listener;
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.ChangeAgentAnswer;
|
||||
import com.cloud.agent.api.ChangeAgentCommand;
|
||||
import com.cloud.agent.api.Command;
|
||||
import com.cloud.exception.AgentUnavailableException;
|
||||
import com.cloud.exception.OperationTimedoutException;
|
||||
import com.cloud.serializer.GsonHelper;
|
||||
import com.google.gson.Gson;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URLDecoder;
|
||||
|
||||
import org.apache.commons.httpclient.HttpStatus;
|
||||
import org.apache.http.HttpEntityEnclosingRequest;
|
||||
import org.apache.http.HttpException;
|
||||
import org.apache.http.HttpRequest;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.entity.BasicHttpEntity;
|
||||
import org.apache.http.protocol.HttpContext;
|
||||
import org.apache.http.protocol.HttpRequestHandler;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.agent.Listener;
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.ChangeAgentAnswer;
|
||||
import com.cloud.agent.api.ChangeAgentCommand;
|
||||
import com.cloud.agent.api.Command;
|
||||
import com.cloud.exception.AgentUnavailableException;
|
||||
import com.cloud.exception.OperationTimedoutException;
|
||||
import com.cloud.serializer.GsonHelper;
|
||||
import com.google.gson.Gson;
|
||||
|
||||
public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
|
||||
private static final Logger s_logger = Logger.getLogger(ClusterServiceServletHttpHandler.class);
|
||||
|
||||
|
||||
private final Gson gson;
|
||||
private final ClusterManager manager;
|
||||
|
||||
public ClusterServiceServletHttpHandler(ClusterManager manager) {
|
||||
this.manager = manager;
|
||||
this.manager = manager;
|
||||
|
||||
gson = GsonHelper.getBuilder().create();
|
||||
gson = GsonHelper.getGson();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(HttpRequest request, HttpResponse response, HttpContext context)
|
||||
throws HttpException, IOException {
|
||||
|
||||
try {
|
||||
if(s_logger.isTraceEnabled())
|
||||
s_logger.trace("Start Handling cluster HTTP request");
|
||||
|
||||
parseRequest(request);
|
||||
handleRequest(request, response);
|
||||
|
||||
if(s_logger.isTraceEnabled())
|
||||
s_logger.trace("Handle cluster HTTP request done");
|
||||
|
||||
} catch(Throwable e) {
|
||||
if(s_logger.isTraceEnabled())
|
||||
s_logger.trace("Unexpected exception " + e.toString());
|
||||
|
||||
writeResponse(response, HttpStatus.SC_INTERNAL_SERVER_ERROR, null);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void parseRequest(HttpRequest request) throws IOException {
|
||||
if(request instanceof HttpEntityEnclosingRequest) {
|
||||
HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest)request;
|
||||
|
||||
String body = EntityUtils.toString(entityRequest.getEntity());
|
||||
if(body != null) {
|
||||
String[] paramArray = body.split("&");
|
||||
if(paramArray != null) {
|
||||
for (String paramEntry : paramArray) {
|
||||
String[] paramValue = paramEntry.split("=");
|
||||
if (paramValue.length != 2)
|
||||
continue;
|
||||
|
||||
String name = URLDecoder.decode(paramValue[0]);
|
||||
String value = URLDecoder.decode(paramValue[1]);
|
||||
|
||||
if(s_logger.isTraceEnabled())
|
||||
s_logger.trace("Parsed request parameter " + name + "=" + value);
|
||||
request.getParams().setParameter(name, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeResponse(HttpResponse response, int statusCode, String content) {
|
||||
if(content == null)
|
||||
content = "";
|
||||
response.setStatusCode(statusCode);
|
||||
|
||||
@Override
|
||||
public void handle(HttpRequest request, HttpResponse response, HttpContext context)
|
||||
throws HttpException, IOException {
|
||||
|
||||
try {
|
||||
if(s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Start Handling cluster HTTP request");
|
||||
}
|
||||
|
||||
parseRequest(request);
|
||||
handleRequest(request, response);
|
||||
|
||||
if(s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Handle cluster HTTP request done");
|
||||
}
|
||||
|
||||
} catch(Throwable e) {
|
||||
if(s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Unexpected exception " + e.toString());
|
||||
}
|
||||
|
||||
writeResponse(response, HttpStatus.SC_INTERNAL_SERVER_ERROR, null);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private void parseRequest(HttpRequest request) throws IOException {
|
||||
if(request instanceof HttpEntityEnclosingRequest) {
|
||||
HttpEntityEnclosingRequest entityRequest = (HttpEntityEnclosingRequest)request;
|
||||
|
||||
String body = EntityUtils.toString(entityRequest.getEntity());
|
||||
if(body != null) {
|
||||
String[] paramArray = body.split("&");
|
||||
if(paramArray != null) {
|
||||
for (String paramEntry : paramArray) {
|
||||
String[] paramValue = paramEntry.split("=");
|
||||
if (paramValue.length != 2) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String name = URLDecoder.decode(paramValue[0]);
|
||||
String value = URLDecoder.decode(paramValue[1]);
|
||||
|
||||
if(s_logger.isTraceEnabled()) {
|
||||
s_logger.trace("Parsed request parameter " + name + "=" + value);
|
||||
}
|
||||
request.getParams().setParameter(name, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeResponse(HttpResponse response, int statusCode, String content) {
|
||||
if(content == null) {
|
||||
content = "";
|
||||
}
|
||||
response.setStatusCode(statusCode);
|
||||
BasicHttpEntity body = new BasicHttpEntity();
|
||||
body.setContentType("text/html; charset=UTF-8");
|
||||
|
||||
|
||||
byte[] bodyData = content.getBytes();
|
||||
body.setContent(new ByteArrayInputStream(bodyData));
|
||||
body.setContentLength(bodyData.length);
|
||||
response.setEntity(body);
|
||||
}
|
||||
|
||||
protected void handleRequest(HttpRequest req, HttpResponse response) {
|
||||
String method = (String)req.getParams().getParameter("method");
|
||||
|
||||
int nMethod = RemoteMethodConstants.METHOD_UNKNOWN;
|
||||
String responseContent = null;
|
||||
try {
|
||||
if(method != null)
|
||||
nMethod = Integer.parseInt(method);
|
||||
|
||||
switch(nMethod) {
|
||||
case RemoteMethodConstants.METHOD_EXECUTE :
|
||||
responseContent = handleExecuteMethodCall(req);
|
||||
break;
|
||||
|
||||
case RemoteMethodConstants.METHOD_EXECUTE_ASYNC :
|
||||
responseContent = handleExecuteAsyncMethodCall(req);
|
||||
break;
|
||||
|
||||
case RemoteMethodConstants.METHOD_ASYNC_RESULT :
|
||||
responseContent = handleAsyncResultMethodCall(req);
|
||||
break;
|
||||
|
||||
case RemoteMethodConstants.METHOD_PING :
|
||||
responseContent = handlePingMethodCall(req);
|
||||
break;
|
||||
|
||||
case RemoteMethodConstants.METHOD_UNKNOWN :
|
||||
default :
|
||||
assert(false);
|
||||
s_logger.error("unrecognized method " + nMethod);
|
||||
break;
|
||||
}
|
||||
} catch(Throwable e) {
|
||||
s_logger.error("Unexpected exception when processing cluster service request : ", e);
|
||||
}
|
||||
|
||||
if(responseContent != null) {
|
||||
writeResponse(response, HttpStatus.SC_OK, responseContent);
|
||||
} else {
|
||||
writeResponse(response, HttpStatus.SC_BAD_REQUEST, null);
|
||||
}
|
||||
}
|
||||
|
||||
private String handleExecuteMethodCall(HttpRequest req) {
|
||||
String agentId = (String)req.getParams().getParameter("agentId");
|
||||
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
|
||||
String stopOnError = (String)req.getParams().getParameter("stopOnError");
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("|->" + agentId + " " + gsonPackage);
|
||||
|
||||
Command [] cmds = null;
|
||||
try {
|
||||
cmds = gson.fromJson(gsonPackage, Command[].class);
|
||||
} catch(Throwable e) {
|
||||
assert(false);
|
||||
s_logger.error("Excection in gson decoding : ", e);
|
||||
}
|
||||
|
||||
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
|
||||
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
|
||||
}
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
|
||||
}
|
||||
boolean result = false;
|
||||
try {
|
||||
result = manager.executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Result is " + result);
|
||||
}
|
||||
|
||||
} catch (AgentUnavailableException e) {
|
||||
s_logger.warn("Agent is unavailable", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
Answer[] answers = new Answer[1];
|
||||
answers[0] = new ChangeAgentAnswer(cmd, result);
|
||||
return gson.toJson(answers);
|
||||
}
|
||||
|
||||
try {
|
||||
long startTick = System.currentTimeMillis();
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Send |-> " + agentId + " " + gsonPackage + " to agent manager");
|
||||
|
||||
Answer[] answers = manager.sendToAgent(Long.parseLong(agentId), cmds,
|
||||
Integer.parseInt(stopOnError) != 0 ? true : false);
|
||||
|
||||
if(answers != null) {
|
||||
String jsonReturn = gson.toJson(answers);
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Completed |-> " + agentId + " " + gsonPackage +
|
||||
" in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
|
||||
|
||||
return jsonReturn;
|
||||
} else {
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Completed |-> " + agentId + " " + gsonPackage +
|
||||
" in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
|
||||
}
|
||||
} catch(AgentUnavailableException e) {
|
||||
s_logger.warn("Agent is unavailable", e);
|
||||
} catch (OperationTimedoutException e) {
|
||||
protected void handleRequest(HttpRequest req, HttpResponse response) {
|
||||
String method = (String)req.getParams().getParameter("method");
|
||||
|
||||
int nMethod = RemoteMethodConstants.METHOD_UNKNOWN;
|
||||
String responseContent = null;
|
||||
try {
|
||||
if(method != null) {
|
||||
nMethod = Integer.parseInt(method);
|
||||
}
|
||||
|
||||
switch(nMethod) {
|
||||
case RemoteMethodConstants.METHOD_EXECUTE :
|
||||
responseContent = handleExecuteMethodCall(req);
|
||||
break;
|
||||
|
||||
case RemoteMethodConstants.METHOD_EXECUTE_ASYNC :
|
||||
responseContent = handleExecuteAsyncMethodCall(req);
|
||||
break;
|
||||
|
||||
case RemoteMethodConstants.METHOD_ASYNC_RESULT :
|
||||
responseContent = handleAsyncResultMethodCall(req);
|
||||
break;
|
||||
|
||||
case RemoteMethodConstants.METHOD_PING :
|
||||
responseContent = handlePingMethodCall(req);
|
||||
break;
|
||||
|
||||
case RemoteMethodConstants.METHOD_UNKNOWN :
|
||||
default :
|
||||
assert(false);
|
||||
s_logger.error("unrecognized method " + nMethod);
|
||||
break;
|
||||
}
|
||||
} catch(Throwable e) {
|
||||
s_logger.error("Unexpected exception when processing cluster service request : ", e);
|
||||
}
|
||||
|
||||
if(responseContent != null) {
|
||||
writeResponse(response, HttpStatus.SC_OK, responseContent);
|
||||
} else {
|
||||
writeResponse(response, HttpStatus.SC_BAD_REQUEST, null);
|
||||
}
|
||||
}
|
||||
|
||||
private String handleExecuteMethodCall(HttpRequest req) {
|
||||
String agentId = (String)req.getParams().getParameter("agentId");
|
||||
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
|
||||
String stopOnError = (String)req.getParams().getParameter("stopOnError");
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("|->" + agentId + " " + gsonPackage);
|
||||
}
|
||||
|
||||
Command [] cmds = null;
|
||||
try {
|
||||
cmds = gson.fromJson(gsonPackage, Command[].class);
|
||||
} catch(Throwable e) {
|
||||
assert(false);
|
||||
s_logger.error("Excection in gson decoding : ", e);
|
||||
}
|
||||
|
||||
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
|
||||
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
|
||||
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
|
||||
}
|
||||
boolean result = false;
|
||||
try {
|
||||
result = manager.executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Result is " + result);
|
||||
}
|
||||
|
||||
} catch (AgentUnavailableException e) {
|
||||
s_logger.warn("Agent is unavailable", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
Answer[] answers = new Answer[1];
|
||||
answers[0] = new ChangeAgentAnswer(cmd, result);
|
||||
return gson.toJson(answers);
|
||||
}
|
||||
|
||||
try {
|
||||
long startTick = System.currentTimeMillis();
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Send |-> " + agentId + " " + gsonPackage + " to agent manager");
|
||||
}
|
||||
|
||||
Answer[] answers = manager.sendToAgent(Long.parseLong(agentId), cmds,
|
||||
Integer.parseInt(stopOnError) != 0 ? true : false);
|
||||
|
||||
if(answers != null) {
|
||||
String jsonReturn = gson.toJson(answers);
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Completed |-> " + agentId + " " + gsonPackage +
|
||||
" in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
|
||||
}
|
||||
|
||||
return jsonReturn;
|
||||
} else {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Completed |-> " + agentId + " " + gsonPackage +
|
||||
" in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
|
||||
}
|
||||
}
|
||||
} catch(AgentUnavailableException e) {
|
||||
s_logger.warn("Agent is unavailable", e);
|
||||
} catch (OperationTimedoutException e) {
|
||||
s_logger.warn("Timed Out", e);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private String handleExecuteAsyncMethodCall(HttpRequest req) {
|
||||
String agentId = (String)req.getParams().getParameter("agentId");
|
||||
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
|
||||
String stopOnError = (String)req.getParams().getParameter("stopOnError");
|
||||
String callingPeer = (String)req.getParams().getParameter("caller");
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Async " + callingPeer + " |-> " + agentId + " " + gsonPackage);
|
||||
|
||||
Command [] cmds = null;
|
||||
try {
|
||||
cmds = gson.fromJson(gsonPackage, Command[].class);
|
||||
} catch(Throwable e) {
|
||||
assert(false);
|
||||
s_logger.error("Excection in gson decoding : ", e);
|
||||
}
|
||||
|
||||
Listener listener = new ClusterAsyncExectuionListener(manager, callingPeer);
|
||||
long seq = -1;
|
||||
try {
|
||||
long startTick = System.currentTimeMillis();
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Send Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " to agent manager");
|
||||
|
||||
seq = manager.sendToAgent(Long.parseLong(agentId), cmds,
|
||||
Integer.parseInt(stopOnError) != 0 ? true : false, listener);
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Complated Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " in " +
|
||||
+ (System.currentTimeMillis() - startTick) + " ms, returned seq: " + seq);
|
||||
} catch (AgentUnavailableException e) {
|
||||
s_logger.warn("Agent is unavailable", e);
|
||||
seq = -1;
|
||||
}
|
||||
|
||||
return gson.toJson(seq);
|
||||
}
|
||||
|
||||
private String handleAsyncResultMethodCall(HttpRequest req) {
|
||||
String agentId = (String)req.getParams().getParameter("agentId");
|
||||
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
|
||||
String seq = (String)req.getParams().getParameter("seq");
|
||||
String executingPeer = (String)req.getParams().getParameter("executingPeer");
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Async callback " + executingPeer + "." + agentId + " |-> " + gsonPackage);
|
||||
|
||||
Answer[] answers = null;
|
||||
try {
|
||||
answers = gson.fromJson(gsonPackage, Answer[].class);
|
||||
} catch(Throwable e) {
|
||||
assert(false);
|
||||
s_logger.error("Excection in gson decoding : ", e);
|
||||
}
|
||||
|
||||
long startTick = System.currentTimeMillis();
|
||||
if(manager.onAsyncResult(executingPeer, Long.parseLong(agentId), Long.parseLong(seq), answers)) {
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) +
|
||||
" ms, return recurring=true, let async listener contine on");
|
||||
|
||||
return "recurring=true";
|
||||
}
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) +
|
||||
" ms, return recurring=false, indicate to tear down async listener");
|
||||
|
||||
return "recurring=false";
|
||||
}
|
||||
|
||||
private String handlePingMethodCall(HttpRequest req) {
|
||||
String callingPeer = (String)req.getParams().getParameter("callingPeer");
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Handle ping request from " + callingPeer);
|
||||
|
||||
return "true";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private String handleExecuteAsyncMethodCall(HttpRequest req) {
|
||||
String agentId = (String)req.getParams().getParameter("agentId");
|
||||
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
|
||||
String stopOnError = (String)req.getParams().getParameter("stopOnError");
|
||||
String callingPeer = (String)req.getParams().getParameter("caller");
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Async " + callingPeer + " |-> " + agentId + " " + gsonPackage);
|
||||
}
|
||||
|
||||
Command [] cmds = null;
|
||||
try {
|
||||
cmds = gson.fromJson(gsonPackage, Command[].class);
|
||||
} catch(Throwable e) {
|
||||
assert(false);
|
||||
s_logger.error("Excection in gson decoding : ", e);
|
||||
}
|
||||
|
||||
Listener listener = new ClusterAsyncExectuionListener(manager, callingPeer);
|
||||
long seq = -1;
|
||||
try {
|
||||
long startTick = System.currentTimeMillis();
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Send Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " to agent manager");
|
||||
}
|
||||
|
||||
seq = manager.sendToAgent(Long.parseLong(agentId), cmds,
|
||||
Integer.parseInt(stopOnError) != 0 ? true : false, listener);
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Complated Async " + callingPeer + " |-> " + agentId + " " + gsonPackage + " in " +
|
||||
+ (System.currentTimeMillis() - startTick) + " ms, returned seq: " + seq);
|
||||
}
|
||||
} catch (AgentUnavailableException e) {
|
||||
s_logger.warn("Agent is unavailable", e);
|
||||
seq = -1;
|
||||
}
|
||||
|
||||
return gson.toJson(seq);
|
||||
}
|
||||
|
||||
private String handleAsyncResultMethodCall(HttpRequest req) {
|
||||
String agentId = (String)req.getParams().getParameter("agentId");
|
||||
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
|
||||
String seq = (String)req.getParams().getParameter("seq");
|
||||
String executingPeer = (String)req.getParams().getParameter("executingPeer");
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Async callback " + executingPeer + "." + agentId + " |-> " + gsonPackage);
|
||||
}
|
||||
|
||||
Answer[] answers = null;
|
||||
try {
|
||||
answers = gson.fromJson(gsonPackage, Answer[].class);
|
||||
} catch(Throwable e) {
|
||||
assert(false);
|
||||
s_logger.error("Excection in gson decoding : ", e);
|
||||
}
|
||||
|
||||
long startTick = System.currentTimeMillis();
|
||||
if(manager.onAsyncResult(executingPeer, Long.parseLong(agentId), Long.parseLong(seq), answers)) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) +
|
||||
" ms, return recurring=true, let async listener contine on");
|
||||
}
|
||||
|
||||
return "recurring=true";
|
||||
}
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Completed local callback in " + (System.currentTimeMillis() - startTick) +
|
||||
" ms, return recurring=false, indicate to tear down async listener");
|
||||
}
|
||||
|
||||
return "recurring=false";
|
||||
}
|
||||
|
||||
private String handlePingMethodCall(HttpRequest req) {
|
||||
String callingPeer = (String)req.getParams().getParameter("callingPeer");
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Handle ping request from " + callingPeer);
|
||||
}
|
||||
|
||||
return "true";
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,157 +18,167 @@
|
||||
|
||||
package com.cloud.cluster;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.rmi.RemoteException;
|
||||
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
import org.apache.commons.httpclient.HttpException;
|
||||
import org.apache.commons.httpclient.HttpStatus;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.serializer.GsonHelper;
|
||||
import com.google.gson.Gson;
|
||||
import java.io.IOException;
|
||||
import java.rmi.RemoteException;
|
||||
|
||||
import org.apache.commons.httpclient.HttpClient;
|
||||
import org.apache.commons.httpclient.HttpException;
|
||||
import org.apache.commons.httpclient.HttpStatus;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.serializer.GsonHelper;
|
||||
import com.google.gson.Gson;
|
||||
|
||||
public class ClusterServiceServletImpl implements ClusterService {
|
||||
private static final long serialVersionUID = 4574025200012566153L;
|
||||
private static final long serialVersionUID = 4574025200012566153L;
|
||||
|
||||
private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class);
|
||||
|
||||
private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class);
|
||||
|
||||
private String serviceUrl;
|
||||
|
||||
|
||||
private Gson gson;
|
||||
|
||||
|
||||
public ClusterServiceServletImpl() {
|
||||
gson = GsonHelper.getBuilder().create();
|
||||
gson = GsonHelper.getGson();
|
||||
}
|
||||
|
||||
|
||||
public ClusterServiceServletImpl(String serviceUrl) {
|
||||
this.serviceUrl = serviceUrl;
|
||||
|
||||
gson = GsonHelper.getBuilder().create();
|
||||
this.serviceUrl = serviceUrl;
|
||||
|
||||
gson = GsonHelper.getGson();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException {
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Post (sync-call) " + gsonPackage + " to " + serviceUrl + " for agent " + agentId + " from " + callingPeer);
|
||||
|
||||
public String execute(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Post (sync-call) " + gsonPackage + " to " + serviceUrl + " for agent " + agentId + " from " + callingPeer);
|
||||
}
|
||||
|
||||
HttpClient client = new HttpClient();
|
||||
PostMethod method = new PostMethod(serviceUrl);
|
||||
|
||||
|
||||
method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_EXECUTE));
|
||||
method.addParameter("agentId", Long.toString(agentId));
|
||||
method.addParameter("gsonPackage", gsonPackage);
|
||||
method.addParameter("stopOnError", stopOnError ? "1" : "0");
|
||||
|
||||
return executePostMethod(client, method);
|
||||
|
||||
return executePostMethod(client, method);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public long executeAsync(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException {
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Post (Async-call) " + gsonPackage + " to " + serviceUrl + " for agent " + agentId + " from " + callingPeer);
|
||||
|
||||
public long executeAsync(String callingPeer, long agentId, String gsonPackage, boolean stopOnError) throws RemoteException {
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Post (Async-call) " + gsonPackage + " to " + serviceUrl + " for agent " + agentId + " from " + callingPeer);
|
||||
}
|
||||
|
||||
HttpClient client = new HttpClient();
|
||||
PostMethod method = new PostMethod(serviceUrl);
|
||||
|
||||
|
||||
method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_EXECUTE_ASYNC));
|
||||
method.addParameter("agentId", Long.toString(agentId));
|
||||
method.addParameter("gsonPackage", gsonPackage);
|
||||
method.addParameter("stopOnError", stopOnError ? "1" : "0");
|
||||
method.addParameter("caller", callingPeer);
|
||||
|
||||
String result = executePostMethod(client, method);
|
||||
if(result == null) {
|
||||
s_logger.error("Empty return from remote async-execution on " + serviceUrl);
|
||||
throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl);
|
||||
}
|
||||
|
||||
try {
|
||||
return gson.fromJson(result, Long.class);
|
||||
} catch(Throwable e) {
|
||||
s_logger.error("Unable to parse executeAsync return : " + result);
|
||||
throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl);
|
||||
}
|
||||
|
||||
String result = executePostMethod(client, method);
|
||||
if(result == null) {
|
||||
s_logger.error("Empty return from remote async-execution on " + serviceUrl);
|
||||
throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl);
|
||||
}
|
||||
|
||||
try {
|
||||
return gson.fromJson(result, Long.class);
|
||||
} catch(Throwable e) {
|
||||
s_logger.error("Unable to parse executeAsync return : " + result);
|
||||
throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException {
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Forward Async-call answer to remote listener, agent: " + agentId
|
||||
+ ", excutingPeer: " + executingPeer
|
||||
+ ", seq: " + seq + ", gsonPackage: " + gsonPackage);
|
||||
|
||||
|
||||
@Override
|
||||
public boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Forward Async-call answer to remote listener, agent: " + agentId
|
||||
+ ", excutingPeer: " + executingPeer
|
||||
+ ", seq: " + seq + ", gsonPackage: " + gsonPackage);
|
||||
}
|
||||
|
||||
HttpClient client = new HttpClient();
|
||||
PostMethod method = new PostMethod(serviceUrl);
|
||||
|
||||
|
||||
method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_ASYNC_RESULT));
|
||||
method.addParameter("agentId", Long.toString(agentId));
|
||||
method.addParameter("gsonPackage", gsonPackage);
|
||||
method.addParameter("seq", Long.toString(seq));
|
||||
method.addParameter("executingPeer", executingPeer);
|
||||
|
||||
String result = executePostMethod(client, method);
|
||||
if(result.contains("recurring=true")) {
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Remote listener returned recurring=true");
|
||||
return true;
|
||||
}
|
||||
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Remote listener returned recurring=false");
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean ping(String callingPeer) throws RemoteException {
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("Ping at " + serviceUrl);
|
||||
|
||||
|
||||
String result = executePostMethod(client, method);
|
||||
if(result.contains("recurring=true")) {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Remote listener returned recurring=true");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Remote listener returned recurring=false");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean ping(String callingPeer) throws RemoteException {
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Ping at " + serviceUrl);
|
||||
}
|
||||
|
||||
HttpClient client = new HttpClient();
|
||||
PostMethod method = new PostMethod(serviceUrl);
|
||||
|
||||
|
||||
method.addParameter("method", Integer.toString(RemoteMethodConstants.METHOD_PING));
|
||||
method.addParameter("callingPeer", callingPeer);
|
||||
String returnVal = executePostMethod(client, method);
|
||||
if("true".equalsIgnoreCase(returnVal))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private String executePostMethod(HttpClient client, PostMethod method) {
|
||||
String returnVal = executePostMethod(client, method);
|
||||
if("true".equalsIgnoreCase(returnVal)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private String executePostMethod(HttpClient client, PostMethod method) {
|
||||
int response = 0;
|
||||
String result = null;
|
||||
try {
|
||||
long startTick = System.currentTimeMillis();
|
||||
response = client.executeMethod(method);
|
||||
if(response == HttpStatus.SC_OK) {
|
||||
result = method.getResponseBodyAsString();
|
||||
if(s_logger.isDebugEnabled())
|
||||
s_logger.debug("POST " + serviceUrl + " response :" + result + ", responding time: "
|
||||
+ (System.currentTimeMillis() - startTick) + " ms");
|
||||
} else {
|
||||
s_logger.error("Invalid response code : " + response + ", from : "
|
||||
+ serviceUrl + ", method : " + method.getParameter("method")
|
||||
+ " responding time: " + (System.currentTimeMillis() - startTick));
|
||||
}
|
||||
long startTick = System.currentTimeMillis();
|
||||
response = client.executeMethod(method);
|
||||
if(response == HttpStatus.SC_OK) {
|
||||
result = method.getResponseBodyAsString();
|
||||
if(s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("POST " + serviceUrl + " response :" + result + ", responding time: "
|
||||
+ (System.currentTimeMillis() - startTick) + " ms");
|
||||
}
|
||||
} else {
|
||||
s_logger.error("Invalid response code : " + response + ", from : "
|
||||
+ serviceUrl + ", method : " + method.getParameter("method")
|
||||
+ " responding time: " + (System.currentTimeMillis() - startTick));
|
||||
}
|
||||
} catch (HttpException e) {
|
||||
s_logger.error("HttpException from : " + serviceUrl + ", method : " + method.getParameter("method"));
|
||||
s_logger.error("HttpException from : " + serviceUrl + ", method : " + method.getParameter("method"));
|
||||
} catch (IOException e) {
|
||||
s_logger.error("IOException from : " + serviceUrl + ", method : " + method.getParameter("method"));
|
||||
s_logger.error("IOException from : " + serviceUrl + ", method : " + method.getParameter("method"));
|
||||
} catch(Throwable e) {
|
||||
s_logger.error("Exception from : " + serviceUrl + ", method : " + method.getParameter("method") + ", exception :", e);
|
||||
s_logger.error("Exception from : " + serviceUrl + ", method : " + method.getParameter("method") + ", exception :", e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// for test purpose only
|
||||
public static void main(String[] args) {
|
||||
ClusterServiceServletImpl service = new ClusterServiceServletImpl("http://localhost:9090/clusterservice");
|
||||
try {
|
||||
String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true);
|
||||
System.out.println(result);
|
||||
} catch (RemoteException e) {
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// for test purpose only
|
||||
public static void main(String[] args) {
|
||||
ClusterServiceServletImpl service = new ClusterServiceServletImpl("http://localhost:9090/clusterservice");
|
||||
try {
|
||||
String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true);
|
||||
System.out.println(result);
|
||||
} catch (RemoteException e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ import javax.naming.ConfigurationException;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.cloud.api.ApiDispatcher;
|
||||
import com.cloud.api.ApiGsonHelper;
|
||||
import com.cloud.api.commands.CreateSnapshotCmd;
|
||||
import com.cloud.async.AsyncJobManager;
|
||||
import com.cloud.async.AsyncJobResult;
|
||||
@ -38,7 +39,6 @@ import com.cloud.async.dao.AsyncJobDao;
|
||||
import com.cloud.configuration.dao.ConfigurationDao;
|
||||
import com.cloud.event.EventTypes;
|
||||
import com.cloud.event.EventUtils;
|
||||
import com.cloud.serializer.GsonHelper;
|
||||
import com.cloud.storage.Snapshot;
|
||||
import com.cloud.storage.SnapshotPolicyVO;
|
||||
import com.cloud.storage.SnapshotScheduleVO;
|
||||
@ -67,7 +67,7 @@ import com.cloud.utils.db.SearchCriteria;
|
||||
@Local(value={SnapshotScheduler.class})
|
||||
public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
private static final Logger s_logger = Logger.getLogger(SnapshotSchedulerImpl.class);
|
||||
|
||||
|
||||
private String _name = null;
|
||||
@Inject protected AsyncJobDao _asyncJobDao;
|
||||
@Inject protected SnapshotDao _snapshotDao;
|
||||
@ -77,13 +77,13 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
@Inject protected SnapshotManager _snapshotManager;
|
||||
@Inject protected StoragePoolHostDao _poolHostDao;
|
||||
@Inject protected VolumeDao _volsDao;
|
||||
|
||||
|
||||
private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
|
||||
private int _snapshotPollInterval;
|
||||
private Timer _testClockTimer;
|
||||
private Date _currentTimestamp;
|
||||
private TestClock _testTimerTask;
|
||||
|
||||
|
||||
private Date getNextScheduledTime(long policyId, Date currentTimestamp) {
|
||||
SnapshotPolicyVO policy = _snapshotPolicyDao.findById(policyId);
|
||||
Date nextTimestamp = null;
|
||||
@ -107,7 +107,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
public void poll(Date currentTimestamp) {
|
||||
// We don't maintain the time. The timer task does.
|
||||
_currentTimestamp = currentTimestamp;
|
||||
|
||||
|
||||
GlobalLock scanLock = GlobalLock.getInternLock("snapshot.poll");
|
||||
try {
|
||||
if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
|
||||
@ -120,7 +120,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
} finally {
|
||||
scanLock.releaseRef();
|
||||
}
|
||||
|
||||
|
||||
scanLock = GlobalLock.getInternLock("snapshot.poll");
|
||||
try {
|
||||
if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
|
||||
@ -132,9 +132,9 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
}
|
||||
} finally {
|
||||
scanLock.releaseRef();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void checkStatusOfCurrentlyExecutingSnapshots() {
|
||||
SearchCriteria<SnapshotScheduleVO> sc = _snapshotScheduleDao.createSearchCriteria();
|
||||
sc.addAnd("asyncJobId", SearchCriteria.Op.NNULL);
|
||||
@ -182,19 +182,19 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
// and cleanup the previous snapshot
|
||||
// Set the userId to that of system.
|
||||
//_snapshotManager.validateSnapshot(1L, snapshot);
|
||||
// In all cases, schedule the next snapshot job
|
||||
// In all cases, schedule the next snapshot job
|
||||
scheduleNextSnapshotJob(snapshotSchedule);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
break;
|
||||
case AsyncJobResult.STATUS_IN_PROGRESS:
|
||||
// There is no way of knowing from here whether
|
||||
// There is no way of knowing from here whether
|
||||
// 1) Another management server is processing this snapshot job
|
||||
// 2) The management server has crashed and this snapshot is lying
|
||||
// 2) The management server has crashed and this snapshot is lying
|
||||
// around in an inconsistent state.
|
||||
// Hopefully, this can be resolved at the backend when the current snapshot gets executed.
|
||||
// But if it remains in this state, the current snapshot will not get executed.
|
||||
// But if it remains in this state, the current snapshot will not get executed.
|
||||
// And it will remain in stasis.
|
||||
break;
|
||||
}
|
||||
@ -205,7 +205,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
protected void scheduleSnapshots() {
|
||||
String displayTime = DateUtil.displayDateInTimezone(DateUtil.GMT_TIMEZONE, _currentTimestamp);
|
||||
s_logger.debug("Snapshot scheduler.poll is being called at " + displayTime);
|
||||
|
||||
|
||||
List<SnapshotScheduleVO> snapshotsToBeExecuted = _snapshotScheduleDao.getSchedulesToExecute(_currentTimestamp);
|
||||
s_logger.debug("Got " + snapshotsToBeExecuted.size() + " snapshots to be executed at " + displayTime);
|
||||
|
||||
@ -244,12 +244,12 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
params.put("ctxUserId", "1");
|
||||
params.put("ctxAccountId", "1");
|
||||
params.put("ctxStartEventId", String.valueOf(eventId));
|
||||
|
||||
|
||||
CreateSnapshotCmd cmd = new CreateSnapshotCmd();
|
||||
ApiDispatcher.getInstance().dispatchCreateCmd(cmd, params);
|
||||
params.put("id", ""+cmd.getEntityId());
|
||||
params.put("ctxStartEventId", "1");
|
||||
|
||||
|
||||
AsyncJobVO job = new AsyncJobVO();
|
||||
job.setUserId(userId);
|
||||
// Just have SYSTEM own the job for now. Users won't be able to see this job, but
|
||||
@ -257,7 +257,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
job.setAccountId(1L);
|
||||
job.setCmd(CreateSnapshotCmd.class.getName());
|
||||
job.setInstanceId(cmd.getEntityId());
|
||||
job.setCmdInfo(GsonHelper.getBuilder().create().toJson(params));
|
||||
job.setCmdInfo(ApiGsonHelper.getBuilder().create().toJson(params));
|
||||
|
||||
long jobId = _asyncMgr.submitAsyncJob(job);
|
||||
|
||||
@ -286,7 +286,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
}
|
||||
return scheduleNextSnapshotJob(snapshotPolicy);
|
||||
}
|
||||
|
||||
|
||||
@Override @DB
|
||||
public Date scheduleNextSnapshotJob(SnapshotPolicyVO policy) {
|
||||
if ( policy == null) {
|
||||
@ -317,9 +317,9 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
}
|
||||
return nextSnapshotTimestamp;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@Override @DB
|
||||
public boolean removeSchedule(Long volumeId, Long policyId) {
|
||||
// We can only remove schedules which are in the future. Not which are already executed in the past.
|
||||
@ -341,7 +341,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
_name = name;
|
||||
|
||||
ComponentLocator locator = ComponentLocator.getCurrentLocator();
|
||||
|
||||
|
||||
ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
|
||||
if (configDao == null) {
|
||||
s_logger.error("Unable to get the configuration dao. " + ConfigurationDao.class.getName());
|
||||
@ -357,12 +357,12 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
int daysPerMonth = NumbersUtil.parseInt(configDao.getValue("snapshot.test.days.per.month"), 30);
|
||||
int weeksPerMonth = NumbersUtil.parseInt(configDao.getValue("snapshot.test.weeks.per.month"), 4);
|
||||
int monthsPerYear = NumbersUtil.parseInt(configDao.getValue("snapshot.test.months.per.year"), 12);
|
||||
|
||||
|
||||
_testTimerTask = new TestClock(this, minutesPerHour, hoursPerDay, daysPerWeek, daysPerMonth, weeksPerMonth, monthsPerYear);
|
||||
}
|
||||
_currentTimestamp = new Date();
|
||||
s_logger.info("Snapshot Scheduler is configured.");
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -397,7 +397,7 @@ public class SnapshotSchedulerImpl implements SnapshotScheduler {
|
||||
_testClockTimer = new Timer("SnapshotPollTask");
|
||||
_testClockTimer.schedule(timerTask, _snapshotPollInterval*1000L, _snapshotPollInterval*1000L);
|
||||
}
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user