Moved ClusterManager into it's own package. Removed the agent load balancing code.

This commit is contained in:
Alex Huang 2013-07-25 18:59:30 -07:00
parent 395cbcc023
commit 9aaa378b08
38 changed files with 527 additions and 709 deletions

View File

28
framework/cluster/pom.xml Normal file
View File

@ -0,0 +1,28 @@
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to you under
the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-framework-cluster</artifactId>
<name>Apache CloudStack Framework - Clustering</name>
<parent>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloudstack-framework</artifactId>
<version>4.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-utils</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -16,9 +16,8 @@
// under the License.
package com.cloud.cluster;
import com.cloud.exception.CloudException;
public class ActiveFencingException extends CloudException {
public class ActiveFencingException extends Exception {
private static final long serialVersionUID = -3975376101728211726L;
public ActiveFencingException(String message) {

View File

@ -16,9 +16,8 @@
// under the License.
package com.cloud.cluster;
import com.cloud.exception.CloudException;
public class ClusterInvalidSessionException extends CloudException {
public class ClusterInvalidSessionException extends Exception {
private static final long serialVersionUID = -6636524194520997512L;

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.cluster;
import com.cloud.utils.component.Manager;
public interface ClusterManager extends Manager {
static final String ALERT_SUBJECT = "cluster-alert";
void OnReceiveClusterServicePdu(ClusterServicePdu pdu);
/**
* This executes
* @param strPeer
* @param agentId
* @param cmds
* @param stopOnError
* @return
*/
String execute(String strPeer, long agentId, String cmds, boolean stopOnError);
/**
* Broadcast the command to all of the management server nodes.
* @param agentId agent id this broadcast is regarding
* @param cmds commands to broadcast
*/
void broadcast(long agentId, String cmds);
int getHeartbeatThreshold();
void registerListener(ClusterManagerListener listener);
void unregisterListener(ClusterManagerListener listener);
void registerDispatcher(Dispatcher dispatcher);
ManagementServerHost getPeer(String peerName);
String getSelfPeerName();
public interface Dispatcher {
String getName();
String dispatch(ClusterServicePdu pdu);
}
}

View File

@ -45,44 +45,21 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.AgentManager.OnError;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.ChangeAgentAnswer;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PropagateResourceEventCommand;
import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.agent.api.ScheduleHostScanTaskCommand;
import com.cloud.agent.manager.ClusteredAgentManagerImpl;
import com.cloud.agent.manager.Commands;
import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
import org.apache.cloudstack.config.ConfigDepot;
import org.apache.cloudstack.config.ConfigKey;
import org.apache.cloudstack.config.ConfigValue;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.cluster.dao.ManagementServerHostPeerDao;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.Status.Event;
import com.cloud.host.dao.HostDao;
import com.cloud.resource.ResourceManager;
import com.cloud.resource.ResourceState;
import com.cloud.serializer.GsonHelper;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Profiler;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.component.ComponentContext;
import com.cloud.utils.component.ComponentLifecycle;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.ConnectionConcierge;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.SearchCriteria2;
import com.cloud.utils.db.SearchCriteriaService;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.events.SubscriptionMgr;
import com.cloud.utils.exception.CloudRuntimeException;
@ -90,10 +67,6 @@ import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.net.NetUtils;
import com.google.gson.Gson;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
@Local(value = { ClusterManager.class })
public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class);
@ -103,18 +76,13 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
private final List<ClusterManagerListener> _listeners = new ArrayList<ClusterManagerListener>();
private final Map<Long, ManagementServerHostVO> _activePeers = new HashMap<Long, ManagementServerHostVO>();
private int _heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL;
private int _heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD;
private ConfigValue<Integer> _heartbeatInterval;
private ConfigValue<Integer> _heartbeatThreshold;
private final Map<String, ClusterService> _clusterPeers;
private final Gson _gson;
@Inject
private AgentManager _agentMgr;
@Inject
private ClusteredAgentRebalanceService _rebalanceService;
@Inject
private ResourceManager _resourceMgr;
protected ConfigDepot _configDepot;
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
@ -130,9 +98,8 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
@Inject private ManagementServerHostDao _mshostDao;
@Inject private ManagementServerHostPeerDao _mshostPeerDao;
@Inject private HostDao _hostDao;
@Inject private HostTransferMapDao _hostTransferDao;
@Inject private ConfigurationDao _configDao;
protected Dispatcher _dispatcher;
//
// pay attention to _mshostId and _msid
@ -146,9 +113,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
private boolean _peerScanInited = false;
private String _clusterNodeIP = "127.0.0.1";
private boolean _agentLBEnabled = false;
private double _connectedAgentsThreshold = 0.7;
private static boolean _agentLbHappened = false;
private final List<ClusterServicePdu> _clusterPduOutgoingQueue = new ArrayList<ClusterServicePdu>();
private final List<ClusterServicePdu> _clusterPduIncomingQueue = new ArrayList<ClusterServicePdu>();
@ -157,8 +121,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
public ClusterManagerImpl() {
_clusterPeers = new HashMap<String, ClusterService>();
_gson = GsonHelper.getGson();
// executor to perform remote-calls in another thread context, to avoid potential
// recursive remote calls between nodes
//
@ -172,6 +134,11 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
}
@Override
public void registerDispatcher(Dispatcher dispatcher) {
_dispatcher = dispatcher;
}
private ClusterServiceRequestPdu popRequestPdu(long ackSequenceId) {
synchronized(_outgoingPdusWaitingForAck) {
if(_outgoingPdusWaitingForAck.get(ackSequenceId) != null) {
@ -198,7 +165,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
for(ClusterServiceRequestPdu pdu : candidates) {
s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + _gson.toJson(pdu));
s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + pdu.getJsonPackage());
synchronized(pdu) {
pdu.notifyAll();
}
@ -335,10 +302,10 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
requestPdu.notifyAll();
}
} else {
s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu));
s_logger.warn("Original request has already been cancelled. pdu: " + pdu.getJsonPackage());
}
} else {
String result = dispatchClusterServicePdu(pdu);
String result = _dispatcher.dispatch(pdu);
if(result == null)
result = "";
@ -361,187 +328,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
}
private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd));
}
try {
// schedule a scan task immediately
if (_agentMgr instanceof ClusteredAgentManagerImpl) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Received notification as part of addHost command to start a host scan task");
}
ClusteredAgentManagerImpl clusteredAgentMgr = (ClusteredAgentManagerImpl)_agentMgr;
clusteredAgentMgr.scheduleHostScanTask();
}
} catch (Exception e) {
// Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
// happens at fixed intervals anyways. So handling any exceptions that may be thrown
s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + getSelfPeerName() + ", ignoring as regular host scan happens at fixed interval anyways", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, true, null);
return _gson.toJson(answers);
}
private String dispatchClusterServicePdu(ClusterServicePdu pdu) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
}
Command [] cmds = null;
try {
cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class);
} catch(Throwable e) {
assert(false);
s_logger.error("Excection in gson decoding : ", e);
}
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new ChangeAgentAnswer(cmd, result);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
} catch (OperationTimedoutException e) {
s_logger.warn("Operation timed out", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand ) {
PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0];
s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId());
boolean result = false;
try {
result = executeResourceUserRequest(cmd.getHostId(), cmd.getEvent());
s_logger.debug("Result is " + result);
} catch (AgentUnavailableException ex) {
s_logger.warn("Agent is unavailable", ex);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0];
String response = handleScheduleHostScanTaskCommand(cmd);
return response;
}
try {
long startTick = System.currentTimeMillis();
if(s_logger.isDebugEnabled()) {
s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
}
Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
if(answers != null) {
String jsonReturn = _gson.toJson(answers);
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
" in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
}
return jsonReturn;
} else {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
" in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
}
}
} catch(AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
} catch (OperationTimedoutException e) {
s_logger.warn("Timed Out", e);
}
return null;
}
@Override
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
addIncomingClusterPdu(pdu);
}
@Override
public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
for (Command cmd : cmds) {
commands.addCommand(cmd);
}
return _agentMgr.send(hostId, commands);
}
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
return _agentMgr.executeUserRequest(agentId, event);
}
@Override
public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
final String msPeer = getPeerName(agentId);
if (msPeer == null) {
return null;
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
}
Command[] cmds = new Command[1];
cmds[0] = new ChangeAgentCommand(agentId, event);
Answer[] answers = execute(msPeer, agentId, cmds, true);
if (answers == null) {
throw new AgentUnavailableException(agentId);
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result for agent change is " + answers[0].getResult());
}
return answers[0].getResult();
}
/**
* called by DatabaseUpgradeChecker to see if there are other peers running.
@ -556,10 +350,10 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
@Override
public void broadcast(long agentId, Command[] cmds) {
public void broadcast(long agentId, String cmds) {
Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold.value()));
for (ManagementServerHostVO peer : peers) {
String peerName = Long.toString(peer.getMsid());
if (getSelfPeerName().equals(peerName)) {
@ -567,7 +361,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid());
s_logger.debug("Forwarding " + cmds + " to " + peer.getMsid());
}
executeAsync(peerName, agentId, cmds, true);
} catch (Exception e) {
@ -576,29 +370,27 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
}
@Override
public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
public void executeAsync(String strPeer, long agentId, String cmds, boolean stopOnError) {
ClusterServicePdu pdu = new ClusterServicePdu();
pdu.setSourcePeer(getSelfPeerName());
pdu.setDestPeer(strPeer);
pdu.setAgentId(agentId);
pdu.setJsonPackage(_gson.toJson(cmds, Command[].class));
pdu.setJsonPackage(cmds);
pdu.setStopOnError(true);
addOutgoingClusterPdu(pdu);
}
@Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
public String execute(String strPeer, long agentId, String cmds, boolean stopOnError) {
if(s_logger.isDebugEnabled()) {
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
_gson.toJson(cmds, Command[].class));
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + cmds);
}
ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu();
pdu.setSourcePeer(getSelfPeerName());
pdu.setDestPeer(strPeer);
pdu.setAgentId(agentId);
pdu.setJsonPackage(_gson.toJson(cmds, Command[].class));
pdu.setJsonPackage(cmds);
pdu.setStopOnError(stopOnError);
registerRequestPdu(pdu);
addOutgoingClusterPdu(pdu);
@ -616,30 +408,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
if(pdu.getResponseResult() != null && pdu.getResponseResult().length() > 0) {
try {
return _gson.fromJson(pdu.getResponseResult(), Answer[].class);
} catch(Throwable e) {
s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
}
return pdu.getResponseResult();
}
return null;
}
@Override
public String getPeerName(long agentHostId) {
HostVO host = _hostDao.findById(agentHostId);
if(host != null && host.getManagementServerId() != null) {
if(getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) {
return null;
}
return Long.toString(host.getManagementServerId());
}
return null;
}
@Override
public ManagementServerHostVO getPeer(String mgmtServerId) {
return _mshostDao.findByMsid(Long.valueOf(mgmtServerId));
@ -650,7 +424,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
return Long.toString(_msId);
}
@Override
public String getSelfNodeIP() {
return _clusterNodeIP;
}
@ -765,7 +538,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
Profiler profiler = new Profiler();
Profiler profilerHeartbeatUpdate = new Profiler();
Profiler profilerPeerScan = new Profiler();
Profiler profilerAgentLB = new Profiler();
try {
profiler.start();
@ -792,40 +564,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
peerScan();
profilerPeerScan.stop();
profilerAgentLB.start();
//initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
if (_agentLBEnabled && !_agentLbHappened) {
SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
List<HostVO> allManagedRoutingAgents = sc.list();
sc = SearchCriteria2.create(HostVO.class);
sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
List<HostVO> allAgents = sc.list();
double allHostsCount = allAgents.size();
double managedHostsCount = allManagedRoutingAgents.size();
if (allHostsCount > 0.0) {
double load = managedHostsCount/allHostsCount;
if (load >= _connectedAgentsThreshold) {
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold);
_rebalanceService.scheduleRebalanceAgents();
_agentLbHappened = true;
} else {
s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold);
}
}
}
profilerAgentLB.stop();
} finally {
profiler.stop();
if(profiler.getDuration() >= _heartbeatInterval) {
if (profiler.getDuration() >= _heartbeatInterval.value()) {
if(s_logger.isDebugEnabled())
s_logger.debug("Management server heartbeat takes too long to finish. profiler: " + profiler.toString() +
", profilerHeartbeatUpdate: " + profilerHeartbeatUpdate.toString() +
", profilerPeerScan: " + profilerPeerScan.toString() +
", profilerAgentLB: " + profilerAgentLB.toString());
", profilerPeerScan: " + profilerPeerScan.toString());
}
}
@ -964,9 +710,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
private void queueNotification(ClusterManagerMessage msg) {
synchronized(this._notificationMsgs) {
this._notificationMsgs.add(msg);
this._notificationMsgs.notifyAll();
synchronized(_notificationMsgs) {
_notificationMsgs.add(msg);
_notificationMsgs.notifyAll();
}
switch(msg.getMessageType()) {
@ -999,9 +745,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
private ClusterManagerMessage getNextNotificationMessage() {
synchronized(this._notificationMsgs) {
if(this._notificationMsgs.size() > 0) {
return this._notificationMsgs.remove(0);
synchronized(_notificationMsgs) {
if(_notificationMsgs.size() > 0) {
return _notificationMsgs.remove(0);
}
}
@ -1012,7 +758,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
// upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform
// missed cleanup
Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - _heartbeatThreshold.value()));
// We don't have foreign key constraints to enforce the mgmt_server_id integrity in host table, when user manually
// remove records from mshost table, this will leave orphan mgmt_serve_id reference in host table.
@ -1043,7 +789,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
if(downHostList.size() > 0)
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, downHostList));
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, downHostList));
} else {
s_logger.info("No inactive management server node found");
}
@ -1057,7 +803,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
Profiler profilerQueryActiveList = new Profiler();
profilerQueryActiveList.start();
List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold.value()));
profilerQueryActiveList.stop();
Profiler profilerSyncClusterInfo = new Profiler();
@ -1119,7 +865,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
}
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, invalidatedNodeList));
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, invalidatedNodeList));
}
profilerInvalidatedNodeList.stop();
@ -1144,7 +890,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
if(removedNodeList.size() > 0) {
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, removedNodeList));
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, removedNodeList));
}
profilerRemovedList.stop();
@ -1167,12 +913,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
if(newNodeList.size() > 0) {
this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeAdded, newNodeList));
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeAdded, newNodeList));
}
profiler.stop();
if(profiler.getDuration() >= this._heartbeatInterval) {
if (profiler.getDuration() >= _heartbeatInterval.value()) {
if(s_logger.isDebugEnabled())
s_logger.debug("Peer scan takes too long to finish. profiler: " + profiler.toString()
+ ", profilerQueryActiveList: " + profilerQueryActiveList.toString()
@ -1208,7 +954,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
if (mshost == null) {
mshost = new ManagementServerHostVO();
mshost.setMsid(_msId);
mshost.setRunid(this.getCurrentRunId());
mshost.setRunid(getCurrentRunId());
mshost.setName(NetUtils.getHostName());
mshost.setVersion(version);
mshost.setServiceIP(_clusterNodeIP);
@ -1240,7 +986,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
_mshostPeerDao.clearPeerInfo(_mshostId);
// use seperate thread for heartbeat updates
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), _heartbeatInterval, _heartbeatInterval, TimeUnit.MILLISECONDS);
_heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), _heartbeatInterval.value(), _heartbeatInterval.value(), TimeUnit.MILLISECONDS);
_notificationExecutor.submit(getNotificationTask());
} catch (Throwable e) {
@ -1281,23 +1027,19 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
return true;
}
protected final ConfigKey<Integer> HeartBeatInterval = new ConfigKey<Integer>(Integer.class, "cluster.heartbeat.interval", "management-server", ClusterManager.class,
"1500", "Interval to check for the heart beat between management server nodes", false, "Seconds");
protected final ConfigKey<Integer> HeartBeatThreshold = new ConfigKey<Integer>(Integer.class, "cluster.heartbeat.threshold", "management-server", ClusterManager.class,
"150000", "Threshold before self-fence the management server", true, "Seconds");
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
if(s_logger.isInfoEnabled()) {
s_logger.info("Start configuring cluster manager : " + name);
}
Map<String, String> configs = _configDao.getConfiguration("management-server", params);
String value = configs.get("cluster.heartbeat.interval");
if (value != null) {
_heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL);
}
value = configs.get("cluster.heartbeat.threshold");
if (value != null) {
_heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD);
}
_heartbeatInterval = _configDepot.get(HeartBeatInterval);
_heartbeatThreshold = _configDepot.get(HeartBeatThreshold);
File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");
Properties dbProps = new Properties();
@ -1337,16 +1079,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
throw new ConfigurationException("Unable to set current cluster service adapter");
}
_agentLBEnabled = Boolean.valueOf(_configDao.getValue(Config.AgentLbEnable.key()));
String connectedAgentsThreshold = configs.get("agent.load.threshold");
if (connectedAgentsThreshold != null) {
_connectedAgentsThreshold = Double.parseDouble(connectedAgentsThreshold);
}
this.registerListener(new LockMasterListener(_msId));
checkConflicts();
if(s_logger.isInfoEnabled()) {
@ -1355,21 +1087,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
return true;
}
@Override
public long getManagementNodeId() {
return _msId;
}
@Override
public long getCurrentRunId() {
return _runId;
}
@Override
public boolean isManagementNodeAlive(long msid) {
ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
if(mshost != null) {
if(mshost.getLastUpdateTime().getTime() >= DateUtil.currentGMTTime().getTime() - _heartbeatThreshold) {
if (mshost.getLastUpdateTime().getTime() >= DateUtil.currentGMTTime().getTime() - _heartbeatThreshold.value()) {
return true;
}
}
@ -1377,7 +1102,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
return false;
}
@Override
public boolean pingManagementNode(long msid) {
ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
if(mshost == null) {
@ -1434,20 +1158,16 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
@Override
public int getHeartbeatThreshold() {
return this._heartbeatThreshold;
return _heartbeatThreshold.value();
}
public int getHeartbeatInterval() {
return this._heartbeatInterval;
}
public void setHeartbeatThreshold(int threshold) {
_heartbeatThreshold = threshold;
return _heartbeatInterval.value();
}
private void checkConflicts() throws ConfigurationException {
Date cutTime = DateUtil.currentGMTTime();
List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold.value()));
for(ManagementServerHostVO peer : peers) {
String peerIP = peer.getServiceIP().trim();
if(_clusterNodeIP.equals(peerIP)) {
@ -1475,43 +1195,4 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
}
}
@Override
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
}
@Override
public boolean isAgentRebalanceEnabled() {
return _agentLBEnabled;
}
@Override
public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException {
final String msPeer = getPeerName(agentId);
if (msPeer == null) {
return null;
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
}
Command[] cmds = new Command[1];
cmds[0] = new PropagateResourceEventCommand(agentId, event);
Answer[] answers = execute(msPeer, agentId, cmds, true);
if (answers == null) {
throw new AgentUnavailableException(agentId);
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result for agent change is " + answers[0].getResult());
}
return answers[0].getResult();
}
@Override
public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException {
return _resourceMgr.executeUserRequest(hostId, event);
}
}

View File

@ -23,5 +23,4 @@ public interface ClusterManagerMBean {
public String getVersion();
public int getHeartbeatInterval();
public int getHeartbeatThreshold();
public void setHeartbeatThreshold(int threshold);
}

View File

@ -24,8 +24,8 @@ import javax.management.StandardMBean;
import com.cloud.utils.DateUtil;
public class ClusterManagerMBeanImpl extends StandardMBean implements ClusterManagerMBean {
private ClusterManagerImpl _clusterMgr;
private ManagementServerHostVO _mshostVo;
private final ClusterManagerImpl _clusterMgr;
private final ManagementServerHostVO _mshostVo;
public ClusterManagerMBeanImpl(ClusterManagerImpl clusterMgr, ManagementServerHostVO mshostVo) {
super(ClusterManagerMBean.class, false);
@ -34,34 +34,34 @@ public class ClusterManagerMBeanImpl extends StandardMBean implements ClusterMan
_mshostVo = mshostVo;
}
@Override
public long getMsid() {
return _mshostVo.getMsid();
}
@Override
public String getLastUpdateTime() {
Date date = _mshostVo.getLastUpdateTime();
return DateUtil.getDateDisplayString(TimeZone.getDefault(), date);
}
@Override
public String getClusterNodeIP() {
return _mshostVo.getServiceIP();
}
@Override
public String getVersion() {
return _mshostVo.getVersion();
}
@Override
public int getHeartbeatInterval() {
return _clusterMgr.getHeartbeatInterval();
}
@Override
public int getHeartbeatThreshold() {
return _clusterMgr.getHeartbeatThreshold();
}
public void setHeartbeatThreshold(int threshold) {
// to avoid accidentally screwing up cluster manager, we put some guarding logic here
if(threshold >= ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD)
_clusterMgr.setHeartbeatThreshold(threshold);
}
}

View File

@ -31,9 +31,11 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import org.apache.cloudstack.config.ConfigDepot;
import org.apache.cloudstack.config.ConfigKey;
import org.apache.cloudstack.config.ConfigValue;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.component.AdapterBase;
@ -49,14 +51,14 @@ public class ClusterServiceServletAdapter extends AdapterBase implements Cluster
@Inject private ClusterManager _manager;
@Inject private ManagementServerHostDao _mshostDao;
@Inject private ConfigurationDao _configDao;
@Inject
protected ConfigDepot _configDepot;
private ClusterServiceServletContainer _servletContainer;
private int _clusterServicePort = DEFAULT_SERVICE_PORT;
private int _clusterRequestTimeoutSeconds = DEFAULT_REQUEST_TIMEOUT;
private ConfigValue<Integer> _clusterRequestTimeoutSeconds;
@Override
public ClusterService getPeerService(String strPeer) throws RemoteException {
@ -123,12 +125,14 @@ public class ClusterServiceServletAdapter extends AdapterBase implements Cluster
return true;
}
private final ConfigKey<Integer> ClusterMessageTimeOut = new ConfigKey<Integer>(Integer.class, "cluster.message.timeout.seconds", "Advance", ClusterManager.class, "300",
"Time (in seconds) to wait before a inter-management server message post times out.", true, "Seconds");
private void init() throws ConfigurationException {
if(_mshostDao != null)
return;
String value = _configDao.getValue(Config.ClusterMessageTimeOutSeconds.key());
_clusterRequestTimeoutSeconds = NumbersUtil.parseInt(value, DEFAULT_REQUEST_TIMEOUT);
_clusterRequestTimeoutSeconds = _configDepot.get(ClusterMessageTimeOut);
s_logger.info("Configure cluster request time out. timeout: " + _clusterRequestTimeoutSeconds + " seconds");
File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");

View File

@ -27,23 +27,25 @@ import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.params.HttpClientParams;
import org.apache.log4j.Logger;
import org.apache.cloudstack.config.ConfigValue;
public class ClusterServiceServletImpl implements ClusterService {
private static final long serialVersionUID = 4574025200012566153L;
private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class);
private String _serviceUrl;
private int _requestTimeoutSeconds;
private ConfigValue<Integer> _requestTimeoutSeconds;
protected static HttpClient s_client = null;
public ClusterServiceServletImpl() {
}
public ClusterServiceServletImpl(String serviceUrl, int requestTimeoutSeconds) {
s_logger.info("Setup cluster service servlet. service url: " + serviceUrl + ", request timeout: " + requestTimeoutSeconds + " seconds");
public ClusterServiceServletImpl(String serviceUrl, ConfigValue<Integer> requestTimeoutSeconds) {
s_logger.info("Setup cluster service servlet. service url: " + serviceUrl + ", request timeout: " + requestTimeoutSeconds.value() + " seconds");
this._serviceUrl = serviceUrl;
this._requestTimeoutSeconds = requestTimeoutSeconds;
_serviceUrl = serviceUrl;
_requestTimeoutSeconds = requestTimeoutSeconds;
}
@Override
@ -125,7 +127,7 @@ public class ClusterServiceServletImpl implements ClusterService {
s_client = new HttpClient(mgr);
HttpClientParams clientParams = new HttpClientParams();
clientParams.setSoTimeout(_requestTimeoutSeconds * 1000);
clientParams.setSoTimeout(_requestTimeoutSeconds.value() * 1000);
s_client.setParams(clientParams);
}

View File

@ -16,9 +16,9 @@
// under the License.
package com.cloud.cluster;
import org.apache.cloudstack.api.InternalIdentity;
public interface ManagementServerHost extends InternalIdentity {
public interface ManagementServerHost {
long getId();
public static enum State {
Up, Starting, Down
@ -29,4 +29,6 @@ public interface ManagementServerHost extends InternalIdentity {
State getState();
String getVersion();
String getServiceIP();
}

View File

@ -30,11 +30,10 @@ import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import com.cloud.utils.DateUtil;
import org.apache.cloudstack.api.InternalIdentity;
@Entity
@Table(name="mshost_peer")
public class ManagementServerHostPeerVO implements InternalIdentity {
public class ManagementServerHostPeerVO {
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@ -67,7 +66,7 @@ public class ManagementServerHostPeerVO implements InternalIdentity {
this.peerRunid = peerRunid;
this.peerState = peerState;
this.lastUpdateTime = DateUtil.currentGMTTime();
lastUpdateTime = DateUtil.currentGMTTime();
}
public long getId() {

View File

@ -30,7 +30,6 @@ import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import com.cloud.utils.db.GenericDao;
import org.apache.cloudstack.api.InternalIdentity;
@Entity
@Table(name="mshost")
@ -81,9 +80,10 @@ public class ManagementServerHostVO implements ManagementServerHost {
this.runid = runid;
this.serviceIP = serviceIP;
this.servicePort = servicePort;
this.lastUpdateTime = updateTime;
lastUpdateTime = updateTime;
}
@Override
public long getId() {
return id;
}
@ -119,7 +119,7 @@ public class ManagementServerHostVO implements ManagementServerHost {
@Override
public ManagementServerHost.State getState() {
return this.state;
return state;
}
public void setState(ManagementServerHost.State state) {

View File

@ -27,7 +27,6 @@ import java.util.TimeZone;
import javax.ejb.Local;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import com.cloud.cluster.ClusterInvalidSessionException;
import com.cloud.cluster.ManagementServerHost;
@ -42,7 +41,6 @@ import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
@Component
@Local(value={ManagementServerHostDao.class})
public class ManagementServerHostDaoImpl extends GenericDaoBase<ManagementServerHostVO, Long> implements ManagementServerHostDao {
private static final Logger s_logger = Logger.getLogger(ManagementServerHostDaoImpl.class);

View File

@ -21,7 +21,6 @@ import java.util.List;
import javax.ejb.Local;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import com.cloud.cluster.ManagementServerHost;
import com.cloud.cluster.ManagementServerHostPeerVO;
@ -31,7 +30,6 @@ import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
@Component
@Local(value={ManagementServerHostPeerDao.class})
public class ManagementServerHostPeerDaoImpl extends GenericDaoBase<ManagementServerHostPeerVO, Long> implements ManagementServerHostPeerDao {
private static final Logger s_logger = Logger.getLogger(ManagementServerHostPeerDaoImpl.class);

View File

@ -34,5 +34,7 @@
<module>rest</module>
<module>events</module>
<module>jobs</module>
<module>cluster</module>
<module>db</module>
</modules>
</project>

View File

@ -33,6 +33,11 @@
<artifactId>cloud-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.cloudstack</groupId>
<artifactId>cloud-framework-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>

View File

@ -44,22 +44,30 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.log4j.Logger;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import com.google.gson.Gson;
import org.apache.cloudstack.config.ConfigDepot;
import org.apache.cloudstack.config.ConfigKey;
import org.apache.cloudstack.config.ConfigValue;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CancelCommand;
import com.cloud.agent.api.ChangeAgentAnswer;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.agent.api.PropagateResourceEventCommand;
import com.cloud.agent.api.ScheduleHostScanTaskCommand;
import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Request.Version;
import com.cloud.agent.transport.Response;
import com.cloud.api.ApiDBUtils;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ClusterServicePdu;
import com.cloud.cluster.ClusteredAgentRebalanceService;
import com.cloud.cluster.ManagementServerHost;
import com.cloud.cluster.ManagementServerHostVO;
@ -68,7 +76,6 @@ import com.cloud.cluster.agentlb.HostTransferMapVO;
import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.OperationTimedoutException;
@ -76,10 +83,11 @@ import com.cloud.host.Host;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.Status.Event;
import com.cloud.resource.ResourceState;
import com.cloud.resource.ServerResource;
import com.cloud.storage.resource.DummySecondaryStorageResource;
import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
import com.cloud.utils.Profiler;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.SearchCriteria2;
@ -98,10 +106,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public final static long STARTUP_DELAY = 5000;
public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login
public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
public long _loadSize = 100;
protected int _directAgentScanInterval = 90; // 90 seconds
protected Set<Long> _agentToTransferIds = new HashSet<Long>();
Gson _gson;
@Inject
protected ClusterManager _clusterMgr = null;
@ -118,29 +126,45 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Inject protected List<AgentLoadBalancerPlanner> _lbPlanners;
@Inject ConfigurationDao _configDao;
@Inject
ConfigDepot _configDepot;
protected ClusteredAgentManagerImpl() {
super();
}
protected final ConfigKey<Boolean> EnableLB = new ConfigKey<Boolean>(Boolean.class, "agent.lb.enabled", "Advanced", AgentManager.class, "false",
"Enable agent load balancing between management server nodes", true, "True/False");
protected final ConfigKey<Double> ConnectedAgentThreshold = new ConfigKey<Double>(Double.class, "agent.load.threshold", "Advanced", AgentManager.class, "0.7",
"What percentage of the agents can be held by one management server before load balancing happens", true, "0-1");
protected final ConfigKey<Integer> LoadSize = new ConfigKey<Integer>(Integer.class, "direct.agent.load.size", "Advanced", AgentManager.class, "16",
"How many agents to connect to in each round", true, "");
protected final ConfigKey<Integer> ScanInterval = new ConfigKey<Integer>(Integer.class, "direct.agent.scan.interval", "Advanced", AgentManager.class, "90",
"Interval between scans to load agents", false, "Seconds");
protected ConfigValue<Boolean> _agentLBEnabled;
protected ConfigValue<Double> _connectedAgentsThreshold;
protected ConfigValue<Integer> _loadSize;
protected ConfigValue<Integer> _directAgentScanInterval;
@Override
public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
_peers = new HashMap<String, SocketChannel>(7);
_sslEngines = new HashMap<String, SSLEngine>(7);
_nodeId = _clusterMgr.getManagementNodeId();
_nodeId = ManagementServerNode.getManagementServerId();
s_logger.info("Configuring ClusterAgentManagerImpl. management server node id(msid): " + _nodeId);
Map<String, String> params = _configDao.getConfiguration(xmlParams);
String value = params.get(Config.DirectAgentLoadSize.key());
_loadSize = NumbersUtil.parseInt(value, 16);
value = params.get(Config.DirectAgentScanInterval.key());
_directAgentScanInterval = NumbersUtil.parseInt(value, 90); // defaulted to 90 seconds
_loadSize = _configDepot.get(LoadSize);
_directAgentScanInterval = _configDepot.get(ScanInterval).setMultiplier(1000);
_agentLBEnabled = _configDepot.get(EnableLB);
_connectedAgentsThreshold = _configDepot.get(ConnectedAgentThreshold);
ClusteredAgentAttache.initialize(this);
_clusterMgr.registerListener(this);
_clusterMgr.registerDispatcher(new ClusterDispatcher());
return super.configure(name, xmlParams);
}
@ -150,13 +174,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (!super.start()) {
return false;
}
_timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, _directAgentScanInterval * 1000);
_timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, _directAgentScanInterval.value());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Scheduled direct agent scan task to run at an interval of " + _directAgentScanInterval + " seconds");
s_logger.debug("Scheduled direct agent scan task to run at an interval of " + _directAgentScanInterval.value() + " seconds");
}
// schedule transfer scan executor - if agent LB is enabled
if (_clusterMgr.isAgentRebalanceEnabled()) {
if (isAgentRebalanceEnabled()) {
s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), 60000, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL,
TimeUnit.MILLISECONDS);
}
@ -182,7 +206,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
// for agents that are self-managed, threshold to be considered as disconnected after pingtimeout
long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingTimeout);
List<HostVO> hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize, _nodeId);
List<HostVO> hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, _loadSize.value().longValue(), _nodeId);
List<HostVO> appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId);
hosts.addAll(appliances);
@ -319,7 +343,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
AgentAttache attache = findAttache(hostId);
if (attache != null) {
//don't process disconnect if the host is being rebalanced
if (_clusterMgr.isAgentRebalanceEnabled()) {
if (isAgentRebalanceEnabled()) {
HostTransferMapVO transferVO = _hostTransferDao.findById(hostId);
if (transferVO != null) {
if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) {
@ -351,7 +375,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public boolean reconnect(final long hostId) {
Boolean result;
try {
result = _clusterMgr.propagateAgentEvent(hostId, Event.ShutdownRequested);
result = propagateAgentEvent(hostId, Event.ShutdownRequested);
if (result != null) {
return result;
}
@ -366,7 +390,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public void notifyNodesInCluster(AgentAttache attache) {
s_logger.debug("Notifying other nodes of to disconnect");
Command[] cmds = new Command[] { new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected) };
_clusterMgr.broadcast(attache.getId(), cmds);
_clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds));
}
// notifies MS peers to schedule a host scan task immediately, triggered during addHost operation
@ -375,7 +399,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.debug("Notifying other MS nodes to run host scan task");
}
Command[] cmds = new Command[] { new ScheduleHostScanTaskCommand() };
_clusterMgr.broadcast(0, cmds);
_clusterMgr.broadcast(0, _gson.toJson(cmds));
}
protected static void logT(byte[] bytes, final String msg) {
@ -428,7 +452,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
public String findPeer(long hostId) {
return _clusterMgr.getPeerName(hostId);
return getPeerName(hostId);
}
public SSLEngine getSSLEngine(String peerName) {
@ -468,7 +492,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
if (ch == null || ch == prevCh) {
ManagementServerHostVO ms = _clusterMgr.getPeer(peerName);
ManagementServerHost ms = _clusterMgr.getPeer(peerName);
if (ms == null) {
s_logger.info("Unable to find peer: " + peerName);
return null;
@ -514,7 +538,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) {
String peerName = _clusterMgr.getPeerName(hostId);
String peerName = getPeerName(hostId);
if (peerName == null) {
return null;
}
@ -861,7 +885,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer);
}
String peerName = Long.toString(peer);
Answer[] answers = _clusterMgr.execute(peerName, agentId, cmds, true);
String cmdStr = _gson.toJson(cmds);
String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true);
Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
return answers;
} catch (Exception e) {
s_logger.warn("Caught exception while talking to " + currentOwnerId, e);
@ -869,6 +895,46 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
public String getPeerName(long agentHostId) {
HostVO host = _hostDao.findById(agentHostId);
if (host != null && host.getManagementServerId() != null) {
if (_clusterMgr.getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) {
return null;
}
return Long.toString(host.getManagementServerId());
}
return null;
}
public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
final String msPeer = getPeerName(agentId);
if (msPeer == null) {
return null;
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
}
Command[] cmds = new Command[1];
cmds[0] = new ChangeAgentCommand(agentId, event);
String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true);
if (ansStr == null) {
throw new AgentUnavailableException(agentId);
}
Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result for agent change is " + answers[0].getResult());
}
return answers[0].getResult();
}
private Runnable getTransferScanTask() {
return new Runnable() {
@Override
@ -1143,4 +1209,227 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd));
}
try {
scheduleHostScanTask();
} catch (Exception e) {
// Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
// happens at fixed intervals anyways. So handling any exceptions that may be thrown
s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName()
+ ", ignoring as regular host scan happens at fixed interval anyways", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, true, null);
return _gson.toJson(answers);
}
public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
for (Command cmd : cmds) {
commands.addCommand(cmd);
}
return send(hostId, commands);
}
public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException {
final String msPeer = getPeerName(agentId);
if (msPeer == null) {
return null;
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
}
Command[] cmds = new Command[1];
cmds[0] = new PropagateResourceEventCommand(agentId, event);
String AnsStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true);
if (AnsStr == null) {
throw new AgentUnavailableException(agentId);
}
Answer[] answers = _gson.fromJson(AnsStr, Answer[].class);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result for agent change is " + answers[0].getResult());
}
return answers[0].getResult();
}
public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException {
return _resourceMgr.executeUserRequest(hostId, event);
}
protected class ClusterDispatcher implements ClusterManager.Dispatcher {
@Override
public String getName() {
return "ClusterDispatcher";
}
@Override
public String dispatch(ClusterServicePdu pdu) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
}
Command[] cmds = null;
try {
cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class);
} catch (Throwable e) {
assert (false);
s_logger.error("Excection in gson decoding : ", e);
}
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { //intercepted
ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new ChangeAgentAnswer(cmd, result);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
}
boolean result = false;
try {
result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result is " + result);
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
} catch (OperationTimedoutException e) {
s_logger.warn("Operation timed out", e);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId());
boolean result = false;
try {
result = executeResourceUserRequest(cmd.getHostId(), cmd.getEvent());
s_logger.debug("Result is " + result);
} catch (AgentUnavailableException ex) {
s_logger.warn("Agent is unavailable", ex);
return null;
}
Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
String response = handleScheduleHostScanTaskCommand(cmd);
return response;
}
try {
long startTick = System.currentTimeMillis();
if (s_logger.isDebugEnabled()) {
s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
}
Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
if (answers != null) {
String jsonReturn = _gson.toJson(answers);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
" in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
}
return jsonReturn;
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
" in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
}
}
} catch (AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
} catch (OperationTimedoutException e) {
s_logger.warn("Timed Out", e);
}
return null;
}
}
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
return executeUserRequest(agentId, event);
}
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
}
public boolean isAgentRebalanceEnabled() {
return _agentLBEnabled.value();
}
@Inject
private ClusteredAgentRebalanceService _rebalanceService;
boolean _agentLbHappened = false;
public void agentrebalance() {
Profiler profilerAgentLB = new Profiler();
profilerAgentLB.start();
//initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
if (_agentLBEnabled.value() && !_agentLbHappened) {
SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
List<HostVO> allManagedRoutingAgents = sc.list();
sc = SearchCriteria2.create(HostVO.class);
sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
List<HostVO> allAgents = sc.list();
double allHostsCount = allAgents.size();
double managedHostsCount = allManagedRoutingAgents.size();
if (allHostsCount > 0.0) {
double load = managedHostsCount / allHostsCount;
if (load >= _connectedAgentsThreshold.value()) {
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold);
_rebalanceService.scheduleRebalanceAgents();
_agentLbHappened = true;
} else {
s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold);
}
}
}
profilerAgentLB.stop();
}
}

View File

@ -53,6 +53,7 @@ import org.apache.cloudstack.api.response.ExceptionResponse;
import org.apache.cloudstack.context.CallContext;
import org.apache.cloudstack.framework.events.EventBus;
import org.apache.cloudstack.framework.events.EventBusException;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.api.ApiDBUtils;
import com.cloud.api.ApiDispatcher;
@ -87,7 +88,6 @@ import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.net.MacAddress;
@Component
@Local(value={AsyncJobManager.class})
@ -773,11 +773,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
private long getMsid() {
if(_clusterMgr != null) {
return _clusterMgr.getManagementNodeId();
}
return MacAddress.getMacAddress().toLong();
return ManagementServerNode.getManagementServerId();
}
private void cleanupPendingJobs(List<SyncQueueItemVO> l) {

View File

@ -1,67 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.cluster;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.host.Status.Event;
import com.cloud.resource.ResourceState;
import com.cloud.utils.component.Manager;
public interface ClusterManager extends Manager {
public static final int DEFAULT_HEARTBEAT_INTERVAL = 1500;
public static final int DEFAULT_HEARTBEAT_THRESHOLD = 150000;
public static final String ALERT_SUBJECT = "cluster-alert";
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu);
public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException;
public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException;
public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException;
public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException;
public int getHeartbeatThreshold();
public long getManagementNodeId(); // msid of current management server node
public boolean isManagementNodeAlive(long msid);
public boolean pingManagementNode(long msid);
public long getCurrentRunId();
public String getSelfPeerName();
public String getSelfNodeIP();
public String getPeerName(long agentHostId);
public void registerListener(ClusterManagerListener listener);
public void unregisterListener(ClusterManagerListener listener);
public ManagementServerHostVO getPeer(String peerName);
/**
* Broadcast the command to all of the management server nodes.
* @param agentId agent id this broadcast is regarding
* @param cmds commands to broadcast
*/
public void broadcast(long agentId, Command[] cmds);
boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException;
boolean isAgentRebalanceEnabled();
}

View File

@ -1,179 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.cluster;
import java.util.Map;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.Command;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.host.Status.Event;
import com.cloud.utils.component.ManagerBase;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.net.MacAddress;
@Local(value={ClusterManager.class})
public class DummyClusterManagerImpl extends ManagerBase implements ClusterManager {
private static final Logger s_logger = Logger.getLogger(DummyClusterManagerImpl.class);
protected long _id = MacAddress.getMacAddress().toLong();
protected long _runId = System.currentTimeMillis();
private final String _clusterNodeIP = "127.0.0.1";
@Override
public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError)
throws AgentUnavailableException, OperationTimedoutException {
throw new CloudRuntimeException("Unsupported feature");
}
/*
@Override
public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
}
*/
@Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public int getHeartbeatThreshold() {
return ClusterManager.DEFAULT_HEARTBEAT_INTERVAL;
}
@Override
public long getManagementNodeId() {
return _id;
}
@Override
public long getCurrentRunId() {
return _runId;
}
@Override
public ManagementServerHostVO getPeer(String str) {
return null;
}
@Override
public String getSelfPeerName() {
return Long.toString(_id);
}
@Override
public String getSelfNodeIP() {
return _clusterNodeIP;
}
@Override
public boolean isManagementNodeAlive(long msid) {
return true;
}
@Override
public boolean pingManagementNode(long msid) {
return false;
}
@Override
public String getPeerName(long agentHostId) {
throw new CloudRuntimeException("Unsupported feature");
}
@Override
public void registerListener(ClusterManagerListener listener) {
}
@Override
public void unregisterListener(ClusterManagerListener listener) {
}
@Override
public boolean configure(String name, Map<String, Object> params)
throws ConfigurationException {
return true;
}
@Override
public void broadcast(long hostId, Command[] cmds) {
}
@Override
public boolean start() {
if(s_logger.isInfoEnabled())
s_logger.info("Starting cluster manager, msid : " + _id);
return true;
}
@Override
public boolean stop() {
return true;
}
@Override
public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return false;
}
@Override
public boolean isAgentRebalanceEnabled() {
return false;
}
@Override
public Boolean propagateResourceEvent(long agentId, com.cloud.resource.ResourceState.Event event) throws AgentUnavailableException {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean executeResourceUserRequest(long hostId, com.cloud.resource.ResourceState.Event event) throws AgentUnavailableException {
// TODO Auto-generated method stub
return false;
}
}

View File

@ -14,10 +14,12 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.cloud.cluster;
package com.cloud.server;
import java.util.List;
import com.cloud.cluster.ClusterManagerListener;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.utils.db.Merovingian2;
/**

View File

@ -427,6 +427,7 @@ import org.apache.cloudstack.engine.subsystem.api.storage.StoragePoolAllocator;
import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.GetVncPortAnswer;
@ -825,6 +826,8 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
public boolean start() {
s_logger.info("Startup CloudStack management server...");
_clusterMgr.registerListener(new LockMasterListener(ManagementServerNode.getManagementServerId()));
enableAdminUser("password");
return true;
}