Connnect event bus to messaging layer

This commit is contained in:
Kelven Yang 2012-11-14 19:28:34 -08:00
parent cad75e7220
commit 2cae511464
31 changed files with 149 additions and 509 deletions

View File

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.container;
/**
* Container of different processes so it doesn't have to deal with message
* to Java call.
*/
public interface ServerContainer {
}

View File

@ -1,22 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.ipc;
public @interface IpcParam {
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.ipc;
import java.util.Map;
/**
* Publish the Event
*
*/
public interface Publisher {
/**
* Publish a topic
*
* @param topic topic being published
* @param content content published
* @return true if the topic has been picked up; false if not.
*/
boolean publish(String topic, Map<String, Object> content);
/**
* @return the name of this publisher
*/
String getName();
}

View File

@ -1,37 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.ipc;
/**
* Handles results of the ipc call
*
*/
public interface ResultHandler {
/**
* Signals a successful result
*/
void signalSuccess();
/**
* Signals an error result
* @param e exception thrown
*/
void signalError(Exception e);
}

View File

@ -1,35 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.ipc;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marks a method as an ipc mechanism
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Signal {
String topic();
String responseTopic();
}

View File

@ -1,23 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.ipc;
public class SignalIntercepter {
}

View File

@ -18,7 +18,7 @@
*/
package org.apache.cloudstack.framework.messaging;
public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier {
public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier, Subscriber {
private TransportEndpoint transportEndpoint;
private RpcProvider rpcProvider;
@ -57,19 +57,29 @@ public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier {
}
@Override
public String call(String targetAddress, String rpcMessage) {
// TODO Auto-generated method stub
public String call(String targetAddress, String rpcMessage)
{
return null;
}
@Override
public RpcCall asyncCall(String targetAddress, String rpcMessage) {
return null;
}
@Override
public void asyncCall(String targetAddress, String rpcMessage) {
public void onCallReceive(RpcCall call) {
// TODO Auto-generated method stub
// implement annotation based call dispatching
}
@Override
public void onCall(RpcCall call) {
// TODO Auto-generated method stub
public void onCallReturn(RpcCall call, String rpcReturnMessage, RpcException e) {
// ???
}
@Override
public void onPublishEvent(String subject, String senderAddress, String args) {
// TODO
}
}

View File

@ -15,13 +15,11 @@
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.events;
import java.io.Serializable;
package org.apache.cloudstack.framework.messaging;
public interface EventBus {
void subscribe(String subject, Subscriber subscriber);
void unsubscribe(String subject, Subscriber subscriber);
void publish(String subject, PublishScope scope, Object sender, Serializable args);
void publish(String subject, PublishScope scope, String senderAddress, String args);
}

View File

@ -15,16 +15,15 @@
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.events;
package org.apache.cloudstack.framework.messaging;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import edu.emory.mathcs.backport.java.util.Arrays;
import edu.emory.mathcs.backport.java.util.Collections;
public class EventBusBase implements EventBus {
@ -72,8 +71,8 @@ public class EventBusBase implements EventBus {
}
@Override
public void publish(String subject, PublishScope scope, Object sender,
Serializable args) {
public void publish(String subject, PublishScope scope, String senderAddress,
String args) {
if(_gate.enter(true)) {
@ -81,11 +80,11 @@ public class EventBusBase implements EventBus {
SubscriptionNode current = locate(subject, chainFromTop, false);
if(current != null)
current.notifySubscribers(subject, sender, args);
current.notifySubscribers(subject, senderAddress, args);
Collections.reverse(chainFromTop);
for(SubscriptionNode node : chainFromTop)
node.notifySubscribers(subject, sender, args);
node.notifySubscribers(subject, senderAddress, args);
_gate.leave();
}
@ -284,9 +283,9 @@ public class EventBusBase implements EventBus {
_children.put(key, childNode);
}
public void notifySubscribers(String subject, Object sender, Serializable args) {
public void notifySubscribers(String subject, String senderAddress, String args) {
for(Subscriber subscriber : _subscribers) {
subscriber.onPublishEvent(subject, sender, args);
subscriber.onPublishEvent(subject, senderAddress, args);
}
}
}

View File

@ -0,0 +1,12 @@
package org.apache.cloudstack.framework.messaging;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface EventHandler {
public String topic();
}

View File

@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public interface Message {
String getTitle();
String getContent();
String getPublisherTag();
long getSendTime();
long getReceiveTime();
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface MessageHandler {
String messageTitle();
}

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
import org.apache.cloudstack.framework.ipc.Publisher;
import org.apache.cloudstack.framework.ipc.Subscriber;
public interface MessageSystem {
/**
* Creates the publisher
* @param name of the publisher
* @return publisher
*/
Publisher createPublisher(String name);
/**
* Creates the subscriber
* @param name of the subscriber
* @return subscriber
*/
Subscriber createSubscriber(String name);
/**
* registers the subscriber
* @param subscriber subscriber
* @param topic topic to listen to
* @return subscriber
*/
boolean registerSubscriber(Subscriber subscriber, String topic);
}

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public interface MessagingDeliveryListener {
void onAcknowledge(String topicChannel, Message message);
void onTimeout(String topicChannel, Message message);
}

View File

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public enum MessagingDeliveryStrategy {
atMostOneCasting,
blindCasting
}

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public interface MessagingProvider {
void createChanel(String topic, MessagingDeliveryStrategy deliveryStrategy);
void pulishMessage(String topic, Message message);
void publishCertifiedMessage(String topic, Message message,
int retryIntervalMillis, int timeoutMillis,
MessagingDeliveryListener deliveryListener);
void subscribe(String topicChannel, String messageTitle, MessagingSubscriber subscriber);
void unsubscribe(String topicChannel, String messageTitle, MessagingSubscriber subscriber);
}

View File

@ -1,23 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging;
public interface MessagingSubscriber {
void onMessage(String topic, Message message);
}

View File

@ -15,9 +15,28 @@
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.events;
package org.apache.cloudstack.framework.messaging;
public enum PublishScope {
LOCAL,
GLOBAL
public class PublishScope {
public enum Type { SINGLE, LOCAL, GLOBAL };
Type scope;
String address;
public PublishScope(Type scope) {
this.scope = scope;
}
public PublishScope(String address) {
scope = Type.SINGLE;
this.address = address;
}
public Type getType() {
return scope;
}
public String getAddress() {
return this.address;
}
}

View File

@ -22,6 +22,10 @@ public interface RpcCall {
String getCommand();
String getContent();
String getRequestTag();
// for sender to cancel
void cancel();
// for receiver to response call
void completeCall(String rpcMessage);
}

View File

@ -27,4 +27,5 @@ import java.lang.annotation.Target;
@Target(ElementType.METHOD)
public @interface RpcCallHandler {
String command();
boolean returnHandler() default false;
}

View File

@ -19,8 +19,9 @@
package org.apache.cloudstack.framework.messaging;
public interface RpcEndpoint {
public String call(String targetAddress, String rpcMessage);
public void asyncCall(String targetAddress, String rpcMessage);
String call(String targetAddress, String rpcMessage);
RpcCall asyncCall(String targetAddress, String rpcMessage);
void onCall(RpcCall call);
void onCallReceive(RpcCall call);
void onCallReturn(RpcCall call, String rpcReturnMessage, RpcException e);
}

View File

@ -0,0 +1,17 @@
package org.apache.cloudstack.framework.messaging;
public class RpcException extends Exception {
private static final long serialVersionUID = -3164514701087423787L;
public RpcException() {
super();
}
public RpcException(String message) {
super(message);
}
public RpcException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,18 @@
package org.apache.cloudstack.framework.messaging;
public class RpcIOException extends RpcException {
private static final long serialVersionUID = -6108039302920641533L;
public RpcIOException() {
super();
}
public RpcIOException(String message) {
super(message);
}
public RpcIOException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,14 @@
package org.apache.cloudstack.framework.messaging;
public class RpcTimeoutException extends RpcException {
private static final long serialVersionUID = -3618654987984665833L;
public RpcTimeoutException() {
super();
}
public RpcTimeoutException(String message) {
super(message);
}
}

View File

@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.
package com.cloud.utils.events;
import java.io.Serializable;
package org.apache.cloudstack.framework.messaging;
public interface Subscriber {
void onPublishEvent(String subject, Object sender, Serializable args);
void onPublishEvent(String subject, String senderAddress, String args);
}

View File

@ -20,6 +20,7 @@ package org.apache.cloudstack.framework.messaging;
public interface TransportEndpoint {
void onAttachConfirm(String endpointAddress);
void onDetachIndication(String endpointAddress);
void registerMultiplexier(String name, TransportMultiplexier multiplexier);
void unregisterMultiplexier(String name);

View File

@ -16,24 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.ipc;
package org.apache.cloudstack.framework.messaging.client;
import org.apache.cloudstack.framework.messaging.Message;
import org.apache.cloudstack.framework.messaging.EventBusBase;
import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
/**
* Event subscriber interface
*
*/
public interface Subscriber {
/**
* Message received
*/
Message receive();
/**
* @return the name of the subscriber
*/
String getName();
public class ClientEventBus extends EventBusBase implements TransportMultiplexier {
@Override
public void onTransportMessage(String senderEndpointAddress,
String targetEndpointAddress, String multiplexer, String message) {
// TODO Auto-generated method stub
}
}

View File

@ -27,6 +27,10 @@ public class ClientTransportEndpoint implements TransportEndpoint {
public void onAttachConfirm(String endpointAddress) {
// TODO Auto-generated method stub
}
@Override
public void onDetachIndication(String endpointAddress) {
}
@Override
public void registerMultiplexier(String name,

View File

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging.hornetq;
import org.apache.cloudstack.framework.messaging.Message;
import org.apache.cloudstack.framework.messaging.MessagingDeliveryListener;
import org.apache.cloudstack.framework.messaging.MessagingDeliveryStrategy;
import org.apache.cloudstack.framework.messaging.MessagingProvider;
import org.apache.cloudstack.framework.messaging.MessagingSubscriber;
public class EmbeddedMessagingProvider implements MessagingProvider {
public void createChanel(String topic,
MessagingDeliveryStrategy deliveryStrategy) {
// TODO Auto-generated method stub
}
public void pulishMessage(String topic, Message message) {
// TODO Auto-generated method stub
}
public void publishCertifiedMessage(String topic, Message message,
int retryIntervalMillis, int timeoutMillis,
MessagingDeliveryListener deliveryListener) {
// TODO Auto-generated method stub
}
public void subscribe(String topicChannel, String messageTitle,
MessagingSubscriber subscriber) {
// TODO Auto-generated method stub
}
public void unsubscribe(String topicChannel, String messageTitle,
MessagingSubscriber subscriber) {
// TODO Auto-generated method stub
}
}

View File

@ -16,12 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.messaging.server;
package org.apache.cloudstack.framework.messaging;
import org.apache.cloudstack.framework.messaging.EventBusBase;
import org.apache.cloudstack.framework.messaging.TransportMultiplexier;
public class SubscriberDelegate implements MessagingSubscriber {
public class ServerEventBus extends EventBusBase implements TransportMultiplexier {
public void onMessage(String topic, Message message) {
// ???
@Override
public void onTransportMessage(String senderEndpointAddress,
String targetEndpointAddress, String multiplexer, String message) {
// TODO Auto-generated method stub
}
}

View File

@ -4,10 +4,10 @@ import java.io.Serializable;
import javax.annotation.PostConstruct;
import org.apache.cloudstack.framework.messaging.EventBusBase;
import org.apache.cloudstack.framework.messaging.PublishScope;
import org.springframework.stereotype.Component;
import com.cloud.utils.events.EventBusBase;
import com.cloud.utils.events.PublishScope;
@Component
public class ClusterEventBus extends EventBusBase {