wrap cluster heartbeat in a try-catch

This commit is contained in:
Alex Huang 2011-02-01 12:03:55 -08:00
parent 23b9a2452d
commit ade0097a0a

View File

@ -170,9 +170,10 @@ public class ClusterManagerImpl implements ClusterManager {
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
ClusterService peerService = null;
if(s_logger.isDebugEnabled())
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
if(s_logger.isDebugEnabled()) {
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
gson.toJson(cmds, Command[].class));
}
for(int i = 0; i < 2; i++) {
try {
@ -183,14 +184,16 @@ public class ClusterManagerImpl implements ClusterManager {
if(peerService != null) {
try {
if(s_logger.isDebugEnabled())
s_logger.debug("Send " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Send " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote");
}
long startTick = System.currentTimeMillis();
String strResult = peerService.execute(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError);
if(s_logger.isDebugEnabled())
s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " +
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed " + getSelfPeerName() + " -> " + strPeer + "." + agentId + "in " +
(System.currentTimeMillis() - startTick) + " ms, result: " + strResult);
}
if(strResult != null) {
try {
@ -204,9 +207,10 @@ public class ClusterManagerImpl implements ClusterManager {
} catch (RemoteException e) {
invalidatePeerService(strPeer);
if(s_logger.isInfoEnabled())
s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: "
if(s_logger.isInfoEnabled()) {
s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: "
+ i + ", exception message :" + e.getMessage());
}
}
}
}
@ -219,9 +223,10 @@ public class ClusterManagerImpl implements ClusterManager {
ClusterService peerService = null;
if(s_logger.isDebugEnabled())
s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
if(s_logger.isDebugEnabled()) {
s_logger.debug("Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
gson.toJson(cmds, Command[].class));
}
for(int i = 0; i < 2; i++) {
try {
@ -234,17 +239,19 @@ public class ClusterManagerImpl implements ClusterManager {
try {
long seq = 0;
synchronized(String.valueOf(agentId).intern()) {
if(s_logger.isDebugEnabled())
s_logger.debug("Send Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Send Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId + " to remote");
}
long startTick = System.currentTimeMillis();
seq = peerService.executeAsync(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError);
if(seq > 0) {
if(s_logger.isDebugEnabled())
s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
+ " in " + (System.currentTimeMillis() - startTick) + " ms"
+ ", register local listener " + strPeer + "/" + seq);
}
registerAsyncCall(strPeer, seq, listener);
} else {
@ -256,8 +263,9 @@ public class ClusterManagerImpl implements ClusterManager {
} catch (RemoteException e) {
invalidatePeerService(strPeer);
if(s_logger.isInfoEnabled())
s_logger.info("Exception on remote execution -> " + strPeer + ", iteration : " + i);
if(s_logger.isInfoEnabled()) {
s_logger.info("Exception on remote execution -> " + strPeer + ", iteration : " + i);
}
}
}
}
@ -268,9 +276,10 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
if(s_logger.isDebugEnabled())
s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" +
if(s_logger.isDebugEnabled()) {
s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" +
agentId + "-" + seq + "} answers: " + (answers != null ? gson.toJson(answers, Answer[].class): "null"));
}
Listener listener = null;
synchronized(String.valueOf(agentId).intern()) {
@ -282,31 +291,36 @@ public class ClusterManagerImpl implements ClusterManager {
if(listener != null) {
long startTick = System.currentTimeMillis();
if(s_logger.isDebugEnabled())
s_logger.debug("Processing answer {" + agentId + "-" + seq + "} from remote peer " + executingPeer);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Processing answer {" + agentId + "-" + seq + "} from remote peer " + executingPeer);
}
listener.processAnswers(agentId, seq, answers);
if(s_logger.isDebugEnabled())
s_logger.debug("Answer {" + agentId + "-" + seq + "} is processed in " +
if(s_logger.isDebugEnabled()) {
s_logger.debug("Answer {" + agentId + "-" + seq + "} is processed in " +
(System.currentTimeMillis() - startTick) + " ms");
}
if(!listener.isRecurring()) {
if(s_logger.isDebugEnabled())
s_logger.debug("Listener is not recurring after async-result callback {" +
if(s_logger.isDebugEnabled()) {
s_logger.debug("Listener is not recurring after async-result callback {" +
agentId + "-" + seq + "}, unregister it");
}
unregisterAsyncCall(executingPeer, seq);
} else {
if(s_logger.isDebugEnabled())
s_logger.debug("Listener is recurring after async-result callback {" + agentId
if(s_logger.isDebugEnabled()) {
s_logger.debug("Listener is recurring after async-result callback {" + agentId
+"-" + seq + "}, will keep it");
}
return true;
}
} else {
if(s_logger.isInfoEnabled())
s_logger.info("Async-call Listener has not been registered yet for {" + agentId
if(s_logger.isInfoEnabled()) {
s_logger.info("Async-call Listener has not been registered yet for {" + agentId
+"-" + seq + "}");
}
}
return false;
}
@ -314,9 +328,10 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
if(s_logger.isDebugEnabled())
s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq +
if(s_logger.isDebugEnabled()) {
s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq +
"} " + (answers != null? gson.toJson(answers, Answer[].class):""));
}
final String targetPeerF = targetPeer;
final Answer[] answersF = answers;
@ -337,14 +352,16 @@ public class ClusterManagerImpl implements ClusterManager {
boolean result = false;
long startTick = System.currentTimeMillis();
if(s_logger.isDebugEnabled())
s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote");
if(s_logger.isDebugEnabled()) {
s_logger.debug("Start forwarding Async-call answer {" + agentId + "-" + seq + "} to remote");
}
result = peerService.onAsyncResult(getSelfPeerName(), agentIdF, seqF, gson.toJson(answersF, Answer[].class));
if(s_logger.isDebugEnabled())
s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " +
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed forwarding Async-call answer {" + agentId + "-" + seq + "} in " +
(System.currentTimeMillis() - startTick) + " ms, return result: " + result);
}
return result;
} catch (RemoteException e) {
@ -365,8 +382,9 @@ public class ClusterManagerImpl implements ClusterManager {
HostVO host = _hostDao.findById(agentHostId);
if(host != null && host.getManagementServerId() != null) {
if(getSelfPeerName().equals(Long.toString(host.getManagementServerId())))
return null;
if(getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) {
return null;
}
return Long.toString(host.getManagementServerId());
}
@ -427,8 +445,9 @@ public class ClusterManagerImpl implements ClusterManager {
public ClusterService getPeerService(String strPeer) throws RemoteException {
synchronized(clusterPeers) {
if(clusterPeers.containsKey(strPeer))
return clusterPeers.get(strPeer);
if(clusterPeers.containsKey(strPeer)) {
return clusterPeers.get(strPeer);
}
}
ClusterService service = _currentServiceAdapter.getPeerService(strPeer);
@ -437,8 +456,9 @@ public class ClusterManagerImpl implements ClusterManager {
synchronized(clusterPeers) {
// re-check the peer map again to deal with the
// race conditions
if(!clusterPeers.containsKey(strPeer))
clusterPeers.put(strPeer, service);
if(!clusterPeers.containsKey(strPeer)) {
clusterPeers.put(strPeer, service);
}
}
}
@ -447,8 +467,9 @@ public class ClusterManagerImpl implements ClusterManager {
/**
 * Drops the cached service entry for the given peer, if one is present.
 *
 * @param strPeer key of the peer whose cached entry should be discarded
 */
public void invalidatePeerService(String strPeer) {
    synchronized (clusterPeers) {
        // Removing an absent key is a no-op, so no containsKey() pre-check is needed.
        // NOTE(review): assumes clusterPeers is a standard java.util.Map - confirm.
        clusterPeers.remove(strPeer);
    }
}
@ -456,8 +477,9 @@ public class ClusterManagerImpl implements ClusterManager {
String key = strPeer + "/" + seq;
synchronized(asyncCalls) {
if(!asyncCalls.containsKey(key))
asyncCalls.put(key, listener);
if(!asyncCalls.containsKey(key)) {
asyncCalls.put(key, listener);
}
}
}
@ -465,8 +487,9 @@ public class ClusterManagerImpl implements ClusterManager {
String key = strPeer + "/" + seq;
synchronized(asyncCalls) {
if(asyncCalls.containsKey(key))
return asyncCalls.get(key);
if(asyncCalls.containsKey(key)) {
return asyncCalls.get(key);
}
}
return null;
@ -476,8 +499,9 @@ public class ClusterManagerImpl implements ClusterManager {
String key = strPeer + "/" + seq;
synchronized(asyncCalls) {
if(asyncCalls.containsKey(key))
asyncCalls.remove(key);
if(asyncCalls.containsKey(key)) {
asyncCalls.remove(key);
}
}
}
@ -485,11 +509,16 @@ public class ClusterManagerImpl implements ClusterManager {
return new Runnable() {
/**
 * One heartbeat tick: optionally traces the tick, stamps this management
 * server's row with the current GMT time, then runs a peer scan.
 *
 * Every statement runs exactly once and inside the try block, so a failure
 * in the DAO update or in the peer scan is logged instead of propagating
 * out of the task.
 */
@Override
public void run() {
    try {
        if (s_logger.isTraceEnabled()) {
            s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId);
        }

        _mshostDao.update(_mshostId, DateUtil.currentGMTTime());
        peerScan();
    } catch (Exception e) {
        // NOTE(review): this task is presumably the one handed to
        // scheduleAtFixedRate(); a ScheduledExecutorService cancels all further
        // runs once a run throws, so the exception must not escape - confirm.
        s_logger.error("Problem with the cluster heartbeat!", e);
    }
}
};
}
@ -505,8 +534,9 @@ public class ClusterManagerImpl implements ClusterManager {
for(Map.Entry<Long, ManagementServerHostVO> entry : activePeers.entrySet()) {
if(!isIdInList(entry.getKey(), currentList)) {
if(entry.getKey().longValue() != _mshostId.longValue()) {
if(s_logger.isDebugEnabled())
s_logger.debug("Detected management node left, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP());
if(s_logger.isDebugEnabled()) {
s_logger.debug("Detected management node left, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP());
}
removedNodeList.add(entry.getValue());
}
}
@ -522,23 +552,27 @@ public class ClusterManagerImpl implements ClusterManager {
if(!activePeers.containsKey(mshost.getId())) {
activePeers.put(mshost.getId(), mshost);
if(s_logger.isDebugEnabled())
s_logger.debug("Detected management node joined, id:" + mshost.getId() + ", nodeIP:" + mshost.getServiceIP());
if(s_logger.isDebugEnabled()) {
s_logger.debug("Detected management node joined, id:" + mshost.getId() + ", nodeIP:" + mshost.getServiceIP());
}
newNodeList.add(mshost);
}
}
if(newNodeList.size() > 0)
notifyNodeJoined(newNodeList);
if(newNodeList.size() > 0) {
notifyNodeJoined(newNodeList);
}
if(removedNodeList.size() > 0)
notifyNodeLeft(removedNodeList);
if(removedNodeList.size() > 0) {
notifyNodeLeft(removedNodeList);
}
}
/**
 * Reports whether any element of {@code l} carries the given id.
 *
 * Elements whose id is {@code null} never match, and a {@code null}
 * {@code id} matches nothing.
 *
 * @param id the id to look for
 * @param l  the hosts to search
 * @return {@code true} if at least one element's id equals {@code id} by value
 */
private static boolean isIdInList(Long id, List<ManagementServerHostVO> l) {
    for (ManagementServerHostVO mshost : l) {
        // Compare boxed ids by value. '==' on two Long objects compares
        // references and only matches reliably for values inside the small
        // boxing cache, so larger ids would be reported as missing.
        if (mshost.getId() != null && mshost.getId().equals(id)) {
            return true;
        }
    }
    return false;
}
@ -550,8 +584,9 @@ public class ClusterManagerImpl implements ClusterManager {
@Override @DB
public boolean start() {
if(s_logger.isInfoEnabled())
s_logger.info("Starting cluster manager, msid : " + _id);
if(s_logger.isInfoEnabled()) {
s_logger.info("Starting cluster manager, msid : " + _id);
}
Transaction txn = Transaction.currentTxn();
try {
@ -574,11 +609,13 @@ public class ClusterManagerImpl implements ClusterManager {
mshost.setAlertCount(0);
_mshostDao.persist(mshost);
if(s_logger.isInfoEnabled())
s_logger.info("New instance of management server msid " + _id + " is being started");
if(s_logger.isInfoEnabled()) {
s_logger.info("New instance of management server msid " + _id + " is being started");
}
} else {
if(s_logger.isInfoEnabled())
s_logger.info("Management server " + _id + " is being started");
if(s_logger.isInfoEnabled()) {
s_logger.info("Management server " + _id + " is being started");
}
_mshostDao.update(mshost.getId(), NetUtils.getHostName(), version,
_clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime());
@ -587,8 +624,9 @@ public class ClusterManagerImpl implements ClusterManager {
txn.commit();
_mshostId = mshost.getId();
if(s_logger.isInfoEnabled())
s_logger.info("Management server (host id : " + _mshostId + ") is available at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort());
if(s_logger.isInfoEnabled()) {
s_logger.info("Management server (host id : " + _mshostId + ") is available at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort());
}
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval,
heartbeatInterval, TimeUnit.MILLISECONDS);
@ -600,15 +638,17 @@ public class ClusterManagerImpl implements ClusterManager {
throw new CloudRuntimeException("Unable to initialize cluster info into database");
}
if(s_logger.isInfoEnabled())
s_logger.info("Cluster manager is started");
if(s_logger.isInfoEnabled()) {
s_logger.info("Cluster manager is started");
}
return true;
}
@Override
public boolean stop() {
if(_mshostId != null)
_mshostDao.remove(_mshostId);
if(_mshostId != null) {
_mshostDao.remove(_mshostId);
}
_heartbeatScheduler.shutdownNow();
_executor.shutdownNow();
@ -619,29 +659,34 @@ public class ClusterManagerImpl implements ClusterManager {
} catch (InterruptedException e) {
}
if(s_logger.isInfoEnabled())
s_logger.info("Cluster manager is stopped");
if(s_logger.isInfoEnabled()) {
s_logger.info("Cluster manager is stopped");
}
return true;
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
if(s_logger.isInfoEnabled())
s_logger.info("Start configuring cluster manager : " + name);
if(s_logger.isInfoEnabled()) {
s_logger.info("Start configuring cluster manager : " + name);
}
ComponentLocator locator = ComponentLocator.getCurrentLocator();
_agentMgr = locator.getManager(AgentManager.class);
if (_agentMgr == null)
if (_agentMgr == null) {
throw new ConfigurationException("Unable to get " + AgentManager.class.getName());
}
_mshostDao = locator.getDao(ManagementServerHostDao.class);
if(_mshostDao == null)
if(_mshostDao == null) {
throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName());
}
_hostDao = locator.getDao(HostDao.class);
if(_hostDao == null)
if(_hostDao == null) {
throw new ConfigurationException("Unable to get " + HostDao.class.getName());
}
ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
if (configDao == null) {
@ -651,12 +696,14 @@ public class ClusterManagerImpl implements ClusterManager {
Map<String, String> configs = configDao.getConfiguration("management-server", params);
String value = configs.get("cluster.heartbeat.interval");
if(value != null)
heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL);
if(value != null) {
heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL);
}
value = configs.get("cluster.heartbeat.threshold");
if(value != null)
heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD);
if(value != null) {
heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD);
}
_name = name;
@ -670,27 +717,33 @@ public class ClusterManagerImpl implements ClusterManager {
throw new ConfigurationException("Unable to load db.properties content");
}
_clusterNodeIP = dbProps.getProperty("cluster.node.IP");
if(_clusterNodeIP == null)
_clusterNodeIP = "127.0.0.1";
if(s_logger.isInfoEnabled())
s_logger.info("Cluster node IP : " + _clusterNodeIP);
if(_clusterNodeIP == null) {
_clusterNodeIP = "127.0.0.1";
}
if(s_logger.isInfoEnabled()) {
s_logger.info("Cluster node IP : " + _clusterNodeIP);
}
if(!NetUtils.isLocalAddress(_clusterNodeIP))
throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration");
if(!NetUtils.isLocalAddress(_clusterNodeIP)) {
throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration");
}
Adapters<ClusterServiceAdapter> adapters = locator.getAdapters(ClusterServiceAdapter.class);
if (adapters == null || !adapters.isSet()) {
throw new ConfigurationException("Unable to get cluster service adapters");
}
Enumeration<ClusterServiceAdapter> it = adapters.enumeration();
if(it.hasMoreElements())
_currentServiceAdapter = it.nextElement();
if(it.hasMoreElements()) {
_currentServiceAdapter = it.nextElement();
}
if(_currentServiceAdapter == null)
if(_currentServiceAdapter == null) {
throw new ConfigurationException("Unable to set current cluster service adapter");
}
if(s_logger.isInfoEnabled())
s_logger.info("Cluster manager is configured.");
if(s_logger.isInfoEnabled()) {
s_logger.info("Cluster manager is configured.");
}
return true;
}