From f52950689b53d2be9d457a1fc494463d48df4552 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Tue, 11 Dec 2012 17:53:46 -0800 Subject: [PATCH] Hook server side RPC provider with a server side transport --- .../framework/messaging/RpcProvider.java | 4 +- .../framework/messaging/RpcProviderImpl.java | 79 ++++++++++++++----- .../messaging/TransportEndpoint.java | 10 +-- .../messaging/TransportEndpointSite.java | 38 ++++++++- .../messaging/TransportProvider.java | 2 +- .../client/ClientTransportEndpoint.java | 23 +----- .../client/ClientTransportProvider.java | 5 +- .../server/ServerTransportProvider.java | 26 +++--- 8 files changed, 112 insertions(+), 75 deletions(-) diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java index 3b766e80e81..b7c3fd6965e 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java @@ -23,6 +23,7 @@ public interface RpcProvider extends TransportMultiplexier { void setMessageSerializer(MessageSerializer messageSerializer); MessageSerializer getMessageSerializer(); + boolean initialize(); void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); @@ -33,9 +34,6 @@ public interface RpcProvider extends TransportMultiplexier { // // low-level public API // - RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress); - RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress); - void registerCall(RpcClientCall call); void cancelCall(RpcClientCall call); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java index d4355fa246a..409be59dac4 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java @@ -24,9 +24,13 @@ import java.util.List; import java.util.Map; public class RpcProviderImpl implements RpcProvider { + public static final String RPC_MULTIPLEXIER = "rpc"; private TransportProvider _transportProvider; - private MessageSerializer _messageSerializer; + private String _transportAddress; + private RpcTransportEndpoint _transportEndpoint = new RpcTransportEndpoint(); // transport attachment at RPC layer + + private MessageSerializer _messageSerializer = new JsonMessageSerializer(); // default message serializer private List _serviceEndpoints = new ArrayList(); private Map _outstandingCalls = new HashMap(); @@ -60,6 +64,7 @@ public class RpcProviderImpl implements RpcProvider { @Override public void setMessageSerializer(MessageSerializer messageSerializer) { + assert(messageSerializer != null); _messageSerializer = messageSerializer; } @@ -67,6 +72,17 @@ public class RpcProviderImpl implements RpcProvider { public MessageSerializer getMessageSerializer() { return _messageSerializer; } + + @Override + public boolean initialize() { + assert(_transportProvider != null); + if(_transportProvider == null) + return false; + + TransportEndpointSite endpointSite = _transportProvider.attach(_transportEndpoint, "RpcProvider"); + endpointSite.registerMultiplexier(RPC_MULTIPLEXIER, this); + return true; + } @Override public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) { @@ -83,40 +99,38 @@ public class RpcProviderImpl implements RpcProvider { } @Override - public RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress) { + public RpcClientCall newCall(String targetAddress) { + long callTag = getNextCallTag(); RpcClientCallImpl call = new RpcClientCallImpl(this); - call.setSourceAddress(sourceEndpoint.getEndpointAddress()); + call.setSourceAddress(_transportAddress); call.setTargetAddress(targetAddress); call.setCallTag(callTag); - return call; - } - - @Override - public RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress) { - long callTag = getNextCallTag(); - RpcClientCallImpl call = new RpcClientCallImpl(this); - call.setSourceAddress(sourceEndpoint.getEndpointAddress()); - call.setTargetAddress(targetAddress.getAddress()); - call.setCallTag(callTag); + RpcCallRequestPdu pdu = new RpcCallRequestPdu(); + pdu.setCommand(call.getCommand()); + pdu.setRequestTag(callTag); + pdu.setRequestStartTick(System.currentTimeMillis()); + + String serializedCmdArg; + if(call.getCommandArg() != null) + serializedCmdArg = _messageSerializer.serializeTo(call.getCommandArg().getClass(), call.getCommandArg()); + else + serializedCmdArg = _messageSerializer.serializeTo(Object.class, null); + pdu.setSerializedCommandArg(serializedCmdArg); + + String serializedPdu = _messageSerializer.serializeTo(RpcCallRequestPdu.class, pdu); + _transportProvider.sendMessage(_transportAddress, targetAddress, RPC_MULTIPLEXIER, + serializedPdu); return call; } - @Override - public RpcClientCall newCall(String targetAddress) { - - // ??? - return null; - } - @Override public RpcClientCall newCall(TransportAddressMapper targetAddress) { return newCall(targetAddress.getAddress()); } - @Override public void registerCall(RpcClientCall call) { assert(call != null); @@ -210,4 +224,27 @@ public class RpcProviderImpl implements RpcProvider { } } } + + private class RpcTransportEndpoint implements TransportEndpoint { + + @Override + public void onTransportMessage(String senderEndpointAddress, + String targetEndpointAddress, String multiplexer, String message) { + + // we won't handle generic transport message toward RPC transport endpoint + } + + @Override + public void onAttachConfirm(boolean bSuccess, String endpointAddress) { + if(bSuccess) + _transportAddress = endpointAddress; + + } + + @Override + public void onDetachIndication(String endpointAddress) { + if(_transportAddress != null && _transportAddress.equals(endpointAddress)) + _transportAddress = null; + } + } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java index 0996bfddb7f..fedfb353b91 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java @@ -18,15 +18,7 @@ */ package org.apache.cloudstack.framework.messaging; -public interface TransportEndpoint { - String getEndpointAddress(); - +public interface TransportEndpoint extends TransportMultiplexier { void onAttachConfirm(boolean bSuccess, String endpointAddress); void onDetachIndication(String endpointAddress); - - void registerMultiplexier(String name, TransportMultiplexier multiplexier); - void unregisterMultiplexier(String name); - - void sendMessage(String soureEndpointAddress, String targetEndpointAddress, - String multiplexier, String message); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java index 4af777271f3..ca6155be39c 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpointSite.java @@ -19,13 +19,16 @@ package org.apache.cloudstack.framework.messaging; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class TransportEndpointSite { private TransportEndpoint _endpoint; private TransportAddress _address; private List _outputQueue = new ArrayList(); + private Map _multiplexierMap = new HashMap(); public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) { assert(endpoint != null); @@ -47,6 +50,19 @@ public class TransportEndpointSite { _address = address; } + public void registerMultiplexier(String name, TransportMultiplexier multiplexier) { + assert(name != null); + assert(multiplexier != null); + assert(_multiplexierMap.get(name) == null); + + _multiplexierMap.put(name, multiplexier); + } + + public void unregisterMultiplexier(String name) { + assert(name != null); + _multiplexierMap.remove(name); + } + public void addOutputPdu(TransportPdu pdu) { synchronized(this) { _outputQueue.add(pdu); @@ -66,8 +82,26 @@ public class TransportEndpointSite { private void processOutput() { TransportPdu pdu; - while((pdu = getNextOutputPdu()) != null) { - // ??? + TransportEndpoint endpoint = getEndpoint(); + + if(endpoint != null) { + while((pdu = getNextOutputPdu()) != null) { + if(pdu instanceof TransportDataPdu) { + String multiplexierName = ((TransportDataPdu) pdu).getMultiplexier(); + TransportMultiplexier multiplexier = getRoutedMultiplexier(multiplexierName); + assert(multiplexier != null); + multiplexier.onTransportMessage(pdu.getSourceAddress(), pdu.getDestAddress(), + multiplexierName, ((TransportDataPdu) pdu).getContent()); + } + } } } + + private TransportMultiplexier getRoutedMultiplexier(String multiplexierName) { + TransportMultiplexier multiplexier = _multiplexierMap.get(multiplexierName); + if(multiplexier == null) + multiplexier = _endpoint; + + return multiplexier; + } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java index 132aa9a76da..bdbdd179317 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java @@ -19,7 +19,7 @@ package org.apache.cloudstack.framework.messaging; public interface TransportProvider { - boolean attach(TransportEndpoint endpoint, String predefinedAddress); + TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress); boolean detach(TransportEndpoint endpoint); void sendMessage(String soureEndpointAddress, String targetEndpointAddress, diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java index a1c345ede84..44c8060a990 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java @@ -19,16 +19,9 @@ package org.apache.cloudstack.framework.messaging.client; import org.apache.cloudstack.framework.messaging.TransportEndpoint; -import org.apache.cloudstack.framework.messaging.TransportMultiplexier; public class ClientTransportEndpoint implements TransportEndpoint { - @Override - public String getEndpointAddress() { - // ??? - return ""; - } - @Override public void onAttachConfirm(boolean bSuccess, String endpointAddress) { // TODO Auto-generated method stub @@ -39,19 +32,9 @@ public class ClientTransportEndpoint implements TransportEndpoint { } @Override - public void registerMultiplexier(String name, - TransportMultiplexier multiplexier) { - // TODO Auto-generated method stub - } - - @Override - public void unregisterMultiplexier(String name) { - // TODO Auto-generated method stub - } - - @Override - public void sendMessage(String sourceEndpointAddress, - String targetEndpointAddress, String multiplexier, String message) { + public void onTransportMessage(String senderEndpointAddress, + String targetEndpointAddress, String multiplexer, String message) { // TODO Auto-generated method stub + } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java index 2189c7ab165..60c07c3f1b5 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java @@ -19,14 +19,15 @@ package org.apache.cloudstack.framework.messaging.client; import org.apache.cloudstack.framework.messaging.TransportEndpoint; +import org.apache.cloudstack.framework.messaging.TransportEndpointSite; import org.apache.cloudstack.framework.messaging.TransportProvider; public class ClientTransportProvider implements TransportProvider { @Override - public boolean attach(TransportEndpoint endpoint, String predefinedAddress) { + public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) { // TODO Auto-generated method stub - return false; + return null; } @Override diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java index 02674cbf7b4..3372b75c361 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java @@ -45,7 +45,7 @@ public class ServerTransportProvider implements TransportProvider { } @Override - public boolean attach(TransportEndpoint endpoint, String predefinedAddress) { + public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) { TransportAddress transportAddress; String endpointId; @@ -62,35 +62,26 @@ public class ServerTransportProvider implements TransportProvider { endpointSite = _endpointMap.get(endpointId); if(endpointSite != null) { // already attached - return false; + return endpointSite; } endpointSite = new TransportEndpointSite(endpoint, transportAddress); _endpointMap.put(endpointId, endpointSite); } endpoint.onAttachConfirm(true, transportAddress.toString()); - return true; + return endpointSite; } @Override public boolean detach(TransportEndpoint endpoint) { - TransportAddress transportAddress = TransportAddress.fromAddressString(endpoint.getEndpointAddress()); - if(transportAddress == null) - return false; - - boolean found = false; synchronized(this) { - TransportEndpointSite endpointSite = _endpointMap.get(transportAddress.getEndpointId()); - if(endpointSite.getAddress().equals(transportAddress)) { - found = true; - _endpointMap.remove(transportAddress.getEndpointId()); + for(Map.Entry entry : _endpointMap.entrySet()) { + if(entry.getValue().getEndpoint() == endpoint) { + _endpointMap.remove(entry.getKey()); + return true; + } } } - - if(found) { - endpoint.onDetachIndication(endpoint.getEndpointAddress()); - return true; - } return false; } @@ -122,6 +113,7 @@ public class ServerTransportProvider implements TransportProvider { endpointSite.addOutputPdu(pdu); } else { // do cross-node forwarding + // ??? } }