mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-11-04 00:02:37 +01:00 
			
		
		
		
	bug 12709: incremental fix - profiling management server clustering heartbeat activities
This commit is contained in:
		
							parent
							
								
									ca189f661f
								
							
						
					
					
						commit
						323a07d7e2
					
				@ -93,14 +93,14 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second
 | 
					    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private final List<ClusterManagerListener> listeners = new ArrayList<ClusterManagerListener>();
 | 
					    private final List<ClusterManagerListener> _listeners = new ArrayList<ClusterManagerListener>();
 | 
				
			||||||
    private final Map<Long, ManagementServerHostVO> activePeers = new HashMap<Long, ManagementServerHostVO>();
 | 
					    private final Map<Long, ManagementServerHostVO> _activePeers = new HashMap<Long, ManagementServerHostVO>();
 | 
				
			||||||
    private int heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL;
 | 
					    private int _heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL;
 | 
				
			||||||
    private int heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD;
 | 
					    private int _heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private final Map<String, ClusterService> clusterPeers;
 | 
					    private final Map<String, ClusterService> _clusterPeers;
 | 
				
			||||||
    private final Map<String, Listener> asyncCalls;
 | 
					    private final Map<String, Listener> _asyncCalls;
 | 
				
			||||||
    private final Gson gson;
 | 
					    private final Gson _gson;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Inject
 | 
					    @Inject
 | 
				
			||||||
    private AgentManager _agentMgr;
 | 
					    private AgentManager _agentMgr;
 | 
				
			||||||
@ -141,10 +141,10 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    
 | 
					    
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
    public ClusterManagerImpl() {
 | 
					    public ClusterManagerImpl() {
 | 
				
			||||||
        clusterPeers = new HashMap<String, ClusterService>();
 | 
					        _clusterPeers = new HashMap<String, ClusterService>();
 | 
				
			||||||
        asyncCalls = new HashMap<String, Listener>();
 | 
					        _asyncCalls = new HashMap<String, Listener>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        gson = GsonHelper.getGson();
 | 
					        _gson = GsonHelper.getGson();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // executor to perform remote-calls in another thread context, to avoid potential
 | 
					        // executor to perform remote-calls in another thread context, to avoid potential
 | 
				
			||||||
        // recursive remote calls between nodes
 | 
					        // recursive remote calls between nodes
 | 
				
			||||||
@ -216,7 +216,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    public void broadcast(long agentId, Command[] cmds) {
 | 
					    public void broadcast(long agentId, Command[] cmds) {
 | 
				
			||||||
        Date cutTime = DateUtil.currentGMTTime();
 | 
					        Date cutTime = DateUtil.currentGMTTime();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold));
 | 
					        List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
 | 
				
			||||||
        for (ManagementServerHostVO peer : peers) {
 | 
					        for (ManagementServerHostVO peer : peers) {
 | 
				
			||||||
            String peerName = Long.toString(peer.getMsid());
 | 
					            String peerName = Long.toString(peer.getMsid());
 | 
				
			||||||
            if (getSelfPeerName().equals(peerName)) {
 | 
					            if (getSelfPeerName().equals(peerName)) {
 | 
				
			||||||
@ -239,7 +239,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        if(s_logger.isDebugEnabled()) {
 | 
					        if(s_logger.isDebugEnabled()) {
 | 
				
			||||||
            s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
 | 
					            s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
 | 
				
			||||||
                    gson.toJson(cmds, Command[].class));
 | 
					                    _gson.toJson(cmds, Command[].class));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for(int i = 0; i < 2; i++) {
 | 
					        for(int i = 0; i < 2; i++) {
 | 
				
			||||||
@ -256,7 +256,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
                    }
 | 
					                    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    long startTick = System.currentTimeMillis();
 | 
					                    long startTick = System.currentTimeMillis();
 | 
				
			||||||
                    String strResult = peerService.execute(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError);
 | 
					                    String strResult = peerService.execute(getSelfPeerName(), agentId, _gson.toJson(cmds, Command[].class), stopOnError);
 | 
				
			||||||
                    if(s_logger.isDebugEnabled()) {
 | 
					                    if(s_logger.isDebugEnabled()) {
 | 
				
			||||||
                        s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " +
 | 
					                        s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " +
 | 
				
			||||||
                                (System.currentTimeMillis() - startTick) + " ms, result: " + strResult);
 | 
					                                (System.currentTimeMillis() - startTick) + " ms, result: " + strResult);
 | 
				
			||||||
@ -264,7 +264,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
                    if(strResult != null) {
 | 
					                    if(strResult != null) {
 | 
				
			||||||
                        try {
 | 
					                        try {
 | 
				
			||||||
                            return gson.fromJson(strResult, Answer[].class);
 | 
					                            return _gson.fromJson(strResult, Answer[].class);
 | 
				
			||||||
                        } catch(Throwable e) {
 | 
					                        } catch(Throwable e) {
 | 
				
			||||||
                            s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
 | 
					                            s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
 | 
				
			||||||
                        }
 | 
					                        }
 | 
				
			||||||
@ -289,7 +289,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        if(s_logger.isDebugEnabled()) {
 | 
					        if(s_logger.isDebugEnabled()) {
 | 
				
			||||||
            s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
 | 
					            s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
 | 
				
			||||||
                    gson.toJson(cmds, Command[].class));
 | 
					                    _gson.toJson(cmds, Command[].class));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for(int i = 0; i < 2; i++) {
 | 
					        for(int i = 0; i < 2; i++) {
 | 
				
			||||||
@ -307,7 +307,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
                        }
 | 
					                        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        long startTick = System.currentTimeMillis();
 | 
					                        long startTick = System.currentTimeMillis();
 | 
				
			||||||
                        seq = peerService.executeAsync(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError);
 | 
					                        seq = peerService.executeAsync(getSelfPeerName(), agentId, _gson.toJson(cmds, Command[].class), stopOnError);
 | 
				
			||||||
                        if(seq > 0) {
 | 
					                        if(seq > 0) {
 | 
				
			||||||
                            if(s_logger.isDebugEnabled()) {
 | 
					                            if(s_logger.isDebugEnabled()) {
 | 
				
			||||||
                                s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
 | 
					                                s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
 | 
				
			||||||
@ -339,7 +339,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
 | 
					    public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
 | 
				
			||||||
        if(s_logger.isDebugEnabled()) {
 | 
					        if(s_logger.isDebugEnabled()) {
 | 
				
			||||||
            s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" +
 | 
					            s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" +
 | 
				
			||||||
                    agentId + "-" + seq + "} answers: " + (answers != null ? gson.toJson(answers, Answer[].class): "null"));
 | 
					                    agentId + "-" + seq + "} answers: " + (answers != null ? _gson.toJson(answers, Answer[].class): "null"));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Listener listener = null;
 | 
					        Listener listener = null;
 | 
				
			||||||
@ -390,7 +390,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
 | 
					    public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
 | 
				
			||||||
        if(s_logger.isDebugEnabled()) {
 | 
					        if(s_logger.isDebugEnabled()) {
 | 
				
			||||||
            s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq +
 | 
					            s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq +
 | 
				
			||||||
                    "} " + (answers != null? gson.toJson(answers, Answer[].class):""));
 | 
					                    "} " + (answers != null? _gson.toJson(answers, Answer[].class):""));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        final String targetPeerF = targetPeer;
 | 
					        final String targetPeerF = targetPeer;
 | 
				
			||||||
@ -416,7 +416,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
                        s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote");
 | 
					                        s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote");
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, gson.toJson(answersF, Answer[].class));
 | 
					                    result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, _gson.toJson(answersF, Answer[].class));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    if(s_logger.isDebugEnabled()) {
 | 
					                    if(s_logger.isDebugEnabled()) {
 | 
				
			||||||
                        s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " +
 | 
					                        s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " +
 | 
				
			||||||
@ -469,20 +469,20 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void registerListener(ClusterManagerListener listener) {
 | 
					    public void registerListener(ClusterManagerListener listener) {
 | 
				
			||||||
        // Note : we don't check duplicates
 | 
					        // Note : we don't check duplicates
 | 
				
			||||||
        synchronized (listeners) {
 | 
					        synchronized (_listeners) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    		s_logger.info("register cluster listener " + listener.getClass());
 | 
					    		s_logger.info("register cluster listener " + listener.getClass());
 | 
				
			||||||
    		
 | 
					    		
 | 
				
			||||||
        	listeners.add(listener);
 | 
					        	_listeners.add(listener);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void unregisterListener(ClusterManagerListener listener) {
 | 
					    public void unregisterListener(ClusterManagerListener listener) {
 | 
				
			||||||
        synchronized(listeners) {
 | 
					        synchronized(_listeners) {
 | 
				
			||||||
    		s_logger.info("unregister cluster listener " + listener.getClass());
 | 
					    		s_logger.info("unregister cluster listener " + listener.getClass());
 | 
				
			||||||
        	
 | 
					        	
 | 
				
			||||||
        	listeners.remove(listener);
 | 
					        	_listeners.remove(listener);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -495,8 +495,8 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        synchronized(listeners) {
 | 
					        synchronized(_listeners) {
 | 
				
			||||||
            for(ClusterManagerListener listener : listeners) {
 | 
					            for(ClusterManagerListener listener : _listeners) {
 | 
				
			||||||
                listener.onManagementNodeJoined(nodeList, _mshostId);
 | 
					                listener.onManagementNodeJoined(nodeList, _mshostId);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -514,8 +514,8 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        synchronized(listeners) {
 | 
					        synchronized(_listeners) {
 | 
				
			||||||
            for(ClusterManagerListener listener : listeners) {
 | 
					            for(ClusterManagerListener listener : _listeners) {
 | 
				
			||||||
                listener.onManagementNodeLeft(nodeList, _mshostId);
 | 
					                listener.onManagementNodeLeft(nodeList, _mshostId);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -528,28 +528,28 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
        if(s_logger.isDebugEnabled())
 | 
					        if(s_logger.isDebugEnabled())
 | 
				
			||||||
            s_logger.debug("Notify management server node isolation to listeners");
 | 
					            s_logger.debug("Notify management server node isolation to listeners");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        synchronized(listeners) {
 | 
					        synchronized(_listeners) {
 | 
				
			||||||
            for(ClusterManagerListener listener : listeners) {
 | 
					            for(ClusterManagerListener listener : _listeners) {
 | 
				
			||||||
                listener.onManagementNodeIsolated();
 | 
					                listener.onManagementNodeIsolated();
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public ClusterService getPeerService(String strPeer) throws RemoteException {
 | 
					    public ClusterService getPeerService(String strPeer) throws RemoteException {
 | 
				
			||||||
        synchronized(clusterPeers) {
 | 
					        synchronized(_clusterPeers) {
 | 
				
			||||||
            if(clusterPeers.containsKey(strPeer)) {
 | 
					            if(_clusterPeers.containsKey(strPeer)) {
 | 
				
			||||||
                return clusterPeers.get(strPeer);
 | 
					                return _clusterPeers.get(strPeer);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ClusterService service = _currentServiceAdapter.getPeerService(strPeer);
 | 
					        ClusterService service = _currentServiceAdapter.getPeerService(strPeer);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if(service != null) {
 | 
					        if(service != null) {
 | 
				
			||||||
            synchronized(clusterPeers) {
 | 
					            synchronized(_clusterPeers) {
 | 
				
			||||||
                // re-check the peer map again to deal with the
 | 
					                // re-check the peer map again to deal with the
 | 
				
			||||||
                // race conditions
 | 
					                // race conditions
 | 
				
			||||||
                if(!clusterPeers.containsKey(strPeer)) {
 | 
					                if(!_clusterPeers.containsKey(strPeer)) {
 | 
				
			||||||
                    clusterPeers.put(strPeer, service);
 | 
					                    _clusterPeers.put(strPeer, service);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -558,9 +558,9 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public void invalidatePeerService(String strPeer) {
 | 
					    public void invalidatePeerService(String strPeer) {
 | 
				
			||||||
        synchronized(clusterPeers) {
 | 
					        synchronized(_clusterPeers) {
 | 
				
			||||||
            if(clusterPeers.containsKey(strPeer)) {
 | 
					            if(_clusterPeers.containsKey(strPeer)) {
 | 
				
			||||||
                clusterPeers.remove(strPeer);
 | 
					                _clusterPeers.remove(strPeer);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -568,9 +568,9 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    private void registerAsyncCall(String strPeer, long seq, Listener listener) {
 | 
					    private void registerAsyncCall(String strPeer, long seq, Listener listener) {
 | 
				
			||||||
        String key = strPeer + "/" + seq;
 | 
					        String key = strPeer + "/" + seq;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        synchronized(asyncCalls) {
 | 
					        synchronized(_asyncCalls) {
 | 
				
			||||||
            if(!asyncCalls.containsKey(key)) {
 | 
					            if(!_asyncCalls.containsKey(key)) {
 | 
				
			||||||
                asyncCalls.put(key, listener);
 | 
					                _asyncCalls.put(key, listener);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -578,9 +578,9 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    private Listener getAsyncCallListener(String strPeer, long seq) {
 | 
					    private Listener getAsyncCallListener(String strPeer, long seq) {
 | 
				
			||||||
        String key = strPeer + "/" + seq;
 | 
					        String key = strPeer + "/" + seq;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        synchronized(asyncCalls) {
 | 
					        synchronized(_asyncCalls) {
 | 
				
			||||||
            if(asyncCalls.containsKey(key)) {
 | 
					            if(_asyncCalls.containsKey(key)) {
 | 
				
			||||||
                return asyncCalls.get(key);
 | 
					                return _asyncCalls.get(key);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -590,9 +590,9 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    private void unregisterAsyncCall(String strPeer, long seq) {
 | 
					    private void unregisterAsyncCall(String strPeer, long seq) {
 | 
				
			||||||
        String key = strPeer + "/" + seq;
 | 
					        String key = strPeer + "/" + seq;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        synchronized(asyncCalls) {
 | 
					        synchronized(_asyncCalls) {
 | 
				
			||||||
            if(asyncCalls.containsKey(key)) {
 | 
					            if(_asyncCalls.containsKey(key)) {
 | 
				
			||||||
                asyncCalls.remove(key);
 | 
					                _asyncCalls.remove(key);
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -603,49 +603,72 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
            public void run() {
 | 
					            public void run() {
 | 
				
			||||||
                Transaction txn = Transaction.open("ClusterHeartBeat");
 | 
					                Transaction txn = Transaction.open("ClusterHeartBeat");
 | 
				
			||||||
                try {
 | 
					                try {
 | 
				
			||||||
                    txn.transitToUserManagedConnection(getHeartbeatConnection());
 | 
					                    Profiler profiler = new Profiler();
 | 
				
			||||||
                    if(s_logger.isTraceEnabled()) {
 | 
					                    Profiler profilerHeartbeatUpdate = new Profiler();
 | 
				
			||||||
                        s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId);
 | 
					                    Profiler profilerPeerScan = new Profiler();
 | 
				
			||||||
                    }
 | 
					                    Profiler profilerAgentLB = new Profiler();
 | 
				
			||||||
 | 
					 | 
				
			||||||
                    _mshostDao.update(_mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    if (s_logger.isTraceEnabled()) {
 | 
					 | 
				
			||||||
                        s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    if (!_peerScanInited) {
 | 
					 | 
				
			||||||
                        _peerScanInited = true;
 | 
					 | 
				
			||||||
                        initPeerScan();
 | 
					 | 
				
			||||||
                    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    peerScan();
 | 
					 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
                    //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
 | 
					                    try {
 | 
				
			||||||
                    if (_agentLBEnabled && !_agentLbHappened) {
 | 
					                        profiler.start();
 | 
				
			||||||
                    	SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
 | 
					 | 
				
			||||||
                        sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
 | 
					 | 
				
			||||||
                        sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
 | 
					 | 
				
			||||||
                        List<HostVO> allManagedRoutingAgents = sc.list();
 | 
					 | 
				
			||||||
                        
 | 
					                        
 | 
				
			||||||
                        sc = SearchCriteria2.create(HostVO.class);
 | 
					                        profilerHeartbeatUpdate.start();
 | 
				
			||||||
                        sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
 | 
					                        txn.transitToUserManagedConnection(getHeartbeatConnection());
 | 
				
			||||||
                        List<HostVO> allAgents = sc.list();
 | 
					                        if(s_logger.isTraceEnabled()) {
 | 
				
			||||||
                        double allHostsCount = allAgents.size();
 | 
					                            s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId);
 | 
				
			||||||
                        double managedHostsCount = allManagedRoutingAgents.size();
 | 
					                        }
 | 
				
			||||||
                        if (allHostsCount > 0.0) {
 | 
					    
 | 
				
			||||||
                            double load = managedHostsCount/allHostsCount;
 | 
					                        _mshostDao.update(_mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
 | 
				
			||||||
                            if (load >= _connectedAgentsThreshold) {
 | 
					                        profilerHeartbeatUpdate.stop();
 | 
				
			||||||
                                s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold);
 | 
					    
 | 
				
			||||||
                                _rebalanceService.scheduleRebalanceAgents();
 | 
					                        profilerPeerScan.start();
 | 
				
			||||||
                                _agentLbHappened = true;
 | 
					                        if (s_logger.isTraceEnabled()) {
 | 
				
			||||||
                            } else {
 | 
					                            s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
 | 
				
			||||||
                                s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold);
 | 
					                        }
 | 
				
			||||||
                            }
 | 
					    
 | 
				
			||||||
                        } 
 | 
					                        if (!_peerScanInited) {
 | 
				
			||||||
 | 
					                            _peerScanInited = true;
 | 
				
			||||||
 | 
					                            initPeerScan();
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					    
 | 
				
			||||||
 | 
					                        peerScan();
 | 
				
			||||||
 | 
					                        profilerPeerScan.stop();
 | 
				
			||||||
 | 
					                        
 | 
				
			||||||
 | 
					                        profilerAgentLB.start();
 | 
				
			||||||
 | 
					                        //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
 | 
				
			||||||
 | 
					                        if (_agentLBEnabled && !_agentLbHappened) {
 | 
				
			||||||
 | 
					                            SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
 | 
				
			||||||
 | 
					                            sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
 | 
				
			||||||
 | 
					                            sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
 | 
				
			||||||
 | 
					                            List<HostVO> allManagedRoutingAgents = sc.list();
 | 
				
			||||||
 | 
					                            
 | 
				
			||||||
 | 
					                            sc = SearchCriteria2.create(HostVO.class);
 | 
				
			||||||
 | 
					                            sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
 | 
				
			||||||
 | 
					                            List<HostVO> allAgents = sc.list();
 | 
				
			||||||
 | 
					                            double allHostsCount = allAgents.size();
 | 
				
			||||||
 | 
					                            double managedHostsCount = allManagedRoutingAgents.size();
 | 
				
			||||||
 | 
					                            if (allHostsCount > 0.0) {
 | 
				
			||||||
 | 
					                                double load = managedHostsCount/allHostsCount;
 | 
				
			||||||
 | 
					                                if (load >= _connectedAgentsThreshold) {
 | 
				
			||||||
 | 
					                                    s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold);
 | 
				
			||||||
 | 
					                                    _rebalanceService.scheduleRebalanceAgents();
 | 
				
			||||||
 | 
					                                    _agentLbHappened = true;
 | 
				
			||||||
 | 
					                                } else {
 | 
				
			||||||
 | 
					                                    s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold);
 | 
				
			||||||
 | 
					                                }
 | 
				
			||||||
 | 
					                            } 
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                        profilerAgentLB.stop();
 | 
				
			||||||
 | 
					                    } finally {
 | 
				
			||||||
 | 
					                        profiler.stop();
 | 
				
			||||||
 | 
					                        
 | 
				
			||||||
 | 
					                        if(profiler.getDuration() >= _heartbeatInterval) {
 | 
				
			||||||
 | 
					                            s_logger.warn("Management server heartbeat takes too long to finish. profiler: " + profiler.toString() + 
 | 
				
			||||||
 | 
					                                ", profilerHeartbeatUpdate: " + profilerHeartbeatUpdate.toString() +
 | 
				
			||||||
 | 
					                                ", profilerPeerScan: " + profilerPeerScan.toString() +
 | 
				
			||||||
 | 
					                                ", profilerAgentLB: " + profilerAgentLB.toString());
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
                    }
 | 
					                    }
 | 
				
			||||||
                    
 | 
					                    
 | 
				
			||||||
                    
 | 
					 | 
				
			||||||
                } catch(CloudRuntimeException e) {
 | 
					                } catch(CloudRuntimeException e) {
 | 
				
			||||||
                    s_logger.error("Runtime DB exception ", e.getCause());
 | 
					                    s_logger.error("Runtime DB exception ", e.getCause());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -797,7 +820,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
        // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform
 | 
					        // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform
 | 
				
			||||||
        // missed cleanup
 | 
					        // missed cleanup
 | 
				
			||||||
        Date cutTime = DateUtil.currentGMTTime();
 | 
					        Date cutTime = DateUtil.currentGMTTime();
 | 
				
			||||||
        List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - heartbeatThreshold));
 | 
					        List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
 | 
				
			||||||
       
 | 
					       
 | 
				
			||||||
        // We don't have foreign key constraints to enforce the mgmt_server_id integrity in host table, when user manually 
 | 
					        // We don't have foreign key constraints to enforce the mgmt_server_id integrity in host table, when user manually 
 | 
				
			||||||
        // remove records from mshost table, this will leave orphan mgmt_serve_id reference in host table.
 | 
					        // remove records from mshost table, this will leave orphan mgmt_serve_id reference in host table.
 | 
				
			||||||
@ -828,14 +851,14 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    private void peerScan() {
 | 
					    private void peerScan() {
 | 
				
			||||||
        Date cutTime = DateUtil.currentGMTTime();
 | 
					        Date cutTime = DateUtil.currentGMTTime();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold));
 | 
					        List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
 | 
					        List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
 | 
				
			||||||
        List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
 | 
					        List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if(_mshostId != null) {
 | 
					        if(_mshostId != null) {
 | 
				
			||||||
            // only if we have already attached to cluster, will we start to check leaving nodes
 | 
					            // only if we have already attached to cluster, will we start to check leaving nodes
 | 
				
			||||||
            for(Map.Entry<Long, ManagementServerHostVO>  entry : activePeers.entrySet()) {
 | 
					            for(Map.Entry<Long, ManagementServerHostVO>  entry : _activePeers.entrySet()) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                ManagementServerHostVO current = getInListById(entry.getKey(), currentList);
 | 
					                ManagementServerHostVO current = getInListById(entry.getKey(), currentList);
 | 
				
			||||||
                if(current == null) {
 | 
					                if(current == null) {
 | 
				
			||||||
@ -869,7 +892,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
        // process invalidated node list
 | 
					        // process invalidated node list
 | 
				
			||||||
        if(invalidatedNodeList.size() > 0) {
 | 
					        if(invalidatedNodeList.size() > 0) {
 | 
				
			||||||
            for(ManagementServerHostVO mshost : invalidatedNodeList) {
 | 
					            for(ManagementServerHostVO mshost : invalidatedNodeList) {
 | 
				
			||||||
                activePeers.remove(mshost.getId());
 | 
					                _activePeers.remove(mshost.getId());
 | 
				
			||||||
                try {
 | 
					                try {
 | 
				
			||||||
                    JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
 | 
					                    JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
 | 
				
			||||||
                } catch(Exception e) {
 | 
					                } catch(Exception e) {
 | 
				
			||||||
@ -886,7 +909,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
            ManagementServerHostVO mshost = it.next();
 | 
					            ManagementServerHostVO mshost = it.next();
 | 
				
			||||||
            if(!pingManagementNode(mshost)) {
 | 
					            if(!pingManagementNode(mshost)) {
 | 
				
			||||||
                s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
 | 
					                s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
 | 
				
			||||||
                activePeers.remove(mshost.getId());
 | 
					                _activePeers.remove(mshost.getId());
 | 
				
			||||||
                try {
 | 
					                try {
 | 
				
			||||||
                    JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
 | 
					                    JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
 | 
				
			||||||
                } catch(Exception e) {
 | 
					                } catch(Exception e) {
 | 
				
			||||||
@ -904,8 +927,8 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        List<ManagementServerHostVO> newNodeList = new ArrayList<ManagementServerHostVO>();
 | 
					        List<ManagementServerHostVO> newNodeList = new ArrayList<ManagementServerHostVO>();
 | 
				
			||||||
        for(ManagementServerHostVO mshost : currentList) {
 | 
					        for(ManagementServerHostVO mshost : currentList) {
 | 
				
			||||||
            if(!activePeers.containsKey(mshost.getId())) {
 | 
					            if(!_activePeers.containsKey(mshost.getId())) {
 | 
				
			||||||
                activePeers.put(mshost.getId(), mshost);
 | 
					                _activePeers.put(mshost.getId(), mshost);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if(s_logger.isDebugEnabled()) {
 | 
					                if(s_logger.isDebugEnabled()) {
 | 
				
			||||||
                    s_logger.debug("Detected management node joined, id:" + mshost.getId() + ", nodeIP:" + mshost.getServiceIP());
 | 
					                    s_logger.debug("Detected management node joined, id:" + mshost.getId() + ", nodeIP:" + mshost.getServiceIP());
 | 
				
			||||||
@ -986,7 +1009,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            // use seperate thread for heartbeat updates
 | 
					            // use seperate thread for heartbeat updates
 | 
				
			||||||
            _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
 | 
					            _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), _heartbeatInterval, _heartbeatInterval, TimeUnit.MILLISECONDS);
 | 
				
			||||||
            _notificationExecutor.submit(getNotificationTask());
 | 
					            _notificationExecutor.submit(getNotificationTask());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        } catch (Throwable e) {
 | 
					        } catch (Throwable e) {
 | 
				
			||||||
@ -1064,12 +1087,12 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        String value = configs.get("cluster.heartbeat.interval");
 | 
					        String value = configs.get("cluster.heartbeat.interval");
 | 
				
			||||||
        if (value != null) {
 | 
					        if (value != null) {
 | 
				
			||||||
            heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL);
 | 
					            _heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        value = configs.get("cluster.heartbeat.threshold");
 | 
					        value = configs.get("cluster.heartbeat.threshold");
 | 
				
			||||||
        if (value != null) {
 | 
					        if (value != null) {
 | 
				
			||||||
            heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD);
 | 
					            _heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");
 | 
					        File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");
 | 
				
			||||||
@ -1141,7 +1164,7 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
    public boolean isManagementNodeAlive(long msid) {
 | 
					    public boolean isManagementNodeAlive(long msid) {
 | 
				
			||||||
        ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
 | 
					        ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
 | 
				
			||||||
        if(mshost != null) {
 | 
					        if(mshost != null) {
 | 
				
			||||||
            if(mshost.getLastUpdateTime().getTime() >=  DateUtil.currentGMTTime().getTime() - heartbeatThreshold) {
 | 
					            if(mshost.getLastUpdateTime().getTime() >=  DateUtil.currentGMTTime().getTime() - _heartbeatThreshold) {
 | 
				
			||||||
                return true;
 | 
					                return true;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -1194,20 +1217,20 @@ public class ClusterManagerImpl implements ClusterManager {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public int getHeartbeatThreshold() {
 | 
					    public int getHeartbeatThreshold() {
 | 
				
			||||||
        return this.heartbeatThreshold;
 | 
					        return this._heartbeatThreshold;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public int getHeartbeatInterval() {
 | 
					    public int getHeartbeatInterval() {
 | 
				
			||||||
        return this.heartbeatInterval;
 | 
					        return this._heartbeatInterval;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    public void setHeartbeatThreshold(int threshold) {
 | 
					    public void setHeartbeatThreshold(int threshold) {
 | 
				
			||||||
        heartbeatThreshold = threshold;
 | 
					        _heartbeatThreshold = threshold;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private void checkConflicts() throws ConfigurationException {
 | 
					    private void checkConflicts() throws ConfigurationException {
 | 
				
			||||||
        Date cutTime = DateUtil.currentGMTTime();
 | 
					        Date cutTime = DateUtil.currentGMTTime();
 | 
				
			||||||
        List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - heartbeatThreshold));
 | 
					        List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
 | 
				
			||||||
        for(ManagementServerHostVO peer : peers) {
 | 
					        for(ManagementServerHostVO peer : peers) {
 | 
				
			||||||
            String peerIP = peer.getServiceIP().trim();
 | 
					            String peerIP = peer.getServiceIP().trim();
 | 
				
			||||||
            if(_clusterNodeIP.equals(peerIP)) {
 | 
					            if(_clusterNodeIP.equals(peerIP)) {
 | 
				
			||||||
 | 
				
			|||||||
@ -19,25 +19,46 @@
 | 
				
			|||||||
package com.cloud.utils;
 | 
					package com.cloud.utils;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
public class Profiler {
 | 
					public class Profiler {
 | 
				
			||||||
	private long startTickInMs;
 | 
						private Long startTickInMs;
 | 
				
			||||||
	private long stopTickInMs;
 | 
						private Long stopTickInMs;
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	public Profiler() {
 | 
						public Profiler() {
 | 
				
			||||||
		startTickInMs = 0;
 | 
							startTickInMs = null;
 | 
				
			||||||
		stopTickInMs = 0;
 | 
							stopTickInMs = null;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	public long start() {
 | 
						public long start() {
 | 
				
			||||||
		startTickInMs = System.currentTimeMillis();
 | 
							startTickInMs = System.currentTimeMillis();
 | 
				
			||||||
		return startTickInMs;
 | 
							return startTickInMs.longValue();
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	public long stop() {
 | 
						public long stop() {
 | 
				
			||||||
		stopTickInMs = System.currentTimeMillis();
 | 
							stopTickInMs = System.currentTimeMillis();
 | 
				
			||||||
		return stopTickInMs;
 | 
							return stopTickInMs.longValue();
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	public long getDuration() {
 | 
						public long getDuration() {
 | 
				
			||||||
		return stopTickInMs - startTickInMs;
 | 
						    if(startTickInMs != null &&  stopTickInMs != null)
 | 
				
			||||||
	}
 | 
						        return stopTickInMs.longValue() - startTickInMs.longValue();
 | 
				
			||||||
 | 
						    
 | 
				
			||||||
 | 
						    return -1;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						public boolean isStarted() {
 | 
				
			||||||
 | 
						    return startTickInMs != null;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						public boolean isStopped() {
 | 
				
			||||||
 | 
						    return stopTickInMs != null;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						public String toString() {
 | 
				
			||||||
 | 
						    if(startTickInMs == null)
 | 
				
			||||||
 | 
						        return "Not Started";
 | 
				
			||||||
 | 
						    
 | 
				
			||||||
 | 
						    if(stopTickInMs == null)
 | 
				
			||||||
 | 
						        return "Started but not stopped";
 | 
				
			||||||
 | 
						    
 | 
				
			||||||
 | 
						    return "Done. Duration: " + getDuration() + "ms";
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user