From 2cae5114644c58dab1626bfb18d6e85aeb7829f0 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Wed, 14 Nov 2012 19:28:34 -0800 Subject: [PATCH] Connnect event bus to messaging layer --- .../framework/container/ServerContainer.java | 27 --------- .../cloudstack/framework/ipc/IpcParam.java | 22 ------- .../cloudstack/framework/ipc/Publisher.java | 42 -------------- .../framework/ipc/ResultHandler.java | 37 ------------ .../cloudstack/framework/ipc/Signal.java | 35 ----------- .../framework/ipc/SignalIntercepter.java | 23 -------- .../messaging/ComponentEndpoint.java | 26 ++++++--- .../framework/messaging}/EventBus.java | 6 +- .../framework/messaging}/EventBusBase.java | 19 +++--- .../framework/messaging/EventHandler.java | 12 ++++ .../framework/messaging/Message.java | 29 ---------- .../framework/messaging/MessageHandler.java | 30 ---------- .../framework/messaging/MessageSystem.java | 47 --------------- .../messaging/MessagingDeliveryListener.java | 24 -------- .../messaging/MessagingDeliveryStrategy.java | 24 -------- .../messaging/MessagingProvider.java | 32 ---------- .../messaging/MessagingSubscriber.java | 23 -------- .../framework/messaging}/PublishScope.java | 27 +++++++-- .../framework/messaging/RpcCall.java | 4 ++ .../framework/messaging/RpcCallHandler.java | 1 + .../framework/messaging/RpcEndpoint.java | 7 ++- .../framework/messaging/RpcException.java | 17 ++++++ .../framework/messaging/RpcIOException.java | 18 ++++++ .../messaging/RpcTimeoutException.java | 14 +++++ .../framework/messaging}/Subscriber.java | 6 +- .../messaging/TransportEndpoint.java | 1 + .../client/ClientEventBus.java} | 27 ++++----- .../client/ClientTransportEndpoint.java | 4 ++ .../hornetq/EmbeddedMessagingProvider.java | 58 ------------------- .../ServerEventBus.java} | 12 ++-- .../com/cloud/cluster/ClusterEventBus.java | 4 +- 31 files changed, 149 insertions(+), 509 deletions(-) delete mode 100755 framework/ipc/src/org/apache/cloudstack/framework/container/ServerContainer.java delete mode 100755 framework/ipc/src/org/apache/cloudstack/framework/ipc/IpcParam.java delete mode 100755 framework/ipc/src/org/apache/cloudstack/framework/ipc/Publisher.java delete mode 100755 framework/ipc/src/org/apache/cloudstack/framework/ipc/ResultHandler.java delete mode 100755 framework/ipc/src/org/apache/cloudstack/framework/ipc/Signal.java delete mode 100755 framework/ipc/src/org/apache/cloudstack/framework/ipc/SignalIntercepter.java rename {utils/src/com/cloud/utils/events => framework/ipc/src/org/apache/cloudstack/framework/messaging}/EventBus.java (86%) rename {utils/src/com/cloud/utils/events => framework/ipc/src/org/apache/cloudstack/framework/messaging}/EventBusBase.java (92%) create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java delete mode 100755 framework/ipc/src/org/apache/cloudstack/framework/messaging/Message.java delete mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageHandler.java delete mode 100755 framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSystem.java delete mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingDeliveryListener.java delete mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingDeliveryStrategy.java delete mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingProvider.java delete mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingSubscriber.java rename {utils/src/com/cloud/utils/events => framework/ipc/src/org/apache/cloudstack/framework/messaging}/PublishScope.java (65%) create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcException.java create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcIOException.java create mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcTimeoutException.java rename {utils/src/com/cloud/utils/events => framework/ipc/src/org/apache/cloudstack/framework/messaging}/Subscriber.java (86%) rename framework/ipc/src/org/apache/cloudstack/framework/{ipc/Subscriber.java => messaging/client/ClientEventBus.java} (63%) mode change 100755 => 100644 delete mode 100644 framework/ipc/src/org/apache/cloudstack/framework/messaging/hornetq/EmbeddedMessagingProvider.java rename framework/ipc/src/org/apache/cloudstack/framework/messaging/{SubscriberDelegate.java => server/ServerEventBus.java} (64%) diff --git a/framework/ipc/src/org/apache/cloudstack/framework/container/ServerContainer.java b/framework/ipc/src/org/apache/cloudstack/framework/container/ServerContainer.java deleted file mode 100755 index 04ec03178a8..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/container/ServerContainer.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.container; - -/** - * Container of different processes so it doesn't have to deal with message - * to Java call. - */ -public interface ServerContainer { - -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/ipc/IpcParam.java b/framework/ipc/src/org/apache/cloudstack/framework/ipc/IpcParam.java deleted file mode 100755 index 9bb811db5b9..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/ipc/IpcParam.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.ipc; - -public @interface IpcParam { -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/ipc/Publisher.java b/framework/ipc/src/org/apache/cloudstack/framework/ipc/Publisher.java deleted file mode 100755 index 4afabd45321..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/ipc/Publisher.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.ipc; - -import java.util.Map; - -/** - * Publish the Event - * - */ -public interface Publisher { - /** - * Publish a topic - * - * @param topic topic being published - * @param content content published - * @return true if the topic has been picked up; false if not. - */ - boolean publish(String topic, Map content); - - /** - * @return the name of this publisher - */ - String getName(); - -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/ipc/ResultHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/ipc/ResultHandler.java deleted file mode 100755 index 5c5c1c82abd..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/ipc/ResultHandler.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.ipc; - -/** - * Handles results of the ipc call - * - */ -public interface ResultHandler { - - /** - * Signals a successful result - */ - void signalSuccess(); - - /** - * Signals an error result - * @param e exception thrown - */ - void signalError(Exception e); -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/ipc/Signal.java b/framework/ipc/src/org/apache/cloudstack/framework/ipc/Signal.java deleted file mode 100755 index 1011eaa9d42..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/ipc/Signal.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.ipc; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Marks a method as an ipc mechanism - */ -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) -public @interface Signal { - String topic(); - - String responseTopic(); -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/ipc/SignalIntercepter.java b/framework/ipc/src/org/apache/cloudstack/framework/ipc/SignalIntercepter.java deleted file mode 100755 index e0e739cd22d..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/ipc/SignalIntercepter.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.ipc; - -public class SignalIntercepter { - -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java index 0c55b301277..ed9ea0b721a 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/ComponentEndpoint.java @@ -18,7 +18,7 @@ */ package org.apache.cloudstack.framework.messaging; -public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier { +public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier, Subscriber { private TransportEndpoint transportEndpoint; private RpcProvider rpcProvider; @@ -57,19 +57,29 @@ public class ComponentEndpoint implements RpcEndpoint, TransportMultiplexier { } @Override - public String call(String targetAddress, String rpcMessage) { - // TODO Auto-generated method stub + public String call(String targetAddress, String rpcMessage) + { + return null; + } + + @Override + public RpcCall asyncCall(String targetAddress, String rpcMessage) { return null; } @Override - public void asyncCall(String targetAddress, String rpcMessage) { + public void onCallReceive(RpcCall call) { // TODO Auto-generated method stub + // implement annotation based call dispatching } - + @Override - public void onCall(RpcCall call) { - // TODO Auto-generated method stub - + public void onCallReturn(RpcCall call, String rpcReturnMessage, RpcException e) { + // ??? + } + + @Override + public void onPublishEvent(String subject, String senderAddress, String args) { + // TODO } } diff --git a/utils/src/com/cloud/utils/events/EventBus.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java similarity index 86% rename from utils/src/com/cloud/utils/events/EventBus.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java index c1b6f707449..e11a009098a 100644 --- a/utils/src/com/cloud/utils/events/EventBus.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBus.java @@ -15,13 +15,11 @@ // specific language governing permissions and limitations // under the License. -package com.cloud.utils.events; - -import java.io.Serializable; +package org.apache.cloudstack.framework.messaging; public interface EventBus { void subscribe(String subject, Subscriber subscriber); void unsubscribe(String subject, Subscriber subscriber); - void publish(String subject, PublishScope scope, Object sender, Serializable args); + void publish(String subject, PublishScope scope, String senderAddress, String args); } diff --git a/utils/src/com/cloud/utils/events/EventBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java similarity index 92% rename from utils/src/com/cloud/utils/events/EventBusBase.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java index cd10f1d7506..fbcf6488837 100644 --- a/utils/src/com/cloud/utils/events/EventBusBase.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventBusBase.java @@ -15,16 +15,15 @@ // specific language governing permissions and limitations // under the License. -package com.cloud.utils.events; +package org.apache.cloudstack.framework.messaging; -import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import edu.emory.mathcs.backport.java.util.Arrays; -import edu.emory.mathcs.backport.java.util.Collections; public class EventBusBase implements EventBus { @@ -72,8 +71,8 @@ public class EventBusBase implements EventBus { } @Override - public void publish(String subject, PublishScope scope, Object sender, - Serializable args) { + public void publish(String subject, PublishScope scope, String senderAddress, + String args) { if(_gate.enter(true)) { @@ -81,11 +80,11 @@ public class EventBusBase implements EventBus { SubscriptionNode current = locate(subject, chainFromTop, false); if(current != null) - current.notifySubscribers(subject, sender, args); + current.notifySubscribers(subject, senderAddress, args); Collections.reverse(chainFromTop); for(SubscriptionNode node : chainFromTop) - node.notifySubscribers(subject, sender, args); + node.notifySubscribers(subject, senderAddress, args); _gate.leave(); } @@ -284,9 +283,9 @@ public class EventBusBase implements EventBus { _children.put(key, childNode); } - public void notifySubscribers(String subject, Object sender, Serializable args) { + public void notifySubscribers(String subject, String senderAddress, String args) { for(Subscriber subscriber : _subscribers) { - subscriber.onPublishEvent(subject, sender, args); + subscriber.onPublishEvent(subject, senderAddress, args); } } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java new file mode 100644 index 00000000000..12e6fb8394a --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/EventHandler.java @@ -0,0 +1,12 @@ +package org.apache.cloudstack.framework.messaging; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface EventHandler { + public String topic(); +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/Message.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Message.java deleted file mode 100755 index 21617d56bd4..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/Message.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.messaging; - -public interface Message { - - String getTitle(); - String getContent(); - String getPublisherTag(); - - long getSendTime(); - long getReceiveTime(); -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageHandler.java deleted file mode 100644 index 106104200c1..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageHandler.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.messaging; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) -public @interface MessageHandler { - String messageTitle(); -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSystem.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSystem.java deleted file mode 100755 index cc506efa51e..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessageSystem.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.messaging; - -import org.apache.cloudstack.framework.ipc.Publisher; -import org.apache.cloudstack.framework.ipc.Subscriber; - -public interface MessageSystem { - - /** - * Creates the publisher - * @param name of the publisher - * @return publisher - */ - Publisher createPublisher(String name); - - /** - * Creates the subscriber - * @param name of the subscriber - * @return subscriber - */ - Subscriber createSubscriber(String name); - - /** - * registers the subscriber - * @param subscriber subscriber - * @param topic topic to listen to - * @return subscriber - */ - boolean registerSubscriber(Subscriber subscriber, String topic); -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingDeliveryListener.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingDeliveryListener.java deleted file mode 100644 index be70b540dfc..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingDeliveryListener.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.messaging; - -public interface MessagingDeliveryListener { - void onAcknowledge(String topicChannel, Message message); - void onTimeout(String topicChannel, Message message); -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingDeliveryStrategy.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingDeliveryStrategy.java deleted file mode 100644 index 2a9b066f78d..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingDeliveryStrategy.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.messaging; - -public enum MessagingDeliveryStrategy { - atMostOneCasting, - blindCasting -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingProvider.java deleted file mode 100644 index 07e3d715932..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.messaging; - -public interface MessagingProvider { - void createChanel(String topic, MessagingDeliveryStrategy deliveryStrategy); - - void pulishMessage(String topic, Message message); - - void publishCertifiedMessage(String topic, Message message, - int retryIntervalMillis, int timeoutMillis, - MessagingDeliveryListener deliveryListener); - - void subscribe(String topicChannel, String messageTitle, MessagingSubscriber subscriber); - void unsubscribe(String topicChannel, String messageTitle, MessagingSubscriber subscriber); -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingSubscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingSubscriber.java deleted file mode 100644 index 5019a43b41c..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/MessagingSubscriber.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.messaging; - -public interface MessagingSubscriber { - void onMessage(String topic, Message message); -} diff --git a/utils/src/com/cloud/utils/events/PublishScope.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/PublishScope.java similarity index 65% rename from utils/src/com/cloud/utils/events/PublishScope.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/PublishScope.java index bab852a11dc..fbce919af6d 100644 --- a/utils/src/com/cloud/utils/events/PublishScope.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/PublishScope.java @@ -15,9 +15,28 @@ // specific language governing permissions and limitations // under the License. -package com.cloud.utils.events; +package org.apache.cloudstack.framework.messaging; -public enum PublishScope { - LOCAL, - GLOBAL +public class PublishScope { + public enum Type { SINGLE, LOCAL, GLOBAL }; + + Type scope; + String address; + + public PublishScope(Type scope) { + this.scope = scope; + } + + public PublishScope(String address) { + scope = Type.SINGLE; + this.address = address; + } + + public Type getType() { + return scope; + } + + public String getAddress() { + return this.address; + } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCall.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCall.java index be16bdd3e91..570b82aa5a4 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCall.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCall.java @@ -22,6 +22,10 @@ public interface RpcCall { String getCommand(); String getContent(); String getRequestTag(); + + // for sender to cancel + void cancel(); + // for receiver to response call void completeCall(String rpcMessage); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallHandler.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallHandler.java index 2d940645114..70110ee8701 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallHandler.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcCallHandler.java @@ -27,4 +27,5 @@ import java.lang.annotation.Target; @Target(ElementType.METHOD) public @interface RpcCallHandler { String command(); + boolean returnHandler() default false; } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java index 62dc97354fa..1be5890e7af 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcEndpoint.java @@ -19,8 +19,9 @@ package org.apache.cloudstack.framework.messaging; public interface RpcEndpoint { - public String call(String targetAddress, String rpcMessage); - public void asyncCall(String targetAddress, String rpcMessage); + String call(String targetAddress, String rpcMessage); + RpcCall asyncCall(String targetAddress, String rpcMessage); - void onCall(RpcCall call); + void onCallReceive(RpcCall call); + void onCallReturn(RpcCall call, String rpcReturnMessage, RpcException e); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcException.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcException.java new file mode 100644 index 00000000000..978f1c3421d --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcException.java @@ -0,0 +1,17 @@ +package org.apache.cloudstack.framework.messaging; + +public class RpcException extends Exception { + private static final long serialVersionUID = -3164514701087423787L; + + public RpcException() { + super(); + } + + public RpcException(String message) { + super(message); + } + + public RpcException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcIOException.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcIOException.java new file mode 100644 index 00000000000..8b09ca90540 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcIOException.java @@ -0,0 +1,18 @@ +package org.apache.cloudstack.framework.messaging; + +public class RpcIOException extends RpcException { + + private static final long serialVersionUID = -6108039302920641533L; + + public RpcIOException() { + super(); + } + + public RpcIOException(String message) { + super(message); + } + + public RpcIOException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcTimeoutException.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcTimeoutException.java new file mode 100644 index 00000000000..c0521c56730 --- /dev/null +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/RpcTimeoutException.java @@ -0,0 +1,14 @@ +package org.apache.cloudstack.framework.messaging; + +public class RpcTimeoutException extends RpcException { + + private static final long serialVersionUID = -3618654987984665833L; + + public RpcTimeoutException() { + super(); + } + + public RpcTimeoutException(String message) { + super(message); + } +} diff --git a/utils/src/com/cloud/utils/events/Subscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java similarity index 86% rename from utils/src/com/cloud/utils/events/Subscriber.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java index c3baa6f1169..3eb3fc60a67 100644 --- a/utils/src/com/cloud/utils/events/Subscriber.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/Subscriber.java @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -package com.cloud.utils.events; - -import java.io.Serializable; +package org.apache.cloudstack.framework.messaging; public interface Subscriber { - void onPublishEvent(String subject, Object sender, Serializable args); + void onPublishEvent(String subject, String senderAddress, String args); } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java index 68982e2bdea..3a98681ee64 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/TransportEndpoint.java @@ -20,6 +20,7 @@ package org.apache.cloudstack.framework.messaging; public interface TransportEndpoint { void onAttachConfirm(String endpointAddress); + void onDetachIndication(String endpointAddress); void registerMultiplexier(String name, TransportMultiplexier multiplexier); void unregisterMultiplexier(String name); diff --git a/framework/ipc/src/org/apache/cloudstack/framework/ipc/Subscriber.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientEventBus.java old mode 100755 new mode 100644 similarity index 63% rename from framework/ipc/src/org/apache/cloudstack/framework/ipc/Subscriber.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientEventBus.java index 0856cdd654a..68303bc9912 --- a/framework/ipc/src/org/apache/cloudstack/framework/ipc/Subscriber.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientEventBus.java @@ -16,24 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.cloudstack.framework.ipc; +package org.apache.cloudstack.framework.messaging.client; -import org.apache.cloudstack.framework.messaging.Message; +import org.apache.cloudstack.framework.messaging.EventBusBase; +import org.apache.cloudstack.framework.messaging.TransportMultiplexier; -/** - * Event subscriber interface - * - */ -public interface Subscriber { - - /** - * Message received - */ - Message receive(); - - /** - * @return the name of the subscriber - */ - String getName(); +public class ClientEventBus extends EventBusBase implements TransportMultiplexier { + @Override + public void onTransportMessage(String senderEndpointAddress, + String targetEndpointAddress, String multiplexer, String message) { + // TODO Auto-generated method stub + + } } diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java index 8ee4b8ff3d5..49a1eb6c49b 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/client/ClientTransportEndpoint.java @@ -27,6 +27,10 @@ public class ClientTransportEndpoint implements TransportEndpoint { public void onAttachConfirm(String endpointAddress) { // TODO Auto-generated method stub } + + @Override + public void onDetachIndication(String endpointAddress) { + } @Override public void registerMultiplexier(String name, diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/hornetq/EmbeddedMessagingProvider.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/hornetq/EmbeddedMessagingProvider.java deleted file mode 100644 index d9281285680..00000000000 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/hornetq/EmbeddedMessagingProvider.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cloudstack.framework.messaging.hornetq; - -import org.apache.cloudstack.framework.messaging.Message; -import org.apache.cloudstack.framework.messaging.MessagingDeliveryListener; -import org.apache.cloudstack.framework.messaging.MessagingDeliveryStrategy; -import org.apache.cloudstack.framework.messaging.MessagingProvider; -import org.apache.cloudstack.framework.messaging.MessagingSubscriber; - -public class EmbeddedMessagingProvider implements MessagingProvider { - - public void createChanel(String topic, - MessagingDeliveryStrategy deliveryStrategy) { - // TODO Auto-generated method stub - - } - - public void pulishMessage(String topic, Message message) { - // TODO Auto-generated method stub - - } - - public void publishCertifiedMessage(String topic, Message message, - int retryIntervalMillis, int timeoutMillis, - MessagingDeliveryListener deliveryListener) { - // TODO Auto-generated method stub - - } - - public void subscribe(String topicChannel, String messageTitle, - MessagingSubscriber subscriber) { - // TODO Auto-generated method stub - - } - - public void unsubscribe(String topicChannel, String messageTitle, - MessagingSubscriber subscriber) { - // TODO Auto-generated method stub - - } -} diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messaging/SubscriberDelegate.java b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java similarity index 64% rename from framework/ipc/src/org/apache/cloudstack/framework/messaging/SubscriberDelegate.java rename to framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java index 40a2eea5ebd..ff02cb876ac 100644 --- a/framework/ipc/src/org/apache/cloudstack/framework/messaging/SubscriberDelegate.java +++ b/framework/ipc/src/org/apache/cloudstack/framework/messaging/server/ServerEventBus.java @@ -16,12 +16,16 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.cloudstack.framework.messaging.server; -package org.apache.cloudstack.framework.messaging; +import org.apache.cloudstack.framework.messaging.EventBusBase; +import org.apache.cloudstack.framework.messaging.TransportMultiplexier; -public class SubscriberDelegate implements MessagingSubscriber { +public class ServerEventBus extends EventBusBase implements TransportMultiplexier { - public void onMessage(String topic, Message message) { - // ??? + @Override + public void onTransportMessage(String senderEndpointAddress, + String targetEndpointAddress, String multiplexer, String message) { + // TODO Auto-generated method stub } } diff --git a/server/src/com/cloud/cluster/ClusterEventBus.java b/server/src/com/cloud/cluster/ClusterEventBus.java index 4df5d373e4c..63ceaea6ca6 100644 --- a/server/src/com/cloud/cluster/ClusterEventBus.java +++ b/server/src/com/cloud/cluster/ClusterEventBus.java @@ -4,10 +4,10 @@ import java.io.Serializable; import javax.annotation.PostConstruct; +import org.apache.cloudstack.framework.messaging.EventBusBase; +import org.apache.cloudstack.framework.messaging.PublishScope; import org.springframework.stereotype.Component; -import com.cloud.utils.events.EventBusBase; -import com.cloud.utils.events.PublishScope; @Component public class ClusterEventBus extends EventBusBase {