mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-11-04 00:02:37 +01:00 
			
		
		
		
	bug 10816: more db lock controls
This commit is contained in:
		
							parent
							
								
									5cfad0b6c9
								
							
						
					
					
						commit
						d7667180c5
					
				@ -68,6 +68,7 @@ import com.cloud.utils.component.Adapters;
 | 
			
		||||
import com.cloud.utils.component.ComponentLocator;
 | 
			
		||||
import com.cloud.utils.component.Inject;
 | 
			
		||||
import com.cloud.utils.concurrency.NamedThreadFactory;
 | 
			
		||||
import com.cloud.utils.db.ConnectionConcierge;
 | 
			
		||||
import com.cloud.utils.db.DB;
 | 
			
		||||
import com.cloud.utils.db.Transaction;
 | 
			
		||||
import com.cloud.utils.events.SubscriptionMgr;
 | 
			
		||||
@ -102,7 +103,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
			
		||||
    private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
 | 
			
		||||
    private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
 | 
			
		||||
    private final List<ClusterManagerMessage> _notificationMsgs = new ArrayList<ClusterManagerMessage>();
 | 
			
		||||
    private Connection _heartbeatConnection = null;
 | 
			
		||||
    private ConnectionConcierge _heartbeatConnection = null;
 | 
			
		||||
 | 
			
		||||
    private final ExecutorService _executor;
 | 
			
		||||
 | 
			
		||||
@ -647,23 +648,20 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Connection getHeartbeatConnection() throws SQLException {
 | 
			
		||||
        if(_heartbeatConnection != null) {
 | 
			
		||||
            return _heartbeatConnection;
 | 
			
		||||
        if(_heartbeatConnection == null) {
 | 
			
		||||
            Connection conn = Transaction.getStandaloneConnectionWithException();
 | 
			
		||||
            _heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false, false);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        _heartbeatConnection = Transaction.getStandaloneConnectionWithException();
 | 
			
		||||
        return _heartbeatConnection;
 | 
			
		||||
        return _heartbeatConnection.conn();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private void invalidHeartbeatConnection() {
 | 
			
		||||
        if(_heartbeatConnection != null) {
 | 
			
		||||
            try {
 | 
			
		||||
                _heartbeatConnection.close();
 | 
			
		||||
            } catch (SQLException e) {
 | 
			
		||||
                s_logger.warn("Unable to close hearbeat DB connection. ", e);
 | 
			
		||||
            Connection conn = Transaction.getStandaloneConnection();
 | 
			
		||||
            if (conn != null) {
 | 
			
		||||
                _heartbeatConnection.reset(Transaction.getStandaloneConnection());
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            _heartbeatConnection = null;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -2601,7 +2601,7 @@ public class StorageManagerImpl implements StorageManager, StorageService, Manag
 | 
			
		||||
            }
 | 
			
		||||
            if (assignedPool != null) {
 | 
			
		||||
                Volume.State state = vol.getState();
 | 
			
		||||
                if (state == Volume.State.Allocated) {
 | 
			
		||||
                if (state == Volume.State.Allocated || state == Volume.State.Creating) {
 | 
			
		||||
                    recreateVols.add(vol);
 | 
			
		||||
                } else {
 | 
			
		||||
                    if (vol.isRecreatable()) {
 | 
			
		||||
 | 
			
		||||
@ -27,16 +27,12 @@ import java.util.LinkedList;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.TimeZone;
 | 
			
		||||
import java.util.concurrent.Executors;
 | 
			
		||||
import java.util.concurrent.ScheduledExecutorService;
 | 
			
		||||
import java.util.concurrent.TimeUnit;
 | 
			
		||||
 | 
			
		||||
import javax.management.StandardMBean;
 | 
			
		||||
 | 
			
		||||
import org.apache.log4j.Logger;
 | 
			
		||||
 | 
			
		||||
import com.cloud.utils.DateUtil;
 | 
			
		||||
import com.cloud.utils.concurrency.NamedThreadFactory;
 | 
			
		||||
import com.cloud.utils.exception.CloudRuntimeException;
 | 
			
		||||
import com.cloud.utils.mgmt.JmxUtil;
 | 
			
		||||
import com.cloud.utils.time.InaccurateClock;
 | 
			
		||||
@ -62,15 +58,18 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
    private final long _msId;
 | 
			
		||||
 | 
			
		||||
    private static Merovingian2 s_instance = null;
 | 
			
		||||
    ScheduledExecutorService _executor = null;
 | 
			
		||||
    Connection _conn = null;
 | 
			
		||||
    private ConnectionConcierge _concierge = null;
 | 
			
		||||
 | 
			
		||||
    private Merovingian2(long msId) {
 | 
			
		||||
        super(MerovingianMBean.class, false);
 | 
			
		||||
        _msId = msId;
 | 
			
		||||
        String result = resetDbConnection();
 | 
			
		||||
        if (!result.equalsIgnoreCase("Success")) {
 | 
			
		||||
            throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes due to " + result);
 | 
			
		||||
        Connection conn = null;
 | 
			
		||||
        try {
 | 
			
		||||
            conn = Transaction.getStandaloneConnectionWithException();
 | 
			
		||||
            _concierge = new ConnectionConcierge("LockMaster", conn, true, true);
 | 
			
		||||
        } catch (SQLException e) {
 | 
			
		||||
            s_logger.error("Unable to get a new db connection", e);
 | 
			
		||||
            throw new CloudRuntimeException("Unable to initialize a connection to the database for locking purposes: ", e);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -90,44 +89,6 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
        return s_instance;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String resetDbConnection() {
 | 
			
		||||
        s_logger.info("Resetting the database connection for locks");
 | 
			
		||||
        if (_conn != null) {
 | 
			
		||||
            try {
 | 
			
		||||
                _conn.close();
 | 
			
		||||
            } catch (Throwable th) {
 | 
			
		||||
                s_logger.error("Unable to close connection", th);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        try {
 | 
			
		||||
            _conn = Transaction.getStandaloneConnectionWithException();
 | 
			
		||||
            _conn.setAutoCommit(true);
 | 
			
		||||
            _conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
 | 
			
		||||
        } catch (SQLException e) {
 | 
			
		||||
            s_logger.error("Unable to get a new db connection", e);
 | 
			
		||||
            return "Unable to initialize a connection to the database for locking purposes: " + e;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (_conn == null) {
 | 
			
		||||
            return "Unable to initialize a connection to the database for locking purposes, shutdown this server!";
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (_executor != null) {
 | 
			
		||||
            try {
 | 
			
		||||
                _executor.shutdown();
 | 
			
		||||
            } catch (Throwable th) {
 | 
			
		||||
                s_logger.error("Unable to shutdown the executor", th);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("LockMasterConnectionKeepAlive"));
 | 
			
		||||
        _executor.schedule(new KeepAliveTask(), 10, TimeUnit.SECONDS);
 | 
			
		||||
 | 
			
		||||
        return "Success";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public boolean acquire(String key, int timeInSeconds) {
 | 
			
		||||
        Thread th = Thread.currentThread();
 | 
			
		||||
        String threadName = th.getName();
 | 
			
		||||
@ -162,7 +123,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
    protected boolean increment(String key, String threadName, int threadId) {
 | 
			
		||||
        PreparedStatement pstmt = null;
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(INCREMENT_SQL);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(INCREMENT_SQL);
 | 
			
		||||
            pstmt.setString(1, key);
 | 
			
		||||
            pstmt.setLong(2, _msId);
 | 
			
		||||
            pstmt.setString(3, threadName);
 | 
			
		||||
@ -190,7 +151,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
 | 
			
		||||
        long startTime = InaccurateClock.getTime();
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(ACQUIRE_SQL);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(ACQUIRE_SQL);
 | 
			
		||||
            pstmt.setString(1, key);
 | 
			
		||||
            pstmt.setLong(2, _msId);
 | 
			
		||||
            pstmt.setString(3, threadName);
 | 
			
		||||
@ -228,7 +189,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
        PreparedStatement pstmt = null;
 | 
			
		||||
        ResultSet rs = null;
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(INQUIRE_SQL);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(INQUIRE_SQL);
 | 
			
		||||
            pstmt.setString(1, key);
 | 
			
		||||
            rs = pstmt.executeQuery();
 | 
			
		||||
            if (!rs.next()) {
 | 
			
		||||
@ -261,8 +222,8 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
        s_logger.info("Cleaning up locks for " + msId);
 | 
			
		||||
        PreparedStatement pstmt = null;
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(CLEANUP_MGMT_LOCKS_SQL);
 | 
			
		||||
            pstmt.setLong(1, msId);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(CLEANUP_MGMT_LOCKS_SQL);
 | 
			
		||||
            pstmt.setLong(1, _msId);
 | 
			
		||||
            int rows = pstmt.executeUpdate();
 | 
			
		||||
            s_logger.info("Released " + rows + " locks for " + msId);
 | 
			
		||||
        } catch (SQLException e) {
 | 
			
		||||
@ -283,7 +244,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
        String threadName = th.getName();
 | 
			
		||||
        int threadId = System.identityHashCode(th);
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(DECREMENT_SQL);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(DECREMENT_SQL);
 | 
			
		||||
            pstmt.setString(1, key);
 | 
			
		||||
            pstmt.setLong(2, _msId);
 | 
			
		||||
            pstmt.setString(3, threadName);
 | 
			
		||||
@ -295,7 +256,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
            }
 | 
			
		||||
            if (rows == 1) {
 | 
			
		||||
                pstmt.close();
 | 
			
		||||
                pstmt = _conn.prepareStatement(RELEASE_SQL);
 | 
			
		||||
                pstmt = _concierge.conn().prepareStatement(RELEASE_SQL);
 | 
			
		||||
                pstmt.setString(1, key);
 | 
			
		||||
                pstmt.setLong(2, _msId);
 | 
			
		||||
                int result = pstmt.executeUpdate();
 | 
			
		||||
@ -340,7 +301,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
        PreparedStatement pstmt = null;
 | 
			
		||||
        ResultSet rs = null;
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(sql);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(sql);
 | 
			
		||||
            if (msId != null) {
 | 
			
		||||
                pstmt.setLong(1, msId);
 | 
			
		||||
            }
 | 
			
		||||
@ -388,7 +349,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
        PreparedStatement pstmt = null;
 | 
			
		||||
        ResultSet rs = null;
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(SELECT_THREAD_LOCKS_SQL);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(SELECT_THREAD_LOCKS_SQL);
 | 
			
		||||
            pstmt.setLong(1, msId);
 | 
			
		||||
            pstmt.setString(2, threadName);
 | 
			
		||||
            rs = pstmt.executeQuery();
 | 
			
		||||
@ -415,7 +376,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
 | 
			
		||||
        PreparedStatement pstmt = null;
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(CLEANUP_THREAD_LOCKS_SQL);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(CLEANUP_THREAD_LOCKS_SQL);
 | 
			
		||||
            pstmt.setLong(1, _msId);
 | 
			
		||||
            pstmt.setString(2, threadName);
 | 
			
		||||
            pstmt.setInt(3, threadId);
 | 
			
		||||
@ -438,7 +399,7 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
        s_logger.info("Releasing a lock from jMX lck-" + key);
 | 
			
		||||
        PreparedStatement pstmt = null;
 | 
			
		||||
        try {
 | 
			
		||||
            pstmt = _conn.prepareStatement(RELEASE_LOCK_SQL);
 | 
			
		||||
            pstmt = _concierge.conn().prepareStatement(RELEASE_LOCK_SQL);
 | 
			
		||||
            pstmt.setString(1, key);
 | 
			
		||||
            int rows = pstmt.executeUpdate();
 | 
			
		||||
            return rows > 0;
 | 
			
		||||
@ -447,25 +408,4 @@ public class Merovingian2 extends StandardMBean implements MerovingianMBean {
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    protected class KeepAliveTask implements Runnable {
 | 
			
		||||
        @Override
 | 
			
		||||
        public void run() {
 | 
			
		||||
            PreparedStatement pstmt = null;  // Should this even be prepared everytime?
 | 
			
		||||
            try {
 | 
			
		||||
                pstmt = _conn.prepareStatement("SELECT 1");
 | 
			
		||||
                pstmt.executeQuery();
 | 
			
		||||
            } catch (Throwable th) {
 | 
			
		||||
                s_logger.error("Unable to keep the db connection alive for locking purposes!", th);
 | 
			
		||||
            } finally {
 | 
			
		||||
                if (pstmt != null) {
 | 
			
		||||
                    try {
 | 
			
		||||
                        pstmt.close();
 | 
			
		||||
                    } catch (SQLException e) {
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -29,7 +29,5 @@ public interface MerovingianMBean {
 | 
			
		||||
    
 | 
			
		||||
    boolean releaseLockAsLastResortAndIReallyKnowWhatIAmDoing(String key);
 | 
			
		||||
    
 | 
			
		||||
    String resetDbConnection();
 | 
			
		||||
 | 
			
		||||
    void cleanupForServer(long msId);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user