diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java index d2adbfda952..1ad03ef89de 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java @@ -32,6 +32,8 @@ public interface RpcClientCall { Object getContextParam(String key); RpcClientCall addCallbackListener(RpcCallbackListener listener); + RpcClientCall setCallbackDispatcherTarget(Object target); + RpcClientCall setOneway(); void apply(); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java index 574a273af37..e244f624a35 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallImpl.java @@ -33,7 +33,8 @@ public class RpcClientCallImpl implements RpcClientCall { private boolean _oneway = false; private List _callbackListeners = new ArrayList(); - + private Object _callbackDispatcherTarget; + private RpcProvider _rpcProvider; private long _startTickInMs; private long _callTag; @@ -95,6 +96,13 @@ public class RpcClientCallImpl implements RpcClientCall { _callbackListeners.add(listener); return this; } + + @Override + public RpcClientCall setCallbackDispatcherTarget(Object target) { + _callbackDispatcherTarget = target; + return this; + } + @Override public RpcClientCall setOneway() { @@ -189,11 +197,16 @@ public class RpcClientCallImpl implements RpcClientCall { _responseDone = true; _responseLock.notifyAll(); } - - assert(_rpcProvider.getMessageSerializer() != null); - Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result); - for(RpcCallbackListener listener: _callbackListeners) - listener.onSuccess(resultObject); + + if(_callbackListeners.size() > 0) { + assert(_rpcProvider.getMessageSerializer() != null); + Object resultObject = _rpcProvider.getMessageSerializer().serializeFrom(result); + for(RpcCallbackListener listener: _callbackListeners) + listener.onSuccess(resultObject); + } else { + if(_callbackDispatcherTarget != null) + RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this); + } } public void complete(RpcException e) { @@ -205,7 +218,12 @@ public class RpcClientCallImpl implements RpcClientCall { _responseLock.notifyAll(); } - for(RpcCallbackListener listener: _callbackListeners) - listener.onFailure(e); + if(_callbackListeners.size() > 0) { + for(RpcCallbackListener listener: _callbackListeners) + listener.onFailure(e); + } else { + if(_callbackDispatcherTarget != null) + RpcCallbackDispatcher.dispatch(_callbackDispatcherTarget, this); + } } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java index c652982f932..ea6993169ee 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProviderImpl.java @@ -50,9 +50,9 @@ public class RpcProviderImpl implements RpcProvider { Object pdu = _messageSerializer.serializeFrom(message); if(pdu instanceof RpcCallRequestPdu) { - handleCallRequestPdu((RpcCallRequestPdu)pdu); + handleCallRequestPdu(senderEndpointAddress, targetEndpointAddress, (RpcCallRequestPdu)pdu); } else if(pdu instanceof RpcCallResponsePdu) { - handleCallResponsePdu((RpcCallResponsePdu)pdu); + handleCallResponsePdu(senderEndpointAddress, targetEndpointAddress, (RpcCallResponsePdu)pdu); } else { assert(false); } @@ -81,7 +81,7 @@ public class RpcProviderImpl implements RpcProvider { _serviceEndpoints.remove(rpcEndpoint); } } - + @Override public RpcClientCall newCall(String sourceAddress, String targetAddress) { long callTag = getNextCallTag(); @@ -124,11 +124,42 @@ public class RpcProviderImpl implements RpcProvider { return tag; } - private void handleCallRequestPdu(RpcCallRequestPdu pdu) { - // ??? + private void handleCallRequestPdu(String sourceAddress, String targetAddress, RpcCallRequestPdu pdu) { + try { + RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu); + + // TODO, we are trying to avoid locking when calling into callbacks + // this can be optimized later + List endpoints = new ArrayList(); + synchronized(_serviceEndpoints) { + endpoints.addAll(_serviceEndpoints); + } + + for(RpcServiceEndpoint endpoint : endpoints) { + if(RpcServiceDispatcher.dispatch(endpoint, call)) + return; + } + + RpcCallResponsePdu responsePdu = new RpcCallResponsePdu(); + responsePdu.setCommand(pdu.getCommand()); + responsePdu.setRequestStartTick(pdu.getRequestStartTick()); + responsePdu.setRequestTag(pdu.getRequestTag()); + responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_NOT_EXIST); + sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu)); + + } catch (Throwable e) { + + RpcCallResponsePdu responsePdu = new RpcCallResponsePdu(); + responsePdu.setCommand(pdu.getCommand()); + responsePdu.setRequestStartTick(pdu.getRequestStartTick()); + responsePdu.setRequestTag(pdu.getRequestTag()); + responsePdu.setResult(RpcCallResponsePdu.RESULT_HANDLER_EXCEPTION); + + sendRpcPdu(targetAddress, sourceAddress, _messageSerializer.serializeTo(RpcCallResponsePdu.class, responsePdu)); + } } - private void handleCallResponsePdu(RpcCallResponsePdu pdu) { + private void handleCallResponsePdu(String sourceAddress, String targetAddress, RpcCallResponsePdu pdu) { RpcClientCallImpl call = null; synchronized(this) { diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java new file mode 100644 index 00000000000..75f521f7bbc --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +public class RpcServerCallImpl implements RpcServerCall { + + private RpcProvider _rpcProvider; + private String _sourceAddress; + private String _targetAddress; + + private RpcCallRequestPdu _requestPdu; + + public RpcServerCallImpl(RpcProvider provider, String sourceAddress, String targetAddress, + RpcCallRequestPdu requestPdu) { + + _rpcProvider = provider; + _sourceAddress = sourceAddress; + _targetAddress = targetAddress; + _requestPdu = requestPdu; + } + + @Override + public String getCommand() { + assert(_requestPdu != null); + return _requestPdu.getCommand(); + } + + @Override + public Object getCommandArgument() { + if(_requestPdu.getSerializedCommandArg() == null) + return null; + + assert(_rpcProvider.getMessageSerializer() != null); + return _rpcProvider.getMessageSerializer().serializeFrom(_requestPdu.getSerializedCommandArg()); + } + + @Override + public void completeCall(Object returnObject) { + assert(_sourceAddress != null); + assert(_targetAddress != null); + + RpcCallResponsePdu pdu = new RpcCallResponsePdu(); + pdu.setCommand(_requestPdu.getCommand()); + pdu.setRequestTag(_requestPdu.getRequestTag()); + pdu.setRequestStartTick(_requestPdu.getRequestStartTick()); + pdu.setRequestStartTick(RpcCallResponsePdu.RESULT_SUCCESSFUL); + if(returnObject != null) { + assert(_rpcProvider.getMessageSerializer() != null); + pdu.setSerializedResult(_rpcProvider.getMessageSerializer().serializeTo(returnObject.getClass(), returnObject)); + } + + _rpcProvider.sendRpcPdu(_targetAddress, _sourceAddress, + _rpcProvider.getMessageSerializer().serializeTo(RpcCallResponsePdu.class, pdu)); + } +}