From 04b30e0e66e6aba3c0484423fcd96687bbe53826 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Ritschard Date: Tue, 10 Mar 2015 23:25:21 +0100 Subject: [PATCH] Add Apache Kafka event-bus support - producing only. This commit produces event bus messages to a "cloudstack" topic in Apache Kafka. Configuration is expected to be found in /etc/cloudstack/management/kafka.producer.properties and will generally be of the form: bootstrap.servers=kafka-host1:9092,kafka-host2:9092 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer There is no way to parameterize the topic yet, and the consuming code is just place-holder. I think adding a consumer within cloudstack is very debatable and likely not needed. Signed-off-by: Rohit Yadav --- client/pom.xml | 5 + plugins/event-bus/kafka/pom.xml | 45 ++++++++ .../cloudstack/mom/kafka/KafkaEventBus.java | 109 ++++++++++++++++++ plugins/pom.xml | 1 + 4 files changed, 160 insertions(+) create mode 100644 plugins/event-bus/kafka/pom.xml create mode 100644 plugins/event-bus/kafka/src/org/apache/cloudstack/mom/kafka/KafkaEventBus.java diff --git a/client/pom.xml b/client/pom.xml index 9453159a487..59137ca284f 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -241,6 +241,11 @@ cloud-mom-inmemory ${project.version} + + org.apache.cloudstack + cloud-mom-kafka + ${project.version} + org.apache.cloudstack cloud-framework-ipc diff --git a/plugins/event-bus/kafka/pom.xml b/plugins/event-bus/kafka/pom.xml new file mode 100644 index 00000000000..f6205e92124 --- /dev/null +++ b/plugins/event-bus/kafka/pom.xml @@ -0,0 +1,45 @@ + + + 4.0.0 + cloud-mom-kafka + Apache CloudStack Plugin - Kafka Event Bus + + org.apache.cloudstack + cloudstack-plugins + 4.4.2 + ../../pom.xml + + + + org.apache.cloudstack + cloud-framework-events + ${project.version} + + + org.apache.kafka + kafka-clients + 0.8.2.0 + + + + install + + diff --git a/plugins/event-bus/kafka/src/org/apache/cloudstack/mom/kafka/KafkaEventBus.java b/plugins/event-bus/kafka/src/org/apache/cloudstack/mom/kafka/KafkaEventBus.java new file mode 100644 index 00000000000..d959a5e6699 --- /dev/null +++ b/plugins/event-bus/kafka/src/org/apache/cloudstack/mom/kafka/KafkaEventBus.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cloudstack.mom.kafka; + +import java.io.FileInputStream; + +import java.util.Map; +import java.util.UUID; +import java.util.Properties; + +import javax.ejb.Local; +import javax.naming.ConfigurationException; + +import org.apache.log4j.Logger; + +import org.apache.cloudstack.framework.events.Event; +import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventBusException; +import org.apache.cloudstack.framework.events.EventSubscriber; +import org.apache.cloudstack.framework.events.EventTopic; + +import com.cloud.utils.component.ManagerBase; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import com.cloud.utils.PropertiesUtil; + +@Local(value = EventBus.class) +public class KafkaEventBus extends ManagerBase implements EventBus { + + private final String _topic = "cloudstack"; + private Producer _producer; + private static final Logger s_logger = Logger.getLogger(KafkaEventBus.class); + + @Override + public boolean configure(String name, Map params) throws ConfigurationException { + + final Properties props = new Properties(); + + try { + final FileInputStream is = new FileInputStream(PropertiesUtil.findConfigFile("kafka.producer.properties")); + props.load(is); + is.close(); + } catch (Exception e) { + throw new ConfigurationException("Could not read kafka properties"); + } + + _producer = new KafkaProducer(props); + _name = name; + + return true; + } + + @Override + public void setName(String name) { + _name = name; + } + + @Override + public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException { + /* NOOP */ + return UUID.randomUUID(); + } + + @Override + public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException { + /* NOOP */ + } + + @Override + public void publish(Event event) throws EventBusException { + ProducerRecord record = new ProducerRecord(_topic, event.getResourceUUID(), event.getDescription()); + _producer.send(record); + } + + @Override + public String getName() { + return _name; + } + + @Override + public boolean start() { + return true; + } + + @Override + public boolean stop() { + return true; + } +} diff --git a/plugins/pom.xml b/plugins/pom.xml index 962ce46609a..8034bd1da70 100755 --- a/plugins/pom.xml +++ b/plugins/pom.xml @@ -54,6 +54,7 @@ hypervisors/kvm event-bus/rabbitmq event-bus/inmemory + event-bus/kafka hypervisors/baremetal hypervisors/ucs hypervisors/hyperv