CLOUDSTACK-5787: support in-memroy eventbus

this checkin adds support for plug-in that provides an in memory event
bus which could be used as alternative to RabbitMQ based event bus. Both
publisher are subscriber should be running with management server to use
in-memroy event bus.
This commit is contained in:
Murali Reddy 2014-01-06 13:21:49 +05:30
parent 3ad0e8fb47
commit 3a6fcaf1fc
4 changed files with 209 additions and 0 deletions

View File

@ -206,6 +206,11 @@
<artifactId>cloud-mom-rabbitmq</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-mom-inmemory</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>

View File

@ -0,0 +1,40 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-mom-inmemory</artifactId>
<name>Apache CloudStack Plugin - In Memory Event Bus</name>
<parent>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloudstack-plugins</artifactId>
<version>4.4.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-framework-events</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
</build>
</project>

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.mom.inmemory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import com.cloud.utils.Pair;
import com.cloud.utils.Ternary;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.events.Event;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.cloudstack.framework.events.EventSubscriber;
import org.apache.cloudstack.framework.events.EventTopic;
import com.cloud.utils.component.ManagerBase;
@Local(value = EventBus.class)
public class InMemoryEventBus extends ManagerBase implements EventBus {
private String name;
private static final Logger s_logger = Logger.getLogger(InMemoryEventBus.class);
private static ConcurrentHashMap<UUID, Pair<EventTopic, EventSubscriber>> s_subscribers;
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
s_subscribers = new ConcurrentHashMap<UUID, Pair<EventTopic, EventSubscriber>>();
return true;
}
@Override
public void setName(String name) {
this.name = name;
}
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
if (subscriber == null || topic == null) {
throw new EventBusException("Invalid EventSubscriber/EventTopic object passed.");
}
UUID subscriberId = UUID.randomUUID();
s_subscribers.put(subscriberId, new Pair(topic, subscriber));
return subscriberId;
}
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
if (s_subscribers != null && s_subscribers.isEmpty()) {
throw new EventBusException("There are no registered subscribers to unregister.");
}
if (s_subscribers.get(subscriberId) == null) {
throw new EventBusException("No subscriber found with subscriber id " + subscriberId);
}
s_subscribers.remove(subscriberId);
}
@Override
public void publish(Event event) throws EventBusException {
if (s_subscribers == null || s_subscribers.isEmpty()) {
return; // no subscriber to publish to, so just return
}
for (UUID subscriberId : s_subscribers.keySet()) {
Pair<EventTopic, EventSubscriber> subscriberDetails = s_subscribers.get(subscriberId);
// if the event matches subscribers interested event topic then call back the subscriber with the event
if (isEventMatchesTopic(event, subscriberDetails.first())) {
EventSubscriber subscriber = subscriberDetails.second();
subscriber.onEvent(event);
}
}
}
@Override
public String getName() {
return _name;
}
@Override
public boolean start() {
return true;
}
@Override
public boolean stop() {
return true;
}
private String replaceNullWithWildcard(String key) {
if (key == null || key.isEmpty()) {
return "*";
} else {
return key;
}
}
private boolean isEventMatchesTopic(Event event, EventTopic topic) {
String eventTopicSource = replaceNullWithWildcard(topic.getEventSource());
eventTopicSource = eventTopicSource.replace(".", "-");
String eventSource = replaceNullWithWildcard(event.getEventSource());
eventSource = eventSource.replace(".", "-");
if (eventTopicSource != "*" && eventSource != "*" && !eventTopicSource.equalsIgnoreCase(eventSource)) {
return false;
}
String eventTopicCategory = replaceNullWithWildcard(topic.getEventCategory());
eventTopicCategory = eventTopicCategory.replace(".", "-");
String eventCategory = replaceNullWithWildcard(event.getEventCategory());
eventCategory = eventCategory.replace(".", "-");
if (eventTopicCategory != "*" && eventCategory != "*" && !eventTopicCategory.equalsIgnoreCase(eventCategory)) {
return false;
}
String eventTopicType = replaceNullWithWildcard(topic.getEventType());
eventTopicType = eventTopicType.replace(".", "-");
String eventType = replaceNullWithWildcard(event.getEventType());
eventType = eventType.replace(".", "-");
if (eventTopicType != "*" && eventType != "*" && !eventTopicType.equalsIgnoreCase(eventType)) {
return false;
}
String eventTopicResourceType = replaceNullWithWildcard(topic.getResourceType());
eventTopicResourceType = eventTopicResourceType.replace(".", "-");
String resourceType = replaceNullWithWildcard(event.getResourceType());
resourceType = resourceType.replace(".", "-");
if (eventTopicResourceType != "*" && resourceType != "*" && !eventTopicResourceType.equalsIgnoreCase(resourceType)) {
return false;
}
String resourceUuid = replaceNullWithWildcard(event.getResourceUUID());
resourceUuid = resourceUuid.replace(".", "-");
String eventTopicresourceUuid = replaceNullWithWildcard(topic.getResourceUUID());
eventTopicresourceUuid = eventTopicresourceUuid.replace(".", "-");
if (resourceUuid != "*" && eventTopicresourceUuid != "*" && !resourceUuid.equalsIgnoreCase(eventTopicresourceUuid)) {
return false;
}
return true;
}
}

View File

@ -39,6 +39,7 @@
<module>hypervisors/xen</module>
<module>hypervisors/kvm</module>
<module>event-bus/rabbitmq</module>
<module>event-bus/inmemory</module>
<module>hypervisors/baremetal</module>
<module>hypervisors/ucs</module>
<module>hypervisors/hyperv</module>