bug 10884: try out a global lock instead of per-vm locks

This commit is contained in:
Chiradeep Vittal 2011-08-01 16:30:47 -07:00
parent a6a2383561
commit efaa63a428
2 changed files with 62 additions and 43 deletions

View File

@ -67,6 +67,7 @@ public enum Config {
VmNetworkThrottlingRate("Network", ManagementServer.class, Integer.class, "vm.network.throttling.rate", "200", "Default data transfer rate in megabits per second allowed in User vm's default network.", null),
SecurityGroupWorkCleanupInterval("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.cleanup.interval", "120", "Time interval (seconds) in which finished work is cleaned up from the work table", null),
SecurityGroupWorkerThreads("Network", ManagementServer.class, Integer.class, "network.securitygroups.workers.pool.size", "50", "Number of worker threads processing the security group update work queue", null),
SecurityGroupWorkGlobalLockTimeout("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.lock.timeout", "300", "Lock wait timeout (seconds) while updating the security group work queue", null),
// Usage
CapacityCheckPeriod("Usage", ManagementServer.class, Integer.class, "capacity.check.period", "300000", "The interval in milliseconds between capacity checks", null),

View File

@ -86,6 +86,7 @@ import com.cloud.utils.component.Manager;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Filter;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.JoinBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
@ -149,6 +150,11 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
private int _timeBetweenCleanups = TIME_BETWEEN_CLEANUPS; // seconds
private int _numWorkerThreads = WORKER_THREAD_COUNT;
private int _globalWorkLockTimeout = 300; // 5 minutes
private final GlobalLock _workLock = GlobalLock.getInternLock("SecurityGroupWork");
SecurityGroupListener _answerListener;
@ -368,57 +374,69 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + affectedVms.size() + " vms");
}
for (Long vmId : affectedVms) {
boolean locked = _workLock.lock(_globalWorkLockTimeout);
if (locked) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + vmId);
s_logger.trace("Security Group Mgr: acquired global work lock");
}
VmRulesetLogVO log = null;
SecurityGroupWorkVO work = null;
UserVm vm = null;
Transaction txn = null;
try {
txn = Transaction.currentTxn();
txn.start();
vm = _userVMDao.acquireInLockTable(vmId);
if (vm == null) {
s_logger.warn("Failed to acquire lock on vm id " + vmId);
continue;
}
log = _rulesetLogDao.findByVmId(vmId);
if (log == null) {
log = new VmRulesetLogVO(vmId);
log = _rulesetLogDao.persist(log);
}
if (log != null && updateSeqno) {
log.incrLogsequence();
_rulesetLogDao.update(log.getId(), log);
}
work = _workDao.findByVmIdStep(vmId, Step.Scheduled);
if (work == null) {
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null);
work = _workDao.persist(work);
for (Long vmId : affectedVms) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId);
s_logger.trace("Security Group Mgr: scheduling ruleset updates for " + vmId);
}
VmRulesetLogVO log = null;
SecurityGroupWorkVO work = null;
//UserVm vm = null;
Transaction txn = null;
try {
txn = Transaction.currentTxn();
txn.start();
//vm = _userVMDao.acquireInLockTable(vmId);
//if (vm == null) {
//s_logger.warn("Failed to acquire lock on vm id " + vmId);
//continue;
//}
log = _rulesetLogDao.findByVmId(vmId);
if (log == null) {
log = new VmRulesetLogVO(vmId);
log = _rulesetLogDao.persist(log);
}
if (log != null && updateSeqno) {
log.incrLogsequence();
_rulesetLogDao.update(log.getId(), log);
}
work = _workDao.findByVmIdStep(vmId, Step.Scheduled);
if (work == null) {
work = new SecurityGroupWorkVO(vmId, null, null, SecurityGroupWorkVO.Step.Scheduled, null);
work = _workDao.persist(work);
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: created new work item for " + vmId);
}
}
work.setLogsequenceNumber(log.getLogsequence());
_workDao.update(work.getId(), work);
} finally {
// if (vm != null) {
// _userVMDao.releaseFromLockTable(vmId);
// }
if (txn != null)
txn.commit();
}
_executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS);
}
work.setLogsequenceNumber(log.getLogsequence());
_workDao.update(work.getId(), work);
} finally {
if (vm != null) {
_userVMDao.releaseFromLockTable(vmId);
_workLock.unlock();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Security Group Mgr: released global work lock");
}
if (txn != null)
txn.commit();
}
_executorPool.schedule(new WorkerThread(), delayMs, TimeUnit.MILLISECONDS);
} else {
s_logger.warn("Security Group Mgr: failed to acquire global work lock");
}
}
@ -778,7 +796,7 @@ public class SecurityGroupManagerImpl implements SecurityGroupManager, SecurityG
Map<String, String> configs = _configDao.getConfiguration("Network", params);
_numWorkerThreads = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkerThreads.key()), WORKER_THREAD_COUNT);
_timeBetweenCleanups = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkCleanupInterval.key()), TIME_BETWEEN_CLEANUPS);
_globalWorkLockTimeout = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkGlobalLockTimeout.key()), 300);
/* register state listener, no matter security group is enabled or not */
VirtualMachine.State.getStateMachine().registerListener(this);