Fixes/tests on sample management server on new RPC/Async framework

This commit is contained in:
Kelven Yang 2012-12-13 15:18:25 -08:00
parent a6c441fcc5
commit e72417a1e7
8 changed files with 61 additions and 27 deletions

View File

@ -19,16 +19,20 @@
package org.apache.cloudstack.framework.messaging;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public class JsonMessageSerializer implements MessageSerializer {
// this will be injected from external to allow installation of
// type adapters need by upper layer applications
// type adapters needed by upper layer applications
private Gson _gson;
private OnwireClassRegistry _clzRegistry;
public JsonMessageSerializer() {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.setVersion(1.5);
_gson = gsonBuilder.create();
}
public Gson getGson() {

View File

@ -42,6 +42,7 @@ public class OnwireClassRegistry {
private Map<String, Class<?>> registry = new HashMap<String, Class<?>>();
public OnwireClassRegistry() {
registry.put("Object", Object.class);
}
public OnwireClassRegistry(String packageName) {

View File

@ -28,6 +28,7 @@ public interface RpcProvider extends TransportMultiplexier {
void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
RpcClientCall newCall();
RpcClientCall newCall(String targetAddress);
RpcClientCall newCall(TransportAddressMapper targetAddress);

View File

@ -30,7 +30,7 @@ public class RpcProviderImpl implements RpcProvider {
private String _transportAddress;
private RpcTransportEndpoint _transportEndpoint = new RpcTransportEndpoint(); // transport attachment at RPC layer
private MessageSerializer _messageSerializer = new JsonMessageSerializer(); // default message serializer
private MessageSerializer _messageSerializer;
private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>();
private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>();
@ -102,6 +102,11 @@ public class RpcProviderImpl implements RpcProvider {
}
}
@Override
public RpcClientCall newCall() {
return newCall(TransportAddress.getLocalPredefinedTransportAddress("RpcProvider").toString());
}
@Override
public RpcClientCall newCall(String targetAddress) {
@ -111,22 +116,6 @@ public class RpcProviderImpl implements RpcProvider {
call.setTargetAddress(targetAddress);
call.setCallTag(callTag);
RpcCallRequestPdu pdu = new RpcCallRequestPdu();
pdu.setCommand(call.getCommand());
pdu.setRequestTag(callTag);
pdu.setRequestStartTick(System.currentTimeMillis());
String serializedCmdArg;
if(call.getCommandArg() != null)
serializedCmdArg = _messageSerializer.serializeTo(call.getCommandArg().getClass(), call.getCommandArg());
else
serializedCmdArg = _messageSerializer.serializeTo(Object.class, null);
pdu.setSerializedCommandArg(serializedCmdArg);
String serializedPdu = _messageSerializer.serializeTo(RpcCallRequestPdu.class, pdu);
_transportProvider.sendMessage(_transportAddress, targetAddress, RPC_MULTIPLEXIER,
serializedPdu);
return call;
}

View File

@ -19,6 +19,9 @@
package org.apache.cloudstack.framework.messaging;
public interface TransportProvider {
void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer();
TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress);
boolean detach(TransportEndpoint endpoint);

View File

@ -18,12 +18,15 @@
*/
package org.apache.cloudstack.framework.messaging.client;
import org.apache.cloudstack.framework.messaging.MessageSerializer;
import org.apache.cloudstack.framework.messaging.TransportEndpoint;
import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
import org.apache.cloudstack.framework.messaging.TransportProvider;
public class ClientTransportProvider implements TransportProvider {
private MessageSerializer _messageSerializer;
@Override
public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
// TODO Auto-generated method stub
@ -36,6 +39,17 @@ public class ClientTransportProvider implements TransportProvider {
return false;
}
@Override
public void setMessageSerializer(MessageSerializer messageSerializer) {
assert(messageSerializer != null);
_messageSerializer = messageSerializer;
}
@Override
public MessageSerializer getMessageSerializer() {
return _messageSerializer;
}
@Override
public void requestSiteOutput(TransportEndpointSite site) {

View File

@ -24,6 +24,7 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.cloudstack.framework.messaging.MessageSerializer;
import org.apache.cloudstack.framework.messaging.TransportAddress;
import org.apache.cloudstack.framework.messaging.TransportDataPdu;
import org.apache.cloudstack.framework.messaging.TransportEndpoint;
@ -46,6 +47,8 @@ public class ServerTransportProvider implements TransportProvider {
private ExecutorService _executor;
private int _nextEndpointId = new Random().nextInt();
private MessageSerializer _messageSerializer;
public ServerTransportProvider() {
}
@ -70,6 +73,17 @@ public class ServerTransportProvider implements TransportProvider {
return this;
}
@Override
public void setMessageSerializer(MessageSerializer messageSerializer) {
assert(messageSerializer != null);
_messageSerializer = messageSerializer;
}
@Override
public MessageSerializer getMessageSerializer() {
return _messageSerializer;
}
public void initialize() {
_executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
}

View File

@ -16,15 +16,6 @@
<context:annotation-config />
<context:component-scan base-package="org.apache.cloudstack, com.cloud" />
<bean id="transportProvider" class="org.apache.cloudstack.framework.messaging.server.ServerTransportProvider" init-method="initialize">
<property name="workerPoolSize" value="5" />
<property name="nodeId" value="Node1" />
</bean>
<bean id="rpcProvider" class="org.apache.cloudstack.framework.messaging.RpcProviderImpl" init-method="initialize">
<constructor-arg ref="transportProvider" />
</bean>
<bean id="eventBus" class = "org.apache.cloudstack.framework.messaging.EventBusBase" />
<bean id="onwireRegistry" class="org.apache.cloudstack.framework.messaging.OnwireClassRegistry"
init-method="scan" >
<property name="packages">
@ -34,4 +25,21 @@
</property>
</bean>
<bean id="messageSerializer" class="org.apache.cloudstack.framework.messaging.JsonMessageSerializer">
<property name="onwireClassRegistry" ref="onwireRegistry" />
</bean>
<bean id="transportProvider" class="org.apache.cloudstack.framework.messaging.server.ServerTransportProvider" init-method="initialize">
<property name="workerPoolSize" value="5" />
<property name="nodeId" value="Node1" />
<property name="messageSerializer" ref="messageSerializer" />
</bean>
<bean id="rpcProvider" class="org.apache.cloudstack.framework.messaging.RpcProviderImpl" init-method="initialize">
<constructor-arg ref="transportProvider" />
<property name="messageSerializer" ref="messageSerializer" />
</bean>
<bean id="eventBus" class = "org.apache.cloudstack.framework.messaging.EventBusBase" />
</beans>