bug 11336: limit the number of outstanding messages sent to a host to conserve memory

This commit is contained in:
Chiradeep Vittal 2011-10-25 12:10:14 -07:00
parent 99a7371ca3
commit 81a9e05b39
4 changed files with 137 additions and 2 deletions

View File

@ -100,6 +100,7 @@ public enum Config {
SecurityGroupWorkCleanupInterval("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.cleanup.interval", "120", "Time interval (seconds) in which finished work is cleaned up from the work table", null),
SecurityGroupWorkerThreads("Network", ManagementServer.class, Integer.class, "network.securitygroups.workers.pool.size", "50", "Number of worker threads processing the security group update work queue", null),
SecurityGroupWorkGlobalLockTimeout("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.lock.timeout", "300", "Lock wait timeout (seconds) while updating the security group work queue", null),
SecurityGroupWorkPerAgentMaxQueueSize("Network", ManagementServer.class, Integer.class, "network.securitygroups.work.per.agent.queue.size", "100", "The number of outstanding security group work items that can be queued to a host. If exceeded, work items will get dropped to conserve memory. Security Group Sync will take care of ensuring that the host gets updated eventually", null),
FirewallRuleUiEnabled("Network", ManagementServer.class, Boolean.class, "firewall.rule.ui.enabled", "false", "enable/disable UI that separates firewall rules from NAT/LB rules", null),

View File

@ -60,6 +60,8 @@ public class SecurityGroupListener implements Listener {
SecurityGroupWorkDao _workDao;
Map<Long, Integer> _vmFailureCounts = new ConcurrentHashMap<Long, Integer>();
private SecurityGroupWorkTracker _workTracker;
public SecurityGroupListener(SecurityGroupManagerImpl securityGroupManager,
AgentManager agentMgr, SecurityGroupWorkDao workDao) {
@ -110,6 +112,8 @@ public class SecurityGroupListener implements Listener {
}
}
commandNum++;
if (_workTracker != null)
_workTracker.processAnswers(agentId, seq, answers);
}
}
@ -171,6 +175,9 @@ public class SecurityGroupListener implements Listener {
//usually hypervisors that do not understand sec group rules.
s_logger.debug("Unable to schedule network rules cleanup for host " + host.getId(), e);
}
if (_workTracker != null) {
_workTracker.processConnect(host.getId());
}
}
}
@ -184,11 +191,22 @@ public class SecurityGroupListener implements Listener {
/**
 * Forwards the disconnect notification to the work tracker, when one has
 * been installed via {@link #setWorkDispatcher}.
 *
 * @param agentId id of the agent that disconnected
 * @param state   status reported with the disconnect; not used here
 * @return always {@code true}
 */
@Override
public boolean processDisconnect(long agentId, Status state) {
    // No tracker installed: nothing to forward.
    if (_workTracker == null) {
        return true;
    }
    _workTracker.processDisconnect(agentId);
    return true;
}
/**
 * Forwards the timeout notification to the work tracker, when one has been
 * installed via {@link #setWorkDispatcher}.
 *
 * @param agentId id of the agent the timed-out request was sent to
 * @param seq     sequence number of the timed-out request
 * @return always {@code true}
 */
@Override
public boolean processTimeout(long agentId, long seq) {
    // No tracker installed: nothing to forward.
    if (_workTracker == null) {
        return true;
    }
    _workTracker.processTimeout(agentId, seq);
    return true;
}
/**
 * Installs the tracker that this listener notifies about answers,
 * connects, disconnects and timeouts. Until a tracker is set (or if
 * {@code null} is passed) those notifications are simply not forwarded.
 *
 * @param workDispatcher the tracker to notify; may be {@code null}
 */
public void setWorkDispatcher(SecurityGroupWorkTracker workDispatcher) {
    _workTracker = workDispatcher;
}
}

View File

@ -30,9 +30,11 @@ import javax.naming.ConfigurationException;
import com.cloud.agent.api.SecurityIngressRulesCmd;
import com.cloud.agent.manager.Commands;
import com.cloud.configuration.Config;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.network.security.SecurityGroupWork.Step;
import com.cloud.uservm.UserVm;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Profiler;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.mgmt.JmxUtil;
@ -46,6 +48,7 @@ import com.cloud.vm.VirtualMachine.State;
@Local(value={ SecurityGroupManager.class, SecurityGroupService.class })
public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl{
SecurityGroupWorkQueue _workQueue = new LocalSecurityGroupWorkQueue();
SecurityGroupWorkTracker _workTracker;
SecurityManagerMBeanImpl _mBean;
WorkerThread[] _workers;
@ -166,9 +169,13 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl{
if (s_logger.isTraceEnabled()) {
s_logger.trace("SecurityGroupManager v2: found vm, " + userVmId + " state=" + vm.getState());
}
Map<PortAndProto, Set<String>> rules = generateRulesForVM(userVmId);
Long agentId = vm.getHostId();
if (agentId != null) {
if ( !_workTracker.canSend(agentId)) {
s_logger.trace("SecurityGroupManager v2: not sending ruleset update: too many messages outstanding");
return;
}
Map<PortAndProto, Set<String>> rules = generateRulesForVM(userVmId);
SecurityIngressRulesCmd cmd = generateRulesetCmd(vm.getInstanceName(), vm.getPrivateIpAddress(),
vm.getPrivateMacAddress(), vm.getId(), null,
work.getLogsequenceNumber(), rules);
@ -185,6 +192,7 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl{
}
} catch (AgentUnavailableException e) {
s_logger.debug("Unable to send updates for vm: " + userVmId + "(agentid=" + agentId + ")");
_workTracker.handleException(agentId);
}
}
} else {
@ -261,7 +269,12 @@ public class SecurityGroupManagerImpl2 extends SecurityGroupManagerImpl{
} catch (Exception e){
s_logger.error("Failed to register MBean", e);
}
return super.configure(name, params);
boolean result = super.configure(name, params);
Map<String, String> configs = _configDao.getConfiguration("Network", params);
int bufferLength = NumbersUtil.parseInt(configs.get(Config.SecurityGroupWorkPerAgentMaxQueueSize.key()), 100);
_workTracker = new SecurityGroupWorkTracker(_agentMgr, _answerListener, bufferLength);
_answerListener.setWorkDispatcher(_workTracker);
return result;
}
public void disableSchedulerForVm(Long vmId, boolean disable) {

View File

@ -0,0 +1,103 @@
package com.cloud.network.security;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.Listener;
import com.cloud.agent.api.Answer;
/**
 * Keeps a per-agent count of messages that have been sent but not yet
 * accounted for (answered, timed out, or failed to send), and refuses new
 * sends once that count reaches the configured buffer length.
 *
 * <p>Every method that modifies a count does its read-modify-write while
 * holding this object's monitor. {@link #getUnackedCount(long)} reads the
 * underlying {@link ConcurrentHashMap} without taking the monitor.
 */
public class SecurityGroupWorkTracker {
    protected static final Logger s_logger = Logger.getLogger(SecurityGroupWorkTracker.class);

    /** Total number of messages refused by {@link #canSend(long)}. */
    protected AtomicLong _discardCount = new AtomicLong(0);
    AgentManager _agentMgr;
    Listener _answerListener;
    /** Maximum number of outstanding messages allowed per agent. */
    int _bufferLength;
    /** Agent id -> number of outstanding (unacknowledged) messages. */
    Map<Long, Integer> _unackedMessages = new ConcurrentHashMap<Long, Integer>();

    /**
     * @param agentMgr       agent manager; stored but not used by this class itself
     * @param answerListener listener; stored but not used by this class itself
     * @param bufferLength   maximum outstanding messages per agent; expected to be
     *                       at least 1 (checked only when assertions are enabled)
     */
    public SecurityGroupWorkTracker(AgentManager agentMgr, Listener answerListener, int bufferLength) {
        super();
        assert (bufferLength >= 1) : "SecurityGroupWorkTracker: Cannot have a zero length buffer";
        this._agentMgr = agentMgr;
        this._answerListener = answerListener;
        this._bufferLength = bufferLength;
    }

    /**
     * Reserves a slot for one more outstanding message to the given agent.
     *
     * <p>On success the agent's outstanding count is incremented; the caller is
     * expected to release the slot later through {@link #processAnswers},
     * {@link #processTimeout} or {@link #handleException}. On refusal the
     * discard counter is incremented and the count is left unchanged.
     *
     * @param agentId id of the agent the message would be sent to
     * @return {@code true} if the message may be sent, {@code false} if the
     *         agent already has {@code bufferLength} messages outstanding
     */
    public boolean canSend(long agentId) {
        synchronized (this) {
            Integer outstanding = _unackedMessages.get(agentId);
            // An agent that has never been seen has nothing outstanding.
            int currLength = (outstanding == null) ? 0 : outstanding.intValue();
            if (currLength + 1 > _bufferLength) {
                long discarded = _discardCount.incrementAndGet();
                //drop it on the floor
                s_logger.debug("SecurityGroupManager: dropping a message because there are more than " + currLength +
                        " outstanding messages, total dropped=" + discarded);
                return false;
            }
            _unackedMessages.put(agentId, currLength + 1);
        }
        return true;
    }

    /**
     * Releases one slot for the agent after a send attempt failed.
     *
     * @param agentId id of the agent the failed message was destined for
     */
    public void handleException(long agentId) {
        decrementOutstanding(agentId);
    }

    /**
     * Releases one slot for the agent because a response arrived. Each call
     * releases exactly one slot; {@code seq} and {@code answers} are not inspected.
     *
     * @param agentId id of the answering agent
     * @param seq     sequence number of the answered request (unused)
     * @param answers answers received (unused)
     */
    public void processAnswers(long agentId, long seq, Answer[] answers) {
        decrementOutstanding(agentId);
    }

    /**
     * Releases one slot for the agent because a request timed out.
     *
     * @param agentId id of the agent whose request timed out
     * @param seq     sequence number of the timed-out request (unused)
     */
    public void processTimeout(long agentId, long seq) {
        decrementOutstanding(agentId);
    }

    /**
     * Resets the agent's outstanding count to zero when it disconnects.
     *
     * @param agentId id of the disconnected agent
     */
    public void processDisconnect(long agentId) {
        synchronized (this) {
            _unackedMessages.put(agentId, 0);
        }
    }

    /**
     * Resets the agent's outstanding count to zero when it connects.
     *
     * @param agentId id of the connected agent
     */
    public void processConnect(long agentId) {
        synchronized (this) {
            _unackedMessages.put(agentId, 0);
        }
    }

    /**
     * @return total number of messages refused by {@link #canSend(long)} so far
     */
    public long getDiscardCount() {
        return _discardCount.get();
    }

    /**
     * @param agentId id of the agent to query
     * @return the agent's current outstanding count, or 0 if the agent is unknown
     */
    public int getUnackedCount(long agentId) {
        Integer outstanding = _unackedMessages.get(agentId);
        if (outstanding == null) {
            return 0;
        }
        return outstanding.intValue();
    }

    /**
     * Lowers the agent's outstanding count by one, never going below zero.
     * Unknown agents are ignored.
     */
    private void decrementOutstanding(long agentId) {
        synchronized (this) {
            Integer outstanding = _unackedMessages.get(agentId);
            if (outstanding != null && outstanding.intValue() > 0) {
                _unackedMessages.put(agentId, outstanding.intValue() - 1);
            }
        }
    }
}