diff --git a/api/src/com/cloud/agent/api/Command.java b/api/src/com/cloud/agent/api/Command.java
index b576108153f..120ed6c7cb6 100755
--- a/api/src/com/cloud/agent/api/Command.java
+++ b/api/src/com/cloud/agent/api/Command.java
@@ -56,4 +56,8 @@ public abstract class Command {
public String getContextParam(String name) {
return contextMap.get(name);
}
+
+ public boolean allowCaching() {
+ return true;
+ }
}
diff --git a/api/src/com/cloud/agent/api/TransferAgentCommand.java b/api/src/com/cloud/agent/api/TransferAgentCommand.java
new file mode 100644
index 00000000000..b2fd36b72a0
--- /dev/null
+++ b/api/src/com/cloud/agent/api/TransferAgentCommand.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.agent.api;
+
+import com.cloud.host.Status.Event;
+
+public class TransferAgentCommand extends Command {
+ protected long agentId;
+ protected long futureOwner;
+ Event event;
+
+ protected TransferAgentCommand() {
+ }
+
+ public TransferAgentCommand(long agentId, long futureOwner, Event event) {
+ this.agentId = agentId;
+ this.futureOwner = futureOwner;
+ this.event = event;
+ }
+
+ public long getAgentId() {
+ return agentId;
+ }
+
+ public long getFutureOwner() {
+ return futureOwner;
+ }
+
+ public Event getEvent() {
+ return event;
+ }
+
+ @Override
+ public boolean executeInSequence() {
+ return false;
+ }
+}
diff --git a/api/src/com/cloud/cluster/ManagementServerHost.java b/api/src/com/cloud/cluster/ManagementServerHost.java
new file mode 100644
index 00000000000..82f6a755bcf
--- /dev/null
+++ b/api/src/com/cloud/cluster/ManagementServerHost.java
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+
+package com.cloud.cluster;
+
+
+public interface ManagementServerHost {
+
+ public static enum State { Up, Starting, Down };
+
+ long getMsid();
+
+ State getState();
+
+ String getVersion();
+
+}
diff --git a/api/src/com/cloud/host/Status.java b/api/src/com/cloud/host/Status.java
index 1af9dfdf43d..87da0ef2bb7 100644
--- a/api/src/com/cloud/host/Status.java
+++ b/api/src/com/cloud/host/Status.java
@@ -32,7 +32,8 @@ public enum Status {
ErrorInMaintenance(false, false, false),
Maintenance(false, false, false),
Alert(true, true, true),
- Removed(true, false, true);
+ Removed(true, false, true),
+ Rebalance(false, false, false);
private final boolean updateManagementServer;
private final boolean checkManagementServer;
@@ -72,7 +73,11 @@ public enum Status {
WaitedTooLong(false, "Waited too long from the agent to reconnect on its own. Time to do HA"),
Remove(true, "Host is removed"),
Ready(false, "Host is ready for commands"),
- UpdatePassword(false, "Update host password from db");
+ UpdatePassword(false, "Update host password from db"),
+ RequestAgentRebalance(false, "Request rebalance for the certain host"),
+ StartAgentRebalance(false, "Start rebalance for the certain host"),
+ RebalanceCompleted(false, "Host is rebalanced successfully"),
+ RebalanceFailed(false, "Failed to rebalance the host");
private final boolean isUserRequest;
private final String comment;
@@ -132,6 +137,7 @@ public enum Status {
s_fsm.addTransition(Status.Up, Event.Ping, Status.Up);
s_fsm.addTransition(Status.Up, Event.AgentConnected, Status.Connecting);
s_fsm.addTransition(Status.Up, Event.ManagementServerDown, Status.Disconnected);
+ s_fsm.addTransition(Status.Up, Event.StartAgentRebalance, Status.Rebalance);
s_fsm.addTransition(Status.Updating, Event.PingTimeout, Status.Alert);
s_fsm.addTransition(Status.Updating, Event.Ping, Status.Updating);
s_fsm.addTransition(Status.Updating, Event.AgentConnected, Status.Connecting);
@@ -177,6 +183,8 @@ public enum Status {
s_fsm.addTransition(Status.Alert, Event.Ping, Status.Up);
s_fsm.addTransition(Status.Alert, Event.Remove, Status.Removed);
s_fsm.addTransition(Status.Alert, Event.ManagementServerDown, Status.Alert);
+ s_fsm.addTransition(Status.Rebalance, Event.RebalanceFailed, Status.Alert);
+ s_fsm.addTransition(Status.Rebalance, Event.RebalanceCompleted, Status.Connecting);
}
public static void main(String[] args) {
diff --git a/client/tomcatconf/components.xml.in b/client/tomcatconf/components.xml.in
index 17a6d3d3c54..a991e84fcde 100755
--- a/client/tomcatconf/components.xml.in
+++ b/client/tomcatconf/components.xml.in
@@ -93,6 +93,9 @@
+
+
+
diff --git a/deps/XenServerJava/com/xensource/xenapi/Event.java b/deps/XenServerJava/com/xensource/xenapi/Event.java
index 82c4a3b876b..5eb9cc8cf47 100644
--- a/deps/XenServerJava/com/xensource/xenapi/Event.java
+++ b/deps/XenServerJava/com/xensource/xenapi/Event.java
@@ -33,20 +33,18 @@
package com.xensource.xenapi;
-import com.xensource.xenapi.Types.BadServerResponse;
-import com.xensource.xenapi.Types.VersionException;
-import com.xensource.xenapi.Types.XenAPIException;
-
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.util.HashMap;
-import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.xmlrpc.XmlRpcException;
+import com.xensource.xenapi.Types.BadServerResponse;
+import com.xensource.xenapi.Types.XenAPIException;
+
/**
* Asynchronous event registration and handling
*
@@ -66,6 +64,7 @@ public class Event extends XenAPIObject {
this.ref = ref;
}
+ @Override
public String toWireString() {
return this.ref;
}
@@ -96,6 +95,7 @@ public class Event extends XenAPIObject {
* Represents all the fields in a Event
*/
public static class Record implements Types.Record {
+ @Override
public String toString() {
StringWriter writer = new StringWriter();
PrintWriter print = new PrintWriter(writer);
@@ -112,6 +112,7 @@ public class Event extends XenAPIObject {
/**
* Convert a event.Record to a Map
*/
+ @Override
public Map toMap() {
Map map = new HashMap();
map.put("id", this.id == null ? 0 : this.id);
diff --git a/server/src/com/cloud/agent/manager/AgentAttache.java b/server/src/com/cloud/agent/manager/AgentAttache.java
index 1c631d9837d..12934b18ece 100644
--- a/server/src/com/cloud/agent/manager/AgentAttache.java
+++ b/server/src/com/cloud/agent/manager/AgentAttache.java
@@ -237,6 +237,10 @@ public abstract class AgentAttache {
public int getQueueSize() {
return _requests.size();
}
+
+ public int getListenersSize() {
+ return _waitForList.size();
+ }
public boolean processAnswers(final long seq, final Response resp) {
resp.logD("Processing: ", true);
diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
index 91edfc2fb21..5e9f5f503ad 100644
--- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@ -11,11 +11,19 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.ejb.Local;
import javax.naming.ConfigurationException;
@@ -23,29 +31,40 @@ import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CancelCommand;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
+import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Request.Version;
import com.cloud.agent.transport.Response;
import com.cloud.api.commands.UpdateHostPasswordCmd;
import com.cloud.cluster.ClusterManager;
import com.cloud.cluster.ClusterManagerListener;
+import com.cloud.cluster.ClusteredAgentRebalanceService;
import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner;
+import com.cloud.cluster.agentlb.HostTransferMapVO;
+import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
+import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
import com.cloud.host.HostVO;
import com.cloud.host.Status;
import com.cloud.host.Status.Event;
import com.cloud.resource.ServerResource;
import com.cloud.storage.resource.DummySecondaryStorageResource;
import com.cloud.user.User;
+import com.cloud.utils.DateUtil;
import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
import com.cloud.utils.component.Inject;
+import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.Transaction;
@@ -53,22 +72,31 @@ import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.Task;
-@Local(value = { AgentManager.class })
-public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener {
+@Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class })
+public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService {
final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class);
+ private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor"));
public final static long STARTUP_DELAY = 5000;
- public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login
- public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
+ public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login
+ public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
public long _loadSize = 100;
+ protected Set _agentToTransferIds = new HashSet();
+ private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list
- @Inject protected ClusterManager _clusterMgr = null;
+ @Inject
+ protected ClusterManager _clusterMgr = null;
protected HashMap _peers;
private final Timer _timer = new Timer("ClusteredAgentManager Timer");
@Inject
protected ManagementServerHostDao _mshostDao;
+ @Inject
+ protected HostTransferMapDao _hostTransferDao;
+
+ @Inject(adapter = AgentLoadBalancerPlanner.class)
+ protected Adapters _lbPlanners;
protected ClusteredAgentManagerImpl() {
super();
@@ -97,6 +125,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return false;
}
_timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, SCAN_INTERVAL);
+
+ // schedule transfer scan executor - if agent LB is enabled
+ if (_clusterMgr.isAgentRebalanceEnabled()) {
+ s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL,
+ TimeUnit.MILLISECONDS);
+ }
+
return true;
}
@@ -121,20 +156,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
// for agents that are self-managed, threshold to be considered as disconnected is 3 ping intervals
- long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingInterval*3);
- List hosts = _hostDao.findDirectAgentToLoad(_clusterMgr.getManagementNodeId(), cutSeconds, _loadSize);
- if ( hosts != null && hosts.size() == _loadSize ) {
- Long clusterId = hosts.get((int)(_loadSize-1)).getClusterId();
- if ( clusterId != null) {
- for ( int i = (int)(_loadSize-1); i > 0; i-- ) {
- if ( hosts.get(i).getClusterId() == clusterId ) {
+ long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingInterval * 3);
+ List hosts = _hostDao.findDirectAgentToLoad(cutSeconds, _loadSize);
+ if (hosts != null && hosts.size() == _loadSize) {
+ Long clusterId = hosts.get((int) (_loadSize - 1)).getClusterId();
+ if (clusterId != null) {
+ for (int i = (int) (_loadSize - 1); i > 0; i--) {
+ if (hosts.get(i).getClusterId() == clusterId) {
hosts.remove(i);
} else {
break;
}
}
}
- }
+ }
if (hosts != null && hosts.size() > 0) {
for (HostVO host : hosts) {
try {
@@ -318,11 +353,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return super.deleteHost(hostId, isForced, caller);
}
-
+
@Override
public boolean updateHostPassword(UpdateHostPasswordCmd upasscmd) {
if (upasscmd.getClusterId() == null) {
- //update agent attache password
+ // update agent attache password
try {
Boolean result = _clusterMgr.propagateAgentEvent(upasscmd.getHostId(), Event.UpdatePassword);
if (result != null) {
@@ -330,8 +365,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
} catch (AgentUnavailableException e) {
}
- }
- else {
+ } else {
// get agents for the cluster
List hosts = _hostDao.listByCluster(upasscmd.getClusterId());
for (HostVO h : hosts) {
@@ -343,7 +377,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
} catch (AgentUnavailableException e) {
}
}
- }
+ }
return super.updateHostPassword(upasscmd);
}
@@ -507,6 +541,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
_timer.cancel();
+ s_transferExecutor.shutdownNow();
return super.stop();
}
@@ -586,6 +621,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage());
}
} else {
+
long mgmtId = Request.getManagementServerId(data);
if (mgmtId != -1 && mgmtId != _nodeId) {
routeToPeer(Long.toString(mgmtId), data);
@@ -624,7 +660,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
public void onManagementNodeJoined(List nodeList, long selfNodeId) {
-
}
@Override
@@ -638,4 +673,264 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
public void onManagementNodeIsolated() {
}
+
+ @Override
+ public void removeAgent(AgentAttache attache, Status nextState) {
+ if (attache == null) {
+ return;
+ }
+
+ super.removeAgent(attache, nextState);
+ }
+
+ @Override
+ public boolean executeRebalanceRequest(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
+ if (event == Event.RequestAgentRebalance) {
+ return setToWaitForRebalance(agentId);
+ } else if (event == Event.StartAgentRebalance) {
+ return rebalanceHost(agentId);
+ }
+
+ return true;
+ }
+
+ @Override
+ public void startRebalanceAgents() {
+ Date cutTime = DateUtil.currentGMTTime();
+ List activeNodes = _mshostDao.getActiveList(new Date(cutTime.getTime() - ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD));
+ List allManagedAgents = _hostDao.listManagedAgents();
+
+ long avLoad = 0L;
+
+ if (!allManagedAgents.isEmpty() && !activeNodes.isEmpty()) {
+ avLoad = allManagedAgents.size() / activeNodes.size();
+ } else {
+ return;
+ }
+
+ for (ManagementServerHostVO node : activeNodes) {
+ if (node.getMsid() != _nodeId) {
+
+ List hostsToRebalance = new ArrayList();
+ for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) {
+ hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad);
+ if (!hostsToRebalance.isEmpty()) {
+ break;
+ }
+ }
+
+ if (!hostsToRebalance.isEmpty()) {
+ //TODO - execute rebalance for every host; right now we are doing it for (0) host just for testing
+ for (HostVO host : hostsToRebalance) {
+ long hostId = host.getId();
+ s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId);
+ boolean result = true;
+ HostTransferMapVO transfer = _hostTransferDao.startAgentTransfering(hostId, _nodeId, node.getMsid());
+ try {
+ Answer[] answer = sendRebalanceCommand(hostId, _nodeId, Event.RequestAgentRebalance);
+ if (answer == null) {
+ s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid());
+ result = false;
+ }
+ } catch (Exception ex) {
+ s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex);
+ result = false;
+ } finally {
+ HostTransferMapVO updatedTransfer = _hostTransferDao.findById(transfer.getId());
+ if (!result && updatedTransfer.getState() == HostTransferState.TransferRequested) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode");
+ }
+ //just remove the mapping as nothing was done on the peer management server yet
+ _hostTransferDao.remove(transfer.getId());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private Answer[] sendRebalanceCommand(long agentId, long peer, Event event) {
+ TransferAgentCommand transfer = new TransferAgentCommand(agentId, peer, event);
+ Commands commands = new Commands(OnError.Stop);
+ commands.addCommand(transfer);
+
+ Command[] cmds = commands.toCommands();
+
+ String peerName = Long.toString(peer);
+
+ try {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer);
+ }
+ Answer[] answers = _clusterMgr.execute(peerName, agentId, cmds, true);
+ return answers;
+ } catch (Exception e) {
+ s_logger.warn("Caught exception while talking to " + peer, e);
+ return null;
+ }
+ }
+
+ private Runnable getTransferScanTask() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // TODO - change to trace level later on
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Clustered agent transfer scan check, management server id:" + _nodeId);
+ }
+
+ if (_agentToTransferIds.size() > 0) {
+ s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
+ for (Long hostId : _agentToTransferIds) {
+ AgentAttache attache = findAttache(hostId);
+ if (attache.getQueueSize() == 0 && attache.getListenersSize() == 0) {
+ boolean result = true;
+ _agentToTransferIds.remove(hostId);
+ try {
+ result = rebalanceHost(hostId);
+ } finally {
+ if (result) {
+ finishRebalance(hostId, Event.RebalanceCompleted);
+ } else {
+ finishRebalance(hostId, Event.RebalanceFailed);
+ }
+ }
+ } else {
+ // if we timed out waiting for the host to reconnect, remove host from rebalance list and mark it as failed to rebalance
+ // no need to do anything with the real attache
+ Date cutTime = DateUtil.currentGMTTime();
+ if (!(_hostTransferDao.isActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut)))) {
+ s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, failing rebalance for this host");
+ _agentToTransferIds.remove(hostId);
+ HostTransferMapVO transferMap = _hostTransferDao.findById(hostId);
+ transferMap.setState(HostTransferState.TransferFailed);
+ _hostTransferDao.update(hostId, transferMap);
+ }
+ }
+ }
+ } else {
+ // TODO - change to trace level later on
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Found no agents to be transfered by the management server " + _nodeId);
+ }
+ }
+
+ } catch (Throwable e) {
+ s_logger.error("Problem with the clustered agent transfer scan check!", e);
+ }
+ }
+ };
+ }
+
+
+ private boolean setToWaitForRebalance(final long hostId) {
+ s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer");
+ synchronized (_agentToTransferIds) {
+ return _agentToTransferIds.add(hostId);
+ }
+ }
+
+
+ private boolean rebalanceHost(final long hostId) {
+ HostTransferMapVO map = _hostTransferDao.findById(hostId);
+ HostVO host = _hostDao.findById(hostId);
+
+ boolean result = true;
+ if (map.getInitialOwner() == _nodeId) {
+ ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)findAttache(hostId);
+
+ if (attache != null && !attache.getTransferMode()) {
+ attache.setTransferMode(true);
+ s_logger.debug("Putting agent id=" + hostId + " to transfer mode");
+ _agents.put(hostId, attache);
+ if (host != null && host.getRemoved() == null) {
+ host.setManagementServerId(null);
+ s_logger.debug("Updating host id=" + hostId + " with the status " + Status.Rebalance);
+ _hostDao.updateStatus(host, Event.StartAgentRebalance, _nodeId);
+ }
+
+ try {
+ Answer[] answer = sendRebalanceCommand(hostId, map.getFutureOwner(), Event.StartAgentRebalance);
+ if (answer == null) {
+ s_logger.warn("Host " + hostId + " failed to connect to the management server " + map.getFutureOwner() + " as a part of rebalance process");
+ result = false;
+ }
+ } catch (Exception ex) {
+ s_logger.warn("Host " + hostId + " failed to connect to the management server " + map.getFutureOwner() + " as a part of rebalance process", ex);
+ result = false;
+ }
+ if (result) {
+ s_logger.debug("Got host id=" + hostId + " from management server " + map.getFutureOwner());
+ }
+
+ }
+ } else if (map.getFutureOwner() == _nodeId) {
+ try {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") as a part of rebalance process");
+ }
+ //TODO - 1) no need to do vmfullSync/storageSetup on the agent side 2) Make sure that if connection fails, host goes from Rebalance state to Alert
+ loadDirectlyConnectedHost(host);
+ } catch (Exception ex) {
+ s_logger.warn("Unable to load directly connected host " + host.getId() + " as a part of rebalance due to exception: ", ex);
+ }
+ }
+
+ return result;
+
+ }
+
+ private boolean finishRebalance(final long hostId, Event event) {
+ HostTransferMapVO map = _hostTransferDao.findById(hostId);
+ AgentAttache attache = findAttache(hostId);
+
+ if (attache == null) {
+ s_logger.debug("Unable to find attache for the host id=" + hostId + ", assuming that the agent disconnected already");
+ HostTransferState state = (event == Event.RebalanceCompleted) ? HostTransferState.TransferCompleted : HostTransferState.TransferFailed;
+ map.setState(state);
+ _hostTransferDao.update(hostId, map);
+ return true;
+ }
+
+ if (map.getInitialOwner() != _nodeId) {
+ s_logger.warn("Why finish rebalance called not by initial host owner???");
+ return false;
+ }
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Finishing rebalancing for the host id=" + hostId);
+ }
+
+ if (event == Event.RebalanceFailed) {
+ ((ClusteredDirectAgentAttache) attache).setTransferMode(false);
+ s_logger.debug("Rebalance failed for the host id=" + hostId);
+ map.setState(HostTransferState.TransferFailed);
+ _hostTransferDao.update(hostId, map);
+ } else if (event == Event.RebalanceCompleted) {
+
+ //1) Get all the requests remove transfer attache
+ LinkedList requests = ((ClusteredDirectAgentAttache) attache).getRequests();
+ removeAgent(attache, Status.Rebalance);
+
+ //2) Create forward attache
+ createAttache(hostId);
+
+ //3) forward all the requests to the management server which owns the host now
+ if (!requests.isEmpty()) {
+ for (Request request : requests) {
+ routeToPeer(Long.toString(map.getFutureOwner()), request.getBytes());
+ }
+ }
+
+ map.setState(HostTransferState.TransferCompleted);
+ _hostTransferDao.update(hostId, map);
+
+ return true;
+
+ }
+ return true;
+ }
}
diff --git a/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java b/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
index 6b8c38f0328..58ef0ef73f2 100644
--- a/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
+++ b/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java
@@ -17,7 +17,13 @@
*/
package com.cloud.agent.manager;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+
import com.cloud.agent.AgentManager;
+import com.cloud.agent.Listener;
+import com.cloud.agent.api.Command;
import com.cloud.agent.transport.Request;
import com.cloud.agent.transport.Response;
import com.cloud.exception.AgentUnavailableException;
@@ -26,15 +32,36 @@ import com.cloud.resource.ServerResource;
import com.cloud.utils.exception.CloudRuntimeException;
public class ClusteredDirectAgentAttache extends DirectAgentAttache implements Routable {
+ private final static Logger s_logger = Logger.getLogger(ClusteredDirectAgentAttache.class);
private final ClusteredAgentManagerImpl _mgr;
private final long _nodeId;
-
+ private boolean _transferMode = false;
+
public ClusteredDirectAgentAttache(AgentManager agentMgr, long id, long mgmtId, ServerResource resource, boolean maintenance, ClusteredAgentManagerImpl mgr) {
super(agentMgr, id, resource, maintenance, mgr);
_mgr = mgr;
_nodeId = mgmtId;
}
-
+
+ public synchronized void setTransferMode(final boolean transfer) {
+ _transferMode = transfer;
+ }
+
+ @Override
+ protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException {
+
+ if (_transferMode) {
+ // need to throw some other exception while agent is in rebalancing mode
+ for (final Command cmd : cmds) {
+ if (!cmd.allowCaching()) {
+ throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString() + " because agent is in Rebalancing mode", _id);
+ }
+ }
+ }
+
+ super.checkAvailability(cmds);
+ }
+
@Override
public void routeToAgent(byte[] data) throws AgentUnavailableException {
Request req;
@@ -45,14 +72,14 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R
} catch (UnsupportedVersionException e) {
throw new CloudRuntimeException("Unable to rout to an agent ", e);
}
-
+
if (req instanceof Response) {
- super.process(((Response)req).getAnswers());
+ super.process(((Response) req).getAnswers());
} else {
super.send(req);
}
}
-
+
@Override
public boolean processAnswers(long seq, Response response) {
long mgmtId = response.getManagementServerId();
@@ -66,4 +93,37 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R
return super.processAnswers(seq, response);
}
}
+
+ @Override
+ public void send(Request req, final Listener listener) throws AgentUnavailableException {
+ checkAvailability(req.getCommands());
+
+ if (_transferMode) {
+ long seq = req.getSequence();
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug(log(seq, "Holding request as the corresponding agent is in transfer mode: "));
+ }
+
+ synchronized (this) {
+ addRequest(req);
+ return;
+ }
+ } else {
+ super.send(req, listener);
+ }
+ }
+
+ public boolean getTransferMode() {
+ return _transferMode;
+ }
+
+ public LinkedList getRequests() {
+ if (_transferMode) {
+ return _requests;
+ } else {
+ return null;
+ }
+ }
+
}
diff --git a/server/src/com/cloud/cluster/ClusterManager.java b/server/src/com/cloud/cluster/ClusterManager.java
index 69c2f19f663..c400558a4c1 100644
--- a/server/src/com/cloud/cluster/ClusterManager.java
+++ b/server/src/com/cloud/cluster/ClusterManager.java
@@ -61,5 +61,9 @@ public interface ClusterManager extends Manager {
* @param agentId agent id this broadcast is regarding
* @param cmds commands to broadcast
*/
- public void broadcast(long agentId, Command[] cmds);
+ public void broadcast(long agentId, Command[] cmds);
+
+ boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException;
+
+ boolean isAgentRebalanceEnabled();
}
diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java
index 1ce22e14125..257fd074288 100644
--- a/server/src/com/cloud/cluster/ClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java
@@ -50,7 +50,12 @@ import com.cloud.agent.api.Answer;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.manager.Commands;
+import com.cloud.cluster.ManagementServerHost.State;
+import com.cloud.cluster.agentlb.HostTransferMapVO;
+import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
+import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
import com.cloud.cluster.dao.ManagementServerHostDao;
+import com.cloud.configuration.Config;
import com.cloud.configuration.dao.ConfigurationDao;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.OperationTimedoutException;
@@ -64,6 +69,7 @@ import com.cloud.utils.Profiler;
import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.component.Adapters;
import com.cloud.utils.component.ComponentLocator;
+import com.cloud.utils.component.Inject;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.Transaction;
@@ -74,11 +80,13 @@ import com.cloud.utils.mgmt.JmxUtil;
import com.cloud.utils.net.NetUtils;
import com.google.gson.Gson;
-@Local(value={ClusterManager.class})
+@Local(value = { ClusterManager.class })
public class ClusterManagerImpl implements ClusterManager {
private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class);
- private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second
+
+ private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second
+
private final List listeners = new ArrayList();
private final Map activePeers = new HashMap();
@@ -90,11 +98,12 @@ public class ClusterManagerImpl implements ClusterManager {
private final Gson gson;
private AgentManager _agentMgr;
+ @Inject
+ private ClusteredAgentRebalanceService _rebalanceService;
private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
-
private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
- private List _notificationMsgs = new ArrayList();
+ private final List _notificationMsgs = new ArrayList();
private Connection _heartbeatConnection = null;
private final ExecutorService _executor;
@@ -103,6 +112,7 @@ public class ClusterManagerImpl implements ClusterManager {
private ManagementServerHostDao _mshostDao;
private HostDao _hostDao;
+ private HostTransferMapDao _hostTransferDao;
//
// pay attention to _mshostId and _msid
@@ -110,13 +120,16 @@ public class ClusterManagerImpl implements ClusterManager {
// _msid is the unique persistent identifier that peer name is based upon
//
private Long _mshostId = null;
- protected long _msid = ManagementServerNode.getManagementServerId();
+ protected long _msId = ManagementServerNode.getManagementServerId();
protected long _runId = System.currentTimeMillis();
private boolean _peerScanInited = false;
private String _name;
private String _clusterNodeIP = "127.0.0.1";
+ private boolean _agentLBEnabled = false;
+ private State _state = State.Starting;
+ private final Object stateLock = new Object();
public ClusterManagerImpl() {
clusterPeers = new HashMap();
@@ -131,20 +144,18 @@ public class ClusterManagerImpl implements ClusterManager {
}
@Override
- public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError)
- throws AgentUnavailableException, OperationTimedoutException {
+ public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
- for (Command cmd : cmds) {
+ for (Command cmd : cmds) {
commands.addCommand(cmd);
}
return _agentMgr.send(hostId, commands);
}
@Override
- public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener)
- throws AgentUnavailableException {
+ public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
- for (Command cmd : cmds) {
+ for (Command cmd : cmds) {
commands.addCommand(cmd);
}
return _agentMgr.send(hostId, commands, listener);
@@ -163,7 +174,7 @@ public class ClusterManagerImpl implements ClusterManager {
}
if (s_logger.isDebugEnabled()) {
- s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:"+ agentId);
+ s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
}
Command[] cmds = new Command[1];
cmds[0] = new ChangeAgentCommand(agentId, event);
@@ -182,13 +193,14 @@ public class ClusterManagerImpl implements ClusterManager {
/**
* called by DatabaseUpgradeChecker to see if there are other peers running.
- * @param notVersion If version is passed in, the peers CANNOT be running at this
- * version. If version is null, return true if any peer is
- * running regardless of version.
+ *
+ * @param notVersion
+ * If version is passed in, the peers CANNOT be running at this version. If version is null, return true if any
+ * peer is running regardless of version.
* @return true if there are peers running and false if not.
*/
public static final boolean arePeersRunning(String notVersion) {
- return false; //TODO: Leaving this for Kelven to take care of.
+ return false; // TODO: Leaving this for Kelven to take care of.
}
@Override
@@ -199,7 +211,7 @@ public class ClusterManagerImpl implements ClusterManager {
for (ManagementServerHostVO peer : peers) {
String peerName = Long.toString(peer.getMsid());
if (getSelfPeerName().equals(peerName)) {
- continue; // Skip myself.
+ continue; // Skip myself.
}
try {
if (s_logger.isDebugEnabled()) {
@@ -248,11 +260,9 @@ public class ClusterManagerImpl implements ClusterManager {
s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
}
}
-
return null;
} catch (RemoteException e) {
invalidatePeerService(strPeer);
-
if(s_logger.isInfoEnabled()) {
s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: "
+ i + ", exception message :" + e.getMessage());
@@ -266,7 +276,6 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
-
ClusterService peerService = null;
if(s_logger.isDebugEnabled()) {
@@ -280,7 +289,6 @@ public class ClusterManagerImpl implements ClusterManager {
} catch (RemoteException e) {
s_logger.error("Unable to get cluster service on peer : " + strPeer);
}
-
if(peerService != null) {
try {
long seq = 0;
@@ -291,7 +299,6 @@ public class ClusterManagerImpl implements ClusterManager {
long startTick = System.currentTimeMillis();
seq = peerService.executeAsync(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError);
-
if(seq > 0) {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId
@@ -321,7 +328,6 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
-
if(s_logger.isDebugEnabled()) {
s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" +
agentId + "-" + seq + "} answers: " + (answers != null ? gson.toJson(answers, Answer[].class): "null"));
@@ -373,7 +379,6 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
-
if(s_logger.isDebugEnabled()) {
s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq +
"} " + (answers != null? gson.toJson(answers, Answer[].class):""));
@@ -444,7 +449,7 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public String getSelfPeerName() {
- return Long.toString(_msid);
+ return Long.toString(_msId);
}
@Override
@@ -455,15 +460,13 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public void registerListener(ClusterManagerListener listener) {
// Note : we don't check duplicates
- synchronized(listeners) {
- listeners.add(listener);
+ synchronized (listeners) {
}
}
@Override
public void unregisterListener(ClusterManagerListener listener) {
synchronized(listeners) {
- listeners.remove(listener);
}
}
@@ -571,11 +574,22 @@ public class ClusterManagerImpl implements ClusterManager {
Connection conn = getHeartbeatConnection();
_mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
- if(s_logger.isTraceEnabled()) {
+ // for cluster in Starting state check if there are any agents being transfered
+ if (_state == State.Starting) {
+ synchronized (stateLock) {
+ if (isClusterReadyToStart()) {
+ _mshostDao.update(conn, _mshostId, getCurrentRunId(), State.Up, DateUtil.currentGMTTime());
+ _state = State.Up;
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ if (s_logger.isTraceEnabled()) {
s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
}
- if(!_peerScanInited) {
+ if (!_peerScanInited) {
_peerScanInited = true;
initPeerScan(conn);
}
@@ -604,14 +618,41 @@ public class ClusterManagerImpl implements ClusterManager {
s_logger.error("Problem with the cluster heartbeat!", e);
}
}
- };
+
+ private boolean isClusterReadyToStart() {
+ boolean isReady = false;
+ int transferCount = _hostTransferDao.listHostsJoiningCluster(_msId).size();
+ if (transferCount == 0) {
+ //Check how many servers got transfered successfully
+ List rebalancedHosts = _hostTransferDao.listBy(_msId, HostTransferState.TransferCompleted);
+ s_logger.debug(rebalancedHosts.size() + " hosts joined the cluster " + _msId + " as a result of rebalance process");
+ for (HostTransferMapVO host : rebalancedHosts) {
+ _hostTransferDao.remove(host.getId());
+ }
+
+ //Check how many servers failed to transfer
+ List failedToRebalanceHosts = _hostTransferDao.listBy(_msId, HostTransferState.TransferFailed);
+ s_logger.debug(failedToRebalanceHosts.size() + " hosts failed to join the cluster " + _msId + " as a result of rebalance process");
+ for (HostTransferMapVO host : failedToRebalanceHosts) {
+ _hostTransferDao.remove(host.getId());
+ }
+
+ s_logger.debug("There are no hosts currently joining cluser msid=" + _msId + ", so management server is ready to start");
+ isReady = true;
+ } else if (s_logger.isDebugEnabled()) {
+ //TODO : change to trace mode later
+ s_logger.debug("There are " + transferCount + " agents currently joinging the cluster " + _msId);
+ }
+
+ return isReady;
+ }
+ };
}
private boolean isRootCauseConnectionRelated(Throwable e) {
- while(e != null) {
- if(e instanceof com.mysql.jdbc.CommunicationsException || e instanceof com.mysql.jdbc.exceptions.jdbc4.CommunicationsException) {
+ while (e != null) {
+ if (e instanceof com.mysql.jdbc.CommunicationsException || e instanceof com.mysql.jdbc.exceptions.jdbc4.CommunicationsException)
return true;
- }
e = e.getCause();
}
@@ -857,7 +898,7 @@ public class ClusterManagerImpl implements ClusterManager {
@Override @DB
public boolean start() {
if(s_logger.isInfoEnabled()) {
- s_logger.info("Starting cluster manager, msid : " + _msid);
+ s_logger.info("Starting cluster manager, msid : " + _msId);
}
Transaction txn = Transaction.currentTxn();
@@ -867,10 +908,10 @@ public class ClusterManagerImpl implements ClusterManager {
final Class> c = this.getClass();
String version = c.getPackage().getImplementationVersion();
- ManagementServerHostVO mshost = _mshostDao.findByMsid(_msid);
- if(mshost == null) {
+ ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
+ if (mshost == null) {
mshost = new ManagementServerHostVO();
- mshost.setMsid(_msid);
+ mshost.setMsid(_msId);
mshost.setRunid(this.getCurrentRunId());
mshost.setName(NetUtils.getHostName());
mshost.setVersion(version);
@@ -879,32 +920,49 @@ public class ClusterManagerImpl implements ClusterManager {
mshost.setLastUpdateTime(DateUtil.currentGMTTime());
mshost.setRemoved(null);
mshost.setAlertCount(0);
- mshost.setState(ManagementServerNode.State.Up);
+ mshost.setState(ManagementServerHost.State.Starting);
_mshostDao.persist(mshost);
- if(s_logger.isInfoEnabled()) {
- s_logger.info("New instance of management server msid " + _msid + " is being started");
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("New instance of management server msid " + _msId + " is being started");
}
} else {
- if(s_logger.isInfoEnabled()) {
- s_logger.info("Management server " + _msid + " is being started");
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("Management server " + _msId + " is being started");
}
- _mshostDao.update(mshost.getId(), getCurrentRunId(), NetUtils.getHostName(), version,
- _clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime());
+ _mshostDao.update(mshost.getId(), getCurrentRunId(), NetUtils.getHostName(), version, _clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime());
}
txn.commit();
_mshostId = mshost.getId();
- if(s_logger.isInfoEnabled()) {
- s_logger.info("Management server (host id : " + _mshostId + ") is available at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort());
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort());
+ }
+
+ // use seperate thread for heartbeat updates
+ _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
+ _notificationExecutor.submit(getNotificationTask());
+
+ // Do agent rebalancing
+ if (_agentLBEnabled) {
+ s_logger.debug("Management server " + _msId + " is asking other peers to rebalance their agents");
+ _rebalanceService.startRebalanceAgents();
}
- // use seperated thread for heartbeat updates
- _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval,
- heartbeatInterval, TimeUnit.MILLISECONDS);
- _notificationExecutor.submit(getNotificationTask());
+ //wait here for heartbeat task to update the host state
+ try {
+ synchronized (stateLock) {
+ while (_state != State.Up) {
+ stateLock.wait();
+ }
+ }
+ } catch (final InterruptedException e) {
+ } finally {
+ s_logger.debug("Agent rebalancing is completed, management server " + _mshostId + " is ready");
+ }
+
} catch (Throwable e) {
s_logger.error("Unexpected exception : ", e);
txn.rollback();
@@ -912,8 +970,8 @@ public class ClusterManagerImpl implements ClusterManager {
throw new CloudRuntimeException("Unable to initialize cluster info into database");
}
- if(s_logger.isInfoEnabled()) {
- s_logger.info("Cluster manager is started");
+ if (s_logger.isInfoEnabled()) {
+ s_logger.info("Cluster manager was started successfully");
}
return true;
@@ -955,15 +1013,20 @@ public class ClusterManagerImpl implements ClusterManager {
}
_mshostDao = locator.getDao(ManagementServerHostDao.class);
- if(_mshostDao == null) {
+ if (_mshostDao == null) {
throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName());
}
_hostDao = locator.getDao(HostDao.class);
- if(_hostDao == null) {
+ if (_hostDao == null) {
throw new ConfigurationException("Unable to get " + HostDao.class.getName());
}
+ _hostTransferDao = locator.getDao(HostTransferMapDao.class);
+ if (_hostTransferDao == null) {
+ throw new ConfigurationException("Unable to get agent transfer map dao");
+ }
+
ConfigurationDao configDao = locator.getDao(ConfigurationDao.class);
if (configDao == null) {
throw new ConfigurationException("Unable to get the configuration dao.");
@@ -972,12 +1035,12 @@ public class ClusterManagerImpl implements ClusterManager {
Map configs = configDao.getConfiguration("management-server", params);
String value = configs.get("cluster.heartbeat.interval");
- if(value != null) {
+ if (value != null) {
heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL);
}
value = configs.get("cluster.heartbeat.threshold");
- if(value != null) {
+ if (value != null) {
heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD);
}
@@ -991,7 +1054,7 @@ public class ClusterManagerImpl implements ClusterManager {
throw new ConfigurationException("Unable to load db.properties content");
}
_clusterNodeIP = dbProps.getProperty("cluster.node.IP");
- if(_clusterNodeIP == null) {
+ if (_clusterNodeIP == null) {
_clusterNodeIP = "127.0.0.1";
}
_clusterNodeIP = _clusterNodeIP.trim();
@@ -1016,6 +1079,9 @@ public class ClusterManagerImpl implements ClusterManager {
if(_currentServiceAdapter == null) {
throw new ConfigurationException("Unable to set current cluster service adapter");
}
+
+
+ _agentLBEnabled = Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key()));
checkConflicts();
@@ -1027,7 +1093,7 @@ public class ClusterManagerImpl implements ClusterManager {
@Override
public long getManagementNodeId() {
- return _msid;
+ return _msId;
}
@Override
@@ -1124,11 +1190,23 @@ public class ClusterManagerImpl implements ClusterManager {
s_logger.error(msg);
throw new ConfigurationException(msg);
} else {
- String msg = "Detected that another management node with the same IP " + peer.getServiceIP() + " is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
+ String msg = "Detected that another management node with the same IP " + peer.getServiceIP()
+ + " is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
s_logger.info(msg);
}
}
}
}
}
+
+ @Override
+ public boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
+ return _rebalanceService.executeRebalanceRequest(agentId, event);
+ }
+
+ @Override
+ public boolean isAgentRebalanceEnabled() {
+ return _agentLBEnabled;
+ }
+
}
diff --git a/server/src/com/cloud/cluster/ClusterManagerMessage.java b/server/src/com/cloud/cluster/ClusterManagerMessage.java
index a1b2d0ec4e0..a2c599be8be 100644
--- a/server/src/com/cloud/cluster/ClusterManagerMessage.java
+++ b/server/src/com/cloud/cluster/ClusterManagerMessage.java
@@ -42,4 +42,5 @@ public class ClusterManagerMessage {
public List getNodes() {
return _nodes;
}
+
}
diff --git a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java
index 68737cd48fc..1347825c1f8 100644
--- a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java
+++ b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java
@@ -17,7 +17,7 @@
*/
package com.cloud.cluster;
-
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URLDecoder;
@@ -38,6 +38,7 @@ import com.cloud.agent.api.Answer;
import com.cloud.agent.api.ChangeAgentAnswer;
import com.cloud.agent.api.ChangeAgentCommand;
import com.cloud.agent.api.Command;
+import com.cloud.agent.api.TransferAgentCommand;
import com.cloud.exception.AgentUnavailableException;
import com.cloud.exception.OperationTimedoutException;
import com.cloud.serializer.GsonHelper;
@@ -204,6 +205,29 @@ public class ClusterServiceServletHttpHandler implements HttpRequestHandler {
Answer[] answers = new Answer[1];
answers[0] = new ChangeAgentAnswer(cmd, result);
return gson.toJson(answers);
+ } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
+ TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
+ }
+ boolean result = false;
+ try {
+ result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent());
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Result is " + result);
+ }
+
+ } catch (AgentUnavailableException e) {
+ s_logger.warn("Agent is unavailable", e);
+ return null;
+ } catch (OperationTimedoutException e) {
+ s_logger.warn("Operation timed out", e);
+ return null;
+ }
+ Answer[] answers = new Answer[1];
+ answers[0] = new Answer(cmd, result, null);
+ return gson.toJson(answers);
}
try {
diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java
index 9a4bbaed71a..af86457b9dc 100644
--- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java
+++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java
@@ -35,10 +35,10 @@ public class ClusterServiceServletImpl implements ClusterService {
private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class);
- private String serviceUrl;
-
- private Gson gson;
+ private String serviceUrl;
+ private final Gson gson;
+
public ClusterServiceServletImpl() {
gson = GsonHelper.getGson();
}
@@ -94,7 +94,7 @@ public class ClusterServiceServletImpl implements ClusterService {
s_logger.error("Unable to parse executeAsync return : " + result);
throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl);
}
- }
+ }
@Override
public boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException {
@@ -102,8 +102,7 @@ public class ClusterServiceServletImpl implements ClusterService {
s_logger.debug("Forward Async-call answer to remote listener, agent: " + agentId
+ ", excutingPeer: " + executingPeer
+ ", seq: " + seq + ", gsonPackage: " + gsonPackage);
- }
-
+ }
HttpClient client = new HttpClient();
PostMethod method = new PostMethod(serviceUrl);
@@ -111,7 +110,7 @@ public class ClusterServiceServletImpl implements ClusterService {
method.addParameter("agentId", Long.toString(agentId));
method.addParameter("gsonPackage", gsonPackage);
method.addParameter("seq", Long.toString(seq));
- method.addParameter("executingPeer", executingPeer);
+ method.addParameter("executingPeer", executingPeer);
String result = executePostMethod(client, method);
if(result.contains("recurring=true")) {
@@ -132,7 +131,7 @@ public class ClusterServiceServletImpl implements ClusterService {
if(s_logger.isDebugEnabled()) {
s_logger.debug("Ping at " + serviceUrl);
}
-
+
HttpClient client = new HttpClient();
PostMethod method = new PostMethod(serviceUrl);
@@ -169,6 +168,7 @@ public class ClusterServiceServletImpl implements ClusterService {
} catch(Throwable e) {
s_logger.error("Exception from : " + serviceUrl + ", method : " + method.getParameter("method") + ", exception :", e);
}
+
return result;
}
@@ -179,6 +179,6 @@ public class ClusterServiceServletImpl implements ClusterService {
String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true);
System.out.println(result);
} catch (RemoteException e) {
- }
+ }
}
}
diff --git a/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java b/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java
new file mode 100644
index 00000000000..a655101f9ae
--- /dev/null
+++ b/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java
@@ -0,0 +1,14 @@
+package com.cloud.cluster;
+
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.Status.Event;
+
+public interface ClusteredAgentRebalanceService {
+ public static final int DEFAULT_TRANSFER_CHECK_INTERVAL = 10000;
+
+ void startRebalanceAgents();
+
+ boolean executeRebalanceRequest(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException;
+
+}
diff --git a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java
index 8d19dd0d56e..0844757e69d 100644
--- a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java
@@ -44,47 +44,58 @@ public class DummyClusterManagerImpl implements ClusterManager {
private String _name;
private final String _clusterNodeIP = "127.0.0.1";
+ @Override
public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
throw new CloudRuntimeException("Unsupported feature");
}
+ @Override
public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) {
throw new CloudRuntimeException("Unsupported feature");
}
+ @Override
public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) {
throw new CloudRuntimeException("Unsupported feature");
}
+ @Override
public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) {
throw new CloudRuntimeException("Unsupported feature");
}
+ @Override
public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError)
throws AgentUnavailableException, OperationTimedoutException {
throw new CloudRuntimeException("Unsupported feature");
}
+ @Override
public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
}
+ @Override
public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
}
+ @Override
public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
throw new CloudRuntimeException("Unsupported feature");
}
- public int getHeartbeatThreshold() {
+ @Override
+ public int getHeartbeatThreshold() {
return ClusterManager.DEFAULT_HEARTBEAT_INTERVAL;
}
- public long getManagementNodeId() {
+ @Override
+ public long getManagementNodeId() {
return _id;
}
+ @Override
public long getCurrentRunId() {
return _runId;
}
@@ -94,30 +105,37 @@ public class DummyClusterManagerImpl implements ClusterManager {
return null;
}
- public String getSelfPeerName() {
+ @Override
+ public String getSelfPeerName() {
return Long.toString(_id);
}
- public String getSelfNodeIP() {
+ @Override
+ public String getSelfNodeIP() {
return _clusterNodeIP;
}
+ @Override
public boolean isManagementNodeAlive(long msid) {
return true;
}
+ @Override
public boolean pingManagementNode(long msid) {
return false;
}
+ @Override
public String getPeerName(long agentHostId) {
throw new CloudRuntimeException("Unsupported feature");
}
- public void registerListener(ClusterManagerListener listener) {
+ @Override
+ public void registerListener(ClusterManagerListener listener) {
}
- public void unregisterListener(ClusterManagerListener listener) {
+ @Override
+ public void unregisterListener(ClusterManagerListener listener) {
}
@Override
@@ -146,5 +164,15 @@ public class DummyClusterManagerImpl implements ClusterManager {
@Override
public boolean stop() {
return true;
- }
+ }
+
+ @Override
+ public boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException {
+ return false;
+ }
+
+ @Override
+ public boolean isAgentRebalanceEnabled() {
+ return false;
+ }
}
diff --git a/server/src/com/cloud/cluster/ManagementServerHostVO.java b/server/src/com/cloud/cluster/ManagementServerHostVO.java
index 82be23a3434..42e3296ed59 100644
--- a/server/src/com/cloud/cluster/ManagementServerHostVO.java
+++ b/server/src/com/cloud/cluster/ManagementServerHostVO.java
@@ -35,7 +35,7 @@ import com.cloud.utils.db.GenericDao;
@Entity
@Table(name="mshost")
-public class ManagementServerHostVO {
+public class ManagementServerHostVO implements ManagementServerHost{
@Id
@GeneratedValue(strategy=GenerationType.IDENTITY)
@@ -53,7 +53,7 @@ public class ManagementServerHostVO {
@Column(name="state", updatable = true, nullable=false)
@Enumerated(value=EnumType.STRING)
- private ManagementServerNode.State state;
+ private ManagementServerHost.State state;
@Column(name="version", updatable=true, nullable=true)
private String version;
@@ -100,7 +100,8 @@ public class ManagementServerHostVO {
public void setRunid(long runid) {
this.runid = runid;
}
-
+
+ @Override
public long getMsid() {
return msid;
}
@@ -117,14 +118,16 @@ public class ManagementServerHostVO {
this.name = name;
}
- public ManagementServerNode.State getState() {
+ @Override
+ public ManagementServerHost.State getState() {
return this.state;
}
- public void setState(ManagementServerNode.State state) {
+ public void setState(ManagementServerHost.State state) {
this.state = state;
}
-
+
+ @Override
public String getVersion() {
return version;
}
diff --git a/server/src/com/cloud/cluster/agentlb/AgentLoadBalancerPlanner.java b/server/src/com/cloud/cluster/agentlb/AgentLoadBalancerPlanner.java
new file mode 100644
index 00000000000..2a87724d253
--- /dev/null
+++ b/server/src/com/cloud/cluster/agentlb/AgentLoadBalancerPlanner.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package com.cloud.cluster.agentlb;
+
+import java.util.List;
+
+import com.cloud.host.HostVO;
+import com.cloud.utils.component.Adapter;
+
+
+public interface AgentLoadBalancerPlanner extends Adapter{
+
+ List getHostsToRebalance(long msId, long avLoad);
+
+}
diff --git a/server/src/com/cloud/cluster/agentlb/ClusterBasedAgentLoadBalancerPlanner.java b/server/src/com/cloud/cluster/agentlb/ClusterBasedAgentLoadBalancerPlanner.java
new file mode 100644
index 00000000000..0f896d7f43c
--- /dev/null
+++ b/server/src/com/cloud/cluster/agentlb/ClusterBasedAgentLoadBalancerPlanner.java
@@ -0,0 +1,167 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+package com.cloud.cluster.agentlb;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.utils.component.Inject;
+
+
+@Local(value=AgentLoadBalancerPlanner.class)
+public class ClusterBasedAgentLoadBalancerPlanner implements AgentLoadBalancerPlanner{
+ private static final Logger s_logger = Logger.getLogger(AgentLoadBalancerPlanner.class);
+ private String _name;
+
+ @Inject HostDao _hostDao = null;
+
+ @Override
+ public boolean configure(String name, Map params) throws ConfigurationException {
+ _name = name;
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return _name;
+ }
+
+ @Override
+ public boolean start() {
+ return true;
+ }
+
+ @Override
+ public boolean stop() {
+ return true;
+ }
+
+ @Override
+ public List getHostsToRebalance(long msId, long avLoad) {
+ List allHosts = _hostDao.listByManagementServer(msId);
+
+ if (allHosts.size() <= avLoad) {
+ s_logger.debug("Agent load for management server " + msId + " doesn't exceed av load " + avLoad + "; so it doesn't participate in agent rebalancing process");
+ return null;
+ }
+
+ List directHosts = _hostDao.listDirectHostsBy(msId, Status.Up);
+ if (directHosts.isEmpty()) {
+ s_logger.debug("No direct agents in status " + Status.Up + " exist for the management server " + msId + "; so it doesn't participate in agent rebalancing process");
+ return null;
+ }
+
+
+ Map> hostToClusterMap = new HashMap>();
+
+ for (HostVO directHost : directHosts) {
+ Long clusterId = directHost.getClusterId();
+ List directHostsPerCluster = null;
+ if (!hostToClusterMap.containsKey(clusterId)) {
+ directHostsPerCluster = new ArrayList();
+ } else {
+ directHostsPerCluster = hostToClusterMap.get(clusterId);
+ }
+ directHostsPerCluster.add(directHost);
+ hostToClusterMap.put(clusterId, directHostsPerCluster);
+ }
+
+ hostToClusterMap = sortByClusterSize(hostToClusterMap);
+
+ long hostsToGive = allHosts.size() - avLoad;
+ long hostsLeftToGive = hostsToGive;
+ long hostsLeft = directHosts.size();
+ List hostsToReturn = new ArrayList();
+ int count = 0;
+
+ for (Long cluster : hostToClusterMap.keySet()) {
+ List hostsInCluster = hostToClusterMap.get(cluster);
+ hostsLeft = hostsLeft - hostsInCluster.size();
+ count++;
+ if (hostsToReturn.size() < hostsToGive) {
+ s_logger.debug("Trying cluster id=" + cluster);
+
+ if (hostsInCluster.size() > hostsLeftToGive) {
+ if (hostsLeft >= hostsLeftToGive) {
+ s_logger.debug("Skipping cluster id=" + cluster + " as it has more hosts that we need: " + hostsInCluster.size() + " vs " + hostsLeftToGive);
+ continue;
+ } else {
+ if (count == hostToClusterMap.size()) {
+ //get all hosts that are needed from the cluster
+ for (int i=0; i <= hostsLeftToGive; i++) {
+ hostsToReturn.add(hostsInCluster.get(i));
+ hostsLeftToGive = hostsLeftToGive - 1;
+ s_logger.debug("Taking host " + hostsInCluster.get(i) + " from cluster " + cluster);
+ }
+ }
+ break;
+ }
+ } else {
+ s_logger.debug("Taking all " + hostsInCluster.size() + " hosts: " + hostsInCluster + " from cluster id=" + cluster);
+ hostsToReturn.addAll(hostsInCluster);
+ hostsLeftToGive = hostsLeftToGive - hostsInCluster.size();
+ }
+ } else {
+ break;
+ }
+ }
+
+ return hostsToReturn;
+ }
+
+ public static LinkedHashMap> sortByClusterSize(final Map> hostToClusterMap) {
+ List keys = new ArrayList();
+ keys.addAll(hostToClusterMap.keySet());
+ Collections.sort(keys, new Comparator() {
+ @Override
+ public int compare(Long o1, Long o2) {
+ List v1 = hostToClusterMap.get(o1);
+ List v2 = hostToClusterMap.get(o2);
+ if (v1 == null) {
+ return (v2 == null) ? 0 : 1;
+ }
+
+ if (v1.size() < v2.size()) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+ });
+
+ LinkedHashMap> sortedMap = new LinkedHashMap>();
+ for (Long key : keys) {
+ sortedMap.put(key, hostToClusterMap.get(key));
+ }
+ return sortedMap;
+ }
+
+}
diff --git a/server/src/com/cloud/cluster/agentlb/HostTransferMapVO.java b/server/src/com/cloud/cluster/agentlb/HostTransferMapVO.java
new file mode 100644
index 00000000000..a2bb428b084
--- /dev/null
+++ b/server/src/com/cloud/cluster/agentlb/HostTransferMapVO.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package com.cloud.cluster.agentlb;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
+import javax.persistence.Table;
+
+import com.cloud.utils.db.GenericDao;
+
+@Entity
+@Table(name = "op_host_transfer")
+@Inheritance(strategy = InheritanceType.TABLE_PER_CLASS)
+public class HostTransferMapVO {
+
+ public enum HostTransferState {
+ TransferRequested, TransferStarted, TransferCompleted, TransferFailed;
+ }
+
+ @Id
+ @Column(name = "id")
+ private long id;
+
+ @Column(name = "initial_mgmt_server_id")
+ private long initialOwner;
+
+ @Column(name = "future_mgmt_server_id")
+ private long futureOwner;
+
+ @Column(name = "state")
+ private HostTransferState state;
+
+ @Column(name=GenericDao.CREATED_COLUMN)
+ private Date created;
+
+ public HostTransferMapVO(long hostId, long initialOwner, long futureOwner) {
+ this.id = hostId;
+ this.initialOwner = initialOwner;
+ this.futureOwner = futureOwner;
+ this.state = HostTransferState.TransferRequested;
+ }
+
+ protected HostTransferMapVO() {
+ }
+
+ public long getInitialOwner() {
+ return initialOwner;
+ }
+
+ public long getFutureOwner() {
+ return futureOwner;
+ }
+
+ public HostTransferState getState() {
+ return state;
+ }
+
+ public void setInitialOwner(long initialOwner) {
+ this.initialOwner = initialOwner;
+ }
+
+ public void setFutureOwner(long futureOwner) {
+ this.futureOwner = futureOwner;
+ }
+
+ public void setState(HostTransferState state) {
+ this.state = state;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public Date getCreated() {
+ return created;
+ }
+
+}
diff --git a/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDao.java b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDao.java
new file mode 100644
index 00000000000..4bb0b172250
--- /dev/null
+++ b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDao.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package com.cloud.cluster.agentlb.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import com.cloud.cluster.agentlb.HostTransferMapVO;
+import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
+import com.cloud.utils.db.GenericDao;
+
+public interface HostTransferMapDao extends GenericDao {
+
+ List listHostsLeavingCluster(long clusterId);
+
+ List listHostsJoiningCluster(long clusterId);
+
+ HostTransferMapVO startAgentTransfering(long hostId, long currentOwner, long futureOwner);
+
+ boolean completeAgentTransfering(long hostId, boolean success);
+
+ List listBy(long futureOwnerId, HostTransferState state);
+
+ boolean isActive(long hostId, Date cutTime);
+}
diff --git a/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java
new file mode 100644
index 00000000000..8491215e8cc
--- /dev/null
+++ b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2010 Cloud.com, Inc. All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package com.cloud.cluster.agentlb.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import javax.ejb.Local;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.cluster.agentlb.HostTransferMapVO;
+import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+
+@Local(value = { HostTransferMapDao.class })
+@DB(txn = false)
+public class HostTransferMapDaoImpl extends GenericDaoBase implements HostTransferMapDao {
+ private static final Logger s_logger = Logger.getLogger(HostTransferMapDaoImpl.class);
+
+ protected final SearchBuilder AllFieldsSearch;
+ protected final SearchBuilder IntermediateStateSearch;
+ protected final SearchBuilder InactiveSearch;
+
+ public HostTransferMapDaoImpl() {
+ AllFieldsSearch = createSearchBuilder();
+ AllFieldsSearch.and("id", AllFieldsSearch.entity().getId(), SearchCriteria.Op.EQ);
+ AllFieldsSearch.and("initialOwner", AllFieldsSearch.entity().getInitialOwner(), SearchCriteria.Op.EQ);
+ AllFieldsSearch.and("futureOwner", AllFieldsSearch.entity().getFutureOwner(), SearchCriteria.Op.EQ);
+ AllFieldsSearch.and("state", AllFieldsSearch.entity().getState(), SearchCriteria.Op.EQ);
+ AllFieldsSearch.done();
+
+ IntermediateStateSearch = createSearchBuilder();
+ IntermediateStateSearch.and("futureOwner", IntermediateStateSearch.entity().getFutureOwner(), SearchCriteria.Op.EQ);
+ IntermediateStateSearch.and("state", IntermediateStateSearch.entity().getState(), SearchCriteria.Op.NOTIN);
+ IntermediateStateSearch.done();
+
+ InactiveSearch = createSearchBuilder();
+ InactiveSearch.and("created", InactiveSearch.entity().getCreated(), SearchCriteria.Op.LTEQ);
+ InactiveSearch.and("id", InactiveSearch.entity().getId(), SearchCriteria.Op.EQ);
+ InactiveSearch.and("state", InactiveSearch.entity().getState(), SearchCriteria.Op.EQ);
+ InactiveSearch.done();
+
+ }
+
+ @Override
+ public List listHostsLeavingCluster(long clusterId) {
+ SearchCriteria sc = IntermediateStateSearch.create();
+ sc.setParameters("initialOwner", clusterId);
+ sc.setParameters("state", HostTransferState.TransferRequested, HostTransferState.TransferStarted);
+
+ return listBy(sc);
+ }
+
+ @Override
+ public List listHostsJoiningCluster(long clusterId) {
+ SearchCriteria sc = IntermediateStateSearch.create();
+ sc.setParameters("futureOwner", clusterId);
+ sc.setParameters("state", HostTransferState.TransferRequested, HostTransferState.TransferStarted);
+ return listBy(sc);
+ }
+
+
+
+ @Override
+ public HostTransferMapVO startAgentTransfering(long hostId, long initialOwner, long futureOwner) {
+ HostTransferMapVO transfer = new HostTransferMapVO(hostId, initialOwner, futureOwner);
+ return persist(transfer);
+ }
+
+ @Override
+ public boolean completeAgentTransfering(long hostId, boolean success) {
+ HostTransferMapVO transfer = findById(hostId);
+ if (success) {
+ transfer.setState(HostTransferState.TransferCompleted);
+ } else {
+ transfer.setState(HostTransferState.TransferFailed);
+ }
+ return update(hostId, transfer);
+ }
+
+ @Override
+ public List listBy(long futureOwnerId, HostTransferState state) {
+ SearchCriteria sc = AllFieldsSearch.create();
+ sc.setParameters("futureOwner", futureOwnerId);
+ sc.setParameters("state", state);
+
+ return listBy(sc);
+ }
+
+ @Override
+ public boolean isActive(long hostId, Date cutTime) {
+ SearchCriteria sc = InactiveSearch.create();
+ sc.setParameters("id", hostId);
+ sc.setParameters("state", HostTransferState.TransferRequested);
+ sc.setParameters("created", cutTime);
+
+
+ if (listBy(sc).isEmpty()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+}
diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java
index 96458594b66..8a92844e90a 100644
--- a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java
+++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java
@@ -22,10 +22,12 @@ import java.sql.Connection;
import java.util.Date;
import java.util.List;
+import com.cloud.cluster.ManagementServerHost.State;
import com.cloud.cluster.ManagementServerHostVO;
import com.cloud.utils.db.GenericDao;
public interface ManagementServerHostDao extends GenericDao {
+ @Override
boolean remove(Long id);
ManagementServerHostVO findByMsid(long msid);
@@ -41,4 +43,6 @@ public interface ManagementServerHostDao extends GenericDao getActiveList(Connection conn, Date cutTime);
List getInactiveList(Connection conn, Date cutTime);
+
+ void update(Connection conn, long id, long runId, State state, Date lastUpdate);
}
diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java
index 52523d9b2a3..89494bcbbab 100644
--- a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java
+++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java
@@ -30,8 +30,9 @@ import javax.ejb.Local;
import org.apache.log4j.Logger;
import com.cloud.cluster.ClusterInvalidSessionException;
+import com.cloud.cluster.ManagementServerHost;
+import com.cloud.cluster.ManagementServerHost.State;
import com.cloud.cluster.ManagementServerHostVO;
-import com.cloud.cluster.ManagementServerNode;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GenericDaoBase;
@@ -48,7 +49,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase ActiveSearch;
private final SearchBuilder InactiveSearch;
- public void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
+ @Override
+ public void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) {
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0, runid=? where id=?");
@@ -75,7 +77,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase getActiveList(Connection conn, Date cutTime) {
+ @Override
+ public List getActiveList(Connection conn, Date cutTime) {
Transaction txn = Transaction.openNew("getActiveList", conn);
try {
SearchCriteria sc = ActiveSearch.create();
@@ -135,7 +140,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase getInactiveList(Connection conn, Date cutTime) {
+ @Override
+ public List getInactiveList(Connection conn, Date cutTime) {
Transaction txn = Transaction.openNew("getInactiveList", conn);
try {
SearchCriteria sc = InactiveSearch.create();
@@ -147,7 +153,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase sc = MsIdSearch.create();
sc.setParameters("msid", msid);
@@ -158,21 +165,23 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase getActiveList(Date cutTime) {
+ @Override
+ public List getActiveList(Date cutTime) {
SearchCriteria sc = ActiveSearch.create();
sc.setParameters("lastUpdateTime", cutTime);
return listIncludingRemovedBy(sc);
}
- public List getInactiveList(Date cutTime) {
+ @Override
+ public List getInactiveList(Date cutTime) {
SearchCriteria sc = InactiveSearch.create();
sc.setParameters("lastUpdateTime", cutTime);
return listIncludingRemovedBy(sc);
}
- @DB
+ @Override
+ @DB
public void increaseAlertCount(long id) {
Transaction txn = Transaction.currentTxn();
PreparedStatement pstmt = null;
@@ -272,5 +286,34 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase _componentClass;
diff --git a/server/src/com/cloud/configuration/DefaultComponentLibrary.java b/server/src/com/cloud/configuration/DefaultComponentLibrary.java
index 63146860079..e85275e5ca3 100644
--- a/server/src/com/cloud/configuration/DefaultComponentLibrary.java
+++ b/server/src/com/cloud/configuration/DefaultComponentLibrary.java
@@ -41,6 +41,7 @@ import com.cloud.cluster.ClusterFenceManagerImpl;
import com.cloud.cluster.ClusterManagerImpl;
import com.cloud.cluster.DummyClusterManagerImpl;
import com.cloud.cluster.ManagementServerNode;
+import com.cloud.cluster.agentlb.dao.HostTransferMapDaoImpl;
import com.cloud.cluster.dao.ManagementServerHostDaoImpl;
import com.cloud.cluster.dao.StackMaidDaoImpl;
import com.cloud.configuration.dao.ConfigurationDaoImpl;
@@ -276,6 +277,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com
addDao("KeystoreDao", KeystoreDaoImpl.class);
addDao("DcDetailsDao", DcDetailsDaoImpl.class);
addDao("SwiftDao", SwiftDaoImpl.class);
+ addDao("AgentTransferMapDao", HostTransferMapDaoImpl.class);
}
@Override
diff --git a/server/src/com/cloud/host/dao/HostDao.java b/server/src/com/cloud/host/dao/HostDao.java
index e91b2326224..de035f20743 100644
--- a/server/src/com/cloud/host/dao/HostDao.java
+++ b/server/src/com/cloud/host/dao/HostDao.java
@@ -68,8 +68,7 @@ public interface HostDao extends GenericDao {
*/
List findDirectlyConnectedHosts();
- List findDirectAgentToLoad(long msid, long lastPingSecondsAfter, Long limit);
-
+ List findDirectAgentToLoad(long lastPingSecondsAfter, Long limit);
/**
* Mark the host as disconnected if it is in one of these states.
* The management server id is set to null.
@@ -167,6 +166,12 @@ public interface HostDao extends GenericDao {
List listSecondaryStorageHosts(long dataCenterId);
boolean directConnect(HostVO host, long msId, boolean secondConnect);
+
+ List listDirectHostsBy(long msId, Status status);
+
+ List listManagedDirectAgents();
+
+ List listManagedAgents();
HostVO findTrafficMonitorHost();
@@ -175,4 +180,6 @@ public interface HostDao extends GenericDao {
List listLocalSecondaryStorageHosts(long dataCenterId);
List listAllSecondaryStorageHosts(long dataCenterId);
+
+ List listByManagementServer(long msId);
}
diff --git a/server/src/com/cloud/host/dao/HostDaoImpl.java b/server/src/com/cloud/host/dao/HostDaoImpl.java
index ef7e598c70a..ed74c108703 100644
--- a/server/src/com/cloud/host/dao/HostDaoImpl.java
+++ b/server/src/com/cloud/host/dao/HostDaoImpl.java
@@ -14,9 +14,9 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*
- */
-package com.cloud.host.dao;
-
+ */
+package com.cloud.host.dao;
+
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.TimeZone;
import javax.ejb.Local;
@@ -57,163 +56,168 @@ import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.exception.CloudRuntimeException;
-@Local(value = { HostDao.class }) @DB(txn=false)
-@TableGenerator(name="host_req_sq", table="op_host", pkColumnName="id", valueColumnName="sequence", allocationSize=1)
-public class HostDaoImpl extends GenericDaoBase implements HostDao {
+@Local(value = { HostDao.class })
+@DB(txn = false)
+@TableGenerator(name = "host_req_sq", table = "op_host", pkColumnName = "id", valueColumnName = "sequence", allocationSize = 1)
+public class HostDaoImpl extends GenericDaoBase implements HostDao {
private static final Logger s_logger = Logger.getLogger(HostDaoImpl.class);
-
- protected final SearchBuilder TypePodDcStatusSearch;
-
- protected final SearchBuilder IdStatusSearch;
- protected final SearchBuilder TypeDcSearch;
- protected final SearchBuilder TypeDcStatusSearch;
- protected final SearchBuilder LastPingedSearch;
- protected final SearchBuilder LastPingedSearch2;
- protected final SearchBuilder MsStatusSearch;
- protected final SearchBuilder DcPrivateIpAddressSearch;
- protected final SearchBuilder DcStorageIpAddressSearch;
-
- protected final SearchBuilder GuidSearch;
- protected final SearchBuilder DcSearch;
- protected final SearchBuilder PodSearch;
- protected final SearchBuilder TypeSearch;
- protected final SearchBuilder StatusSearch;
+
+ protected final SearchBuilder TypePodDcStatusSearch;
+
+ protected final SearchBuilder IdStatusSearch;
+ protected final SearchBuilder TypeDcSearch;
+ protected final SearchBuilder TypeDcStatusSearch;
+ protected final SearchBuilder LastPingedSearch;
+ protected final SearchBuilder LastPingedSearch2;
+ protected final SearchBuilder MsStatusSearch;
+ protected final SearchBuilder DcPrivateIpAddressSearch;
+ protected final SearchBuilder DcStorageIpAddressSearch;
+
+ protected final SearchBuilder GuidSearch;
+ protected final SearchBuilder DcSearch;
+ protected final SearchBuilder PodSearch;
+ protected final SearchBuilder TypeSearch;
+ protected final SearchBuilder StatusSearch;
protected final SearchBuilder NameLikeSearch;
protected final SearchBuilder NameSearch;
- protected final SearchBuilder SequenceSearch;
+ protected final SearchBuilder SequenceSearch;
protected final SearchBuilder DirectlyConnectedSearch;
protected final SearchBuilder UnmanagedDirectConnectSearch;
protected final SearchBuilder UnmanagedExternalNetworkApplianceSearch;
- protected final SearchBuilder MaintenanceCountSearch;
+ protected final SearchBuilder MaintenanceCountSearch;
protected final SearchBuilder ClusterSearch;
protected final SearchBuilder ConsoleProxyHostSearch;
protected final SearchBuilder AvailHypevisorInZone;
protected final SearchBuilder DirectConnectSearch;
+ protected final SearchBuilder ManagedDirectConnectSearch;
+ protected final SearchBuilder ManagedConnectSearch;
protected final GenericSearchBuilder HostsInStatusSearch;
protected final GenericSearchBuilder CountRoutingByDc;
-
- protected final Attribute _statusAttr;
- protected final Attribute _msIdAttr;
- protected final Attribute _pingTimeAttr;
-
- protected final HostDetailsDaoImpl _detailsDao = ComponentLocator.inject(HostDetailsDaoImpl.class);
- protected final HostTagsDaoImpl _hostTagsDao = ComponentLocator.inject(HostTagsDaoImpl.class);
- public HostDaoImpl() {
+ protected final Attribute _statusAttr;
+ protected final Attribute _msIdAttr;
+ protected final Attribute _pingTimeAttr;
+
+ protected final HostDetailsDaoImpl _detailsDao = ComponentLocator.inject(HostDetailsDaoImpl.class);
+ protected final HostTagsDaoImpl _hostTagsDao = ComponentLocator.inject(HostTagsDaoImpl.class);
+
+ public HostDaoImpl() {
MaintenanceCountSearch = createSearchBuilder();
MaintenanceCountSearch.and("cluster", MaintenanceCountSearch.entity().getClusterId(), SearchCriteria.Op.EQ);
MaintenanceCountSearch.and("status", MaintenanceCountSearch.entity().getStatus(), SearchCriteria.Op.IN);
MaintenanceCountSearch.done();
-
- TypePodDcStatusSearch = createSearchBuilder();
- HostVO entity = TypePodDcStatusSearch.entity();
- TypePodDcStatusSearch.and("type", entity.getType(), SearchCriteria.Op.EQ);
- TypePodDcStatusSearch.and("pod", entity.getPodId(), SearchCriteria.Op.EQ);
+
+ TypePodDcStatusSearch = createSearchBuilder();
+ HostVO entity = TypePodDcStatusSearch.entity();
+ TypePodDcStatusSearch.and("type", entity.getType(), SearchCriteria.Op.EQ);
+ TypePodDcStatusSearch.and("pod", entity.getPodId(), SearchCriteria.Op.EQ);
TypePodDcStatusSearch.and("dc", entity.getDataCenterId(), SearchCriteria.Op.EQ);
- TypePodDcStatusSearch.and("cluster", entity.getClusterId(), SearchCriteria.Op.EQ);
- TypePodDcStatusSearch.and("status", entity.getStatus(), SearchCriteria.Op.EQ);
- TypePodDcStatusSearch.done();
-
- LastPingedSearch = createSearchBuilder();
- LastPingedSearch.and("ping", LastPingedSearch.entity().getLastPinged(), SearchCriteria.Op.LT);
- LastPingedSearch.and("state", LastPingedSearch.entity().getStatus(), SearchCriteria.Op.IN);
- LastPingedSearch.done();
-
- LastPingedSearch2 = createSearchBuilder();
- LastPingedSearch2.and("ping", LastPingedSearch2.entity().getLastPinged(), SearchCriteria.Op.LT);
- LastPingedSearch2.and("type", LastPingedSearch2.entity().getType(), SearchCriteria.Op.EQ);
- LastPingedSearch2.done();
-
- MsStatusSearch = createSearchBuilder();
- MsStatusSearch.and("ms", MsStatusSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
- MsStatusSearch.and("statuses", MsStatusSearch.entity().getStatus(), SearchCriteria.Op.IN);
- MsStatusSearch.done();
-
- TypeDcSearch = createSearchBuilder();
- TypeDcSearch.and("type", TypeDcSearch.entity().getType(), SearchCriteria.Op.EQ);
- TypeDcSearch.and("dc", TypeDcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
- TypeDcSearch.done();
-
- TypeDcStatusSearch = createSearchBuilder();
- TypeDcStatusSearch.and("type", TypeDcStatusSearch.entity().getType(), SearchCriteria.Op.EQ);
- TypeDcStatusSearch.and("dc", TypeDcStatusSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
- TypeDcStatusSearch.and("status", TypeDcStatusSearch.entity().getStatus(), SearchCriteria.Op.EQ);
- TypeDcStatusSearch.done();
-
- IdStatusSearch = createSearchBuilder();
- IdStatusSearch.and("id", IdStatusSearch.entity().getId(), SearchCriteria.Op.EQ);
- IdStatusSearch.and("states", IdStatusSearch.entity().getStatus(), SearchCriteria.Op.IN);
- IdStatusSearch.done();
-
- DcPrivateIpAddressSearch = createSearchBuilder();
- DcPrivateIpAddressSearch.and("privateIpAddress", DcPrivateIpAddressSearch.entity().getPrivateIpAddress(), SearchCriteria.Op.EQ);
- DcPrivateIpAddressSearch.and("dc", DcPrivateIpAddressSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
- DcPrivateIpAddressSearch.done();
-
- DcStorageIpAddressSearch = createSearchBuilder();
- DcStorageIpAddressSearch.and("storageIpAddress", DcStorageIpAddressSearch.entity().getStorageIpAddress(), SearchCriteria.Op.EQ);
- DcStorageIpAddressSearch.and("dc", DcStorageIpAddressSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
- DcStorageIpAddressSearch.done();
-
- GuidSearch = createSearchBuilder();
- GuidSearch.and("guid", GuidSearch.entity().getGuid(), SearchCriteria.Op.EQ);
- GuidSearch.done();
-
- DcSearch = createSearchBuilder();
- DcSearch.and("dc", DcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+ TypePodDcStatusSearch.and("cluster", entity.getClusterId(), SearchCriteria.Op.EQ);
+ TypePodDcStatusSearch.and("status", entity.getStatus(), SearchCriteria.Op.EQ);
+ TypePodDcStatusSearch.done();
+
+ LastPingedSearch = createSearchBuilder();
+ LastPingedSearch.and("ping", LastPingedSearch.entity().getLastPinged(), SearchCriteria.Op.LT);
+ LastPingedSearch.and("state", LastPingedSearch.entity().getStatus(), SearchCriteria.Op.IN);
+ LastPingedSearch.done();
+
+ LastPingedSearch2 = createSearchBuilder();
+ LastPingedSearch2.and("ping", LastPingedSearch2.entity().getLastPinged(), SearchCriteria.Op.LT);
+ LastPingedSearch2.and("type", LastPingedSearch2.entity().getType(), SearchCriteria.Op.EQ);
+ LastPingedSearch2.done();
+
+ MsStatusSearch = createSearchBuilder();
+ MsStatusSearch.and("ms", MsStatusSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
+ MsStatusSearch.and("statuses", MsStatusSearch.entity().getStatus(), SearchCriteria.Op.IN);
+ MsStatusSearch.done();
+
+ TypeDcSearch = createSearchBuilder();
+ TypeDcSearch.and("type", TypeDcSearch.entity().getType(), SearchCriteria.Op.EQ);
+ TypeDcSearch.and("dc", TypeDcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+ TypeDcSearch.done();
+
+ TypeDcStatusSearch = createSearchBuilder();
+ TypeDcStatusSearch.and("type", TypeDcStatusSearch.entity().getType(), SearchCriteria.Op.EQ);
+ TypeDcStatusSearch.and("dc", TypeDcStatusSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+ TypeDcStatusSearch.and("status", TypeDcStatusSearch.entity().getStatus(), SearchCriteria.Op.EQ);
+ TypeDcStatusSearch.done();
+
+ IdStatusSearch = createSearchBuilder();
+ IdStatusSearch.and("id", IdStatusSearch.entity().getId(), SearchCriteria.Op.EQ);
+ IdStatusSearch.and("states", IdStatusSearch.entity().getStatus(), SearchCriteria.Op.IN);
+ IdStatusSearch.done();
+
+ DcPrivateIpAddressSearch = createSearchBuilder();
+ DcPrivateIpAddressSearch.and("privateIpAddress", DcPrivateIpAddressSearch.entity().getPrivateIpAddress(), SearchCriteria.Op.EQ);
+ DcPrivateIpAddressSearch.and("dc", DcPrivateIpAddressSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+ DcPrivateIpAddressSearch.done();
+
+ DcStorageIpAddressSearch = createSearchBuilder();
+ DcStorageIpAddressSearch.and("storageIpAddress", DcStorageIpAddressSearch.entity().getStorageIpAddress(), SearchCriteria.Op.EQ);
+ DcStorageIpAddressSearch.and("dc", DcStorageIpAddressSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
+ DcStorageIpAddressSearch.done();
+
+ GuidSearch = createSearchBuilder();
+ GuidSearch.and("guid", GuidSearch.entity().getGuid(), SearchCriteria.Op.EQ);
+ GuidSearch.done();
+
+ DcSearch = createSearchBuilder();
+ DcSearch.and("dc", DcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ);
DcSearch.done();
ClusterSearch = createSearchBuilder();
ClusterSearch.and("cluster", ClusterSearch.entity().getClusterId(), SearchCriteria.Op.EQ);
- ClusterSearch.done();
+ ClusterSearch.done();
ConsoleProxyHostSearch = createSearchBuilder();
ConsoleProxyHostSearch.and("name", ConsoleProxyHostSearch.entity().getName(), SearchCriteria.Op.EQ);
ConsoleProxyHostSearch.and("type", ConsoleProxyHostSearch.entity().getType(), SearchCriteria.Op.EQ);
ConsoleProxyHostSearch.done();
-
- PodSearch = createSearchBuilder();
- PodSearch.and("pod", PodSearch.entity().getPodId(), SearchCriteria.Op.EQ);
- PodSearch.done();
-
- TypeSearch = createSearchBuilder();
- TypeSearch.and("type", TypeSearch.entity().getType(), SearchCriteria.Op.EQ);
- TypeSearch.done();
-
- StatusSearch =createSearchBuilder();
- StatusSearch.and("status", StatusSearch.entity().getStatus(), SearchCriteria.Op.IN);
- StatusSearch.done();
-
- NameLikeSearch = createSearchBuilder();
- NameLikeSearch.and("name", NameLikeSearch.entity().getName(), SearchCriteria.Op.LIKE);
+
+ PodSearch = createSearchBuilder();
+ PodSearch.and("pod", PodSearch.entity().getPodId(), SearchCriteria.Op.EQ);
+ PodSearch.done();
+
+ TypeSearch = createSearchBuilder();
+ TypeSearch.and("type", TypeSearch.entity().getType(), SearchCriteria.Op.EQ);
+ TypeSearch.done();
+
+ StatusSearch = createSearchBuilder();
+ StatusSearch.and("status", StatusSearch.entity().getStatus(), SearchCriteria.Op.IN);
+ StatusSearch.done();
+
+ NameLikeSearch = createSearchBuilder();
+ NameLikeSearch.and("name", NameLikeSearch.entity().getName(), SearchCriteria.Op.LIKE);
NameLikeSearch.done();
NameSearch = createSearchBuilder();
NameSearch.and("name", NameSearch.entity().getName(), SearchCriteria.Op.EQ);
NameSearch.done();
-
- SequenceSearch = createSearchBuilder();
- SequenceSearch.and("id", SequenceSearch.entity().getId(), SearchCriteria.Op.EQ);
-// SequenceSearch.addRetrieve("sequence", SequenceSearch.entity().getSequence());
- SequenceSearch.done();
-
- DirectlyConnectedSearch = createSearchBuilder();
- DirectlyConnectedSearch.and("resource", DirectlyConnectedSearch.entity().getResource(), SearchCriteria.Op.NNULL);
+
+ SequenceSearch = createSearchBuilder();
+ SequenceSearch.and("id", SequenceSearch.entity().getId(), SearchCriteria.Op.EQ);
+ // SequenceSearch.addRetrieve("sequence", SequenceSearch.entity().getSequence());
+ SequenceSearch.done();
+
+ DirectlyConnectedSearch = createSearchBuilder();
+ DirectlyConnectedSearch.and("resource", DirectlyConnectedSearch.entity().getResource(), SearchCriteria.Op.NNULL);
+ DirectlyConnectedSearch.and("ms", DirectlyConnectedSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
+ DirectlyConnectedSearch.and("statuses", DirectlyConnectedSearch.entity().getStatus(), SearchCriteria.Op.EQ);
DirectlyConnectedSearch.done();
- UnmanagedDirectConnectSearch = createSearchBuilder();
+ UnmanagedDirectConnectSearch = createSearchBuilder();
UnmanagedDirectConnectSearch.and("resource", UnmanagedDirectConnectSearch.entity().getResource(), SearchCriteria.Op.NNULL);
UnmanagedDirectConnectSearch.and("server", UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NULL);
UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(), SearchCriteria.Op.LTEQ);
/*
- UnmanagedDirectConnectSearch.op(SearchCriteria.Op.OR, "managementServerId", UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
- UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(), SearchCriteria.Op.LTEQ);
- UnmanagedDirectConnectSearch.cp();
- UnmanagedDirectConnectSearch.cp();
- */
+ * UnmanagedDirectConnectSearch.op(SearchCriteria.Op.OR, "managementServerId",
+ * UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
+ * UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(),
+ * SearchCriteria.Op.LTEQ); UnmanagedDirectConnectSearch.cp(); UnmanagedDirectConnectSearch.cp();
+ */
UnmanagedDirectConnectSearch.done();
DirectConnectSearch = createSearchBuilder();
@@ -252,27 +256,37 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
CountRoutingByDc.and("dc", CountRoutingByDc.entity().getDataCenterId(), SearchCriteria.Op.EQ);
CountRoutingByDc.and("type", CountRoutingByDc.entity().getType(), SearchCriteria.Op.EQ);
CountRoutingByDc.and("status", CountRoutingByDc.entity().getStatus(), SearchCriteria.Op.EQ);
+
CountRoutingByDc.done();
-
- _statusAttr = _allAttributes.get("status");
- _msIdAttr = _allAttributes.get("managementServerId");
- _pingTimeAttr = _allAttributes.get("lastPinged");
-
- assert (_statusAttr != null && _msIdAttr != null && _pingTimeAttr != null) : "Couldn't find one of these attributes";
+
+ ManagedDirectConnectSearch = createSearchBuilder();
+ ManagedDirectConnectSearch.and("resource", ManagedDirectConnectSearch.entity().getResource(), SearchCriteria.Op.NNULL);
+ ManagedDirectConnectSearch.and("server", ManagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NULL);
+ ManagedDirectConnectSearch.done();
+
+ ManagedConnectSearch = createSearchBuilder();
+ ManagedConnectSearch.and("server", ManagedConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NNULL);
+ ManagedConnectSearch.done();
+
+ _statusAttr = _allAttributes.get("status");
+ _msIdAttr = _allAttributes.get("managementServerId");
+ _pingTimeAttr = _allAttributes.get("lastPinged");
+
+ assert (_statusAttr != null && _msIdAttr != null && _pingTimeAttr != null) : "Couldn't find one of these attributes";
}
@Override
public long countBy(long clusterId, Status... statuses) {
SearchCriteria sc = MaintenanceCountSearch.create();
- sc.setParameters("status", (Object[])statuses);
+ sc.setParameters("status", (Object[]) statuses);
sc.setParameters("cluster", clusterId);
List hosts = listBy(sc);
return hosts.size();
- }
-
- @Override
+ }
+
+ @Override
public HostVO findSecondaryStorageHost(long dcId) {
SearchCriteria sc = TypeDcSearch.create();
sc.setParameters("type", Host.Type.SecondaryStorage);
@@ -284,34 +298,34 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
Collections.shuffle(storageHosts);
return storageHosts.get(0);
}
- }
-
- @Override
- public List listSecondaryStorageHosts() {
- SearchCriteria sc = createSearchCriteria();
- sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.SecondaryStorage);
- return search(sc, null);
}
-
+
+ @Override
+ public List listSecondaryStorageHosts() {
+ SearchCriteria sc = createSearchCriteria();
+ sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.SecondaryStorage);
+ return search(sc, null);
+ }
+
@Override
public List listSecondaryStorageHosts(long dataCenterId) {
- SearchCriteria sc = createSearchCriteria();
+ SearchCriteria sc = createSearchCriteria();
sc.addAnd("dataCenterId", SearchCriteria.Op.EQ, dataCenterId);
sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.SecondaryStorage);
return search(sc, null);
}
-
+
@Override
public List listLocalSecondaryStorageHosts() {
- SearchCriteria sc = createSearchCriteria();
+ SearchCriteria sc = createSearchCriteria();
sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.LocalSecondaryStorage);
return search(sc, null);
}
-
+
@Override
public List listLocalSecondaryStorageHosts(long dataCenterId) {
- SearchCriteria sc = createSearchCriteria();
+ SearchCriteria sc = createSearchCriteria();
sc.addAnd("dataCenterId", SearchCriteria.Op.EQ, dataCenterId);
sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.LocalSecondaryStorage);
return search(sc, null);
@@ -333,7 +347,7 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
}
@Override
- public List findDirectAgentToLoad(long msid, long lastPingSecondsAfter, Long limit) {
+ public List findDirectAgentToLoad(long lastPingSecondsAfter, Long limit) {
SearchCriteria sc = UnmanagedDirectConnectSearch.create();
sc.setParameters("lastPinged", lastPingSecondsAfter);
return search(sc, new Filter(HostVO.class, "clusterId", true, 0L, limit));
@@ -344,33 +358,33 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
SearchCriteria sc = MsStatusSearch.create();
sc.setParameters("ms", msId);
- HostVO host = createForUpdate();
+ HostVO host = createForUpdate();
host.setManagementServerId(null);
- host.setLastPinged((System.currentTimeMillis() >> 10) - ( 10 * 60 ));
- host.setDisconnectedOn(new Date());
-
- UpdateBuilder ub = getUpdateBuilder(host);
- ub.set(host, "status", Status.Disconnected);
-
- update(ub, sc, null);
- }
-
- @Override
- public List listBy(Host.Type type, Long clusterId, Long podId, long dcId) {
+ host.setLastPinged((System.currentTimeMillis() >> 10) - (10 * 60));
+ host.setDisconnectedOn(new Date());
+
+ UpdateBuilder ub = getUpdateBuilder(host);
+ ub.set(host, "status", Status.Disconnected);
+
+ update(ub, sc, null);
+ }
+
+ @Override
+ public List listBy(Host.Type type, Long clusterId, Long podId, long dcId) {
SearchCriteria sc = TypePodDcStatusSearch.create();
- if ( type != null ) {
+ if (type != null) {
sc.setParameters("type", type.toString());
}
if (clusterId != null) {
sc.setParameters("cluster", clusterId);
}
- if (podId != null ) {
+ if (podId != null) {
sc.setParameters("pod", podId);
}
sc.setParameters("dc", dcId);
- sc.setParameters("status", Status.Up.toString());
-
- return listBy(sc);
+ sc.setParameters("status", Status.Up.toString());
+
+ return listBy(sc);
}
@Override
@@ -391,7 +405,7 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
SearchBuilder hostTagSearch = _hostTagsDao.createSearchBuilder();
HostTagVO tagEntity = hostTagSearch.entity();
- hostTagSearch.and("tag",tagEntity.getTag(), SearchCriteria.Op.EQ);
+ hostTagSearch.and("tag", tagEntity.getTag(), SearchCriteria.Op.EQ);
SearchBuilder hostSearch = createSearchBuilder();
HostVO entity = hostSearch.entity();
@@ -402,7 +416,6 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
hostSearch.and("status", entity.getStatus(), SearchCriteria.Op.EQ);
hostSearch.join("hostTagSearch", hostTagSearch, entity.getId(), tagEntity.getHostId(), JoinBuilder.JoinType.INNER);
-
SearchCriteria sc = hostSearch.create();
sc.setJoinParameters("hostTagSearch", "tag", hostTag);
sc.setParameters("type", type.toString());
@@ -426,15 +439,15 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
return listBy(sc);
}
-
- @Override
- public List listBy(Host.Type type, long dcId) {
- SearchCriteria sc = TypeDcStatusSearch.create();
- sc.setParameters("type", type.toString());
- sc.setParameters("dc", dcId);
- sc.setParameters("status", Status.Up.toString());
-
- return listBy(sc);
+
+ @Override
+ public List listBy(Host.Type type, long dcId) {
+ SearchCriteria sc = TypeDcStatusSearch.create();
+ sc.setParameters("type", type.toString());
+ sc.setParameters("dc", dcId);
+ sc.setParameters("status", Status.Up.toString());
+
+ return listBy(sc);
}
@Override
@@ -444,36 +457,36 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
sc.setParameters("dc", dcId);
return listBy(sc);
- }
-
- @Override
- public HostVO findByPrivateIpAddressInDataCenter(long dcId, String privateIpAddress) {
- SearchCriteria sc = DcPrivateIpAddressSearch.create();
- sc.setParameters("dc", dcId);
- sc.setParameters("privateIpAddress", privateIpAddress);
-
- return findOneBy(sc);
- }
-
- @Override
- public HostVO findByStorageIpAddressInDataCenter(long dcId, String privateIpAddress) {
- SearchCriteria sc = DcStorageIpAddressSearch.create();
- sc.setParameters("dc", dcId);
- sc.setParameters("storageIpAddress", privateIpAddress);
-
- return findOneBy(sc);
- }
-
- @Override
- public void loadDetails(HostVO host) {
- Map details =_detailsDao.findDetails(host.getId());
- host.setDetails(details);
- }
+ }
@Override
- public void loadHostTags(HostVO host){
- List hostTags = _hostTagsDao.gethostTags(host.getId());
- host.setHostTags(hostTags);
+ public HostVO findByPrivateIpAddressInDataCenter(long dcId, String privateIpAddress) {
+ SearchCriteria sc = DcPrivateIpAddressSearch.create();
+ sc.setParameters("dc", dcId);
+ sc.setParameters("privateIpAddress", privateIpAddress);
+
+ return findOneBy(sc);
+ }
+
+ @Override
+ public HostVO findByStorageIpAddressInDataCenter(long dcId, String privateIpAddress) {
+ SearchCriteria sc = DcStorageIpAddressSearch.create();
+ sc.setParameters("dc", dcId);
+ sc.setParameters("storageIpAddress", privateIpAddress);
+
+ return findOneBy(sc);
+ }
+
+ @Override
+ public void loadDetails(HostVO host) {
+ Map details = _detailsDao.findDetails(host.getId());
+ host.setDetails(details);
+ }
+
+ @Override
+ public void loadHostTags(HostVO host) {
+ List hostTags = _hostTagsDao.gethostTags(host.getId());
+ host.setHostTags(hostTags);
}
@Override
@@ -490,20 +503,21 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
ub.set(host, _statusAttr, Status.Connecting);
return update(host, sc) > 0;
- }
+ }
- @Override
- public boolean updateStatus(HostVO host, Event event, long msId) {
- Status oldStatus = host.getStatus();
- long oldPingTime = host.getLastPinged();
- Status newStatus = oldStatus.getNextStatus(event);
- if ( host == null ) {
+ @Override
+ public boolean updateStatus(HostVO host, Event event, long msId) {
+ if (host == null) {
+ return false;
+ }
+
+ Status oldStatus = host.getStatus();
+ long oldPingTime = host.getLastPinged();
+ Status newStatus = oldStatus.getNextStatus(event);
+
+ if (newStatus == null) {
return false;
}
-
- if (newStatus == null) {
- return false;
- }
SearchBuilder sb = createSearchBuilder();
sb.and("status", sb.entity().getStatus(), SearchCriteria.Op.EQ);
@@ -515,162 +529,164 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
sb.closeParen();
}
sb.done();
-
- SearchCriteria sc = sb.create();
-
- sc.setParameters("status", oldStatus);
- sc.setParameters("id", host.getId());
- if (newStatus.checkManagementServer()) {
- sc.setParameters("ping", oldPingTime);
- sc.setParameters("msid", msId);
- }
-
- UpdateBuilder ub = getUpdateBuilder(host);
- ub.set(host, _statusAttr, newStatus);
+
+ SearchCriteria sc = sb.create();
+
+ sc.setParameters("status", oldStatus);
+ sc.setParameters("id", host.getId());
+ if (newStatus.checkManagementServer()) {
+ sc.setParameters("ping", oldPingTime);
+ sc.setParameters("msid", msId);
+ }
+
+ UpdateBuilder ub = getUpdateBuilder(host);
+ ub.set(host, _statusAttr, newStatus);
if (newStatus.updateManagementServer()) {
if (newStatus.lostConnection()) {
ub.set(host, _msIdAttr, null);
- } else {
+ } else {
ub.set(host, _msIdAttr, msId);
}
- if( event.equals(Event.Ping) || event.equals(Event.AgentConnected)) {
- ub.set(host, _pingTimeAttr, System.currentTimeMillis() >> 10);
- }
+ if (event.equals(Event.Ping) || event.equals(Event.AgentConnected)) {
+ ub.set(host, _pingTimeAttr, System.currentTimeMillis() >> 10);
+ }
}
- if ( event.equals(Event.ManagementServerDown)) {
- ub.set(host, _pingTimeAttr, (( System.currentTimeMillis() >> 10) - ( 10 * 60 )));
- }
- int result = update(ub, sc, null);
- assert result <= 1 : "How can this update " + result + " rows? ";
-
- if (s_logger.isDebugEnabled() && result == 0) {
- HostVO vo = findById(host.getId());
- assert vo != null : "How how how? : " + host.getId();
-
- StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString());
- str.append(". New=[status=").append(newStatus.toString()).append(":msid=").append(newStatus.lostConnection() ? "null" : msId).append(":lastpinged=").append(host.getLastPinged()).append("]");
- str.append("; Old=[status=").append(oldStatus.toString()).append(":msid=").append(msId).append(":lastpinged=").append(oldPingTime).append("]");
- str.append("; DB=[status=").append(vo.getStatus().toString()).append(":msid=").append(vo.getManagementServerId()).append(":lastpinged=").append(vo.getLastPinged()).append("]");
- s_logger.debug(str.toString());
- }
- return result > 0;
- }
-
- @Override
- public boolean disconnect(HostVO host, Event event, long msId) {
- host.setDisconnectedOn(new Date());
- if(event!=null && event.equals(Event.Remove)) {
+ if (event.equals(Event.ManagementServerDown)) {
+ ub.set(host, _pingTimeAttr, ((System.currentTimeMillis() >> 10) - (10 * 60)));
+ }
+ int result = update(ub, sc, null);
+ assert result <= 1 : "How can this update " + result + " rows? ";
+
+ if (s_logger.isDebugEnabled() && result == 0) {
+ HostVO vo = findById(host.getId());
+ assert vo != null : "How how how? : " + host.getId();
+
+ StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString());
+ str.append(". New=[status=").append(newStatus.toString()).append(":msid=").append(newStatus.lostConnection() ? "null" : msId).append(":lastpinged=").append(host.getLastPinged())
+ .append("]");
+ str.append("; Old=[status=").append(oldStatus.toString()).append(":msid=").append(msId).append(":lastpinged=").append(oldPingTime).append("]");
+ str.append("; DB=[status=").append(vo.getStatus().toString()).append(":msid=").append(vo.getManagementServerId()).append(":lastpinged=").append(vo.getLastPinged()).append("]");
+ s_logger.debug(str.toString());
+ }
+ return result > 0;
+ }
+
+ @Override
+ public boolean disconnect(HostVO host, Event event, long msId) {
+ host.setDisconnectedOn(new Date());
+ if (event != null && event.equals(Event.Remove)) {
host.setGuid(null);
host.setClusterId(null);
- }
- return updateStatus(host, event, msId);
- }
-
- @Override @DB
- public boolean connect(HostVO host, long msId) {
- Transaction txn = Transaction.currentTxn();
- long id = host.getId();
- txn.start();
-
- if (!updateStatus(host, Event.AgentConnected, msId)) {
- return false;
- }
-
- txn.commit();
- return true;
- }
-
- @Override
- public HostVO findByGuid(String guid) {
- SearchCriteria sc = GuidSearch.create("guid", guid);
- return findOneBy(sc);
+ }
+ return updateStatus(host, event, msId);
+ }
+
+ @Override
+ @DB
+ public boolean connect(HostVO host, long msId) {
+ Transaction txn = Transaction.currentTxn();
+ long id = host.getId();
+ txn.start();
+
+ if (!updateStatus(host, Event.AgentConnected, msId)) {
+ return false;
+ }
+
+ txn.commit();
+ return true;
+ }
+
+ @Override
+ public HostVO findByGuid(String guid) {
+ SearchCriteria sc = GuidSearch.create("guid", guid);
+ return findOneBy(sc);
}
@Override
public HostVO findByName(String name) {
SearchCriteria sc = NameSearch.create("name", name);
return findOneBy(sc);
- }
-
- @Override
- public List findLostHosts(long timeout) {
- SearchCriteria sc = LastPingedSearch.create();
- sc.setParameters("ping", timeout);
- sc.setParameters("state", Status.Up, Status.Updating, Status.Disconnected, Status.Connecting);
- return listBy(sc);
- }
-
+ }
+
@Override
- public List findHostsLike(String hostName) {
- SearchCriteria sc = NameLikeSearch.create();
- sc.setParameters("name", "%" + hostName + "%");
- return listBy(sc);
- }
-
- @Override
- public List findLostHosts2(long timeout, Type type) {
- SearchCriteria sc = LastPingedSearch2.create();
- sc.setParameters("ping", timeout);
- sc.setParameters("type", type.toString());
- return listBy(sc);
- }
-
- @Override
- public List listByDataCenter(long dcId) {
- SearchCriteria sc = DcSearch.create("dc", dcId);
- return listBy(sc);
- }
+ public List findLostHosts(long timeout) {
+ SearchCriteria sc = LastPingedSearch.create();
+ sc.setParameters("ping", timeout);
+ sc.setParameters("state", Status.Up, Status.Updating, Status.Disconnected, Status.Connecting);
+ return listBy(sc);
+ }
+
+ @Override
+ public List findHostsLike(String hostName) {
+ SearchCriteria sc = NameLikeSearch.create();
+ sc.setParameters("name", "%" + hostName + "%");
+ return listBy(sc);
+ }
+
+ @Override
+ public List findLostHosts2(long timeout, Type type) {
+ SearchCriteria sc = LastPingedSearch2.create();
+ sc.setParameters("ping", timeout);
+ sc.setParameters("type", type.toString());
+ return listBy(sc);
+ }
+
+ @Override
+ public List listByDataCenter(long dcId) {
+ SearchCriteria sc = DcSearch.create("dc", dcId);
+ return listBy(sc);
+ }
@Override
public HostVO findConsoleProxyHost(String name, Type type) {
SearchCriteria sc = ConsoleProxyHostSearch.create();
sc.setParameters("name", name);
sc.setParameters("type", type);
- ListhostList = listBy(sc);
+ List hostList = listBy(sc);
- if(hostList==null || hostList.size() == 0) {
+ if (hostList == null || hostList.size() == 0) {
return null;
} else {
return hostList.get(0);
}
}
-
- @Override
- public List listByHostPod(long podId) {
- SearchCriteria sc = PodSearch.create("pod", podId);
- return listBy(sc);
- }
-
- @Override
- public List listByStatus(Status... status) {
- SearchCriteria sc = StatusSearch.create();
- sc.setParameters("status", (Object[])status);
- return listBy(sc);
- }
-
- @Override
- public List listByTypeDataCenter(Type type, long dcId) {
- SearchCriteria sc = TypeDcSearch.create();
- sc.setParameters("type", type.toString());
- sc.setParameters("dc", dcId);
-
- return listBy(sc);
- }
-
- @Override
- public List listByType(Type type) {
- SearchCriteria sc = TypeSearch.create();
- sc.setParameters("type", type.toString());
- return listBy(sc);
- }
- @Override
- public void saveDetails(HostVO host) {
- Map details = host.getDetails();
- if (details == null) {
- return;
- }
- _detailsDao.persist(host.getId(), details);
+ @Override
+ public List listByHostPod(long podId) {
+ SearchCriteria sc = PodSearch.create("pod", podId);
+ return listBy(sc);
+ }
+
+ @Override
+ public List listByStatus(Status... status) {
+ SearchCriteria sc = StatusSearch.create();
+ sc.setParameters("status", (Object[]) status);
+ return listBy(sc);
+ }
+
+ @Override
+ public List listByTypeDataCenter(Type type, long dcId) {
+ SearchCriteria sc = TypeDcSearch.create();
+ sc.setParameters("type", type.toString());
+ sc.setParameters("dc", dcId);
+
+ return listBy(sc);
+ }
+
+ @Override
+ public List listByType(Type type) {
+ SearchCriteria sc = TypeSearch.create();
+ sc.setParameters("type", type.toString());
+ return listBy(sc);
+ }
+
+ @Override
+ public void saveDetails(HostVO host) {
+ Map details = host.getDetails();
+ if (details == null) {
+ return;
+ }
+ _detailsDao.persist(host.getId(), details);
}
protected void saveHostTags(HostVO host) {
@@ -679,15 +695,16 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
return;
}
_hostTagsDao.persist(host.getId(), hostTags);
- }
-
- @Override @DB
+ }
+
+ @Override
+ @DB
public HostVO persist(HostVO host) {
final String InsertSequenceSql = "INSERT INTO op_host(id) VALUES(?)";
-
- Transaction txn = Transaction.currentTxn();
- txn.start();
-
+
+ Transaction txn = Transaction.currentTxn();
+ txn.start();
+
HostVO dbHost = super.persist(host);
try {
@@ -702,71 +719,69 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
loadDetails(dbHost);
saveHostTags(host);
loadHostTags(dbHost);
-
- txn.commit();
-
- return dbHost;
+
+ txn.commit();
+
+ return dbHost;
}
- @Override @DB
- public boolean update(Long hostId, HostVO host) {
- Transaction txn = Transaction.currentTxn();
- txn.start();
-
- boolean persisted = super.update(hostId, host);
- if (!persisted) {
- return persisted;
- }
-
- saveDetails(host);
- saveHostTags(host);
-
- txn.commit();
-
- return persisted;
- }
-
- @Override @DB
- public List getRunningHostCounts(Date cutTime) {
- String sql = "select * from (" +
- "select h.data_center_id, h.type, count(*) as count from host as h INNER JOIN mshost as m ON h.mgmt_server_id=m.msid " +
- "where h.status='Up' and h.type='SecondaryStorage' and m.last_update > ? " +
- "group by h.data_center_id, h.type " +
- "UNION ALL " +
- "select h.data_center_id, h.type, count(*) as count from host as h INNER JOIN mshost as m ON h.mgmt_server_id=m.msid " +
- "where h.status='Up' and h.type='Routing' and m.last_update > ? " +
- "group by h.data_center_id, h.type) as t " +
- "ORDER by t.data_center_id, t.type";
-
- ArrayList l = new ArrayList();
+ @Override
+ @DB
+ public boolean update(Long hostId, HostVO host) {
+ Transaction txn = Transaction.currentTxn();
+ txn.start();
- Transaction txn = Transaction.currentTxn();;
- PreparedStatement pstmt = null;
- try {
+ boolean persisted = super.update(hostId, host);
+ if (!persisted) {
+ return persisted;
+ }
+
+ saveDetails(host);
+ saveHostTags(host);
+
+ txn.commit();
+
+ return persisted;
+ }
+
+ @Override
+ @DB
+ public List getRunningHostCounts(Date cutTime) {
+ String sql = "select * from (" + "select h.data_center_id, h.type, count(*) as count from host as h INNER JOIN mshost as m ON h.mgmt_server_id=m.msid "
+ + "where h.status='Up' and h.type='SecondaryStorage' and m.last_update > ? " + "group by h.data_center_id, h.type " + "UNION ALL "
+ + "select h.data_center_id, h.type, count(*) as count from host as h INNER JOIN mshost as m ON h.mgmt_server_id=m.msid "
+ + "where h.status='Up' and h.type='Routing' and m.last_update > ? " + "group by h.data_center_id, h.type) as t " + "ORDER by t.data_center_id, t.type";
+
+ ArrayList l = new ArrayList();
+
+ Transaction txn = Transaction.currentTxn();
+ ;
+ PreparedStatement pstmt = null;
+ try {
pstmt = txn.prepareAutoCloseStatement(sql);
String gmtCutTime = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime);
pstmt.setString(1, gmtCutTime);
pstmt.setString(2, gmtCutTime);
-
- ResultSet rs = pstmt.executeQuery();
- while(rs.next()) {
- RunningHostCountInfo info = new RunningHostCountInfo();
- info.setDcId(rs.getLong(1));
- info.setHostType(rs.getString(2));
- info.setCount(rs.getInt(3));
-
- l.add(info);
- }
- } catch (SQLException e) {
- } catch (Throwable e) {
- }
- return l;
- }
-
+
+ ResultSet rs = pstmt.executeQuery();
+ while (rs.next()) {
+ RunningHostCountInfo info = new RunningHostCountInfo();
+ info.setDcId(rs.getLong(1));
+ info.setHostType(rs.getString(2));
+ info.setCount(rs.getInt(3));
+
+ l.add(info);
+ }
+ } catch (SQLException e) {
+ } catch (Throwable e) {
+ }
+ return l;
+ }
+
@Override
- public long getNextSequence(long hostId) {
- if (s_logger.isTraceEnabled()) {
- s_logger.trace("getNextSequence(), hostId: " + hostId);
+ public long getNextSequence(long hostId) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("getNextSequence(), hostId: " + hostId);
}
TableGenerator tg = _tgs.get("host_req_sq");
@@ -808,13 +823,13 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
sc.setParameters("type", hostType);
}
- sc.setParameters("statuses", (Object[])statuses);
+ sc.setParameters("statuses", (Object[]) statuses);
return customSearch(sc, null);
}
@Override
- public long countRoutingHostsByDataCenter(long dcId){
+ public long countRoutingHostsByDataCenter(long dcId) {
SearchCriteria sc = CountRoutingByDc.create();
sc.setParameters("dc", dcId);
sc.setParameters("type", Host.Type.Routing);
@@ -834,4 +849,35 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao
return trafficHosts.get(0);
}
}
+
+ @Override
+ public List listDirectHostsBy(long msId, Status status) {
+ SearchCriteria sc = DirectlyConnectedSearch.create();
+ sc.setParameters("ms", msId);
+ if (status != null) {
+ sc.setParameters("statuses", Status.Up);
+ }
+
+ return listBy(sc);
+ }
+
+ @Override
+ public List listManagedDirectAgents() {
+ SearchCriteria sc = ManagedDirectConnectSearch.create();
+ return listBy(sc);
+ }
+
+ @Override
+ public List listManagedAgents() {
+ SearchCriteria sc = ManagedConnectSearch.create();
+ return listBy(sc);
+ }
+
+ @Override
+ public List listByManagementServer(long msId) {
+ SearchCriteria