diff --git a/server/src/com/cloud/configuration/Config.java b/server/src/com/cloud/configuration/Config.java index 8e650a9f837..9d3b7801aae 100755 --- a/server/src/com/cloud/configuration/Config.java +++ b/server/src/com/cloud/configuration/Config.java @@ -67,6 +67,7 @@ public enum Config { VmNetworkThrottlingRate("Network", ManagementServer.class, Integer.class, "vm.network.throttling.rate", "200", "Default data transfer rate in megabits per second allowed in User vm's default network.", null), SecurityGroupWorkCleanupInterval("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.cleanup.interval", "120", "Time interval (seconds) in which finished work is cleaned up from the work table", null), SecurityGroupWorkerThreads("Network", ManagementServer.class, Integer.class, "network.securitygroups.workers.pool.size", "50", "Number of worker threads processing the security group update work queue", null), + SecurityGroupWorkGlobalLockTimeout("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.lock.timeout", "300", "Lock wait timeout (seconds) while updating the security group work queue", null), // Usage CapacityCheckPeriod("Usage", ManagementServer.class, Integer.class, "capacity.check.period", "300000", "The interval in milliseconds between capacity checks", null), diff --git a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java index 132cffcbdb7..664b34fafad 100755 --- a/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java +++ b/server/src/com/cloud/network/security/SecurityGroupManagerImpl.java @@ -86,6 +86,7 @@ import com.cloud.utils.component.Manager; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; import com.cloud.utils.db.Filter; +import com.cloud.utils.db.GlobalLock; import com.cloud.utils.db.JoinBuilder; import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; @@ -149,6 +150,11 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG private int _timeBetweenCleanups = TIME_BETWEEN_CLEANUPS; // seconds private int _numWorkerThreads = WORKER_THREAD_COUNT; + private int _globalWorkLockTimeout = 300; // 5 minutes + + private final GlobalLock _workLock = GlobalLock.getInternLock("SecurityGroupWork"); + + SecurityGroupListener _answerListener; @@ -368,57 +374,69 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG if (s_logger.isTraceEnabled()) { s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms"); } - - for (Long vmId : affectedVms) { + boolean locked = _workLock.lock(_globalWorkLockTimeout); + if (locked) { if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + vmId); + s_logger.trace("Security Group Mgr: acquired global work lock"); } - VmRulesetLogVO log = null; - SecurityGroupWorkVO work = null; - UserVm vm = null; - Transaction txn = null; try { - txn = Transaction.currentTxn(); - txn.start(); - - vm = _userVMDao.acquireInLockTable(vmId); - if (vm == null) { - s_logger.warn("Failed to acquire lock on vm id " + vmId); - continue; - } - log = _rulesetLogDao.findByVmId(vmId); - if (log == null) { - log = new VmRulesetLogVO(vmId); - log = _rulesetLogDao.persist(log); - } - - if (log != null && updateSeqno) { - log.incrLogsequence(); - _rulesetLogDao.update(log.getId(), log); - } - work = _workDao.findByVmIdStep(vmId, Step.Scheduled); - if (work == null) { - work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null); - work = _workDao.persist(work); + for (Long vmId : affectedVms) { if (s_logger.isTraceEnabled()) { - s_logger.trace("Security Group Mgr: created new work item for " + vmId); + s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + vmId); } + VmRulesetLogVO log = null; + SecurityGroupWorkVO work = null; + //UserVm vm = null; + Transaction txn = null; + try { + txn = Transaction.currentTxn(); + txn.start(); + + //vm = _userVMDao.acquireInLockTable(vmId); + //if (vm == null) { + //s_logger.warn("Failed to acquire lock on vm id " + vmId); + //continue; + //} + log = _rulesetLogDao.findByVmId(vmId); + if (log == null) { + log = new VmRulesetLogVO(vmId); + log = _rulesetLogDao.persist(log); + } + + if (log != null && updateSeqno) { + log.incrLogsequence(); + _rulesetLogDao.update(log.getId(), log); + } + work = _workDao.findByVmIdStep(vmId, Step.Scheduled); + if (work == null) { + work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null); + work = _workDao.persist(work); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: created new work item for " + vmId); + } + } + + work.setLogsequenceNumber(log.getLogsequence()); + _workDao.update(work.getId(), work); + + } finally { + // if (vm != null) { + // _userVMDao.releaseFromLockTable(vmId); + // } + if (txn != null) + txn.commit(); + } + + _executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS); } - - work.setLogsequenceNumber(log.getLogsequence()); - _workDao.update(work.getId(), work); - } finally { - if (vm != null) { - _userVMDao.releaseFromLockTable(vmId); + _workLock.unlock(); + if (s_logger.isTraceEnabled()) { + s_logger.trace("Security Group Mgr: released global work lock"); } - - if (txn != null) - txn.commit(); } - - _executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS); - + } else { + s_logger.warn("Security Group Mgr: failed to acquire global work lock"); } } @@ -778,7 +796,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG Map configs = _configDao.getConfiguration("Network", params); _numWorkerThreads = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkerThreads.key()), WORKER_THREAD_COUNT); _timeBetweenCleanups = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkCleanupInterval.key()), TIME_BETWEEN_CLEANUPS); - + _globalWorkLockTimeout = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkGlobalLockTimeout.key()), 300); /* register state listener, no matter security group is enabled or not */ VirtualMachine.State.getStateMachine().registerListener(this);