From 516ef2c67f21dbbf93e046d9821b2f406bb860ec Mon Sep 17 00:00:00 2001 From: Edison Su Date: Fri, 20 May 2011 10:50:59 -0400 Subject: [PATCH] put pingtask is in a seperate thread pool --- agent/src/com/cloud/agent/Agent.java | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/agent/src/com/cloud/agent/Agent.java b/agent/src/com/cloud/agent/Agent.java index 966753e104e..c3247e9550a 100755 --- a/agent/src/com/cloud/agent/Agent.java +++ b/agent/src/com/cloud/agent/Agent.java @@ -28,6 +28,10 @@ import java.util.List; import java.util.Map; import java.util.Timer; import java.util.TimerTask; + +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.naming.ConfigurationException; @@ -52,6 +56,7 @@ import com.cloud.exception.AgentControlChannelException; import com.cloud.resource.ServerResource; import com.cloud.utils.PropertiesUtil; import com.cloud.utils.backoff.BackoffAlgorithm; +import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.Link; @@ -112,6 +117,8 @@ public class Agent implements HandlerFactory, IAgentControl { StartupTask _startup = null; boolean _reconnectAllowed = true; + //For time sentitive task, e.g. PingTask + private ThreadPoolExecutor _ugentTaskPool; // for simulator use only public Agent(IAgentShell shell) { @@ -121,6 +128,10 @@ public class Agent implements HandlerFactory, IAgentControl { _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); + + _ugentTaskPool = new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, + new SynchronousQueue(), new NamedThreadFactory("UgentTask") + ); } public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException { @@ -152,6 +163,10 @@ public class Agent implements HandlerFactory, IAgentControl { s_logger.debug("Adding shutdown hook"); Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); + _ugentTaskPool = new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, + new SynchronousQueue(), new NamedThreadFactory("UgentTask") + ); + s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort()); } @@ -246,6 +261,8 @@ public class Agent implements HandlerFactory, IAgentControl { _resource.stop(); _resource = null; } + + _ugentTaskPool.shutdownNow(); } public Long getId() { @@ -414,6 +431,9 @@ public class Agent implements HandlerFactory, IAgentControl { setLastPingResponseTime(); scheduleWatch(link, response, _pingInterval, _pingInterval); + + _ugentTaskPool.setKeepAliveTime(2* _pingInterval, TimeUnit.MILLISECONDS); + s_logger.info("Startup Response Received: agent id = " + getId()); } @@ -728,7 +748,11 @@ public class Agent implements HandlerFactory, IAgentControl { s_logger.trace("Scheduling " + (_request instanceof Response ? "Ping" : "Watch Task")); } try { - _link.schedule(new ServerHandler(Task.Type.OTHER, _link, _request)); + if (_request instanceof Response) { + _ugentTaskPool.submit(new ServerHandler(Task.Type.OTHER, _link, _request)); + } else { + _link.schedule(new ServerHandler(Task.Type.OTHER, _link, _request)); + } } catch (final ClosedChannelException e) { s_logger.warn("Unable to schedule task because channel is closed"); }