CLOUDSTACK-8885: added blocked connection listener for rabbitmqeventbus

When rabbitmq connections are blocked(for example when rabbitmq is
server is out of space), all the cloudstack threads which does any
action and publishes to rabbitmq(for example login, launch vm etc.) are
all blocked.

Added a blocked connection listener to handle this and unblock the
parent thread.
This commit is contained in:
Rajani Karuturi 2015-09-21 10:27:48 +05:30
parent 693a001c63
commit a723988aec

View File

@ -30,6 +30,8 @@ import java.util.concurrent.Executors;
import javax.ejb.Local; import javax.ejb.Local;
import javax.naming.ConfigurationException; import javax.naming.ConfigurationException;
import com.cloud.utils.exception.CloudRuntimeException;
import com.rabbitmq.client.BlockedListener;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP;
@ -93,6 +95,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
private ExecutorService executorService; private ExecutorService executorService;
private static DisconnectHandler disconnectHandler; private static DisconnectHandler disconnectHandler;
private static BlockedConnectionHandler blockedConnectionHandler;
private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class); private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
@Override @Override
@ -136,6 +139,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
s_subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>(); s_subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>();
executorService = Executors.newCachedThreadPool(); executorService = Executors.newCachedThreadPool();
disconnectHandler = new DisconnectHandler(); disconnectHandler = new DisconnectHandler();
blockedConnectionHandler = new BlockedConnectionHandler();
return true; return true;
} }
@ -384,6 +388,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
} }
Connection connection = factory.newConnection(); Connection connection = factory.newConnection();
connection.addShutdownListener(disconnectHandler); connection.addShutdownListener(disconnectHandler);
connection.addBlockedListener(blockedConnectionHandler);
s_connection = connection; s_connection = connection;
return s_connection; return s_connection;
} catch (Exception e) { } catch (Exception e) {
@ -507,6 +512,21 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
return true; return true;
} }
//logic to deal with blocked connection. connections are blocked for example when the rabbitmq server is out of space. https://www.rabbitmq.com/connection-blocked.html
private class BlockedConnectionHandler implements BlockedListener {
@Override
public void handleBlocked(String reason) throws IOException {
s_logger.error("rabbitmq connection is blocked with reason: " + reason);
closeConnection();
throw new CloudRuntimeException("unblocking the parent thread as publishing to rabbitmq server is blocked with reason: " + reason);
}
@Override
public void handleUnblocked() throws IOException {
s_logger.info("rabbitmq connection in unblocked");
}
}
// logic to deal with loss of connection to AMQP server // logic to deal with loss of connection to AMQP server
private class DisconnectHandler implements ShutdownListener { private class DisconnectHandler implements ShutdownListener {