Add transport implementation for RPC/Async framework

This commit is contained in:
Kelven Yang 2012-12-10 18:10:53 -08:00
parent cdf5511664
commit 98a1295fb3
12 changed files with 411 additions and 28 deletions

View File

@ -24,16 +24,16 @@ public interface RpcProvider extends TransportMultiplexier {
void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer();
void registerRpcServiceEndpoint(String serviceAddress, RpcServiceEndpoint rpcEndpoint);
void unregisteRpcServiceEndpoint(RpcAddressable serviceAddress, RpcServiceEndpoint rpcEndpoint);
void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
RpcClientCall newCall(String targetAddress);
RpcClientCall newCall(RpcAddressable targetAddress);
RpcClientCall newCall(TransportAddressMapper targetAddress);
//
// low-level public API
//
RpcClientCall newCall(TransportEndpoint sourceEndpoint, RpcAddressable targetAddress);
RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress);
RpcClientCall newCall(TransportEndpoint sourceEndpoint, String targetAddress);
void registerCall(RpcClientCall call);

View File

@ -69,14 +69,14 @@ public class RpcProviderImpl implements RpcProvider {
}
@Override
public void registerRpcServiceEndpoint(String serviceAddress, RpcServiceEndpoint rpcEndpoint) {
public void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
synchronized(_serviceEndpoints) {
_serviceEndpoints.add(rpcEndpoint);
}
}
@Override
public void unregisteRpcServiceEndpoint(RpcAddressable serviceAddress, RpcServiceEndpoint rpcEndpoint) {
public void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint) {
synchronized(_serviceEndpoints) {
_serviceEndpoints.remove(rpcEndpoint);
}
@ -94,7 +94,7 @@ public class RpcProviderImpl implements RpcProvider {
}
@Override
public RpcClientCall newCall(TransportEndpoint sourceEndpoint, RpcAddressable targetAddress) {
public RpcClientCall newCall(TransportEndpoint sourceEndpoint, TransportAddressMapper targetAddress) {
long callTag = getNextCallTag();
RpcClientCallImpl call = new RpcClientCallImpl(this);
call.setSourceAddress(sourceEndpoint.getEndpointAddress());
@ -112,7 +112,7 @@ public class RpcProviderImpl implements RpcProvider {
}
@Override
public RpcClientCall newCall(RpcAddressable targetAddress) {
public RpcClientCall newCall(TransportAddressMapper targetAddress) {
return newCall(targetAddress.getAddress());
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.util.Random;
public class TransportAddress {
public final static String LOCAL_SERVICE_NODE = "";
private String _nodeId = LOCAL_SERVICE_NODE;
private String _endpointId;
private int _magic;
public TransportAddress(String nodeId, String endpointId) {
assert(nodeId != null);
assert(endpointId != null);
assert(nodeId.indexOf(".") < 0);
assert(endpointId.indexOf(".") < 0);
_nodeId = nodeId;
_endpointId = endpointId;
_magic = new Random().nextInt();
}
public TransportAddress(String nodeId, String endpointId, int magic) {
assert(nodeId != null);
assert(endpointId != null);
assert(nodeId.indexOf(".") < 0);
assert(endpointId.indexOf(".") < 0);
_nodeId = nodeId;
_endpointId = endpointId;
_magic = magic;
}
public String getNodeId() {
return _nodeId;
}
public TransportAddress setNodeId(String nodeId) {
_nodeId = nodeId;
return this;
}
public String getEndpointId() {
return _endpointId;
}
public TransportAddress setEndpointId(String endpointId) {
_endpointId = endpointId;
return this;
}
public static TransportAddress fromAddressString(String addressString) {
if(addressString == null || addressString.isEmpty())
return null;
String tokens[] = addressString.split("\\.");
if(tokens.length != 3)
return null;
return new TransportAddress(tokens[0], tokens[1], Integer.parseInt(tokens[2]));
}
public static TransportAddress getLocalPredefinedTransportAddress(String predefinedIdentifier) {
return new TransportAddress(LOCAL_SERVICE_NODE, predefinedIdentifier, 0);
}
@Override
public int hashCode() {
int hashCode = _magic;
hashCode = (hashCode << 3) ^ _nodeId.hashCode();
hashCode = (hashCode << 3) ^ _endpointId.hashCode();
return hashCode;
}
@Override
public boolean equals(Object other) {
if(other == null)
return false;
if(!(other instanceof TransportAddress))
return false;
if(this == other)
return true;
return _nodeId.equals(((TransportAddress)other)._nodeId) &&
_endpointId.equals(((TransportAddress)other)._endpointId) &&
_magic == ((TransportAddress)other)._magic;
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
if(_nodeId != null)
sb.append(_nodeId);
sb.append(".");
sb.append(_endpointId);
sb.append(".");
sb.append(_magic);
return sb.toString();
}
}

View File

@ -18,6 +18,6 @@
*/
package org.apache.cloudstack.framework.messaging;
public interface RpcAddressable {
public interface TransportAddressMapper {
String getAddress();
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
@OnwireName(name="TransportDataPdu")
public class TransportDataPdu extends TransportPdu {
private String _multiplexier;
private String _content;
public TransportDataPdu() {
}
public String getMultiplexier() {
return _multiplexier;
}
public void setMultiplexier(String multiplexier) {
_multiplexier = multiplexier;
}
public String getContent() {
return _content;
}
public void setContent(String content) {
_content = content;
}
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.util.ArrayList;
import java.util.List;
public class TransportEndpointSite {
private TransportEndpoint _endpoint;
private TransportAddress _address;
private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
public TransportEndpointSite(TransportEndpoint endpoint, TransportAddress address) {
assert(endpoint != null);
assert(address != null);
_endpoint = endpoint;
_address = address;
}
public TransportEndpoint getEndpoint() {
return _endpoint;
}
public TransportAddress getAddress() {
return _address;
}
public void setAddress(TransportAddress address) {
_address = address;
}
public void addOutputPdu(TransportPdu pdu) {
synchronized(this) {
_outputQueue.add(pdu);
}
processOutput();
}
public TransportPdu getNextOutputPdu() {
synchronized(this) {
if(_outputQueue.size() > 0)
return _outputQueue.remove(0);
}
return null;
}
private void processOutput() {
TransportPdu pdu;
while((pdu = getNextOutputPdu()) != null) {
// ???
}
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public class TransportPdu {
protected String _sourceAddress;
protected String _destAddress;
public TransportPdu() {
}
public String getSourceAddress() { return _sourceAddress; }
public void setSourceAddress(String sourceAddress) {
_sourceAddress = sourceAddress;
}
public String getDestAddress() {
return _destAddress;
}
public void setDestAddress(String destAddress) {
_destAddress = destAddress;
}
}

View File

@ -19,8 +19,8 @@
package org.apache.cloudstack.framework.messaging;
public interface TransportProvider {
void attach(TransportEndpoint endpoint, String predefinedAddress);
void detach(TransportEndpoint endpoint);
boolean attach(TransportEndpoint endpoint, String predefinedAddress);
boolean detach(TransportEndpoint endpoint);
void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
String multiplexier, String message);

View File

@ -24,13 +24,16 @@ import org.apache.cloudstack.framework.messaging.TransportProvider;
public class ClientTransportProvider implements TransportProvider {
@Override
public void attach(TransportEndpoint endpoint, String predefinedAddress) {
public boolean attach(TransportEndpoint endpoint, String predefinedAddress) {
// TODO Auto-generated method stub
return false;
}
@Override
public void detach(TransportEndpoint endpoint) {
public boolean detach(TransportEndpoint endpoint) {
// TODO Auto-generated method stub
return false;
}
@Override
@ -38,5 +41,4 @@ public class ClientTransportProvider implements TransportProvider {
String multiplexier, String message) {
// TODO
}
}

View File

@ -18,26 +18,121 @@
*/
package org.apache.cloudstack.framework.messaging.server;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.cloudstack.framework.messaging.TransportAddress;
import org.apache.cloudstack.framework.messaging.TransportDataPdu;
import org.apache.cloudstack.framework.messaging.TransportEndpoint;
import org.apache.cloudstack.framework.messaging.TransportEndpointSite;
import org.apache.cloudstack.framework.messaging.TransportPdu;
import org.apache.cloudstack.framework.messaging.TransportProvider;
public class ServerTransportProvider implements TransportProvider {
private String _nodeId;
@Override
public void attach(TransportEndpoint endpoint, String predefinedAddress) {
// TODO Auto-generated method stub
private Map<String, TransportEndpointSite> _endpointMap = new HashMap<String, TransportEndpointSite>();
private int _nextEndpointId = new Random().nextInt();
public ServerTransportProvider() {
}
@Override
public void detach(TransportEndpoint endpoint) {
// TODO Auto-generated method stub
public String getNodeId() { return _nodeId; }
public void setNodeId(String nodeId) {
_nodeId = nodeId;
}
@Override
public void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
public boolean attach(TransportEndpoint endpoint, String predefinedAddress) {
TransportAddress transportAddress;
String endpointId;
if(predefinedAddress != null && !predefinedAddress.isEmpty()) {
endpointId = predefinedAddress;
transportAddress = new TransportAddress(_nodeId, endpointId, 0);
} else {
endpointId = String.valueOf(getNextEndpointId());
transportAddress = new TransportAddress(_nodeId, endpointId);
}
TransportEndpointSite endpointSite;
synchronized(this) {
endpointSite = _endpointMap.get(endpointId);
if(endpointSite != null) {
// already attached
return false;
}
endpointSite = new TransportEndpointSite(endpoint, transportAddress);
_endpointMap.put(endpointId, endpointSite);
}
endpoint.onAttachConfirm(true, transportAddress.toString());
return true;
}
@Override
public boolean detach(TransportEndpoint endpoint) {
TransportAddress transportAddress = TransportAddress.fromAddressString(endpoint.getEndpointAddress());
if(transportAddress == null)
return false;
boolean found = false;
synchronized(this) {
TransportEndpointSite endpointSite = _endpointMap.get(transportAddress.getEndpointId());
if(endpointSite.getAddress().equals(transportAddress)) {
found = true;
_endpointMap.remove(transportAddress.getEndpointId());
}
}
if(found) {
endpoint.onDetachIndication(endpoint.getEndpointAddress());
return true;
}
return false;
}
@Override
public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress,
String multiplexier, String message) {
// TODO
TransportDataPdu pdu = new TransportDataPdu();
pdu.setSourceAddress(sourceEndpointAddress);
pdu.setDestAddress(targetEndpointAddress);
pdu.setMultiplexier(multiplexier);
pdu.setContent(message);
dispatchPdu(pdu);
}
private void dispatchPdu(TransportPdu pdu) {
TransportAddress transportAddress = TransportAddress.fromAddressString(pdu.getDestAddress());
if(isLocalAddress(transportAddress)) {
TransportEndpointSite endpointSite = null;
synchronized(this) {
endpointSite = _endpointMap.get(transportAddress.getEndpointId());
}
if(endpointSite != null)
endpointSite.addOutputPdu(pdu);
} else {
// do cross-node forwarding
}
}
private boolean isLocalAddress(TransportAddress address) {
if(address.getNodeId().equals(_nodeId) || address.getNodeId().equals(TransportAddress.LOCAL_SERVICE_NODE))
return true;
return false;
}
private synchronized int getNextEndpointId() {
return _nextEndpointId++;
}
}

View File

@ -19,7 +19,7 @@
package org.apache.cloudstack.framework.messaging;
public class AsyncSampleEventDrivenStyleCaller {
AsyncSampleCallee _ds;
AsyncSampleCallee _ds = new AsyncSampleCallee();
AsyncCallbackDriver _callbackDriver;
public void MethodThatWillCallAsyncMethod() {
@ -28,7 +28,7 @@ public class AsyncSampleEventDrivenStyleCaller {
new AsyncCallbackDispatcher(this)
.setOperationName("volume.create")
.setContextParam("origVolume", vol)
.attachDriver(_callbackDriver));
);
}
@AsyncCallbackHandler(operationName="volume.create")
@ -37,4 +37,9 @@ public class AsyncSampleEventDrivenStyleCaller {
TestVolume resultVol = callback.getResult();
}
public static void main(String[] args) {
AsyncSampleEventDrivenStyleCaller caller = new AsyncSampleEventDrivenStyleCaller();
caller.MethodThatWillCallAsyncMethod();
}
}

View File

@ -28,7 +28,7 @@ public class SampleComponent {
public void init() {
_rpcProvider.registerRpcServiceEndpoint("AgentManager",
_rpcProvider.registerRpcServiceEndpoint(
RpcServiceDispatcher.getDispatcher(this));
// subscribe to all network events (for example)