Seperate job executor pools to avoid thread starvation situation.

This commit is contained in:
Kelven Yang 2014-03-06 10:36:39 -08:00
parent 6ad245e675
commit 8e27120be4
3 changed files with 18 additions and 8 deletions

View File

@ -27,7 +27,8 @@ import com.cloud.utils.component.Manager;
public interface AsyncJobManager extends Manager {
public static final String JOB_POOL_THREAD_PREFIX = "Job-Executor";
public static final String API_JOB_POOL_THREAD_PREFIX = "API-Job-Executor";
public static final String WORK_JOB_POOL_THREAD_PREFIX = "Work-Job-Executor";
AsyncJobVO getAsyncJob(long jobId);

View File

@ -117,7 +117,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
private volatile long _executionRunNumber = 1;
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
private ExecutorService _executor;
private ExecutorService _apiJobExecutor;
private ExecutorService _workerJobExecutor;
@Override
public String getConfigComponentName() {
@ -390,7 +391,10 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
if (executeInContext) {
runnable.run();
} else {
_executor.submit(runnable);
if (job.getDispatcher() == null || job.getDispatcher().equalsIgnoreCase("ApiAsyncJobDispatcher"))
_apiJobExecutor.submit(runnable);
else
_workerJobExecutor.submit(runnable);
}
}
@ -855,10 +859,14 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
final Properties dbProps = DbProperties.getDbProperties();
final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
int poolSize = (cloudMaxActive * 2) / 3;
int apiPoolSize = cloudMaxActive / 2;
int workPoolSize = (cloudMaxActive * 2) / 3;
s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
_executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobManager.JOB_POOL_THREAD_PREFIX));
s_logger.info("Start AsyncJobManager API executor thread pool in size " + apiPoolSize);
_apiJobExecutor = Executors.newFixedThreadPool(apiPoolSize, new NamedThreadFactory(AsyncJobManager.API_JOB_POOL_THREAD_PREFIX));
s_logger.info("Start AsyncJobManager Work executor thread pool in size " + workPoolSize);
_workerJobExecutor = Executors.newFixedThreadPool(workPoolSize, new NamedThreadFactory(AsyncJobManager.WORK_JOB_POOL_THREAD_PREFIX));
} catch (final Exception e) {
throw new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
}
@ -941,7 +949,8 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Override
public boolean stop() {
_heartbeatScheduler.shutdown();
_executor.shutdown();
_apiJobExecutor.shutdown();
_workerJobExecutor.shutdown();
return true;
}

View File

@ -115,7 +115,7 @@ public class AsyncJobMonitor extends ManagerBase {
assert (_activeTasks.get(runNumber) == null);
long threadId = Thread.currentThread().getId();
boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobManager.JOB_POOL_THREAD_PREFIX);
boolean fromPoolThread = Thread.currentThread().getName().contains(AsyncJobManager.API_JOB_POOL_THREAD_PREFIX);
ActiveTaskRecord record = new ActiveTaskRecord(jobId, threadId, fromPoolThread);
_activeTasks.put(runNumber, record);
if (fromPoolThread)