propagate lock table fix

This commit is contained in:
Alex Huang 2011-07-22 11:23:41 -07:00
parent 65b0af0a9d
commit b59c6b4ab6
4 changed files with 99 additions and 96 deletions

View File

@ -18,6 +18,7 @@
package com.cloud.agent.manager;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import org.apache.log4j.Logger;
@ -59,13 +60,13 @@ public class AgentMonitor extends Thread implements Listener {
private AlertManager _alertMgr;
private long _msId;
private ConnectionConcierge _concierge;
protected AgentMonitor() {
}
public AgentMonitor(long msId, HostDao hostDao, VMInstanceDao vmDao, DataCenterDao dcDao, HostPodDao podDao, AgentManagerImpl agentMgr, AlertManager alertMgr, long pingTimeout) {
super("AgentMonitor");
_msId = msId;
super("AgentMonitor");
_msId = msId;
_pingTimeout = pingTimeout;
_hostDao = hostDao;
_agentMgr = agentMgr;
@ -74,19 +75,21 @@ public class AgentMonitor extends Thread implements Listener {
_dcDao = dcDao;
_podDao = podDao;
_alertMgr = alertMgr;
Connection conn = Transaction.getStandaloneConnection();
if (conn == null) {
throw new CloudRuntimeException("Unable to get a db connection.");
try {
Connection conn = Transaction.getStandaloneConnectionWithException();
conn.setAutoCommit(true);
conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
_concierge = new ConnectionConcierge("AgentMonitor", conn, true);
} catch (SQLException e) {
throw new CloudRuntimeException("Cannot get connection", e);
}
_concierge = new ConnectionConcierge("AgentMonitor", conn, true, true);
}
// TODO : use host machine time is not safe in clustering environment
@Override
public void run() {
public void run() {
s_logger.info("Agent Monitor is started.");
while (!_stop) {
try {
// check every 60 seconds
@ -94,41 +97,41 @@ public class AgentMonitor extends Thread implements Listener {
} catch (InterruptedException e) {
s_logger.info("Who woke me from my slumber?");
}
GlobalLock lock = GlobalLock.getInternLock("AgentMonitorLock");
if (lock == null) {
s_logger.error("Unable to acquire lock. Better luck next time?");
continue;
}
if (!lock.lock(10)) {
s_logger.info("Someone else is already working on the agents. Skipping my turn");
continue;
}
GlobalLock lock = GlobalLock.getInternLock("AgentMonitorLock");
if (lock == null) {
s_logger.error("Unable to acquire lock. Better luck next time?");
continue;
}
if (!lock.lock(10)) {
s_logger.info("Someone else is already working on the agents. Skipping my turn");
continue;
}
try {
long time = (System.currentTimeMillis() >> 10) - _pingTimeout;
List<HostVO> hosts = _hostDao.findLostHosts(time);
if (s_logger.isInfoEnabled()) {
s_logger.info("Found " + hosts.size() + " hosts behind on ping. pingTimeout : " + _pingTimeout + ", mark time : " + time);
}
for (HostVO host : hosts) {
if (host.getType().equals(Host.Type.ExternalFirewall) ||
host.getType().equals(Host.Type.ExternalLoadBalancer) ||
host.getType().equals(Host.Type.TrafficMonitor) ||
host.getType().equals(Host.Type.SecondaryStorage)) {
continue;
}
if (host.getManagementServerId() == null || host.getManagementServerId() == _msId) {
if (s_logger.isInfoEnabled()) {
s_logger.info("Asking agent mgr to investgate why host " + host.getId() + " is behind on ping. last ping time: " + host.getLastPinged());
}
_agentMgr.disconnect(host.getId(), Event.PingTimeout, true);
}
if (host.getType().equals(Host.Type.ExternalFirewall) ||
host.getType().equals(Host.Type.ExternalLoadBalancer) ||
host.getType().equals(Host.Type.TrafficMonitor) ||
host.getType().equals(Host.Type.SecondaryStorage)) {
continue;
}
if (host.getManagementServerId() == null || host.getManagementServerId() == _msId) {
if (s_logger.isInfoEnabled()) {
s_logger.info("Asking agent mgr to investgate why host " + host.getId() + " is behind on ping. last ping time: " + host.getLastPinged());
}
_agentMgr.disconnect(host.getId(), Event.PingTimeout, true);
}
}
hosts = _hostDao.listByStatus(Status.PrepareForMaintenance, Status.ErrorInMaintenance);
for (HostVO host : hosts) {
long hostId = host.getId();
@ -147,13 +150,13 @@ public class AgentMonitor extends Thread implements Listener {
} catch (Throwable th) {
s_logger.error("Caught the following exception: ", th);
} finally {
lock.unlock();
lock.unlock();
}
}
s_logger.info("Agent Monitor is leaving the building!");
}
public void signalStop() {
_stop = true;
interrupt();
@ -193,10 +196,10 @@ public class AgentMonitor extends Thread implements Listener {
}
return processed;
}
@Override
public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
return null;
return null;
}
@Override
@ -207,15 +210,15 @@ public class AgentMonitor extends Thread implements Listener {
public boolean processDisconnect(long agentId, Status state) {
return true;
}
@Override
public boolean processTimeout(long agentId, long seq) {
return true;
return true;
}
@Override
public int getTimeout() {
return -1;
return -1;
}
}

View File

@ -477,14 +477,14 @@ public class ClusterManagerImpl implements ClusterManager {
}
public void notifyNodeJoined(List<ManagementServerHostVO> nodeList) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node join to listeners.");
for(ManagementServerHostVO mshost : nodeList) {
s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
}
}
if(s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node join to listeners.");
for(ManagementServerHostVO mshost : nodeList) {
s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
}
}
synchronized(listeners) {
for(ClusterManagerListener listener : listeners) {
listener.onManagementNodeJoined(nodeList, _mshostId);
@ -496,14 +496,14 @@ public class ClusterManagerImpl implements ClusterManager {
}
public void notifyNodeLeft(List<ManagementServerHostVO> nodeList) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node left to listeners.");
for(ManagementServerHostVO mshost : nodeList) {
s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
}
}
if(s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node left to listeners.");
for(ManagementServerHostVO mshost : nodeList) {
s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
}
}
synchronized(listeners) {
for(ClusterManagerListener listener : listeners) {
listener.onManagementNodeLeft(nodeList, _mshostId);
@ -515,9 +515,9 @@ public class ClusterManagerImpl implements ClusterManager {
}
public void notifyNodeIsolated() {
if(s_logger.isDebugEnabled())
s_logger.debug("Notify management server node isolation to listeners");
if(s_logger.isDebugEnabled())
s_logger.debug("Notify management server node isolation to listeners");
synchronized(listeners) {
for(ClusterManagerListener listener : listeners) {
listener.onManagementNodeIsolated();
@ -673,7 +673,7 @@ public class ClusterManagerImpl implements ClusterManager {
private Connection getHeartbeatConnection() throws SQLException {
if(_heartbeatConnection == null) {
Connection conn = Transaction.getStandaloneConnectionWithException();
_heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false, false);
_heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false);
}
return _heartbeatConnection.conn();
@ -969,11 +969,10 @@ public class ClusterManagerImpl implements ClusterManager {
if (s_logger.isInfoEnabled()) {
s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort());
}
// use seperate thread for heartbeat updates
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
_notificationExecutor.submit(getNotificationTask());
} catch (Throwable e) {
s_logger.error("Unexpected exception : ", e);
@ -1093,8 +1092,8 @@ public class ClusterManagerImpl implements ClusterManager {
if(_currentServiceAdapter == null) {
throw new ConfigurationException("Unable to set current cluster service adapter");
}
_agentLBEnabled = Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key()));
String connectedAgentsThreshold = configs.get("agent.load.threshold");
@ -1225,7 +1224,7 @@ public class ClusterManagerImpl implements ClusterManager {
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
}
@Override
public boolean isAgentRebalanceEnabled() {
return _agentLBEnabled;

View File

@ -48,19 +48,19 @@ import com.cloud.utils.mgmt.JmxUtil;
* your own.
*/
public class ConnectionConcierge {
static final Logger s_logger = Logger.getLogger(ConnectionConcierge.class);
static final ConnectionConciergeManager s_mgr = new ConnectionConciergeManager();
Connection _conn;
String _name;
boolean _keepAlive;
boolean _autoCommit;
int _isolationLevel;
int _holdability;
public ConnectionConcierge(String name, Connection conn, boolean autoCommit, boolean keepAlive) {
public ConnectionConcierge(String name, Connection conn, boolean keepAlive) {
_name = name + s_mgr.getNextId();
_keepAlive = keepAlive;
try {
@ -72,7 +72,7 @@ public class ConnectionConcierge {
}
reset(conn);
}
public void reset(Connection conn) {
try {
release();
@ -90,11 +90,11 @@ public class ConnectionConcierge {
s_mgr.register(_name, this);
s_logger.debug("Registering a database connection for " + _name);
}
public final Connection conn() {
return _conn;
}
public void release() {
s_mgr.unregister(_name);
try {
@ -106,23 +106,23 @@ public class ConnectionConcierge {
throw new CloudRuntimeException("Problem in closing a connection", e);
}
}
@Override
protected void finalize() throws Exception {
if (_conn != null) {
release();
}
}
public boolean keepAlive() {
return _keepAlive;
}
protected static class ConnectionConciergeManager extends StandardMBean implements ConnectionConciergeMBean {
ScheduledExecutorService _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ConnectionKeeper"));
final ConcurrentHashMap<String, ConnectionConcierge> _conns = new ConcurrentHashMap<String, ConnectionConcierge>();
final AtomicInteger _idGenerator = new AtomicInteger();
ConnectionConciergeManager() {
super(ConnectionConciergeMBean.class, false);
resetKeepAliveTask(20);
@ -132,19 +132,19 @@ public class ConnectionConcierge {
s_logger.error("Unable to register mbean", e);
}
}
public Integer getNextId() {
return _idGenerator.incrementAndGet();
}
public void register(String name, ConnectionConcierge concierge) {
_conns.put(name, concierge);
}
public void unregister(String name) {
_conns.remove(name);
}
protected String testValidity(String name, Connection conn) {
PreparedStatement pstmt = null;
try {
@ -182,12 +182,12 @@ public class ConnectionConcierge {
if (concierge == null) {
return "Not Found";
}
Connection conn = Transaction.getStandaloneConnection();
if (conn == null) {
return "Unable to get anotehr db connection";
}
concierge.reset(conn);
return "Done";
}
@ -201,7 +201,7 @@ public class ConnectionConcierge {
s_logger.error("Unable to shutdown executor", e);
}
}
_executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("ConnectionConcierge"));
_executor.schedule(new Runnable() {
@Override
@ -215,7 +215,7 @@ public class ConnectionConcierge {
}
}
}, seconds, TimeUnit.SECONDS);
return "As you wish.";
}

View File

@ -55,7 +55,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
TimeZone s_gmtTimeZone = TimeZone.getTimeZone("GMT");
private long _msId;
private final long _msId;
private static Merovingian2 s_instance = null;
ConnectionConcierge _concierge = null;
@ -67,7 +67,8 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
try {
conn = Transaction.getStandaloneConnectionWithException();
conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
_concierge = new ConnectionConcierge("LockMaster", conn, true, true);
conn.setAutoCommit(true);
_concierge = new ConnectionConcierge("LockMaster", conn, false);
} catch (SQLException e) {
s_logger.error("Unable to get a new db connection", e);
throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes: ", e);