From 693a001c63c1cd8e700f168b5c747f7246061d91 Mon Sep 17 00:00:00 2001 From: Rajani Karuturi Date: Mon, 21 Sep 2015 10:15:37 +0530 Subject: [PATCH 1/2] updating rabbitmq amqp client to 3.5.4 from 3.4.2 --- plugins/event-bus/rabbitmq/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/event-bus/rabbitmq/pom.xml b/plugins/event-bus/rabbitmq/pom.xml index 6f8120f6d31..014e330d6a6 100644 --- a/plugins/event-bus/rabbitmq/pom.xml +++ b/plugins/event-bus/rabbitmq/pom.xml @@ -31,7 +31,7 @@ com.rabbitmq amqp-client - 3.4.2 + 3.5.4 org.apache.cloudstack From a723988aec901eec149361e8492370a490d53e5a Mon Sep 17 00:00:00 2001 From: Rajani Karuturi Date: Mon, 21 Sep 2015 10:27:48 +0530 Subject: [PATCH 2/2] CLOUDSTACK-8885: added blocked connection listener for rabbitmqeventbus When rabbitmq connections are blocked(for example when rabbitmq is server is out of space), all the cloudstack threads which does any action and publishes to rabbitmq(for example login, launch vm etc.) are all blocked. Added a blocked connection listener to handle this and unblock the parent thread. --- .../mom/rabbitmq/RabbitMQEventBus.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java index e53d2e9ad88..29af6877b7d 100644 --- a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java +++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java @@ -30,6 +30,8 @@ import java.util.concurrent.Executors; import javax.ejb.Local; import javax.naming.ConfigurationException; +import com.cloud.utils.exception.CloudRuntimeException; +import com.rabbitmq.client.BlockedListener; import org.apache.log4j.Logger; import com.rabbitmq.client.AMQP; @@ -93,6 +95,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { private ExecutorService executorService; private static DisconnectHandler disconnectHandler; + private static BlockedConnectionHandler blockedConnectionHandler; private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class); @Override @@ -136,6 +139,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { s_subscribers = new ConcurrentHashMap>(); executorService = Executors.newCachedThreadPool(); disconnectHandler = new DisconnectHandler(); + blockedConnectionHandler = new BlockedConnectionHandler(); return true; } @@ -384,6 +388,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { } Connection connection = factory.newConnection(); connection.addShutdownListener(disconnectHandler); + connection.addBlockedListener(blockedConnectionHandler); s_connection = connection; return s_connection; } catch (Exception e) { @@ -507,6 +512,21 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { return true; } + //logic to deal with blocked connection. connections are blocked for example when the rabbitmq server is out of space. https://www.rabbitmq.com/connection-blocked.html + private class BlockedConnectionHandler implements BlockedListener { + + @Override + public void handleBlocked(String reason) throws IOException { + s_logger.error("rabbitmq connection is blocked with reason: " + reason); + closeConnection(); + throw new CloudRuntimeException("unblocking the parent thread as publishing to rabbitmq server is blocked with reason: " + reason); + } + + @Override + public void handleUnblocked() throws IOException { + s_logger.info("rabbitmq connection in unblocked"); + } + } // logic to deal with loss of connection to AMQP server private class DisconnectHandler implements ShutdownListener {