diff --git a/core/src/com/cloud/agent/api/ScheduleHostScanTaskCommand.java b/core/src/com/cloud/agent/api/ScheduleHostScanTaskCommand.java old mode 100755 new mode 100644 diff --git a/framework/cluster/pom.xml b/framework/cluster/pom.xml new file mode 100644 index 00000000000..116b6353258 --- /dev/null +++ b/framework/cluster/pom.xml @@ -0,0 +1,28 @@ + + + 4.0.0 + cloud-framework-cluster + Apache CloudStack Framework - Clustering + + org.apache.cloudstack + cloudstack-framework + 4.2.0-SNAPSHOT + ../pom.xml + + + + org.apache.cloudstack + cloud-utils + ${project.version} + + + diff --git a/server/src/com/cloud/cluster/ActiveFencingException.java b/framework/cluster/src/com/cloud/cluster/ActiveFencingException.java similarity index 91% rename from server/src/com/cloud/cluster/ActiveFencingException.java rename to framework/cluster/src/com/cloud/cluster/ActiveFencingException.java index 512219d07b7..a4450864d05 100644 --- a/server/src/com/cloud/cluster/ActiveFencingException.java +++ b/framework/cluster/src/com/cloud/cluster/ActiveFencingException.java @@ -16,9 +16,8 @@ // under the License. package com.cloud.cluster; -import com.cloud.exception.CloudException; -public class ActiveFencingException extends CloudException { +public class ActiveFencingException extends Exception { private static final long serialVersionUID = -3975376101728211726L; public ActiveFencingException(String message) { diff --git a/server/src/com/cloud/cluster/ClusterFenceManager.java b/framework/cluster/src/com/cloud/cluster/ClusterFenceManager.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterFenceManager.java rename to framework/cluster/src/com/cloud/cluster/ClusterFenceManager.java diff --git a/server/src/com/cloud/cluster/ClusterFenceManagerImpl.java b/framework/cluster/src/com/cloud/cluster/ClusterFenceManagerImpl.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterFenceManagerImpl.java rename to framework/cluster/src/com/cloud/cluster/ClusterFenceManagerImpl.java diff --git a/engine/schema/src/com/cloud/cluster/ClusterInvalidSessionException.java b/framework/cluster/src/com/cloud/cluster/ClusterInvalidSessionException.java similarity index 90% rename from engine/schema/src/com/cloud/cluster/ClusterInvalidSessionException.java rename to framework/cluster/src/com/cloud/cluster/ClusterInvalidSessionException.java index 8ac94f27d54..e9378b77468 100644 --- a/engine/schema/src/com/cloud/cluster/ClusterInvalidSessionException.java +++ b/framework/cluster/src/com/cloud/cluster/ClusterInvalidSessionException.java @@ -16,9 +16,8 @@ // under the License. package com.cloud.cluster; -import com.cloud.exception.CloudException; -public class ClusterInvalidSessionException extends CloudException { +public class ClusterInvalidSessionException extends Exception { private static final long serialVersionUID = -6636524194520997512L; diff --git a/framework/cluster/src/com/cloud/cluster/ClusterManager.java b/framework/cluster/src/com/cloud/cluster/ClusterManager.java new file mode 100644 index 00000000000..51d993ea473 --- /dev/null +++ b/framework/cluster/src/com/cloud/cluster/ClusterManager.java @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package com.cloud.cluster; + +import com.cloud.utils.component.Manager; + +public interface ClusterManager extends Manager { + static final String ALERT_SUBJECT = "cluster-alert"; + + void OnReceiveClusterServicePdu(ClusterServicePdu pdu); + + /** + * This executes + * @param strPeer + * @param agentId + * @param cmds + * @param stopOnError + * @return + */ + String execute(String strPeer, long agentId, String cmds, boolean stopOnError); + + /** + * Broadcast the command to all of the management server nodes. + * @param agentId agent id this broadcast is regarding + * @param cmds commands to broadcast + */ + void broadcast(long agentId, String cmds); + + int getHeartbeatThreshold(); + + void registerListener(ClusterManagerListener listener); + void unregisterListener(ClusterManagerListener listener); + + void registerDispatcher(Dispatcher dispatcher); + + ManagementServerHost getPeer(String peerName); + + String getSelfPeerName(); + + public interface Dispatcher { + String getName(); + String dispatch(ClusterServicePdu pdu); + } +} diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/framework/cluster/src/com/cloud/cluster/ClusterManagerImpl.java old mode 100755 new mode 100644 similarity index 72% rename from server/src/com/cloud/cluster/ClusterManagerImpl.java rename to framework/cluster/src/com/cloud/cluster/ClusterManagerImpl.java index 5a08f878625..dd7a6035a1a --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/framework/cluster/src/com/cloud/cluster/ClusterManagerImpl.java @@ -45,44 +45,21 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; -import com.cloud.agent.AgentManager; -import com.cloud.agent.AgentManager.OnError; -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.ChangeAgentAnswer; -import com.cloud.agent.api.ChangeAgentCommand; -import com.cloud.agent.api.Command; -import com.cloud.agent.api.PropagateResourceEventCommand; -import com.cloud.agent.api.TransferAgentCommand; -import com.cloud.agent.api.ScheduleHostScanTaskCommand; -import com.cloud.agent.manager.ClusteredAgentManagerImpl; -import com.cloud.agent.manager.Commands; -import com.cloud.cluster.agentlb.dao.HostTransferMapDao; +import org.apache.cloudstack.config.ConfigDepot; +import org.apache.cloudstack.config.ConfigKey; +import org.apache.cloudstack.config.ConfigValue; +import org.apache.cloudstack.utils.identity.ManagementServerNode; + import com.cloud.cluster.dao.ManagementServerHostDao; import com.cloud.cluster.dao.ManagementServerHostPeerDao; -import com.cloud.configuration.Config; -import com.cloud.configuration.dao.ConfigurationDao; -import com.cloud.exception.AgentUnavailableException; -import com.cloud.exception.OperationTimedoutException; -import com.cloud.host.Host; -import com.cloud.host.HostVO; -import com.cloud.host.Status.Event; -import com.cloud.host.dao.HostDao; -import com.cloud.resource.ResourceManager; -import com.cloud.resource.ResourceState; -import com.cloud.serializer.GsonHelper; import com.cloud.utils.DateUtil; -import com.cloud.utils.NumbersUtil; import com.cloud.utils.Profiler; import com.cloud.utils.PropertiesUtil; -import com.cloud.utils.component.ComponentContext; import com.cloud.utils.component.ComponentLifecycle; import com.cloud.utils.component.ManagerBase; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.ConnectionConcierge; import com.cloud.utils.db.DB; -import com.cloud.utils.db.SearchCriteria.Op; -import com.cloud.utils.db.SearchCriteria2; -import com.cloud.utils.db.SearchCriteriaService; import com.cloud.utils.db.Transaction; import com.cloud.utils.events.SubscriptionMgr; import com.cloud.utils.exception.CloudRuntimeException; @@ -90,31 +67,22 @@ import com.cloud.utils.exception.ExceptionUtil; import com.cloud.utils.mgmt.JmxUtil; import com.cloud.utils.net.NetUtils; -import com.google.gson.Gson; - -import org.apache.cloudstack.utils.identity.ManagementServerNode; - @Local(value = { ClusterManager.class }) public class ClusterManagerImpl extends ManagerBase implements ClusterManager { private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class); private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second - private static final int DEFAULT_OUTGOING_WORKERS = 5; + private static final int DEFAULT_OUTGOING_WORKERS = 5; private final List _listeners = new ArrayList(); private final Map _activePeers = new HashMap(); - private int _heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL; - private int _heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD; + private ConfigValue _heartbeatInterval; + private ConfigValue _heartbeatThreshold; private final Map _clusterPeers; - private final Gson _gson; @Inject - private AgentManager _agentMgr; - @Inject - private ClusteredAgentRebalanceService _rebalanceService; - @Inject - private ResourceManager _resourceMgr; + protected ConfigDepot _configDepot; private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat")); private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification")); @@ -130,9 +98,8 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { @Inject private ManagementServerHostDao _mshostDao; @Inject private ManagementServerHostPeerDao _mshostPeerDao; - @Inject private HostDao _hostDao; - @Inject private HostTransferMapDao _hostTransferDao; - @Inject private ConfigurationDao _configDao; + + protected Dispatcher _dispatcher; // // pay attention to _mshostId and _msid @@ -146,9 +113,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { private boolean _peerScanInited = false; private String _clusterNodeIP = "127.0.0.1"; - private boolean _agentLBEnabled = false; - private double _connectedAgentsThreshold = 0.7; - private static boolean _agentLbHappened = false; private final List _clusterPduOutgoingQueue = new ArrayList(); private final List _clusterPduIncomingQueue = new ArrayList(); @@ -157,8 +121,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { public ClusterManagerImpl() { _clusterPeers = new HashMap(); - _gson = GsonHelper.getGson(); - // executor to perform remote-calls in another thread context, to avoid potential // recursive remote calls between nodes // @@ -172,6 +134,11 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } } + @Override + public void registerDispatcher(Dispatcher dispatcher) { + _dispatcher = dispatcher; + } + private ClusterServiceRequestPdu popRequestPdu(long ackSequenceId) { synchronized(_outgoingPdusWaitingForAck) { if(_outgoingPdusWaitingForAck.get(ackSequenceId) != null) { @@ -198,7 +165,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } for(ClusterServiceRequestPdu pdu : candidates) { - s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + _gson.toJson(pdu)); + s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + pdu.getJsonPackage()); synchronized(pdu) { pdu.notifyAll(); } @@ -287,7 +254,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { if(peerService != null) { try { if(s_logger.isDebugEnabled()) { - s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + ". agent: " + pdu.getAgentId() + s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + ". agent: " + pdu.getAgentId() + ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage()); } @@ -295,7 +262,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { String strResult = peerService.execute(pdu); if(s_logger.isDebugEnabled()) { s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + " completed. time: " + - (System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId() + (System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId() + ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage()); } @@ -335,10 +302,10 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { requestPdu.notifyAll(); } } else { - s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu)); + s_logger.warn("Original request has already been cancelled. pdu: " + pdu.getJsonPackage()); } } else { - String result = dispatchClusterServicePdu(pdu); + String result = _dispatcher.dispatch(pdu); if(result == null) result = ""; @@ -361,187 +328,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } } - private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd)); - } - try { - // schedule a scan task immediately - if (_agentMgr instanceof ClusteredAgentManagerImpl) { - if (s_logger.isDebugEnabled()) { - s_logger.debug("Received notification as part of addHost command to start a host scan task"); - } - ClusteredAgentManagerImpl clusteredAgentMgr = (ClusteredAgentManagerImpl)_agentMgr; - clusteredAgentMgr.scheduleHostScanTask(); - } - } catch (Exception e) { - // Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan - // happens at fixed intervals anyways. So handling any exceptions that may be thrown - s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + getSelfPeerName() + ", ignoring as regular host scan happens at fixed interval anyways", e); - return null; - } - - Answer[] answers = new Answer[1]; - answers[0] = new Answer(cmd, true, null); - return _gson.toJson(answers); - } - - private String dispatchClusterServicePdu(ClusterServicePdu pdu) { - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage()); - } - - Command [] cmds = null; - try { - cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class); - } catch(Throwable e) { - assert(false); - s_logger.error("Excection in gson decoding : ", e); - } - - if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted - ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); - } - boolean result = false; - try { - result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent()); - if (s_logger.isDebugEnabled()) { - s_logger.debug("Result is " + result); - } - - } catch (AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - return null; - } - - Answer[] answers = new Answer[1]; - answers[0] = new ChangeAgentAnswer(cmd, result); - return _gson.toJson(answers); - } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { - TransferAgentCommand cmd = (TransferAgentCommand) cmds[0]; - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); - } - boolean result = false; - try { - result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner()); - if (s_logger.isDebugEnabled()) { - s_logger.debug("Result is " + result); - } - - } catch (AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - return null; - } catch (OperationTimedoutException e) { - s_logger.warn("Operation timed out", e); - return null; - } - Answer[] answers = new Answer[1]; - answers[0] = new Answer(cmd, result, null); - return _gson.toJson(answers); - } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand ) { - PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0]; - - s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId()); - - boolean result = false; - try { - result = executeResourceUserRequest(cmd.getHostId(), cmd.getEvent()); - s_logger.debug("Result is " + result); - } catch (AgentUnavailableException ex) { - s_logger.warn("Agent is unavailable", ex); - return null; - } - - Answer[] answers = new Answer[1]; - answers[0] = new Answer(cmd, result, null); - return _gson.toJson(answers); - } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) { - ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0]; - String response = handleScheduleHostScanTaskCommand(cmd); - return response; - } - - try { - long startTick = System.currentTimeMillis(); - if(s_logger.isDebugEnabled()) { - s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage()); - } - - Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError()); - if(answers != null) { - String jsonReturn = _gson.toJson(answers); - - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + - " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn); - } - - return jsonReturn; - } else { - if(s_logger.isDebugEnabled()) { - s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + - " in " + (System.currentTimeMillis() - startTick) + " ms, return null result"); - } - } - } catch(AgentUnavailableException e) { - s_logger.warn("Agent is unavailable", e); - } catch (OperationTimedoutException e) { - s_logger.warn("Timed Out", e); - } - - return null; - } @Override public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) { addIncomingClusterPdu(pdu); } - @Override - public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { - Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue); - for (Command cmd : cmds) { - commands.addCommand(cmd); - } - return _agentMgr.send(hostId, commands); - } - @Override - public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { - return _agentMgr.executeUserRequest(agentId, event); - } - - @Override - public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException { - final String msPeer = getPeerName(agentId); - if (msPeer == null) { - return null; - } - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId); - } - Command[] cmds = new Command[1]; - cmds[0] = new ChangeAgentCommand(agentId, event); - - Answer[] answers = execute(msPeer, agentId, cmds, true); - if (answers == null) { - throw new AgentUnavailableException(agentId); - } - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Result for agent change is " + answers[0].getResult()); - } - - return answers[0].getResult(); - } /** * called by DatabaseUpgradeChecker to see if there are other peers running. @@ -556,10 +350,10 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } @Override - public void broadcast(long agentId, Command[] cmds) { + public void broadcast(long agentId, String cmds) { Date cutTime = DateUtil.currentGMTTime(); - List peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold)); + List peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold.value())); for (ManagementServerHostVO peer : peers) { String peerName = Long.toString(peer.getMsid()); if (getSelfPeerName().equals(peerName)) { @@ -567,7 +361,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } try { if (s_logger.isDebugEnabled()) { - s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid()); + s_logger.debug("Forwarding " + cmds + " to " + peer.getMsid()); } executeAsync(peerName, agentId, cmds, true); } catch (Exception e) { @@ -576,29 +370,27 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } } - @Override - public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { + public void executeAsync(String strPeer, long agentId, String cmds, boolean stopOnError) { ClusterServicePdu pdu = new ClusterServicePdu(); pdu.setSourcePeer(getSelfPeerName()); pdu.setDestPeer(strPeer); pdu.setAgentId(agentId); - pdu.setJsonPackage(_gson.toJson(cmds, Command[].class)); + pdu.setJsonPackage(cmds); pdu.setStopOnError(true); addOutgoingClusterPdu(pdu); } @Override - public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { + public String execute(String strPeer, long agentId, String cmds, boolean stopOnError) { if(s_logger.isDebugEnabled()) { - s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + - _gson.toJson(cmds, Command[].class)); + s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + cmds); } ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu(); pdu.setSourcePeer(getSelfPeerName()); pdu.setDestPeer(strPeer); pdu.setAgentId(agentId); - pdu.setJsonPackage(_gson.toJson(cmds, Command[].class)); + pdu.setJsonPackage(cmds); pdu.setStopOnError(stopOnError); registerRequestPdu(pdu); addOutgoingClusterPdu(pdu); @@ -616,30 +408,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } if(pdu.getResponseResult() != null && pdu.getResponseResult().length() > 0) { - try { - return _gson.fromJson(pdu.getResponseResult(), Answer[].class); - } catch(Throwable e) { - s_logger.error("Exception on parsing gson package from remote call to " + strPeer); - } + return pdu.getResponseResult(); } return null; } - @Override - public String getPeerName(long agentHostId) { - - HostVO host = _hostDao.findById(agentHostId); - if(host != null && host.getManagementServerId() != null) { - if(getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) { - return null; - } - - return Long.toString(host.getManagementServerId()); - } - return null; - } - @Override public ManagementServerHostVO getPeer(String mgmtServerId) { return _mshostDao.findByMsid(Long.valueOf(mgmtServerId)); @@ -650,7 +424,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { return Long.toString(_msId); } - @Override public String getSelfNodeIP() { return _clusterNodeIP; } @@ -765,7 +538,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { Profiler profiler = new Profiler(); Profiler profilerHeartbeatUpdate = new Profiler(); Profiler profilerPeerScan = new Profiler(); - Profiler profilerAgentLB = new Profiler(); try { profiler.start(); @@ -792,40 +564,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { peerScan(); profilerPeerScan.stop(); - profilerAgentLB.start(); - //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold - if (_agentLBEnabled && !_agentLbHappened) { - SearchCriteriaService sc = SearchCriteria2.create(HostVO.class); - sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL); - sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing); - List allManagedRoutingAgents = sc.list(); - - sc = SearchCriteria2.create(HostVO.class); - sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing); - List allAgents = sc.list(); - double allHostsCount = allAgents.size(); - double managedHostsCount = allManagedRoutingAgents.size(); - if (allHostsCount > 0.0) { - double load = managedHostsCount/allHostsCount; - if (load >= _connectedAgentsThreshold) { - s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold); - _rebalanceService.scheduleRebalanceAgents(); - _agentLbHappened = true; - } else { - s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold); - } - } - } - profilerAgentLB.stop(); } finally { profiler.stop(); - if(profiler.getDuration() >= _heartbeatInterval) { + if (profiler.getDuration() >= _heartbeatInterval.value()) { if(s_logger.isDebugEnabled()) - s_logger.debug("Management server heartbeat takes too long to finish. profiler: " + profiler.toString() + + s_logger.debug("Management server heartbeat takes too long to finish. profiler: " + profiler.toString() + ", profilerHeartbeatUpdate: " + profilerHeartbeatUpdate.toString() + - ", profilerPeerScan: " + profilerPeerScan.toString() + - ", profilerAgentLB: " + profilerAgentLB.toString()); + ", profilerPeerScan: " + profilerPeerScan.toString()); } } @@ -854,8 +600,8 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { invalidHeartbeatConnection(); } finally { - txn.transitToAutoManagedConnection(Transaction.CLOUD_DB); - txn.close("ClusterHeartBeat"); + txn.transitToAutoManagedConnection(Transaction.CLOUD_DB); + txn.close("ClusterHeartBeat"); } } }; @@ -964,9 +710,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } private void queueNotification(ClusterManagerMessage msg) { - synchronized(this._notificationMsgs) { - this._notificationMsgs.add(msg); - this._notificationMsgs.notifyAll(); + synchronized(_notificationMsgs) { + _notificationMsgs.add(msg); + _notificationMsgs.notifyAll(); } switch(msg.getMessageType()) { @@ -999,9 +745,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } private ClusterManagerMessage getNextNotificationMessage() { - synchronized(this._notificationMsgs) { - if(this._notificationMsgs.size() > 0) { - return this._notificationMsgs.remove(0); + synchronized(_notificationMsgs) { + if(_notificationMsgs.size() > 0) { + return _notificationMsgs.remove(0); } } @@ -1012,9 +758,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform // missed cleanup Date cutTime = DateUtil.currentGMTTime(); - List inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - _heartbeatThreshold)); + List inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - _heartbeatThreshold.value())); - // We don't have foreign key constraints to enforce the mgmt_server_id integrity in host table, when user manually + // We don't have foreign key constraints to enforce the mgmt_server_id integrity in host table, when user manually // remove records from mshost table, this will leave orphan mgmt_serve_id reference in host table. List orphanList = _mshostDao.listOrphanMsids(); if(orphanList.size() > 0) { @@ -1038,12 +784,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { for(ManagementServerHostVO host : inactiveList) { if(!pingManagementNode(host)) { s_logger.warn("Management node " + host.getId() + " is detected inactive by timestamp and also not pingable"); - downHostList.add(host); + downHostList.add(host); } } if(downHostList.size() > 0) - this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, downHostList)); + queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, downHostList)); } else { s_logger.info("No inactive management server node found"); } @@ -1057,7 +803,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { Profiler profilerQueryActiveList = new Profiler(); profilerQueryActiveList.start(); - List currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold)); + List currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold.value())); profilerQueryActiveList.stop(); Profiler profilerSyncClusterInfo = new Profiler(); @@ -1119,7 +865,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } } - this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, invalidatedNodeList)); + queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, invalidatedNodeList)); } profilerInvalidatedNodeList.stop(); @@ -1144,7 +890,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } if(removedNodeList.size() > 0) { - this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, removedNodeList)); + queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, removedNodeList)); } profilerRemovedList.stop(); @@ -1167,12 +913,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } if(newNodeList.size() > 0) { - this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeAdded, newNodeList)); + queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeAdded, newNodeList)); } profiler.stop(); - if(profiler.getDuration() >= this._heartbeatInterval) { + if (profiler.getDuration() >= _heartbeatInterval.value()) { if(s_logger.isDebugEnabled()) s_logger.debug("Peer scan takes too long to finish. profiler: " + profiler.toString() + ", profilerQueryActiveList: " + profilerQueryActiveList.toString() @@ -1208,7 +954,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { if (mshost == null) { mshost = new ManagementServerHostVO(); mshost.setMsid(_msId); - mshost.setRunid(this.getCurrentRunId()); + mshost.setRunid(getCurrentRunId()); mshost.setName(NetUtils.getHostName()); mshost.setVersion(version); mshost.setServiceIP(_clusterNodeIP); @@ -1240,7 +986,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { _mshostPeerDao.clearPeerInfo(_mshostId); // use seperate thread for heartbeat updates - _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), _heartbeatInterval, _heartbeatInterval, TimeUnit.MILLISECONDS); + _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), _heartbeatInterval.value(), _heartbeatInterval.value(), TimeUnit.MILLISECONDS); _notificationExecutor.submit(getNotificationTask()); } catch (Throwable e) { @@ -1281,23 +1027,19 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { return true; } + protected final ConfigKey HeartBeatInterval = new ConfigKey(Integer.class, "cluster.heartbeat.interval", "management-server", ClusterManager.class, + "1500", "Interval to check for the heart beat between management server nodes", false, "Seconds"); + protected final ConfigKey HeartBeatThreshold = new ConfigKey(Integer.class, "cluster.heartbeat.threshold", "management-server", ClusterManager.class, + "150000", "Threshold before self-fence the management server", true, "Seconds"); + @Override public boolean configure(String name, Map params) throws ConfigurationException { if(s_logger.isInfoEnabled()) { s_logger.info("Start configuring cluster manager : " + name); } - Map configs = _configDao.getConfiguration("management-server", params); - - String value = configs.get("cluster.heartbeat.interval"); - if (value != null) { - _heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL); - } - - value = configs.get("cluster.heartbeat.threshold"); - if (value != null) { - _heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD); - } + _heartbeatInterval = _configDepot.get(HeartBeatInterval); + _heartbeatThreshold = _configDepot.get(HeartBeatThreshold); File dbPropsFile = PropertiesUtil.findConfigFile("db.properties"); Properties dbProps = new Properties(); @@ -1337,16 +1079,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { throw new ConfigurationException("Unable to set current cluster service adapter"); } - _agentLBEnabled = Boolean.valueOf(_configDao.getValue(Config.AgentLbEnable.key())); - - String connectedAgentsThreshold = configs.get("agent.load.threshold"); - - if (connectedAgentsThreshold != null) { - _connectedAgentsThreshold = Double.parseDouble(connectedAgentsThreshold); - } - - this.registerListener(new LockMasterListener(_msId)); - checkConflicts(); if(s_logger.isInfoEnabled()) { @@ -1355,21 +1087,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { return true; } - @Override - public long getManagementNodeId() { - return _msId; - } - - @Override public long getCurrentRunId() { return _runId; } - @Override public boolean isManagementNodeAlive(long msid) { ManagementServerHostVO mshost = _mshostDao.findByMsid(msid); if(mshost != null) { - if(mshost.getLastUpdateTime().getTime() >= DateUtil.currentGMTTime().getTime() - _heartbeatThreshold) { + if (mshost.getLastUpdateTime().getTime() >= DateUtil.currentGMTTime().getTime() - _heartbeatThreshold.value()) { return true; } } @@ -1377,7 +1102,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { return false; } - @Override public boolean pingManagementNode(long msid) { ManagementServerHostVO mshost = _mshostDao.findByMsid(msid); if(mshost == null) { @@ -1434,20 +1158,16 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { @Override public int getHeartbeatThreshold() { - return this._heartbeatThreshold; + return _heartbeatThreshold.value(); } public int getHeartbeatInterval() { - return this._heartbeatInterval; - } - - public void setHeartbeatThreshold(int threshold) { - _heartbeatThreshold = threshold; + return _heartbeatInterval.value(); } private void checkConflicts() throws ConfigurationException { Date cutTime = DateUtil.currentGMTTime(); - List peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold)); + List peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold.value())); for(ManagementServerHostVO peer : peers) { String peerIP = peer.getServiceIP().trim(); if(_clusterNodeIP.equals(peerIP)) { @@ -1475,43 +1195,4 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager { } } - @Override - public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { - return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); - } - - @Override - public boolean isAgentRebalanceEnabled() { - return _agentLBEnabled; - } - - @Override - public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException { - final String msPeer = getPeerName(agentId); - if (msPeer == null) { - return null; - } - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId); - } - Command[] cmds = new Command[1]; - cmds[0] = new PropagateResourceEventCommand(agentId, event); - - Answer[] answers = execute(msPeer, agentId, cmds, true); - if (answers == null) { - throw new AgentUnavailableException(agentId); - } - - if (s_logger.isDebugEnabled()) { - s_logger.debug("Result for agent change is " + answers[0].getResult()); - } - - return answers[0].getResult(); - } - - @Override - public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException { - return _resourceMgr.executeUserRequest(hostId, event); - } } diff --git a/server/src/com/cloud/cluster/ClusterManagerListener.java b/framework/cluster/src/com/cloud/cluster/ClusterManagerListener.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterManagerListener.java rename to framework/cluster/src/com/cloud/cluster/ClusterManagerListener.java diff --git a/server/src/com/cloud/cluster/ClusterManagerMBean.java b/framework/cluster/src/com/cloud/cluster/ClusterManagerMBean.java similarity index 95% rename from server/src/com/cloud/cluster/ClusterManagerMBean.java rename to framework/cluster/src/com/cloud/cluster/ClusterManagerMBean.java index 9804f23fdba..961ed729317 100644 --- a/server/src/com/cloud/cluster/ClusterManagerMBean.java +++ b/framework/cluster/src/com/cloud/cluster/ClusterManagerMBean.java @@ -23,5 +23,4 @@ public interface ClusterManagerMBean { public String getVersion(); public int getHeartbeatInterval(); public int getHeartbeatThreshold(); - public void setHeartbeatThreshold(int threshold); } diff --git a/server/src/com/cloud/cluster/ClusterManagerMBeanImpl.java b/framework/cluster/src/com/cloud/cluster/ClusterManagerMBeanImpl.java similarity index 74% rename from server/src/com/cloud/cluster/ClusterManagerMBeanImpl.java rename to framework/cluster/src/com/cloud/cluster/ClusterManagerMBeanImpl.java index 51b3b428b0e..7071832e17a 100644 --- a/server/src/com/cloud/cluster/ClusterManagerMBeanImpl.java +++ b/framework/cluster/src/com/cloud/cluster/ClusterManagerMBeanImpl.java @@ -24,8 +24,8 @@ import javax.management.StandardMBean; import com.cloud.utils.DateUtil; public class ClusterManagerMBeanImpl extends StandardMBean implements ClusterManagerMBean { - private ClusterManagerImpl _clusterMgr; - private ManagementServerHostVO _mshostVo; + private final ClusterManagerImpl _clusterMgr; + private final ManagementServerHostVO _mshostVo; public ClusterManagerMBeanImpl(ClusterManagerImpl clusterMgr, ManagementServerHostVO mshostVo) { super(ClusterManagerMBean.class, false); @@ -34,34 +34,34 @@ public class ClusterManagerMBeanImpl extends StandardMBean implements ClusterMan _mshostVo = mshostVo; } - public long getMsid() { + @Override + public long getMsid() { return _mshostVo.getMsid(); } - public String getLastUpdateTime() { + @Override + public String getLastUpdateTime() { Date date = _mshostVo.getLastUpdateTime(); return DateUtil.getDateDisplayString(TimeZone.getDefault(), date); } - public String getClusterNodeIP() { + @Override + public String getClusterNodeIP() { return _mshostVo.getServiceIP(); } - public String getVersion() { + @Override + public String getVersion() { return _mshostVo.getVersion(); } - public int getHeartbeatInterval() { + @Override + public int getHeartbeatInterval() { return _clusterMgr.getHeartbeatInterval(); } - public int getHeartbeatThreshold() { + @Override + public int getHeartbeatThreshold() { return _clusterMgr.getHeartbeatThreshold(); } - - public void setHeartbeatThreshold(int threshold) { - // to avoid accidentally screwing up cluster manager, we put some guarding logic here - if(threshold >= ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD) - _clusterMgr.setHeartbeatThreshold(threshold); - } } diff --git a/server/src/com/cloud/cluster/ClusterManagerMessage.java b/framework/cluster/src/com/cloud/cluster/ClusterManagerMessage.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterManagerMessage.java rename to framework/cluster/src/com/cloud/cluster/ClusterManagerMessage.java diff --git a/server/src/com/cloud/cluster/ClusterNodeJoinEventArgs.java b/framework/cluster/src/com/cloud/cluster/ClusterNodeJoinEventArgs.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterNodeJoinEventArgs.java rename to framework/cluster/src/com/cloud/cluster/ClusterNodeJoinEventArgs.java diff --git a/server/src/com/cloud/cluster/ClusterNodeLeftEventArgs.java b/framework/cluster/src/com/cloud/cluster/ClusterNodeLeftEventArgs.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterNodeLeftEventArgs.java rename to framework/cluster/src/com/cloud/cluster/ClusterNodeLeftEventArgs.java diff --git a/server/src/com/cloud/cluster/ClusterService.java b/framework/cluster/src/com/cloud/cluster/ClusterService.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterService.java rename to framework/cluster/src/com/cloud/cluster/ClusterService.java diff --git a/server/src/com/cloud/cluster/ClusterServiceAdapter.java b/framework/cluster/src/com/cloud/cluster/ClusterServiceAdapter.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterServiceAdapter.java rename to framework/cluster/src/com/cloud/cluster/ClusterServiceAdapter.java diff --git a/server/src/com/cloud/cluster/ClusterServicePdu.java b/framework/cluster/src/com/cloud/cluster/ClusterServicePdu.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterServicePdu.java rename to framework/cluster/src/com/cloud/cluster/ClusterServicePdu.java diff --git a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java b/framework/cluster/src/com/cloud/cluster/ClusterServiceRequestPdu.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterServiceRequestPdu.java rename to framework/cluster/src/com/cloud/cluster/ClusterServiceRequestPdu.java diff --git a/server/src/com/cloud/cluster/ClusterServiceServletAdapter.java b/framework/cluster/src/com/cloud/cluster/ClusterServiceServletAdapter.java similarity index 86% rename from server/src/com/cloud/cluster/ClusterServiceServletAdapter.java rename to framework/cluster/src/com/cloud/cluster/ClusterServiceServletAdapter.java index 04026d30168..13b466dd44c 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletAdapter.java +++ b/framework/cluster/src/com/cloud/cluster/ClusterServiceServletAdapter.java @@ -31,9 +31,11 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; import org.springframework.stereotype.Component; +import org.apache.cloudstack.config.ConfigDepot; +import org.apache.cloudstack.config.ConfigKey; +import org.apache.cloudstack.config.ConfigValue; + import com.cloud.cluster.dao.ManagementServerHostDao; -import com.cloud.configuration.Config; -import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.utils.NumbersUtil; import com.cloud.utils.PropertiesUtil; import com.cloud.utils.component.AdapterBase; @@ -49,14 +51,14 @@ public class ClusterServiceServletAdapter extends AdapterBase implements Cluster @Inject private ClusterManager _manager; @Inject private ManagementServerHostDao _mshostDao; - - @Inject private ConfigurationDao _configDao; + @Inject + protected ConfigDepot _configDepot; private ClusterServiceServletContainer _servletContainer; private int _clusterServicePort = DEFAULT_SERVICE_PORT; - private int _clusterRequestTimeoutSeconds = DEFAULT_REQUEST_TIMEOUT; + private ConfigValue _clusterRequestTimeoutSeconds; @Override public ClusterService getPeerService(String strPeer) throws RemoteException { @@ -71,7 +73,7 @@ public class ClusterServiceServletAdapter extends AdapterBase implements Cluster if(serviceUrl == null) return null; - return new ClusterServiceServletImpl(serviceUrl, _clusterRequestTimeoutSeconds); + return new ClusterServiceServletImpl(serviceUrl, _clusterRequestTimeoutSeconds); } @Override @@ -123,12 +125,14 @@ public class ClusterServiceServletAdapter extends AdapterBase implements Cluster return true; } + private final ConfigKey ClusterMessageTimeOut = new ConfigKey(Integer.class, "cluster.message.timeout.seconds", "Advance", ClusterManager.class, "300", + "Time (in seconds) to wait before a inter-management server message post times out.", true, "Seconds"); + private void init() throws ConfigurationException { if(_mshostDao != null) return; - String value = _configDao.getValue(Config.ClusterMessageTimeOutSeconds.key()); - _clusterRequestTimeoutSeconds = NumbersUtil.parseInt(value, DEFAULT_REQUEST_TIMEOUT); + _clusterRequestTimeoutSeconds = _configDepot.get(ClusterMessageTimeOut); s_logger.info("Configure cluster request time out. timeout: " + _clusterRequestTimeoutSeconds + " seconds"); File dbPropsFile = PropertiesUtil.findConfigFile("db.properties"); diff --git a/server/src/com/cloud/cluster/ClusterServiceServletContainer.java b/framework/cluster/src/com/cloud/cluster/ClusterServiceServletContainer.java similarity index 100% rename from server/src/com/cloud/cluster/ClusterServiceServletContainer.java rename to framework/cluster/src/com/cloud/cluster/ClusterServiceServletContainer.java diff --git a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java b/framework/cluster/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java old mode 100755 new mode 100644 similarity index 100% rename from server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java rename to framework/cluster/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/framework/cluster/src/com/cloud/cluster/ClusterServiceServletImpl.java similarity index 91% rename from server/src/com/cloud/cluster/ClusterServiceServletImpl.java rename to framework/cluster/src/com/cloud/cluster/ClusterServiceServletImpl.java index 3270315785b..c3ed3fe590d 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/framework/cluster/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -27,23 +27,25 @@ import org.apache.commons.httpclient.methods.PostMethod; import org.apache.commons.httpclient.params.HttpClientParams; import org.apache.log4j.Logger; +import org.apache.cloudstack.config.ConfigValue; + public class ClusterServiceServletImpl implements ClusterService { private static final long serialVersionUID = 4574025200012566153L; private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class); private String _serviceUrl; - private int _requestTimeoutSeconds; + private ConfigValue _requestTimeoutSeconds; protected static HttpClient s_client = null; public ClusterServiceServletImpl() { } - public ClusterServiceServletImpl(String serviceUrl, int requestTimeoutSeconds) { - s_logger.info("Setup cluster service servlet. service url: " + serviceUrl + ", request timeout: " + requestTimeoutSeconds + " seconds"); + public ClusterServiceServletImpl(String serviceUrl, ConfigValue requestTimeoutSeconds) { + s_logger.info("Setup cluster service servlet. service url: " + serviceUrl + ", request timeout: " + requestTimeoutSeconds.value() + " seconds"); - this._serviceUrl = serviceUrl; - this._requestTimeoutSeconds = requestTimeoutSeconds; + _serviceUrl = serviceUrl; + _requestTimeoutSeconds = requestTimeoutSeconds; } @Override @@ -125,7 +127,7 @@ public class ClusterServiceServletImpl implements ClusterService { s_client = new HttpClient(mgr); HttpClientParams clientParams = new HttpClientParams(); - clientParams.setSoTimeout(_requestTimeoutSeconds * 1000); + clientParams.setSoTimeout(_requestTimeoutSeconds.value() * 1000); s_client.setParams(clientParams); } @@ -141,6 +143,6 @@ public class ClusterServiceServletImpl implements ClusterService { System.out.println(result); } catch (RemoteException e) { } -*/ +*/ } } diff --git a/api/src/com/cloud/cluster/ManagementServerHost.java b/framework/cluster/src/com/cloud/cluster/ManagementServerHost.java similarity index 89% rename from api/src/com/cloud/cluster/ManagementServerHost.java rename to framework/cluster/src/com/cloud/cluster/ManagementServerHost.java index 9c88a2b2006..a5764fbb2e1 100644 --- a/api/src/com/cloud/cluster/ManagementServerHost.java +++ b/framework/cluster/src/com/cloud/cluster/ManagementServerHost.java @@ -16,9 +16,9 @@ // under the License. package com.cloud.cluster; -import org.apache.cloudstack.api.InternalIdentity; -public interface ManagementServerHost extends InternalIdentity { +public interface ManagementServerHost { + long getId(); public static enum State { Up, Starting, Down @@ -29,4 +29,6 @@ public interface ManagementServerHost extends InternalIdentity { State getState(); String getVersion(); + + String getServiceIP(); } diff --git a/engine/schema/src/com/cloud/cluster/ManagementServerHostPeerVO.java b/framework/cluster/src/com/cloud/cluster/ManagementServerHostPeerVO.java similarity index 94% rename from engine/schema/src/com/cloud/cluster/ManagementServerHostPeerVO.java rename to framework/cluster/src/com/cloud/cluster/ManagementServerHostPeerVO.java index e5e12ecb8bf..060dd0a824f 100644 --- a/engine/schema/src/com/cloud/cluster/ManagementServerHostPeerVO.java +++ b/framework/cluster/src/com/cloud/cluster/ManagementServerHostPeerVO.java @@ -30,11 +30,10 @@ import javax.persistence.Temporal; import javax.persistence.TemporalType; import com.cloud.utils.DateUtil; -import org.apache.cloudstack.api.InternalIdentity; @Entity @Table(name="mshost_peer") -public class ManagementServerHostPeerVO implements InternalIdentity { +public class ManagementServerHostPeerVO { @Id @GeneratedValue(strategy=GenerationType.IDENTITY) @@ -67,7 +66,7 @@ public class ManagementServerHostPeerVO implements InternalIdentity { this.peerRunid = peerRunid; this.peerState = peerState; - this.lastUpdateTime = DateUtil.currentGMTTime(); + lastUpdateTime = DateUtil.currentGMTTime(); } public long getId() { diff --git a/engine/schema/src/com/cloud/cluster/ManagementServerHostVO.java b/framework/cluster/src/com/cloud/cluster/ManagementServerHostVO.java similarity index 96% rename from engine/schema/src/com/cloud/cluster/ManagementServerHostVO.java rename to framework/cluster/src/com/cloud/cluster/ManagementServerHostVO.java index 31642e4d0c5..966a8748750 100644 --- a/engine/schema/src/com/cloud/cluster/ManagementServerHostVO.java +++ b/framework/cluster/src/com/cloud/cluster/ManagementServerHostVO.java @@ -30,7 +30,6 @@ import javax.persistence.Temporal; import javax.persistence.TemporalType; import com.cloud.utils.db.GenericDao; -import org.apache.cloudstack.api.InternalIdentity; @Entity @Table(name="mshost") @@ -81,10 +80,11 @@ public class ManagementServerHostVO implements ManagementServerHost { this.runid = runid; this.serviceIP = serviceIP; this.servicePort = servicePort; - this.lastUpdateTime = updateTime; + lastUpdateTime = updateTime; } - public long getId() { + @Override + public long getId() { return id; } @@ -119,7 +119,7 @@ public class ManagementServerHostVO implements ManagementServerHost { @Override public ManagementServerHost.State getState() { - return this.state; + return state; } public void setState(ManagementServerHost.State state) { diff --git a/server/src/com/cloud/cluster/RemoteMethodConstants.java b/framework/cluster/src/com/cloud/cluster/RemoteMethodConstants.java similarity index 100% rename from server/src/com/cloud/cluster/RemoteMethodConstants.java rename to framework/cluster/src/com/cloud/cluster/RemoteMethodConstants.java diff --git a/engine/schema/src/com/cloud/cluster/dao/ManagementServerHostDao.java b/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDao.java similarity index 100% rename from engine/schema/src/com/cloud/cluster/dao/ManagementServerHostDao.java rename to framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDao.java diff --git a/engine/schema/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java similarity index 99% rename from engine/schema/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java rename to framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java index 3866da1bed3..879c4ce3a27 100644 --- a/engine/schema/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java +++ b/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java @@ -27,7 +27,6 @@ import java.util.TimeZone; import javax.ejb.Local; import org.apache.log4j.Logger; -import org.springframework.stereotype.Component; import com.cloud.cluster.ClusterInvalidSessionException; import com.cloud.cluster.ManagementServerHost; @@ -42,7 +41,6 @@ import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; -@Component @Local(value={ManagementServerHostDao.class}) public class ManagementServerHostDaoImpl extends GenericDaoBase implements ManagementServerHostDao { private static final Logger s_logger = Logger.getLogger(ManagementServerHostDaoImpl.class); diff --git a/engine/schema/src/com/cloud/cluster/dao/ManagementServerHostPeerDao.java b/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostPeerDao.java similarity index 100% rename from engine/schema/src/com/cloud/cluster/dao/ManagementServerHostPeerDao.java rename to framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostPeerDao.java diff --git a/engine/schema/src/com/cloud/cluster/dao/ManagementServerHostPeerDaoImpl.java b/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostPeerDaoImpl.java similarity index 98% rename from engine/schema/src/com/cloud/cluster/dao/ManagementServerHostPeerDaoImpl.java rename to framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostPeerDaoImpl.java index 8ad02cdbeed..8ef2e82a943 100644 --- a/engine/schema/src/com/cloud/cluster/dao/ManagementServerHostPeerDaoImpl.java +++ b/framework/cluster/src/com/cloud/cluster/dao/ManagementServerHostPeerDaoImpl.java @@ -21,7 +21,6 @@ import java.util.List; import javax.ejb.Local; import org.apache.log4j.Logger; -import org.springframework.stereotype.Component; import com.cloud.cluster.ManagementServerHost; import com.cloud.cluster.ManagementServerHostPeerVO; @@ -31,7 +30,6 @@ import com.cloud.utils.db.SearchBuilder; import com.cloud.utils.db.SearchCriteria; import com.cloud.utils.db.Transaction; -@Component @Local(value={ManagementServerHostPeerDao.class}) public class ManagementServerHostPeerDaoImpl extends GenericDaoBase implements ManagementServerHostPeerDao { private static final Logger s_logger = Logger.getLogger(ManagementServerHostPeerDaoImpl.class); diff --git a/framework/pom.xml b/framework/pom.xml index ddcdcb0439a..12cb94649d3 100644 --- a/framework/pom.xml +++ b/framework/pom.xml @@ -34,5 +34,7 @@ rest events jobs + cluster + db diff --git a/server/pom.xml b/server/pom.xml index 4511804312f..484d603b0e2 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -33,6 +33,11 @@ cloud-core ${project.version} + + org.apache.cloudstack + cloud-framework-cluster + ${project.version} + javax.servlet servlet-api diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 4fdb3c6c83b..a399af3b1f9 100755 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -44,22 +44,30 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import org.apache.log4j.Logger; -import org.springframework.context.annotation.Primary; -import org.springframework.stereotype.Component; + +import com.google.gson.Gson; + +import org.apache.cloudstack.config.ConfigDepot; +import org.apache.cloudstack.config.ConfigKey; +import org.apache.cloudstack.config.ConfigValue; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import com.cloud.agent.AgentManager; import com.cloud.agent.api.Answer; import com.cloud.agent.api.CancelCommand; +import com.cloud.agent.api.ChangeAgentAnswer; import com.cloud.agent.api.ChangeAgentCommand; import com.cloud.agent.api.Command; -import com.cloud.agent.api.TransferAgentCommand; +import com.cloud.agent.api.PropagateResourceEventCommand; import com.cloud.agent.api.ScheduleHostScanTaskCommand; +import com.cloud.agent.api.TransferAgentCommand; import com.cloud.agent.transport.Request; import com.cloud.agent.transport.Request.Version; import com.cloud.agent.transport.Response; import com.cloud.api.ApiDBUtils; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; +import com.cloud.cluster.ClusterServicePdu; import com.cloud.cluster.ClusteredAgentRebalanceService; import com.cloud.cluster.ManagementServerHost; import com.cloud.cluster.ManagementServerHostVO; @@ -68,7 +76,6 @@ import com.cloud.cluster.agentlb.HostTransferMapVO; import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState; import com.cloud.cluster.agentlb.dao.HostTransferMapDao; import com.cloud.cluster.dao.ManagementServerHostDao; -import com.cloud.configuration.Config; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.exception.AgentUnavailableException; import com.cloud.exception.OperationTimedoutException; @@ -76,10 +83,11 @@ import com.cloud.host.Host; import com.cloud.host.HostVO; import com.cloud.host.Status; import com.cloud.host.Status.Event; +import com.cloud.resource.ResourceState; import com.cloud.resource.ServerResource; import com.cloud.storage.resource.DummySecondaryStorageResource; import com.cloud.utils.DateUtil; -import com.cloud.utils.NumbersUtil; +import com.cloud.utils.Profiler; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.SearchCriteria.Op; import com.cloud.utils.db.SearchCriteria2; @@ -98,10 +106,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public final static long STARTUP_DELAY = 5000; public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds - public long _loadSize = 100; - protected int _directAgentScanInterval = 90; // 90 seconds protected Set _agentToTransferIds = new HashSet(); + Gson _gson; + @Inject protected ClusterManager _clusterMgr = null; @@ -118,30 +126,46 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust @Inject protected List _lbPlanners; @Inject ConfigurationDao _configDao; + @Inject + ConfigDepot _configDepot; protected ClusteredAgentManagerImpl() { super(); } + protected final ConfigKey EnableLB = new ConfigKey(Boolean.class, "agent.lb.enabled", "Advanced", AgentManager.class, "false", + "Enable agent load balancing between management server nodes", true, "True/False"); + protected final ConfigKey ConnectedAgentThreshold = new ConfigKey(Double.class, "agent.load.threshold", "Advanced", AgentManager.class, "0.7", + "What percentage of the agents can be held by one management server before load balancing happens", true, "0-1"); + protected final ConfigKey LoadSize = new ConfigKey(Integer.class, "direct.agent.load.size", "Advanced", AgentManager.class, "16", + "How many agents to connect to in each round", true, ""); + protected final ConfigKey ScanInterval = new ConfigKey(Integer.class, "direct.agent.scan.interval", "Advanced", AgentManager.class, "90", + "Interval between scans to load agents", false, "Seconds"); + + + protected ConfigValue _agentLBEnabled; + protected ConfigValue _connectedAgentsThreshold; + protected ConfigValue _loadSize; + protected ConfigValue _directAgentScanInterval; + @Override public boolean configure(String name, Map xmlParams) throws ConfigurationException { _peers = new HashMap(7); _sslEngines = new HashMap(7); - _nodeId = _clusterMgr.getManagementNodeId(); + _nodeId = ManagementServerNode.getManagementServerId(); s_logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): " + _nodeId); - Map params = _configDao.getConfiguration(xmlParams); - String value = params.get(Config.DirectAgentLoadSize.key()); - _loadSize = NumbersUtil.parseInt(value, 16); - - value = params.get(Config.DirectAgentScanInterval.key()); - _directAgentScanInterval = NumbersUtil.parseInt(value, 90); // defaulted to 90 seconds + _loadSize = _configDepot.get(LoadSize); + _directAgentScanInterval = _configDepot.get(ScanInterval).setMultiplier(1000); + _agentLBEnabled = _configDepot.get(EnableLB); + _connectedAgentsThreshold = _configDepot.get(ConnectedAgentThreshold); ClusteredAgentAttache.initialize(this); _clusterMgr.registerListener(this); - + _clusterMgr.registerDispatcher(new ClusterDispatcher()); + return super.configure(name, xmlParams); } @@ -150,13 +174,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust if (!super.start()) { return false; } - _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, _directAgentScanInterval * 1000); + _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, _directAgentScanInterval.value()); if (s_logger.isDebugEnabled()) { - s_logger.debug("Scheduled direct agent scan task to run at an interval of " + _directAgentScanInterval + " seconds"); + s_logger.debug("Scheduled direct agent scan task to run at an interval of " + _directAgentScanInterval.value() + " seconds"); } // schedule transfer scan executor - if agent LB is enabled - if (_clusterMgr.isAgentRebalanceEnabled()) { + if (isAgentRebalanceEnabled()) { s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, TimeUnit.MILLISECONDS); } @@ -182,7 +206,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust // for agents that are self-managed, threshold to be considered as disconnected after pingtimeout long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingTimeout); - List hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize, _nodeId); + List hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize.value().longValue(), _nodeId); List appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId); hosts.addAll(appliances); @@ -319,7 +343,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust AgentAttache attache = findAttache(hostId); if (attache != null) { //don't process disconnect if the host is being rebalanced - if (_clusterMgr.isAgentRebalanceEnabled()) { + if (isAgentRebalanceEnabled()) { HostTransferMapVO transferVO = _hostTransferDao.findById(hostId); if (transferVO != null) { if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) { @@ -351,7 +375,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public boolean reconnect(final long hostId) { Boolean result; try { - result = _clusterMgr.propagateAgentEvent(hostId, Event.ShutdownRequested); + result = propagateAgentEvent(hostId, Event.ShutdownRequested); if (result != null) { return result; } @@ -366,7 +390,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust public void notifyNodesInCluster(AgentAttache attache) { s_logger.debug("Notifying other nodes of to disconnect"); Command[] cmds = new Command[] { new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected) }; - _clusterMgr.broadcast(attache.getId(), cmds); + _clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds)); } // notifies MS peers to schedule a host scan task immediately, triggered during addHost operation @@ -375,7 +399,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.debug("Notifying other MS nodes to run host scan task"); } Command[] cmds = new Command[] { new ScheduleHostScanTaskCommand() }; - _clusterMgr.broadcast(0, cmds); + _clusterMgr.broadcast(0, _gson.toJson(cmds)); } protected static void logT(byte[] bytes, final String msg) { @@ -428,7 +452,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } public String findPeer(long hostId) { - return _clusterMgr.getPeerName(hostId); + return getPeerName(hostId); } public SSLEngine getSSLEngine(String peerName) { @@ -468,7 +492,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } if (ch == null || ch == prevCh) { - ManagementServerHostVO ms = _clusterMgr.getPeer(peerName); + ManagementServerHost ms = _clusterMgr.getPeer(peerName); if (ms == null) { s_logger.info("Unable to find peer: " + peerName); return null; @@ -514,7 +538,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) { - String peerName = _clusterMgr.getPeerName(hostId); + String peerName = getPeerName(hostId); if (peerName == null) { return null; } @@ -861,7 +885,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer); } String peerName = Long.toString(peer); - Answer[] answers = _clusterMgr.execute(peerName, agentId, cmds, true); + String cmdStr = _gson.toJson(cmds); + String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true); + Answer[] answers = _gson.fromJson(ansStr, Answer[].class); return answers; } catch (Exception e) { s_logger.warn("Caught exception while talking to " + currentOwnerId, e); @@ -869,6 +895,46 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } + public String getPeerName(long agentHostId) { + + HostVO host = _hostDao.findById(agentHostId); + if (host != null && host.getManagementServerId() != null) { + if (_clusterMgr.getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) { + return null; + } + + return Long.toString(host.getManagementServerId()); + } + return null; + } + + + public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException { + final String msPeer = getPeerName(agentId); + if (msPeer == null) { + return null; + } + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId); + } + Command[] cmds = new Command[1]; + cmds[0] = new ChangeAgentCommand(agentId, event); + + String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true); + if (ansStr == null) { + throw new AgentUnavailableException(agentId); + } + + Answer[] answers = _gson.fromJson(ansStr, Answer[].class); + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Result for agent change is " + answers[0].getResult()); + } + + return answers[0].getResult(); + } + private Runnable getTransferScanTask() { return new Runnable() { @Override @@ -1143,4 +1209,227 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } + private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd)); + } + + try { + scheduleHostScanTask(); + } catch (Exception e) { + // Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan + // happens at fixed intervals anyways. So handling any exceptions that may be thrown + s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName() + + ", ignoring as regular host scan happens at fixed interval anyways", e); + return null; + } + + Answer[] answers = new Answer[1]; + answers[0] = new Answer(cmd, true, null); + return _gson.toJson(answers); + } + + public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { + Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue); + for (Command cmd : cmds) { + commands.addCommand(cmd); + } + return send(hostId, commands); + } + + public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException { + final String msPeer = getPeerName(agentId); + if (msPeer == null) { + return null; + } + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId); + } + Command[] cmds = new Command[1]; + cmds[0] = new PropagateResourceEventCommand(agentId, event); + + String AnsStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true); + if (AnsStr == null) { + throw new AgentUnavailableException(agentId); + } + + Answer[] answers = _gson.fromJson(AnsStr, Answer[].class); + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Result for agent change is " + answers[0].getResult()); + } + + return answers[0].getResult(); + } + + public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException { + return _resourceMgr.executeUserRequest(hostId, event); + } + + protected class ClusterDispatcher implements ClusterManager.Dispatcher { + @Override + public String getName() { + return "ClusterDispatcher"; + } + + @Override + public String dispatch(ClusterServicePdu pdu) { + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage()); + } + + Command[] cmds = null; + try { + cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class); + } catch (Throwable e) { + assert (false); + s_logger.error("Excection in gson decoding : ", e); + } + + if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted + ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0]; + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); + } + boolean result = false; + try { + result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent()); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Result is " + result); + } + + } catch (AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + return null; + } + + Answer[] answers = new Answer[1]; + answers[0] = new ChangeAgentAnswer(cmd, result); + return _gson.toJson(answers); + } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { + TransferAgentCommand cmd = (TransferAgentCommand)cmds[0]; + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); + } + boolean result = false; + try { + result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner()); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Result is " + result); + } + + } catch (AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + return null; + } catch (OperationTimedoutException e) { + s_logger.warn("Operation timed out", e); + return null; + } + Answer[] answers = new Answer[1]; + answers[0] = new Answer(cmd, result, null); + return _gson.toJson(answers); + } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) { + PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0]; + + s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId()); + + boolean result = false; + try { + result = executeResourceUserRequest(cmd.getHostId(), cmd.getEvent()); + s_logger.debug("Result is " + result); + } catch (AgentUnavailableException ex) { + s_logger.warn("Agent is unavailable", ex); + return null; + } + + Answer[] answers = new Answer[1]; + answers[0] = new Answer(cmd, result, null); + return _gson.toJson(answers); + } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) { + ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0]; + String response = handleScheduleHostScanTaskCommand(cmd); + return response; + } + + try { + long startTick = System.currentTimeMillis(); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage()); + } + + Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError()); + if (answers != null) { + String jsonReturn = _gson.toJson(answers); + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + + " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn); + } + + return jsonReturn; + } else { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + + " in " + (System.currentTimeMillis() - startTick) + " ms, return null result"); + } + } + } catch (AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + } catch (OperationTimedoutException e) { + s_logger.warn("Timed Out", e); + } + + return null; + } + + } + + public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { + return executeUserRequest(agentId, event); + } + + public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { + return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event); + } + + public boolean isAgentRebalanceEnabled() { + return _agentLBEnabled.value(); + } + + @Inject + private ClusteredAgentRebalanceService _rebalanceService; + + boolean _agentLbHappened = false; + public void agentrebalance() { + Profiler profilerAgentLB = new Profiler(); + profilerAgentLB.start(); + //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold + if (_agentLBEnabled.value() && !_agentLbHappened) { + SearchCriteriaService sc = SearchCriteria2.create(HostVO.class); + sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL); + sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing); + List allManagedRoutingAgents = sc.list(); + + sc = SearchCriteria2.create(HostVO.class); + sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing); + List allAgents = sc.list(); + double allHostsCount = allAgents.size(); + double managedHostsCount = allManagedRoutingAgents.size(); + if (allHostsCount > 0.0) { + double load = managedHostsCount / allHostsCount; + if (load >= _connectedAgentsThreshold.value()) { + s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold); + _rebalanceService.scheduleRebalanceAgents(); + _agentLbHappened = true; + } else { + s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold); + } + } + } + profilerAgentLB.stop(); + } } diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java index f5c6904c9a8..f388d36e439 100644 --- a/server/src/com/cloud/async/AsyncJobManagerImpl.java +++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java @@ -53,6 +53,7 @@ import org.apache.cloudstack.api.response.ExceptionResponse; import org.apache.cloudstack.context.CallContext; import org.apache.cloudstack.framework.events.EventBus; import org.apache.cloudstack.framework.events.EventBusException; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import com.cloud.api.ApiDBUtils; import com.cloud.api.ApiDispatcher; @@ -87,7 +88,6 @@ import com.cloud.utils.db.Transaction; import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.exception.ExceptionUtil; import com.cloud.utils.mgmt.JmxUtil; -import com.cloud.utils.net.MacAddress; @Component @Local(value={AsyncJobManager.class}) @@ -773,11 +773,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, } private long getMsid() { - if(_clusterMgr != null) { - return _clusterMgr.getManagementNodeId(); - } - - return MacAddress.getMacAddress().toLong(); + return ManagementServerNode.getManagementServerId(); } private void cleanupPendingJobs(List l) { diff --git a/server/src/com/cloud/cluster/ClusterManager.java b/server/src/com/cloud/cluster/ClusterManager.java deleted file mode 100755 index 017ba311a66..00000000000 --- a/server/src/com/cloud/cluster/ClusterManager.java +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.cluster; - -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.Command; -import com.cloud.exception.AgentUnavailableException; -import com.cloud.exception.OperationTimedoutException; -import com.cloud.host.Status.Event; -import com.cloud.resource.ResourceState; -import com.cloud.utils.component.Manager; - -public interface ClusterManager extends Manager { - public static final int DEFAULT_HEARTBEAT_INTERVAL = 1500; - public static final int DEFAULT_HEARTBEAT_THRESHOLD = 150000; - public static final String ALERT_SUBJECT = "cluster-alert"; - - public void OnReceiveClusterServicePdu(ClusterServicePdu pdu); - public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError); - public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError); - - public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException; - public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException; - public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException; - public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException; - public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException; - - public int getHeartbeatThreshold(); - - public long getManagementNodeId(); // msid of current management server node - public boolean isManagementNodeAlive(long msid); - public boolean pingManagementNode(long msid); - public long getCurrentRunId(); - - public String getSelfPeerName(); - public String getSelfNodeIP(); - public String getPeerName(long agentHostId); - - public void registerListener(ClusterManagerListener listener); - public void unregisterListener(ClusterManagerListener listener); - public ManagementServerHostVO getPeer(String peerName); - - /** - * Broadcast the command to all of the management server nodes. - * @param agentId agent id this broadcast is regarding - * @param cmds commands to broadcast - */ - public void broadcast(long agentId, Command[] cmds); - - boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException; - - boolean isAgentRebalanceEnabled(); -} diff --git a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java deleted file mode 100755 index 12972b9804f..00000000000 --- a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java +++ /dev/null @@ -1,179 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package com.cloud.cluster; - -import java.util.Map; - -import javax.ejb.Local; -import javax.naming.ConfigurationException; - -import org.apache.log4j.Logger; -import org.springframework.stereotype.Component; - -import com.cloud.agent.api.Answer; -import com.cloud.agent.api.Command; -import com.cloud.exception.AgentUnavailableException; -import com.cloud.exception.OperationTimedoutException; -import com.cloud.host.Status.Event; -import com.cloud.utils.component.ManagerBase; -import com.cloud.utils.exception.CloudRuntimeException; -import com.cloud.utils.net.MacAddress; - -@Local(value={ClusterManager.class}) -public class DummyClusterManagerImpl extends ManagerBase implements ClusterManager { - private static final Logger s_logger = Logger.getLogger(DummyClusterManagerImpl.class); - - protected long _id = MacAddress.getMacAddress().toLong(); - protected long _runId = System.currentTimeMillis(); - - private final String _clusterNodeIP = "127.0.0.1"; - - @Override - public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) { - throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { - throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { - throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) - throws AgentUnavailableException, OperationTimedoutException { - throw new CloudRuntimeException("Unsupported feature"); - } - -/* - @Override - public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException { - throw new CloudRuntimeException("Unsupported feature"); - } -*/ - @Override - public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { - throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException { - throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public int getHeartbeatThreshold() { - return ClusterManager.DEFAULT_HEARTBEAT_INTERVAL; - } - - @Override - public long getManagementNodeId() { - return _id; - } - - @Override - public long getCurrentRunId() { - return _runId; - } - - @Override - public ManagementServerHostVO getPeer(String str) { - return null; - } - - @Override - public String getSelfPeerName() { - return Long.toString(_id); - } - - @Override - public String getSelfNodeIP() { - return _clusterNodeIP; - } - - @Override - public boolean isManagementNodeAlive(long msid) { - return true; - } - - @Override - public boolean pingManagementNode(long msid) { - return false; - } - - @Override - public String getPeerName(long agentHostId) { - throw new CloudRuntimeException("Unsupported feature"); - } - - @Override - public void registerListener(ClusterManagerListener listener) { - } - - @Override - public void unregisterListener(ClusterManagerListener listener) { - } - - @Override - public boolean configure(String name, Map params) - throws ConfigurationException { - return true; - } - - @Override - public void broadcast(long hostId, Command[] cmds) { - } - - @Override - public boolean start() { - if(s_logger.isInfoEnabled()) - s_logger.info("Starting cluster manager, msid : " + _id); - - return true; - } - - @Override - public boolean stop() { - return true; - } - - @Override - public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException { - return false; - } - - @Override - public boolean isAgentRebalanceEnabled() { - return false; - } - - @Override - public Boolean propagateResourceEvent(long agentId, com.cloud.resource.ResourceState.Event event) throws AgentUnavailableException { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean executeResourceUserRequest(long hostId, com.cloud.resource.ResourceState.Event event) throws AgentUnavailableException { - // TODO Auto-generated method stub - return false; - } -} diff --git a/server/src/com/cloud/cluster/LockMasterListener.java b/server/src/com/cloud/server/LockMasterListener.java similarity index 92% rename from server/src/com/cloud/cluster/LockMasterListener.java rename to server/src/com/cloud/server/LockMasterListener.java index cc10e2c9967..ee9c9a9d50c 100644 --- a/server/src/com/cloud/cluster/LockMasterListener.java +++ b/server/src/com/cloud/server/LockMasterListener.java @@ -14,10 +14,12 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package com.cloud.cluster; +package com.cloud.server; import java.util.List; +import com.cloud.cluster.ClusterManagerListener; +import com.cloud.cluster.ManagementServerHostVO; import com.cloud.utils.db.Merovingian2; /** diff --git a/server/src/com/cloud/server/ManagementServerImpl.java b/server/src/com/cloud/server/ManagementServerImpl.java index 77c77e118fb..d96536ec5ab 100755 --- a/server/src/com/cloud/server/ManagementServerImpl.java +++ b/server/src/com/cloud/server/ManagementServerImpl.java @@ -427,6 +427,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator; import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; +import org.apache.cloudstack.utils.identity.ManagementServerNode; import com.cloud.agent.AgentManager; import com.cloud.agent.api.GetVncPortAnswer; @@ -825,6 +826,8 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe public boolean start() { s_logger.info("Startup CloudStack management server..."); + _clusterMgr.registerListener(new LockMasterListener(ManagementServerNode.getManagementServerId())); + enableAdminUser("password"); return true; }