Merge pull request #857 from karuturi/CLOUDSTACK-8885

Cloudstack-8885 added blocked connection listener for rabbitmqeventbus When rabbitmq connections are blocked(for example when rabbitmq is
server is out of space), all the cloudstack threads which does any
action and publishes to rabbitmq(for example login, launch vm etc.) are
all blocked.

Added a blocked connection listener to handle this and unblock the
parent thread.

also updated rabbitmq amqp client to 3.5.4 from 3.4.2

Testing:
I didnt find a way to write tests for this change as of now.
manually tested that login and other cloudstack apis work when the configured rabbitmq server goes out of space and publish actions are blocked.

* pr/857:
  CLOUDSTACK-8885: added blocked connection listener for rabbitmqeventbus
  updating rabbitmq amqp client to 3.5.4 from 3.4.2

Signed-off-by: Remi Bergsma <github@remi.nl>
This commit is contained in:
Remi Bergsma 2016-01-20 15:06:45 +01:00
commit 0403a3522e
2 changed files with 21 additions and 1 deletions

View File

@ -31,7 +31,7 @@
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.2</version>
<version>3.5.4</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>

View File

@ -29,6 +29,8 @@ import java.util.concurrent.Executors;
import javax.naming.ConfigurationException;
import com.cloud.utils.exception.CloudRuntimeException;
import com.rabbitmq.client.BlockedListener;
import org.apache.log4j.Logger;
import com.rabbitmq.client.AMQP;
@ -91,6 +93,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
private ExecutorService executorService;
private static DisconnectHandler disconnectHandler;
private static BlockedConnectionHandler blockedConnectionHandler;
private static final Logger s_logger = Logger.getLogger(RabbitMQEventBus.class);
@Override
@ -134,6 +137,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
s_subscribers = new ConcurrentHashMap<String, Ternary<String, Channel, EventSubscriber>>();
executorService = Executors.newCachedThreadPool();
disconnectHandler = new DisconnectHandler();
blockedConnectionHandler = new BlockedConnectionHandler();
return true;
}
@ -382,6 +386,7 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
}
Connection connection = factory.newConnection();
connection.addShutdownListener(disconnectHandler);
connection.addBlockedListener(blockedConnectionHandler);
s_connection = connection;
return s_connection;
} catch (Exception e) {
@ -505,6 +510,21 @@ public class RabbitMQEventBus extends ManagerBase implements EventBus {
return true;
}
//logic to deal with blocked connection. connections are blocked for example when the rabbitmq server is out of space. https://www.rabbitmq.com/connection-blocked.html
private class BlockedConnectionHandler implements BlockedListener {
@Override
public void handleBlocked(String reason) throws IOException {
s_logger.error("rabbitmq connection is blocked with reason: " + reason);
closeConnection();
throw new CloudRuntimeException("unblocking the parent thread as publishing to rabbitmq server is blocked with reason: " + reason);
}
@Override
public void handleUnblocked() throws IOException {
s_logger.info("rabbitmq connection in unblocked");
}
}
// logic to deal with loss of connection to AMQP server
private class DisconnectHandler implements ShutdownListener {