mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
CLOUDSTACK-7832: Move some job db update and item purge to
completeAsyncJob transaction to avoid MySQL deadlock.
This commit is contained in:
parent
e427d0004c
commit
ffaabdc13f
@ -18,11 +18,15 @@ package com.cloud.utils.db;
|
|||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
public class Transaction {
|
public class Transaction {
|
||||||
private final static AtomicLong counter = new AtomicLong(0);
|
private final static AtomicLong counter = new AtomicLong(0);
|
||||||
private final static TransactionStatus STATUS = new TransactionStatus() {
|
private final static TransactionStatus STATUS = new TransactionStatus() {
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private static final Logger s_logger = Logger.getLogger(Transaction.class);
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
public static <T, E extends Throwable> T execute(TransactionCallbackWithException<T, E> callback) throws E {
|
public static <T, E extends Throwable> T execute(TransactionCallbackWithException<T, E> callback) throws E {
|
||||||
String name = "tx-" + counter.incrementAndGet();
|
String name = "tx-" + counter.incrementAndGet();
|
||||||
@ -33,6 +37,10 @@ public class Transaction {
|
|||||||
}
|
}
|
||||||
TransactionLegacy txn = TransactionLegacy.open(name, databaseId, false);
|
TransactionLegacy txn = TransactionLegacy.open(name, databaseId, false);
|
||||||
try {
|
try {
|
||||||
|
// if (txn.dbTxnStarted()){
|
||||||
|
// String warnMsg = "Potential Wrong Usage: TRANSACTION.EXECUTE IS WRAPPED INSIDE ANOTHER DB TRANSACTION!";
|
||||||
|
// s_logger.warn(warnMsg, new CloudRuntimeException(warnMsg));
|
||||||
|
// }
|
||||||
txn.start();
|
txn.start();
|
||||||
T result = callback.doInTransaction(STATUS);
|
T result = callback.doInTransaction(STATUS);
|
||||||
txn.commit();
|
txn.commit();
|
||||||
|
|||||||
@ -39,4 +39,22 @@
|
|||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<artifactId>maven-surefire-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<skipTests>true</skipTests>
|
||||||
|
</configuration>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<phase>integration-test</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>test</goal>
|
||||||
|
</goals>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
</project>
|
</project>
|
||||||
|
|||||||
@ -30,6 +30,9 @@ import org.apache.log4j.Logger;
|
|||||||
|
|
||||||
import org.apache.cloudstack.framework.serializer.MessageSerializer;
|
import org.apache.cloudstack.framework.serializer.MessageSerializer;
|
||||||
|
|
||||||
|
import com.cloud.utils.db.TransactionLegacy;
|
||||||
|
import com.cloud.utils.exception.CloudRuntimeException;
|
||||||
|
|
||||||
public class MessageBusBase implements MessageBus {
|
public class MessageBusBase implements MessageBus {
|
||||||
|
|
||||||
private final Gate _gate;
|
private final Gate _gate;
|
||||||
@ -158,7 +161,11 @@ public class MessageBusBase implements MessageBus {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void publish(String senderAddress, String subject, PublishScope scope, Object args) {
|
public void publish(String senderAddress, String subject, PublishScope scope, Object args) {
|
||||||
|
// publish cannot be in DB transaction, which may hold DB lock too long, and we are guarding this here
|
||||||
|
if (!noDbTxn()){
|
||||||
|
String errMsg = "NO EVENT PUBLISH CAN BE WRAPPED WITHIN DB TRANSACTION!";
|
||||||
|
s_logger.error(errMsg, new CloudRuntimeException(errMsg));
|
||||||
|
}
|
||||||
if (_gate.enter(true)) {
|
if (_gate.enter(true)) {
|
||||||
if (s_logger.isTraceEnabled()) {
|
if (s_logger.isTraceEnabled()) {
|
||||||
s_logger.trace("Enter gate in message bus publish");
|
s_logger.trace("Enter gate in message bus publish");
|
||||||
@ -256,6 +263,11 @@ public class MessageBusBase implements MessageBus {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean noDbTxn() {
|
||||||
|
TransactionLegacy txn = TransactionLegacy.currentTxn();
|
||||||
|
return !txn.dbTxnStarted();
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Support inner classes
|
// Support inner classes
|
||||||
//
|
//
|
||||||
|
|||||||
@ -258,6 +258,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
|
|||||||
}
|
}
|
||||||
|
|
||||||
job.setLastUpdated(DateUtil.currentGMTTime());
|
job.setLastUpdated(DateUtil.currentGMTTime());
|
||||||
|
job.setExecutingMsid(null);
|
||||||
_jobDao.update(jobId, job);
|
_jobDao.update(jobId, job);
|
||||||
|
|
||||||
if (s_logger.isDebugEnabled()) {
|
if (s_logger.isDebugEnabled()) {
|
||||||
@ -266,6 +267,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
|
|||||||
List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
|
List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
|
||||||
_joinMapDao.disjoinAllJobs(jobId);
|
_joinMapDao.disjoinAllJobs(jobId);
|
||||||
|
|
||||||
|
// purge the job sync item from queue
|
||||||
|
if (job.getSyncSource() != null) {
|
||||||
|
_queueMgr.purgeItem(job.getSyncSource().getId());
|
||||||
|
}
|
||||||
|
|
||||||
return wakeupList;
|
return wakeupList;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -527,12 +533,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
|
|||||||
} finally {
|
} finally {
|
||||||
// guard final clause as well
|
// guard final clause as well
|
||||||
try {
|
try {
|
||||||
AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
|
|
||||||
jobToUpdate.setExecutingMsid(null);
|
|
||||||
_jobDao.update(job.getId(), jobToUpdate);
|
|
||||||
|
|
||||||
if (job.getSyncSource() != null) {
|
if (job.getSyncSource() != null) {
|
||||||
_queueMgr.purgeItem(job.getSyncSource().getId());
|
|
||||||
checkQueue(job.getSyncSource().getQueueId());
|
checkQueue(job.getSyncSource().getQueueId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user