put pingtask is in a seperate thread pool

This commit is contained in:
Edison Su 2011-05-20 10:50:59 -04:00
parent 501c46bbd5
commit 516ef2c67f

View File

@ -28,6 +28,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.ConfigurationException; import javax.naming.ConfigurationException;
@ -52,6 +56,7 @@ import com.cloud.exception.AgentControlChannelException;
import com.cloud.resource.ServerResource; import com.cloud.resource.ServerResource;
import com.cloud.utils.PropertiesUtil; import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.backoff.BackoffAlgorithm; import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.nio.HandlerFactory; import com.cloud.utils.nio.HandlerFactory;
import com.cloud.utils.nio.Link; import com.cloud.utils.nio.Link;
@ -112,6 +117,8 @@ public class Agent implements HandlerFactory, IAgentControl {
StartupTask _startup = null; StartupTask _startup = null;
boolean _reconnectAllowed = true; boolean _reconnectAllowed = true;
//For time sentitive task, e.g. PingTask
private ThreadPoolExecutor _ugentTaskPool;
// for simulator use only // for simulator use only
public Agent(IAgentShell shell) { public Agent(IAgentShell shell) {
@ -121,6 +128,10 @@ public class Agent implements HandlerFactory, IAgentControl {
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this); _connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
_ugentTaskPool = new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(), new NamedThreadFactory("UgentTask")
);
} }
public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException { public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException {
@ -152,6 +163,10 @@ public class Agent implements HandlerFactory, IAgentControl {
s_logger.debug("Adding shutdown hook"); s_logger.debug("Adding shutdown hook");
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this)); Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
_ugentTaskPool = new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES,
new SynchronousQueue<Runnable>(), new NamedThreadFactory("UgentTask")
);
s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + " : workers = " s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() + " : workers = "
+ _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort()); + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
} }
@ -246,6 +261,8 @@ public class Agent implements HandlerFactory, IAgentControl {
_resource.stop(); _resource.stop();
_resource = null; _resource = null;
} }
_ugentTaskPool.shutdownNow();
} }
public Long getId() { public Long getId() {
@ -414,6 +431,9 @@ public class Agent implements HandlerFactory, IAgentControl {
setLastPingResponseTime(); setLastPingResponseTime();
scheduleWatch(link, response, _pingInterval, _pingInterval); scheduleWatch(link, response, _pingInterval, _pingInterval);
_ugentTaskPool.setKeepAliveTime(2* _pingInterval, TimeUnit.MILLISECONDS);
s_logger.info("Startup Response Received: agent id = " + getId()); s_logger.info("Startup Response Received: agent id = " + getId());
} }
@ -728,7 +748,11 @@ public class Agent implements HandlerFactory, IAgentControl {
s_logger.trace("Scheduling " + (_request instanceof Response ? "Ping" : "Watch Task")); s_logger.trace("Scheduling " + (_request instanceof Response ? "Ping" : "Watch Task"));
} }
try { try {
_link.schedule(new ServerHandler(Task.Type.OTHER, _link, _request)); if (_request instanceof Response) {
_ugentTaskPool.submit(new ServerHandler(Task.Type.OTHER, _link, _request));
} else {
_link.schedule(new ServerHandler(Task.Type.OTHER, _link, _request));
}
} catch (final ClosedChannelException e) { } catch (final ClosedChannelException e) {
s_logger.warn("Unable to schedule task because channel is closed"); s_logger.warn("Unable to schedule task because channel is closed");
} }