Add concept of caller context and message serilizer to messaging layer

This commit is contained in:
Kelven Yang 2012-11-15 15:59:37 -08:00
parent 3b668d2907
commit 28b682c8db
18 changed files with 171 additions and 75 deletions

View File

@ -18,8 +18,7 @@
*/ */
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier, Subscriber { public class ComponentEndpoint implements RpcEndpoint, Subscriber {
private TransportEndpoint transportEndpoint; private TransportEndpoint transportEndpoint;
private RpcProvider rpcProvider; private RpcProvider rpcProvider;
@ -42,44 +41,33 @@ public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier, Su
this.rpcProvider = rpcProvider; this.rpcProvider = rpcProvider;
} }
public void initialize(String[] multiplexiers) { public void initialize() {
if(multiplexiers != null) {
for(String name : multiplexiers)
transportEndpoint.registerMultiplexier(name, this);
}
rpcProvider.registerRpcEndpoint(this); rpcProvider.registerRpcEndpoint(this);
} }
@Override // it will throw RpcRuntimeException in case of transport
public void onTransportMessage(String senderEndpointAddress, public String call(RpcCallContext callContext, String targetAddress, String command, Object cmdArg)
String targetEndpointAddress, String multiplexer, String message) {
}
@Override
public String call(String targetAddress, String rpcMessage)
{ {
return null; return rpcProvider.call(this, callContext, targetAddress, command, cmdArg);
} }
@Override public RpcClientCall asyncCall(RpcCallContext callContext, String targetAddress, String command, Object cmdArg) {
public RpcCall asyncCall(String targetAddress, String rpcMessage) { return rpcProvider.asyncCall(this, callContext, targetAddress, command, cmdArg);
return null;
} }
@Override @Override
public void onCallReceive(RpcCall call) { public void onCallReceive(RpcServerCall call) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
// implement annotation based call dispatching // implement annotation based call dispatching
} }
@Override @Override
public void onCallReturn(RpcCall call, String rpcReturnMessage, RpcException e) { public void onCallReturn(RpcClientCall call, Object returnObject, RpcException e) {
// ??? // ???
} }
@Override @Override
public void onPublishEvent(String subject, String senderAddress, String args) { public void onPublishEvent(String subject, String senderAddress, Object args) {
// TODO // TODO
} }
} }

View File

@ -18,8 +18,11 @@
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface EventBus { public interface EventBus {
void setMessageSerializer(MessageSerializer messageSerializer);
MessageSerializer getMessageSerializer();
void subscribe(String subject, Subscriber subscriber); void subscribe(String subject, Subscriber subscriber);
void unsubscribe(String subject, Subscriber subscriber); void unsubscribe(String subject, Subscriber subscriber);
void publish(String subject, PublishScope scope, String senderAddress, String args); void publish(String senderAddress, String subject, PublishScope scope, Object args);
} }

View File

@ -24,13 +24,13 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class EventBusBase implements EventBus { public class EventBusBase implements EventBus {
private Gate _gate; private Gate _gate;
private List<ActionRecord> _pendingActions; private List<ActionRecord> _pendingActions;
private SubscriptionNode _subscriberRoot; private SubscriptionNode _subscriberRoot;
private MessageSerializer _messageSerializer;
public EventBusBase() { public EventBusBase() {
_gate = new Gate(); _gate = new Gate();
@ -39,6 +39,16 @@ public class EventBusBase implements EventBus {
_subscriberRoot = new SubscriptionNode("/", null); _subscriberRoot = new SubscriptionNode("/", null);
} }
@Override
public void setMessageSerializer(MessageSerializer messageSerializer) {
_messageSerializer = messageSerializer;
}
@Override
public MessageSerializer getMessageSerializer() {
return _messageSerializer;
}
@Override @Override
public void subscribe(String subject, Subscriber subscriber) { public void subscribe(String subject, Subscriber subscriber) {
assert(subject != null); assert(subject != null);
@ -71,8 +81,8 @@ public class EventBusBase implements EventBus {
} }
@Override @Override
public void publish(String subject, PublishScope scope, String senderAddress, public void publish(String senderAddress, String subject, PublishScope scope,
String args) { Object args) {
if(_gate.enter(true)) { if(_gate.enter(true)) {
@ -80,11 +90,11 @@ public class EventBusBase implements EventBus {
SubscriptionNode current = locate(subject, chainFromTop, false); SubscriptionNode current = locate(subject, chainFromTop, false);
if(current != null) if(current != null)
current.notifySubscribers(subject, senderAddress, args); current.notifySubscribers(senderAddress, subject, args);
Collections.reverse(chainFromTop); Collections.reverse(chainFromTop);
for(SubscriptionNode node : chainFromTop) for(SubscriptionNode node : chainFromTop)
node.notifySubscribers(subject, senderAddress, args); node.notifySubscribers(senderAddress, subject, args);
_gate.leave(); _gate.leave();
} }
@ -283,9 +293,9 @@ public class EventBusBase implements EventBus {
_children.put(key, childNode); _children.put(key, childNode);
} }
public void notifySubscribers(String subject, String senderAddress, String args) { public void notifySubscribers(String senderAddress, String subject, Object args) {
for(Subscriber subscriber : _subscribers) { for(Subscriber subscriber : _subscribers) {
subscriber.onPublishEvent(subject, senderAddress, args); subscriber.onPublishEvent(senderAddress, subject, args);
} }
} }
} }

View File

@ -1,3 +1,19 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
import java.lang.annotation.ElementType; import java.lang.annotation.ElementType;

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface TransportAddressFactory { public interface MessageSerializer {
String createServiceAddress(String serviceProvider); String serializeTo(Object object);
Object serializeFrom(String message);
} }

View File

@ -17,26 +17,6 @@
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public class PublishScope { public enum PublishScope {
public enum Type { SINGLE, LOCAL, GLOBAL }; LOCAL, GLOBAL
Type scope;
String address;
public PublishScope(Type scope) {
this.scope = scope;
}
public PublishScope(String address) {
scope = Type.SINGLE;
this.address = address;
}
public Type getType() {
return scope;
}
public String getAddress() {
return this.address;
}
} }

View File

@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.messaging;
import java.util.HashMap;
import java.util.Map;
public class RpcCallContext {
private final static int DEFAULT_RPC_TIMEOUT = 10000;
Map<String, Object> _contextMap = new HashMap<String, Object>();
int _timeoutMilliSeconds = DEFAULT_RPC_TIMEOUT;
public RpcCallContext() {
}
public int getTimeoutMilliSeconds() {
return _timeoutMilliSeconds;
}
public void setTimeoutMilliSeconds(int timeoutMilliseconds) {
_timeoutMilliSeconds = timeoutMilliseconds;
}
@SuppressWarnings("unchecked")
public <T> T getContextParameter(String key) {
return (T)_contextMap.get(key);
}
public void setContextParameter(String key, Object object) {
_contextMap.put(key, object);
}
}

View File

@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.messaging;
public interface RpcClientCall {
String getCommand();
Object getCommandArgument();
RpcCallContext getCallContext();
void cancel();
}

View File

@ -0,0 +1,12 @@
package org.apache.cloudstack.framework.messaging;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RpcClientCallHandler {
String command();
}

View File

@ -19,9 +19,6 @@
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface RpcEndpoint { public interface RpcEndpoint {
String call(String targetAddress, String rpcMessage); void onCallReceive(RpcServerCall call);
RpcCall asyncCall(String targetAddress, String rpcMessage); void onCallReturn(RpcClientCall call, Object returnObject, RpcException e);
void onCallReceive(RpcCall call);
void onCallReturn(RpcCall call, String rpcReturnMessage, RpcException e);
} }

View File

@ -1,6 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public class RpcException extends Exception { public class RpcException extends RuntimeException {
private static final long serialVersionUID = -3164514701087423787L; private static final long serialVersionUID = -3164514701087423787L;
public RpcException() { public RpcException() {

View File

@ -19,6 +19,12 @@
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface RpcProvider extends TransportMultiplexier { public interface RpcProvider extends TransportMultiplexier {
public void registerRpcEndpoint(RpcEndpoint rpcEndpoint); void setMessageSerializer(MessageSerializer messageSerializer);
public void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint); MessageSerializer getMessageSerializer();
void registerRpcEndpoint(RpcEndpoint rpcEndpoint);
void unregisteRpcEndpoint(RpcEndpoint rpcEndpoint);
String call(RpcEndpoint endpoint, RpcCallContext callContext, String targetAddress, String command, Object cmdArg);
RpcClientCall asyncCall(RpcEndpoint endpoint, RpcCallContext callContext, String targetAddress, String command, Object cmdArg);
} }

View File

@ -18,14 +18,11 @@
*/ */
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface RpcCall { public interface RpcServerCall {
String getCommand(); String getCommand();
String getContent(); Object getCommandArgument();
String getRequestTag(); String getRequestTag();
// for sender to cancel
void cancel();
// for receiver to response call // for receiver to response call
void completeCall(String rpcMessage); void completeCall(Object returnObject);
} }

View File

@ -25,7 +25,6 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD) @Target(ElementType.METHOD)
public @interface RpcCallHandler { public @interface RpcServerCallHandler {
String command(); String command();
boolean returnHandler() default false;
} }

View File

@ -18,5 +18,5 @@
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface Subscriber { public interface Subscriber {
void onPublishEvent(String subject, String senderAddress, String args); void onPublishEvent(String senderAddress, String subject, Object args);
} }

View File

@ -19,12 +19,12 @@
package org.apache.cloudstack.framework.messaging; package org.apache.cloudstack.framework.messaging;
public interface TransportEndpoint { public interface TransportEndpoint {
void onAttachConfirm(String endpointAddress); void onAttachConfirm(boolean bSuccess, String endpointAddress);
void onDetachIndication(String endpointAddress); void onDetachIndication(String endpointAddress);
void registerMultiplexier(String name, TransportMultiplexier multiplexier); void registerMultiplexier(String name, TransportMultiplexier multiplexier);
void unregisterMultiplexier(String name); void unregisterMultiplexier(String name);
void sendMessage(TransportEndpoint sender, String targetEndpointAddress, void sendMessage(TransportEndpoint sender, String targetEndpointAddress,
String multiplexier, String message); String multiplexier, String message);
} }

View File

@ -27,6 +27,5 @@ public class ClientEventBus extends EventBusBase implements TransportMultiplexie
public void onTransportMessage(String senderEndpointAddress, public void onTransportMessage(String senderEndpointAddress,
String targetEndpointAddress, String multiplexer, String message) { String targetEndpointAddress, String multiplexer, String message) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }
} }

View File

@ -24,7 +24,7 @@ import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
public class ClientTransportEndpoint implements TransportEndpoint { public class ClientTransportEndpoint implements TransportEndpoint {
@Override @Override
public void onAttachConfirm(String endpointAddress) { public void onAttachConfirm(boolean bSuccess, String endpointAddress) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
} }