CLOUDSTACK-7749: AsyncJob GC thread cannot purge queue items that have been blocking for too long if exception is thrown in expunging some unfinished or completed old jobs, this will make some future jobs stuck.

This commit is contained in:
Min Chen 2014-10-16 18:15:50 -07:00
parent a1b913db2a
commit 248e4fbdac
2 changed files with 42 additions and 31 deletions

View File

@ -147,7 +147,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
SearchBuilder<SyncQueueItemVO> sbItem = createSearchBuilder();
sbItem.and("lastProcessMsid", sbItem.entity().getLastProcessMsid(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessNumber(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessNumber", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessTime", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.NNULL);
sbItem.and("lastProcessTime2", sbItem.entity().getLastProcessTime(), SearchCriteria.Op.LT);
sbItem.done();

View File

@ -769,46 +769,57 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
public void reallyRun() {
try {
s_logger.trace("Begin cleanup expired async-jobs");
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
// 1) Expire unfinished jobs that weren't processed yet
List<AsyncJobVO> unfinishedJobs = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
for (AsyncJobVO job : unfinishedJobs) {
s_logger.info("Expunging unfinished job " + job);
_jobMonitor.unregisterByJobId(job.getId());
expungeAsyncJob(job);
}
// 2) Expunge finished jobs
List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
for (AsyncJobVO job : completedJobs) {
s_logger.trace("Expunging completed job " + job);
expungeAsyncJob(job);
}
s_logger.info("Begin cleanup expired async-jobs");
// forcefully cancel blocking queue items if they've been staying there for too long
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(JobCancelThresholdMinutes.value() * 60000, false);
if (blockItems != null && blockItems.size() > 0) {
for (SyncQueueItemVO item : blockItems) {
if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
try {
if (item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
s_logger.info("Remove Job-" + item.getContentId() + " from Queue-" + item.getId() + " since it has been blocked for too long");
completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
_jobMonitor.unregisterByJobId(item.getContentId());
_jobMonitor.unregisterByJobId(item.getContentId());
}
// purge the item and resume queue processing
_queueMgr.purgeItem(item.getId());
} catch (Throwable e) {
s_logger.error("Unexpected exception when trying to remove job from sync queue, ", e);
}
// purge the item and resume queue processing
_queueMgr.purgeItem(item.getId());
}
}
s_logger.trace("End cleanup expired async-jobs");
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - JobExpireMinutes.value() * 60000);
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
// 1) Expire unfinished jobs that weren't processed yet
List<AsyncJobVO> unfinishedJobs = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
for (AsyncJobVO job : unfinishedJobs) {
try {
s_logger.info("Expunging unfinished job-" + job.getId());
_jobMonitor.unregisterByJobId(job.getId());
expungeAsyncJob(job);
} catch (Throwable e) {
s_logger.error("Unexpected exception when trying to expunge job-" + job.getId(), e);
}
}
// 2) Expunge finished jobs
List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
for (AsyncJobVO job : completedJobs) {
try {
s_logger.info("Expunging completed job-" + job.getId());
expungeAsyncJob(job);
} catch (Throwable e) {
s_logger.error("Unexpected exception when trying to expunge job-" + job.getId(), e);
}
}
s_logger.info("End cleanup expired async-jobs");
} catch (Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
}