CLOUDSTACK-6305: protect lock leaking from unhandled exception

This commit is contained in:
Kelven Yang 2014-03-28 15:28:39 -07:00
parent 4e61e49143
commit b6cc6dd8e3

View File

@ -58,10 +58,13 @@ public class MessageBusBase implements MessageBus {
assert (subject != null);
assert (subscriber != null);
if (_gate.enter()) {
SubscriptionNode current = locate(subject, null, true);
assert (current != null);
current.addSubscriber(subscriber);
_gate.leave();
try {
SubscriptionNode current = locate(subject, null, true);
assert (current != null);
current.addSubscriber(subscriber);
} finally {
_gate.leave();
}
} else {
synchronized (_pendingActions) {
_pendingActions.add(new ActionRecord(ActionType.Subscribe, subject, subscriber));
@ -72,14 +75,17 @@ public class MessageBusBase implements MessageBus {
@Override
public void unsubscribe(String subject, MessageSubscriber subscriber) {
if (_gate.enter()) {
if (subject != null) {
SubscriptionNode current = locate(subject, null, false);
if (current != null)
current.removeSubscriber(subscriber, false);
} else {
_subscriberRoot.removeSubscriber(subscriber, true);
try {
if (subject != null) {
SubscriptionNode current = locate(subject, null, false);
if (current != null)
current.removeSubscriber(subscriber, false);
} else {
_subscriberRoot.removeSubscriber(subscriber, true);
}
} finally {
_gate.leave();
}
_gate.leave();
} else {
synchronized (_pendingActions) {
_pendingActions.add(new ActionRecord(ActionType.Unsubscribe, subject, subscriber));
@ -90,9 +96,12 @@ public class MessageBusBase implements MessageBus {
@Override
public void clearAll() {
if (_gate.enter()) {
_subscriberRoot.clearAll();
doPrune();
_gate.leave();
try {
_subscriberRoot.clearAll();
doPrune();
} finally {
_gate.leave();
}
} else {
synchronized (_pendingActions) {
_pendingActions.add(new ActionRecord(ActionType.ClearAll, null, null));
@ -103,8 +112,11 @@ public class MessageBusBase implements MessageBus {
@Override
public void prune() {
if (_gate.enter()) {
doPrune();
_gate.leave();
try {
doPrune();
} finally {
_gate.leave();
}
} else {
synchronized (_pendingActions) {
_pendingActions.add(new ActionRecord(ActionType.Prune, null, null));
@ -132,18 +144,19 @@ public class MessageBusBase implements MessageBus {
public void publish(String senderAddress, String subject, PublishScope scope, Object args) {
if (_gate.enter(true)) {
try {
List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
SubscriptionNode current = locate(subject, chainFromTop, false);
List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
SubscriptionNode current = locate(subject, chainFromTop, false);
if (current != null)
current.notifySubscribers(senderAddress, subject, args);
if (current != null)
current.notifySubscribers(senderAddress, subject, args);
Collections.reverse(chainFromTop);
for (SubscriptionNode node : chainFromTop)
node.notifySubscribers(senderAddress, subject, args);
_gate.leave();
Collections.reverse(chainFromTop);
for (SubscriptionNode node : chainFromTop)
node.notifySubscribers(senderAddress, subject, args);
} finally {
_gate.leave();
}
}
}