Formatting class ClusterManagerImpl

- Splitting format commit from fix commit.

Signed-off-by: wilderrodrigues <wrodrigues@schubergphilis.com>
This commit is contained in:
wilderrodrigues 2015-06-25 07:59:35 +02:00
parent af902fd9d9
commit ea1f2eb049

View File

@ -41,13 +41,12 @@ import javax.ejb.Local;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.log4j.Logger;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.cluster.dao.ManagementServerHostPeerDao;
@ -130,21 +129,21 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
setRunLevel(ComponentLifecycle.RUN_LEVEL_FRAMEWORK);
}
/**
 * Records an outgoing request PDU in {@code _outgoingPdusWaitingForAck}, keyed by the PDU's
 * sequence id. The map is modified while holding the map's own monitor.
 *
 * @param pdu the request PDU to store; its {@code getSequenceId()} value becomes the map key
 */
// NOTE: this listing is a commit diff view, so the signature appears twice: the pre-commit line
// followed by its replacement, which only adds `final` to the parameter.
private void registerRequestPdu(ClusterServiceRequestPdu pdu) {
private void registerRequestPdu(final ClusterServiceRequestPdu pdu) {
synchronized (_outgoingPdusWaitingForAck) {
_outgoingPdusWaitingForAck.put(pdu.getSequenceId(), pdu);
}
}
/**
 * Stores the given dispatcher in the {@code _dispatcher} field, replacing whatever was there.
 * No null check or synchronization is performed here.
 *
 * @param dispatcher the dispatcher to keep for later use
 */
// NOTE: this listing is a commit diff view, so the signature appears twice: the pre-commit line
// followed by its replacement, which only adds `final` to the parameter.
@Override
public void registerDispatcher(Dispatcher dispatcher) {
public void registerDispatcher(final Dispatcher dispatcher) {
_dispatcher = dispatcher;
}
private ClusterServiceRequestPdu popRequestPdu(long ackSequenceId) {
private ClusterServiceRequestPdu popRequestPdu(final long ackSequenceId) {
synchronized (_outgoingPdusWaitingForAck) {
if (_outgoingPdusWaitingForAck.get(ackSequenceId) != null) {
ClusterServiceRequestPdu pdu = _outgoingPdusWaitingForAck.get(ackSequenceId);
final ClusterServiceRequestPdu pdu = _outgoingPdusWaitingForAck.get(ackSequenceId);
_outgoingPdusWaitingForAck.remove(ackSequenceId);
return pdu;
}
@ -153,20 +152,21 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return null;
}
private void cancelClusterRequestToPeer(String strPeer) {
List<ClusterServiceRequestPdu> candidates = new ArrayList<ClusterServiceRequestPdu>();
private void cancelClusterRequestToPeer(final String strPeer) {
final List<ClusterServiceRequestPdu> candidates = new ArrayList<ClusterServiceRequestPdu>();
synchronized (_outgoingPdusWaitingForAck) {
for (Map.Entry<Long, ClusterServiceRequestPdu> entry : _outgoingPdusWaitingForAck.entrySet()) {
if (entry.getValue().getDestPeer().equalsIgnoreCase(strPeer))
for (final Map.Entry<Long, ClusterServiceRequestPdu> entry : _outgoingPdusWaitingForAck.entrySet()) {
if (entry.getValue().getDestPeer().equalsIgnoreCase(strPeer)) {
candidates.add(entry.getValue());
}
}
for (ClusterServiceRequestPdu pdu : candidates) {
for (final ClusterServiceRequestPdu pdu : candidates) {
_outgoingPdusWaitingForAck.remove(pdu.getSequenceId());
}
}
for (ClusterServiceRequestPdu pdu : candidates) {
for (final ClusterServiceRequestPdu pdu : candidates) {
s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + pdu.getJsonPackage());
synchronized (pdu) {
pdu.notifyAll();
@ -174,22 +174,22 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
}
/**
 * Appends a PDU to {@code _clusterPduOutgoingQueue} and calls {@code notifyAll()} on the queue,
 * both while holding the queue's monitor, so threads waiting on the queue are woken up.
 *
 * @param pdu the PDU to enqueue for sending
 */
// NOTE: this listing is a commit diff view, so the signature appears twice: the pre-commit line
// followed by its replacement, which only adds `final` to the parameter.
private void addOutgoingClusterPdu(ClusterServicePdu pdu) {
private void addOutgoingClusterPdu(final ClusterServicePdu pdu) {
synchronized (_clusterPduOutgoingQueue) {
_clusterPduOutgoingQueue.add(pdu);
// Wake any thread blocked in wait() on the outgoing queue.
_clusterPduOutgoingQueue.notifyAll();
}
}
private ClusterServicePdu popOutgoingClusterPdu(long timeoutMs) {
private ClusterServicePdu popOutgoingClusterPdu(final long timeoutMs) {
synchronized (_clusterPduOutgoingQueue) {
try {
_clusterPduOutgoingQueue.wait(timeoutMs);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
}
if (_clusterPduOutgoingQueue.size() > 0) {
ClusterServicePdu pdu = _clusterPduOutgoingQueue.get(0);
final ClusterServicePdu pdu = _clusterPduOutgoingQueue.get(0);
_clusterPduOutgoingQueue.remove(0);
return pdu;
}
@ -197,22 +197,22 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return null;
}
/**
 * Appends a PDU to {@code _clusterPduIncomingQueue} and calls {@code notifyAll()} on the queue,
 * both while holding the queue's monitor, so threads waiting on the queue are woken up.
 *
 * @param pdu the received PDU to enqueue for processing
 */
// NOTE: this listing is a commit diff view, so the signature appears twice: the pre-commit line
// followed by its replacement, which only adds `final` to the parameter.
private void addIncomingClusterPdu(ClusterServicePdu pdu) {
private void addIncomingClusterPdu(final ClusterServicePdu pdu) {
synchronized (_clusterPduIncomingQueue) {
_clusterPduIncomingQueue.add(pdu);
// Wake any thread blocked in wait() on the incoming queue.
_clusterPduIncomingQueue.notifyAll();
}
}
private ClusterServicePdu popIncomingClusterPdu(long timeoutMs) {
private ClusterServicePdu popIncomingClusterPdu(final long timeoutMs) {
synchronized (_clusterPduIncomingQueue) {
try {
_clusterPduIncomingQueue.wait(timeoutMs);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
}
if (_clusterPduIncomingQueue.size() > 0) {
ClusterServicePdu pdu = _clusterPduIncomingQueue.get(0);
final ClusterServicePdu pdu = _clusterPduIncomingQueue.get(0);
_clusterPduIncomingQueue.remove(0);
return pdu;
}
@ -241,15 +241,16 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private void onSendingClusterPdu() {
while (true) {
try {
ClusterServicePdu pdu = popOutgoingClusterPdu(1000);
if (pdu == null)
final ClusterServicePdu pdu = popOutgoingClusterPdu(1000);
if (pdu == null) {
continue;
}
ClusterService peerService = null;
for (int i = 0; i < 2; i++) {
try {
peerService = getPeerService(pdu.getDestPeer());
} catch (RemoteException e) {
} catch (final RemoteException e) {
s_logger.error("Unable to get cluster service on peer : " + pdu.getDestPeer());
}
@ -257,30 +258,31 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + ". agent: " + pdu.getAgentId() + ", pdu seq: " +
pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
}
long startTick = System.currentTimeMillis();
String strResult = peerService.execute(pdu);
final long startTick = System.currentTimeMillis();
final String strResult = peerService.execute(pdu);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + " completed. time: " +
(System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId() + ", pdu seq: " + pdu.getSequenceId() +
", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
(System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId() + ", pdu seq: " + pdu.getSequenceId() +
", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
}
if ("true".equals(strResult))
if ("true".equals(strResult)) {
break;
}
} catch (RemoteException e) {
} catch (final RemoteException e) {
invalidatePeerService(pdu.getDestPeer());
if (s_logger.isInfoEnabled()) {
s_logger.info("Exception on remote execution, peer: " + pdu.getDestPeer() + ", iteration: " + i + ", exception message :" +
e.getMessage());
e.getMessage());
}
}
}
}
} catch (Throwable e) {
} catch (final Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
}
@ -290,14 +292,15 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
while (true) {
try {
final ClusterServicePdu pdu = popIncomingClusterPdu(1000);
if (pdu == null)
if (pdu == null) {
continue;
}
_executor.execute(new ManagedContextRunnable() {
@Override
protected void runInContext() {
if (pdu.getPduType() == ClusterServicePdu.PDU_TYPE_RESPONSE) {
ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
final ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
if (requestPdu != null) {
requestPdu.setResponseResult(pdu.getJsonPackage());
synchronized (requestPdu) {
@ -308,11 +311,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
} else {
String result = _dispatcher.dispatch(pdu);
if (result == null)
if (result == null) {
result = "";
}
if (pdu.getPduType() == ClusterServicePdu.PDU_TYPE_REQUEST) {
ClusterServicePdu responsePdu = new ClusterServicePdu();
final ClusterServicePdu responsePdu = new ClusterServicePdu();
responsePdu.setPduType(ClusterServicePdu.PDU_TYPE_RESPONSE);
responsePdu.setSourcePeer(pdu.getDestPeer());
responsePdu.setDestPeer(pdu.getSourcePeer());
@ -324,14 +328,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
}
});
} catch (Throwable e) {
} catch (final Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
}
}
/**
 * Entry point for a received cluster PDU: hands the PDU straight to
 * {@code addIncomingClusterPdu}, which queues it. Nothing else happens in this method.
 *
 * @param pdu the PDU that was received
 */
// NOTE: this listing is a commit diff view, so the signature appears twice: the pre-commit line
// followed by its replacement, which only adds `final` to the parameter.
@Override
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
public void OnReceiveClusterServicePdu(final ClusterServicePdu pdu) {
addIncomingClusterPdu(pdu);
}
@ -343,17 +347,17 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
* peer is running regardless of version.
* @return true if there are peers running and false if not.
*/
// NOTE: this listing is a commit diff view, so the signature appears twice: the pre-commit line
// followed by its replacement, which only adds `final` to the parameter.
public static final boolean arePeersRunning(String notVersion) {
public static final boolean arePeersRunning(final String notVersion) {
// Placeholder implementation: notVersion is ignored and the result is always false.
return false; // TODO: Leaving this for Kelven to take care of.
}
@Override
public void broadcast(long agentId, String cmds) {
Date cutTime = DateUtil.currentGMTTime();
public void broadcast(final long agentId, final String cmds) {
final Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
for (ManagementServerHostVO peer : peers) {
String peerName = Long.toString(peer.getMsid());
final List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
for (final ManagementServerHostVO peer : peers) {
final String peerName = Long.toString(peer.getMsid());
if (getSelfPeerName().equals(peerName)) {
continue; // Skip myself.
}
@ -362,14 +366,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
s_logger.debug("Forwarding " + cmds + " to " + peer.getMsid());
}
executeAsync(peerName, agentId, cmds, true);
} catch (Exception e) {
} catch (final Exception e) {
s_logger.warn("Caught exception while talkign to " + peer.getMsid());
}
}
}
public void executeAsync(String strPeer, long agentId, String cmds, boolean stopOnError) {
ClusterServicePdu pdu = new ClusterServicePdu();
public void executeAsync(final String strPeer, final long agentId, final String cmds, final boolean stopOnError) {
final ClusterServicePdu pdu = new ClusterServicePdu();
pdu.setSourcePeer(getSelfPeerName());
pdu.setDestPeer(strPeer);
pdu.setAgentId(agentId);
@ -379,12 +383,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
public String execute(String strPeer, long agentId, String cmds, boolean stopOnError) {
public String execute(final String strPeer, final long agentId, final String cmds, final boolean stopOnError) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + cmds);
}
ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu();
final ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu();
pdu.setSourcePeer(getSelfPeerName());
pdu.setDestPeer(strPeer);
pdu.setAgentId(agentId);
@ -396,7 +400,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
synchronized (pdu) {
try {
pdu.wait();
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
}
}
@ -412,7 +416,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
/**
 * Looks up a management server record by its management server id.
 *
 * @param mgmtServerId the id as a decimal string; it is converted with {@code Long.parseLong}
 * @return whatever {@code _mshostDao.findByMsid} returns for the parsed id
 * @throws NumberFormatException if {@code mgmtServerId} is not a parsable long (raised by
 *         {@code Long.parseLong}; it is not caught here)
 */
// NOTE: this listing is a commit diff view, so the signature appears twice: the pre-commit line
// followed by its replacement, which only adds `final` to the parameter.
@Override
public ManagementServerHostVO getPeer(String mgmtServerId) {
public ManagementServerHostVO getPeer(final String mgmtServerId) {
return _mshostDao.findByMsid(Long.parseLong(mgmtServerId));
}
@ -426,7 +430,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
public void registerListener(ClusterManagerListener listener) {
public void registerListener(final ClusterManagerListener listener) {
// Note : we don't check duplicates
synchronized (_listeners) {
@ -437,7 +441,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
public void unregisterListener(ClusterManagerListener listener) {
public void unregisterListener(final ClusterManagerListener listener) {
synchronized (_listeners) {
s_logger.info("unregister cluster listener " + listener.getClass());
@ -445,17 +449,17 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
}
public void notifyNodeJoined(List<ManagementServerHostVO> nodeList) {
public void notifyNodeJoined(final List<ManagementServerHostVO> nodeList) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node join to listeners.");
for (ManagementServerHostVO mshost : nodeList) {
for (final ManagementServerHostVO mshost : nodeList) {
s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
}
}
synchronized (_listeners) {
for (ClusterManagerListener listener : _listeners) {
for (final ClusterManagerListener listener : _listeners) {
listener.onManagementNodeJoined(nodeList, _mshostId);
}
}
@ -463,19 +467,20 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
SubscriptionMgr.getInstance().notifySubscribers(ClusterManager.ALERT_SUBJECT, this, new ClusterNodeJoinEventArgs(_mshostId, nodeList));
}
public void notifyNodeLeft(List<ManagementServerHostVO> nodeList) {
public void notifyNodeLeft(final List<ManagementServerHostVO> nodeList) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node left to listeners.");
}
for (ManagementServerHostVO mshost : nodeList) {
if (s_logger.isDebugEnabled())
for (final ManagementServerHostVO mshost : nodeList) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
}
cancelClusterRequestToPeer(String.valueOf(mshost.getMsid()));
}
synchronized (_listeners) {
for (ClusterManagerListener listener : _listeners) {
for (final ClusterManagerListener listener : _listeners) {
listener.onManagementNodeLeft(nodeList, _mshostId);
}
}
@ -484,24 +489,25 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
/**
 * Tells every registered listener that this management server node is isolated by calling
 * {@code onManagementNodeIsolated()} on each entry of {@code _listeners}. A debug message is
 * logged first when debug logging is enabled.
 */
// NOTE: this listing is a commit diff view, so the `if` and `for` header lines appear twice:
// the pre-commit line followed by its replacement (added braces / added `final`).
public void notifyNodeIsolated() {
if (s_logger.isDebugEnabled())
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node isolation to listeners");
}
// The listener callbacks run while the _listeners monitor is held.
synchronized (_listeners) {
for (ClusterManagerListener listener : _listeners) {
for (final ClusterManagerListener listener : _listeners) {
listener.onManagementNodeIsolated();
}
}
}
public ClusterService getPeerService(String strPeer) throws RemoteException {
public ClusterService getPeerService(final String strPeer) throws RemoteException {
synchronized (_clusterPeers) {
if (_clusterPeers.containsKey(strPeer)) {
return _clusterPeers.get(strPeer);
}
}
ClusterService service = _currentServiceAdapter.getPeerService(strPeer);
final ClusterService service = _currentServiceAdapter.getPeerService(strPeer);
if (service != null) {
synchronized (_clusterPeers) {
@ -516,7 +522,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return service;
}
public void invalidatePeerService(String strPeer) {
public void invalidatePeerService(final String strPeer) {
synchronized (_clusterPeers) {
if (_clusterPeers.containsKey(strPeer)) {
_clusterPeers.remove(strPeer);
@ -528,11 +534,11 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return new ManagedContextRunnable() {
@Override
protected void runInContext() {
TransactionLegacy txn = TransactionLegacy.open("ClusterHeartbeat");
final TransactionLegacy txn = TransactionLegacy.open("ClusterHeartbeat");
try {
Profiler profiler = new Profiler();
Profiler profilerHeartbeatUpdate = new Profiler();
Profiler profilerPeerScan = new Profiler();
final Profiler profiler = new Profiler();
final Profiler profilerHeartbeatUpdate = new Profiler();
final Profiler profilerPeerScan = new Profiler();
try {
profiler.start();
@ -563,13 +569,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
profiler.stop();
if (profiler.getDuration() >= HeartbeatInterval.value()) {
if (s_logger.isDebugEnabled())
if (s_logger.isDebugEnabled()) {
s_logger.debug("Management server heartbeat takes too long to finish. profiler: " + profiler.toString() + ", profilerHeartbeatUpdate: " +
profilerHeartbeatUpdate.toString() + ", profilerPeerScan: " + profilerPeerScan.toString());
profilerHeartbeatUpdate.toString() + ", profilerPeerScan: " + profilerPeerScan.toString());
}
}
}
} catch (CloudRuntimeException e) {
} catch (final CloudRuntimeException e) {
s_logger.error("Runtime DB exception ", e.getCause());
if (e.getCause() instanceof ClusterInvalidSessionException) {
@ -580,9 +587,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if (isRootCauseConnectionRelated(e.getCause())) {
invalidHeartbeatConnection();
}
} catch (ActiveFencingException e) {
} catch (final ActiveFencingException e) {
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
} catch (Throwable e) {
} catch (final Throwable e) {
s_logger.error("Unexpected exception in cluster heartbeat", e);
if (isRootCauseConnectionRelated(e.getCause())) {
invalidHeartbeatConnection();
@ -598,7 +605,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private boolean isRootCauseConnectionRelated(Throwable e) {
while (e != null) {
if (e instanceof SQLRecoverableException || e instanceof SQLNonTransientException) {
return true;
return true;
}
e = e.getCause();
@ -609,7 +616,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private Connection getHeartbeatConnection() throws SQLException {
if (_heartbeatConnection == null) {
Connection conn = TransactionLegacy.getStandaloneConnectionWithException();
final Connection conn = TransactionLegacy.getStandaloneConnectionWithException();
_heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartbeat", conn, false);
}
@ -618,7 +625,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private void invalidHeartbeatConnection() {
if (_heartbeatConnection != null) {
Connection conn = TransactionLegacy.getStandaloneConnection();
final Connection conn = TransactionLegacy.getStandaloneConnection();
if (conn != null) {
_heartbeatConnection.reset(conn);
} else {
@ -638,7 +645,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
synchronized (_notificationMsgs) {
try {
_notificationMsgs.wait(1000);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
}
}
@ -646,94 +653,94 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
while ((msg = getNextNotificationMessage()) != null) {
try {
switch (msg.getMessageType()) {
case nodeAdded:
if (msg.getNodes() != null && msg.getNodes().size() > 0) {
Profiler profiler = new Profiler();
profiler.start();
case nodeAdded:
if (msg.getNodes() != null && msg.getNodes().size() > 0) {
final Profiler profiler = new Profiler();
profiler.start();
notifyNodeJoined(msg.getNodes());
notifyNodeJoined(msg.getNodes());
profiler.stop();
if (profiler.getDuration() > 1000) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
}
} else {
s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
profiler.stop();
if (profiler.getDuration() > 1000) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
}
} else {
s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
}
break;
}
break;
case nodeRemoved:
if (msg.getNodes() != null && msg.getNodes().size() > 0) {
Profiler profiler = new Profiler();
profiler.start();
case nodeRemoved:
if (msg.getNodes() != null && msg.getNodes().size() > 0) {
final Profiler profiler = new Profiler();
profiler.start();
notifyNodeLeft(msg.getNodes());
notifyNodeLeft(msg.getNodes());
profiler.stop();
if (profiler.getDuration() > 1000) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
}
} else {
s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
profiler.stop();
if (profiler.getDuration() > 1000) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
}
} else {
s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
}
break;
}
break;
case nodeIsolated:
notifyNodeIsolated();
break;
case nodeIsolated:
notifyNodeIsolated();
break;
default:
assert (false);
break;
default:
assert false;
break;
}
} catch (Throwable e) {
} catch (final Throwable e) {
s_logger.warn("Unexpected exception during cluster notification. ", e);
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
}
}
}
};
}
/**
 * Queues a cluster notification message and records the matching peer state.
 *
 * The message is first added to {@code _notificationMsgs} and {@code notifyAll()} is called on
 * that collection, both under its monitor. Then, depending on the message type:
 * for {@code nodeAdded}, each node in {@code msg.getNodes()} is passed to
 * {@code _mshostPeerDao.updatePeerInfo} with state {@code Up}; for {@code nodeRemoved}, each node
 * is passed with state {@code Down}; any other type does nothing further. A null or empty node
 * list is skipped.
 *
 * @param msg the notification to queue
 */
// NOTE: this listing is a commit diff view. The switch body was re-indented by the commit, so
// most of its lines appear twice: the pre-commit line(s) followed by the replacement(s), and the
// two versions are interleaved rather than grouped.
private void queueNotification(ClusterManagerMessage msg) {
private void queueNotification(final ClusterManagerMessage msg) {
synchronized (_notificationMsgs) {
_notificationMsgs.add(msg);
// Wake any thread blocked in wait() on the notification collection.
_notificationMsgs.notifyAll();
}
// The peer-state updates below run after the monitor has been released.
switch (msg.getMessageType()) {
case nodeAdded: {
List<ManagementServerHostVO> l = msg.getNodes();
if (l != null && l.size() > 0) {
for (ManagementServerHostVO mshost : l) {
_mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Up);
}
case nodeAdded: {
final List<ManagementServerHostVO> l = msg.getNodes();
if (l != null && l.size() > 0) {
for (final ManagementServerHostVO mshost : l) {
_mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Up);
}
}
break;
}
break;
case nodeRemoved: {
List<ManagementServerHostVO> l = msg.getNodes();
if (l != null && l.size() > 0) {
for (ManagementServerHostVO mshost : l) {
_mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Down);
}
case nodeRemoved: {
final List<ManagementServerHostVO> l = msg.getNodes();
if (l != null && l.size() > 0) {
for (final ManagementServerHostVO mshost : l) {
_mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Down);
}
}
break;
}
break;
default:
break;
default:
break;
}
}
@ -750,14 +757,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private void initPeerScan() {
// upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform
// missed cleanup
Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
final Date cutTime = DateUtil.currentGMTTime();
final List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
// We don't have foreign key constraints to enforce the mgmt_server_id integrity in the host table, so when a user manually
// removes records from the mshost table, orphan mgmt_server_id references are left behind in the host table.
List<Long> orphanList = _mshostDao.listOrphanMsids();
final List<Long> orphanList = _mshostDao.listOrphanMsids();
if (orphanList.size() > 0) {
for (Long orphanMsid : orphanList) {
for (final Long orphanMsid : orphanList) {
// construct fake ManagementServerHostVO based on orphan MSID
s_logger.info("Add orphan management server msid found in host table to initial clustering notification, orphan msid: " + orphanMsid);
inactiveList.add(new ManagementServerHostVO(orphanMsid, 0, "orphan", 0, new Date()));
@ -769,55 +776,57 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if (inactiveList.size() > 0) {
if (s_logger.isInfoEnabled()) {
s_logger.info("Found " + inactiveList.size() + " inactive management server node based on timestamp");
for (ManagementServerHostVO host : inactiveList)
for (final ManagementServerHostVO host : inactiveList) {
s_logger.info("management server node msid: " + host.getMsid() + ", name: " + host.getName() + ", service ip: " + host.getServiceIP() +
", version: " + host.getVersion());
", version: " + host.getVersion());
}
}
List<ManagementServerHostVO> downHostList = new ArrayList<ManagementServerHostVO>();
for (ManagementServerHostVO host : inactiveList) {
final List<ManagementServerHostVO> downHostList = new ArrayList<ManagementServerHostVO>();
for (final ManagementServerHostVO host : inactiveList) {
if (!pingManagementNode(host)) {
s_logger.warn("Management node " + host.getId() + " is detected inactive by timestamp and also not pingable");
downHostList.add(host);
}
}
if (downHostList.size() > 0)
if (downHostList.size() > 0) {
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, downHostList));
}
} else {
s_logger.info("No inactive management server node found");
}
}
private void peerScan() throws ActiveFencingException {
Date cutTime = DateUtil.currentGMTTime();
final Date cutTime = DateUtil.currentGMTTime();
Profiler profiler = new Profiler();
final Profiler profiler = new Profiler();
profiler.start();
Profiler profilerQueryActiveList = new Profiler();
final Profiler profilerQueryActiveList = new Profiler();
profilerQueryActiveList.start();
List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
final List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
profilerQueryActiveList.stop();
Profiler profilerSyncClusterInfo = new Profiler();
final Profiler profilerSyncClusterInfo = new Profiler();
profilerSyncClusterInfo.start();
List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
final List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
final List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
if (_mshostId != null) {
if (_mshostPeerDao.countStateSeenInPeers(_mshostId, _runId, ManagementServerHost.State.Down) > 0) {
String msg =
"We have detected that at least one management server peer reports that this management server is down, perform active fencing to avoid split-brain situation";
final String msg =
"We have detected that at least one management server peer reports that this management server is down, perform active fencing to avoid split-brain situation";
s_logger.error(msg);
throw new ActiveFencingException(msg);
}
// only if we have already attached to cluster, will we start to check leaving nodes
for (Map.Entry<Long, ManagementServerHostVO> entry : _activePeers.entrySet()) {
for (final Map.Entry<Long, ManagementServerHostVO> entry : _activePeers.entrySet()) {
ManagementServerHostVO current = getInListById(entry.getKey(), currentList);
final ManagementServerHostVO current = getInListById(entry.getKey(), currentList);
if (current == null) {
if (entry.getKey().longValue() != _mshostId.longValue()) {
if (s_logger.isDebugEnabled()) {
@ -830,7 +839,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if (entry.getKey().longValue() != _mshostId.longValue()) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Detected management node left because of invalidated session, id:" + entry.getKey() + ", nodeIP:" +
entry.getValue().getServiceIP());
entry.getValue().getServiceIP());
}
invalidatedNodeList.add(entry.getValue());
}
@ -848,15 +857,15 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
profilerSyncClusterInfo.stop();
Profiler profilerInvalidatedNodeList = new Profiler();
final Profiler profilerInvalidatedNodeList = new Profiler();
profilerInvalidatedNodeList.start();
// process invalidated node list
if (invalidatedNodeList.size() > 0) {
for (ManagementServerHostVO mshost : invalidatedNodeList) {
for (final ManagementServerHostVO mshost : invalidatedNodeList) {
_activePeers.remove(mshost.getId());
try {
JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
} catch (Exception e) {
} catch (final Exception e) {
s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString());
}
}
@ -865,18 +874,18 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
profilerInvalidatedNodeList.stop();
Profiler profilerRemovedList = new Profiler();
final Profiler profilerRemovedList = new Profiler();
profilerRemovedList.start();
// process removed node list
Iterator<ManagementServerHostVO> it = removedNodeList.iterator();
final Iterator<ManagementServerHostVO> it = removedNodeList.iterator();
while (it.hasNext()) {
ManagementServerHostVO mshost = it.next();
final ManagementServerHostVO mshost = it.next();
if (!pingManagementNode(mshost)) {
s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
_activePeers.remove(mshost.getId());
try {
JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
} catch (Exception e) {
} catch (final Exception e) {
s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString());
}
} else {
@ -890,8 +899,8 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
profilerRemovedList.stop();
List<ManagementServerHostVO> newNodeList = new ArrayList<ManagementServerHostVO>();
for (ManagementServerHostVO mshost : currentList) {
final List<ManagementServerHostVO> newNodeList = new ArrayList<ManagementServerHostVO>();
for (final ManagementServerHostVO mshost : currentList) {
if (!_activePeers.containsKey(mshost.getId())) {
_activePeers.put(mshost.getId(), mshost);
@ -902,7 +911,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
try {
JmxUtil.registerMBean("ClusterManager", "Node " + mshost.getId(), new ClusterManagerMBeanImpl(this, mshost));
} catch (Exception e) {
} catch (final Exception e) {
s_logger.warn("Unable to regiester cluster node into JMX monitoring due to exception " + ExceptionUtil.toString(e));
}
}
@ -915,15 +924,16 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
profiler.stop();
if (profiler.getDuration() >= HeartbeatInterval.value()) {
if (s_logger.isDebugEnabled())
if (s_logger.isDebugEnabled()) {
s_logger.debug("Peer scan takes too long to finish. profiler: " + profiler.toString() + ", profilerQueryActiveList: " +
profilerQueryActiveList.toString() + ", profilerSyncClusterInfo: " + profilerSyncClusterInfo.toString() + ", profilerInvalidatedNodeList: " +
profilerInvalidatedNodeList.toString() + ", profilerRemovedList: " + profilerRemovedList.toString());
profilerQueryActiveList.toString() + ", profilerSyncClusterInfo: " + profilerSyncClusterInfo.toString() + ", profilerInvalidatedNodeList: " +
profilerInvalidatedNodeList.toString() + ", profilerRemovedList: " + profilerRemovedList.toString());
}
}
}
private static ManagementServerHostVO getInListById(Long id, List<ManagementServerHostVO> l) {
for (ManagementServerHostVO mshost : l) {
private static ManagementServerHostVO getInListById(final Long id, final List<ManagementServerHostVO> l) {
for (final ManagementServerHostVO mshost : l) {
if (mshost.getId() == id) {
return mshost;
}
@ -938,12 +948,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
s_logger.info("Starting Cluster manager, msid : " + _msId);
}
ManagementServerHostVO mshost = Transaction.execute(new TransactionCallback<ManagementServerHostVO>() {
final ManagementServerHostVO mshost = Transaction.execute(new TransactionCallback<ManagementServerHostVO>() {
@Override
public ManagementServerHostVO doInTransaction(TransactionStatus status) {
public ManagementServerHostVO doInTransaction(final TransactionStatus status) {
final Class<?> c = this.getClass();
String version = c.getPackage().getImplementationVersion();
final String version = c.getPackage().getImplementationVersion();
ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
if (mshost == null) {
@ -964,7 +974,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
} else {
_mshostDao.update(mshost.getId(), _runId, NetUtils.getHostName(), version, _clusterNodeIP, _currentServiceAdapter.getServicePort(),
DateUtil.currentGMTTime());
DateUtil.currentGMTTime());
if (s_logger.isInfoEnabled()) {
s_logger.info("Management server " + _msId + ", runId " + _runId + " is being started");
}
@ -1000,7 +1010,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
if (_mshostId != null) {
ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
final ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
mshost.setState(ManagementServerHost.State.Down);
_mshostDao.update(_mshostId, mshost);
}
@ -1011,7 +1021,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
try {
_heartbeatScheduler.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
_executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
}
if (s_logger.isInfoEnabled()) {
@ -1022,12 +1032,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
if (s_logger.isInfoEnabled()) {
s_logger.info("Start configuring cluster manager : " + name);
}
Properties dbProps = DbProperties.getDbProperties();
final Properties dbProps = DbProperties.getDbProperties();
_clusterNodeIP = dbProps.getProperty("cluster.node.IP");
if (_clusterNodeIP == null) {
_clusterNodeIP = "127.0.0.1";
@ -1042,8 +1052,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration");
}
for (int i = 0; i < DEFAULT_OUTGOING_WORKERS; i++)
for (int i = 0; i < DEFAULT_OUTGOING_WORKERS; i++) {
_executor.execute(getClusterPduSendingTask());
}
// notification task itself in turn works as a task dispatcher
_executor.execute(getClusterPduNotificationTask());
@ -1076,16 +1087,16 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
/**
 * Returns the run id stored for the management server identified by {@code msId}.
 *
 * The host record is looked up through {@code _mshostDao.findByMsid(msId)}.
 *
 * @param msId management server id used for the lookup
 * @return the value of {@code getRunid()} on the matching host record, or
 *         {@code -1} when the lookup returns {@code null}
 */
@Override
// NOTE(review): this region is a rendered diff; the signature and the lookup line each
// appear twice below (before and after the change), differing only by the added 'final'.
public long getManagementRunId(long msId) {
ManagementServerHostVO mshost = _mshostDao.findByMsid(msId);
public long getManagementRunId(final long msId) {
final ManagementServerHostVO mshost = _mshostDao.findByMsid(msId);
if (mshost != null) {
return mshost.getRunid();
}
// No host record for this msId: -1 is the "not found" result.
return -1;
}
public boolean isManagementNodeAlive(long msid) {
ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
public boolean isManagementNodeAlive(final long msid) {
final ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
if (mshost != null) {
if (mshost.getLastUpdateTime().getTime() >= DateUtil.currentGMTTime().getTime() - HeartbeatThreshold.value()) {
return true;
@ -1095,8 +1106,8 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return false;
}
public boolean pingManagementNode(long msid) {
ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
public boolean pingManagementNode(final long msid) {
final ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
if (mshost == null) {
return false;
}
@ -1114,9 +1125,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return new ConfigKey<?>[] {HeartbeatInterval, HeartbeatThreshold};
}
private boolean pingManagementNode(ManagementServerHostVO mshost) {
private boolean pingManagementNode(final ManagementServerHostVO mshost) {
String targetIp = mshost.getServiceIP();
final String targetIp = mshost.getServiceIP();
if ("127.0.0.1".equals(targetIp) || "0.0.0.0".equals(targetIp)) {
s_logger.info("ping management node cluster service can not be performed on self");
return false;
@ -1131,10 +1142,10 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
sch.configureBlocking(true);
sch.socket().setSoTimeout(5000);
InetSocketAddress addr = new InetSocketAddress(targetIp, mshost.getServicePort());
final InetSocketAddress addr = new InetSocketAddress(targetIp, mshost.getServicePort());
sch.connect(addr);
return true;
} catch (IOException e) {
} catch (final IOException e) {
if (e instanceof ConnectException) {
s_logger.error("Unable to ping management server at " + targetIp + ":" + mshost.getServicePort() + " due to ConnectException", e);
return false;
@ -1143,14 +1154,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if (sch != null) {
try {
sch.close();
} catch (IOException e) {
} catch (final IOException e) {
}
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
} catch (final InterruptedException ex) {
}
}
@ -1163,31 +1174,31 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
private void checkConflicts() throws ConfigurationException {
Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
for (ManagementServerHostVO peer : peers) {
String peerIP = peer.getServiceIP().trim();
final Date cutTime = DateUtil.currentGMTTime();
final List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
for (final ManagementServerHostVO peer : peers) {
final String peerIP = peer.getServiceIP().trim();
if (_clusterNodeIP.equals(peerIP)) {
if ("127.0.0.1".equals(_clusterNodeIP)) {
if (pingManagementNode(peer.getMsid())) {
String msg = "Detected another management node with localhost IP is already running, please check your cluster configuration";
final String msg = "Detected another management node with localhost IP is already running, please check your cluster configuration";
s_logger.error(msg);
throw new ConfigurationException(msg);
} else {
String msg =
"Detected another management node with localhost IP is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
final String msg =
"Detected another management node with localhost IP is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
s_logger.info(msg);
}
} else {
if (pingManagementNode(peer.getMsid())) {
String msg =
"Detected that another management node with the same IP " + peer.getServiceIP() +
final String msg =
"Detected that another management node with the same IP " + peer.getServiceIP() +
" is already running, please check your cluster configuration";
s_logger.error(msg);
throw new ConfigurationException(msg);
} else {
String msg =
"Detected that another management node with the same IP " + peer.getServiceIP() +
final String msg =
"Detected that another management node with the same IP " + peer.getServiceIP() +
" is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
s_logger.info(msg);
}