diff --git a/plugins/event-bus/rabbitmq/pom.xml b/plugins/event-bus/rabbitmq/pom.xml index 71b8c369d3c..d8a4430f2f4 100644 --- a/plugins/event-bus/rabbitmq/pom.xml +++ b/plugins/event-bus/rabbitmq/pom.xml @@ -31,7 +31,7 @@ com.rabbitmq amqp-client - 3.4.2 + 3.5.4 org.apache.cloudstack diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java index eb989dd2871..5c0d6ce6047 100644 --- a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java +++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java @@ -29,6 +29,8 @@ import java.util.concurrent.Executors; import javax.naming.ConfigurationException; +import com.cloud.utils.exception.CloudRuntimeException; +import com.rabbitmq.client.BlockedListener; import org.apache.log4j.Logger; import com.rabbitmq.client.AMQP; @@ -91,6 +93,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { private ExecutorService executorService; private static DisconnectHandler disconnectHandler; + private static BlockedConnectionHandler blockedConnectionHandler; private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class); @Override @@ -134,6 +137,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { s_subscribers = new ConcurrentHashMap>(); executorService = Executors.newCachedThreadPool(); disconnectHandler = new DisconnectHandler(); + blockedConnectionHandler = new BlockedConnectionHandler(); return true; } @@ -382,6 +386,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { } Connection connection = factory.newConnection(); connection.addShutdownListener(disconnectHandler); + connection.addBlockedListener(blockedConnectionHandler); s_connection = connection; return s_connection; } catch (Exception e) { @@ -505,6 +510,21 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { return true; } + //logic to deal with blocked connection. connections are blocked for example when the rabbitmq server is out of space. https://www.rabbitmq.com/connection-blocked.html + private class BlockedConnectionHandler implements BlockedListener { + + @Override + public void handleBlocked(String reason) throws IOException { + s_logger.error("rabbitmq connection is blocked with reason: " + reason); + closeConnection(); + throw new CloudRuntimeException("unblocking the parent thread as publishing to rabbitmq server is blocked with reason: " + reason); + } + + @Override + public void handleUnblocked() throws IOException { + s_logger.info("rabbitmq connection in unblocked"); + } + } // logic to deal with loss of connection to AMQP server private class DisconnectHandler implements ShutdownListener {