From 55692fd7cf0cf83231ec588a98ef54d6e242001f Mon Sep 17 00:00:00 2001 From: alena Date: Mon, 23 May 2011 11:19:20 -0700 Subject: [PATCH] bug 9127: intermediate checkin for agent load balancer --- api/src/com/cloud/agent/api/Command.java | 4 + .../cloud/agent/api/TransferAgentCommand.java | 52 ++ .../cloud/cluster/ManagementServerHost.java | 33 + api/src/com/cloud/host/Status.java | 12 +- client/tomcatconf/components.xml.in | 3 + .../com/xensource/xenapi/Event.java | 11 +- .../com/cloud/agent/manager/AgentAttache.java | 4 + .../manager/ClusteredAgentManagerImpl.java | 333 ++++++- .../manager/ClusteredDirectAgentAttache.java | 70 +- .../src/com/cloud/cluster/ClusterManager.java | 6 +- .../com/cloud/cluster/ClusterManagerImpl.java | 198 ++-- .../cloud/cluster/ClusterManagerMessage.java | 1 + .../ClusterServiceServletHttpHandler.java | 26 +- .../cluster/ClusterServiceServletImpl.java | 18 +- .../ClusteredAgentRebalanceService.java | 14 + .../cluster/DummyClusterManagerImpl.java | 42 +- .../cloud/cluster/ManagementServerHostVO.java | 15 +- .../agentlb/AgentLoadBalancerPlanner.java | 31 + .../ClusterBasedAgentLoadBalancerPlanner.java | 167 ++++ .../cluster/agentlb/HostTransferMapVO.java | 99 ++ .../agentlb/dao/HostTransferMapDao.java | 41 + .../agentlb/dao/HostTransferMapDaoImpl.java | 125 +++ .../cluster/dao/ManagementServerHostDao.java | 4 + .../dao/ManagementServerHostDaoImpl.java | 77 +- .../src/com/cloud/configuration/Config.java | 5 +- .../DefaultComponentLibrary.java | 2 + server/src/com/cloud/host/dao/HostDao.java | 11 +- .../src/com/cloud/host/dao/HostDaoImpl.java | 856 +++++++++--------- setup/db/create-schema.sql | 13 + 29 files changed, 1733 insertions(+), 540 deletions(-) create mode 100644 api/src/com/cloud/agent/api/TransferAgentCommand.java create mode 100644 api/src/com/cloud/cluster/ManagementServerHost.java create mode 100644 server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java create mode 100644 
server/src/com/cloud/cluster/agentlb/AgentLoadBalancerPlanner.java create mode 100644 server/src/com/cloud/cluster/agentlb/ClusterBasedAgentLoadBalancerPlanner.java create mode 100644 server/src/com/cloud/cluster/agentlb/HostTransferMapVO.java create mode 100644 server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDao.java create mode 100644 server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java diff --git a/api/src/com/cloud/agent/api/Command.java b/api/src/com/cloud/agent/api/Command.java index b576108153f..120ed6c7cb6 100755 --- a/api/src/com/cloud/agent/api/Command.java +++ b/api/src/com/cloud/agent/api/Command.java @@ -56,4 +56,8 @@ public abstract class Command { public String getContextParam(String name) { return contextMap.get(name); } + + public boolean allowCaching() { + return true; + } } diff --git a/api/src/com/cloud/agent/api/TransferAgentCommand.java b/api/src/com/cloud/agent/api/TransferAgentCommand.java new file mode 100644 index 00000000000..b2fd36b72a0 --- /dev/null +++ b/api/src/com/cloud/agent/api/TransferAgentCommand.java @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + */ +package com.cloud.agent.api; + +import com.cloud.host.Status.Event; + +public class TransferAgentCommand extends Command { + protected long agentId; + protected long futureOwner; + Event event; + + protected TransferAgentCommand() { + } + + public TransferAgentCommand(long agentId, long futureOwner, Event event) { + this.agentId = agentId; + this.futureOwner = futureOwner; + this.event = event; + } + + public long getAgentId() { + return agentId; + } + + public long getFutureOwner() { + return futureOwner; + } + + public Event getEvent() { + return event; + } + + @Override + public boolean executeInSequence() { + return false; + } +} diff --git a/api/src/com/cloud/cluster/ManagementServerHost.java b/api/src/com/cloud/cluster/ManagementServerHost.java new file mode 100644 index 00000000000..82f6a755bcf --- /dev/null +++ b/api/src/com/cloud/cluster/ManagementServerHost.java @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + */ + + +package com.cloud.cluster; + + +public interface ManagementServerHost { + + public static enum State { Up, Starting, Down }; + + long getMsid(); + + State getState(); + + String getVersion(); + +} diff --git a/api/src/com/cloud/host/Status.java b/api/src/com/cloud/host/Status.java index 1af9dfdf43d..87da0ef2bb7 100644 --- a/api/src/com/cloud/host/Status.java +++ b/api/src/com/cloud/host/Status.java @@ -32,7 +32,8 @@ public enum Status { ErrorInMaintenance(false, false, false), Maintenance(false, false, false), Alert(true, true, true), - Removed(true, false, true); + Removed(true, false, true), + Rebalance(false, false, false); private final boolean updateManagementServer; private final boolean checkManagementServer; @@ -72,7 +73,11 @@ public enum Status { WaitedTooLong(false, "Waited too long from the agent to reconnect on its own. Time to do HA"), Remove(true, "Host is removed"), Ready(false, "Host is ready for commands"), - UpdatePassword(false, "Update host password from db"); + UpdatePassword(false, "Update host password from db"), + RequestAgentRebalance(false, "Request rebalance for the certain host"), + StartAgentRebalance(false, "Start rebalance for the certain host"), + RebalanceCompleted(false, "Host is rebalanced successfully"), + RebalanceFailed(false, "Failed to rebalance the host"); private final boolean isUserRequest; private final String comment; @@ -132,6 +137,7 @@ public enum Status { s_fsm.addTransition(Status.Up, Event.Ping, Status.Up); s_fsm.addTransition(Status.Up, Event.AgentConnected, Status.Connecting); s_fsm.addTransition(Status.Up, Event.ManagementServerDown, Status.Disconnected); + s_fsm.addTransition(Status.Up, Event.StartAgentRebalance, Status.Rebalance); s_fsm.addTransition(Status.Updating, Event.PingTimeout, Status.Alert); s_fsm.addTransition(Status.Updating, Event.Ping, Status.Updating); s_fsm.addTransition(Status.Updating, Event.AgentConnected, Status.Connecting); @@ -177,6 +183,8 @@ public enum Status { 
s_fsm.addTransition(Status.Alert, Event.Ping, Status.Up); s_fsm.addTransition(Status.Alert, Event.Remove, Status.Removed); s_fsm.addTransition(Status.Alert, Event.ManagementServerDown, Status.Alert); + s_fsm.addTransition(Status.Rebalance, Event.RebalanceFailed, Status.Alert); + s_fsm.addTransition(Status.Rebalance, Event.RebalanceCompleted, Status.Connecting); } public static void main(String[] args) { diff --git a/client/tomcatconf/components.xml.in b/client/tomcatconf/components.xml.in index 17a6d3d3c54..a991e84fcde 100755 --- a/client/tomcatconf/components.xml.in +++ b/client/tomcatconf/components.xml.in @@ -93,6 +93,9 @@ + + + diff --git a/deps/XenServerJava/com/xensource/xenapi/Event.java b/deps/XenServerJava/com/xensource/xenapi/Event.java index 82c4a3b876b..5eb9cc8cf47 100644 --- a/deps/XenServerJava/com/xensource/xenapi/Event.java +++ b/deps/XenServerJava/com/xensource/xenapi/Event.java @@ -33,20 +33,18 @@ package com.xensource.xenapi; -import com.xensource.xenapi.Types.BadServerResponse; -import com.xensource.xenapi.Types.VersionException; -import com.xensource.xenapi.Types.XenAPIException; - import java.io.PrintWriter; import java.io.StringWriter; import java.util.Date; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import org.apache.xmlrpc.XmlRpcException; +import com.xensource.xenapi.Types.BadServerResponse; +import com.xensource.xenapi.Types.XenAPIException; + /** * Asynchronous event registration and handling * @@ -66,6 +64,7 @@ public class Event extends XenAPIObject { this.ref = ref; } + @Override public String toWireString() { return this.ref; } @@ -96,6 +95,7 @@ public class Event extends XenAPIObject { * Represents all the fields in a Event */ public static class Record implements Types.Record { + @Override public String toString() { StringWriter writer = new StringWriter(); PrintWriter print = new PrintWriter(writer); @@ -112,6 +112,7 @@ public class Event extends XenAPIObject { /** * 
Convert a event.Record to a Map */ + @Override public Map toMap() { Map map = new HashMap(); map.put("id", this.id == null ? 0 : this.id); diff --git a/server/src/com/cloud/agent/manager/AgentAttache.java b/server/src/com/cloud/agent/manager/AgentAttache.java index 1c631d9837d..12934b18ece 100644 --- a/server/src/com/cloud/agent/manager/AgentAttache.java +++ b/server/src/com/cloud/agent/manager/AgentAttache.java @@ -237,6 +237,10 @@ public abstract class AgentAttache { public int getQueueSize() { return _requests.size(); } + + public int getListenersSize() { + return _waitForList.size(); + } public boolean processAnswers(final long seq, final Response resp) { resp.logD("Processing: ", true); diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java index 91edfc2fb21..5e9f5f503ad 100644 --- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java +++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java @@ -11,11 +11,19 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.ejb.Local; import javax.naming.ConfigurationException; @@ -23,29 +31,40 @@ import javax.naming.ConfigurationException; import org.apache.log4j.Logger; import com.cloud.agent.AgentManager; +import com.cloud.agent.api.Answer; import com.cloud.agent.api.CancelCommand; import com.cloud.agent.api.ChangeAgentCommand; import com.cloud.agent.api.Command; +import com.cloud.agent.api.TransferAgentCommand; import 
com.cloud.agent.transport.Request; import com.cloud.agent.transport.Request.Version; import com.cloud.agent.transport.Response; import com.cloud.api.commands.UpdateHostPasswordCmd; import com.cloud.cluster.ClusterManager; import com.cloud.cluster.ClusterManagerListener; +import com.cloud.cluster.ClusteredAgentRebalanceService; import com.cloud.cluster.ManagementServerHostVO; +import com.cloud.cluster.agentlb.AgentLoadBalancerPlanner; +import com.cloud.cluster.agentlb.HostTransferMapVO; +import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState; +import com.cloud.cluster.agentlb.dao.HostTransferMapDao; import com.cloud.cluster.dao.ManagementServerHostDao; import com.cloud.configuration.Config; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.exception.AgentUnavailableException; +import com.cloud.exception.OperationTimedoutException; import com.cloud.host.HostVO; import com.cloud.host.Status; import com.cloud.host.Status.Event; import com.cloud.resource.ServerResource; import com.cloud.storage.resource.DummySecondaryStorageResource; import com.cloud.user.User; +import com.cloud.utils.DateUtil; import com.cloud.utils.NumbersUtil; +import com.cloud.utils.component.Adapters; import com.cloud.utils.component.ComponentLocator; import com.cloud.utils.component.Inject; +import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; import com.cloud.utils.db.GlobalLock; import com.cloud.utils.db.Transaction; @@ -53,22 +72,31 @@ import com.cloud.utils.exception.CloudRuntimeException; import com.cloud.utils.nio.Link; import com.cloud.utils.nio.Task; -@Local(value = { AgentManager.class }) -public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener { +@Local(value = { AgentManager.class, ClusteredAgentRebalanceService.class }) +public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService { final static Logger s_logger 
= Logger.getLogger(ClusteredAgentManagerImpl.class); + private static final ScheduledExecutorService s_transferExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-AgentTransferExecutor")); public final static long STARTUP_DELAY = 5000; - public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login - public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds + public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login + public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds public long _loadSize = 100; + protected Set _agentToTransferIds = new HashSet(); + private final long rebalanceTimeOut = 300000; // 5 mins - after this time remove the agent from the transfer list - @Inject protected ClusterManager _clusterMgr = null; + @Inject + protected ClusterManager _clusterMgr = null; protected HashMap _peers; private final Timer _timer = new Timer("ClusteredAgentManager Timer"); @Inject protected ManagementServerHostDao _mshostDao; + @Inject + protected HostTransferMapDao _hostTransferDao; + + @Inject(adapter = AgentLoadBalancerPlanner.class) + protected Adapters _lbPlanners; protected ClusteredAgentManagerImpl() { super(); @@ -97,6 +125,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return false; } _timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, SCAN_INTERVAL); + + // schedule transfer scan executor - if agent LB is enabled + if (_clusterMgr.isAgentRebalanceEnabled()) { + s_transferExecutor.scheduleAtFixedRate(getTransferScanTask(), ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, ClusteredAgentRebalanceService.DEFAULT_TRANSFER_CHECK_INTERVAL, + TimeUnit.MILLISECONDS); + } + return true; } @@ -121,20 +156,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } // for agents that are 
self-managed, threshold to be considered as disconnected is 3 ping intervals - long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingInterval*3); - List hosts = _hostDao.findDirectAgentToLoad(_clusterMgr.getManagementNodeId(), cutSeconds, _loadSize); - if ( hosts != null && hosts.size() == _loadSize ) { - Long clusterId = hosts.get((int)(_loadSize-1)).getClusterId(); - if ( clusterId != null) { - for ( int i = (int)(_loadSize-1); i > 0; i-- ) { - if ( hosts.get(i).getClusterId() == clusterId ) { hosts.remove(i); } else { break; } } } - } + long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingInterval * 3); + List hosts = _hostDao.findDirectAgentToLoad(cutSeconds, _loadSize); + if (hosts != null && hosts.size() == _loadSize) { + Long clusterId = hosts.get((int) (_loadSize - 1)).getClusterId(); + if (clusterId != null) { + for (int i = (int) (_loadSize - 1); i > 0; i--) { + if (clusterId.equals(hosts.get(i).getClusterId())) { /* clusterId is a boxed Long: compare by value, not by reference */ hosts.remove(i); } else { break; } } } + } if (hosts != null && hosts.size() > 0) { for (HostVO host : hosts) { try { @@ -318,11 +353,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust return super.deleteHost(hostId, isForced, caller); } - + @Override public boolean updateHostPassword(UpdateHostPasswordCmd upasscmd) { if (upasscmd.getClusterId() == null) { - //update agent attache password + // update agent attache password try { Boolean result = _clusterMgr.propagateAgentEvent(upasscmd.getHostId(), Event.UpdatePassword); if (result != null) { @@ -330,8 +365,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } catch (AgentUnavailableException e) { } - } - else { + } else { // get agents for the cluster List hosts = _hostDao.listByCluster(upasscmd.getClusterId()); for (HostVO h : hosts) { @@ -343,7 +377,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } catch (AgentUnavailableException e) { } } - } + } return super.updateHostPassword(upasscmd); } @@ -507,6 +541,7 @@ 
public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust } } _timer.cancel(); + s_transferExecutor.shutdownNow(); return super.stop(); } @@ -586,6 +621,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage()); } } else { + long mgmtId = Request.getManagementServerId(data); if (mgmtId != -1 && mgmtId != _nodeId) { routeToPeer(Long.toString(mgmtId), data); @@ -624,7 +660,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust @Override public void onManagementNodeJoined(List nodeList, long selfNodeId) { - } @Override @@ -638,4 +673,264 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust @Override public void onManagementNodeIsolated() { } + + @Override + public void removeAgent(AgentAttache attache, Status nextState) { + if (attache == null) { + return; + } + + super.removeAgent(attache, nextState); + } + + @Override + public boolean executeRebalanceRequest(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException { + if (event == Event.RequestAgentRebalance) { + return setToWaitForRebalance(agentId); + } else if (event == Event.StartAgentRebalance) { + return rebalanceHost(agentId); + } + + return true; + } + + @Override + public void startRebalanceAgents() { + Date cutTime = DateUtil.currentGMTTime(); + List activeNodes = _mshostDao.getActiveList(new Date(cutTime.getTime() - ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD)); + List allManagedAgents = _hostDao.listManagedAgents(); + + long avLoad = 0L; + + if (!allManagedAgents.isEmpty() && !activeNodes.isEmpty()) { + avLoad = allManagedAgents.size() / activeNodes.size(); + } else { + return; + } + + for (ManagementServerHostVO node : activeNodes) { + if (node.getMsid() != _nodeId) { + + List hostsToRebalance = new ArrayList(); + for (AgentLoadBalancerPlanner lbPlanner : 
_lbPlanners) { + hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad); + if (!hostsToRebalance.isEmpty()) { + break; + } + } + + if (!hostsToRebalance.isEmpty()) { + //TODO - execute rebalance for every host; right now we are doing it for (0) host just for testing + for (HostVO host : hostsToRebalance) { + long hostId = host.getId(); + s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId); + boolean result = true; + HostTransferMapVO transfer = _hostTransferDao.startAgentTransfering(hostId, _nodeId, node.getMsid()); + try { + Answer[] answer = sendRebalanceCommand(hostId, _nodeId, Event.RequestAgentRebalance); + if (answer == null) { + s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid()); + result = false; + } + } catch (Exception ex) { + s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex); + result = false; + } finally { + HostTransferMapVO updatedTransfer = _hostTransferDao.findById(transfer.getId()); + if (!result && updatedTransfer.getState() == HostTransferState.TransferRequested) { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode"); + } + //just remove the mapping as nothing was done on the peer management server yet + _hostTransferDao.remove(transfer.getId()); + } + } + } + } + } + } + } + + private Answer[] sendRebalanceCommand(long agentId, long peer, Event event) { + TransferAgentCommand transfer = new TransferAgentCommand(agentId, peer, event); + Commands commands = new Commands(OnError.Stop); + commands.addCommand(transfer); + + Command[] cmds = commands.toCommands(); + + String peerName = Long.toString(peer); + + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer); + } + Answer[] answers = _clusterMgr.execute(peerName, agentId, cmds, true); + return answers; + } 
catch (Exception e) { + s_logger.warn("Caught exception while talking to " + peer, e); + return null; + } + } + + private Runnable getTransferScanTask() { + return new Runnable() { + @Override + public void run() { + try { + // TODO - change to trace level later on + if (s_logger.isDebugEnabled()) { + s_logger.debug("Clustered agent transfer scan check, management server id:" + _nodeId); + } + + if (_agentToTransferIds.size() > 0) { + s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer"); + for (Long hostId : new ArrayList<Long>(_agentToTransferIds)) { /* iterate over a snapshot: ids are removed from the set inside the loop */ + AgentAttache attache = findAttache(hostId); + if (attache != null && attache.getQueueSize() == 0 && attache.getListenersSize() == 0) { /* no attache: fall through to the timeout check below */ + boolean result = false; /* stays false if rebalanceHost throws, so the finally block reports failure */ + _agentToTransferIds.remove(hostId); + try { + result = rebalanceHost(hostId); + } finally { + if (result) { + finishRebalance(hostId, Event.RebalanceCompleted); + } else { + finishRebalance(hostId, Event.RebalanceFailed); + } + } + } else { + // if we timed out waiting for the host to reconnect, remove host from rebalance list and mark it as failed to rebalance + // no need to do anything with the real attache + Date cutTime = DateUtil.currentGMTTime(); + if (!(_hostTransferDao.isActive(hostId, new Date(cutTime.getTime() - rebalanceTimeOut)))) { + s_logger.debug("Timed out waiting for the host id=" + hostId + " to be ready to transfer, failing rebalance for this host"); + _agentToTransferIds.remove(hostId); + HostTransferMapVO transferMap = _hostTransferDao.findById(hostId); + transferMap.setState(HostTransferState.TransferFailed); + _hostTransferDao.update(hostId, transferMap); + } + } + } + } else { + // TODO - change to trace level later on + if (s_logger.isDebugEnabled()) { + s_logger.debug("Found no agents to be transfered by the management server " + _nodeId); + } + } + + } catch (Throwable e) { + s_logger.error("Problem with the clustered agent transfer scan check!", e); + } + } + }; + } + + + private boolean setToWaitForRebalance(final long hostId) { + 
s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer"); + synchronized (_agentToTransferIds) { + return _agentToTransferIds.add(hostId); + } + } + + + private boolean rebalanceHost(final long hostId) { + HostTransferMapVO map = _hostTransferDao.findById(hostId); + HostVO host = _hostDao.findById(hostId); + + boolean result = true; + if (map.getInitialOwner() == _nodeId) { + ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)findAttache(hostId); + + if (attache != null && !attache.getTransferMode()) { + attache.setTransferMode(true); + s_logger.debug("Putting agent id=" + hostId + " to transfer mode"); + _agents.put(hostId, attache); + if (host != null && host.getRemoved() == null) { + host.setManagementServerId(null); + s_logger.debug("Updating host id=" + hostId + " with the status " + Status.Rebalance); + _hostDao.updateStatus(host, Event.StartAgentRebalance, _nodeId); + } + + try { + Answer[] answer = sendRebalanceCommand(hostId, map.getFutureOwner(), Event.StartAgentRebalance); + if (answer == null) { + s_logger.warn("Host " + hostId + " failed to connect to the management server " + map.getFutureOwner() + " as a part of rebalance process"); + result = false; + } + } catch (Exception ex) { + s_logger.warn("Host " + hostId + " failed to connect to the management server " + map.getFutureOwner() + " as a part of rebalance process", ex); + result = false; + } + if (result) { + s_logger.debug("Got host id=" + hostId + " from management server " + map.getFutureOwner()); + } + + } + } else if (map.getFutureOwner() == _nodeId) { + try { + if (s_logger.isDebugEnabled()) { + s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ") as a part of rebalance process"); + } + //TODO - 1) no need to do vmfullSync/storageSetup on the agent side 2) Make sure that if connection fails, host goes from Rebalance state to Alert + loadDirectlyConnectedHost(host); + } catch (Exception ex) { + 
s_logger.warn("Unable to load directly connected host " + host.getId() + " as a part of rebalance due to exception: ", ex); + } + } + + return result; + + } + + private boolean finishRebalance(final long hostId, Event event) { + HostTransferMapVO map = _hostTransferDao.findById(hostId); + AgentAttache attache = findAttache(hostId); + + if (attache == null) { + s_logger.debug("Unable to find attache for the host id=" + hostId + ", assuming that the agent disconnected already"); + HostTransferState state = (event == Event.RebalanceCompleted) ? HostTransferState.TransferCompleted : HostTransferState.TransferFailed; + map.setState(state); + _hostTransferDao.update(hostId, map); + return true; + } + + if (map.getInitialOwner() != _nodeId) { + s_logger.warn("Why finish rebalance called not by initial host owner???"); + return false; + } + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Finishing rebalancing for the host id=" + hostId); + } + + if (event == Event.RebalanceFailed) { + ((ClusteredDirectAgentAttache) attache).setTransferMode(false); + s_logger.debug("Rebalance failed for the host id=" + hostId); + map.setState(HostTransferState.TransferFailed); + _hostTransferDao.update(hostId, map); + } else if (event == Event.RebalanceCompleted) { + + //1) Get all the requests remove transfer attache + LinkedList requests = ((ClusteredDirectAgentAttache) attache).getRequests(); + removeAgent(attache, Status.Rebalance); + + //2) Create forward attache + createAttache(hostId); + + //3) forward all the requests to the management server which owns the host now + if (requests != null && !requests.isEmpty()) { /* getRequests() returns null when the attache is not in transfer mode */ + for (Request request : requests) { + routeToPeer(Long.toString(map.getFutureOwner()), request.getBytes()); + } + } + + map.setState(HostTransferState.TransferCompleted); + _hostTransferDao.update(hostId, map); + + return true; + + } + return true; + } } diff --git a/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java 
b/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java index 6b8c38f0328..58ef0ef73f2 100644 --- a/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java +++ b/server/src/com/cloud/agent/manager/ClusteredDirectAgentAttache.java @@ -17,7 +17,13 @@ */ package com.cloud.agent.manager; +import java.util.LinkedList; + +import org.apache.log4j.Logger; + import com.cloud.agent.AgentManager; +import com.cloud.agent.Listener; +import com.cloud.agent.api.Command; import com.cloud.agent.transport.Request; import com.cloud.agent.transport.Response; import com.cloud.exception.AgentUnavailableException; @@ -26,15 +32,36 @@ import com.cloud.resource.ServerResource; import com.cloud.utils.exception.CloudRuntimeException; public class ClusteredDirectAgentAttache extends DirectAgentAttache implements Routable { + private final static Logger s_logger = Logger.getLogger(ClusteredDirectAgentAttache.class); private final ClusteredAgentManagerImpl _mgr; private final long _nodeId; - + private volatile boolean _transferMode = false; /* read without holding the lock taken by setTransferMode() */ + public ClusteredDirectAgentAttache(AgentManager agentMgr, long id, long mgmtId, ServerResource resource, boolean maintenance, ClusteredAgentManagerImpl mgr) { super(agentMgr, id, resource, maintenance, mgr); _mgr = mgr; _nodeId = mgmtId; } - + + public synchronized void setTransferMode(final boolean transfer) { + _transferMode = transfer; + } + + @Override + protected void checkAvailability(final Command[] cmds) throws AgentUnavailableException { + + if (_transferMode) { + // need to throw some other exception while agent is in rebalancing mode + for (final Command cmd : cmds) { + if (!cmd.allowCaching()) { + throw new AgentUnavailableException("Unable to send " + cmd.getClass().toString() + " because agent is in Rebalancing mode", _id); + } + } + } + + super.checkAvailability(cmds); + } + @Override public void routeToAgent(byte[] data) throws AgentUnavailableException { Request req; @@ -45,14 +72,14 @@ public class 
ClusteredDirectAgentAttache extends DirectAgentAttache implements R } catch (UnsupportedVersionException e) { throw new CloudRuntimeException("Unable to rout to an agent ", e); } - + if (req instanceof Response) { - super.process(((Response)req).getAnswers()); + super.process(((Response) req).getAnswers()); } else { super.send(req); } } - + @Override public boolean processAnswers(long seq, Response response) { long mgmtId = response.getManagementServerId(); @@ -66,4 +93,37 @@ public class ClusteredDirectAgentAttache extends DirectAgentAttache implements R return super.processAnswers(seq, response); } } + + @Override + public void send(Request req, final Listener listener) throws AgentUnavailableException { + checkAvailability(req.getCommands()); + + if (_transferMode) { + long seq = req.getSequence(); + + if (s_logger.isDebugEnabled()) { + s_logger.debug(log(seq, "Holding request as the corresponding agent is in transfer mode: ")); + } + + synchronized (this) { + addRequest(req); + return; + } + } else { + super.send(req, listener); + } + } + + public boolean getTransferMode() { + return _transferMode; + } + + public LinkedList getRequests() { + if (_transferMode) { + return _requests; + } else { + return null; + } + } + } diff --git a/server/src/com/cloud/cluster/ClusterManager.java b/server/src/com/cloud/cluster/ClusterManager.java index 69c2f19f663..c400558a4c1 100644 --- a/server/src/com/cloud/cluster/ClusterManager.java +++ b/server/src/com/cloud/cluster/ClusterManager.java @@ -61,5 +61,9 @@ public interface ClusterManager extends Manager { * @param agentId agent id this broadcast is regarding * @param cmds commands to broadcast */ - public void broadcast(long agentId, Command[] cmds); + public void broadcast(long agentId, Command[] cmds); + + boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException; + + boolean isAgentRebalanceEnabled(); } diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java 
b/server/src/com/cloud/cluster/ClusterManagerImpl.java index 1ce22e14125..257fd074288 100644 --- a/server/src/com/cloud/cluster/ClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java @@ -50,7 +50,12 @@ import com.cloud.agent.api.Answer; import com.cloud.agent.api.ChangeAgentCommand; import com.cloud.agent.api.Command; import com.cloud.agent.manager.Commands; +import com.cloud.cluster.ManagementServerHost.State; +import com.cloud.cluster.agentlb.HostTransferMapVO; +import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState; +import com.cloud.cluster.agentlb.dao.HostTransferMapDao; import com.cloud.cluster.dao.ManagementServerHostDao; +import com.cloud.configuration.Config; import com.cloud.configuration.dao.ConfigurationDao; import com.cloud.exception.AgentUnavailableException; import com.cloud.exception.OperationTimedoutException; @@ -64,6 +69,7 @@ import com.cloud.utils.Profiler; import com.cloud.utils.PropertiesUtil; import com.cloud.utils.component.Adapters; import com.cloud.utils.component.ComponentLocator; +import com.cloud.utils.component.Inject; import com.cloud.utils.concurrency.NamedThreadFactory; import com.cloud.utils.db.DB; import com.cloud.utils.db.Transaction; @@ -74,11 +80,13 @@ import com.cloud.utils.mgmt.JmxUtil; import com.cloud.utils.net.NetUtils; import com.google.gson.Gson; -@Local(value={ClusterManager.class}) +@Local(value = { ClusterManager.class }) public class ClusterManagerImpl implements ClusterManager { private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class); - private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second + + private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second + private final List listeners = new ArrayList(); private final Map activePeers = new HashMap(); @@ -90,11 +98,12 @@ public class ClusterManagerImpl implements ClusterManager { private final Gson gson; private AgentManager _agentMgr; + @Inject + private 
ClusteredAgentRebalanceService _rebalanceService; private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat")); - private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification")); - private List _notificationMsgs = new ArrayList(); + private final List _notificationMsgs = new ArrayList(); private Connection _heartbeatConnection = null; private final ExecutorService _executor; @@ -103,6 +112,7 @@ public class ClusterManagerImpl implements ClusterManager { private ManagementServerHostDao _mshostDao; private HostDao _hostDao; + private HostTransferMapDao _hostTransferDao; // // pay attention to _mshostId and _msid @@ -110,13 +120,16 @@ public class ClusterManagerImpl implements ClusterManager { // _msid is the unique persistent identifier that peer name is based upon // private Long _mshostId = null; - protected long _msid = ManagementServerNode.getManagementServerId(); + protected long _msId = ManagementServerNode.getManagementServerId(); protected long _runId = System.currentTimeMillis(); private boolean _peerScanInited = false; private String _name; private String _clusterNodeIP = "127.0.0.1"; + private boolean _agentLBEnabled = false; + private State _state = State.Starting; + private final Object stateLock = new Object(); public ClusterManagerImpl() { clusterPeers = new HashMap(); @@ -131,20 +144,18 @@ public class ClusterManagerImpl implements ClusterManager { } @Override - public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) - throws AgentUnavailableException, OperationTimedoutException { + public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { Commands commands = new Commands(stopOnError ? 
OnError.Stop : OnError.Continue); - for (Command cmd : cmds) { + for (Command cmd : cmds) { commands.addCommand(cmd); } return _agentMgr.send(hostId, commands); } @Override - public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) - throws AgentUnavailableException { + public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException { Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue); - for (Command cmd : cmds) { + for (Command cmd : cmds) { commands.addCommand(cmd); } return _agentMgr.send(hostId, commands, listener); @@ -163,7 +174,7 @@ public class ClusterManagerImpl implements ClusterManager { } if (s_logger.isDebugEnabled()) { - s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:"+ agentId); + s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId); } Command[] cmds = new Command[1]; cmds[0] = new ChangeAgentCommand(agentId, event); @@ -182,13 +193,14 @@ public class ClusterManagerImpl implements ClusterManager { /** * called by DatabaseUpgradeChecker to see if there are other peers running. - * @param notVersion If version is passed in, the peers CANNOT be running at this - * version. If version is null, return true if any peer is - * running regardless of version. + * + * @param notVersion + * If version is passed in, the peers CANNOT be running at this version. If version is null, return true if any + * peer is running regardless of version. * @return true if there are peers running and false if not. */ public static final boolean arePeersRunning(String notVersion) { - return false; //TODO: Leaving this for Kelven to take care of. + return false; // TODO: Leaving this for Kelven to take care of. 
} @Override @@ -199,7 +211,7 @@ public class ClusterManagerImpl implements ClusterManager { for (ManagementServerHostVO peer : peers) { String peerName = Long.toString(peer.getMsid()); if (getSelfPeerName().equals(peerName)) { - continue; // Skip myself. + continue; // Skip myself. } try { if (s_logger.isDebugEnabled()) { @@ -248,11 +260,9 @@ public class ClusterManagerImpl implements ClusterManager { s_logger.error("Exception on parsing gson package from remote call to " + strPeer); } } - return null; } catch (RemoteException e) { invalidatePeerService(strPeer); - if(s_logger.isInfoEnabled()) { s_logger.info("Exception on remote execution, peer: " + strPeer + ", iteration: " + i + ", exception message :" + e.getMessage()); @@ -266,7 +276,6 @@ public class ClusterManagerImpl implements ClusterManager { @Override public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) { - ClusterService peerService = null; if(s_logger.isDebugEnabled()) { @@ -280,7 +289,6 @@ public class ClusterManagerImpl implements ClusterManager { } catch (RemoteException e) { s_logger.error("Unable to get cluster service on peer : " + strPeer); } - if(peerService != null) { try { long seq = 0; @@ -291,7 +299,6 @@ public class ClusterManagerImpl implements ClusterManager { long startTick = System.currentTimeMillis(); seq = peerService.executeAsync(getSelfPeerName(), agentId, gson.toJson(cmds, Command[].class), stopOnError); - if(seq > 0) { if(s_logger.isDebugEnabled()) { s_logger.debug("Completed Async " + getSelfPeerName() + " -> " + strPeer + "." + agentId @@ -321,7 +328,6 @@ public class ClusterManagerImpl implements ClusterManager { @Override public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) { - if(s_logger.isDebugEnabled()) { s_logger.debug("Process Async-call result from remote peer " + executingPeer + ", {" + agentId + "-" + seq + "} answers: " + (answers != null ? 
gson.toJson(answers, Answer[].class): "null")); @@ -373,7 +379,6 @@ public class ClusterManagerImpl implements ClusterManager { @Override public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) { - if(s_logger.isDebugEnabled()) { s_logger.debug("Forward -> " + targetPeer + " Async-call answer {" + agentId + "-" + seq + "} " + (answers != null? gson.toJson(answers, Answer[].class):"")); @@ -444,7 +449,7 @@ public class ClusterManagerImpl implements ClusterManager { @Override public String getSelfPeerName() { - return Long.toString(_msid); + return Long.toString(_msId); } @Override @@ -455,15 +460,13 @@ public class ClusterManagerImpl implements ClusterManager { @Override public void registerListener(ClusterManagerListener listener) { // Note : we don't check duplicates - synchronized(listeners) { - listeners.add(listener); + synchronized (listeners) { } } @Override public void unregisterListener(ClusterManagerListener listener) { synchronized(listeners) { - listeners.remove(listener); } } @@ -571,11 +574,22 @@ public class ClusterManagerImpl implements ClusterManager { Connection conn = getHeartbeatConnection(); _mshostDao.update(conn, _mshostId, getCurrentRunId(), DateUtil.currentGMTTime()); - if(s_logger.isTraceEnabled()) { + // for cluster in Starting state check if there are any agents being transferred + if (_state == State.Starting) { + synchronized (stateLock) { + if (isClusterReadyToStart()) { + _mshostDao.update(conn, _mshostId, getCurrentRunId(), State.Up, DateUtil.currentGMTTime()); + _state = State.Up; + stateLock.notifyAll(); + } + } + } + + if (s_logger.isTraceEnabled()) { s_logger.trace("Cluster manager peer-scan, id:" + _mshostId); } - if(!_peerScanInited) { + if (!_peerScanInited) { _peerScanInited = true; initPeerScan(conn); } @@ -604,14 +618,41 @@ public class ClusterManagerImpl implements ClusterManager { s_logger.error("Problem with the cluster heartbeat!", e); } } - }; + + private boolean isClusterReadyToStart() 
{ + boolean isReady = false; + int transferCount = _hostTransferDao.listHostsJoiningCluster(_msId).size(); + if (transferCount == 0) { + //Check how many servers got transferred successfully + List rebalancedHosts = _hostTransferDao.listBy(_msId, HostTransferState.TransferCompleted); + s_logger.debug(rebalancedHosts.size() + " hosts joined the cluster " + _msId + " as a result of rebalance process"); + for (HostTransferMapVO host : rebalancedHosts) { + _hostTransferDao.remove(host.getId()); + } + + //Check how many servers failed to transfer + List failedToRebalanceHosts = _hostTransferDao.listBy(_msId, HostTransferState.TransferFailed); + s_logger.debug(failedToRebalanceHosts.size() + " hosts failed to join the cluster " + _msId + " as a result of rebalance process"); + for (HostTransferMapVO host : failedToRebalanceHosts) { + _hostTransferDao.remove(host.getId()); + } + + s_logger.debug("There are no hosts currently joining cluser msid=" + _msId + ", so management server is ready to start"); + isReady = true; + } else if (s_logger.isDebugEnabled()) { + //TODO : change to trace mode later + s_logger.debug("There are " + transferCount + " agents currently joinging the cluster " + _msId); + } + + return isReady; + } + }; } private boolean isRootCauseConnectionRelated(Throwable e) { - while(e != null) { - if(e instanceof com.mysql.jdbc.CommunicationsException || e instanceof com.mysql.jdbc.exceptions.jdbc4.CommunicationsException) { + while (e != null) { + if (e instanceof com.mysql.jdbc.CommunicationsException || e instanceof com.mysql.jdbc.exceptions.jdbc4.CommunicationsException) return true; - } e = e.getCause(); } @@ -857,7 +898,7 @@ public class ClusterManagerImpl implements ClusterManager { @Override @DB public boolean start() { if(s_logger.isInfoEnabled()) { - s_logger.info("Starting cluster manager, msid : " + _msid); + s_logger.info("Starting cluster manager, msid : " + _msId); } Transaction txn = Transaction.currentTxn(); @@ -867,10 +908,10 @@ public class 
ClusterManagerImpl implements ClusterManager { final Class c = this.getClass(); String version = c.getPackage().getImplementationVersion(); - ManagementServerHostVO mshost = _mshostDao.findByMsid(_msid); - if(mshost == null) { + ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId); + if (mshost == null) { mshost = new ManagementServerHostVO(); - mshost.setMsid(_msid); + mshost.setMsid(_msId); mshost.setRunid(this.getCurrentRunId()); mshost.setName(NetUtils.getHostName()); mshost.setVersion(version); @@ -879,32 +920,49 @@ public class ClusterManagerImpl implements ClusterManager { mshost.setLastUpdateTime(DateUtil.currentGMTTime()); mshost.setRemoved(null); mshost.setAlertCount(0); - mshost.setState(ManagementServerNode.State.Up); + mshost.setState(ManagementServerHost.State.Starting); _mshostDao.persist(mshost); - if(s_logger.isInfoEnabled()) { - s_logger.info("New instance of management server msid " + _msid + " is being started"); + if (s_logger.isInfoEnabled()) { + s_logger.info("New instance of management server msid " + _msId + " is being started"); } } else { - if(s_logger.isInfoEnabled()) { - s_logger.info("Management server " + _msid + " is being started"); + if (s_logger.isInfoEnabled()) { + s_logger.info("Management server " + _msId + " is being started"); } - _mshostDao.update(mshost.getId(), getCurrentRunId(), NetUtils.getHostName(), version, - _clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime()); + _mshostDao.update(mshost.getId(), getCurrentRunId(), NetUtils.getHostName(), version, _clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime()); } txn.commit(); _mshostId = mshost.getId(); - if(s_logger.isInfoEnabled()) { - s_logger.info("Management server (host id : " + _mshostId + ") is available at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort()); + if (s_logger.isInfoEnabled()) { + s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + 
_clusterNodeIP + ":" + _currentServiceAdapter.getServicePort()); + } + + // use separate thread for heartbeat updates + _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS); + _notificationExecutor.submit(getNotificationTask()); + + // Do agent rebalancing + if (_agentLBEnabled) { + s_logger.debug("Management server " + _msId + " is asking other peers to rebalance their agents"); + _rebalanceService.startRebalanceAgents(); } - // use seperated thread for heartbeat updates - _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), heartbeatInterval, - heartbeatInterval, TimeUnit.MILLISECONDS); - _notificationExecutor.submit(getNotificationTask()); + //wait here for heartbeat task to update the host state + try { + synchronized (stateLock) { + while (_state != State.Up) { + stateLock.wait(); + } + } + } catch (final InterruptedException e) { + } finally { + s_logger.debug("Agent rebalancing is completed, management server " + _mshostId + " is ready"); + } + } catch (Throwable e) { s_logger.error("Unexpected exception : ", e); txn.rollback(); @@ -912,8 +970,8 @@ public class ClusterManagerImpl implements ClusterManager { throw new CloudRuntimeException("Unable to initialize cluster info into database"); } - if(s_logger.isInfoEnabled()) { - s_logger.info("Cluster manager is started"); + if (s_logger.isInfoEnabled()) { + s_logger.info("Cluster manager was started successfully"); } return true; @@ -955,15 +1013,20 @@ public class ClusterManagerImpl implements ClusterManager { } _mshostDao = locator.getDao(ManagementServerHostDao.class); - if(_mshostDao == null) { + if (_mshostDao == null) { throw new ConfigurationException("Unable to get " + ManagementServerHostDao.class.getName()); } _hostDao = locator.getDao(HostDao.class); - if(_hostDao == null) { + if (_hostDao == null) { throw new ConfigurationException("Unable to get " + HostDao.class.getName()); } + _hostTransferDao = 
locator.getDao(HostTransferMapDao.class); + if (_hostTransferDao == null) { + throw new ConfigurationException("Unable to get agent transfer map dao"); + } + ConfigurationDao configDao = locator.getDao(ConfigurationDao.class); if (configDao == null) { throw new ConfigurationException("Unable to get the configuration dao."); @@ -972,12 +1035,12 @@ public class ClusterManagerImpl implements ClusterManager { Map configs = configDao.getConfiguration("management-server", params); String value = configs.get("cluster.heartbeat.interval"); - if(value != null) { + if (value != null) { heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL); } value = configs.get("cluster.heartbeat.threshold"); - if(value != null) { + if (value != null) { heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD); } @@ -991,7 +1054,7 @@ public class ClusterManagerImpl implements ClusterManager { throw new ConfigurationException("Unable to load db.properties content"); } _clusterNodeIP = dbProps.getProperty("cluster.node.IP"); - if(_clusterNodeIP == null) { + if (_clusterNodeIP == null) { _clusterNodeIP = "127.0.0.1"; } _clusterNodeIP = _clusterNodeIP.trim(); @@ -1016,6 +1079,9 @@ public class ClusterManagerImpl implements ClusterManager { if(_currentServiceAdapter == null) { throw new ConfigurationException("Unable to set current cluster service adapter"); } + + + _agentLBEnabled = Boolean.valueOf(configDao.getValue(Config.AgentLbEnable.key())); checkConflicts(); @@ -1027,7 +1093,7 @@ public class ClusterManagerImpl implements ClusterManager { @Override public long getManagementNodeId() { - return _msid; + return _msId; } @Override @@ -1124,11 +1190,23 @@ public class ClusterManagerImpl implements ClusterManager { s_logger.error(msg); throw new ConfigurationException(msg); } else { - String msg = "Detected that another management node with the same IP " + peer.getServiceIP() + " is considered as running in DB, however it is 
not pingable, we will continue cluster initialization with this management server node"; + String msg = "Detected that another management node with the same IP " + peer.getServiceIP() + + " is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node"; s_logger.info(msg); } } } } } + + @Override + public boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException { + return _rebalanceService.executeRebalanceRequest(agentId, event); + } + + @Override + public boolean isAgentRebalanceEnabled() { + return _agentLBEnabled; + } + } diff --git a/server/src/com/cloud/cluster/ClusterManagerMessage.java b/server/src/com/cloud/cluster/ClusterManagerMessage.java index a1b2d0ec4e0..a2c599be8be 100644 --- a/server/src/com/cloud/cluster/ClusterManagerMessage.java +++ b/server/src/com/cloud/cluster/ClusterManagerMessage.java @@ -42,4 +42,5 @@ public class ClusterManagerMessage { public List getNodes() { return _nodes; } + } diff --git a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java index 68737cd48fc..1347825c1f8 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletHttpHandler.java @@ -17,7 +17,7 @@ */ package com.cloud.cluster; - + import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.URLDecoder; @@ -38,6 +38,7 @@ import com.cloud.agent.api.Answer; import com.cloud.agent.api.ChangeAgentAnswer; import com.cloud.agent.api.ChangeAgentCommand; import com.cloud.agent.api.Command; +import com.cloud.agent.api.TransferAgentCommand; import com.cloud.exception.AgentUnavailableException; import com.cloud.exception.OperationTimedoutException; import com.cloud.serializer.GsonHelper; @@ -204,6 +205,29 @@ public class ClusterServiceServletHttpHandler implements 
HttpRequestHandler { Answer[] answers = new Answer[1]; answers[0] = new ChangeAgentAnswer(cmd, result); return gson.toJson(answers); + } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) { + TransferAgentCommand cmd = (TransferAgentCommand) cmds[0]; + + if (s_logger.isDebugEnabled()) { + s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent()); + } + boolean result = false; + try { + result = manager.rebalanceAgent(cmd.getAgentId(), cmd.getEvent()); + if (s_logger.isDebugEnabled()) { + s_logger.debug("Result is " + result); + } + + } catch (AgentUnavailableException e) { + s_logger.warn("Agent is unavailable", e); + return null; + } catch (OperationTimedoutException e) { + s_logger.warn("Operation timed out", e); + return null; + } + Answer[] answers = new Answer[1]; + answers[0] = new Answer(cmd, result, null); + return gson.toJson(answers); } try { diff --git a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java index 9a4bbaed71a..af86457b9dc 100644 --- a/server/src/com/cloud/cluster/ClusterServiceServletImpl.java +++ b/server/src/com/cloud/cluster/ClusterServiceServletImpl.java @@ -35,10 +35,10 @@ public class ClusterServiceServletImpl implements ClusterService { private static final Logger s_logger = Logger.getLogger(ClusterServiceServletImpl.class); - private String serviceUrl; - - private Gson gson; + private String serviceUrl; + private final Gson gson; + public ClusterServiceServletImpl() { gson = GsonHelper.getGson(); } @@ -94,7 +94,7 @@ public class ClusterServiceServletImpl implements ClusterService { s_logger.error("Unable to parse executeAsync return : " + result); throw new RemoteException("Invalid result returned from async-execution on peer : " + serviceUrl); } - } + } @Override public boolean onAsyncResult(String executingPeer, long agentId, long seq, String gsonPackage) throws RemoteException { @@ 
-102,8 +102,7 @@ public class ClusterServiceServletImpl implements ClusterService { s_logger.debug("Forward Async-call answer to remote listener, agent: " + agentId + ", excutingPeer: " + executingPeer + ", seq: " + seq + ", gsonPackage: " + gsonPackage); - } - + } HttpClient client = new HttpClient(); PostMethod method = new PostMethod(serviceUrl); @@ -111,7 +110,7 @@ public class ClusterServiceServletImpl implements ClusterService { method.addParameter("agentId", Long.toString(agentId)); method.addParameter("gsonPackage", gsonPackage); method.addParameter("seq", Long.toString(seq)); - method.addParameter("executingPeer", executingPeer); + method.addParameter("executingPeer", executingPeer); String result = executePostMethod(client, method); if(result.contains("recurring=true")) { @@ -132,7 +131,7 @@ public class ClusterServiceServletImpl implements ClusterService { if(s_logger.isDebugEnabled()) { s_logger.debug("Ping at " + serviceUrl); } - + HttpClient client = new HttpClient(); PostMethod method = new PostMethod(serviceUrl); @@ -169,6 +168,7 @@ public class ClusterServiceServletImpl implements ClusterService { } catch(Throwable e) { s_logger.error("Exception from : " + serviceUrl + ", method : " + method.getParameter("method") + ", exception :", e); } + return result; } @@ -179,6 +179,6 @@ public class ClusterServiceServletImpl implements ClusterService { String result = service.execute("test", 1, "{ p1:v1, p2:v2 }", true); System.out.println(result); } catch (RemoteException e) { - } + } } } diff --git a/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java b/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java new file mode 100644 index 00000000000..a655101f9ae --- /dev/null +++ b/server/src/com/cloud/cluster/ClusteredAgentRebalanceService.java @@ -0,0 +1,14 @@ +package com.cloud.cluster; + +import com.cloud.exception.AgentUnavailableException; +import com.cloud.exception.OperationTimedoutException; +import com.cloud.host.Status.Event; 
+ +public interface ClusteredAgentRebalanceService { + public static final int DEFAULT_TRANSFER_CHECK_INTERVAL = 10000; + + void startRebalanceAgents(); + + boolean executeRebalanceRequest(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException; + +} diff --git a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java index 8d19dd0d56e..0844757e69d 100644 --- a/server/src/com/cloud/cluster/DummyClusterManagerImpl.java +++ b/server/src/com/cloud/cluster/DummyClusterManagerImpl.java @@ -44,47 +44,58 @@ public class DummyClusterManagerImpl implements ClusterManager { private String _name; private final String _clusterNodeIP = "127.0.0.1"; + @Override public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) { throw new CloudRuntimeException("Unsupported feature"); } + @Override public long executeAsync(String strPeer, long agentId, Command[] cmds, boolean stopOnError, Listener listener) { throw new CloudRuntimeException("Unsupported feature"); } + @Override public boolean onAsyncResult(String executingPeer, long agentId, long seq, Answer[] answers) { throw new CloudRuntimeException("Unsupported feature"); } + @Override public boolean forwardAnswer(String targetPeer, long agentId, long seq, Answer[] answers) { throw new CloudRuntimeException("Unsupported feature"); } + @Override public Answer[] sendToAgent(Long hostId, Command [] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException { throw new CloudRuntimeException("Unsupported feature"); } + @Override public long sendToAgent(Long hostId, Command[] cmds, boolean stopOnError, Listener listener) throws AgentUnavailableException { throw new CloudRuntimeException("Unsupported feature"); } + @Override public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException { throw new CloudRuntimeException("Unsupported feature"); } + 
@Override public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException { throw new CloudRuntimeException("Unsupported feature"); } - public int getHeartbeatThreshold() { + @Override + public int getHeartbeatThreshold() { return ClusterManager.DEFAULT_HEARTBEAT_INTERVAL; } - public long getManagementNodeId() { + @Override + public long getManagementNodeId() { return _id; } + @Override public long getCurrentRunId() { return _runId; } @@ -94,30 +105,37 @@ public class DummyClusterManagerImpl implements ClusterManager { return null; } - public String getSelfPeerName() { + @Override + public String getSelfPeerName() { return Long.toString(_id); } - public String getSelfNodeIP() { + @Override + public String getSelfNodeIP() { return _clusterNodeIP; } + @Override public boolean isManagementNodeAlive(long msid) { return true; } + @Override public boolean pingManagementNode(long msid) { return false; } + @Override public String getPeerName(long agentHostId) { throw new CloudRuntimeException("Unsupported feature"); } - public void registerListener(ClusterManagerListener listener) { + @Override + public void registerListener(ClusterManagerListener listener) { } - public void unregisterListener(ClusterManagerListener listener) { + @Override + public void unregisterListener(ClusterManagerListener listener) { } @Override @@ -146,5 +164,15 @@ public class DummyClusterManagerImpl implements ClusterManager { @Override public boolean stop() { return true; - } + } + + @Override + public boolean rebalanceAgent(long agentId, Event event) throws AgentUnavailableException, OperationTimedoutException { + return false; + } + + @Override + public boolean isAgentRebalanceEnabled() { + return false; + } } diff --git a/server/src/com/cloud/cluster/ManagementServerHostVO.java b/server/src/com/cloud/cluster/ManagementServerHostVO.java index 82be23a3434..42e3296ed59 100644 --- a/server/src/com/cloud/cluster/ManagementServerHostVO.java +++ 
b/server/src/com/cloud/cluster/ManagementServerHostVO.java @@ -35,7 +35,7 @@ import com.cloud.utils.db.GenericDao; @Entity @Table(name="mshost") -public class ManagementServerHostVO { +public class ManagementServerHostVO implements ManagementServerHost{ @Id @GeneratedValue(strategy=GenerationType.IDENTITY) @@ -53,7 +53,7 @@ public class ManagementServerHostVO { @Column(name="state", updatable = true, nullable=false) @Enumerated(value=EnumType.STRING) - private ManagementServerNode.State state; + private ManagementServerHost.State state; @Column(name="version", updatable=true, nullable=true) private String version; @@ -100,7 +100,8 @@ public class ManagementServerHostVO { public void setRunid(long runid) { this.runid = runid; } - + + @Override public long getMsid() { return msid; } @@ -117,14 +118,16 @@ public class ManagementServerHostVO { this.name = name; } - public ManagementServerNode.State getState() { + @Override + public ManagementServerHost.State getState() { return this.state; } - public void setState(ManagementServerNode.State state) { + public void setState(ManagementServerHost.State state) { this.state = state; } - + + @Override public String getVersion() { return version; } diff --git a/server/src/com/cloud/cluster/agentlb/AgentLoadBalancerPlanner.java b/server/src/com/cloud/cluster/agentlb/AgentLoadBalancerPlanner.java new file mode 100644 index 00000000000..2a87724d253 --- /dev/null +++ b/server/src/com/cloud/cluster/agentlb/AgentLoadBalancerPlanner.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. 
+ * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + */ + +package com.cloud.cluster.agentlb; + +import java.util.List; + +import com.cloud.host.HostVO; +import com.cloud.utils.component.Adapter; + + +public interface AgentLoadBalancerPlanner extends Adapter{ + + List getHostsToRebalance(long msId, long avLoad); + +} diff --git a/server/src/com/cloud/cluster/agentlb/ClusterBasedAgentLoadBalancerPlanner.java b/server/src/com/cloud/cluster/agentlb/ClusterBasedAgentLoadBalancerPlanner.java new file mode 100644 index 00000000000..0f896d7f43c --- /dev/null +++ b/server/src/com/cloud/cluster/agentlb/ClusterBasedAgentLoadBalancerPlanner.java @@ -0,0 +1,167 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ *
+ */
+package com.cloud.cluster.agentlb;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.ejb.Local;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.utils.component.Inject;
+
+
+/**
+ * Agent load balancer planner that selects the hosts a management server should give away,
+ * working cluster by cluster.
+ *
+ * The direct agents of the management server that are in status Up are grouped by cluster id and
+ * the clusters are visited from the largest to the smallest. Whole clusters are handed over when
+ * they fit into the number of hosts still to give; a cluster is only split when it is the last
+ * one visited.
+ */
+@Local(value=AgentLoadBalancerPlanner.class)
+public class ClusterBasedAgentLoadBalancerPlanner implements AgentLoadBalancerPlanner{
+    private static final Logger s_logger = Logger.getLogger(AgentLoadBalancerPlanner.class);
+    private String _name;
+
+    @Inject HostDao _hostDao = null;
+
+    /**
+     * Remembers the adapter name; {@code params} is not read.
+     */
+    @Override
+    public boolean configure(String name, Map params) throws ConfigurationException {
+        _name = name;
+        return true;
+    }
+
+    @Override
+    public String getName() {
+        return _name;
+    }
+
+    @Override
+    public boolean start() {
+        return true;
+    }
+
+    @Override
+    public boolean stop() {
+        return true;
+    }
+
+    /**
+     * Picks the hosts that the management server {@code msId} should hand over.
+     *
+     * @param msId   id of the management server whose hosts are examined
+     * @param avLoad number of hosts the server is allowed to keep
+     * @return the hosts to transfer (possibly fewer than {@code allHosts - avLoad}), or
+     *         {@code null} when the server holds at most {@code avLoad} hosts or has no direct
+     *         agent in status Up
+     */
+    @Override
+    public List<HostVO> getHostsToRebalance(long msId, long avLoad) {
+        List<HostVO> allHosts = _hostDao.listByManagementServer(msId);
+
+        if (allHosts.size() <= avLoad) {
+            s_logger.debug("Agent load for management server " + msId + " doesn't exceed av load " + avLoad + "; so it doesn't participate in agent rebalancing process");
+            return null;
+        }
+
+        List<HostVO> directHosts = _hostDao.listDirectHostsBy(msId, Status.Up);
+        if (directHosts.isEmpty()) {
+            s_logger.debug("No direct agents in status " + Status.Up + " exist for the management server " + msId + "; so it doesn't participate in agent rebalancing process");
+            return null;
+        }
+
+        // Group the direct agents by the cluster they belong to.
+        Map<Long, List<HostVO>> hostToClusterMap = new HashMap<Long, List<HostVO>>();
+        for (HostVO directHost : directHosts) {
+            Long clusterId = directHost.getClusterId();
+            List<HostVO> directHostsPerCluster = hostToClusterMap.get(clusterId);
+            if (directHostsPerCluster == null) {
+                directHostsPerCluster = new ArrayList<HostVO>();
+                hostToClusterMap.put(clusterId, directHostsPerCluster);
+            }
+            directHostsPerCluster.add(directHost);
+        }
+
+        hostToClusterMap = sortByClusterSize(hostToClusterMap);
+
+        long hostsToGive = allHosts.size() - avLoad;
+        long hostsLeftToGive = hostsToGive;
+        // Number of direct hosts in the clusters that have not been visited yet.
+        long hostsLeft = directHosts.size();
+        List<HostVO> hostsToReturn = new ArrayList<HostVO>();
+        int count = 0;
+
+        for (Map.Entry<Long, List<HostVO>> entry : hostToClusterMap.entrySet()) {
+            Long cluster = entry.getKey();
+            List<HostVO> hostsInCluster = entry.getValue();
+            hostsLeft = hostsLeft - hostsInCluster.size();
+            count++;
+
+            if (hostsToReturn.size() >= hostsToGive) {
+                break;
+            }
+            s_logger.debug("Trying cluster id=" + cluster);
+
+            if (hostsInCluster.size() <= hostsLeftToGive) {
+                s_logger.debug("Taking all " + hostsInCluster.size() + " hosts: " + hostsInCluster + " from cluster id=" + cluster);
+                hostsToReturn.addAll(hostsInCluster);
+                hostsLeftToGive = hostsLeftToGive - hostsInCluster.size();
+                continue;
+            }
+
+            // The cluster is larger than what is still needed.
+            if (hostsLeft >= hostsLeftToGive) {
+                // The remaining (smaller) clusters may still cover the need without a split.
+                s_logger.debug("Skipping cluster id=" + cluster + " as it has more hosts that we need: " + hostsInCluster.size() + " vs " + hostsLeftToGive);
+                continue;
+            }
+
+            if (count == hostToClusterMap.size()) {
+                // Last cluster: take exactly the number of hosts still needed. The cast is safe
+                // because hostsLeftToGive is smaller than hostsInCluster.size() in this branch.
+                int needed = (int) hostsLeftToGive;
+                for (HostVO host : hostsInCluster.subList(0, needed)) {
+                    hostsToReturn.add(host);
+                    s_logger.debug("Taking host " + host + " from cluster " + cluster);
+                }
+                hostsLeftToGive = 0;
+            }
+            break;
+        }
+
+        return hostsToReturn;
+    }
+
+    /**
+     * Returns a copy of {@code hostToClusterMap} whose iteration order goes from the cluster with
+     * the most hosts to the cluster with the fewest; clusters mapped to {@code null} come last.
+     *
+     * @param hostToClusterMap hosts grouped by cluster id; not modified
+     * @return a new insertion-ordered map holding the same entries
+     */
+    public static LinkedHashMap<Long, List<HostVO>> sortByClusterSize(final Map<Long, List<HostVO>> hostToClusterMap) {
+        List<Long> keys = new ArrayList<Long>(hostToClusterMap.keySet());
+        Collections.sort(keys, new Comparator<Long>() {
+            @Override
+            public int compare(Long o1, Long o2) {
+                List<HostVO> v1 = hostToClusterMap.get(o1);
+                List<HostVO> v2 = hostToClusterMap.get(o2);
+                if (v1 == null || v2 == null) {
+                    if (v1 == v2) {
+                        return 0;
+                    }
+                    return (v1 == null) ? 1 : -1;
+                }
+
+                // Descending by size; a full three-way result keeps the comparator contract.
+                if (v1.size() == v2.size()) {
+                    return 0;
+                }
+                return (v1.size() < v2.size()) ? 1 : -1;
+            }
+        });
+
+        LinkedHashMap<Long, List<HostVO>> sortedMap = new LinkedHashMap<Long, List<HostVO>>();
+        for (Long key : keys) {
+            sortedMap.put(key, hostToClusterMap.get(key));
+        }
+        return sortedMap;
+    }
+
+}
diff --git a/server/src/com/cloud/cluster/agentlb/HostTransferMapVO.java b/server/src/com/cloud/cluster/agentlb/HostTransferMapVO.java new file mode 100644 index 00000000000..a2bb428b084 --- /dev/null +++ b/server/src/com/cloud/cluster/agentlb/HostTransferMapVO.java @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later. + * + * It is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see .
+ *
+ */
+
+package com.cloud.cluster.agentlb;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
+import javax.persistence.Table;
+
+import com.cloud.utils.db.GenericDao;
+
+/**
+ * Entity mapped to the op_host_transfer table: one record per host, keyed by the host id, that
+ * names the management server the host is moving from, the one it is moving to, and the state
+ * of that move.
+ */
+@Entity
+@Table(name = "op_host_transfer")
+@Inheritance(strategy = InheritanceType.TABLE_PER_CLASS)
+public class HostTransferMapVO {
+
+    /** States a host transfer record can be in. */
+    public enum HostTransferState {
+        TransferRequested,
+        TransferStarted,
+        TransferCompleted,
+        TransferFailed
+    }
+
+    // The primary key is the id of the host being transferred (see the public constructor).
+    @Id
+    @Column(name = "id")
+    private long id;
+
+    @Column(name = "initial_mgmt_server_id")
+    private long initialOwner;
+
+    @Column(name = "future_mgmt_server_id")
+    private long futureOwner;
+
+    @Column(name = "state")
+    private HostTransferState state;
+
+    // Never assigned in this class; presumably filled in by the persistence layer - confirm.
+    @Column(name=GenericDao.CREATED_COLUMN)
+    private Date created;
+
+    /** No-argument constructor; leaves every field at its default value. */
+    protected HostTransferMapVO() {
+    }
+
+    /**
+     * Creates a transfer record in state {@link HostTransferState#TransferRequested}.
+     *
+     * @param hostId       id of the host being moved; becomes the record id
+     * @param initialOwner id stored in the initial_mgmt_server_id column
+     * @param futureOwner  id stored in the future_mgmt_server_id column
+     */
+    public HostTransferMapVO(long hostId, long initialOwner, long futureOwner) {
+        this.id = hostId;
+        this.initialOwner = initialOwner;
+        this.futureOwner = futureOwner;
+        this.state = HostTransferState.TransferRequested;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public long getInitialOwner() {
+        return initialOwner;
+    }
+
+    public void setInitialOwner(long initialOwner) {
+        this.initialOwner = initialOwner;
+    }
+
+    public long getFutureOwner() {
+        return futureOwner;
+    }
+
+    public void setFutureOwner(long futureOwner) {
+        this.futureOwner = futureOwner;
+    }
+
+    public HostTransferState getState() {
+        return state;
+    }
+
+    public void setState(HostTransferState state) {
+        this.state = state;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+}
diff --git a/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDao.java b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDao.java new file mode 100644 index 00000000000..4bb0b172250 --- /dev/null +++
b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDao.java
@@ -0,0 +1,41 @@
+/**
+ *  Copyright (C) 2010 Cloud.com, Inc.  All rights reserved.
+ *
+ * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see .
+ *
+ */
+
+package com.cloud.cluster.agentlb.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import com.cloud.cluster.agentlb.HostTransferMapVO;
+import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
+import com.cloud.utils.db.GenericDao;
+
+/**
+ * Data access for {@link HostTransferMapVO} records, which are keyed by host id.
+ *
+ * NOTE(review): the per-method notes describe the intended contract as suggested by the method
+ * names and signatures; confirm against the implementation.
+ */
+public interface HostTransferMapDao extends GenericDao<HostTransferMapVO, Long> {
+
+    /**
+     * @param clusterId id matched against the current (initial) owner of the transfer
+     * @return transfers away from that owner - presumably only unfinished ones; confirm
+     */
+    List<HostTransferMapVO> listHostsLeavingCluster(long clusterId);
+
+    /**
+     * @param clusterId id matched against the future owner of the transfer
+     * @return transfers towards that owner - presumably only unfinished ones; confirm
+     */
+    List<HostTransferMapVO> listHostsJoiningCluster(long clusterId);
+
+    /**
+     * Records that {@code hostId} is to move from {@code currentOwner} to {@code futureOwner}.
+     *
+     * @return the stored transfer record
+     */
+    HostTransferMapVO startAgentTransfering(long hostId, long currentOwner, long futureOwner);
+
+    /**
+     * Marks the transfer of {@code hostId} as finished.
+     *
+     * @param success whether the transfer succeeded
+     * @return whether the record was updated
+     */
+    boolean completeAgentTransfering(long hostId, boolean success);
+
+    /**
+     * @return the transfers with the given future owner and state
+     */
+    List<HostTransferMapVO> listBy(long futureOwnerId, HostTransferState state);
+
+    /**
+     * @param hostId  host whose transfer record is checked
+     * @param cutTime time limit applied to the record creation time
+     * @return presumably {@code false} when a requested transfer for the host is older than
+     *         {@code cutTime} - confirm
+     */
+    boolean isActive(long hostId, Date cutTime);
+}
diff --git a/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java new file mode 100644 index 00000000000..8491215e8cc --- /dev/null +++ b/server/src/com/cloud/cluster/agentlb/dao/HostTransferMapDaoImpl.java @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2010 Cloud.com, Inc. All rights reserved. + * + * This software is licensed under the GNU General Public License v3 or later.
+ *
+ * It is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or any later version.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see .
+ *
+ */
+
+package com.cloud.cluster.agentlb.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import javax.ejb.Local;
+
+import org.apache.log4j.Logger;
+
+import com.cloud.cluster.agentlb.HostTransferMapVO;
+import com.cloud.cluster.agentlb.HostTransferMapVO.HostTransferState;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+
+/**
+ * {@link HostTransferMapDao} implementation built on three prepared searches over
+ * {@link HostTransferMapVO}.
+ */
+@Local(value = { HostTransferMapDao.class })
+@DB(txn = false)
+public class HostTransferMapDaoImpl extends GenericDaoBase<HostTransferMapVO, Long> implements HostTransferMapDao {
+    private static final Logger s_logger = Logger.getLogger(HostTransferMapDaoImpl.class);
+
+    protected final SearchBuilder<HostTransferMapVO> AllFieldsSearch;
+    protected final SearchBuilder<HostTransferMapVO> IntermediateStateSearch;
+    protected final SearchBuilder<HostTransferMapVO> InactiveSearch;
+
+    public HostTransferMapDaoImpl() {
+        AllFieldsSearch = createSearchBuilder();
+        AllFieldsSearch.and("id", AllFieldsSearch.entity().getId(), SearchCriteria.Op.EQ);
+        AllFieldsSearch.and("initialOwner", AllFieldsSearch.entity().getInitialOwner(), SearchCriteria.Op.EQ);
+        AllFieldsSearch.and("futureOwner", AllFieldsSearch.entity().getFutureOwner(), SearchCriteria.Op.EQ);
+        AllFieldsSearch.and("state", AllFieldsSearch.entity().getState(), SearchCriteria.Op.EQ);
+        AllFieldsSearch.done();
+
+        // Transfers that are still in flight, filtered by either owner. "initialOwner" has to be
+        // declared here because listHostsLeavingCluster() sets it, and the state filter is IN
+        // because the callers pass the in-flight states themselves.
+        // NOTE(review): assumes a condition whose parameter is never set is left out of the
+        // query (each caller sets only one of the two owner parameters) - confirm.
+        IntermediateStateSearch = createSearchBuilder();
+        IntermediateStateSearch.and("futureOwner", IntermediateStateSearch.entity().getFutureOwner(), SearchCriteria.Op.EQ);
+        IntermediateStateSearch.and("initialOwner", IntermediateStateSearch.entity().getInitialOwner(), SearchCriteria.Op.EQ);
+        IntermediateStateSearch.and("state", IntermediateStateSearch.entity().getState(), SearchCriteria.Op.IN);
+        IntermediateStateSearch.done();
+
+        InactiveSearch = createSearchBuilder();
+        InactiveSearch.and("created", InactiveSearch.entity().getCreated(), SearchCriteria.Op.LTEQ);
+        InactiveSearch.and("id", InactiveSearch.entity().getId(), SearchCriteria.Op.EQ);
+        InactiveSearch.and("state", InactiveSearch.entity().getState(), SearchCriteria.Op.EQ);
+        InactiveSearch.done();
+
+    }
+
+    /**
+     * Lists the transfers in state TransferRequested or TransferStarted whose initial owner is
+     * {@code clusterId}.
+     */
+    @Override
+    public List<HostTransferMapVO> listHostsLeavingCluster(long clusterId) {
+        SearchCriteria<HostTransferMapVO> sc = IntermediateStateSearch.create();
+        sc.setParameters("initialOwner", clusterId);
+        sc.setParameters("state", HostTransferState.TransferRequested, HostTransferState.TransferStarted);
+
+        return listBy(sc);
+    }
+
+    /**
+     * Lists the transfers in state TransferRequested or TransferStarted whose future owner is
+     * {@code clusterId}.
+     */
+    @Override
+    public List<HostTransferMapVO> listHostsJoiningCluster(long clusterId) {
+        SearchCriteria<HostTransferMapVO> sc = IntermediateStateSearch.create();
+        sc.setParameters("futureOwner", clusterId);
+        sc.setParameters("state", HostTransferState.TransferRequested, HostTransferState.TransferStarted);
+        return listBy(sc);
+    }
+
+
+
+    /**
+     * Persists a new transfer record for {@code hostId}; the record starts in state
+     * TransferRequested.
+     */
+    @Override
+    public HostTransferMapVO startAgentTransfering(long hostId, long initialOwner, long futureOwner) {
+        HostTransferMapVO transfer = new HostTransferMapVO(hostId, initialOwner, futureOwner);
+        return persist(transfer);
+    }
+
+    /**
+     * Moves the transfer record of {@code hostId} to TransferCompleted or TransferFailed.
+     *
+     * @return {@code false} when no transfer record exists for the host, otherwise the result of
+     *         the update
+     */
+    @Override
+    public boolean completeAgentTransfering(long hostId, boolean success) {
+        HostTransferMapVO transfer = findById(hostId);
+        if (transfer == null) {
+            s_logger.debug("Unable to find transfer record for host id=" + hostId);
+            return false;
+        }
+
+        transfer.setState(success ? HostTransferState.TransferCompleted : HostTransferState.TransferFailed);
+        return update(hostId, transfer);
+    }
+
+    /**
+     * Lists the transfers with the given future owner and state.
+     */
+    @Override
+    public List<HostTransferMapVO> listBy(long futureOwnerId, HostTransferState state) {
+        SearchCriteria<HostTransferMapVO> sc = AllFieldsSearch.create();
+        sc.setParameters("futureOwner", futureOwnerId);
+        sc.setParameters("state", state);
+
+        return listBy(sc);
+    }
+
+    /**
+     * Returns {@code true} unless the host has a transfer record in state TransferRequested that
+     * was created at or before {@code cutTime}.
+     */
+    @Override
+    public boolean isActive(long hostId, Date cutTime) {
+        SearchCriteria<HostTransferMapVO> sc = InactiveSearch.create();
+        sc.setParameters("id", hostId);
+        sc.setParameters("state", HostTransferState.TransferRequested);
+        sc.setParameters("created", cutTime);
+
+        return listBy(sc).isEmpty();
+    }
+
+}
diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java index 96458594b66..8a92844e90a 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDao.java @@ -22,10 +22,12 @@ import java.sql.Connection; import java.util.Date; import java.util.List; +import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.ManagementServerHostVO; import com.cloud.utils.db.GenericDao; public interface ManagementServerHostDao extends GenericDao { + @Override boolean remove(Long id); ManagementServerHostVO findByMsid(long msid); @@ -41,4 +43,6 @@ public interface ManagementServerHostDao extends GenericDao getActiveList(Connection conn, Date cutTime); List getInactiveList(Connection conn, Date cutTime); + + void update(Connection conn, long id, long runId, State state, Date lastUpdate); } diff --git a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java index 52523d9b2a3..89494bcbbab 100644 --- a/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java +++ b/server/src/com/cloud/cluster/dao/ManagementServerHostDaoImpl.java @@ -30,8 +30,9 @@ import javax.ejb.Local; import org.apache.log4j.Logger; import com.cloud.cluster.ClusterInvalidSessionException; +import com.cloud.cluster.ManagementServerHost; +import com.cloud.cluster.ManagementServerHost.State; import com.cloud.cluster.ManagementServerHostVO; -import com.cloud.cluster.ManagementServerNode; import
com.cloud.utils.DateUtil; import com.cloud.utils.db.DB; import com.cloud.utils.db.GenericDaoBase; @@ -48,7 +49,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase ActiveSearch; private final SearchBuilder InactiveSearch; - public void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) { + @Override + public void update(Connection conn, long id, long runid, String name, String version, String serviceIP, int servicePort, Date lastUpdate) { PreparedStatement pstmt = null; try { pstmt = conn.prepareStatement("update mshost set name=?, version=?, service_ip=?, service_port=?, last_update=?, removed=null, alert_count=0, runid=? where id=?"); @@ -75,7 +77,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase getActiveList(Connection conn, Date cutTime) { + @Override + public List getActiveList(Connection conn, Date cutTime) { Transaction txn = Transaction.openNew("getActiveList", conn); try { SearchCriteria sc = ActiveSearch.create(); @@ -135,7 +140,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase getInactiveList(Connection conn, Date cutTime) { + @Override + public List getInactiveList(Connection conn, Date cutTime) { Transaction txn = Transaction.openNew("getInactiveList", conn); try { SearchCriteria sc = InactiveSearch.create(); @@ -147,7 +153,8 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase sc = MsIdSearch.create(); sc.setParameters("msid", msid); @@ -158,21 +165,23 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase getActiveList(Date cutTime) { + @Override + public List getActiveList(Date cutTime) { SearchCriteria sc = ActiveSearch.create(); sc.setParameters("lastUpdateTime", cutTime); return listIncludingRemovedBy(sc); } - public List getInactiveList(Date cutTime) { + @Override + public List getInactiveList(Date cutTime) { SearchCriteria sc = InactiveSearch.create(); sc.setParameters("lastUpdateTime", 
cutTime); return listIncludingRemovedBy(sc); } - @DB + @Override + @DB public void increaseAlertCount(long id) { Transaction txn = Transaction.currentTxn(); PreparedStatement pstmt = null; @@ -272,5 +286,34 @@ public class ManagementServerHostDaoImpl extends GenericDaoBase _componentClass; diff --git a/server/src/com/cloud/configuration/DefaultComponentLibrary.java b/server/src/com/cloud/configuration/DefaultComponentLibrary.java index 63146860079..e85275e5ca3 100644 --- a/server/src/com/cloud/configuration/DefaultComponentLibrary.java +++ b/server/src/com/cloud/configuration/DefaultComponentLibrary.java @@ -41,6 +41,7 @@ import com.cloud.cluster.ClusterFenceManagerImpl; import com.cloud.cluster.ClusterManagerImpl; import com.cloud.cluster.DummyClusterManagerImpl; import com.cloud.cluster.ManagementServerNode; +import com.cloud.cluster.agentlb.dao.HostTransferMapDaoImpl; import com.cloud.cluster.dao.ManagementServerHostDaoImpl; import com.cloud.cluster.dao.StackMaidDaoImpl; import com.cloud.configuration.dao.ConfigurationDaoImpl; @@ -276,6 +277,7 @@ public class DefaultComponentLibrary extends ComponentLibraryBase implements Com addDao("KeystoreDao", KeystoreDaoImpl.class); addDao("DcDetailsDao", DcDetailsDaoImpl.class); addDao("SwiftDao", SwiftDaoImpl.class); + addDao("AgentTransferMapDao", HostTransferMapDaoImpl.class); } @Override diff --git a/server/src/com/cloud/host/dao/HostDao.java b/server/src/com/cloud/host/dao/HostDao.java index e91b2326224..de035f20743 100644 --- a/server/src/com/cloud/host/dao/HostDao.java +++ b/server/src/com/cloud/host/dao/HostDao.java @@ -68,8 +68,7 @@ public interface HostDao extends GenericDao { */ List findDirectlyConnectedHosts(); - List findDirectAgentToLoad(long msid, long lastPingSecondsAfter, Long limit); - + List findDirectAgentToLoad(long lastPingSecondsAfter, Long limit); /** * Mark the host as disconnected if it is in one of these states. * The management server id is set to null. 
@@ -167,6 +166,12 @@ public interface HostDao extends GenericDao { List listSecondaryStorageHosts(long dataCenterId); boolean directConnect(HostVO host, long msId, boolean secondConnect); + + List listDirectHostsBy(long msId, Status status); + + List listManagedDirectAgents(); + + List listManagedAgents(); HostVO findTrafficMonitorHost(); @@ -175,4 +180,6 @@ public interface HostDao extends GenericDao { List listLocalSecondaryStorageHosts(long dataCenterId); List listAllSecondaryStorageHosts(long dataCenterId); + + List listByManagementServer(long msId); } diff --git a/server/src/com/cloud/host/dao/HostDaoImpl.java b/server/src/com/cloud/host/dao/HostDaoImpl.java index ef7e598c70a..ed74c108703 100644 --- a/server/src/com/cloud/host/dao/HostDaoImpl.java +++ b/server/src/com/cloud/host/dao/HostDaoImpl.java @@ -14,9 +14,9 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . * - */ -package com.cloud.host.dao; - + */ +package com.cloud.host.dao; + import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -25,7 +25,6 @@ import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.TimeZone; import javax.ejb.Local; @@ -57,163 +56,168 @@ import com.cloud.utils.db.Transaction; import com.cloud.utils.db.UpdateBuilder; import com.cloud.utils.exception.CloudRuntimeException; -@Local(value = { HostDao.class }) @DB(txn=false) -@TableGenerator(name="host_req_sq", table="op_host", pkColumnName="id", valueColumnName="sequence", allocationSize=1) -public class HostDaoImpl extends GenericDaoBase implements HostDao { +@Local(value = { HostDao.class }) +@DB(txn = false) +@TableGenerator(name = "host_req_sq", table = "op_host", pkColumnName = "id", valueColumnName = "sequence", allocationSize = 1) +public class HostDaoImpl extends GenericDaoBase implements HostDao { private static final Logger s_logger = 
Logger.getLogger(HostDaoImpl.class); - - protected final SearchBuilder TypePodDcStatusSearch; - - protected final SearchBuilder IdStatusSearch; - protected final SearchBuilder TypeDcSearch; - protected final SearchBuilder TypeDcStatusSearch; - protected final SearchBuilder LastPingedSearch; - protected final SearchBuilder LastPingedSearch2; - protected final SearchBuilder MsStatusSearch; - protected final SearchBuilder DcPrivateIpAddressSearch; - protected final SearchBuilder DcStorageIpAddressSearch; - - protected final SearchBuilder GuidSearch; - protected final SearchBuilder DcSearch; - protected final SearchBuilder PodSearch; - protected final SearchBuilder TypeSearch; - protected final SearchBuilder StatusSearch; + + protected final SearchBuilder TypePodDcStatusSearch; + + protected final SearchBuilder IdStatusSearch; + protected final SearchBuilder TypeDcSearch; + protected final SearchBuilder TypeDcStatusSearch; + protected final SearchBuilder LastPingedSearch; + protected final SearchBuilder LastPingedSearch2; + protected final SearchBuilder MsStatusSearch; + protected final SearchBuilder DcPrivateIpAddressSearch; + protected final SearchBuilder DcStorageIpAddressSearch; + + protected final SearchBuilder GuidSearch; + protected final SearchBuilder DcSearch; + protected final SearchBuilder PodSearch; + protected final SearchBuilder TypeSearch; + protected final SearchBuilder StatusSearch; protected final SearchBuilder NameLikeSearch; protected final SearchBuilder NameSearch; - protected final SearchBuilder SequenceSearch; + protected final SearchBuilder SequenceSearch; protected final SearchBuilder DirectlyConnectedSearch; protected final SearchBuilder UnmanagedDirectConnectSearch; protected final SearchBuilder UnmanagedExternalNetworkApplianceSearch; - protected final SearchBuilder MaintenanceCountSearch; + protected final SearchBuilder MaintenanceCountSearch; protected final SearchBuilder ClusterSearch; protected final SearchBuilder ConsoleProxyHostSearch; 
protected final SearchBuilder AvailHypevisorInZone; protected final SearchBuilder DirectConnectSearch; + protected final SearchBuilder ManagedDirectConnectSearch; + protected final SearchBuilder ManagedConnectSearch; protected final GenericSearchBuilder HostsInStatusSearch; protected final GenericSearchBuilder CountRoutingByDc; - - protected final Attribute _statusAttr; - protected final Attribute _msIdAttr; - protected final Attribute _pingTimeAttr; - - protected final HostDetailsDaoImpl _detailsDao = ComponentLocator.inject(HostDetailsDaoImpl.class); - protected final HostTagsDaoImpl _hostTagsDao = ComponentLocator.inject(HostTagsDaoImpl.class); - public HostDaoImpl() { + protected final Attribute _statusAttr; + protected final Attribute _msIdAttr; + protected final Attribute _pingTimeAttr; + + protected final HostDetailsDaoImpl _detailsDao = ComponentLocator.inject(HostDetailsDaoImpl.class); + protected final HostTagsDaoImpl _hostTagsDao = ComponentLocator.inject(HostTagsDaoImpl.class); + + public HostDaoImpl() { MaintenanceCountSearch = createSearchBuilder(); MaintenanceCountSearch.and("cluster", MaintenanceCountSearch.entity().getClusterId(), SearchCriteria.Op.EQ); MaintenanceCountSearch.and("status", MaintenanceCountSearch.entity().getStatus(), SearchCriteria.Op.IN); MaintenanceCountSearch.done(); - - TypePodDcStatusSearch = createSearchBuilder(); - HostVO entity = TypePodDcStatusSearch.entity(); - TypePodDcStatusSearch.and("type", entity.getType(), SearchCriteria.Op.EQ); - TypePodDcStatusSearch.and("pod", entity.getPodId(), SearchCriteria.Op.EQ); + + TypePodDcStatusSearch = createSearchBuilder(); + HostVO entity = TypePodDcStatusSearch.entity(); + TypePodDcStatusSearch.and("type", entity.getType(), SearchCriteria.Op.EQ); + TypePodDcStatusSearch.and("pod", entity.getPodId(), SearchCriteria.Op.EQ); TypePodDcStatusSearch.and("dc", entity.getDataCenterId(), SearchCriteria.Op.EQ); - TypePodDcStatusSearch.and("cluster", entity.getClusterId(), 
SearchCriteria.Op.EQ); - TypePodDcStatusSearch.and("status", entity.getStatus(), SearchCriteria.Op.EQ); - TypePodDcStatusSearch.done(); - - LastPingedSearch = createSearchBuilder(); - LastPingedSearch.and("ping", LastPingedSearch.entity().getLastPinged(), SearchCriteria.Op.LT); - LastPingedSearch.and("state", LastPingedSearch.entity().getStatus(), SearchCriteria.Op.IN); - LastPingedSearch.done(); - - LastPingedSearch2 = createSearchBuilder(); - LastPingedSearch2.and("ping", LastPingedSearch2.entity().getLastPinged(), SearchCriteria.Op.LT); - LastPingedSearch2.and("type", LastPingedSearch2.entity().getType(), SearchCriteria.Op.EQ); - LastPingedSearch2.done(); - - MsStatusSearch = createSearchBuilder(); - MsStatusSearch.and("ms", MsStatusSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); - MsStatusSearch.and("statuses", MsStatusSearch.entity().getStatus(), SearchCriteria.Op.IN); - MsStatusSearch.done(); - - TypeDcSearch = createSearchBuilder(); - TypeDcSearch.and("type", TypeDcSearch.entity().getType(), SearchCriteria.Op.EQ); - TypeDcSearch.and("dc", TypeDcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); - TypeDcSearch.done(); - - TypeDcStatusSearch = createSearchBuilder(); - TypeDcStatusSearch.and("type", TypeDcStatusSearch.entity().getType(), SearchCriteria.Op.EQ); - TypeDcStatusSearch.and("dc", TypeDcStatusSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); - TypeDcStatusSearch.and("status", TypeDcStatusSearch.entity().getStatus(), SearchCriteria.Op.EQ); - TypeDcStatusSearch.done(); - - IdStatusSearch = createSearchBuilder(); - IdStatusSearch.and("id", IdStatusSearch.entity().getId(), SearchCriteria.Op.EQ); - IdStatusSearch.and("states", IdStatusSearch.entity().getStatus(), SearchCriteria.Op.IN); - IdStatusSearch.done(); - - DcPrivateIpAddressSearch = createSearchBuilder(); - DcPrivateIpAddressSearch.and("privateIpAddress", DcPrivateIpAddressSearch.entity().getPrivateIpAddress(), SearchCriteria.Op.EQ); - 
DcPrivateIpAddressSearch.and("dc", DcPrivateIpAddressSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); - DcPrivateIpAddressSearch.done(); - - DcStorageIpAddressSearch = createSearchBuilder(); - DcStorageIpAddressSearch.and("storageIpAddress", DcStorageIpAddressSearch.entity().getStorageIpAddress(), SearchCriteria.Op.EQ); - DcStorageIpAddressSearch.and("dc", DcStorageIpAddressSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); - DcStorageIpAddressSearch.done(); - - GuidSearch = createSearchBuilder(); - GuidSearch.and("guid", GuidSearch.entity().getGuid(), SearchCriteria.Op.EQ); - GuidSearch.done(); - - DcSearch = createSearchBuilder(); - DcSearch.and("dc", DcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); + TypePodDcStatusSearch.and("cluster", entity.getClusterId(), SearchCriteria.Op.EQ); + TypePodDcStatusSearch.and("status", entity.getStatus(), SearchCriteria.Op.EQ); + TypePodDcStatusSearch.done(); + + LastPingedSearch = createSearchBuilder(); + LastPingedSearch.and("ping", LastPingedSearch.entity().getLastPinged(), SearchCriteria.Op.LT); + LastPingedSearch.and("state", LastPingedSearch.entity().getStatus(), SearchCriteria.Op.IN); + LastPingedSearch.done(); + + LastPingedSearch2 = createSearchBuilder(); + LastPingedSearch2.and("ping", LastPingedSearch2.entity().getLastPinged(), SearchCriteria.Op.LT); + LastPingedSearch2.and("type", LastPingedSearch2.entity().getType(), SearchCriteria.Op.EQ); + LastPingedSearch2.done(); + + MsStatusSearch = createSearchBuilder(); + MsStatusSearch.and("ms", MsStatusSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); + MsStatusSearch.and("statuses", MsStatusSearch.entity().getStatus(), SearchCriteria.Op.IN); + MsStatusSearch.done(); + + TypeDcSearch = createSearchBuilder(); + TypeDcSearch.and("type", TypeDcSearch.entity().getType(), SearchCriteria.Op.EQ); + TypeDcSearch.and("dc", TypeDcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); + TypeDcSearch.done(); + + TypeDcStatusSearch = 
createSearchBuilder(); + TypeDcStatusSearch.and("type", TypeDcStatusSearch.entity().getType(), SearchCriteria.Op.EQ); + TypeDcStatusSearch.and("dc", TypeDcStatusSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); + TypeDcStatusSearch.and("status", TypeDcStatusSearch.entity().getStatus(), SearchCriteria.Op.EQ); + TypeDcStatusSearch.done(); + + IdStatusSearch = createSearchBuilder(); + IdStatusSearch.and("id", IdStatusSearch.entity().getId(), SearchCriteria.Op.EQ); + IdStatusSearch.and("states", IdStatusSearch.entity().getStatus(), SearchCriteria.Op.IN); + IdStatusSearch.done(); + + DcPrivateIpAddressSearch = createSearchBuilder(); + DcPrivateIpAddressSearch.and("privateIpAddress", DcPrivateIpAddressSearch.entity().getPrivateIpAddress(), SearchCriteria.Op.EQ); + DcPrivateIpAddressSearch.and("dc", DcPrivateIpAddressSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); + DcPrivateIpAddressSearch.done(); + + DcStorageIpAddressSearch = createSearchBuilder(); + DcStorageIpAddressSearch.and("storageIpAddress", DcStorageIpAddressSearch.entity().getStorageIpAddress(), SearchCriteria.Op.EQ); + DcStorageIpAddressSearch.and("dc", DcStorageIpAddressSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); + DcStorageIpAddressSearch.done(); + + GuidSearch = createSearchBuilder(); + GuidSearch.and("guid", GuidSearch.entity().getGuid(), SearchCriteria.Op.EQ); + GuidSearch.done(); + + DcSearch = createSearchBuilder(); + DcSearch.and("dc", DcSearch.entity().getDataCenterId(), SearchCriteria.Op.EQ); DcSearch.done(); ClusterSearch = createSearchBuilder(); ClusterSearch.and("cluster", ClusterSearch.entity().getClusterId(), SearchCriteria.Op.EQ); - ClusterSearch.done(); + ClusterSearch.done(); ConsoleProxyHostSearch = createSearchBuilder(); ConsoleProxyHostSearch.and("name", ConsoleProxyHostSearch.entity().getName(), SearchCriteria.Op.EQ); ConsoleProxyHostSearch.and("type", ConsoleProxyHostSearch.entity().getType(), SearchCriteria.Op.EQ); ConsoleProxyHostSearch.done(); - - 
PodSearch = createSearchBuilder(); - PodSearch.and("pod", PodSearch.entity().getPodId(), SearchCriteria.Op.EQ); - PodSearch.done(); - - TypeSearch = createSearchBuilder(); - TypeSearch.and("type", TypeSearch.entity().getType(), SearchCriteria.Op.EQ); - TypeSearch.done(); - - StatusSearch =createSearchBuilder(); - StatusSearch.and("status", StatusSearch.entity().getStatus(), SearchCriteria.Op.IN); - StatusSearch.done(); - - NameLikeSearch = createSearchBuilder(); - NameLikeSearch.and("name", NameLikeSearch.entity().getName(), SearchCriteria.Op.LIKE); + + PodSearch = createSearchBuilder(); + PodSearch.and("pod", PodSearch.entity().getPodId(), SearchCriteria.Op.EQ); + PodSearch.done(); + + TypeSearch = createSearchBuilder(); + TypeSearch.and("type", TypeSearch.entity().getType(), SearchCriteria.Op.EQ); + TypeSearch.done(); + + StatusSearch = createSearchBuilder(); + StatusSearch.and("status", StatusSearch.entity().getStatus(), SearchCriteria.Op.IN); + StatusSearch.done(); + + NameLikeSearch = createSearchBuilder(); + NameLikeSearch.and("name", NameLikeSearch.entity().getName(), SearchCriteria.Op.LIKE); NameLikeSearch.done(); NameSearch = createSearchBuilder(); NameSearch.and("name", NameSearch.entity().getName(), SearchCriteria.Op.EQ); NameSearch.done(); - - SequenceSearch = createSearchBuilder(); - SequenceSearch.and("id", SequenceSearch.entity().getId(), SearchCriteria.Op.EQ); -// SequenceSearch.addRetrieve("sequence", SequenceSearch.entity().getSequence()); - SequenceSearch.done(); - - DirectlyConnectedSearch = createSearchBuilder(); - DirectlyConnectedSearch.and("resource", DirectlyConnectedSearch.entity().getResource(), SearchCriteria.Op.NNULL); + + SequenceSearch = createSearchBuilder(); + SequenceSearch.and("id", SequenceSearch.entity().getId(), SearchCriteria.Op.EQ); + // SequenceSearch.addRetrieve("sequence", SequenceSearch.entity().getSequence()); + SequenceSearch.done(); + + DirectlyConnectedSearch = createSearchBuilder(); + 
DirectlyConnectedSearch.and("resource", DirectlyConnectedSearch.entity().getResource(), SearchCriteria.Op.NNULL); + DirectlyConnectedSearch.and("ms", DirectlyConnectedSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); + DirectlyConnectedSearch.and("statuses", DirectlyConnectedSearch.entity().getStatus(), SearchCriteria.Op.EQ); DirectlyConnectedSearch.done(); - UnmanagedDirectConnectSearch = createSearchBuilder(); + UnmanagedDirectConnectSearch = createSearchBuilder(); UnmanagedDirectConnectSearch.and("resource", UnmanagedDirectConnectSearch.entity().getResource(), SearchCriteria.Op.NNULL); UnmanagedDirectConnectSearch.and("server", UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NULL); UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(), SearchCriteria.Op.LTEQ); /* - UnmanagedDirectConnectSearch.op(SearchCriteria.Op.OR, "managementServerId", UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); - UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(), SearchCriteria.Op.LTEQ); - UnmanagedDirectConnectSearch.cp(); - UnmanagedDirectConnectSearch.cp(); - */ + * UnmanagedDirectConnectSearch.op(SearchCriteria.Op.OR, "managementServerId", + * UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ); + * UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(), + * SearchCriteria.Op.LTEQ); UnmanagedDirectConnectSearch.cp(); UnmanagedDirectConnectSearch.cp(); + */ UnmanagedDirectConnectSearch.done(); DirectConnectSearch = createSearchBuilder(); @@ -252,27 +256,37 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao CountRoutingByDc.and("dc", CountRoutingByDc.entity().getDataCenterId(), SearchCriteria.Op.EQ); CountRoutingByDc.and("type", CountRoutingByDc.entity().getType(), SearchCriteria.Op.EQ); 
CountRoutingByDc.and("status", CountRoutingByDc.entity().getStatus(), SearchCriteria.Op.EQ); + CountRoutingByDc.done(); - - _statusAttr = _allAttributes.get("status"); - _msIdAttr = _allAttributes.get("managementServerId"); - _pingTimeAttr = _allAttributes.get("lastPinged"); - - assert (_statusAttr != null && _msIdAttr != null && _pingTimeAttr != null) : "Couldn't find one of these attributes"; + + ManagedDirectConnectSearch = createSearchBuilder(); + ManagedDirectConnectSearch.and("resource", ManagedDirectConnectSearch.entity().getResource(), SearchCriteria.Op.NNULL); + ManagedDirectConnectSearch.and("server", ManagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NULL); + ManagedDirectConnectSearch.done(); + + ManagedConnectSearch = createSearchBuilder(); + ManagedConnectSearch.and("server", ManagedConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NNULL); + ManagedConnectSearch.done(); + + _statusAttr = _allAttributes.get("status"); + _msIdAttr = _allAttributes.get("managementServerId"); + _pingTimeAttr = _allAttributes.get("lastPinged"); + + assert (_statusAttr != null && _msIdAttr != null && _pingTimeAttr != null) : "Couldn't find one of these attributes"; } @Override public long countBy(long clusterId, Status... 
statuses) { SearchCriteria sc = MaintenanceCountSearch.create(); - sc.setParameters("status", (Object[])statuses); + sc.setParameters("status", (Object[]) statuses); sc.setParameters("cluster", clusterId); List hosts = listBy(sc); return hosts.size(); - } - - @Override + } + + @Override public HostVO findSecondaryStorageHost(long dcId) { SearchCriteria sc = TypeDcSearch.create(); sc.setParameters("type", Host.Type.SecondaryStorage); @@ -284,34 +298,34 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao Collections.shuffle(storageHosts); return storageHosts.get(0); } - } - - @Override - public List listSecondaryStorageHosts() { - SearchCriteria sc = createSearchCriteria(); - sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.SecondaryStorage); - return search(sc, null); } - + + @Override + public List listSecondaryStorageHosts() { + SearchCriteria sc = createSearchCriteria(); + sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.SecondaryStorage); + return search(sc, null); + } + @Override public List listSecondaryStorageHosts(long dataCenterId) { - SearchCriteria sc = createSearchCriteria(); + SearchCriteria sc = createSearchCriteria(); sc.addAnd("dataCenterId", SearchCriteria.Op.EQ, dataCenterId); sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.SecondaryStorage); return search(sc, null); } - + @Override public List listLocalSecondaryStorageHosts() { - SearchCriteria sc = createSearchCriteria(); + SearchCriteria sc = createSearchCriteria(); sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.LocalSecondaryStorage); return search(sc, null); } - + @Override public List listLocalSecondaryStorageHosts(long dataCenterId) { - SearchCriteria sc = createSearchCriteria(); + SearchCriteria sc = createSearchCriteria(); sc.addAnd("dataCenterId", SearchCriteria.Op.EQ, dataCenterId); sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.LocalSecondaryStorage); return search(sc, null); @@ -333,7 +347,7 @@ public class HostDaoImpl extends GenericDaoBase implements 
HostDao } @Override - public List findDirectAgentToLoad(long msid, long lastPingSecondsAfter, Long limit) { + public List findDirectAgentToLoad(long lastPingSecondsAfter, Long limit) { SearchCriteria sc = UnmanagedDirectConnectSearch.create(); sc.setParameters("lastPinged", lastPingSecondsAfter); return search(sc, new Filter(HostVO.class, "clusterId", true, 0L, limit)); @@ -344,33 +358,33 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao SearchCriteria sc = MsStatusSearch.create(); sc.setParameters("ms", msId); - HostVO host = createForUpdate(); + HostVO host = createForUpdate(); host.setManagementServerId(null); - host.setLastPinged((System.currentTimeMillis() >> 10) - ( 10 * 60 )); - host.setDisconnectedOn(new Date()); - - UpdateBuilder ub = getUpdateBuilder(host); - ub.set(host, "status", Status.Disconnected); - - update(ub, sc, null); - } - - @Override - public List listBy(Host.Type type, Long clusterId, Long podId, long dcId) { + host.setLastPinged((System.currentTimeMillis() >> 10) - (10 * 60)); + host.setDisconnectedOn(new Date()); + + UpdateBuilder ub = getUpdateBuilder(host); + ub.set(host, "status", Status.Disconnected); + + update(ub, sc, null); + } + + @Override + public List listBy(Host.Type type, Long clusterId, Long podId, long dcId) { SearchCriteria sc = TypePodDcStatusSearch.create(); - if ( type != null ) { + if (type != null) { sc.setParameters("type", type.toString()); } if (clusterId != null) { sc.setParameters("cluster", clusterId); } - if (podId != null ) { + if (podId != null) { sc.setParameters("pod", podId); } sc.setParameters("dc", dcId); - sc.setParameters("status", Status.Up.toString()); - - return listBy(sc); + sc.setParameters("status", Status.Up.toString()); + + return listBy(sc); } @Override @@ -391,7 +405,7 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao SearchBuilder hostTagSearch = _hostTagsDao.createSearchBuilder(); HostTagVO tagEntity = hostTagSearch.entity(); - 
hostTagSearch.and("tag",tagEntity.getTag(), SearchCriteria.Op.EQ); + hostTagSearch.and("tag", tagEntity.getTag(), SearchCriteria.Op.EQ); SearchBuilder hostSearch = createSearchBuilder(); HostVO entity = hostSearch.entity(); @@ -402,7 +416,6 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao hostSearch.and("status", entity.getStatus(), SearchCriteria.Op.EQ); hostSearch.join("hostTagSearch", hostTagSearch, entity.getId(), tagEntity.getHostId(), JoinBuilder.JoinType.INNER); - SearchCriteria sc = hostSearch.create(); sc.setJoinParameters("hostTagSearch", "tag", hostTag); sc.setParameters("type", type.toString()); @@ -426,15 +439,15 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao return listBy(sc); } - - @Override - public List listBy(Host.Type type, long dcId) { - SearchCriteria sc = TypeDcStatusSearch.create(); - sc.setParameters("type", type.toString()); - sc.setParameters("dc", dcId); - sc.setParameters("status", Status.Up.toString()); - - return listBy(sc); + + @Override + public List listBy(Host.Type type, long dcId) { + SearchCriteria sc = TypeDcStatusSearch.create(); + sc.setParameters("type", type.toString()); + sc.setParameters("dc", dcId); + sc.setParameters("status", Status.Up.toString()); + + return listBy(sc); } @Override @@ -444,36 +457,36 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao sc.setParameters("dc", dcId); return listBy(sc); - } - - @Override - public HostVO findByPrivateIpAddressInDataCenter(long dcId, String privateIpAddress) { - SearchCriteria sc = DcPrivateIpAddressSearch.create(); - sc.setParameters("dc", dcId); - sc.setParameters("privateIpAddress", privateIpAddress); - - return findOneBy(sc); - } - - @Override - public HostVO findByStorageIpAddressInDataCenter(long dcId, String privateIpAddress) { - SearchCriteria sc = DcStorageIpAddressSearch.create(); - sc.setParameters("dc", dcId); - sc.setParameters("storageIpAddress", privateIpAddress); - - return findOneBy(sc); - } - - 
@Override - public void loadDetails(HostVO host) { - Map details =_detailsDao.findDetails(host.getId()); - host.setDetails(details); - } + } @Override - public void loadHostTags(HostVO host){ - List hostTags = _hostTagsDao.gethostTags(host.getId()); - host.setHostTags(hostTags); + public HostVO findByPrivateIpAddressInDataCenter(long dcId, String privateIpAddress) { + SearchCriteria sc = DcPrivateIpAddressSearch.create(); + sc.setParameters("dc", dcId); + sc.setParameters("privateIpAddress", privateIpAddress); + + return findOneBy(sc); + } + + @Override + public HostVO findByStorageIpAddressInDataCenter(long dcId, String privateIpAddress) { + SearchCriteria sc = DcStorageIpAddressSearch.create(); + sc.setParameters("dc", dcId); + sc.setParameters("storageIpAddress", privateIpAddress); + + return findOneBy(sc); + } + + @Override + public void loadDetails(HostVO host) { + Map details = _detailsDao.findDetails(host.getId()); + host.setDetails(details); + } + + @Override + public void loadHostTags(HostVO host) { + List hostTags = _hostTagsDao.gethostTags(host.getId()); + host.setHostTags(hostTags); } @Override @@ -490,20 +503,21 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao ub.set(host, _statusAttr, Status.Connecting); return update(host, sc) > 0; - } + } - @Override - public boolean updateStatus(HostVO host, Event event, long msId) { - Status oldStatus = host.getStatus(); - long oldPingTime = host.getLastPinged(); - Status newStatus = oldStatus.getNextStatus(event); - if ( host == null ) { + @Override + public boolean updateStatus(HostVO host, Event event, long msId) { + if (host == null) { + return false; + } + + Status oldStatus = host.getStatus(); + long oldPingTime = host.getLastPinged(); + Status newStatus = oldStatus.getNextStatus(event); + + if (newStatus == null) { return false; } - - if (newStatus == null) { - return false; - } SearchBuilder sb = createSearchBuilder(); sb.and("status", sb.entity().getStatus(), SearchCriteria.Op.EQ); @@ 
-515,162 +529,164 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao sb.closeParen(); } sb.done(); - - SearchCriteria sc = sb.create(); - - sc.setParameters("status", oldStatus); - sc.setParameters("id", host.getId()); - if (newStatus.checkManagementServer()) { - sc.setParameters("ping", oldPingTime); - sc.setParameters("msid", msId); - } - - UpdateBuilder ub = getUpdateBuilder(host); - ub.set(host, _statusAttr, newStatus); + + SearchCriteria sc = sb.create(); + + sc.setParameters("status", oldStatus); + sc.setParameters("id", host.getId()); + if (newStatus.checkManagementServer()) { + sc.setParameters("ping", oldPingTime); + sc.setParameters("msid", msId); + } + + UpdateBuilder ub = getUpdateBuilder(host); + ub.set(host, _statusAttr, newStatus); if (newStatus.updateManagementServer()) { if (newStatus.lostConnection()) { ub.set(host, _msIdAttr, null); - } else { + } else { ub.set(host, _msIdAttr, msId); } - if( event.equals(Event.Ping) || event.equals(Event.AgentConnected)) { - ub.set(host, _pingTimeAttr, System.currentTimeMillis() >> 10); - } + if (event.equals(Event.Ping) || event.equals(Event.AgentConnected)) { + ub.set(host, _pingTimeAttr, System.currentTimeMillis() >> 10); + } } - if ( event.equals(Event.ManagementServerDown)) { - ub.set(host, _pingTimeAttr, (( System.currentTimeMillis() >> 10) - ( 10 * 60 ))); - } - int result = update(ub, sc, null); - assert result <= 1 : "How can this update " + result + " rows? "; - - if (s_logger.isDebugEnabled() && result == 0) { - HostVO vo = findById(host.getId()); - assert vo != null : "How how how? : " + host.getId(); - - StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString()); - str.append(". New=[status=").append(newStatus.toString()).append(":msid=").append(newStatus.lostConnection() ? 
"null" : msId).append(":lastpinged=").append(host.getLastPinged()).append("]"); - str.append("; Old=[status=").append(oldStatus.toString()).append(":msid=").append(msId).append(":lastpinged=").append(oldPingTime).append("]"); - str.append("; DB=[status=").append(vo.getStatus().toString()).append(":msid=").append(vo.getManagementServerId()).append(":lastpinged=").append(vo.getLastPinged()).append("]"); - s_logger.debug(str.toString()); - } - return result > 0; - } - - @Override - public boolean disconnect(HostVO host, Event event, long msId) { - host.setDisconnectedOn(new Date()); - if(event!=null && event.equals(Event.Remove)) { + if (event.equals(Event.ManagementServerDown)) { + ub.set(host, _pingTimeAttr, ((System.currentTimeMillis() >> 10) - (10 * 60))); + } + int result = update(ub, sc, null); + assert result <= 1 : "How can this update " + result + " rows? "; + + if (s_logger.isDebugEnabled() && result == 0) { + HostVO vo = findById(host.getId()); + assert vo != null : "How how how? : " + host.getId(); + + StringBuilder str = new StringBuilder("Unable to update host for event:").append(event.toString()); + str.append(". New=[status=").append(newStatus.toString()).append(":msid=").append(newStatus.lostConnection() ? 
"null" : msId).append(":lastpinged=").append(host.getLastPinged()) + .append("]"); + str.append("; Old=[status=").append(oldStatus.toString()).append(":msid=").append(msId).append(":lastpinged=").append(oldPingTime).append("]"); + str.append("; DB=[status=").append(vo.getStatus().toString()).append(":msid=").append(vo.getManagementServerId()).append(":lastpinged=").append(vo.getLastPinged()).append("]"); + s_logger.debug(str.toString()); + } + return result > 0; + } + + @Override + public boolean disconnect(HostVO host, Event event, long msId) { + host.setDisconnectedOn(new Date()); + if (event != null && event.equals(Event.Remove)) { host.setGuid(null); host.setClusterId(null); - } - return updateStatus(host, event, msId); - } - - @Override @DB - public boolean connect(HostVO host, long msId) { - Transaction txn = Transaction.currentTxn(); - long id = host.getId(); - txn.start(); - - if (!updateStatus(host, Event.AgentConnected, msId)) { - return false; - } - - txn.commit(); - return true; - } - - @Override - public HostVO findByGuid(String guid) { - SearchCriteria sc = GuidSearch.create("guid", guid); - return findOneBy(sc); + } + return updateStatus(host, event, msId); + } + + @Override + @DB + public boolean connect(HostVO host, long msId) { + Transaction txn = Transaction.currentTxn(); + long id = host.getId(); + txn.start(); + + if (!updateStatus(host, Event.AgentConnected, msId)) { + return false; + } + + txn.commit(); + return true; + } + + @Override + public HostVO findByGuid(String guid) { + SearchCriteria sc = GuidSearch.create("guid", guid); + return findOneBy(sc); } @Override public HostVO findByName(String name) { SearchCriteria sc = NameSearch.create("name", name); return findOneBy(sc); - } - - @Override - public List findLostHosts(long timeout) { - SearchCriteria sc = LastPingedSearch.create(); - sc.setParameters("ping", timeout); - sc.setParameters("state", Status.Up, Status.Updating, Status.Disconnected, Status.Connecting); - return listBy(sc); - 
} - + } + @Override - public List findHostsLike(String hostName) { - SearchCriteria sc = NameLikeSearch.create(); - sc.setParameters("name", "%" + hostName + "%"); - return listBy(sc); - } - - @Override - public List findLostHosts2(long timeout, Type type) { - SearchCriteria sc = LastPingedSearch2.create(); - sc.setParameters("ping", timeout); - sc.setParameters("type", type.toString()); - return listBy(sc); - } - - @Override - public List listByDataCenter(long dcId) { - SearchCriteria sc = DcSearch.create("dc", dcId); - return listBy(sc); - } + public List findLostHosts(long timeout) { + SearchCriteria sc = LastPingedSearch.create(); + sc.setParameters("ping", timeout); + sc.setParameters("state", Status.Up, Status.Updating, Status.Disconnected, Status.Connecting); + return listBy(sc); + } + + @Override + public List findHostsLike(String hostName) { + SearchCriteria sc = NameLikeSearch.create(); + sc.setParameters("name", "%" + hostName + "%"); + return listBy(sc); + } + + @Override + public List findLostHosts2(long timeout, Type type) { + SearchCriteria sc = LastPingedSearch2.create(); + sc.setParameters("ping", timeout); + sc.setParameters("type", type.toString()); + return listBy(sc); + } + + @Override + public List listByDataCenter(long dcId) { + SearchCriteria sc = DcSearch.create("dc", dcId); + return listBy(sc); + } @Override public HostVO findConsoleProxyHost(String name, Type type) { SearchCriteria sc = ConsoleProxyHostSearch.create(); sc.setParameters("name", name); sc.setParameters("type", type); - ListhostList = listBy(sc); + List hostList = listBy(sc); - if(hostList==null || hostList.size() == 0) { + if (hostList == null || hostList.size() == 0) { return null; } else { return hostList.get(0); } } - - @Override - public List listByHostPod(long podId) { - SearchCriteria sc = PodSearch.create("pod", podId); - return listBy(sc); - } - - @Override - public List listByStatus(Status... 
status) { - SearchCriteria sc = StatusSearch.create(); - sc.setParameters("status", (Object[])status); - return listBy(sc); - } - - @Override - public List listByTypeDataCenter(Type type, long dcId) { - SearchCriteria sc = TypeDcSearch.create(); - sc.setParameters("type", type.toString()); - sc.setParameters("dc", dcId); - - return listBy(sc); - } - - @Override - public List listByType(Type type) { - SearchCriteria sc = TypeSearch.create(); - sc.setParameters("type", type.toString()); - return listBy(sc); - } - @Override - public void saveDetails(HostVO host) { - Map details = host.getDetails(); - if (details == null) { - return; - } - _detailsDao.persist(host.getId(), details); + @Override + public List listByHostPod(long podId) { + SearchCriteria sc = PodSearch.create("pod", podId); + return listBy(sc); + } + + @Override + public List listByStatus(Status... status) { + SearchCriteria sc = StatusSearch.create(); + sc.setParameters("status", (Object[]) status); + return listBy(sc); + } + + @Override + public List listByTypeDataCenter(Type type, long dcId) { + SearchCriteria sc = TypeDcSearch.create(); + sc.setParameters("type", type.toString()); + sc.setParameters("dc", dcId); + + return listBy(sc); + } + + @Override + public List listByType(Type type) { + SearchCriteria sc = TypeSearch.create(); + sc.setParameters("type", type.toString()); + return listBy(sc); + } + + @Override + public void saveDetails(HostVO host) { + Map details = host.getDetails(); + if (details == null) { + return; + } + _detailsDao.persist(host.getId(), details); } protected void saveHostTags(HostVO host) { @@ -679,15 +695,16 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao return; } _hostTagsDao.persist(host.getId(), hostTags); - } - - @Override @DB + } + + @Override + @DB public HostVO persist(HostVO host) { final String InsertSequenceSql = "INSERT INTO op_host(id) VALUES(?)"; - - Transaction txn = Transaction.currentTxn(); - txn.start(); - + + Transaction txn = 
Transaction.currentTxn(); + txn.start(); + HostVO dbHost = super.persist(host); try { @@ -702,71 +719,69 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao loadDetails(dbHost); saveHostTags(host); loadHostTags(dbHost); - - txn.commit(); - - return dbHost; + + txn.commit(); + + return dbHost; } - @Override @DB - public boolean update(Long hostId, HostVO host) { - Transaction txn = Transaction.currentTxn(); - txn.start(); - - boolean persisted = super.update(hostId, host); - if (!persisted) { - return persisted; - } - - saveDetails(host); - saveHostTags(host); - - txn.commit(); - - return persisted; - } - - @Override @DB - public List getRunningHostCounts(Date cutTime) { - String sql = "select * from (" + - "select h.data_center_id, h.type, count(*) as count from host as h INNER JOIN mshost as m ON h.mgmt_server_id=m.msid " + - "where h.status='Up' and h.type='SecondaryStorage' and m.last_update > ? " + - "group by h.data_center_id, h.type " + - "UNION ALL " + - "select h.data_center_id, h.type, count(*) as count from host as h INNER JOIN mshost as m ON h.mgmt_server_id=m.msid " + - "where h.status='Up' and h.type='Routing' and m.last_update > ? 
" + - "group by h.data_center_id, h.type) as t " + - "ORDER by t.data_center_id, t.type"; - - ArrayList l = new ArrayList(); + @Override + @DB + public boolean update(Long hostId, HostVO host) { + Transaction txn = Transaction.currentTxn(); + txn.start(); - Transaction txn = Transaction.currentTxn();; - PreparedStatement pstmt = null; - try { + boolean persisted = super.update(hostId, host); + if (!persisted) { + return persisted; + } + + saveDetails(host); + saveHostTags(host); + + txn.commit(); + + return persisted; + } + + @Override + @DB + public List getRunningHostCounts(Date cutTime) { + String sql = "select * from (" + "select h.data_center_id, h.type, count(*) as count from host as h INNER JOIN mshost as m ON h.mgmt_server_id=m.msid " + + "where h.status='Up' and h.type='SecondaryStorage' and m.last_update > ? " + "group by h.data_center_id, h.type " + "UNION ALL " + + "select h.data_center_id, h.type, count(*) as count from host as h INNER JOIN mshost as m ON h.mgmt_server_id=m.msid " + + "where h.status='Up' and h.type='Routing' and m.last_update > ? 
" + "group by h.data_center_id, h.type) as t " + "ORDER by t.data_center_id, t.type"; + + ArrayList l = new ArrayList(); + + Transaction txn = Transaction.currentTxn(); + ; + PreparedStatement pstmt = null; + try { pstmt = txn.prepareAutoCloseStatement(sql); String gmtCutTime = DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime); pstmt.setString(1, gmtCutTime); pstmt.setString(2, gmtCutTime); - - ResultSet rs = pstmt.executeQuery(); - while(rs.next()) { - RunningHostCountInfo info = new RunningHostCountInfo(); - info.setDcId(rs.getLong(1)); - info.setHostType(rs.getString(2)); - info.setCount(rs.getInt(3)); - - l.add(info); - } - } catch (SQLException e) { - } catch (Throwable e) { - } - return l; - } - + + ResultSet rs = pstmt.executeQuery(); + while (rs.next()) { + RunningHostCountInfo info = new RunningHostCountInfo(); + info.setDcId(rs.getLong(1)); + info.setHostType(rs.getString(2)); + info.setCount(rs.getInt(3)); + + l.add(info); + } + } catch (SQLException e) { + } catch (Throwable e) { + } + return l; + } + @Override - public long getNextSequence(long hostId) { - if (s_logger.isTraceEnabled()) { - s_logger.trace("getNextSequence(), hostId: " + hostId); + public long getNextSequence(long hostId) { + if (s_logger.isTraceEnabled()) { + s_logger.trace("getNextSequence(), hostId: " + hostId); } TableGenerator tg = _tgs.get("host_req_sq"); @@ -808,13 +823,13 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao sc.setParameters("type", hostType); } - sc.setParameters("statuses", (Object[])statuses); + sc.setParameters("statuses", (Object[]) statuses); return customSearch(sc, null); } @Override - public long countRoutingHostsByDataCenter(long dcId){ + public long countRoutingHostsByDataCenter(long dcId) { SearchCriteria sc = CountRoutingByDc.create(); sc.setParameters("dc", dcId); sc.setParameters("type", Host.Type.Routing); @@ -834,4 +849,35 @@ public class HostDaoImpl extends GenericDaoBase implements HostDao return 
trafficHosts.get(0); } } + + @Override + public List listDirectHostsBy(long msId, Status status) { + SearchCriteria sc = DirectlyConnectedSearch.create(); + sc.setParameters("ms", msId); + if (status != null) { + sc.setParameters("statuses", Status.Up); + } + + return listBy(sc); + } + + @Override + public List listManagedDirectAgents() { + SearchCriteria sc = ManagedDirectConnectSearch.create(); + return listBy(sc); + } + + @Override + public List listManagedAgents() { + SearchCriteria sc = ManagedConnectSearch.create(); + return listBy(sc); + } + + @Override + public List listByManagementServer(long msId) { + SearchCriteria sc = MsStatusSearch.create(); + sc.setParameters("ms", msId); + + return listBy(sc); + } } diff --git a/setup/db/create-schema.sql b/setup/db/create-schema.sql index 5f3e02faf45..6c226d0634a 100755 --- a/setup/db/create-schema.sql +++ b/setup/db/create-schema.sql @@ -113,6 +113,7 @@ DROP TABLE IF EXISTS `cloud`.`user_vm_details`; DROP TABLE IF EXISTS `cloud`.`vpn_users`; DROP TABLE IF EXISTS `cloud`.`data_center_details`; DROP TABLE IF EXISTS `cloud`.`network_tags`; +DROP TABLE IF EXISTS `cloud`.`op_host_transfer`; CREATE TABLE `cloud`.`version` ( `id` bigint unsigned NOT NULL UNIQUE AUTO_INCREMENT COMMENT 'id', @@ -1520,4 +1521,16 @@ CREATE TABLE `cloud`.`swift` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8; +CREATE TABLE `cloud`.`op_host_transfer` ( + `id` bigint unsigned UNIQUE NOT NULL COMMENT 'Id of the host', + `initial_mgmt_server_id` bigint unsigned COMMENT 'management server the host is transfered from', + `future_mgmt_server_id` bigint unsigned COMMENT 'management server the host is transfered to', + `state` varchar(32) NOT NULL COMMENT 'the transfer state of the host', + `created` datetime NOT NULL COMMENT 'date created', + PRIMARY KEY (`id`), + CONSTRAINT `fk_op_host_transfer__id` FOREIGN KEY `fk_op_host_transfer__id` (`id`) REFERENCES `host` (`id`), + CONSTRAINT `fk_op_host_transfer__initial_mgmt_server_id` FOREIGN KEY 
`fk_op_host_transfer__initial_mgmt_server_id`(`initial_mgmt_server_id`) REFERENCES `mshost`(`msid`), + CONSTRAINT `fk_op_host_transfer__future_mgmt_server_id` FOREIGN KEY `fk_op_host_transfer__future_mgmt_server_id`(`future_mgmt_server_id`) REFERENCES `mshost`(`msid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; + SET foreign_key_checks = 1;