From 28b682c8dbccfd27d30de08c539cb2847222c3a2 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Thu, 15 Nov 2012 15:59:37 -0800 Subject: [PATCH] Add concept of caller context and message serilizer to messaging layer --- .../messaging/ComponentEndpoint.java | 32 ++++--------- .../framework/messaging/EventBus.java | 5 +- .../framework/messaging/EventBusBase.java | 24 +++++++--- .../framework/messaging/EventHandler.java | 16 +++++++ ...essFactory.java => MessageSerializer.java} | 5 +- .../framework/messaging/PublishScope.java | 24 +--------- .../framework/messaging/RpcCallContext.java | 47 +++++++++++++++++++ .../framework/messaging/RpcClientCall.java | 25 ++++++++++ .../messaging/RpcClientCallHandler.java | 12 +++++ .../framework/messaging/RpcEndpoint.java | 7 +-- .../framework/messaging/RpcException.java | 18 ++++++- .../framework/messaging/RpcProvider.java | 10 +++- .../{RpcCall.java => RpcServerCall.java} | 9 ++-- ...Handler.java => RpcServerCallHandler.java} | 3 +- .../framework/messaging/Subscriber.java | 2 +- .../messaging/TransportEndpoint.java | 4 +- .../messaging/client/ClientEventBus.java | 1 - .../client/ClientTransportEndpoint.java | 2 +- 18 files changed, 171 insertions(+), 75 deletions(-) rename framework/ipc/src/org/apache/cloudstack/framework/messaging/{TransportAddressFactory.java => MessageSerializer.java} (88%) create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallContext.java create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java rename framework/ipc/src/org/apache/cloudstack/framework/messaging/{RpcCall.java => RpcServerCall.java} (87%) rename framework/ipc/src/org/apache/cloudstack/framework/messaging/{RpcCallHandler.java => RpcServerCallHandler.java} (93%) diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java index ed9ea0b721a..442f9869b8b 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java @@ -18,8 +18,7 @@ */ package org.apache.cloudstack.framework.messaging; -public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier, Subscriber { - +public class ComponentEndpoint implements RpcEndpoint, Subscriber { private TransportEndpoint transportEndpoint; private RpcProvider rpcProvider; @@ -42,44 +41,33 @@ public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier, Su this.rpcProvider = rpcProvider; } - public void initialize(String[] multiplexiers) { - if(multiplexiers != null) { - for(String name : multiplexiers) - transportEndpoint.registerMultiplexier(name, this); - } - + public void initialize() { rpcProvider.registerRpcEndpoint(this); } - @Override - public void onTransportMessage(String senderEndpointAddress, - String targetEndpointAddress, String multiplexer, String message) { - } - - @Override - public String call(String targetAddress, String rpcMessage) + // it will throw RpcRuntimeException in case of transport + public String call(RpcCallContext callContext, String targetAddress, String command, Object cmdArg) { - return null; + return rpcProvider.call(this, callContext, targetAddress, command, cmdArg); } - @Override - public RpcCall asyncCall(String targetAddress, String rpcMessage) { - return null; + public RpcClientCall asyncCall(RpcCallContext callContext, String targetAddress, String command, Object cmdArg) { + return rpcProvider.asyncCall(this, callContext, targetAddress, command, cmdArg); } @Override - public void onCallReceive(RpcCall call) { + public void onCallReceive(RpcServerCall call) { // TODO Auto-generated method stub // implement annotation based call dispatching } @Override - public void onCallReturn(RpcCall call, String rpcReturnMessage, RpcException e) { + public void onCallReturn(RpcClientCall call, Object returnObject, RpcException e) { // ??? } @Override - public void onPublishEvent(String subject, String senderAddress, String args) { + public void onPublishEvent(String subject, String senderAddress, Object args) { // TODO } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java index e11a009098a..b73438b5cb7 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java @@ -18,8 +18,11 @@ package org.apache.cloudstack.framework.messaging; public interface EventBus { + void setMessageSerializer(MessageSerializer messageSerializer); + MessageSerializer getMessageSerializer(); + void subscribe(String subject, Subscriber subscriber); void unsubscribe(String subject, Subscriber subscriber); - void publish(String subject, PublishScope scope, String senderAddress, String args); + void publish(String senderAddress, String subject, PublishScope scope, Object args); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java index fbcf6488837..729208acdfc 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java @@ -24,13 +24,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - public class EventBusBase implements EventBus { private Gate _gate; private List _pendingActions; private SubscriptionNode _subscriberRoot; + private MessageSerializer _messageSerializer; public EventBusBase() { _gate = new Gate(); @@ -39,6 +39,16 @@ public class EventBusBase implements EventBus { _subscriberRoot = new SubscriptionNode("/", null); } + @Override + public void setMessageSerializer(MessageSerializer messageSerializer) { + _messageSerializer = messageSerializer; + } + + @Override + public MessageSerializer getMessageSerializer() { + return _messageSerializer; + } + @Override public void subscribe(String subject, Subscriber subscriber) { assert(subject != null); @@ -71,8 +81,8 @@ public class EventBusBase implements EventBus { } @Override - public void publish(String subject, PublishScope scope, String senderAddress, - String args) { + public void publish(String senderAddress, String subject, PublishScope scope, + Object args) { if(_gate.enter(true)) { @@ -80,11 +90,11 @@ public class EventBusBase implements EventBus { SubscriptionNode current = locate(subject, chainFromTop, false); if(current != null) - current.notifySubscribers(subject, senderAddress, args); + current.notifySubscribers(senderAddress, subject, args); Collections.reverse(chainFromTop); for(SubscriptionNode node : chainFromTop) - node.notifySubscribers(subject, senderAddress, args); + node.notifySubscribers(senderAddress, subject, args); _gate.leave(); } @@ -283,9 +293,9 @@ public class EventBusBase implements EventBus { _children.put(key, childNode); } - public void notifySubscribers(String subject, String senderAddress, String args) { + public void notifySubscribers(String senderAddress, String subject, Object args) { for(Subscriber subscriber : _subscribers) { - subscriber.onPublishEvent(subject, senderAddress, args); + subscriber.onPublishEvent(senderAddress, subject, args); } } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java index 12e6fb8394a..6ee67c8ea40 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java @@ -1,3 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. package org.apache.cloudstack.framework.messaging; import java.lang.annotation.ElementType; diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddressFactory.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java similarity index 88% rename from framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddressFactory.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java index d7f3e9de5f2..3aafd85a679 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportAddressFactory.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSerializer.java @@ -18,6 +18,7 @@ */ package org.apache.cloudstack.framework.messaging; -public interface TransportAddressFactory { - String createServiceAddress(String serviceProvider); +public interface MessageSerializer { + String serializeTo(Object object); + Object serializeFrom(String message); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/PublishScope.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/PublishScope.java index fbce919af6d..a266578b53f 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/PublishScope.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/PublishScope.java @@ -17,26 +17,6 @@ package org.apache.cloudstack.framework.messaging; -public class PublishScope { - public enum Type { SINGLE, LOCAL, GLOBAL }; - - Type scope; - String address; - - public PublishScope(Type scope) { - this.scope = scope; - } - - public PublishScope(String address) { - scope = Type.SINGLE; - this.address = address; - } - - public Type getType() { - return scope; - } - - public String getAddress() { - return this.address; - } +public enum PublishScope { + LOCAL, GLOBAL } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallContext.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallContext.java new file mode 100644 index 00000000000..bdfcd453c24 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallContext.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.messaging; + +import java.util.HashMap; +import java.util.Map; + +public class RpcCallContext { + private final static int DEFAULT_RPC_TIMEOUT = 10000; + + Map _contextMap = new HashMap(); + int _timeoutMilliSeconds = DEFAULT_RPC_TIMEOUT; + + public RpcCallContext() { + } + + public int getTimeoutMilliSeconds() { + return _timeoutMilliSeconds; + } + + public void setTimeoutMilliSeconds(int timeoutMilliseconds) { + _timeoutMilliSeconds = timeoutMilliseconds; + } + + @SuppressWarnings("unchecked") + public T getContextParameter(String key) { + return (T)_contextMap.get(key); + } + + public void setContextParameter(String key, Object object) { + _contextMap.put(key, object); + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java new file mode 100644 index 00000000000..7d9cd8f5886 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.cloudstack.framework.messaging; + +public interface RpcClientCall { + String getCommand(); + Object getCommandArgument(); + RpcCallContext getCallContext(); + + void cancel(); +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java new file mode 100644 index 00000000000..d695ff3cf42 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java @@ -0,0 +1,12 @@ +package org.apache.cloudstack.framework.messaging; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface RpcClientCallHandler { + String command(); +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java index 1be5890e7af..ff007286045 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java @@ -19,9 +19,6 @@ package org.apache.cloudstack.framework.messaging; public interface RpcEndpoint { - String call(String targetAddress, String rpcMessage); - RpcCall asyncCall(String targetAddress, String rpcMessage); - - void onCallReceive(RpcCall call); - void onCallReturn(RpcCall call, String rpcReturnMessage, RpcException e); + void onCallReceive(RpcServerCall call); + void onCallReturn(RpcClientCall call, Object returnObject, RpcException e); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcException.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcException.java index 978f1c3421d..914d93add19 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcException.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcException.java @@ -1,6 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. package org.apache.cloudstack.framework.messaging; -public class RpcException extends Exception { +public class RpcException extends RuntimeException { private static final long serialVersionUID = -3164514701087423787L; public RpcException() { diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java index cbdd4a7342d..95217cdec1f 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java @@ -19,6 +19,12 @@ package org.apache.cloudstack.framework.messaging; public interface RpcProvider extends TransportMultiplexier { - public void registerRpcEndpoint(RpcEndpoint rpcEndpoint); - public void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint); + void setMessageSerializer(MessageSerializer messageSerializer); + MessageSerializer getMessageSerializer(); + + void registerRpcEndpoint(RpcEndpoint rpcEndpoint); + void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint); + + String call(RpcEndpoint endpoint, RpcCallContext callContext, String targetAddress, String command, Object cmdArg); + RpcClientCall asyncCall(RpcEndpoint endpoint, RpcCallContext callContext, String targetAddress, String command, Object cmdArg); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCall.java similarity index 87% rename from framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCall.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCall.java index 570b82aa5a4..7380fb2091a 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCall.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCall.java @@ -18,14 +18,11 @@ */ package org.apache.cloudstack.framework.messaging; -public interface RpcCall { +public interface RpcServerCall { String getCommand(); - String getContent(); + Object getCommandArgument(); String getRequestTag(); - // for sender to cancel - void cancel(); - // for receiver to response call - void completeCall(String rpcMessage); + void completeCall(Object returnObject); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java similarity index 93% rename from framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallHandler.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java index 70110ee8701..73502ab24f1 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallHandler.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java @@ -25,7 +25,6 @@ import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) -public @interface RpcCallHandler { +public @interface RpcServerCallHandler { String command(); - boolean returnHandler() default false; } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java index 3eb3fc60a67..9f8d460ad56 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java @@ -18,5 +18,5 @@ package org.apache.cloudstack.framework.messaging; public interface Subscriber { - void onPublishEvent(String subject, String senderAddress, String args); + void onPublishEvent(String senderAddress, String subject, Object args); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java index 3a98681ee64..6bca5668567 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java @@ -19,12 +19,12 @@ package org.apache.cloudstack.framework.messaging; public interface TransportEndpoint { - void onAttachConfirm(String endpointAddress); + void onAttachConfirm(boolean bSuccess, String endpointAddress); void onDetachIndication(String endpointAddress); void registerMultiplexier(String name, TransportMultiplexier multiplexier); void unregisterMultiplexier(String name); void sendMessage(TransportEndpoint sender, String targetEndpointAddress, - String multiplexier, String message); + String multiplexier, String message); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientEventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientEventBus.java index 68303bc9912..c06934ec3fd 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientEventBus.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientEventBus.java @@ -27,6 +27,5 @@ public class ClientEventBus extends EventBusBase implements TransportMultiplexie public void onTransportMessage(String senderEndpointAddress, String targetEndpointAddress, String multiplexer, String message) { // TODO Auto-generated method stub - } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java index 49a1eb6c49b..3c8878ff997 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java @@ -24,7 +24,7 @@ import org.apache.cloudstack.framework.messaging.TransportMultiplexier; public class ClientTransportEndpoint implements TransportEndpoint { @Override - public void onAttachConfirm(String endpointAddress) { + public void onAttachConfirm(boolean bSuccess, String endpointAddress) { // TODO Auto-generated method stub }