CLOUDSTACK-1391: EventBus is not getting injected after javelin merge

added logic to inject event bus using ComponentContext.getComponent

Conflicts:
	server/src/com/cloud/storage/listener/SnapshotStateListener.java
This commit is contained in:
Murali Reddy 2013-02-27 14:48:59 +05:30
parent a5ab63602d
commit 3e0bdaac1e
9 changed files with 137 additions and 110 deletions

View File

@ -19,15 +19,13 @@
package org.apache.cloudstack.framework.events;
import com.cloud.utils.component.Adapter;
import java.util.UUID;
/**
* Interface to publish and subscribe to CloudStack events
*
*/
public interface EventBus extends Adapter{
public interface EventBus {
/**
* publish an event on to the event bus

View File

@ -40,13 +40,17 @@ import java.util.concurrent.Executors;
public class RabbitMQEventBus extends ManagerBase implements EventBus {
// details of AMQP server
private static String _amqpHost;
private static Integer _port;
private static String _username;
private static String _password;
private static String amqpHost;
private static Integer port;
private static String username;
private static String password;
// AMQP exchange name where all CloudStack events will be published
private static String _amqpExchangeName;
private static String amqpExchangeName;
private String name;
private static Integer retryInterval;
// hashmap to book keep the registered subscribers
private static ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>> _subscribers;
@ -58,59 +62,76 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
private static boolean _autoAck = true;
private ExecutorService executorService;
private String _name;
private static DisconnectHandler disconnectHandler;
private static Integer _retryInterval;
private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_amqpHost = (String) params.get("server");
if (_amqpHost == null || _amqpHost.isEmpty()) {
throw new ConfigurationException("Unable to get the AMQP server details");
}
_username = (String) params.get("username");
if (_username == null || _username.isEmpty()) {
throw new ConfigurationException("Unable to get the username details");
}
_password = (String) params.get("password");
if (_password == null || _password.isEmpty()) {
throw new ConfigurationException("Unable to get the password details");
}
_amqpExchangeName = (String) params.get("exchangename");
if (_amqpExchangeName == null || _amqpExchangeName.isEmpty()) {
throw new ConfigurationException("Unable to get the _exchange details on the AMQP server");
}
try {
String portStr = (String) params.get("port");
if (portStr == null || portStr.isEmpty()) {
if (amqpHost == null || amqpHost.isEmpty()) {
throw new ConfigurationException("Unable to get the AMQP server details");
}
if (username == null || username.isEmpty()) {
throw new ConfigurationException("Unable to get the username details");
}
if (password == null || password.isEmpty()) {
throw new ConfigurationException("Unable to get the password details");
}
if (amqpExchangeName == null || amqpExchangeName.isEmpty()) {
throw new ConfigurationException("Unable to get the _exchange details on the AMQP server");
}
if (port == null) {
throw new ConfigurationException("Unable to get the port details of AMQP server");
}
_port = Integer.parseInt(portStr);
String retryIntervalStr = (String) params.get("retryinterval");
if (retryIntervalStr == null || retryIntervalStr.isEmpty()) {
// default to 10s to try out reconnect
retryIntervalStr = "10000";
if (retryInterval == null) {
retryInterval = 10000;// default to 10s to try out reconnect
}
_retryInterval = Integer.parseInt(retryIntervalStr);
} catch (NumberFormatException e) {
throw new ConfigurationException("Invalid port number/retry interval");
}
_subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>();
executorService = Executors.newCachedThreadPool();
disconnectHandler = new DisconnectHandler();
_name = name;
return true;
}
public void setServer(String amqpHost) {
this.amqpHost = amqpHost;
}
public void setUsername(String username) {
this.username = username;
}
public void setPassword(String password) {
this.password = password;
}
public void setPort(Integer port) {
this.port = port;
}
public void setName(String name) {
this.name = name;
}
public void setExchange(String exchange) {
this.amqpExchangeName = exchange;
}
public void setRetryInterval(Integer retryInterval) {
this.retryInterval = retryInterval;
}
/** Call to subscribe to interested set of events
*
* @param topic defines category and type of the events being subscribed to
@ -141,9 +162,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
Channel channel = createChannel(connection);
// create a queue and bind it to the exchange with binding key formed from event topic
createExchange(channel, _amqpExchangeName);
createExchange(channel, amqpExchangeName);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, _amqpExchangeName, bindingKey);
channel.queueBind(queueName, amqpExchangeName, bindingKey);
// register a callback handler to receive the events that a subscriber subscribed to
channel.basicConsume(queueName, _autoAck, queueName,
@ -216,8 +237,8 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
try {
Connection connection = getConnection();
Channel channel = createChannel(connection);
createExchange(channel, _amqpExchangeName);
publishEventToExchange(channel, _amqpExchangeName, routingKey, eventDescription);
createExchange(channel, amqpExchangeName);
publishEventToExchange(channel, amqpExchangeName, routingKey, eventDescription);
channel.close();
} catch (AlreadyClosedException e) {
closeConnection();
@ -315,11 +336,11 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
private synchronized Connection createConnection() throws Exception {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(_username);
factory.setPassword(_password);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost("/");
factory.setHost(_amqpHost);
factory.setPort(_port);
factory.setHost(amqpHost);
factory.setPort(port);
Connection connection = factory.newConnection();
connection.addShutdownListener(disconnectHandler);
_connection = connection;
@ -481,7 +502,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
while (!connected) {
try {
Thread.sleep(_retryInterval);
Thread.sleep(retryInterval);
} catch (InterruptedException ie) {
// ignore timer interrupts
}
@ -504,9 +525,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
* with binding key formed from event topic
*/
Channel channel = createChannel(connection);
createExchange(channel, _amqpExchangeName);
createExchange(channel, amqpExchangeName);
channel.queueDeclare(subscriberId, false, false, false, null);
channel.queueBind(subscriberId, _amqpExchangeName, bindingKey);
channel.queueBind(subscriberId, amqpExchangeName, bindingKey);
// register a callback handler to receive the events that a subscriber subscribed to
channel.basicConsume(subscriberId, _autoAck, subscriberId,

View File

@ -26,22 +26,23 @@ import com.cloud.user.UserContext;
import com.cloud.user.dao.AccountDao;
import com.cloud.user.dao.UserDao;
import com.cloud.utils.component.AnnotationInterceptor;
import com.cloud.utils.component.ComponentContext;
import net.sf.cglib.proxy.Callback;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
@Component
public class ActionEventUtils {
private static final Logger s_logger = Logger.getLogger(ActionEventUtils.class);
@ -49,14 +50,12 @@ public class ActionEventUtils {
private static EventDao _eventDao;
private static AccountDao _accountDao;
protected static UserDao _userDao;
// get the event bus provider if configured
protected static EventBus _eventBus;
protected static EventBus _eventBus = null;
@Inject EventDao eventDao;
@Inject AccountDao accountDao;
@Inject UserDao userDao;
public ActionEventUtils() {
}
@ -65,8 +64,6 @@ public class ActionEventUtils {
_eventDao = eventDao;
_accountDao = accountDao;
_userDao = userDao;
// TODO we will do injection of event bus later
}
public static Long onActionEvent(Long userId, Long accountId, Long domainId, String type, String description) {
@ -156,7 +153,9 @@ public class ActionEventUtils {
private static void publishOnEventBus(long userId, long accountId, String eventCategory,
String eventType, Event.State state) {
if (_eventBus == null) {
try {
_eventBus = ComponentContext.getComponent(EventBus.class);
} catch(NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}

View File

@ -22,16 +22,17 @@ import com.cloud.dc.HostPodVO;
import com.cloud.dc.dao.DataCenterDao;
import com.cloud.dc.dao.HostPodDao;
import com.cloud.server.ManagementServer;
import org.apache.cloudstack.framework.events.*;
import com.cloud.utils.component.ComponentContext;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
@Component
public class AlertGenerator {
@ -39,13 +40,11 @@ public class AlertGenerator {
private static final Logger s_logger = Logger.getLogger(AlertGenerator.class);
private static DataCenterDao _dcDao;
private static HostPodDao _podDao;
// get the event bus provider if configured
protected static EventBus _eventBus = null;
@Inject DataCenterDao dcDao;
@Inject HostPodDao podDao;
public AlertGenerator() {
}
@ -56,8 +55,10 @@ public class AlertGenerator {
}
public static void publishAlertOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) {
if (_eventBus == null) {
return; // no provider is configured to provider events bus, so just return
try {
_eventBus = ComponentContext.getComponent(EventBus.class);
} catch(NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}
org.apache.cloudstack.framework.events.Event event =

View File

@ -23,17 +23,18 @@ import com.cloud.event.dao.UsageEventDao;
import com.cloud.server.ManagementServer;
import com.cloud.user.Account;
import com.cloud.user.dao.AccountDao;
import org.apache.cloudstack.framework.events.EventBus;
import com.cloud.utils.component.ComponentContext;
import org.apache.cloudstack.framework.events.Event;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
@Component
public class UsageEventUtils {
@ -42,14 +43,12 @@ public class UsageEventUtils {
private static AccountDao _accountDao;
private static DataCenterDao _dcDao;
private static final Logger s_logger = Logger.getLogger(UsageEventUtils.class);
// get the event bus provider if configured
protected static EventBus _eventBus;
protected static EventBus _eventBus = null;
@Inject UsageEventDao usageEventDao;
@Inject AccountDao accountDao;
@Inject DataCenterDao dcDao;
public UsageEventUtils() {
}
@ -116,8 +115,10 @@ public class UsageEventUtils {
private static void publishUsageEvent(String usageEventType, Long accountId, Long zoneId, String resourceType, String resourceUUID) {
if (_eventBus == null) {
return; // no provider is configured to provider events bus, so just return
try {
_eventBus = ComponentContext.getComponent(EventBus.class);
} catch(NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}
Account account = _accountDao.findById(accountId);

View File

@ -23,24 +23,23 @@ import com.cloud.network.Network.Event;
import com.cloud.network.Network.State;
import com.cloud.network.dao.NetworkDao;
import com.cloud.server.ManagementServer;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.Map;
public class NetworkStateListener implements StateListener<State, Event, Network> {
@Inject protected UsageEventDao _usageEventDao;
@Inject protected NetworkDao _networkDao;
// get the event bus provider if configured
@Inject protected EventBus _eventBus;
protected static EventBus _eventBus = null;
private static final Logger s_logger = Logger.getLogger(NetworkStateListener.class);
@ -63,7 +62,9 @@ public class NetworkStateListener implements StateListener<State, Event, Network
private void pubishOnEventBus(String event, String status, Network vo, State oldState, State newState) {
if (_eventBus == null) {
try {
_eventBus = ComponentContext.getComponent(EventBus.class);
} catch(NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}

View File

@ -17,27 +17,29 @@
package com.cloud.storage.listener;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import com.cloud.event.EventCategory;
import com.cloud.server.ManagementServer;
import com.cloud.storage.Snapshot;
import com.cloud.storage.Snapshot.State;
import com.cloud.storage.Snapshot.Event;
import com.cloud.storage.Snapshot.State;
import com.cloud.storage.SnapshotVO;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.component.ComponentContext;
import java.util.HashMap;
import java.util.Map;
public class SnapshotStateListener implements StateListener<State, Event, SnapshotVO> {
// get the event bus provider if configured
@Inject protected EventBus _eventBus;
protected static EventBus _eventBus = null;
private static final Logger s_logger = Logger.getLogger(VolumeStateListener.class);
@ -59,8 +61,10 @@ public class SnapshotStateListener implements StateListener<State, Event, Snapsh
private void pubishOnEventBus(String event, String status, Snapshot vo, State oldState, State newState) {
if (_eventBus == null) {
return; // no provider is configured to provide events bus, so just return
try {
_eventBus = ComponentContext.getComponent(EventBus.class);
} catch(NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}
String resourceName = getEntityFromClassName(Snapshot.class.getName());

View File

@ -18,24 +18,23 @@
package com.cloud.storage.listener;
import com.cloud.event.EventCategory;
import com.cloud.server.ManagementServer;
import com.cloud.storage.Volume;
import com.cloud.storage.Volume.Event;
import com.cloud.storage.Volume.State;
import com.cloud.server.ManagementServer;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
public class VolumeStateListener implements StateListener<State, Event, Volume> {
// get the event bus provider if configured
@Inject protected EventBus _eventBus = null;
protected static EventBus _eventBus = null;
private static final Logger s_logger = Logger.getLogger(VolumeStateListener.class);
@ -57,8 +56,10 @@ public class VolumeStateListener implements StateListener<State, Event, Volume>
private void pubishOnEventBus(String event, String status, Volume vo, State oldState, State newState) {
if (_eventBus == null) {
return; // no provider is configured to provide events bus, so just return
try {
_eventBus = ComponentContext.getComponent(EventBus.class);
} catch(NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}
String resourceName = getEntityFromClassName(Volume.class.getName());

View File

@ -20,24 +20,24 @@ import com.cloud.event.EventCategory;
import com.cloud.event.EventTypes;
import com.cloud.event.UsageEventUtils;
import com.cloud.event.dao.UsageEventDao;
import com.cloud.network.Network;
import com.cloud.network.dao.NetworkDao;
import com.cloud.network.dao.NetworkVO;
import com.cloud.network.Network;
import com.cloud.server.ManagementServer;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.fsm.StateListener;
import com.cloud.vm.VirtualMachine.Event;
import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.dao.NicDao;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import java.util.Enumeration;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
public class UserVmStateListener implements StateListener<State, VirtualMachine.Event, VirtualMachine> {
@Inject protected UsageEventDao _usageEventDao;
@ -45,8 +45,7 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
@Inject protected NicDao _nicDao;
private static final Logger s_logger = Logger.getLogger(UserVmStateListener.class);
// get the event bus provider if configured
@Inject protected org.apache.cloudstack.framework.events.EventBus _eventBus = null;
protected static EventBus _eventBus = null;
public UserVmStateListener(UsageEventDao usageEventDao, NetworkDao networkDao, NicDao nicDao) {
this._usageEventDao = usageEventDao;
@ -94,7 +93,9 @@ public class UserVmStateListener implements StateListener<State, VirtualMachine.
private void pubishOnEventBus(String event, String status, VirtualMachine vo, VirtualMachine.State oldState, VirtualMachine.State newState) {
if (_eventBus == null) {
try {
_eventBus = ComponentContext.getComponent(EventBus.class);
} catch(NoSuchBeanDefinitionException nbe) {
return; // no provider is configured to provide events bus, so just return
}