From 248e4fbdacb1372da94cbcf49ca23a14c8c9b514 Mon Sep 17 00:00:00 2001
From: Min Chen
Date: Thu, 16 Oct 2014 18:15:50 -0700
Subject: [PATCH] CLOUDSTACK-7749: AsyncJob GC thread cannot purge queue items
 that have been blocking for too long if exception is thrown in expunging
 some unfinished or completed old jobs, this will make some future jobs
 stuck.

---
 .../jobs/dao/SyncQueueItemDaoImpl.java        |  2 +-
 .../jobs/impl/AsyncJobManagerImpl.java        | 71 +++++++++++--------
 2 files changed, 42 insertions(+), 31 deletions(-)

diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 41f14190f36..167d9f511ed 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -147,7 +147,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
         SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
         sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
         sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
-        sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
+        sbItem.and("lastProcessTime", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
         sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
         sbItem.done();
 
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 7e65ede3e53..04fab24d7ae 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -769,46 +769,57 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 
             public void reallyRun() {
                 try {
-                    s_logger.trace("Begin cleanup expired async-jobs");
-
-                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
-
-                    // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
-                    // hopefully this will be fast enough to balance potential growth of job table
-                    // 1) Expire unfinished jobs that weren't processed yet
-                    List<AsyncJobVO> unfinishedJobs = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
-                    for (AsyncJobVO job : unfinishedJobs) {
-                        s_logger.info("Expunging unfinished job " + job);
-
-                        _jobMonitor.unregisterByJobId(job.getId());
-                        expungeAsyncJob(job);
-                    }
-
-                    // 2) Expunge finished jobs
-                    List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
-                    for (AsyncJobVO job : completedJobs) {
-                        s_logger.trace("Expunging completed job " + job);
-
-                        expungeAsyncJob(job);
-                    }
+                    s_logger.info("Begin cleanup expired async-jobs");
 
                     // forcefully cancel blocking queue items if they've been staying there for too long
                     List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false);
                     if (blockItems != null && blockItems.size() > 0) {
                         for (SyncQueueItemVO item : blockItems) {
-                            if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
-                                s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
-                                completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
+                            try {
+                                if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
+                                    s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
+                                    completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
 
-                                _jobMonitor.unregisterByJobId(item.getContentId());
+                                    _jobMonitor.unregisterByJobId(item.getContentId());
+                                }
+
+                                // purge the item and resume queue processing
+                                _queueMgr.purgeItem(item.getId());
+                            } catch (Throwable e) {
+                                s_logger.error("Unexpected exception when trying to remove job from sync queue, ", e);
                             }
-
-                            // purge the item and resume queue processing
-                            _queueMgr.purgeItem(item.getId());
                         }
                     }
 
-                    s_logger.trace("End cleanup expired async-jobs");
+                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
+                    // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
+                    // hopefully this will be fast enough to balance potential growth of job table
+                    // 1) Expire unfinished jobs that weren't processed yet
+                    List<AsyncJobVO> unfinishedJobs = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
+                    for (AsyncJobVO job : unfinishedJobs) {
+                        try {
+                            s_logger.info("Expunging unfinished job-" + job.getId());
+
+                            _jobMonitor.unregisterByJobId(job.getId());
+                            expungeAsyncJob(job);
+                        } catch (Throwable e) {
+                            s_logger.error("Unexpected exception when trying to expunge job-" + job.getId(), e);
+                        }
+                    }
+
+                    // 2) Expunge finished jobs
+                    List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
+                    for (AsyncJobVO job : completedJobs) {
+                        try {
+                            s_logger.info("Expunging completed job-" + job.getId());
+
+                            expungeAsyncJob(job);
+                        } catch (Throwable e) {
+                            s_logger.error("Unexpected exception when trying to expunge job-" + job.getId(), e);
+                        }
+                    }
+
+                    s_logger.info("End cleanup expired async-jobs");
                 } catch (Throwable e) {
                     s_logger.error("Unexpected exception when trying to execute queue item, ", e);
                 }