CLOUDSTACK-2680: Async job expunge thread - expunge only:

1) Unfinished jobs that are yet to be processed.
2) Completed jobs

The jobs that are in process, will be skipped by the expunge thread

Conflicts:
	server/src/com/cloud/async/dao/AsyncJobDao.java
	server/src/com/cloud/async/dao/AsyncJobDaoImpl.java
	server/src/com/cloud/storage/snapshot/SnapshotManagerImpl.java
This commit is contained in:
Alena Prokharchyk 2013-05-24 14:08:28 -07:00
parent 6f65a5bbe7
commit 2ecf9e3293
3 changed files with 70 additions and 40 deletions

View File

@ -621,11 +621,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
if(l != null && l.size() > 0) {
for(AsyncJobVO job : l) {
expungeAsyncJob(job);
}
//1) Expire unfinished jobs that weren't processed yet
List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
for(AsyncJobVO job : l) {
s_logger.trace("Expunging unfinished job " + job);
expungeAsyncJob(job);
}
//2) Expunge finished jobs
List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
for(AsyncJobVO job : completedJobs) {
s_logger.trace("Expunging completed job " + job);
expungeAsyncJob(job);
}
// forcefully cancel blocking queue items if they've been staying there for too long

View File

@ -26,6 +26,7 @@ import com.cloud.utils.db.GenericDao;
public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId);
List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId);
List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit);
List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit);
void resetJobProcess(long msid, int jobResultCode, String jobResultMessage);
}
List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit);
}

View File

@ -42,17 +42,19 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName());
private final SearchBuilder<AsyncJobVO> pendingAsyncJobSearch;
private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;
private final SearchBuilder<AsyncJobVO> expiringAsyncJobSearch;
public AsyncJobDaoImpl() {
pendingAsyncJobSearch = createSearchBuilder();
pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(),
SearchCriteria.Op.EQ);
pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(),
SearchCriteria.Op.EQ);
pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(),
SearchCriteria.Op.EQ);
private final SearchBuilder<AsyncJobVO> pendingAsyncJobsSearch;
private final SearchBuilder<AsyncJobVO> expiringUnfinishedAsyncJobSearch;
private final SearchBuilder<AsyncJobVO> expiringCompletedAsyncJobSearch;
public AsyncJobDaoImpl() {
pendingAsyncJobSearch = createSearchBuilder();
pendingAsyncJobSearch.and("instanceType", pendingAsyncJobSearch.entity().getInstanceType(),
SearchCriteria.Op.EQ);
pendingAsyncJobSearch.and("instanceId", pendingAsyncJobSearch.entity().getInstanceId(),
SearchCriteria.Op.EQ);
pendingAsyncJobSearch.and("status", pendingAsyncJobSearch.entity().getStatus(),
SearchCriteria.Op.EQ);
pendingAsyncJobSearch.done();
pendingAsyncJobsSearch = createSearchBuilder();
@ -64,27 +66,36 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
SearchCriteria.Op.EQ);
pendingAsyncJobsSearch.done();
expiringAsyncJobSearch = createSearchBuilder();
expiringAsyncJobSearch.and("created", expiringAsyncJobSearch.entity().getCreated(),
expiringUnfinishedAsyncJobSearch = createSearchBuilder();
expiringUnfinishedAsyncJobSearch.and("created", expiringUnfinishedAsyncJobSearch.entity().getCreated(),
SearchCriteria.Op.LTEQ);
expiringAsyncJobSearch.done();
}
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
sc.setParameters("instanceType", instanceType);
sc.setParameters("instanceId", instanceId);
sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
List<AsyncJobVO> l = listIncludingRemovedBy(sc);
if(l != null && l.size() > 0) {
if(l.size() > 1) {
s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job");
}
return l.get(0);
}
return null;
expiringUnfinishedAsyncJobSearch.and("completeMsId", expiringUnfinishedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NULL);
expiringUnfinishedAsyncJobSearch.and("jobStatus", expiringUnfinishedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ);
expiringUnfinishedAsyncJobSearch.done();
expiringCompletedAsyncJobSearch = createSearchBuilder();
expiringCompletedAsyncJobSearch.and("created", expiringCompletedAsyncJobSearch.entity().getCreated(),
SearchCriteria.Op.LTEQ);
expiringCompletedAsyncJobSearch.and("completeMsId", expiringCompletedAsyncJobSearch.entity().getCompleteMsid(), SearchCriteria.Op.NNULL);
expiringCompletedAsyncJobSearch.and("jobStatus", expiringCompletedAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.NEQ);
expiringCompletedAsyncJobSearch.done();
}
public AsyncJobVO findInstancePendingAsyncJob(String instanceType, long instanceId) {
SearchCriteria<AsyncJobVO> sc = pendingAsyncJobSearch.create();
sc.setParameters("instanceType", instanceType);
sc.setParameters("instanceId", instanceId);
sc.setParameters("status", AsyncJobResult.STATUS_IN_PROGRESS);
List<AsyncJobVO> l = listIncludingRemovedBy(sc);
if(l != null && l.size() > 0) {
if(l.size() > 1) {
s_logger.warn("Instance " + instanceType + "-" + instanceId + " has multiple pending async-job");
}
return l.get(0);
}
return null;
}
public List<AsyncJobVO> findInstancePendingAsyncJobs(AsyncJob.Type instanceType, Long accountId) {
@ -99,9 +110,20 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
return listBy(sc);
}
public List<AsyncJobVO> getExpiredJobs(Date cutTime, int limit) {
SearchCriteria<AsyncJobVO> sc = expiringAsyncJobSearch.create();
@Override
public List<AsyncJobVO> getExpiredUnfinishedJobs(Date cutTime, int limit) {
SearchCriteria<AsyncJobVO> sc = expiringUnfinishedAsyncJobSearch.create();
sc.setParameters("created", cutTime);
sc.setParameters("jobStatus", 0);
Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
return listIncludingRemovedBy(sc, filter);
}
@Override
public List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit) {
SearchCriteria<AsyncJobVO> sc = expiringCompletedAsyncJobSearch.create();
sc.setParameters("created", cutTime);
sc.setParameters("jobStatus", 0);
Filter filter = new Filter(AsyncJobVO.class, "created", true, 0L, (long)limit);
return listIncludingRemovedBy(sc, filter);
}