From 1d75063217612b0695a9e430ed223ede31ffb7f5 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Tue, 27 Nov 2012 20:31:10 -0800 Subject: [PATCH] Finish RPC calling side implementation --- .../messaging/RpcCallRequestPdu.java | 66 +++++++++ .../messaging/RpcCallResponsePdu.java | 78 ++++++++++ .../framework/messaging/RpcClientCall.java | 2 + .../messaging/RpcClientCallImpl.java | 134 ++++++++++++++++-- .../framework/messaging/RpcProvider.java | 8 +- .../framework/messaging/RpcProviderImpl.java | 92 +++++++++++- .../messaging/TransportProvider.java | 3 + .../client/ClientTransportProvider.java | 7 + .../server/ServerTransportProvider.java | 6 + 9 files changed, 374 insertions(+), 22 deletions(-) create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallRequestPdu.java create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallResponsePdu.java diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallRequestPdu.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallRequestPdu.java new file mode 100644 index 00000000000..0992116091d --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallRequestPdu.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +@OnwireName(name="RpcRequest") +public class RpcCallRequestPdu { + + private long requestTag; + private long requestStartTick; + + private String command; + private String serializedCommandArg; + + public RpcCallRequestPdu() { + requestTag = 0; + requestStartTick = System.currentTimeMillis(); + } + + public long getRequestTag() { + return requestTag; + } + + public void setRequestTag(long requestTag) { + this.requestTag = requestTag; + } + + public long getRequestStartTick() { + return requestStartTick; + } + + public void setRequestStartTick(long requestStartTick) { + this.requestStartTick = requestStartTick; + } + + public String getCommand() { + return command; + } + + public void setCommand(String command) { + this.command = command; + } + + public String getSerializedCommandArg() { + return serializedCommandArg; + } + + public void setSerializedCommandArg(String serializedCommandArg) { + this.serializedCommandArg = serializedCommandArg; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallResponsePdu.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallResponsePdu.java new file mode 100644 index 00000000000..ca882e9e95b --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallResponsePdu.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +@OnwireName(name="RpcResponse") +public class RpcCallResponsePdu { + public static final int RESULT_SUCCESSFUL = 0; + public static final int RESULT_HANDLER_NOT_EXIST = 1; + public static final int RESULT_HANDLER_EXCEPTION = 2; + + private long requestTag; + private long requestStartTick; + + private int result; + private String command; + private String serializedResult; + + public RpcCallResponsePdu() { + requestTag = 0; + requestStartTick = 0; + } + + public long getRequestTag() { + return requestTag; + } + + public void setRequestTag(long requestTag) { + this.requestTag = requestTag; + } + + public long getRequestStartTick() { + return requestStartTick; + } + + public void setRequestStartTick(long requestStartTick) { + this.requestStartTick = requestStartTick; + } + + public int getResult() { + return result; + } + + public void setResult(int result) { + this.result = result; + } + + public String getCommand() { + return command; + } + + public void setCommand(String command) { + this.command = command; + } + + public String getSerializedResult() { + return serializedResult; + } + + public void setSerializedResult(String serializedResult) { + this.serializedResult = serializedResult; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java index 45825687664..d2adbfda952 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java @@ -19,6 +19,8 @@ package org.apache.cloudstack.framework.messaging; public interface RpcClientCall { + final static int DEFAULT_RPC_TIMEOUT = 10000; + String getCommand(); RpcClientCall setCommand(String cmd); RpcClientCall setTimeout(int timeoutMilliseconds); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java index 5dd1d9426eb..574a273af37 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java @@ -18,7 +18,9 @@ */ package org.apache.cloudstack.framework.messaging; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class RpcClientCallImpl implements RpcClientCall { @@ -26,17 +28,30 @@ public class RpcClientCallImpl implements RpcClientCall { private String _command; private Object _commandArg; - private int _timeoutMilliseconds; - + private int _timeoutMilliseconds = DEFAULT_RPC_TIMEOUT; private Map _contextParams = new HashMap(); + private boolean _oneway = false; - public RpcClientCallImpl() { + private List _callbackListeners = new ArrayList(); + + private RpcProvider _rpcProvider; + private long _startTickInMs; + private long _callTag; + private String _sourceAddress; + private String _targetAddress; + + private Object _responseLock = new Object(); + private boolean _responseDone = false;; + private Object _responseResult; + + public RpcClientCallImpl(RpcProvider rpcProvider) { + assert(rpcProvider != null); + _rpcProvider = rpcProvider; } @Override public String getCommand() { - // TODO Auto-generated method stub - return null; + return _command; } @Override @@ -71,37 +86,126 @@ public class RpcClientCallImpl implements RpcClientCall { @Override public Object getContextParam(String key) { - // TODO Auto-generated method stub - return null; + return _contextParams.get(key); } @Override public RpcClientCall addCallbackListener(RpcCallbackListener listener) { - // TODO Auto-generated method stub - return null; + assert(listener != null); + _callbackListeners.add(listener); + return this; } @Override public RpcClientCall setOneway() { - // TODO Auto-generated method stub - return null; + _oneway = true; + return this; + } + + public String getSourceAddress() { + return _sourceAddress; + } + + public void setSourceAddress(String sourceAddress) { + _sourceAddress = sourceAddress; + } + + public String getTargetAddress() { + return _targetAddress; + } + + public void setTargetAddress(String targetAddress) { + _targetAddress = targetAddress; + } + + public long getCallTag() { + return _callTag; + } + + public void setCallTag(long callTag) { + _callTag = callTag; } @Override public void apply() { - // TODO Auto-generated method stub + // sanity check + assert(_sourceAddress != null); + assert(_targetAddress != null); + if(!_oneway) + _rpcProvider.registerCall(this); + + RpcCallRequestPdu pdu = new RpcCallRequestPdu(); + pdu.setCommand(getCommand()); + if(_commandArg != null) + pdu.setSerializedCommandArg(_rpcProvider.getMessageSerializer().serializeTo(_commandArg.getClass(), _commandArg)); + pdu.setRequestTag(this.getCallTag()); + + _rpcProvider.sendRpcPdu(getSourceAddress(), getTargetAddress(), + _rpcProvider.getMessageSerializer().serializeTo(RpcCallRequestPdu.class, pdu)); } @Override public void cancel() { - // TODO Auto-generated method stub - + _rpcProvider.cancelCall(this); } @Override public T get() { - // TODO Auto-generated method stub + if(!_oneway) { + synchronized(_responseLock) { + if(!_responseDone) { + long timeToWait = _timeoutMilliseconds - (System.currentTimeMillis() - _startTickInMs); + if(timeToWait < 0) + timeToWait = 0; + + try { + _responseLock.wait(timeToWait); + } catch (InterruptedException e) { + throw new RpcTimeoutException("RPC call timed out"); + } + } + + assert(_responseDone); + + if(_responseResult == null) + return null; + + if(_responseResult instanceof RpcException) + throw (RpcException)_responseResult; + + assert(_rpcProvider.getMessageSerializer() != null); + assert(_responseResult instanceof String); + return _rpcProvider.getMessageSerializer().serializeFrom((String)_responseResult); + } + } return null; } + + public void complete(String result) { + _responseResult = result; + + synchronized(_responseLock) { + _responseDone = true; + _responseLock.notifyAll(); + } + + assert(_rpcProvider.getMessageSerializer() != null); + Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result); + for(RpcCallbackListener listener: _callbackListeners) + listener.onSuccess(resultObject); + } + + public void complete(RpcException e) { + _responseResult = e; + + synchronized(_responseLock) { + _responseDone = true; + + _responseLock.notifyAll(); + } + + for(RpcCallbackListener listener: _callbackListeners) + listener.onFailure(e); + } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java index 334e09d4c67..908912a3459 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java @@ -19,11 +19,17 @@ package org.apache.cloudstack.framework.messaging; public interface RpcProvider extends TransportMultiplexier { + final static String RPC_MULTIPLEXIER = "rpc"; + void setMessageSerializer(MessageSerializer messageSerializer); MessageSerializer getMessageSerializer(); void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); - RpcClientCall target(String target); + RpcClientCall newCall(String sourceAddress, String targetAddress); + void registerCall(RpcClientCall call); + void cancelCall(RpcClientCall call); + + void sendRpcPdu(String sourceAddress, String targetAddress, String serializedPdu); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java index fdbb27d66cb..c652982f932 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java @@ -19,13 +19,18 @@ package org.apache.cloudstack.framework.messaging; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class RpcProviderImpl implements RpcProvider { + private TransportProvider _transportProvider; private MessageSerializer _messageSerializer; private List _serviceEndpoints = new ArrayList(); - private TransportProvider _transportProvider; + private Map _outstandingCalls = new HashMap(); + + private long _nextCallTag = System.currentTimeMillis(); public RpcProviderImpl() { } @@ -41,9 +46,16 @@ public class RpcProviderImpl implements RpcProvider { @Override public void onTransportMessage(String senderEndpointAddress, String targetEndpointAddress, String multiplexer, String message) { - - // TODO Auto-generated method stub + assert(_messageSerializer != null); + Object pdu = _messageSerializer.serializeFrom(message); + if(pdu instanceof RpcCallRequestPdu) { + handleCallRequestPdu((RpcCallRequestPdu)pdu); + } else if(pdu instanceof RpcCallResponsePdu) { + handleCallResponsePdu((RpcCallResponsePdu)pdu); + } else { + assert(false); + } } @Override @@ -71,8 +83,76 @@ public class RpcProviderImpl implements RpcProvider { } @Override - public RpcClientCall target(String target) { - // TODO Auto-generated method stub - return null; + public RpcClientCall newCall(String sourceAddress, String targetAddress) { + long callTag = getNextCallTag(); + RpcClientCallImpl call = new RpcClientCallImpl(this); + call.setSourceAddress(sourceAddress); + call.setTargetAddress(targetAddress); + call.setCallTag(callTag); + + return call; + } + + @Override + public void registerCall(RpcClientCall call) { + assert(call != null); + synchronized(this) { + _outstandingCalls.put(((RpcClientCallImpl)call).getCallTag(), call); + } + } + + @Override + public void cancelCall(RpcClientCall call) { + synchronized(this) { + _outstandingCalls.remove(((RpcClientCallImpl)call).getCallTag()); + } + + ((RpcClientCallImpl)call).complete(new RpcException("Call is cancelled")); + } + + @Override + public void sendRpcPdu(String sourceAddress, String targetAddress, String serializedPdu) { + assert(_transportProvider != null); + _transportProvider.sendMessage(sourceAddress, targetAddress, this.RPC_MULTIPLEXIER, serializedPdu); + } + + protected synchronized long getNextCallTag() { + long tag = _nextCallTag++; + if(tag == 0) + tag++; + + return tag; + } + + private void handleCallRequestPdu(RpcCallRequestPdu pdu) { + // ??? + } + + private void handleCallResponsePdu(RpcCallResponsePdu pdu) { + RpcClientCallImpl call = null; + + synchronized(this) { + call = (RpcClientCallImpl)_outstandingCalls.remove(pdu.getRequestTag()); + } + + if(call != null) { + switch(pdu.getResult()) { + case RpcCallResponsePdu.RESULT_SUCCESSFUL : + call.complete(pdu.getSerializedResult()); + break; + + case RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST : + call.complete(new RpcException("Handler does not exist")); + break; + + case RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION : + call.complete(new RpcException("Exception in handler")); + break; + + default : + assert(false); + break; + } + } } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java index 6773e8d0ba6..c843b061157 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportProvider.java @@ -21,4 +21,7 @@ package org.apache.cloudstack.framework.messaging; public interface TransportProvider { void attach(TransportEndpoint endpoint, String predefinedAddress); void detach(TransportEndpoint endpoint); + + void sendMessage(String soureEndpointAddress, String targetEndpointAddress, + String multiplexier, String message); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java index 665d207a762..551838eabf0 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportProvider.java @@ -32,4 +32,11 @@ public class ClientTransportProvider implements TransportProvider { public void detach(TransportEndpoint endpoint) { // TODO Auto-generated method stub } + + @Override + public void sendMessage(String soureEndpointAddress, String targetEndpointAddress, + String multiplexier, String message) { + // TODO + } + } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java index 1f7c12bfc8d..332e788c3f6 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerTransportProvider.java @@ -34,4 +34,10 @@ public class ServerTransportProvider implements TransportProvider { // TODO Auto-generated method stub } + + @Override + public void sendMessage(String soureEndpointAddress, String targetEndpointAddress, + String multiplexier, String message) { + // TODO + } }