Add Apache Kafka event-bus support - producing only.

This commit produces event bus messages to a "cloudstack" topic
in Apache Kafka. Configuration is expected to be found in
/etc/cloudstack/management/kafka.producer.properties and will
generally be of the form:

    bootstrap.servers=kafka-host1:9092,kafka-host2:9092
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer

There is no way to parameterize the topic yet, and the consuming
code is just place-holder. I think adding a consumer within cloudstack
is very debatable and likely not needed.

Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
This commit is contained in:
Pierre-Yves Ritschard 2015-03-10 23:25:21 +01:00 committed by Rohit Yadav
parent 05d2b0a707
commit 04b30e0e66
4 changed files with 160 additions and 0 deletions

View File

@ -241,6 +241,11 @@
<artifactId>cloud-mom-inmemory</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-mom-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-framework-ipc</artifactId>

View File

@ -0,0 +1,45 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-mom-kafka</artifactId>
<name>Apache CloudStack Plugin - Kafka Event Bus</name>
<parent>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloudstack-plugins</artifactId>
<version>4.4.2</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-framework-events</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
</dependencies>
<build>
<defaultGoal>install</defaultGoal>
</build>
</project>

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.mom.kafka;
import java.io.FileInputStream;
import java.util.Map;
import java.util.UUID;
import java.util.Properties;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.events.Event;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.cloudstack.framework.events.EventSubscriber;
import org.apache.cloudstack.framework.events.EventTopic;
import com.cloud.utils.component.ManagerBase;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.cloud.utils.PropertiesUtil;
@Local(value = EventBus.class)
public class KafkaEventBus extends ManagerBase implements EventBus {
private final String _topic = "cloudstack";
private Producer<String,String> _producer;
private static final Logger s_logger = Logger.getLogger(KafkaEventBus.class);
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
final Properties props = new Properties();
try {
final FileInputStream is = new FileInputStream(PropertiesUtil.findConfigFile("kafka.producer.properties"));
props.load(is);
is.close();
} catch (Exception e) {
throw new ConfigurationException("Could not read kafka properties");
}
_producer = new KafkaProducer<String,String>(props);
_name = name;
return true;
}
@Override
public void setName(String name) {
_name = name;
}
@Override
public UUID subscribe(EventTopic topic, EventSubscriber subscriber) throws EventBusException {
/* NOOP */
return UUID.randomUUID();
}
@Override
public void unsubscribe(UUID subscriberId, EventSubscriber subscriber) throws EventBusException {
/* NOOP */
}
@Override
public void publish(Event event) throws EventBusException {
ProducerRecord<String, String> record = new ProducerRecord<String,String>(_topic, event.getResourceUUID(), event.getDescription());
_producer.send(record);
}
@Override
public String getName() {
return _name;
}
@Override
public boolean start() {
return true;
}
@Override
public boolean stop() {
return true;
}
}

View File

@ -54,6 +54,7 @@
<module>hypervisors/kvm</module>
<module>event-bus/rabbitmq</module>
<module>event-bus/inmemory</module>
<module>event-bus/kafka</module>
<module>hypervisors/baremetal</module>
<module>hypervisors/ucs</module>
<module>hypervisors/hyperv</module>