Hook server side RPC provider with a server side transport

This commit is contained in:
Kelven Yang 2012-12-11 17:53:46 -08:00
parent 98a1295fb3
commit f52950689b
8 changed files with 112 additions and 75 deletions

View File

@ -23,6 +23,7 @@ public interface RpcProvider extends TransportMultiplexier {
void setMessageSerializer(MessageSerializer messageSerializer); void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer(); MessageSerializer getMessageSerializer();
boolean initialize();
void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
@ -33,9 +34,6 @@ public interface RpcProvider extends TransportMultiplexier {
// //
// low-level public API // low-level public API
// //
RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress);
RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress);
void registerCall(RpcClientCall call); void registerCall(RpcClientCall call);
void cancelCall(RpcClientCall call); void cancelCall(RpcClientCall call);

View File

@ -24,9 +24,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
public class RpcProviderImpl implements RpcProvider { public class RpcProviderImpl implements RpcProvider {
public static final String RPC_MULTIPLEXIER = "rpc";
private TransportProvider _transportProvider; private TransportProvider _transportProvider;
private MessageSerializer _messageSerializer; private String _transportAddress;
private RpcTransportEndpoint _transportEndpoint = new RpcTransportEndpoint(); // transport attachment at RPC layer
private MessageSerializer _messageSerializer = new JsonMessageSerializer(); // default message serializer
private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>(); private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>();
private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>(); private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>();
@ -60,6 +64,7 @@ public class RpcProviderImpl implements RpcProvider {
@Override @Override
public void setMessageSerializer(MessageSerializer messageSerializer) { public void setMessageSerializer(MessageSerializer messageSerializer) {
assert(messageSerializer != null);
_messageSerializer = messageSerializer; _messageSerializer = messageSerializer;
} }
@ -68,6 +73,17 @@ public class RpcProviderImpl implements RpcProvider {
return _messageSerializer; return _messageSerializer;
} }
@Override
public boolean initialize() {
assert(_transportProvider != null);
if(_transportProvider == null)
return false;
TransportEndpointSite endpointSite = _transportProvider.attach(_transportEndpoint, "RpcProvider");
endpointSite.registerMultiplexier(RPC_MULTIPLEXIER, this);
return true;
}
@Override @Override
public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) { public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
synchronized(_serviceEndpoints) { synchronized(_serviceEndpoints) {
@ -83,40 +99,38 @@ public class RpcProviderImpl implements RpcProvider {
} }
@Override @Override
public RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress) { public RpcClientCall newCall(String targetAddress) {
long callTag = getNextCallTag(); long callTag = getNextCallTag();
RpcClientCallImpl call = new RpcClientCallImpl(this); RpcClientCallImpl call = new RpcClientCallImpl(this);
call.setSourceAddress(sourceEndpoint.getEndpointAddress()); call.setSourceAddress(_transportAddress);
call.setTargetAddress(targetAddress); call.setTargetAddress(targetAddress);
call.setCallTag(callTag); call.setCallTag(callTag);
return call; RpcCallRequestPdu pdu = new RpcCallRequestPdu();
} pdu.setCommand(call.getCommand());
pdu.setRequestTag(callTag);
pdu.setRequestStartTick(System.currentTimeMillis());
@Override String serializedCmdArg;
public RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress) { if(call.getCommandArg() != null)
long callTag = getNextCallTag(); serializedCmdArg = _messageSerializer.serializeTo(call.getCommandArg().getClass(), call.getCommandArg());
RpcClientCallImpl call = new RpcClientCallImpl(this); else
call.setSourceAddress(sourceEndpoint.getEndpointAddress()); serializedCmdArg = _messageSerializer.serializeTo(Object.class, null);
call.setTargetAddress(targetAddress.getAddress()); pdu.setSerializedCommandArg(serializedCmdArg);
call.setCallTag(callTag);
String serializedPdu = _messageSerializer.serializeTo(RpcCallRequestPdu.class, pdu);
_transportProvider.sendMessage(_transportAddress, targetAddress, RPC_MULTIPLEXIER,
serializedPdu);
return call; return call;
} }
@Override
public RpcClientCall newCall(String targetAddress) {
// ???
return null;
}
@Override @Override
public RpcClientCall newCall(TransportAddressMapper targetAddress) { public RpcClientCall newCall(TransportAddressMapper targetAddress) {
return newCall(targetAddress.getAddress()); return newCall(targetAddress.getAddress());
} }
@Override @Override
public void registerCall(RpcClientCall call) { public void registerCall(RpcClientCall call) {
assert(call != null); assert(call != null);
@ -210,4 +224,27 @@ public class RpcProviderImpl implements RpcProvider {
} }
} }
} }
private class RpcTransportEndpoint implements TransportEndpoint {
@Override
public void onTransportMessage(String senderEndpointAddress,
String targetEndpointAddress, String multiplexer, String message) {
// we won't handle generic transport message toward RPC transport endpoint
}
@Override
public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
if(bSuccess)
_transportAddress = endpointAddress;
}
@Override
public void onDetachIndication(String endpointAddress) {
if(_transportAddress != null && _transportAddress.equals(endpointAddress))
_transportAddress = null;
}
}
} }

View File

@ -18,15 +18,7 @@
*/ */
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface TransportEndpoint { public interface TransportEndpoint extends TransportMultiplexier {
String getEndpointAddress();
void onAttachConfirm(boolean bSuccess, String endpointAddress); void onAttachConfirm(boolean bSuccess, String endpointAddress);
void onDetachIndication(String endpointAddress); void onDetachIndication(String endpointAddress);
void registerMultiplexier(String name, TransportMultiplexier multiplexier);
void unregisterMultiplexier(String name);
void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
String multiplexier, String message);
} }

View File

@ -19,13 +19,16 @@
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
public class TransportEndpointSite { public class TransportEndpointSite {
private TransportEndpoint _endpoint; private TransportEndpoint _endpoint;
private TransportAddress _address; private TransportAddress _address;
private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>(); private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
private Map<String, TransportMultiplexier> _multiplexierMap = new HashMap<String, TransportMultiplexier>();
public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) { public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) {
assert(endpoint != null); assert(endpoint != null);
@ -47,6 +50,19 @@ public class TransportEndpointSite {
_address = address; _address = address;
} }
public void registerMultiplexier(String name, TransportMultiplexier multiplexier) {
assert(name != null);
assert(multiplexier != null);
assert(_multiplexierMap.get(name) == null);
_multiplexierMap.put(name, multiplexier);
}
public void unregisterMultiplexier(String name) {
assert(name != null);
_multiplexierMap.remove(name);
}
public void addOutputPdu(TransportPdu pdu) { public void addOutputPdu(TransportPdu pdu) {
synchronized(this) { synchronized(this) {
_outputQueue.add(pdu); _outputQueue.add(pdu);
@ -66,8 +82,26 @@ public class TransportEndpointSite {
private void processOutput() { private void processOutput() {
TransportPdu pdu; TransportPdu pdu;
TransportEndpoint endpoint = getEndpoint();
if(endpoint != null) {
while((pdu = getNextOutputPdu()) != null) { while((pdu = getNextOutputPdu()) != null) {
// ??? if(pdu instanceof TransportDataPdu) {
String multiplexierName = ((TransportDataPdu) pdu).getMultiplexier();
TransportMultiplexier multiplexier = getRoutedMultiplexier(multiplexierName);
assert(multiplexier != null);
multiplexier.onTransportMessage(pdu.getSourceAddress(), pdu.getDestAddress(),
multiplexierName, ((TransportDataPdu) pdu).getContent());
} }
} }
} }
}
private TransportMultiplexier getRoutedMultiplexier(String multiplexierName) {
TransportMultiplexier multiplexier = _multiplexierMap.get(multiplexierName);
if(multiplexier == null)
multiplexier = _endpoint;
return multiplexier;
}
}

View File

@ -19,7 +19,7 @@
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface TransportProvider { public interface TransportProvider {
boolean attach(TransportEndpoint endpoint, String predefinedAddress); TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress);
boolean detach(TransportEndpoint endpoint); boolean detach(TransportEndpoint endpoint);
void sendMessage(String soureEndpointAddress, String targetEndpointAddress, void sendMessage(String soureEndpointAddress, String targetEndpointAddress,

View File

@ -19,16 +19,9 @@
package org.apache.cloudstack.framework.messaging.client; package org.apache.cloudstack.framework.messaging.client;
import org.apache.cloudstack.framework.messaging.TransportEndpoint; import org.apache.cloudstack.framework.messaging.TransportEndpoint;
import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
public class ClientTransportEndpoint implements TransportEndpoint { public class ClientTransportEndpoint implements TransportEndpoint {
@Override
public String getEndpointAddress() {
// ???
return "";
}
@Override @Override
public void onAttachConfirm(boolean bSuccess, String endpointAddress) { public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
@ -39,19 +32,9 @@ public class ClientTransportEndpoint implements TransportEndpoint {
} }
@Override @Override
public void registerMultiplexier(String name, public void onTransportMessage(String senderEndpointAddress,
TransportMultiplexier multiplexier) { String targetEndpointAddress, String multiplexer, String message) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
}
@Override
public void unregisterMultiplexier(String name) {
// TODO Auto-generated method stub
}
@Override
public void sendMessage(String sourceEndpointAddress,
String targetEndpointAddress, String multiplexier, String message) {
// TODO Auto-generated method stub
} }
} }

View File

@ -19,14 +19,15 @@
package org.apache.cloudstack.framework.messaging.client; package org.apache.cloudstack.framework.messaging.client;
import org.apache.cloudstack.framework.messaging.TransportEndpoint; import org.apache.cloudstack.framework.messaging.TransportEndpoint;
import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
import org.apache.cloudstack.framework.messaging.TransportProvider; import org.apache.cloudstack.framework.messaging.TransportProvider;
public class ClientTransportProvider implements TransportProvider { public class ClientTransportProvider implements TransportProvider {
@Override @Override
public boolean attach(TransportEndpoint endpoint, String predefinedAddress) { public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return false; return null;
} }
@Override @Override

View File

@ -45,7 +45,7 @@ public class ServerTransportProvider implements TransportProvider {
} }
@Override @Override
public boolean attach(TransportEndpoint endpoint, String predefinedAddress) { public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
TransportAddress transportAddress; TransportAddress transportAddress;
String endpointId; String endpointId;
@ -62,35 +62,26 @@ public class ServerTransportProvider implements TransportProvider {
endpointSite = _endpointMap.get(endpointId); endpointSite = _endpointMap.get(endpointId);
if(endpointSite != null) { if(endpointSite != null) {
// already attached // already attached
return false; return endpointSite;
} }
endpointSite = new TransportEndpointSite(endpoint, transportAddress); endpointSite = new TransportEndpointSite(endpoint, transportAddress);
_endpointMap.put(endpointId, endpointSite); _endpointMap.put(endpointId, endpointSite);
} }
endpoint.onAttachConfirm(true, transportAddress.toString()); endpoint.onAttachConfirm(true, transportAddress.toString());
return true; return endpointSite;
} }
@Override @Override
public boolean detach(TransportEndpoint endpoint) { public boolean detach(TransportEndpoint endpoint) {
TransportAddress transportAddress = TransportAddress.fromAddressString(endpoint.getEndpointAddress());
if(transportAddress == null)
return false;
boolean found = false;
synchronized(this) { synchronized(this) {
TransportEndpointSite endpointSite = _endpointMap.get(transportAddress.getEndpointId()); for(Map.Entry<String, TransportEndpointSite> entry : _endpointMap.entrySet()) {
if(endpointSite.getAddress().equals(transportAddress)) { if(entry.getValue().getEndpoint() == endpoint) {
found = true; _endpointMap.remove(entry.getKey());
_endpointMap.remove(transportAddress.getEndpointId());
}
}
if(found) {
endpoint.onDetachIndication(endpoint.getEndpointAddress());
return true; return true;
} }
}
}
return false; return false;
} }
@ -122,6 +113,7 @@ public class ServerTransportProvider implements TransportProvider {
endpointSite.addOutputPdu(pdu); endpointSite.addOutputPdu(pdu);
} else { } else {
// do cross-node forwarding // do cross-node forwarding
// ???
} }
} }