diff --git a/server/src/com/cloud/cluster/ClusterManager.java b/server/src/com/cloud/cluster/ClusterManager.java index 46fdff3eacf..68adbe93034 100755 --- a/server/src/com/cloud/cluster/ClusterManager.java +++ b/server/src/com/cloud/cluster/ClusterManager.java @@ -32,6 +32,7 @@ public interface ClusterManager extends Manager { public static final String ALERT_SUBJECT = "cluster-alert"; public void OnReceiveClusterServicePdu(ClusterServicePdu pdu); + public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError); public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError); public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException; diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 6a42fce107b..f5ba9eb2c16 100755 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -33,10 +33,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -47,7 +45,6 @@ import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; import com.cloud.agent.AgentManager.OnError; -import com.cloud.agent.api.AgentControlCommand; import com.cloud.agent.api.Answer; import com.cloud.agent.api.ChangeAgentAnswer; import com.cloud.agent.api.ChangeAgentCommand; @@ -309,33 +306,40 @@ public class ClusterManagerImpl implements ClusterManager { private void onNotifyingClusterPdu() { while(true) { try { - ClusterServicePdu pdu = popIncomingClusterPdu(1000); + final ClusterServicePdu pdu = popIncomingClusterPdu(1000); if(pdu == null) continue; - - if(pdu.isRequest()) { - String result = dispatchClusterServicePdu(pdu); - if(result == null) - result = ""; - - ClusterServicePdu responsePdu = new ClusterServicePdu(); - responsePdu.setSourcePeer(pdu.getDestPeer()); - responsePdu.setDestPeer(pdu.getSourcePeer()); - responsePdu.setAckSequenceId(pdu.getSequenceId()); - responsePdu.setJsonPackage(result); - - addOutgoingClusterPdu(responsePdu); - } else { - ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId()); - if(requestPdu != null) { - requestPdu.setResponseResult(pdu.getJsonPackage()); - synchronized(requestPdu) { - requestPdu.notifyAll(); - } - } else { - s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu)); - } - } + + _executor.execute(new Runnable() { + public void run() { + if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_RESPONSE) { + ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId()); + if(requestPdu != null) { + requestPdu.setResponseResult(pdu.getJsonPackage()); + synchronized(requestPdu) { + requestPdu.notifyAll(); + } + } else { + s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu)); + } + } else { + String result = dispatchClusterServicePdu(pdu); + if(result == null) + result = ""; + + if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_REQUEST) { + ClusterServicePdu responsePdu = new ClusterServicePdu(); + responsePdu.setPduType(ClusterServicePdu.PDU_TYPE_RESPONSE); + responsePdu.setSourcePeer(pdu.getDestPeer()); + responsePdu.setDestPeer(pdu.getSourcePeer()); + responsePdu.setAckSequenceId(pdu.getSequenceId()); + responsePdu.setJsonPackage(result); + + addOutgoingClusterPdu(responsePdu); + } + } + } + }); } catch(Throwable e) { s_logger.error("Unexcpeted exception: ", e); } @@ -502,13 +506,24 @@ public class ClusterManagerImpl implements ClusterManager { if (s_logger.isDebugEnabled()) { s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid()); } - execute(peerName, agentId, cmds, true); + executeAsync(peerName, agentId, cmds, true); } catch (Exception e) { s_logger.warn("Caught exception while talkign to " + peer.getMsid()); } } } + @Override + public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { + ClusterServicePdu pdu = new ClusterServicePdu(); + pdu.setSourcePeer(getSelfPeerName()); + pdu.setDestPeer(strPeer); + pdu.setAgentId(agentId); + pdu.setJsonPackage(_gson.toJson(cmds, Command[].class)); + pdu.setStopOnError(true); + addOutgoingClusterPdu(pdu); + } + @Override public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { if(s_logger.isDebugEnabled()) { @@ -522,9 +537,8 @@ public class ClusterManagerImpl implements ClusterManager { pdu.setAgentId(agentId); pdu.setJsonPackage(_gson.toJson(cmds, Command[].class)); pdu.setStopOnError(stopOnError); - pdu.setRequest(true); registerRequestPdu(pdu); - _clusterPduOutgoingQueue.add(pdu); + addOutgoingClusterPdu(pdu); synchronized(pdu) { try { @@ -1273,10 +1287,7 @@ public class ClusterManagerImpl implements ClusterManager { } _executor.execute(getClusterPduSendingTask()); - - // TODO, make it configurable - for(int i = 0; i < 5; i++) - _executor.execute(getClusterPduNotificationTask()); + _executor.execute(getClusterPduNotificationTask()); Adapters adapters = locator.getAdapters(ClusterServiceAdapter.class); if (adapters == null || !adapters.isSet()) { diff --git a/server/src/com/cloud/cluster/ClusterServicePdu.java b/server/src/com/cloud/cluster/ClusterServicePdu.java index f0976c25be3..701ccae51d6 100644 --- a/server/src/com/cloud/cluster/ClusterServicePdu.java +++ b/server/src/com/cloud/cluster/ClusterServicePdu.java @@ -1,7 +1,10 @@ package com.cloud.cluster; public class ClusterServicePdu { - + public final static int PDU_TYPE_MESSAGE = 0; + public final static int PDU_TYPE_REQUEST = 1; + public final static int PDU_TYPE_RESPONSE = 2; + private long sequenceId; private long ackSequenceId; @@ -12,7 +15,7 @@ public class ClusterServicePdu { private boolean stopOnError; private String jsonPackage; - private boolean request = false; + private int pduType = PDU_TYPE_MESSAGE; private static long s_nextPduSequenceId = 1; @@ -83,11 +86,11 @@ public class ClusterServicePdu { this.jsonPackage = jsonPackage; } - public boolean isRequest() { - return request; + public int getPduType() { + return pduType; } - - public void setRequest(boolean value) { - this.request = value; + + public void setPduType(int pduType) { + this.pduType = pduType; } } diff --git a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java index 1088d771503..599c68ab10a 100644 --- a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java +++ b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java @@ -9,6 +9,7 @@ public class ClusterServiceRequestPdu extends ClusterServicePdu { public ClusterServiceRequestPdu() { startTick = System.currentTimeMillis(); timeout = -1; + setPduType(PDU_TYPE_REQUEST); } public String getResponseResult() { diff --git a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java index f1066699424..3d0dabd4071 100755 --- a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java @@ -166,7 +166,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler { String agentId = (String)req.getParams().getParameter("agentId"); String gsonPackage = (String)req.getParams().getParameter("gsonPackage"); String stopOnError = (String)req.getParams().getParameter("stopOnError"); - String requestAck = (String)req.getParams().getParameter("requestAck"); + String pduType = (String)req.getParams().getParameter("pduType"); ClusterServicePdu pdu = new ClusterServicePdu(); pdu.setSourcePeer(sourcePeer); @@ -176,7 +176,7 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler { pdu.setAckSequenceId(Long.parseLong(pduAckSeq)); pdu.setJsonPackage(gsonPackage); pdu.setStopOnError("1".equals(stopOnError)); - pdu.setRequest("1".equals(requestAck)); + pdu.setPduType(Integer.parseInt(pduType)); manager.OnReceiveClusterServicePdu(pdu); return "true"; diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java index f61301d20cd..ca1ec761618 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -62,7 +62,7 @@ public class ClusterServiceServletImpl implements ClusterService { method.addParameter("agentId", Long.toString(pdu.getAgentId())); method.addParameter("gsonPackage", pdu.getJsonPackage()); method.addParameter("stopOnError", pdu.isStopOnError() ? "1" : "0"); - method.addParameter("requestAck", pdu.isRequest() ? "1" : "0"); + method.addParameter("pduType", Integer.toString(pdu.getPduType())); try { return executePostMethod(client, method); diff --git a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java index 999e1a98d58..c8fe6bbf7cb 100755 --- a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java @@ -48,6 +48,11 @@ public class DummyClusterManagerImpl implements ClusterManager { public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) { throw new CloudRuntimeException("Unsupported feature"); } + + @Override + public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { + throw new CloudRuntimeException("Unsupported feature"); + } @Override public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {