From 5310e66f30fbfcc025b69b50a9a717557ac9eb3c Mon Sep 17 00:00:00 2001 From: Kelven Yang Date: Thu, 6 Mar 2014 16:28:29 -0800 Subject: [PATCH] Do not use row lock in sync-queue scheduling to work around mysql locking issues. --- api/src/com/cloud/vm/VirtualMachine.java | 2 ++ .../jobs/impl/AsyncJobManagerImpl.java | 16 ++++++++++++++ .../jobs/impl/SyncQueueManagerImpl.java | 22 +++++++++---------- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/api/src/com/cloud/vm/VirtualMachine.java b/api/src/com/cloud/vm/VirtualMachine.java index dd11a829afa..b085d4a7c4e 100755 --- a/api/src/com/cloud/vm/VirtualMachine.java +++ b/api/src/com/cloud/vm/VirtualMachine.java @@ -119,10 +119,12 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I s_fsm.addTransition(State.Error, VirtualMachine.Event.DestroyRequested, State.Expunging); s_fsm.addTransition(State.Error, VirtualMachine.Event.ExpungeOperation, State.Expunging); + s_fsm.addTransition(State.Starting, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); s_fsm.addTransition(State.Stopped, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running); + s_fsm.addTransition(State.Starting, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped); s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped); s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped); diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java index 9b9460c28c5..49c3032b120 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java @@ -659,8 +659,24 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, private Runnable getHeartbeatTask() { return new ManagedContextRunnable() { + @Override protected void runInContext() { + GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerHeartbeat"); + try { + if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) { + try { + reallyRun(); + } finally { + scanLock.unlock(); + } + } + } finally { + scanLock.releaseRef(); + } + } + + protected void reallyRun() { try { List l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE); if (l != null && l.size() > 0) { diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java index d8e26748e18..5160e05db77 100644 --- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java +++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/SyncQueueManagerImpl.java @@ -83,8 +83,8 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage return Transaction.execute(new TransactionCallback() { @Override public SyncQueueItemVO doInTransaction(TransactionStatus status) { - SyncQueueVO queueVO = _syncQueueDao.lockRow(queueId, true); - if (queueVO == null) { + SyncQueueVO queueVO = _syncQueueDao.findById(queueId); + if(queueVO == null) { s_logger.error("Sync queue(id: " + queueId + ") does not exist"); return null; } @@ -139,11 +139,11 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage @Override public void doInTransactionWithoutResult(TransactionStatus status) { List l = _syncQueueItemDao.getNextQueueItems(maxItems); - if (l != null && l.size() > 0) { - for (SyncQueueItemVO item : l) { - SyncQueueVO queueVO = _syncQueueDao.lockRow(item.getQueueId(), true); - SyncQueueItemVO itemVO = _syncQueueItemDao.lockRow(item.getId(), true); - if (queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) { + if(l != null && l.size() > 0) { + for(SyncQueueItemVO item : l) { + SyncQueueVO queueVO = _syncQueueDao.findById(item.getQueueId()); + SyncQueueItemVO itemVO = _syncQueueItemDao.findById(item.getId()); + if(queueReadyToProcess(queueVO) && itemVO.getLastProcessNumber() == null) { Long processNumber = queueVO.getLastProcessNumber(); if (processNumber == null) processNumber = new Long(1); @@ -184,8 +184,8 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage @Override public void doInTransactionWithoutResult(TransactionStatus status) { SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); - if (itemVO != null) { - SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); + if(itemVO != null) { + SyncQueueVO queueVO = _syncQueueDao.findById(itemVO.getQueueId()); _syncQueueItemDao.expunge(itemVO.getId()); @@ -213,8 +213,8 @@ public class SyncQueueManagerImpl extends ManagerBase implements SyncQueueManage @Override public void doInTransactionWithoutResult(TransactionStatus status) { SyncQueueItemVO itemVO = _syncQueueItemDao.findById(queueItemId); - if (itemVO != null) { - SyncQueueVO queueVO = _syncQueueDao.lockRow(itemVO.getQueueId(), true); + if(itemVO != null) { + SyncQueueVO queueVO = _syncQueueDao.findById(itemVO.getQueueId()); itemVO.setLastProcessMsid(null); itemVO.setLastProcessNumber(null);