Finish RPC calling side implementation

This commit is contained in:
Kelven Yang 2012-11-27 20:31:10 -08:00
parent 225ad3c289
commit 1d75063217
9 changed files with 374 additions and 22 deletions

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
@OnwireName(name="RpcRequest")
public class RpcCallRequestPdu {
private long requestTag;
private long requestStartTick;
private String command;
private String serializedCommandArg;
public RpcCallRequestPdu() {
requestTag = 0;
requestStartTick = System.currentTimeMillis();
}
public long getRequestTag() {
return requestTag;
}
public void setRequestTag(long requestTag) {
this.requestTag = requestTag;
}
public long getRequestStartTick() {
return requestStartTick;
}
public void setRequestStartTick(long requestStartTick) {
this.requestStartTick = requestStartTick;
}
public String getCommand() {
return command;
}
public void setCommand(String command) {
this.command = command;
}
public String getSerializedCommandArg() {
return serializedCommandArg;
}
public void setSerializedCommandArg(String serializedCommandArg) {
this.serializedCommandArg = serializedCommandArg;
}
}

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
@OnwireName(name="RpcResponse")
public class RpcCallResponsePdu {
public static final int RESULT_SUCCESSFUL = 0;
public static final int RESULT_HANDLER_NOT_EXIST = 1;
public static final int RESULT_HANDLER_EXCEPTION = 2;
private long requestTag;
private long requestStartTick;
private int result;
private String command;
private String serializedResult;
public RpcCallResponsePdu() {
requestTag = 0;
requestStartTick = 0;
}
public long getRequestTag() {
return requestTag;
}
public void setRequestTag(long requestTag) {
this.requestTag = requestTag;
}
public long getRequestStartTick() {
return requestStartTick;
}
public void setRequestStartTick(long requestStartTick) {
this.requestStartTick = requestStartTick;
}
public int getResult() {
return result;
}
public void setResult(int result) {
this.result = result;
}
public String getCommand() {
return command;
}
public void setCommand(String command) {
this.command = command;
}
public String getSerializedResult() {
return serializedResult;
}
public void setSerializedResult(String serializedResult) {
this.serializedResult = serializedResult;
}
}

View File

@ -19,6 +19,8 @@
package org.apache.cloudstack.framework.messaging;
public interface RpcClientCall {
final static int DEFAULT_RPC_TIMEOUT = 10000;
String getCommand();
RpcClientCall setCommand(String cmd);
RpcClientCall setTimeout(int timeoutMilliseconds);

View File

@ -18,7 +18,9 @@
*/
package org.apache.cloudstack.framework.messaging;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class RpcClientCallImpl implements RpcClientCall {
@ -26,17 +28,30 @@ public class RpcClientCallImpl implements RpcClientCall {
private String _command;
private Object _commandArg;
private int _timeoutMilliseconds;
private int _timeoutMilliseconds = DEFAULT_RPC_TIMEOUT;
private Map<String, Object> _contextParams = new HashMap<String, Object>();
private boolean _oneway = false;
public RpcClientCallImpl() {
private List<RpcCallbackListener> _callbackListeners = new ArrayList<RpcCallbackListener>();
private RpcProvider _rpcProvider;
private long _startTickInMs;
private long _callTag;
private String _sourceAddress;
private String _targetAddress;
private Object _responseLock = new Object();
private boolean _responseDone = false;;
private Object _responseResult;
public RpcClientCallImpl(RpcProvider rpcProvider) {
assert(rpcProvider != null);
_rpcProvider = rpcProvider;
}
@Override
public String getCommand() {
// TODO Auto-generated method stub
return null;
return _command;
}
@Override
@ -71,37 +86,126 @@ public class RpcClientCallImpl implements RpcClientCall {
@Override
public Object getContextParam(String key) {
// TODO Auto-generated method stub
return null;
return _contextParams.get(key);
}
@Override
public <T> RpcClientCall addCallbackListener(RpcCallbackListener<T> listener) {
// TODO Auto-generated method stub
return null;
assert(listener != null);
_callbackListeners.add(listener);
return this;
}
@Override
public RpcClientCall setOneway() {
// TODO Auto-generated method stub
return null;
_oneway = true;
return this;
}
public String getSourceAddress() {
return _sourceAddress;
}
public void setSourceAddress(String sourceAddress) {
_sourceAddress = sourceAddress;
}
public String getTargetAddress() {
return _targetAddress;
}
public void setTargetAddress(String targetAddress) {
_targetAddress = targetAddress;
}
public long getCallTag() {
return _callTag;
}
public void setCallTag(long callTag) {
_callTag = callTag;
}
@Override
public void apply() {
// TODO Auto-generated method stub
// sanity check
assert(_sourceAddress != null);
assert(_targetAddress != null);
if(!_oneway)
_rpcProvider.registerCall(this);
RpcCallRequestPdu pdu = new RpcCallRequestPdu();
pdu.setCommand(getCommand());
if(_commandArg != null)
pdu.setSerializedCommandArg(_rpcProvider.getMessageSerializer().serializeTo(_commandArg.getClass(), _commandArg));
pdu.setRequestTag(this.getCallTag());
_rpcProvider.sendRpcPdu(getSourceAddress(), getTargetAddress(),
_rpcProvider.getMessageSerializer().serializeTo(RpcCallRequestPdu.class, pdu));
}
@Override
public void cancel() {
// TODO Auto-generated method stub
_rpcProvider.cancelCall(this);
}
@Override
public <T> T get() {
// TODO Auto-generated method stub
if(!_oneway) {
synchronized(_responseLock) {
if(!_responseDone) {
long timeToWait = _timeoutMilliseconds - (System.currentTimeMillis() - _startTickInMs);
if(timeToWait < 0)
timeToWait = 0;
try {
_responseLock.wait(timeToWait);
} catch (InterruptedException e) {
throw new RpcTimeoutException("RPC call timed out");
}
}
assert(_responseDone);
if(_responseResult == null)
return null;
if(_responseResult instanceof RpcException)
throw (RpcException)_responseResult;
assert(_rpcProvider.getMessageSerializer() != null);
assert(_responseResult instanceof String);
return _rpcProvider.getMessageSerializer().serializeFrom((String)_responseResult);
}
}
return null;
}
public void complete(String result) {
_responseResult = result;
synchronized(_responseLock) {
_responseDone = true;
_responseLock.notifyAll();
}
assert(_rpcProvider.getMessageSerializer() != null);
Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result);
for(RpcCallbackListener listener: _callbackListeners)
listener.onSuccess(resultObject);
}
public void complete(RpcException e) {
_responseResult = e;
synchronized(_responseLock) {
_responseDone = true;
_responseLock.notifyAll();
}
for(RpcCallbackListener listener: _callbackListeners)
listener.onFailure(e);
}
}

View File

@ -19,11 +19,17 @@
package org.apache.cloudstack.framework.messaging;
public interface RpcProvider extends TransportMultiplexier {
final static String RPC_MULTIPLEXIER = "rpc";
void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer();
void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
RpcClientCall target(String target);
RpcClientCall newCall(String sourceAddress, String targetAddress);
void registerCall(RpcClientCall call);
void cancelCall(RpcClientCall call);
void sendRpcPdu(String sourceAddress, String targetAddress, String serializedPdu);
}

View File

@ -19,13 +19,18 @@
package org.apache.cloudstack.framework.messaging;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class RpcProviderImpl implements RpcProvider {
private TransportProvider _transportProvider;
private MessageSerializer _messageSerializer;
private List<RpcServiceEndpoint> _serviceEndpoints = new ArrayList<RpcServiceEndpoint>();
private TransportProvider _transportProvider;
private Map<Long, RpcClientCall> _outstandingCalls = new HashMap<Long, RpcClientCall>();
private long _nextCallTag = System.currentTimeMillis();
public RpcProviderImpl() {
}
@ -41,9 +46,16 @@ public class RpcProviderImpl implements RpcProvider {
@Override
public void onTransportMessage(String senderEndpointAddress,
String targetEndpointAddress, String multiplexer, String message) {
// TODO Auto-generated method stub
assert(_messageSerializer != null);
Object pdu = _messageSerializer.serializeFrom(message);
if(pdu instanceof RpcCallRequestPdu) {
handleCallRequestPdu((RpcCallRequestPdu)pdu);
} else if(pdu instanceof RpcCallResponsePdu) {
handleCallResponsePdu((RpcCallResponsePdu)pdu);
} else {
assert(false);
}
}
@Override
@ -71,8 +83,76 @@ public class RpcProviderImpl implements RpcProvider {
}
@Override
public RpcClientCall target(String target) {
// TODO Auto-generated method stub
return null;
public RpcClientCall newCall(String sourceAddress, String targetAddress) {
long callTag = getNextCallTag();
RpcClientCallImpl call = new RpcClientCallImpl(this);
call.setSourceAddress(sourceAddress);
call.setTargetAddress(targetAddress);
call.setCallTag(callTag);
return call;
}
@Override
public void registerCall(RpcClientCall call) {
assert(call != null);
synchronized(this) {
_outstandingCalls.put(((RpcClientCallImpl)call).getCallTag(), call);
}
}
@Override
public void cancelCall(RpcClientCall call) {
synchronized(this) {
_outstandingCalls.remove(((RpcClientCallImpl)call).getCallTag());
}
((RpcClientCallImpl)call).complete(new RpcException("Call is cancelled"));
}
@Override
public void sendRpcPdu(String sourceAddress, String targetAddress, String serializedPdu) {
assert(_transportProvider != null);
_transportProvider.sendMessage(sourceAddress, targetAddress, this.RPC_MULTIPLEXIER, serializedPdu);
}
protected synchronized long getNextCallTag() {
long tag = _nextCallTag++;
if(tag == 0)
tag++;
return tag;
}
private void handleCallRequestPdu(RpcCallRequestPdu pdu) {
// ???
}
private void handleCallResponsePdu(RpcCallResponsePdu pdu) {
RpcClientCallImpl call = null;
synchronized(this) {
call = (RpcClientCallImpl)_outstandingCalls.remove(pdu.getRequestTag());
}
if(call != null) {
switch(pdu.getResult()) {
case RpcCallResponsePdu.RESULT_SUCCESSFUL :
call.complete(pdu.getSerializedResult());
break;
case RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST :
call.complete(new RpcException("Handler does not exist"));
break;
case RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION :
call.complete(new RpcException("Exception in handler"));
break;
default :
assert(false);
break;
}
}
}
}

View File

@ -21,4 +21,7 @@ package org.apache.cloudstack.framework.messaging;
public interface TransportProvider {
void attach(TransportEndpoint endpoint, String predefinedAddress);
void detach(TransportEndpoint endpoint);
void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
String multiplexier, String message);
}

View File

@ -32,4 +32,11 @@ public class ClientTransportProvider implements TransportProvider {
public void detach(TransportEndpoint endpoint) {
// TODO Auto-generated method stub
}
@Override
public void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
String multiplexier, String message) {
// TODO
}
}

View File

@ -34,4 +34,10 @@ public class ServerTransportProvider implements TransportProvider {
// TODO Auto-generated method stub
}
@Override
public void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
String multiplexier, String message) {
// TODO
}
}