diff --git a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java index c16ee6f96f4..b83e3b28a7a 100644 --- a/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java +++ b/framework/events/src/org/apache/cloudstack/framework/events/EventBus.java @@ -19,15 +19,13 @@ package org.apache.cloudstack.framework.events; -import com.cloud.utils.component.Adapter; - import java.util.UUID; /** * Interface to publish and subscribe to CloudStack events * */ -public interface EventBus extends Adapter{ +public interface EventBus { /** * publish an event on to the event bus diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java index ce0930d115d..1c0c6bef6f2 100644 --- a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java +++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java @@ -40,13 +40,17 @@ import java.util.concurrent.Executors; public class RabbitMQEventBus extends ManagerBase implements EventBus { // details of AMQP server - private static String _amqpHost; - private static Integer _port; - private static String _username; - private static String _password; + private static String amqpHost; + private static Integer port; + private static String username; + private static String password; // AMQP exchange name where all CloudStack events will be published - private static String _amqpExchangeName; + private static String amqpExchangeName; + + private String name; + + private static Integer retryInterval; // hashmap to book keep the registered subscribers private static ConcurrentHashMap> _subscribers; @@ -58,59 +62,76 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { private static boolean _autoAck = true; private ExecutorService executorService; - private String _name; private static DisconnectHandler disconnectHandler; - private static Integer _retryInterval; private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class); @Override public boolean configure(String name, Map params) throws ConfigurationException { - _amqpHost = (String) params.get("server"); - if (_amqpHost == null || _amqpHost.isEmpty()) { - throw new ConfigurationException("Unable to get the AMQP server details"); - } - - _username = (String) params.get("username"); - if (_username == null || _username.isEmpty()) { - throw new ConfigurationException("Unable to get the username details"); - } - - _password = (String) params.get("password"); - if (_password == null || _password.isEmpty()) { - throw new ConfigurationException("Unable to get the password details"); - } - - _amqpExchangeName = (String) params.get("exchangename"); - if (_amqpExchangeName == null || _amqpExchangeName.isEmpty()) { - throw new ConfigurationException("Unable to get the _exchange details on the AMQP server"); - } - try { - String portStr = (String) params.get("port"); - if (portStr == null || portStr.isEmpty()) { + if (amqpHost == null || amqpHost.isEmpty()) { + throw new ConfigurationException("Unable to get the AMQP server details"); + } + + if (username == null || username.isEmpty()) { + throw new ConfigurationException("Unable to get the username details"); + } + + if (password == null || password.isEmpty()) { + throw new ConfigurationException("Unable to get the password details"); + } + + if (amqpExchangeName == null || amqpExchangeName.isEmpty()) { + throw new ConfigurationException("Unable to get the _exchange details on the AMQP server"); + } + + if (port == null) { throw new ConfigurationException("Unable to get the port details of AMQP server"); } - _port = Integer.parseInt(portStr); - String retryIntervalStr = (String) params.get("retryinterval"); - if (retryIntervalStr == null || retryIntervalStr.isEmpty()) { - // default to 10s to try out reconnect - retryIntervalStr = "10000"; + if (retryInterval == null) { + retryInterval = 10000;// default to 10s to try out reconnect } - _retryInterval = Integer.parseInt(retryIntervalStr); + } catch (NumberFormatException e) { throw new ConfigurationException("Invalid port number/retry interval"); } _subscribers = new ConcurrentHashMap>(); - executorService = Executors.newCachedThreadPool(); disconnectHandler = new DisconnectHandler(); - _name = name; + return true; } + public void setServer(String amqpHost) { + this.amqpHost = amqpHost; + } + + public void setUsername(String username) { + this.username = username; + } + + public void setPassword(String password) { + this.password = password; + } + + public void setPort(Integer port) { + this.port = port; + } + + public void setName(String name) { + this.name = name; + } + + public void setExchange(String exchange) { + this.amqpExchangeName = exchange; + } + + public void setRetryInterval(Integer retryInterval) { + this.retryInterval = retryInterval; + } + /** Call to subscribe to interested set of events * * @param topic defines category and type of the events being subscribed to @@ -141,9 +162,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { Channel channel = createChannel(connection); // create a queue and bind it to the exchange with binding key formed from event topic - createExchange(channel, _amqpExchangeName); + createExchange(channel, amqpExchangeName); channel.queueDeclare(queueName, false, false, false, null); - channel.queueBind(queueName, _amqpExchangeName, bindingKey); + channel.queueBind(queueName, amqpExchangeName, bindingKey); // register a callback handler to receive the events that a subscriber subscribed to channel.basicConsume(queueName, _autoAck, queueName, @@ -216,8 +237,8 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { try { Connection connection = getConnection(); Channel channel = createChannel(connection); - createExchange(channel, _amqpExchangeName); - publishEventToExchange(channel, _amqpExchangeName, routingKey, eventDescription); + createExchange(channel, amqpExchangeName); + publishEventToExchange(channel, amqpExchangeName, routingKey, eventDescription); channel.close(); } catch (AlreadyClosedException e) { closeConnection(); @@ -315,11 +336,11 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { private synchronized Connection createConnection() throws Exception { try { ConnectionFactory factory = new ConnectionFactory(); - factory.setUsername(_username); - factory.setPassword(_password); + factory.setUsername(username); + factory.setPassword(password); factory.setVirtualHost("/"); - factory.setHost(_amqpHost); - factory.setPort(_port); + factory.setHost(amqpHost); + factory.setPort(port); Connection connection = factory.newConnection(); connection.addShutdownListener(disconnectHandler); _connection = connection; @@ -481,7 +502,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { while (!connected) { try { - Thread.sleep(_retryInterval); + Thread.sleep(retryInterval); } catch (InterruptedException ie) { // ignore timer interrupts } @@ -504,9 +525,9 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { * with binding key formed from event topic */ Channel channel = createChannel(connection); - createExchange(channel, _amqpExchangeName); + createExchange(channel, amqpExchangeName); channel.queueDeclare(subscriberId, false, false, false, null); - channel.queueBind(subscriberId, _amqpExchangeName, bindingKey); + channel.queueBind(subscriberId, amqpExchangeName, bindingKey); // register a callback handler to receive the events that a subscriber subscribed to channel.basicConsume(subscriberId, _autoAck, subscriberId, diff --git a/server/src/com/cloud/event/ActionEventUtils.java b/server/src/com/cloud/event/ActionEventUtils.java index 22589f1a292..3f3ca685f73 100755 --- a/server/src/com/cloud/event/ActionEventUtils.java +++ b/server/src/com/cloud/event/ActionEventUtils.java @@ -26,22 +26,23 @@ import com.cloud.user.UserContext; import com.cloud.user.dao.AccountDao; import com.cloud.user.dao.UserDao; import com.cloud.utils.component.AnnotationInterceptor; +import com.cloud.utils.component.ComponentContext; import net.sf.cglib.proxy.Callback; import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; import org.apache.cloudstack.framework.events.EventBus; import org.apache.cloudstack.framework.events.EventBusException; import org.apache.log4j.Logger; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import javax.inject.Inject; import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Map; -import javax.annotation.PostConstruct; -import javax.inject.Inject; - @Component public class ActionEventUtils { private static final Logger s_logger = Logger.getLogger(ActionEventUtils.class); @@ -49,14 +50,12 @@ public class ActionEventUtils { private static EventDao _eventDao; private static AccountDao _accountDao; protected static UserDao _userDao; - - // get the event bus provider if configured - protected static EventBus _eventBus; + protected static EventBus _eventBus = null; @Inject EventDao eventDao; @Inject AccountDao accountDao; @Inject UserDao userDao; - + public ActionEventUtils() { } @@ -65,8 +64,6 @@ public class ActionEventUtils { _eventDao = eventDao; _accountDao = accountDao; _userDao = userDao; - - // TODO we will do injection of event bus later } public static Long onActionEvent(Long userId, Long accountId, Long domainId, String type, String description) { @@ -156,7 +153,9 @@ public class ActionEventUtils { private static void publishOnEventBus(long userId, long accountId, String eventCategory, String eventType, Event.State state) { - if (_eventBus == null) { + try { + _eventBus = ComponentContext.getComponent(EventBus.class); + } catch(NoSuchBeanDefinitionException nbe) { return; // no provider is configured to provide events bus, so just return } diff --git a/server/src/com/cloud/event/AlertGenerator.java b/server/src/com/cloud/event/AlertGenerator.java index 2dc7f3eb9e1..c56f9177af2 100644 --- a/server/src/com/cloud/event/AlertGenerator.java +++ b/server/src/com/cloud/event/AlertGenerator.java @@ -22,16 +22,17 @@ import com.cloud.dc.HostPodVO; import com.cloud.dc.dao.DataCenterDao; import com.cloud.dc.dao.HostPodDao; import com.cloud.server.ManagementServer; -import org.apache.cloudstack.framework.events.*; +import com.cloud.utils.component.ComponentContext; +import org.apache.cloudstack.framework.events.EventBus; +import org.apache.cloudstack.framework.events.EventBusException; import org.apache.log4j.Logger; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.stereotype.Component; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; - import javax.annotation.PostConstruct; import javax.inject.Inject; +import java.util.HashMap; +import java.util.Map; @Component public class AlertGenerator { @@ -39,13 +40,11 @@ public class AlertGenerator { private static final Logger s_logger = Logger.getLogger(AlertGenerator.class); private static DataCenterDao _dcDao; private static HostPodDao _podDao; - - // get the event bus provider if configured protected static EventBus _eventBus = null; @Inject DataCenterDao dcDao; @Inject HostPodDao podDao; - + public AlertGenerator() { } @@ -56,8 +55,10 @@ public class AlertGenerator { } public static void publishAlertOnEventBus(String alertType, long dataCenterId, Long podId, String subject, String body) { - if (_eventBus == null) { - return; // no provider is configured to provider events bus, so just return + try { + _eventBus = ComponentContext.getComponent(EventBus.class); + } catch(NoSuchBeanDefinitionException nbe) { + return; // no provider is configured to provide events bus, so just return } org.apache.cloudstack.framework.events.Event event = diff --git a/server/src/com/cloud/event/UsageEventUtils.java b/server/src/com/cloud/event/UsageEventUtils.java index d59262af2ba..54012443848 100644 --- a/server/src/com/cloud/event/UsageEventUtils.java +++ b/server/src/com/cloud/event/UsageEventUtils.java @@ -23,17 +23,18 @@ import com.cloud.event.dao.UsageEventDao; import com.cloud.server.ManagementServer; import com.cloud.user.Account; import com.cloud.user.dao.AccountDao; -import org.apache.cloudstack.framework.events.EventBus; +import com.cloud.utils.component.ComponentContext; import org.apache.cloudstack.framework.events.Event; +import org.apache.cloudstack.framework.events.EventBus; import org.apache.cloudstack.framework.events.EventBusException; import org.apache.log4j.Logger; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.stereotype.Component; -import java.util.HashMap; -import java.util.Map; - import javax.annotation.PostConstruct; import javax.inject.Inject; +import java.util.HashMap; +import java.util.Map; @Component public class UsageEventUtils { @@ -42,14 +43,12 @@ public class UsageEventUtils { private static AccountDao _accountDao; private static DataCenterDao _dcDao; private static final Logger s_logger = Logger.getLogger(UsageEventUtils.class); - - // get the event bus provider if configured - protected static EventBus _eventBus; + protected static EventBus _eventBus = null; @Inject UsageEventDao usageEventDao; @Inject AccountDao accountDao; @Inject DataCenterDao dcDao; - + public UsageEventUtils() { } @@ -116,8 +115,10 @@ public class UsageEventUtils { private static void publishUsageEvent(String usageEventType, Long accountId, Long zoneId, String resourceType, String resourceUUID) { - if (_eventBus == null) { - return; // no provider is configured to provider events bus, so just return + try { + _eventBus = ComponentContext.getComponent(EventBus.class); + } catch(NoSuchBeanDefinitionException nbe) { + return; // no provider is configured to provide events bus, so just return } Account account = _accountDao.findById(accountId); diff --git a/server/src/com/cloud/network/NetworkStateListener.java b/server/src/com/cloud/network/NetworkStateListener.java index bafe6d2d1f9..038e76988bf 100644 --- a/server/src/com/cloud/network/NetworkStateListener.java +++ b/server/src/com/cloud/network/NetworkStateListener.java @@ -23,24 +23,23 @@ import com.cloud.network.Network.Event; import com.cloud.network.Network.State; import com.cloud.network.dao.NetworkDao; import com.cloud.server.ManagementServer; +import com.cloud.utils.component.ComponentContext; import com.cloud.utils.fsm.StateListener; import org.apache.cloudstack.framework.events.EventBus; import org.apache.cloudstack.framework.events.EventBusException; import org.apache.log4j.Logger; - -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Map; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import javax.inject.Inject; +import java.util.HashMap; +import java.util.Map; public class NetworkStateListener implements StateListener { @Inject protected UsageEventDao _usageEventDao; @Inject protected NetworkDao _networkDao; - // get the event bus provider if configured - @Inject protected EventBus _eventBus; + protected static EventBus _eventBus = null; private static final Logger s_logger = Logger.getLogger(NetworkStateListener.class); @@ -63,7 +62,9 @@ public class NetworkStateListener implements StateListener { - // get the event bus provider if configured - @Inject protected EventBus _eventBus; + protected static EventBus _eventBus = null; private static final Logger s_logger = Logger.getLogger(VolumeStateListener.class); @@ -59,8 +61,10 @@ public class SnapshotStateListener implements StateListener { - // get the event bus provider if configured - @Inject protected EventBus _eventBus = null; + protected static EventBus _eventBus = null; private static final Logger s_logger = Logger.getLogger(VolumeStateListener.class); @@ -57,8 +56,10 @@ public class VolumeStateListener implements StateListener private void pubishOnEventBus(String event, String status, Volume vo, State oldState, State newState) { - if (_eventBus == null) { - return; // no provider is configured to provide events bus, so just return + try { + _eventBus = ComponentContext.getComponent(EventBus.class); + } catch(NoSuchBeanDefinitionException nbe) { + return; // no provider is configured to provide events bus, so just return } String resourceName = getEntityFromClassName(Volume.class.getName()); diff --git a/server/src/com/cloud/vm/UserVmStateListener.java b/server/src/com/cloud/vm/UserVmStateListener.java index 18f85670948..04aa8180b67 100644 --- a/server/src/com/cloud/vm/UserVmStateListener.java +++ b/server/src/com/cloud/vm/UserVmStateListener.java @@ -20,24 +20,24 @@ import com.cloud.event.EventCategory; import com.cloud.event.EventTypes; import com.cloud.event.UsageEventUtils; import com.cloud.event.dao.UsageEventDao; +import com.cloud.network.Network; import com.cloud.network.dao.NetworkDao; import com.cloud.network.dao.NetworkVO; -import com.cloud.network.Network; import com.cloud.server.ManagementServer; +import com.cloud.utils.component.ComponentContext; import com.cloud.utils.fsm.StateListener; import com.cloud.vm.VirtualMachine.Event; import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.dao.NicDao; - +import org.apache.cloudstack.framework.events.EventBus; import org.apache.log4j.Logger; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; -import java.util.Enumeration; +import javax.inject.Inject; import java.util.HashMap; import java.util.List; import java.util.Map; -import javax.inject.Inject; - public class UserVmStateListener implements StateListener { @Inject protected UsageEventDao _usageEventDao; @@ -45,8 +45,7 @@ public class UserVmStateListener implements StateListener