From 3a6fcaf1fc4132a59b5260b1854fcbe7390d848c Mon Sep 17 00:00:00 2001 From: Murali Reddy Date: Mon, 6 Jan 2014 13:21:49 +0530 Subject: [PATCH] CLOUDSTACK-5787: support in-memroy eventbus this checkin adds support for plug-in that provides an in memory event bus which could be used as alternative to RabbitMQ based event bus. Both publisher are subscriber should be running with management server to use in-memroy event bus. --- client/pom.xml | 5 + plugins/event-bus/inmemory/pom.xml | 40 +++++ .../mom/inmemory/InMemoryEventBus.java | 163 ++++++++++++++++++ plugins/pom.xml | 1 + 4 files changed, 209 insertions(+) create mode 100644 plugins/event-bus/inmemory/pom.xml create mode 100644 plugins/event-bus/inmemory/src/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java diff --git a/client/pom.xml b/client/pom.xml index 75b5504ec96..33d3f1eeaeb 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -206,6 +206,11 @@ cloud-mom-rabbitmq ${project.version} + + org.apache.cloudstack + cloud-mom-inmemory + ${project.version} + mysql mysql-connector-java diff --git a/plugins/event-bus/inmemory/pom.xml b/plugins/event-bus/inmemory/pom.xml new file mode 100644 index 00000000000..1bde8b8d843 --- /dev/null +++ b/plugins/event-bus/inmemory/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + cloud-mom-inmemory + Apache CloudStack Plugin - In Memory Event Bus + + org.apache.cloudstack + cloudstack-plugins + 4.4.0-SNAPSHOT + ../../pom.xml + + + + org.apache.cloudstack + cloud-framework-events + ${project.version} + + + + install + + diff --git a/plugins/event-bus/inmemory/src/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java b/plugins/event-bus/inmemory/src/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java new file mode 100644 index 00000000000..7c282d739d9 --- /dev/null +++ b/plugins/event-bus/inmemory/src/org/apache/cloudstack/mom/inmemory/InMemoryEventBus.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.mom.inmemory; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import javax.ejb.Local; +import javax.naming.ConfigurationException; + +import com.cloud.utils.Pair; +import com.cloud.utils.Ternary; +import org.apache.log4j.Logger; + +import org.apache.cloudstack.framework.events.Event; +import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventBusException; +import org.apache.cloudstack.framework.events.EventSubscriber; +import org.apache.cloudstack.framework.events.EventTopic; + +import com.cloud.utils.component.ManagerBase; + +@Local(value = EventBus.class) +public class InMemoryEventBus extends ManagerBase implements EventBus { + + private String name; + private static final Logger s_logger = Logger.getLogger(InMemoryEventBus.class); + private static ConcurrentHashMap> s_subscribers; + + @Override + public boolean configure(String name, Map params) throws ConfigurationException { + s_subscribers = new ConcurrentHashMap>(); + return true; + } + + @Override + public void setName(String name) { + this.name = name; + } + + @Override + public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { + if (subscriber == null || topic == null) { + throw new EventBusException("Invalid EventSubscriber/EventTopic object passed."); + } + UUID subscriberId = UUID.randomUUID(); + + s_subscribers.put(subscriberId, new Pair(topic, subscriber)); + return subscriberId; + } + + @Override + public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + if (s_subscribers != null && s_subscribers.isEmpty()) { + throw new EventBusException("There are no registered subscribers to unregister."); + } + if (s_subscribers.get(subscriberId) == null) { + throw new EventBusException("No subscriber found with subscriber id " + subscriberId); + } + s_subscribers.remove(subscriberId); + } + + @Override + public void publish(Event event) throws EventBusException { + if (s_subscribers == null || s_subscribers.isEmpty()) { + return; // no subscriber to publish to, so just return + } + + for (UUID subscriberId : s_subscribers.keySet()) { + Pair subscriberDetails = s_subscribers.get(subscriberId); + // if the event matches subscribers interested event topic then call back the subscriber with the event + if (isEventMatchesTopic(event, subscriberDetails.first())) { + EventSubscriber subscriber = subscriberDetails.second(); + subscriber.onEvent(event); + } + } + } + + @Override + public String getName() { + return _name; + } + + @Override + public boolean start() { + return true; + } + + @Override + public boolean stop() { + return true; + } + + private String replaceNullWithWildcard(String key) { + if (key == null || key.isEmpty()) { + return "*"; + } else { + return key; + } + } + + private boolean isEventMatchesTopic(Event event, EventTopic topic) { + + String eventTopicSource = replaceNullWithWildcard(topic.getEventSource()); + eventTopicSource = eventTopicSource.replace(".", "-"); + String eventSource = replaceNullWithWildcard(event.getEventSource()); + eventSource = eventSource.replace(".", "-"); + if (eventTopicSource != "*" && eventSource != "*" && !eventTopicSource.equalsIgnoreCase(eventSource)) { + return false; + } + + String eventTopicCategory = replaceNullWithWildcard(topic.getEventCategory()); + eventTopicCategory = eventTopicCategory.replace(".", "-"); + String eventCategory = replaceNullWithWildcard(event.getEventCategory()); + eventCategory = eventCategory.replace(".", "-"); + if (eventTopicCategory != "*" && eventCategory != "*" && !eventTopicCategory.equalsIgnoreCase(eventCategory)) { + return false; + } + + String eventTopicType = replaceNullWithWildcard(topic.getEventType()); + eventTopicType = eventTopicType.replace(".", "-"); + String eventType = replaceNullWithWildcard(event.getEventType()); + eventType = eventType.replace(".", "-"); + if (eventTopicType != "*" && eventType != "*" && !eventTopicType.equalsIgnoreCase(eventType)) { + return false; + } + + String eventTopicResourceType = replaceNullWithWildcard(topic.getResourceType()); + eventTopicResourceType = eventTopicResourceType.replace(".", "-"); + String resourceType = replaceNullWithWildcard(event.getResourceType()); + resourceType = resourceType.replace(".", "-"); + if (eventTopicResourceType != "*" && resourceType != "*" && !eventTopicResourceType.equalsIgnoreCase(resourceType)) { + return false; + } + + String resourceUuid = replaceNullWithWildcard(event.getResourceUUID()); + resourceUuid = resourceUuid.replace(".", "-"); + String eventTopicresourceUuid = replaceNullWithWildcard(topic.getResourceUUID()); + eventTopicresourceUuid = eventTopicresourceUuid.replace(".", "-"); + if (resourceUuid != "*" && eventTopicresourceUuid != "*" && !resourceUuid.equalsIgnoreCase(eventTopicresourceUuid)) { + return false; + } + + return true; + } +} diff --git a/plugins/pom.xml b/plugins/pom.xml index 06cf79f785e..8ec6a711376 100755 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -39,6 +39,7 @@ hypervisors/xen hypervisors/kvm event-bus/rabbitmq + event-bus/inmemory hypervisors/baremetal hypervisors/ucs hypervisors/hyperv