bug 10501: This is really Kelven's bug but I'll fix it for him anyways. DAO code already have a way to extract the DB connection from a transaction that is stored in the TLS. There's no real reason for the DAO code to add special semantics to use a different DB connection. That can be done by simply switching the transaction before it even reached the dao code. Think about it. Why would anyone want to call one dao function, switch transaction, and then switch back. The right thing is for the caller to switch transaction, call a series of dao codes, and switch it back. That's the semantics I changed to. By doing this, it also eliminates the number of debug messages in this bug.

This commit is contained in:
Alex Huang 2011-07-01 11:02:56 -07:00
parent 9a6c567269
commit cfc25d01be
8 changed files with 82 additions and 169 deletions

View File

@ -94,6 +94,7 @@ public class ClusterManagerImpl implements ClusterManager {
private final Map<String, Listener> asyncCalls;
private final Gson gson;
@Inject
private AgentManager _agentMgr;
@Inject
private ClusteredAgentRebalanceService _rebalanceService;
@ -563,13 +564,14 @@ public class ClusterManagerImpl implements ClusterManager {
return new Runnable() {
@Override
public void run() {
Transaction txn = Transaction.open("ClusterHeartBeat");
try {
txn.transitToUserManagedConnection(getHeartbeatConnection());
if(s_logger.isTraceEnabled()) {
s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId);
}
Connection conn = getHeartbeatConnection();
_mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
_mshostDao.update(_mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
if (s_logger.isTraceEnabled()) {
s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
@ -577,10 +579,10 @@ public class ClusterManagerImpl implements ClusterManager {
if (!_peerScanInited) {
_peerScanInited = true;
initPeerScan(conn);
initPeerScan();
}
peerScan(conn);
peerScan();
} catch(CloudRuntimeException e) {
s_logger.error("Runtime DB exception ", e.getCause());
@ -602,6 +604,8 @@ public class ClusterManagerImpl implements ClusterManager {
}
s_logger.error("Problem with the cluster heartbeat!", e);
} finally {
txn.close("ClusterHeartBeat");
}
}
};
@ -706,7 +710,7 @@ public class ClusterManagerImpl implements ClusterManager {
}
}
try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) {}
try { Thread.sleep(1000); } catch (InterruptedException e) {}
}
}
};
@ -729,20 +733,20 @@ public class ClusterManagerImpl implements ClusterManager {
return null;
}
private void initPeerScan(Connection conn) {
private void initPeerScan() {
// upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform
// missed cleanup
Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold));
List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - heartbeatThreshold));
if(inactiveList.size() > 0) {
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, inactiveList));
}
}
private void peerScan(Connection conn) {
private void peerScan() {
Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(conn, new Date(cutTime.getTime() - heartbeatThreshold));
List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold));
List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
@ -801,7 +805,7 @@ public class ClusterManagerImpl implements ClusterManager {
if(!pingManagementNode(mshost)) {
s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
activePeers.remove(mshost.getId());
_mshostDao.invalidateRunSession(conn, mshost.getId(), mshost.getRunid());
_mshostDao.invalidateRunSession(mshost.getId(), mshost.getRunid());
try {
JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
} catch(Exception e) {

View File

@ -18,14 +18,12 @@
package com.cloud.cluster.dao;
import java.sql.Connection;
import java.util.Date;
import java.util.List;
import com.cloud.cluster.ManagementServerHost.State;
import com.cloud.cluster.ManagementServerHost;
import com.cloud.cluster.ManagementServerHost.State;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.host.Status;
import com.cloud.utils.db.GenericDao;
public interface ManagementServerHostDao extends GenericDao<ManagementServerHostVO, Long> {
@ -40,13 +38,9 @@ public interface ManagementServerHostDao extends GenericDao<ManagementServerHost
List<ManagementServerHostVO> getActiveList(Date cutTime);
List<ManagementServerHostVO> getInactiveList(Date cutTime);
void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate);
void update(Connection conn, long id, long runid, Date lastUpdate);
void invalidateRunSession(Connection conn, long id, long runid);
List<ManagementServerHostVO> getActiveList(Connection conn, Date cutTime);
List<ManagementServerHostVO> getInactiveList(Connection conn, Date cutTime);
void invalidateRunSession(long id, long runid);
void update(Connection conn, long id, long runId, State state, Date lastUpdate);
void update(long id, long runId, State state, Date lastUpdate);
List<ManagementServerHostVO> listBy(ManagementServerHost.State...states);
}

View File

@ -18,7 +18,6 @@
package com.cloud.cluster.dao;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
@ -51,110 +50,20 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
private final SearchBuilder<ManagementServerHostVO> StateSearch;
@Override
public void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
public void invalidateRunSession(long id, long runid) {
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0, runid=? where id=?");
pstmt.setString(1, name);
pstmt.setString(2, version);
pstmt.setString(3, serviceIP);
pstmt.setInt(4, servicePort);
pstmt.setString(5, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
pstmt.setLong(6, runid);
pstmt.setLong(7, id);
pstmt.executeUpdate();
conn.commit();
} catch(SQLException e ) {
throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e);
} finally {
if(pstmt != null) {
try {
pstmt.close();
} catch(Exception e) {
s_logger.warn("Unable to close prepared statement due to exception ", e);
}
}
}
}
@Override
public void update(Connection conn, long id, long runid, Date lastUpdate) {
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement("update mshost set last_update=?, removed=null, alert_count=0 where id=? and runid=?");
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
pstmt.setLong(2, id);
pstmt.setLong(3, runid);
int count = pstmt.executeUpdate();
conn.commit();
if(count < 1) {
throw new CloudRuntimeException("Invalid cluster session detected", new ClusterInvalidSessionException("runid " + runid + " is no longer valid"));
}
} catch (SQLException e) {
throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e);
} finally {
if(pstmt != null) {
try {
pstmt.close();
} catch(Exception e) {
s_logger.warn("Unable to close prepared statement due to exception ", e);
}
}
}
}
@Override
public void invalidateRunSession(Connection conn, long id, long runid) {
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement("update mshost set runid=0, state='Down' where id=? and runid=?");
pstmt = txn.prepareAutoCloseStatement("update mshost set runid=0, state='Down' where id=? and runid=?");
pstmt.setLong(1, id);
pstmt.setLong(2, runid);
pstmt.executeUpdate();
conn.commit();
} catch (SQLException e) {
throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e);
} finally {
if(pstmt != null) {
try {
pstmt.close();
} catch(Exception e) {
s_logger.warn("Unable to close prepared statement due to exception ", e);
}
}
}
}
@Override
public List<ManagementServerHostVO> getActiveList(Connection conn, Date cutTime) {
Transaction txn = Transaction.openNew("getActiveList", conn);
try {
SearchCriteria<ManagementServerHostVO> sc = ActiveSearch.create();
sc.setParameters("lastUpdateTime", cutTime);
return listIncludingRemovedBy(sc);
} finally {
txn.close("getActiveList");
}
}
@Override
public List<ManagementServerHostVO> getInactiveList(Connection conn, Date cutTime) {
Transaction txn = Transaction.openNew("getInactiveList", conn);
try {
SearchCriteria<ManagementServerHostVO> sc = InactiveSearch.create();
sc.setParameters("lastUpdateTime", cutTime);
return listIncludingRemovedBy(sc);
} finally {
txn.close("getInactiveList");
}
}
@Override
public ManagementServerHostVO findByMsid(long msid) {
SearchCriteria<ManagementServerHostVO> sc = MsIdSearch.create();
@ -190,7 +99,6 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
txn.commit();
} catch(Exception e) {
s_logger.warn("Unexpected exception, ", e);
txn.rollback();
}
}
@ -210,7 +118,6 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
return true;
} catch(Exception e) {
s_logger.warn("Unexpected exception, ", e);
txn.rollback();
}
return false;
@ -237,7 +144,6 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
}
} catch(Exception e) {
s_logger.warn("Unexpected exception, ", e);
txn.rollback();
}
}
@ -298,31 +204,23 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServer
@Override
public void update(Connection conn, long id, long runId, State state, Date lastUpdate) {
public void update(long id, long runId, State state, Date lastUpdate) {
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement("update mshost set state=?, last_update=? where id=? and runid=?");
pstmt = txn.prepareAutoCloseStatement("update mshost set state=?, last_update=? where id=? and runid=?");
pstmt.setString(1, state.toString());
pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), lastUpdate));
pstmt.setLong(3, id);
pstmt.setLong(4, runId);
int count = pstmt.executeUpdate();
conn.commit();
if(count < 1) {
throw new CloudRuntimeException("Invalid cluster session detected", new ClusterInvalidSessionException("runid " + runId + " is no longer valid"));
}
} catch (SQLException e) {
throw new CloudRuntimeException("DB exception on " + pstmt.toString(), e);
} finally {
if(pstmt != null) {
try {
pstmt.close();
} catch(Exception e) {
s_logger.warn("Unable to close prepared statement due to exception ", e);
}
}
}
}

View File

@ -37,9 +37,9 @@ import java.lang.annotation.Target;
* txn.commit();
*
* 2. Annotate methods that uses a DAO's acquire method.
* _dao.acquire(id);
* _dao.acquireInLockTable(id);
* ...
* _dao.release(id);
* _dao.releaseFromLockTable(id);
*
* 3. Annotate methods that are inside a DAO but doesn't use
* the Transaction class. Generally, these are methods

View File

@ -279,7 +279,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements Gene
return createForUpdate(null);
}
@Override
@Override @DB(txn=false)
public <K> K getNextInSequence(final Class<K> clazz, final String name) {
final TableGenerator tg = _tgs.get(name);
assert (tg != null) : "Couldn't find Table generator using " + name;
@ -377,7 +377,7 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements Gene
}
}
@Override @SuppressWarnings("unchecked") @DB
@Override @SuppressWarnings("unchecked")
public <M> List<M> customSearchIncludingRemoved(SearchCriteria<M> sc, final Filter filter) {
String clause = sc != null ? sc.getWhereClause() : null;
if (clause != null && clause.length() == 0) {
@ -733,7 +733,6 @@ public abstract class GenericDaoBase<T, ID extends Serializable> implements Gene
return rowsUpdated;
}
// @Override
public int update(UpdateBuilder ub, final SearchCriteria<?> sc, Integer rows) {
StringBuilder sql = null;
PreparedStatement pstmt = null;

View File

@ -97,7 +97,7 @@ public class Transaction {
private String _name;
private Connection _conn;
private boolean _txn;
private final short _dbId;
private short _dbId;
private long _txnTime;
private Statement _stmt;
private String _creator;
@ -125,17 +125,16 @@ public class Transaction {
// transaction context in the stack. It is used in special use cases that we want to control DB connection explicitly and in the mean time utilize
// the existing DAO features
//
public static Transaction openNew(final String name, Connection conn) {
assert(conn != null);
Transaction txn = new Transaction(name, false, CONNECTED_DB);
txn._conn = conn;
txn._prev = tls.get();
tls.set(txn);
txn.takeOver(name, true);
s_logger.debug("Registering txn" + txn.getId());
s_mbean.addTransaction(txn);
return txn;
public void transitToUserManagedConnection(Connection conn) {
assert(_conn == null && _stack.size() <= 1) : "Can't change to a user managed connection unless the stack is empty and the db connection is null: " + toString();
_conn = conn;
_dbId = CONNECTED_DB;
}
public void transitToAutoManagedConnection(short dbId) {
assert(_stack.size() == 0) : "Can't change to auto managed connection unless your stack is empty";
_dbId = dbId;
_conn = null;
}
public static Transaction open(final String name) {
@ -166,7 +165,6 @@ public class Transaction {
txn.takeOver(name, false);
if (isNew) {
s_logger.debug("Registering txn" + txn.getId());
s_mbean.addTransaction(txn);
}
return txn;
@ -611,7 +609,6 @@ public class Transaction {
if(this._dbId == CONNECTED_DB) {
tls.set(_prev);
_prev = null;
s_logger.debug("Unregistering txn" + getId());
s_mbean.removeTransaction(this);
}
}

View File

@ -18,14 +18,17 @@
package com.cloud.utils.db;
import java.util.List;
import java.util.Map;
public interface TransactionMBean {
int getTransactionCount();
int getActiveTransactionCount();
int[] getActiveTransactionCount();
List<String> getTransactions();
List<Map<String, String>> getTransactions();
List<String> getActiveTransactions();
List<Map<String, String>> getActiveTransactions();
List<Map<String, String>> getTransactionsWithDatabaseConnection();
}

View File

@ -17,7 +17,9 @@
*/
package com.cloud.utils.db;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -48,49 +50,65 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
}
@Override
public int getActiveTransactionCount() {
int count = 0;
public int[] getActiveTransactionCount() {
int[] count = new int[2];
count[0] = 0;
count[1] = 0;
for (Transaction txn : _txns.values()) {
if (txn.getStack().size() > 0) {
count++;
count[0]++;
}
if (txn.getCurrentConnection() != null) {
count[1]++;
}
}
return count;
}
@Override
public List<String> getTransactions() {
ArrayList<String> txns = new ArrayList<String>();
public List<Map<String, String>> getTransactions() {
ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
for (Transaction info : _txns.values()) {
txns.add(toString(info));
txns.add(toMap(info));
}
return txns;
}
@Override
public List<String> getActiveTransactions() {
ArrayList<String> txns = new ArrayList<String>();
public List<Map<String, String>> getActiveTransactions() {
ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
for (Transaction txn : _txns.values()) {
if (txn.getStack().size() > 0 || txn.getCurrentConnection() != null) {
txns.add(toString(txn));
txns.add(toMap(txn));
}
}
return txns;
}
protected String toString(Transaction txn) {
StringBuilder buff = new StringBuilder("[Name=");
buff.append(txn.getName());
buff.append("; Creator=");
buff.append(txn.getCreator());
buff.append("; DB=");
buff.append(txn.getCurrentConnection());
buff.append("; Stack=");
protected Map<String, String> toMap(Transaction txn) {
Map<String, String> map = new HashMap<String, String>();
map.put("name", txn.getName());
map.put("id", Long.toString(txn.getId()));
map.put("creator", txn.getCreator());
Connection conn = txn.getCurrentConnection();
map.put("db", conn != null ? Integer.toString(System.identityHashCode(conn)) : "none");
StringBuilder buff = new StringBuilder();
for (StackElement element : txn.getStack()) {
buff.append(",").append(element.toString());
buff.append(element.toString()).append(",");
}
buff.append("]");
map.put("stack", buff.toString());
return buff.toString();
return map;
}
@Override
public List<Map<String, String>> getTransactionsWithDatabaseConnection() {
ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
for (Transaction txn : _txns.values()) {
if (txn.getCurrentConnection() != null) {
txns.add(toMap(txn));
}
}
return txns;
}
}