bug 14301:

1) Drop the synchronous call semantics of ClusterManagerImpl.broadcast()
2) We now have no choice but to use an unbounded thread pool to notify the upper layer. This prevents thread starvation when management servers are waiting on each other.

Reviewed-By: Kelven(with unit test)
This commit is contained in:
Kelven Yang 2012-03-23 22:41:37 -07:00
parent 880188466c
commit 7d87f48ef4
7 changed files with 66 additions and 45 deletions

View File

@ -32,6 +32,7 @@ public interface ClusterManager extends Manager {
public static final String ALERT_SUBJECT = "cluster-alert";
// Entry point for an incoming cluster PDU; the servlet HTTP handler calls this after
// building the PDU from the request parameters.
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu);
// Sends the commands to the given peer without returning any answers (void).
// NOTE(review): the ClusterManagerImpl version only queues an outgoing PDU and returns;
// presumably no response is ever delivered to the caller — confirm.
public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
// Sends the commands to the given peer and returns the peer's answers.
// NOTE(review): the ClusterManagerImpl version registers the request and then synchronizes
// on the PDU, so this call appears to block until a response or timeout — confirm.
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
// Sends the commands to the agent identified by hostId and returns the answers; may throw
// AgentUnavailableException or OperationTimedoutException as declared.
public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;

View File

@ -33,10 +33,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -47,7 +45,6 @@ import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.ChangeAgentAnswer;
import com.cloud.agent.api.ChangeAgentCommand;
@ -309,33 +306,40 @@ public class ClusterManagerImpl implements ClusterManager {
private void onNotifyingClusterPdu() {
while(true) {
try {
ClusterServicePdu pdu = popIncomingClusterPdu(1000);
final ClusterServicePdu pdu = popIncomingClusterPdu(1000);
if(pdu == null)
continue;
if(pdu.isRequest()) {
String result = dispatchClusterServicePdu(pdu);
if(result == null)
result = "";
ClusterServicePdu responsePdu = new ClusterServicePdu();
responsePdu.setSourcePeer(pdu.getDestPeer());
responsePdu.setDestPeer(pdu.getSourcePeer());
responsePdu.setAckSequenceId(pdu.getSequenceId());
responsePdu.setJsonPackage(result);
addOutgoingClusterPdu(responsePdu);
} else {
ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
if(requestPdu != null) {
requestPdu.setResponseResult(pdu.getJsonPackage());
synchronized(requestPdu) {
requestPdu.notifyAll();
}
} else {
s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu));
}
}
_executor.execute(new Runnable() {
public void run() {
if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_RESPONSE) {
ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
if(requestPdu != null) {
requestPdu.setResponseResult(pdu.getJsonPackage());
synchronized(requestPdu) {
requestPdu.notifyAll();
}
} else {
s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu));
}
} else {
String result = dispatchClusterServicePdu(pdu);
if(result == null)
result = "";
if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_REQUEST) {
ClusterServicePdu responsePdu = new ClusterServicePdu();
responsePdu.setPduType(ClusterServicePdu.PDU_TYPE_RESPONSE);
responsePdu.setSourcePeer(pdu.getDestPeer());
responsePdu.setDestPeer(pdu.getSourcePeer());
responsePdu.setAckSequenceId(pdu.getSequenceId());
responsePdu.setJsonPackage(result);
addOutgoingClusterPdu(responsePdu);
}
}
}
});
} catch(Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
@ -502,13 +506,24 @@ public class ClusterManagerImpl implements ClusterManager {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid());
}
execute(peerName, agentId, cmds, true);
executeAsync(peerName, agentId, cmds, true);
} catch (Exception e) {
s_logger.warn("Caught exception while talkign to " + peer.getMsid());
}
}
}
/**
 * Queues a fire-and-forget command package for a peer management server.
 *
 * The PDU keeps its default type (PDU_TYPE_MESSAGE) and is not registered as a
 * pending request, so this method returns as soon as the PDU is placed on the
 * outgoing queue and no answer is ever handed back to the caller.
 *
 * @param strPeer     name of the destination peer
 * @param agentId     id of the agent the commands are addressed to
 * @param cmds        commands to serialize into the PDU's JSON package
 * @param stopOnError whether the receiving side should stop at the first failing command
 */
@Override
public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
    ClusterServicePdu pdu = new ClusterServicePdu();
    pdu.setSourcePeer(getSelfPeerName());
    pdu.setDestPeer(strPeer);
    pdu.setAgentId(agentId);
    pdu.setJsonPackage(_gson.toJson(cmds, Command[].class));
    // Honor the caller's flag instead of hard-coding true, matching execute().
    pdu.setStopOnError(stopOnError);
    addOutgoingClusterPdu(pdu);
}
@Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
if(s_logger.isDebugEnabled()) {
@ -522,9 +537,8 @@ public class ClusterManagerImpl implements ClusterManager {
pdu.setAgentId(agentId);
pdu.setJsonPackage(_gson.toJson(cmds, Command[].class));
pdu.setStopOnError(stopOnError);
pdu.setRequest(true);
registerRequestPdu(pdu);
_clusterPduOutgoingQueue.add(pdu);
addOutgoingClusterPdu(pdu);
synchronized(pdu) {
try {
@ -1273,10 +1287,7 @@ public class ClusterManagerImpl implements ClusterManager {
}
_executor.execute(getClusterPduSendingTask());
// TODO, make it configurable
for(int i = 0; i < 5; i++)
_executor.execute(getClusterPduNotificationTask());
_executor.execute(getClusterPduNotificationTask());
Adapters<ClusterServiceAdapter> adapters = locator.getAdapters(ClusterServiceAdapter.class);
if (adapters == null || !adapters.isSet()) {

View File

@ -1,7 +1,10 @@
package com.cloud.cluster;
public class ClusterServicePdu {
public final static int PDU_TYPE_MESSAGE = 0;
public final static int PDU_TYPE_REQUEST = 1;
public final static int PDU_TYPE_RESPONSE = 2;
private long sequenceId;
private long ackSequenceId;
@ -12,7 +15,7 @@ public class ClusterServicePdu {
private boolean stopOnError;
private String jsonPackage;
private boolean request = false;
private int pduType = PDU_TYPE_MESSAGE;
private static long s_nextPduSequenceId = 1;
@ -83,11 +86,11 @@ public class ClusterServicePdu {
this.jsonPackage = jsonPackage;
}
public boolean isRequest() {
return request;
public int getPduType() {
return pduType;
}
public void setRequest(boolean value) {
this.request = value;
public void setPduType(int pduType) {
this.pduType = pduType;
}
}

View File

@ -9,6 +9,7 @@ public class ClusterServiceRequestPdu extends ClusterServicePdu {
/**
 * Creates a request PDU: stamps the current wall-clock time as the start tick,
 * initializes the timeout to -1 and tags the PDU as PDU_TYPE_REQUEST.
 */
public ClusterServiceRequestPdu() {
    this.startTick = System.currentTimeMillis();
    this.timeout = -1;
    this.setPduType(PDU_TYPE_REQUEST);
}
public String getResponseResult() {

View File

@ -166,7 +166,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
String agentId = (String)req.getParams().getParameter("agentId");
String gsonPackage = (String)req.getParams().getParameter("gsonPackage");
String stopOnError = (String)req.getParams().getParameter("stopOnError");
String requestAck = (String)req.getParams().getParameter("requestAck");
String pduType = (String)req.getParams().getParameter("pduType");
ClusterServicePdu pdu = new ClusterServicePdu();
pdu.setSourcePeer(sourcePeer);
@ -176,7 +176,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
pdu.setAckSequenceId(Long.parseLong(pduAckSeq));
pdu.setJsonPackage(gsonPackage);
pdu.setStopOnError("1".equals(stopOnError));
pdu.setRequest("1".equals(requestAck));
pdu.setPduType(Integer.parseInt(pduType));
manager.OnReceiveClusterServicePdu(pdu);
return "true";

View File

@ -62,7 +62,7 @@ public class ClusterServiceServletImpl implements ClusterService {
method.addParameter("agentId", Long.toString(pdu.getAgentId()));
method.addParameter("gsonPackage", pdu.getJsonPackage());
method.addParameter("stopOnError", pdu.isStopOnError() ? "1" : "0");
method.addParameter("requestAck", pdu.isRequest() ? "1" : "0");
method.addParameter("pduType", Integer.toString(pdu.getPduType()));
try {
return executePostMethod(client, method);

View File

@ -48,6 +48,11 @@ public class DummyClusterManagerImpl implements ClusterManager {
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
    // The dummy manager does not process cluster PDUs; it always rejects the call.
    final String reason = "Unsupported feature";
    throw new CloudRuntimeException(reason);
}
/**
 * Asynchronous peer execution is not available in the dummy cluster manager;
 * this method always throws CloudRuntimeException.
 */
@Override
public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
    final String reason = "Unsupported feature";
    throw new CloudRuntimeException(reason);
}
@Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {