From 7d87f48ef49b388908d27c75f1a3aa15862e47d7 Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Fri, 23 Mar 2012 22:41:37 -0700 Subject: [PATCH] bug 14301: 1) Drop synchronized call semantic for ClusterManagerImpl.broadcast() 2) Have no choice now but to use an unbound thread pool to notify upper layer. This is to prevent thread starvation when we have cross-management server waitings. Reviewed-By: Kelven(with unit test) --- .../src/com/cloud/cluster/ClusterManager.java | 1 + .../com/cloud/cluster/ClusterManagerImpl.java | 81 +++++++++++-------- .../com/cloud/cluster/ClusterServicePdu.java | 17 ++-- .../cluster/ClusterServiceRequestPdu.java | 1 + .../ClusterServiceServletHttpHandler.java | 4 +- .../cluster/ClusterServiceServletImpl.java | 2 +- .../cluster/DummyClusterManagerImpl.java | 5 ++ 7 files changed, 66 insertions(+), 45 deletions(-) diff --git a/server/src/com/cloud/cluster/ClusterManager.java b/server/src/com/cloud/cluster/ClusterManager.java index 46fdff3eacf..68adbe93034 100755 --- a/server/src/com/cloud/cluster/ClusterManager.java +++ b/server/src/com/cloud/cluster/ClusterManager.java @@ -32,6 +32,7 @@ public interface ClusterManager extends Manager { public static final String ALERT_SUBJECT = "cluster-alert"; public void OnReceiveClusterServicePdu(ClusterServicePdu pdu); + public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError); public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError); public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException; diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 6a42fce107b..f5ba9eb2c16 100755 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -33,10 +33,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -47,7 +45,6 @@ import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.AgentManager.OnError; -import com.cloud.agent.api.AgentControlCommand; import com.cloud.agent.api.Answer; import com.cloud.agent.api.ChangeAgentAnswer; import com.cloud.agent.api.ChangeAgentCommand; @@ -309,33 +306,40 @@ public class ClusterManagerImpl implements ClusterManager { private void onNotifyingClusterPdu() { while(true) { try { - ClusterServicePdu pdu = popIncomingClusterPdu(1000); + final ClusterServicePdu pdu = popIncomingClusterPdu(1000); if(pdu == null) continue; - - if(pdu.isRequest()) { - String result = dispatchClusterServicePdu(pdu); - if(result == null) - result = ""; - - ClusterServicePdu responsePdu = new ClusterServicePdu(); - responsePdu.setSourcePeer(pdu.getDestPeer()); - responsePdu.setDestPeer(pdu.getSourcePeer()); - responsePdu.setAckSequenceId(pdu.getSequenceId()); - responsePdu.setJsonPackage(result); - - addOutgoingClusterPdu(responsePdu); - } else { - ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId()); - if(requestPdu != null) { - requestPdu.setResponseResult(pdu.getJsonPackage()); - synchronized(requestPdu) { - requestPdu.notifyAll(); - } - } else { - s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu)); - } - } + + _executor.execute(new Runnable() { + public void run() { + if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_RESPONSE) { + ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId()); + if(requestPdu != null) { + requestPdu.setResponseResult(pdu.getJsonPackage()); + synchronized(requestPdu) { + requestPdu.notifyAll(); + } + } else { + s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu)); + } + } else { + String result = dispatchClusterServicePdu(pdu); + if(result == null) + result = ""; + + if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_REQUEST) { + ClusterServicePdu responsePdu = new ClusterServicePdu(); + responsePdu.setPduType(ClusterServicePdu.PDU_TYPE_RESPONSE); + responsePdu.setSourcePeer(pdu.getDestPeer()); + responsePdu.setDestPeer(pdu.getSourcePeer()); + responsePdu.setAckSequenceId(pdu.getSequenceId()); + responsePdu.setJsonPackage(result); + + addOutgoingClusterPdu(responsePdu); + } + } + } + }); } catch(Throwable e) { s_logger.error("Unexcpeted exception: ", e); } @@ -502,13 +506,24 @@ public class ClusterManagerImpl implements ClusterManager { if (s_logger.isDebugEnabled()) { s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid()); } - execute(peerName, agentId, cmds, true); + executeAsync(peerName, agentId, cmds, true); } catch (Exception e) { s_logger.warn("Caught exception while talkign to " + peer.getMsid()); } } } + @Override + public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { + ClusterServicePdu pdu = new ClusterServicePdu(); + pdu.setSourcePeer(getSelfPeerName()); + pdu.setDestPeer(strPeer); + pdu.setAgentId(agentId); + pdu.setJsonPackage(_gson.toJson(cmds, Command[].class)); + pdu.setStopOnError(true); + addOutgoingClusterPdu(pdu); + } + @Override public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { if(s_logger.isDebugEnabled()) { @@ -522,9 +537,8 @@ public class ClusterManagerImpl implements ClusterManager { pdu.setAgentId(agentId); pdu.setJsonPackage(_gson.toJson(cmds, Command[].class)); pdu.setStopOnError(stopOnError); - pdu.setRequest(true); registerRequestPdu(pdu); - _clusterPduOutgoingQueue.add(pdu); + addOutgoingClusterPdu(pdu); synchronized(pdu) { try { @@ -1273,10 +1287,7 @@ public class ClusterManagerImpl implements ClusterManager { } _executor.execute(getClusterPduSendingTask()); - - // TODO, make it configurable - for(int i = 0; i < 5; i++) - _executor.execute(getClusterPduNotificationTask()); + _executor.execute(getClusterPduNotificationTask()); Adapters adapters = locator.getAdapters(ClusterServiceAdapter.class); if (adapters == null || !adapters.isSet()) { diff --git a/server/src/com/cloud/cluster/ClusterServicePdu.java b/server/src/com/cloud/cluster/ClusterServicePdu.java index f0976c25be3..701ccae51d6 100644 --- a/server/src/com/cloud/cluster/ClusterServicePdu.java +++ b/server/src/com/cloud/cluster/ClusterServicePdu.java @@ -1,7 +1,10 @@ package com.cloud.cluster; public class ClusterServicePdu { - + public final static int PDU_TYPE_MESSAGE = 0; + public final static int PDU_TYPE_REQUEST = 1; + public final static int PDU_TYPE_RESPONSE = 2; + private long sequenceId; private long ackSequenceId; @@ -12,7 +15,7 @@ public class ClusterServicePdu { private boolean stopOnError; private String jsonPackage; - private boolean request = false; + private int pduType = PDU_TYPE_MESSAGE; private static long s_nextPduSequenceId = 1; @@ -83,11 +86,11 @@ public class ClusterServicePdu { this.jsonPackage = jsonPackage; } - public boolean isRequest() { - return request; + public int getPduType() { + return pduType; } - - public void setRequest(boolean value) { - this.request = value; + + public void setPduType(int pduType) { + this.pduType = pduType; } } diff --git a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java index 1088d771503..599c68ab10a 100644 --- a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java +++ b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java @@ -9,6 +9,7 @@ public class ClusterServiceRequestPdu extends ClusterServicePdu { public ClusterServiceRequestPdu() { startTick = System.currentTimeMillis(); timeout = -1; + setPduType(PDU_TYPE_REQUEST); } public String getResponseResult() { diff --git a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java index f1066699424..3d0dabd4071 100755 --- a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java @@ -166,7 +166,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler { String agentId = (String)req.getParams().getParameter("agentId"); String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); String stopOnError = (String)req.getParams().getParameter("stopOnError"); - String requestAck = (String)req.getParams().getParameter("requestAck"); + String pduType = (String)req.getParams().getParameter("pduType"); ClusterServicePdu pdu = new ClusterServicePdu(); pdu.setSourcePeer(sourcePeer); @@ -176,7 +176,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler { pdu.setAckSequenceId(Long.parseLong(pduAckSeq)); pdu.setJsonPackage(gsonPackage); pdu.setStopOnError("1".equals(stopOnError)); - pdu.setRequest("1".equals(requestAck)); + pdu.setPduType(Integer.parseInt(pduType)); manager.OnReceiveClusterServicePdu(pdu); return "true"; diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java index f61301d20cd..ca1ec761618 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -62,7 +62,7 @@ public class ClusterServiceServletImpl implements ClusterService { method.addParameter("agentId", Long.toString(pdu.getAgentId())); method.addParameter("gsonPackage", pdu.getJsonPackage()); method.addParameter("stopOnError", pdu.isStopOnError() ? "1" : "0"); - method.addParameter("requestAck", pdu.isRequest() ? "1" : "0"); + method.addParameter("pduType", Integer.toString(pdu.getPduType())); try { return executePostMethod(client, method); diff --git a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java index 999e1a98d58..c8fe6bbf7cb 100755 --- a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java @@ -48,6 +48,11 @@ public class DummyClusterManagerImpl implements ClusterManager { public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) { throw new CloudRuntimeException("Unsupported feature"); } + + @Override + public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { + throw new CloudRuntimeException("Unsupported feature"); + } @Override public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {