diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java index 1d0f2741451..c5828ae280d 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentContainer.java @@ -1,5 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.cloudstack.framework.messaging; public interface ComponentContainer { - ComponentEndpoint wireComponent(ComponentEndpoint endpoint, String predefinedAddress); + ComponentEndpoint wire(ComponentEndpoint endpoint, String predefinedAddress); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java index 92443e5df9c..b218c1999fb 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java @@ -18,7 +18,8 @@ */ package org.apache.cloudstack.framework.messaging; -public class ComponentEndpoint implements RpcEndpoint, Subscriber { +public class ComponentEndpoint implements RpcServiceEndpoint, Subscriber { + private TransportEndpoint transportEndpoint; private RpcProvider rpcProvider; @@ -42,17 +43,19 @@ public class ComponentEndpoint implements RpcEndpoint, Subscriber { } public void initialize() { - rpcProvider.registerRpcEndpoint(this); + rpcProvider.registerRpcServiceEndpoint(this); } @Override - public void onCallReceive(RpcServerCall call) { - // TODO Auto-generated method stub - // implement annotation based call dispatching + public boolean onCallReceive(RpcServerCall call) { + return RpcServiceDispatcher.dispatch(this, call); } @Override public void onPublishEvent(String subject, String senderAddress, Object args) { - // TODO + try { + EventDispatcher.dispatch(this, subject, senderAddress, args); + } catch(RuntimeException e) { + } } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java new file mode 100644 index 00000000000..ec2afb45ab9 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventDispatcher.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +public class EventDispatcher { + private static Map, Method> s_handlerCache = new HashMap, Method>(); + + public static boolean dispatch(Object target, String subject, String senderAddress, Object args) { + assert(subject != null); + assert(target != null); + + Method handler = resolveHandler(target.getClass(), subject); + if(handler == null) + return false; + + try { + handler.invoke(target, subject, senderAddress, args); + } catch (IllegalArgumentException e) { + throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject); + } catch (IllegalAccessException e) { + throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject); + } catch (InvocationTargetException e) { + throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject); + } + + return true; + } + + public static Method resolveHandler(Class handlerClz, String subject) { + synchronized(s_handlerCache) { + Method handler = s_handlerCache.get(handlerClz); + if(handler != null) + return handler; + + for(Method method : handlerClz.getMethods()) { + EventHandler annotation = method.getAnnotation(EventHandler.class); + if(annotation != null) { + if(match(annotation.topic(), subject)) { + s_handlerCache.put(handlerClz, method); + return method; + } + } + } + } + + return null; + } + + private static boolean match(String expression, String param) { + return param.matches(expression); + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java index 6ee67c8ea40..5ec03f198ae 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java @@ -1,19 +1,21 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.cloudstack.framework.messaging; import java.lang.annotation.ElementType; diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackDispatcher.java new file mode 100644 index 00000000000..8fbe38fe1ad --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackDispatcher.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +public class RpcCallbackDispatcher { + + private static Map, Method> s_handlerCache = new HashMap, Method>(); + + public static boolean dispatch(Object target, RpcClientCall clientCall) { + assert(clientCall != null); + assert(target != null); + + Method handler = resolveHandler(target.getClass(), clientCall.getCommand()); + if(handler == null) + return false; + + try { + handler.invoke(target, clientCall); + } catch (IllegalArgumentException e) { + throw new RpcException("IllegalArgumentException when invoking RPC callback for command: " + clientCall.getCommand()); + } catch (IllegalAccessException e) { + throw new RpcException("IllegalAccessException when invoking RPC callback for command: " + clientCall.getCommand()); + } catch (InvocationTargetException e) { + throw new RpcException("InvocationTargetException when invoking RPC callback for command: " + clientCall.getCommand()); + } + + return true; + } + + public static Method resolveHandler(Class handlerClz, String command) { + synchronized(s_handlerCache) { + Method handler = s_handlerCache.get(handlerClz); + if(handler != null) + return handler; + + for(Method method : handlerClz.getMethods()) { + RpcCallbackHandler annotation = method.getAnnotation(RpcCallbackHandler.class); + if(annotation != null) { + if(annotation.command().equals(command)) { + s_handlerCache.put(handlerClz, method); + return method; + } + } + } + } + + return null; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackHandler.java similarity index 96% rename from framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackHandler.java index 73502ab24f1..61a214102e3 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServerCallHandler.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallbackHandler.java @@ -25,6 +25,6 @@ import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) -public @interface RpcServerCallHandler { +public @interface RpcCallbackHandler { String command(); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java index 5a1e9c47e12..a1a4bbff314 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCall.java @@ -19,8 +19,8 @@ package org.apache.cloudstack.framework.messaging; import java.util.concurrent.TimeUnit; public interface RpcClientCall { + String getCommand(); RpcClientCall setCommand(String cmd); - RpcClientCall setPipeline(String pipeline); RpcClientCall setTimeout(TimeUnit timeout); RpcClientCall setCommandArg(Object arg); @@ -34,5 +34,8 @@ public interface RpcClientCall { void apply(); void cancel(); + /** + * @return the result object, it may also throw RpcException to indicate RPC failures + */ T get(); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java deleted file mode 100644 index d695ff3cf42..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcClientCallHandler.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.cloudstack.framework.messaging; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) -public @interface RpcClientCallHandler { - String command(); -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java index 547a81acc77..334e09d4c67 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcProvider.java @@ -22,8 +22,8 @@ public interface RpcProvider extends TransportMultiplexier { void setMessageSerializer(MessageSerializer messageSerializer); MessageSerializer getMessageSerializer(); - void registerRpcEndpoint(RpcEndpoint rpcEndpoint); - void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint); + void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); + void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint); RpcClientCall target(String target); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java new file mode 100644 index 00000000000..1f1d1b93c4a --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceDispatcher.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; + +public class RpcServiceDispatcher { + + private static Map, Method> s_handlerCache = new HashMap, Method>(); + + public static boolean dispatch(Object target, RpcServerCall serviceCall) { + assert(serviceCall != null); + assert(target != null); + + Method handler = resolveHandler(target.getClass(), serviceCall.getCommand()); + if(handler == null) + return false; + + try { + handler.invoke(target, serviceCall); + } catch (IllegalArgumentException e) { + throw new RpcException("IllegalArgumentException when invoking RPC service command: " + serviceCall.getCommand()); + } catch (IllegalAccessException e) { + throw new RpcException("IllegalAccessException when invoking RPC service command: " + serviceCall.getCommand()); + } catch (InvocationTargetException e) { + throw new RpcException("InvocationTargetException when invoking RPC service command: " + serviceCall.getCommand()); + } + + return true; + } + + public static Method resolveHandler(Class handlerClz, String command) { + synchronized(s_handlerCache) { + Method handler = s_handlerCache.get(handlerClz); + if(handler != null) + return handler; + + for(Method method : handlerClz.getMethods()) { + RpcServiceHandler annotation = method.getAnnotation(RpcServiceHandler.class); + if(annotation != null) { + if(annotation.command().equals(command)) { + s_handlerCache.put(handlerClz, method); + return method; + } + } + } + } + + return null; + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceEndpoint.java similarity index 79% rename from framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceEndpoint.java index 375c1d3520d..8820139c6ec 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceEndpoint.java @@ -18,6 +18,13 @@ */ package org.apache.cloudstack.framework.messaging; -public interface RpcEndpoint { - void onCallReceive(RpcServerCall call); +public interface RpcServiceEndpoint { + /* + * @return + * true call has been handled + * false can not find the call handler + * @throws + * RpcException, exception when + */ + boolean onCallReceive(RpcServerCall call); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceHandler.java new file mode 100644 index 00000000000..435f841c75b --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcServiceHandler.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cloudstack.framework.messaging; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface RpcServiceHandler { + String command(); +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java index 9f8d460ad56..d4fe69ce2c2 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java @@ -1,19 +1,21 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.cloudstack.framework.messaging; diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java index 91ec86f49ea..0996bfddb7f 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java @@ -27,6 +27,6 @@ public interface TransportEndpoint { void registerMultiplexier(String name, TransportMultiplexier multiplexier); void unregisterMultiplexier(String name); - void sendMessage(TransportEndpoint sender, String targetEndpointAddress, + void sendMessage(String soureEndpointAddress, String targetEndpointAddress, String multiplexier, String message); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java index e12ddcf48d1..a1c345ede84 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java @@ -50,7 +50,7 @@ public class ClientTransportEndpoint implements TransportEndpoint { } @Override - public void sendMessage(TransportEndpoint sender, + public void sendMessage(String sourceEndpointAddress, String targetEndpointAddress, String multiplexier, String message) { // TODO Auto-generated method stub }