Do app-level iteration to avoid mysql deadlocks

This commit is contained in:
Kelven Yang 2014-04-02 15:46:50 -07:00
parent 66486d4322
commit cd8801f6f7
2 changed files with 28 additions and 30 deletions

View File

@ -1,4 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under one
// Licensed to the Apacohe Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
@ -569,7 +569,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
@Override
public boolean start() {
// TODO, initial delay is hardcoded
_executor.scheduleAtFixedRate(new CleanupTask(), 5000, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
_executor.scheduleAtFixedRate(new CleanupTask(), 5, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
_executor.scheduleAtFixedRate(new TransitionTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
cancelWorkItems(_nodeId);

View File

@ -20,9 +20,10 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO.Step;
@ -41,9 +42,14 @@ import com.cloud.utils.db.TransactionStatus;
import com.cloud.vm.VirtualMachine;
public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implements VmWorkJobDao {
private static final Logger s_logger = Logger.getLogger(VmWorkJobDaoImpl.class);
protected SearchBuilder<VmWorkJobVO> PendingWorkJobSearch;
protected SearchBuilder<VmWorkJobVO> PendingWorkJobByCommandSearch;
protected SearchBuilder<VmWorkJobVO> ExpungingWorkJobSearch;
@Inject
protected AsyncJobDao _baseJobDao;
public VmWorkJobDaoImpl() {
}
@ -63,6 +69,12 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ);
PendingWorkJobByCommandSearch.done();
ExpungingWorkJobSearch = createSearchBuilder();
ExpungingWorkJobSearch.and("jobStatus", ExpungingWorkJobSearch.entity().getStatus(), Op.NEQ);
ExpungingWorkJobSearch.and("cutDate", ExpungingWorkJobSearch.entity().getLastUpdated(), Op.LT);
ExpungingWorkJobSearch.and("dispatcher", ExpungingWorkJobSearch.entity().getDispatcher(), Op.EQ);
ExpungingWorkJobSearch.done();
}
@Override
@ -124,33 +136,19 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
expunge(sc);
*/
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(
"DELETE FROM vm_work_job WHERE id IN (SELECT id FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?)");
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.execute();
} catch (SQLException e) {
} catch (Throwable e) {
}
try {
pstmt = txn.prepareAutoCloseStatement(
"DELETE FROM async_job WHERE job_dispatcher='VmWorkJobDispatcher' AND job_status != 0 AND last_updated < ?");
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutDate));
pstmt.execute();
} catch (SQLException e) {
} catch (Throwable e) {
}
}
});
// loop at application level to avoid mysql deadlock issues
SearchCriteria<VmWorkJobVO> sc = ExpungingWorkJobSearch.create();
sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
sc.setParameters("lastUpdated", cutDate);
sc.setParameters("dispatcher", "VmWorkJobDispatcher");
List<VmWorkJobVO> expungeList = listBy(sc);
for (VmWorkJobVO job : expungeList) {
if (s_logger.isDebugEnabled())
s_logger.debug("Expunge completed work job-" + job.getId());
expunge(job.getId());
_baseJobDao.expunge(job.getId());
}
}
@Override