Make RPC naming convention clear for RPC users, add handler and event dispatchers

This commit is contained in:
Kelven Yang 2012-11-20 15:42:51 -08:00
parent 17f2af409e
commit d62da2a7b5
15 changed files with 325 additions and 59 deletions

View File

@ -1,5 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface ComponentContainer { public interface ComponentContainer {
ComponentEndpoint wireComponent(ComponentEndpoint endpoint, String predefinedAddress); ComponentEndpoint wire(ComponentEndpoint endpoint, String predefinedAddress);
} }

View File

@ -18,7 +18,8 @@
*/ */
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public class ComponentEndpoint implements RpcEndpoint, Subscriber { public class ComponentEndpoint implements RpcServiceEndpoint, Subscriber {
private TransportEndpoint transportEndpoint; private TransportEndpoint transportEndpoint;
private RpcProvider rpcProvider; private RpcProvider rpcProvider;
@ -42,17 +43,19 @@ public class ComponentEndpoint implements RpcEndpoint, Subscriber {
} }
public void initialize() { public void initialize() {
rpcProvider.registerRpcEndpoint(this); rpcProvider.registerRpcServiceEndpoint(this);
} }
@Override @Override
public void onCallReceive(RpcServerCall call) { public boolean onCallReceive(RpcServerCall call) {
// TODO Auto-generated method stub return RpcServiceDispatcher.dispatch(this, call);
// implement annotation based call dispatching
} }
@Override @Override
public void onPublishEvent(String subject, String senderAddress, Object args) { public void onPublishEvent(String subject, String senderAddress, Object args) {
// TODO try {
EventDispatcher.dispatch(this, subject, senderAddress, args);
} catch(RuntimeException e) {
}
} }
} }

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class EventDispatcher {
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
public static boolean dispatch(Object target, String subject, String senderAddress, Object args) {
assert(subject != null);
assert(target != null);
Method handler = resolveHandler(target.getClass(), subject);
if(handler == null)
return false;
try {
handler.invoke(target, subject, senderAddress, args);
} catch (IllegalArgumentException e) {
throw new RuntimeException("IllegalArgumentException when invoking event handler for subject: " + subject);
} catch (IllegalAccessException e) {
throw new RuntimeException("IllegalAccessException when invoking event handler for subject: " + subject);
} catch (InvocationTargetException e) {
throw new RuntimeException("InvocationTargetException when invoking event handler for subject: " + subject);
}
return true;
}
public static Method resolveHandler(Class<?> handlerClz, String subject) {
synchronized(s_handlerCache) {
Method handler = s_handlerCache.get(handlerClz);
if(handler != null)
return handler;
for(Method method : handlerClz.getMethods()) {
EventHandler annotation = method.getAnnotation(EventHandler.class);
if(annotation != null) {
if(match(annotation.topic(), subject)) {
s_handlerCache.put(handlerClz, method);
return method;
}
}
}
}
return null;
}
private static boolean match(String expression, String param) {
return param.matches(expression);
}
}

View File

@ -1,19 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one /*
// or more contributor license agreements. See the NOTICE file * Licensed to the Apache Software Foundation (ASF) under one
// distributed with this work for additional information * or more contributor license agreements. See the NOTICE file
// regarding copyright ownership. The ASF licenses this file * distributed with this work for additional information
// to you under the Apache License, Version 2.0 (the * regarding copyright ownership. The ASF licenses this file
// "License"); you may not use this file except in compliance * to you under the Apache License, Version 2.0 (the
// the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0 *
// * http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, *
// software distributed under the License is distributed on an * Unless required by applicable law or agreed to in writing,
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// specific language governing permissions and limitations * KIND, either express or implied. See the License for the
// under the License. * specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
import java.lang.annotation.ElementType; import java.lang.annotation.ElementType;

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class RpcCallbackDispatcher {
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
public static boolean dispatch(Object target, RpcClientCall clientCall) {
assert(clientCall != null);
assert(target != null);
Method handler = resolveHandler(target.getClass(), clientCall.getCommand());
if(handler == null)
return false;
try {
handler.invoke(target, clientCall);
} catch (IllegalArgumentException e) {
throw new RpcException("IllegalArgumentException when invoking RPC callback for command: " + clientCall.getCommand());
} catch (IllegalAccessException e) {
throw new RpcException("IllegalAccessException when invoking RPC callback for command: " + clientCall.getCommand());
} catch (InvocationTargetException e) {
throw new RpcException("InvocationTargetException when invoking RPC callback for command: " + clientCall.getCommand());
}
return true;
}
public static Method resolveHandler(Class<?> handlerClz, String command) {
synchronized(s_handlerCache) {
Method handler = s_handlerCache.get(handlerClz);
if(handler != null)
return handler;
for(Method method : handlerClz.getMethods()) {
RpcCallbackHandler annotation = method.getAnnotation(RpcCallbackHandler.class);
if(annotation != null) {
if(annotation.command().equals(command)) {
s_handlerCache.put(handlerClz, method);
return method;
}
}
}
}
return null;
}
}

View File

@ -25,6 +25,6 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD) @Target(ElementType.METHOD)
public @interface RpcServerCallHandler { public @interface RpcCallbackHandler {
String command(); String command();
} }

View File

@ -19,8 +19,8 @@ package org.apache.cloudstack.framework.messaging;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public interface RpcClientCall { public interface RpcClientCall {
String getCommand();
RpcClientCall setCommand(String cmd); RpcClientCall setCommand(String cmd);
RpcClientCall setPipeline(String pipeline);
RpcClientCall setTimeout(TimeUnit timeout); RpcClientCall setTimeout(TimeUnit timeout);
RpcClientCall setCommandArg(Object arg); RpcClientCall setCommandArg(Object arg);
@ -34,5 +34,8 @@ public interface RpcClientCall {
void apply(); void apply();
void cancel(); void cancel();
/**
* @return the result object it may also throw RpcException to indicate RPC failures
*/
<T> T get(); <T> T get();
} }

View File

@ -1,12 +0,0 @@
package org.apache.cloudstack.framework.messaging;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RpcClientCallHandler {
String command();
}

View File

@ -22,8 +22,8 @@ public interface RpcProvider extends TransportMultiplexier {
void setMessageSerializer(MessageSerializer messageSerializer); void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer(); MessageSerializer getMessageSerializer();
void registerRpcEndpoint(RpcEndpoint rpcEndpoint); void registerRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint); void unregisteRpcServiceEndpoint(RpcServiceEndpoint rpcEndpoint);
RpcClientCall target(String target); RpcClientCall target(String target);
} }

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class RpcServiceDispatcher {
private static Map<Class<?>, Method> s_handlerCache = new HashMap<Class<?>, Method>();
public static boolean dispatch(Object target, RpcServerCall serviceCall) {
assert(serviceCall != null);
assert(target != null);
Method handler = resolveHandler(target.getClass(), serviceCall.getCommand());
if(handler == null)
return false;
try {
handler.invoke(target, serviceCall);
} catch (IllegalArgumentException e) {
throw new RpcException("IllegalArgumentException when invoking RPC service command: " + serviceCall.getCommand());
} catch (IllegalAccessException e) {
throw new RpcException("IllegalAccessException when invoking RPC service command: " + serviceCall.getCommand());
} catch (InvocationTargetException e) {
throw new RpcException("InvocationTargetException when invoking RPC service command: " + serviceCall.getCommand());
}
return true;
}
public static Method resolveHandler(Class<?> handlerClz, String command) {
synchronized(s_handlerCache) {
Method handler = s_handlerCache.get(handlerClz);
if(handler != null)
return handler;
for(Method method : handlerClz.getMethods()) {
RpcServiceHandler annotation = method.getAnnotation(RpcServiceHandler.class);
if(annotation != null) {
if(annotation.command().equals(command)) {
s_handlerCache.put(handlerClz, method);
return method;
}
}
}
}
return null;
}
}

View File

@ -18,6 +18,13 @@
*/ */
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface RpcEndpoint { public interface RpcServiceEndpoint {
void onCallReceive(RpcServerCall call); /*
* @return
* true call has been handled
* false can not find the call handler
* @throws
* RpcException, exception when
*/
boolean onCallReceive(RpcServerCall call);
} }

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RpcServiceHandler {
String command();
}

View File

@ -1,19 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one /*
// or more contributor license agreements. See the NOTICE file * Licensed to the Apache Software Foundation (ASF) under one
// distributed with this work for additional information * or more contributor license agreements. See the NOTICE file
// regarding copyright ownership. The ASF licenses this file * distributed with this work for additional information
// to you under the Apache License, Version 2.0 (the * regarding copyright ownership. The ASF licenses this file
// "License"); you may not use this file except in compliance * to you under the Apache License, Version 2.0 (the
// the License. You may obtain a copy of the License at * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0 *
// * http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, *
// software distributed under the License is distributed on an * Unless required by applicable law or agreed to in writing,
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// specific language governing permissions and limitations * KIND, either express or implied. See the License for the
// under the License. * specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;

View File

@ -27,6 +27,6 @@ public interface TransportEndpoint {
void registerMultiplexier(String name, TransportMultiplexier multiplexier); void registerMultiplexier(String name, TransportMultiplexier multiplexier);
void unregisterMultiplexier(String name); void unregisterMultiplexier(String name);
void sendMessage(TransportEndpoint sender, String targetEndpointAddress, void sendMessage(String soureEndpointAddress, String targetEndpointAddress,
String multiplexier, String message); String multiplexier, String message);
} }

View File

@ -50,7 +50,7 @@ public class ClientTransportEndpoint implements TransportEndpoint {
} }
@Override @Override
public void sendMessage(TransportEndpoint sender, public void sendMessage(String sourceEndpointAddress,
String targetEndpointAddress, String multiplexier, String message) { String targetEndpointAddress, String multiplexier, String message) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }