diff --git a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java index e53d2e9ad88..29af6877b7d 100644 --- a/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java +++ b/plugins/event-bus/rabbitmq/src/org/apache/cloudstack/mom/rabbitmq/RabbitMQEventBus.java @@ -30,6 +30,8 @@ import java.util.concurrent.Executors; import javax.ejb.Local; import javax.naming.ConfigurationException; +import com.cloud.utils.exception.CloudRuntimeException; +import com.rabbitmq.client.BlockedListener; import org.apache.log4j.Logger; import com.rabbitmq.client.AMQP; @@ -93,6 +95,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { private ExecutorService executorService; private static DisconnectHandler disconnectHandler; + private static BlockedConnectionHandler blockedConnectionHandler; private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class); @Override @@ -136,6 +139,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { s_subscribers = new ConcurrentHashMap>(); executorService = Executors.newCachedThreadPool(); disconnectHandler = new DisconnectHandler(); + blockedConnectionHandler = new BlockedConnectionHandler(); return true; } @@ -384,6 +388,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { } Connection connection = factory.newConnection(); connection.addShutdownListener(disconnectHandler); + connection.addBlockedListener(blockedConnectionHandler); s_connection = connection; return s_connection; } catch (Exception e) { @@ -507,6 +512,21 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus { return true; } + //logic to deal with blocked connection. connections are blocked for example when the rabbitmq server is out of space. https://www.rabbitmq.com/connection-blocked.html + private class BlockedConnectionHandler implements BlockedListener { + + @Override + public void handleBlocked(String reason) throws IOException { + s_logger.error("rabbitmq connection is blocked with reason: " + reason); + closeConnection(); + throw new CloudRuntimeException("unblocking the parent thread as publishing to rabbitmq server is blocked with reason: " + reason); + } + + @Override + public void handleUnblocked() throws IOException { + s_logger.info("rabbitmq connection in unblocked"); + } + } // logic to deal with loss of connection to AMQP server private class DisconnectHandler implements ShutdownListener {