CS-16611: when expunge Async job, expunge corresponding sync queue items

Conflicts:

	server/src/com/cloud/async/AsyncJobManagerImpl.java
	server/src/com/cloud/async/dao/SyncQueueItemDao.java
This commit is contained in:
Alena Prokharchyk 2012-10-31 16:43:51 -07:00
parent fe41325e96
commit 013102c028
6 changed files with 116 additions and 64 deletions

View File

@ -16,7 +16,9 @@
// under the License.
package com.cloud.async;
public interface SyncQueueItem {
public final String AsyncJobContentType = "AsyncJob";
String getContentType();

View File

@ -270,7 +270,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
Random random = new Random();
for(int i = 0; i < 5; i++) {
queue = _queueMgr.queue(syncObjType, syncObjId, "AsyncJob", job.getId(), queueSizeLimit);
queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
if(queue != null) {
break;
}
@ -598,60 +598,73 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
return new Runnable() {
@Override
public void run() {
GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
try {
if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
try {
reallyRun();
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
}
private void reallyRun() {
try {
s_logger.trace("Begin cleanup expired async-jobs");
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
if(l != null && l.size() > 0) {
for(AsyncJobVO job : l) {
_jobDao.expunge(job.getId());
}
}
// forcely cancel blocking queue items if they've been staying there for too long
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
if(blockItems != null && blockItems.size() > 0) {
for(SyncQueueItemVO item : blockItems) {
if(item.getContentType().equalsIgnoreCase("AsyncJob")) {
completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0, getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
try {
if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
try {
reallyRun();
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
}
public void reallyRun() {
try {
s_logger.trace("Begin cleanup expired async-jobs");
Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds*1000);
// limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
// hopefully this will be fast enough to balance potential growth of job table
List<AsyncJobVO> l = _jobDao.getExpiredJobs(cutTime, 100);
if(l != null && l.size() > 0) {
for(AsyncJobVO job : l) {
expungeAsyncJob(job);
}
}
// forcefully cancel blocking queue items if they've been staying there for too long
List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds*1000, false);
if(blockItems != null && blockItems.size() > 0) {
for(SyncQueueItemVO item : blockItems) {
if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
completeAsyncJob(item.getContentId(), AsyncJobResult.STATUS_FAILED, 0,
getResetResultResponse("Job is cancelled as it has been blocking others for too long"));
}
// purge the item and resume queue processing
_queueMgr.purgeItem(item.getId());
}
}
s_logger.trace("End cleanup expired async-jobs");
} catch(Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
} finally {
StackMaid.current().exitCleanup();
}
}
};
}
private long getMsid() {
if(_clusterMgr != null) {
// purge the item and resume queue processing
_queueMgr.purgeItem(item.getId());
}
}
s_logger.trace("End cleanup expired async-jobs");
} catch(Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
} finally {
StackMaid.current().exitCleanup();
}
}
};
}
@DB
protected void expungeAsyncJob(AsyncJobVO job) {
Transaction txn = Transaction.currentTxn();
txn.start();
_jobDao.expunge(job.getId());
//purge corresponding sync queue item
_queueMgr.purgeAsyncJobQueueItemId(job.getId());
txn.commit();
}
private long getMsid() {
if(_clusterMgr != null) {
return _clusterMgr.getManagementNodeId();
}
@ -666,7 +679,7 @@ public class AsyncJobManagerImpl implements AsyncJobManager, ClusterManagerListe
}
String contentType = item.getContentType();
if(contentType != null && contentType.equals("AsyncJob")) {
if(contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
Long jobId = item.getContentId();
if(jobId != null) {
s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);

View File

@ -30,4 +30,6 @@ public interface SyncQueueManager extends Manager {
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
void purgeAsyncJobQueueItemId(long asyncJobId);
}

View File

@ -185,13 +185,16 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
if(itemVO != null) {
SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true);
_syncQueueItemDao.expunge(itemVO.getId());
queueVO.setLastUpdated(DateUtil.currentGMTTime());
//decrement the count
assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
queueVO.setQueueSize(queueVO.getQueueSize() - 1);
_syncQueueDao.update(queueVO.getId(), queueVO);
_syncQueueItemDao.expunge(itemVO.getId());
//if item is active, reset queue information
if (itemVO.getLastProcessMsid() != null) {
queueVO.setLastUpdated(DateUtil.currentGMTTime());
//decrement the count
assert (queueVO.getQueueSize() > 0) : "Count reduce happens when it's already <= 0!";
queueVO.setQueueSize(queueVO.getQueueSize() - 1);
_syncQueueDao.update(queueVO.getId(), queueVO);
}
}
txt.commit();
} catch(Exception e) {
@ -273,5 +276,13 @@ public class SyncQueueManagerImpl implements SyncQueueManager {
private boolean queueReadyToProcess(SyncQueueVO queueVO) {
return queueVO.getQueueSize() < queueVO.getQueueSizeLimit();
}
@Override
public void purgeAsyncJobQueueItemId(long asyncJobId) {
Long itemId = _syncQueueItemDao.getQueueItemIdByContentIdAndType(asyncJobId, SyncQueueItem.AsyncJobContentType);
if (itemId != null) {
purgeItem(itemId);
}
}
}
}

View File

@ -26,4 +26,5 @@ public interface SyncQueueItemDao extends GenericDao<SyncQueueItemVO, Long> {
public List<SyncQueueItemVO> getNextQueueItems(int maxItems);
public List<SyncQueueItemVO> getActiveQueueItems(Long msid, boolean exclusive);
public List<SyncQueueItemVO> getBlockedQueueItems(long thresholdMs, boolean exclusive);
}
public Long getQueueItemIdByContentIdAndType(long contentId, String contentType);
}

View File

@ -33,13 +33,25 @@ import com.cloud.async.SyncQueueItemVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
@Local(value = { SyncQueueItemDao.class })
public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
private static final Logger s_logger = Logger.getLogger(SyncQueueItemDaoImpl.class);
final GenericSearchBuilder<SyncQueueItemVO, Long> queueIdSearch;
protected SyncQueueItemDaoImpl() {
super();
queueIdSearch = createSearchBuilder(Long.class);
queueIdSearch.and("contentId", queueIdSearch.entity().getContentId(), Op.EQ);
queueIdSearch.and("contentType", queueIdSearch.entity().getContentType(), Op.EQ);
queueIdSearch.selectField(queueIdSearch.entity().getId());
queueIdSearch.done();
}
@Override
@ -132,4 +144,15 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
return lockRows(sc, null, true);
return listBy(sc, null);
}
@Override
public Long getQueueItemIdByContentIdAndType(long contentId, String contentType) {
SearchCriteria<Long> sc = queueIdSearch.create();
sc.setParameters("contentId", contentId);
sc.setParameters("contentType", contentType);
List<Long> id = customSearch(sc, null);
return id.size() == 0 ? null : id.get(0);
}
}