bug 9415: deleteHost - cleanup vms running on the host to be removed

status 9415: resolved fixed
alena 2011-04-20 14:42:28 -07:00
parent 15e14fbf0a
commit f881d394e2
8 changed files with 3939 additions and 4305 deletions

View File: DeleteHostCmd.java

@@ -16,8 +16,8 @@
*
*/
package com.cloud.api.commands;
import org.apache.log4j.Logger;
import com.cloud.api.ApiConstants;
@@ -28,52 +28,56 @@ import com.cloud.api.ServerApiException;
import com.cloud.api.response.SuccessResponse;
import com.cloud.user.Account;
@Implementation(description = "Deletes a host.", responseObject = SuccessResponse.class)
public class DeleteHostCmd extends BaseCmd {
public static final Logger s_logger = Logger.getLogger(DeleteHostCmd.class.getName());
@Implementation(description="Deletes a host.", responseObject=SuccessResponse.class)
public class DeleteHostCmd extends BaseCmd {
public static final Logger s_logger = Logger.getLogger(DeleteHostCmd.class.getName());
private static final String s_name = "deletehostresponse";
/////////////////////////////////////////////////////
//////////////// API parameters /////////////////////
/////////////////////////////////////////////////////
private static final String s_name = "deletehostresponse";
@Parameter(name=ApiConstants.ID, type=CommandType.LONG, required=true, description="the host ID")
// ///////////////////////////////////////////////////
// ////////////// API parameters /////////////////////
// ///////////////////////////////////////////////////
@Parameter(name = ApiConstants.ID, type = CommandType.LONG, required = true, description = "the host ID")
private Long id;
@Parameter(name = ApiConstants.FORCED, type = CommandType.BOOLEAN, description = "Force delete the host. All HA enabled vms running on the host will be put to HA; HA disabled ones will be stopped")
private Boolean forced;
/////////////////////////////////////////////////////
/////////////////// Accessors ///////////////////////
/////////////////////////////////////////////////////
public Long getId() {
return id;
}
public boolean isForced() {
return (forced != null) ? forced : false;
}
/////////////////////////////////////////////////////
/////////////// API Implementation///////////////////
/////////////////////////////////////////////////////
@Override
public String getCommandName() {
return s_name;
}
@Override
public long getEntityOwnerId() {
return Account.ACCOUNT_ID_SYSTEM;
}
@Override
public void execute() {
boolean result = _resourceService.deleteHost(getId(), isForced());
if (result) {
SuccessResponse response = new SuccessResponse(getCommandName());
this.setResponseObject(response);
} else {
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Failed to delete host");
}
}
}
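Note: execute() now unwraps the command itself and hands plain values to the service layer. A minimal sketch of how a caller might drive the two code paths; the host id and the way the service reference is obtained are assumptions:

// Hypothetical caller of the reworked service method; ids and wiring are assumed.
Long hostId = 42L;
boolean deleted = _resourceService.deleteHost(hostId, false); // plain delete
// Forced delete: per the parameter description above, HA-enabled vms running on
// the host are put to HA and HA-disabled ones are stopped before removal.
boolean forceDeleted = _resourceService.deleteHost(hostId, true);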

View File: StopRouterCmd.java

@@ -16,8 +16,8 @@
*
*/
package com.cloud.api.commands;
import org.apache.log4j.Logger;
import com.cloud.api.ApiConstants;
@@ -34,37 +34,36 @@ import com.cloud.exception.ResourceUnavailableException;
import com.cloud.network.router.VirtualRouter;
import com.cloud.user.Account;
@Implementation(description = "Stops a router.", responseObject = DomainRouterResponse.class)
public class StopRouterCmd extends BaseAsyncCmd {
public static final Logger s_logger = Logger.getLogger(StopRouterCmd.class.getName());
private static final String s_name = "stoprouterresponse";
@Implementation(description="Stops a router.", responseObject=DomainRouterResponse.class)
public class StopRouterCmd extends BaseAsyncCmd {
public static final Logger s_logger = Logger.getLogger(StopRouterCmd.class.getName());
private static final String s_name = "stoprouterresponse";
/////////////////////////////////////////////////////
//////////////// API parameters /////////////////////
/////////////////////////////////////////////////////
@Parameter(name = ApiConstants.ID, type = CommandType.LONG, required = true, description = "the ID of the router")
private Long id;
@Parameter(name = ApiConstants.FORCED, type = CommandType.BOOLEAN, required = false, description = "Force stop the VM. The caller knows the VM is stopped.")
private Boolean forced;
/////////////////////////////////////////////////////
/////////////////// Accessors ///////////////////////
/////////////////////////////////////////////////////
public Long getId() {
return id;
}
/////////////////////////////////////////////////////
/////////////// API Implementation///////////////////
/////////////////////////////////////////////////////
@Override
public String getCommandName() {
return s_name;
}
@Override
@@ -81,35 +80,35 @@ public class StopRouterCmd extends BaseAsyncCmd {
public String getEventType() {
return EventTypes.EVENT_ROUTER_STOP;
}
@Override
public String getEventDescription() {
return "stopping router: " + getId();
return "stopping router: " + getId();
}
@Override
public AsyncJob.Type getInstanceType() {
return AsyncJob.Type.DomainRouter;
}
@Override
public Long getInstanceId() {
return getId();
}
public boolean isForced() {
return (forced != null) ? forced : false;
}
@Override
public void execute() throws ConcurrentOperationException, ResourceUnavailableException {
VirtualRouter result = _routerService.stopRouter(getId(), isForced());
if (result != null) {
DomainRouterResponse response = _responseGenerator.createDomainRouterResponse(result);
response.setResponseName(getCommandName());
this.setResponseObject(response);
} else {
throw new ServerApiException(BaseCmd.INTERNAL_ERROR, "Failed to stop router");
}
}
}
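Note: the command class only supplies job metadata (getEventType, getInstanceType, getInstanceId) and the execute() body. A sketch of how a hypothetical async job runner could consume that metadata; the runner itself is an assumption, the accessors are the ones declared above:

// Hypothetical async job runner; only the cmd accessors come from the class above.
void runStopRouterJob(StopRouterCmd cmd) throws Exception {
    s_logger.info("Starting " + cmd.getEventType() + " for " + cmd.getInstanceType() + " " + cmd.getInstanceId());
    try {
        cmd.execute(); // sets the response object on success
    } catch (ServerApiException e) {
        s_logger.warn("Stop router job failed: " + e.getMessage());
        throw e;
    }
}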

View File: ResourceService.java

@@ -24,7 +24,6 @@ import com.cloud.api.commands.AddHostCmd;
import com.cloud.api.commands.AddSecondaryStorageCmd;
import com.cloud.api.commands.CancelMaintenanceCmd;
import com.cloud.api.commands.DeleteClusterCmd;
import com.cloud.api.commands.DeleteHostCmd;
import com.cloud.api.commands.PrepareForMaintenanceCmd;
import com.cloud.api.commands.ReconnectHostCmd;
import com.cloud.api.commands.UpdateHostCmd;
@@ -66,12 +65,12 @@ public interface ResourceService {
Host maintain(PrepareForMaintenanceCmd cmd) throws InvalidParameterValueException;
/**
* Deletes a host
*
* @param hostId the id of the host to delete
* @param isForced true to force the delete: HA-enabled vms running on the host are put to HA, HA-disabled ones are stopped
* @return true if deleted, false otherwise
*/
boolean deleteHost(DeleteHostCmd cmd) throws InvalidParameterValueException;
boolean deleteHost(long hostId, boolean isForced);
Host getHost(long hostId);
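Note: the new contract drops the command object and the checked exception. A minimal sketch of what an implementation has to do under the forced-delete semantics described in DeleteHostCmd; the helpers (listVmsOnHost, scheduleHaRestart, stopVm, removeHostRecord) are hypothetical:

// Hypothetical implementation outline; only the signature comes from the interface.
public boolean deleteHost(long hostId, boolean isForced) {
    Host host = getHost(hostId);
    if (host == null) {
        return false;
    }
    for (VMInstanceVO vm : listVmsOnHost(hostId)) { // hypothetical helper
        if (!isForced) {
            return false; // refuse while vms are still running on the host
        }
        if (vm.isHaEnabled()) {
            scheduleHaRestart(vm); // hand the vm to the HA manager
        } else {
            stopVm(vm); // HA-disabled vms are stopped
        }
    }
    return removeHostRecord(hostId); // hypothetical persistence step
}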

View File: AgentManager.java

@@ -44,192 +44,224 @@ import com.cloud.service.ServiceOfferingVO;
import com.cloud.storage.StoragePoolVO;
import com.cloud.storage.VMTemplateVO;
import com.cloud.template.VirtualMachineTemplate;
import com.cloud.user.User;
import com.cloud.uservm.UserVm;
import com.cloud.utils.Pair;
import com.cloud.utils.component.Manager;
import com.cloud.vm.VMInstanceVO;
/**
* AgentManager manages hosts. It directly coordinates between the DAOs and the connections it manages.
*/
public interface AgentManager extends Manager {
public enum OnError {
Revert, Continue, Stop
}
/**
* easy send method that returns null if there's any errors. It handles all exceptions.
*
* @param hostId host id
* @param cmd command to send.
* @return Answer if successful; null if not.
*/
Answer easySend(Long hostId, Command cmd);
/**
* Synchronously sends a command to the agent.
*
* @param hostId id of the agent on host
* @param cmd command to send.
* @return an Answer
*/
Answer send(Long hostId, Command cmd, int timeout) throws AgentUnavailableException, OperationTimedoutException;
Answer send(Long hostId, Command cmd) throws AgentUnavailableException, OperationTimedoutException;
/**
* Synchronously sends a list of commands to the agent.
*
* @param hostId id of the agent on host
* @param cmds array of commands
* @param isControl whether the commands sent contain control commands
* @param stopOnError should the agent stop execution on the first error.
* @return an array of Answer
*/
Answer[] send(Long hostId, Commands cmds) throws AgentUnavailableException, OperationTimedoutException;
Answer[] send(Long hostId, Commands cmds, int timeout) throws AgentUnavailableException, OperationTimedoutException;
/**
* Asynchronous sending of a command to the agent.
* @param hostId id of the agent on the host.
* @param cmd Command to send.
* @param listener the listener to process the answer.
* @return sequence number.
*/
long gatherStats(Long hostId, Command cmd, Listener listener);
/**
* Asynchronous sending of a command to the agent.
* @param hostId id of the agent on the host.
* @param cmds Commands to send.
* @param stopOnError should the agent stop execution on the first error.
* @param listener the listener to process the answer.
* @return sequence number.
*/
long send(Long hostId, Commands cmds, Listener listener) throws AgentUnavailableException;
/**
* Register to listen for host events. These are mostly connection and disconnection events.
*
* @param listener
* @param connections listen for connections
* @param commands listen for commands
* @param priority whether this listener has priority when events are delivered.
* @return id to unregister if needed.
*/
int registerForHostEvents(Listener listener, boolean connections, boolean commands, boolean priority);
/**
* Unregister for listening to host events.
* @param id returned from registerForHostEvents
*/
void unregisterForHostEvents(int id);
/**
* @return hosts currently connected.
*/
Set<Long> getConnectedHosts();
/**
* Disconnect the agent.
*
* @param hostId host to disconnect.
* @param event the status event that triggered the disconnect.
* @param investigate whether the disconnect should be investigated.
*/
void disconnect(long hostId, Status.Event event, boolean investigate);
/**
* Obtains statistics for a host; vCPU utilisation, memory utilisation, and network utilisation
*
* @param hostId
* @return HostStats
*/
HostStats getHostStatistics(long hostId);
Long getGuestOSCategoryId(long hostId);
String getHostTags(long hostId);
/**
* Find a host based on the type needed, data center to deploy in, pod to deploy in, service offering, template, and list of host to avoid.
*/
Host findHost(Host.Type type, DataCenterVO dc, HostPodVO pod, StoragePoolVO sp, ServiceOfferingVO offering, VMTemplateVO template, VMInstanceVO vm, Host currentHost, Set<Host> avoid);
List<PodCluster> listByDataCenter(long dcId);
List<PodCluster> listByPod(long podId);
/**
* Adds a new host
*
* @param zoneId
* @param resource
* @param hostType
* @param hostDetails
* @return new Host
*/
public Host addHost(long zoneId, ServerResource resource, Type hostType, Map<String, String> hostDetails);
/**
* Deletes a host
*
* @param hostId the id of the host to delete
* @param isForced true to force the delete
* @param caller the user issuing the delete
* @return true if deleted, false otherwise
*/
boolean deleteHost(long hostId);
boolean deleteHost(long hostId, boolean isForced, User caller);
/**
* Find a pod based on the user id, template, and data center.
*
* @param template
* @param dc
* @param userId
* @return
*/
Pair<HostPodVO, Long> findPod(VirtualMachineTemplate template, ServiceOfferingVO offering, DataCenterVO dc, long userId, Set<Long> avoids);
/**
* Put the agent in maintenance mode.
*
* @param hostId id of the host to put in maintenance mode.
* @return true if it was able to put the agent into maintenance mode. false if not.
*/
boolean maintain(long hostId) throws AgentUnavailableException;
boolean maintenanceFailed(long hostId);
/**
* Cancel the maintenance mode.
*
* @param hostId host id
* @return true if it's done. false if not.
*/
boolean cancelMaintenance(long hostId);
/**
* Check to see if a virtual machine can be upgraded to the given service offering
*
* @param vm
* @param offering
* @return true if the host can handle the upgrade, false otherwise
*/
boolean isVirtualMachineUpgradable(final UserVm vm, final ServiceOffering offering);
public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException;
public boolean reconnect(final long hostId) throws AgentUnavailableException;
public List<HostVO> discoverHosts(Long dcId, Long podId, Long clusterId, String clusterName, String url, String username, String password, String hypervisor, List<String> hostTags)
throws IllegalArgumentException, DiscoveryException, InvalidParameterValueException;
Answer easySend(Long hostId, Command cmd, int timeout);
boolean isHostNativeHAEnabled(long hostId);
Answer sendTo(Long dcId, HypervisorType type, Command cmd);
void notifyAnswersToMonitors(long agentId, long seq, Answer[] answers);
}
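Note: the interface mixes synchronous and asynchronous sends. A short sketch of both call styles; the command, the Commands batch and the listener instance are assumptions, the signatures are the ones declared above:

// Synchronous: blocks until the agent answers; failures surface as checked exceptions.
try {
    Answer answer = agentMgr.send(hostId, cmd); // cmd built elsewhere
    if (answer != null && answer.getResult()) {
        // inspect the answer
    }
} catch (AgentUnavailableException e) {
    // agent is disconnected or gone
} catch (OperationTimedoutException e) {
    // the agent did not answer in time
}

// Or the forgiving variant that handles all exceptions internally:
Answer maybe = agentMgr.easySend(hostId, cmd); // null on any error

// Asynchronous: returns a sequence number immediately; answers are later
// delivered to the Listener (statsListener is an assumed implementation).
long seq = agentMgr.send(hostId, cmds, statsListener);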

File diff suppressed because it is too large

View File: ClusteredAgentManagerImpl.java

@@ -40,6 +40,7 @@ import com.cloud.host.Status.Event;
import com.cloud.resource.ResourceService;
import com.cloud.resource.ServerResource;
import com.cloud.storage.resource.DummySecondaryStorageResource;
import com.cloud.user.User;
import com.cloud.utils.component.Inject;
import com.cloud.utils.db.DB;
import com.cloud.utils.db.GlobalLock;
@@ -47,538 +48,544 @@ import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.Task;
@Local(value = { AgentManager.class, ResourceService.class })
public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener {
final static Logger s_logger = Logger.getLogger(ClusteredAgentManagerImpl.class);
public final static long STARTUP_DELAY = 5000;
public final static long SCAN_INTERVAL = 90000; // 90 seconds, it takes 60 sec for xenserver to fail login
public final static int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5; // 5 seconds
public final static long LOAD_SIZE = 100;
@Inject protected ClusterManager _clusterMgr = null;
protected HashMap<String, SocketChannel> _peers;
private final Timer _timer = new Timer("ClusteredAgentManager Timer");
@Inject protected ManagementServerHostDao _mshostDao;
protected ClusteredAgentManagerImpl() {
super();
}
@Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_peers = new HashMap<String, SocketChannel>(7);
_nodeId = _clusterMgr.getManagementNodeId();
ClusteredAgentAttache.initialize(this);
_clusterMgr.registerListener(this);
return super.configure(name, params);
}
@Override
public boolean start() {
if (!super.start()) {
return false;
}
_timer.schedule(new DirectAgentScanTimerTask(), STARTUP_DELAY, SCAN_INTERVAL);
return true;
}
private void runDirectAgentScanTimerTask() {
GlobalLock scanLock = GlobalLock.getInternLock("clustermgr.scan");
try {
if (scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
try {
scanDirectAgentToLoad();
} finally {
scanLock.unlock();
}
}
} finally {
scanLock.releaseRef();
}
}
private void scanDirectAgentToLoad() {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Begin scanning directly connected hosts");
}
// for agents that are self-managed, threshold to be considered as disconnected is 3 ping intervals
long cutSeconds = (System.currentTimeMillis() >> 10) - (_pingInterval * 3);
List<HostVO> hosts = _hostDao.findDirectAgentToLoad(_clusterMgr.getManagementNodeId(), cutSeconds, LOAD_SIZE);
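// If a full LOAD_SIZE batch came back, trim trailing hosts that share the last
// entry's cluster so a cluster is never split across scans; the trimmed hosts
// are picked up on the next pass.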
if (hosts != null && hosts.size() == LOAD_SIZE) {
Long clusterId = hosts.get((int) (LOAD_SIZE - 1)).getClusterId();
if (clusterId != null) {
for (int i = (int) (LOAD_SIZE - 1); i > 0; i--) {
if (hosts.get(i).getClusterId() == clusterId) {
hosts.remove(i);
} else {
break;
}
}
}
}
if (hosts != null && hosts.size() > 0) {
for (HostVO host : hosts) {
AgentAttache agentattache = findAttache(host.getId());
if (agentattache != null) {
// already loaded, skip
if (agentattache.forForward()) {
if (s_logger.isInfoEnabled()) {
s_logger.info("Host " + host.getName() + " is detected down, but we have a forward attache running, disconnect this one before launching the host");
}
removeAgent(agentattache, Status.Disconnected);
} else {
continue;
}
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")");
}
loadDirectlyConnectedHost(host);
}
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("End scanning directly connected hosts");
}
}
private class DirectAgentScanTimerTask extends TimerTask {
@Override
public void run() {
try {
runDirectAgentScanTimerTask();
} catch (Throwable e) {
s_logger.error("Unexpected exception " + e.getMessage(), e);
}
}
}
@Override
public Task create(Task.Type type, Link link, byte[] data) {
return new ClusteredAgentHandler(type, link, data);
}
@Override
public boolean cancelMaintenance(final long hostId) {
try {
Boolean result = _clusterMgr.propagateAgentEvent(hostId, Event.ResetRequested);
if (result != null) {
return result;
}
} catch (AgentUnavailableException e) {
return false;
}
return super.cancelMaintenance(hostId);
}
protected AgentAttache createAttache(long id) {
s_logger.debug("create forwarding ClusteredAgentAttache for " + id);
final AgentAttache attache = new ClusteredAgentAttache(this, id);
AgentAttache old = null;
synchronized (_agents) {
old = _agents.get(id);
_agents.put(id, attache);
}
if (old != null) {
old.disconnect(Status.Removed);
}
return attache;
}
@Override
protected AgentAttache createAttache(long id, HostVO server, Link link) {
s_logger.debug("create ClusteredAgentAttache for " + id);
final AgentAttache attache = new ClusteredAgentAttache(this, id, link, server.getStatus() == Status.Maintenance || server.getStatus() == Status.ErrorInMaintenance || server.getStatus() == Status.PrepareForMaintenance);
link.attach(attache);
AgentAttache old = null;
synchronized (_agents) {
old = _agents.get(id);
_agents.put(id, attache);
}
if (old != null) {
old.disconnect(Status.Removed);
}
return attache;
}
@Override
protected AgentAttache createAttache(long id, HostVO server, ServerResource resource) {
if (resource instanceof DummySecondaryStorageResource) {
return new DummyAttache(this, id, false);
}
s_logger.debug("create ClusteredDirectAgentAttache for " + id);
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, id, _nodeId, resource, server.getStatus() == Status.Maintenance
|| server.getStatus() == Status.ErrorInMaintenance || server.getStatus() == Status.PrepareForMaintenance, this);
AgentAttache old = null;
synchronized (_agents) {
old = _agents.get(id);
_agents.put(id, attache);
}
if (old != null) {
old.disconnect(Status.Removed);
}
return attache;
}
@Override
protected boolean handleDisconnect(AgentAttache attache, Status.Event event, boolean investigate) {
return handleDisconnect(attache, event, investigate, true);
}
protected boolean handleDisconnect(AgentAttache agent, Status.Event event, boolean investigate, boolean broadcast) {
if (agent == null) {
return true;
}
if (super.handleDisconnect(agent, event, investigate)) {
if (broadcast) {
notifyNodesInCluster(agent);
}
return true;
} else {
return false;
}
}
@Override
public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException {
if (event == Event.AgentDisconnected) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Received agent disconnect event for host " + hostId);
}
AgentAttache attache = findAttache(hostId);
if (attache != null) {
handleDisconnect(attache, Event.AgentDisconnected, false, false);
}
return true;
} else {
return super.executeUserRequest(hostId, event);
}
}
@Override
public boolean maintain(final long hostId) throws AgentUnavailableException {
Boolean result = _clusterMgr.propagateAgentEvent(hostId, Event.MaintenanceRequested);
if (result != null) {
return result;
}
return super.maintain(hostId);
}
@Override
public boolean reconnect(final long hostId) throws AgentUnavailableException {
Boolean result = _clusterMgr.propagateAgentEvent(hostId, Event.ShutdownRequested);
if (result != null) {
return result;
}
return super.reconnect(hostId);
}
@Override
@DB
public boolean deleteHost(long hostId, boolean isForced, User caller) {
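// Ask the cluster manager to propagate the Remove event to the management server
// that owns this agent; a non-null result means a peer handled the delete.
// Otherwise fall through to the local implementation in the superclass.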
try {
Boolean result = _clusterMgr.propagateAgentEvent(hostId, Event.Remove);
if (result != null) {
return result;
}
} catch (AgentUnavailableException e) {
return false;
}
return super.deleteHost(hostId, isForced, caller);
}
public void notifyNodesInCluster(AgentAttache attache) {
s_logger.debug("Notifying other nodes of to disconnect");
Command[] cmds = new Command[] { new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected) };
_clusterMgr.broadcast(attache.getId(), cmds);
}
protected static void logT(byte[] bytes, final String msg) {
s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
+ (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
protected static void logD(byte[] bytes, final String msg) {
s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
+ (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
protected static void logI(byte[] bytes, final String msg) {
s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
+ (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
public boolean routeToPeer(String peer, byte[] bytes) {
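// Try up to five times, opening (or re-opening) a channel to the peer on each pass;
// an IOException invalidates the channel and forces a reconnect on the next attempt.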
int i = 0;
SocketChannel ch = null;
while (i++ < 5) {
ch = connectToPeer(peer, ch);
if (ch == null) {
try {
logD(bytes, "Unable to route to peer: " + Request.parse(bytes).toString());
} catch (Exception e) {
}
return false;
}
try {
if (s_logger.isDebugEnabled()) {
logD(bytes, "Routing to peer");
}
Link.write(ch, new ByteBuffer[] { ByteBuffer.wrap(bytes) });
return true;
} catch (IOException e) {
try {
logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage());
} catch (Exception ex) {
}
}
}
return false;
}
public String findPeer(long hostId) {
return _clusterMgr.getPeerName(hostId);
}
public void cancel(String peerName, long hostId, long sequence, String reason) {
CancelCommand cancel = new CancelCommand(sequence, reason);
Request req = new Request(-1, hostId, _nodeId, cancel, true);
req.setControl(true);
routeToPeer(peerName, req.getBytes());
}
public void closePeer(String peerName) {
synchronized (_peers) {
SocketChannel ch = _peers.get(peerName);
if (ch != null) {
try {
ch.close();
} catch (IOException e) {
s_logger.warn("Unable to close peer socket connection to " + peerName);
}
}
_peers.remove(peerName);
}
}
public SocketChannel connectToPeer(String peerName, SocketChannel prevCh) {
synchronized (_peers) {
SocketChannel ch = _peers.get(peerName);
if (prevCh != null) {
try {
prevCh.close();
} catch (Exception e) {
}
}
if (ch == null || ch == prevCh) {
ManagementServerHostVO ms = _clusterMgr.getPeer(peerName);
if (ms == null) {
s_logger.info("Unable to find peer: " + peerName);
return null;
}
String ip = ms.getServiceIP();
InetAddress addr;
try {
addr = InetAddress.getByName(ip);
} catch (UnknownHostException e) {
throw new CloudRuntimeException("Unable to resolve " + ip);
}
try {
ch = SocketChannel.open(new InetSocketAddress(addr, _port));
ch.configureBlocking(true); // make sure we are working at blocking mode
ch.socket().setKeepAlive(true);
ch.socket().setSoTimeout(60 * 1000);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Connection to peer opened: " + peerName + ", ip: " + ip);
}
_peers.put(peerName, ch);
} catch (IOException e) {
s_logger.warn("Unable to connect to peer management server: " + peerName + ", ip: " + ip + " due to " + e.getMessage(), e);
return null;
}
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("Found open channel for peer: " + peerName);
}
return ch;
}
}
public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) {
String peerName = _clusterMgr.getPeerName(hostId);
if (peerName == null) {
return null;
}
return connectToPeer(peerName, prevCh);
}
@Override
protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException {
assert (hostId != null) : "Who didn't check their id value?";
HostVO host = _hostDao.findById(hostId);
if (host == null) {
throw new AgentUnavailableException("Can't find the host ", hostId);
}
AgentAttache agent = findAttache(hostId);
if (agent == null) {
if (host.getStatus() == Status.Up && (host.getManagementServerId() != null && host.getManagementServerId() != _nodeId)) {
agent = createAttache(hostId);
}
}
if (agent == null) {
throw new AgentUnavailableException("Host is not in the right state", hostId);
}
return agent;
}
@Override
public boolean stop() {
if (_peers != null) {
for (SocketChannel ch : _peers.values()) {
try {
s_logger.info("Closing: " + ch.toString());
ch.close();
} catch (IOException e) {
}
}
}
_timer.cancel();
return super.stop();
}
@Override
public void startDirectlyConnectedHosts() {
// Intentionally a no-op: direct agents are scanned and loaded periodically instead.
// The periodic scan may also pick up agents left over from another crashed management server.
}
public class ClusteredAgentHandler extends AgentHandler {
public ClusteredAgentHandler(Task.Type type, Link link, byte[] data) {
super(type, link, data);
}
@Override
protected void doTask(final Task task) throws Exception {
Transaction txn = Transaction.open(Transaction.CLOUD_DB);
try {
if (task.getType() != Task.Type.DATA) {
super.doTask(task);
return;
}
final byte[] data = task.getData();
Version ver = Request.getVersion(data);
if (ver.ordinal() < Version.v3.ordinal()) {
super.doTask(task);
return;
}
long hostId = Request.getAgentId(data);
Link link = task.getLink();
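// Requests originating from a management server are routed to the owning agent
// (or cancelled); traffic from agents is forwarded to the management server that
// owns it, or processed locally when that is this node.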
if (Request.fromServer(data)) {
AgentAttache agent = findAgent(hostId);
if (Request.isControl(data)) {
if (agent == null) {
logD(data, "No attache to process cancellation");
return;
}
Request req = Request.parse(data);
Command[] cmds = req.getCommands();
CancelCommand cancel = (CancelCommand) cmds[0];
if (s_logger.isDebugEnabled()) {
logD(data, "Cancel request received");
}
agent.cancel(cancel.getSequence());
return;
}
try {
if (agent == null || agent.isClosed()) {
throw new AgentUnavailableException("Unable to route to agent ", hostId);
}
if (Request.isRequest(data) && Request.requiresSequentialExecution(data)) {
// route it to the agent.
// But we have the serialize the control commands here so we have
// to deserialize this and send it through the agent attache.
Request req = Request.parse(data);
agent.send(req, null);
return;
} else {
if (agent instanceof Routable) {
Routable cluster = (Routable) agent;
cluster.routeToAgent(data);
} else {
agent.send(Request.parse(data));
}
return;
}
} catch (AgentUnavailableException e) {
logD(data, e.getMessage());
cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage());
}
} else {
long mgmtId = Request.getManagementServerId(data);
if (mgmtId != -1 && mgmtId != _nodeId) {
routeToPeer(Long.toString(mgmtId), data);
if (Request.requiresSequentialExecution(data)) {
AgentAttache attache = (AgentAttache) link.attachment();
if (attache != null) {
attache.sendNext(Request.getSequence(data));
} else if (s_logger.isDebugEnabled()) {
logD(data, "No attache to process " + Request.parse(data).toString());
}
}
return;
} else {
if (Request.isRequest(data)) {
super.doTask(task);
} else {
// received an answer.
final Response response = Response.parse(data);
AgentAttache attache = findAttache(response.getAgentId());
if (attache == null) {
s_logger.info("SeqA " + response.getAgentId() + "-" + response.getSequence() + "Unable to find attache to forward " + response.toString());
return;
}
if (!attache.processAnswers(response.getSequence(), response)) {
s_logger.info("SeqA " + attache.getId() + "-" + response.getSequence() + ": Response is not processed: " + response.toString());
}
}
return;
}
}
} finally {
txn.close();
}
}
}
@Override
public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
}
@Override
@@ -590,6 +597,6 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
public void onManagementNodeIsolated() {
}
}
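Note: cancelMaintenance, maintain, reconnect and the reworked deleteHost above all share one shape: ask the ClusterManager to propagate the event to the management server that currently owns the agent, and fall back to the local superclass implementation only when no peer claims it. A condensed, hypothetical sketch of that pattern:

// Hypothetical condensation of the propagate-then-fallback shape used above;
// java.util.function.BooleanSupplier stands in for the local fallback call.
private boolean propagateOrHandleLocally(long hostId, Event event, java.util.function.BooleanSupplier local) {
    try {
        Boolean result = _clusterMgr.propagateAgentEvent(hostId, event);
        if (result != null) {
            return result; // a peer owned the agent and handled the event
        }
    } catch (AgentUnavailableException e) {
        return false; // deleteHost and cancelMaintenance swallow this; maintain and reconnect rethrow
    }
    return local.getAsBoolean(); // no peer claimed it: handle on this node
}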

View File: HighAvailabilityManagerImpl.java

@@ -76,67 +76,66 @@ import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.dao.VMInstanceDao;
/**
* HighAvailabilityManagerImpl coordinates the HA process. VMs are registered with
* the HA Manager for HA. The request is stored within a database backed
* work queue. HAManager has a number of workers that pick up
* these work items to perform HA on the VMs.
*
* The HA process goes as follows:
* 1. Check with the list of Investigators to determine that the VM is
* no longer running. If a Investigator finds the VM is still alive,
* the HA process is stopped and the state of the VM reverts back to
* its previous state. If a Investigator finds the VM is dead, then
* HA process is started on the VM, skipping step 2.
* 2. If the list of Investigators can not determine if the VM is dead or
* alive. The list of FenceBuilders is invoked to fence off the VM
* so that it won't do any damage to the storage and network.
* 3. The VM is marked as stopped.
* 4. The VM is started again via the normal process of starting VMs. Note
* that once the VM is marked as stopped, the user may have started the
* VM himself.
* 5. VMs that have re-started more than the configured number of times are
* marked as in Error state and the user is not allowed to restart
* the VM.
*
* @config
* {@table
* || Param Name | Description | Values | Default ||
* || workers | number of worker threads to spin off to do the processing | int | 1 ||
* || time.to.sleep | Time to sleep if no work items are found | seconds | 60 ||
* || max.retries | number of times to retry start | int | 5 ||
* || time.between.failure | Time elapsed between failures before we consider it as another retry | seconds | 3600 ||
* || time.between.cleanup | Time to wait before the cleanup thread runs | seconds | 86400 ||
* || force.ha | Force HA to happen even if the VM says no | boolean | false ||
* || ha.retry.wait | time to wait before retrying the work item | seconds | 120 ||
* || stop.retry.wait | time to wait before retrying the stop | seconds | 120 ||
* }
**/
@Local(value = { HighAvailabilityManager.class })
public class HighAvailabilityManagerImpl implements HighAvailabilityManager, ClusterManagerListener {
protected static final Logger s_logger = Logger.getLogger(HighAvailabilityManagerImpl.class);
String _name;
WorkerThread[] _workers;
boolean _stopped;
long _timeToSleep;
@Inject HighAvailabilityDao _haDao;
@Inject VMInstanceDao _instanceDao;
@Inject HostDao _hostDao;
@Inject DataCenterDao _dcDao;
@Inject HostPodDao _podDao;
@Inject ClusterDetailsDao _clusterDetailsDao;
long _serverId;
@Inject(adapter = Investigator.class)
Adapters<Investigator> _investigators;
@Inject(adapter = FenceBuilder.class)
Adapters<FenceBuilder> _fenceBuilders;
@Inject AgentManager _agentMgr;
@Inject AlertManager _alertMgr;
@Inject StorageManager _storageMgr;
@Inject GuestOSDao _guestOSDao;
@Inject GuestOSCategoryDao _guestOSCategoryDao;
@Inject VirtualMachineManager _itMgr;
@Inject AccountManager _accountMgr;
String _instance;
ScheduledExecutorService _executor;
int _stopRetryInterval;
@@ -167,7 +166,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
hostState = investigator.isAgentAlive(host);
if (hostState != null) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(investigator.getName()+ " was able to determine host " + hostId + " is in " + hostState.toString());
s_logger.debug(investigator.getName() + " was able to determine host " + hostId + " is in " + hostState.toString());
}
return hostState;
}
@@ -179,11 +178,10 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
return null;
}
@Override
public void scheduleRestartForVmsOnHost(final HostVO host) {
if (host.getType() != Host.Type.Routing) {
return;
}
s_logger.warn("Scheduling restart for VMs on host " + host.getId());
@@ -213,7 +211,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
HostPodVO podVO = _podDao.findById(host.getPodId());
String hostDesc = "name: " + host.getName() + " (id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName();
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host is down, " + hostDesc, "Host [" + hostDesc + "] is down." + ((sb != null) ? sb.toString() : ""));
_alertMgr.sendAlert(AlertManager.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host is down, " + hostDesc, "Host [" + hostDesc + "] is down."
+ ((sb != null) ? sb.toString() : ""));
for (final VMInstanceVO vm : vms) {
if (s_logger.isDebugEnabled()) {
@@ -225,25 +224,25 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
@Override
public void scheduleStop(VMInstanceVO vm, long hostId, WorkType type) {
assert (type == WorkType.CheckStop || type == WorkType.ForceStop || type == WorkType.Stop);
if (_haDao.hasBeenScheduled(vm.getId(), type)) {
s_logger.info("There's already a job scheduled to stop " + vm);
return;
}
HaWorkVO work = new HaWorkVO(vm.getId(), vm.getType(), type, Step.Scheduled, hostId, vm.getState(), 0, vm.getUpdated());
_haDao.persist(work);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Scheduled " + work);
s_logger.debug("Scheduled " + work);
}
wakeupWorkers();
}
protected void wakeupWorkers() {
for (WorkerThread worker : _workers) {
worker.wakup();
}
}
@Override
@@ -256,15 +255,15 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
@Override
public void scheduleRestart(VMInstanceVO vm, final boolean investigate) {
Long hostId = vm.getHostId();
if (hostId == null) {
_itMgr.stateTransitTo(vm, Event.OperationFailed, null);
return;
}
if (!investigate) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM does not require investigation so I'm marking it as Stopped: " + vm.toString());
}
short alertType = AlertManager.ALERT_TYPE_USERVM;
if (VirtualMachine.Type.DomainRouter.equals(vm.getType())) {
@@ -275,12 +274,9 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
if (!(_forceHA || vm.isHaEnabled())) {
String hostDesc = "id:" + vm.getHostId() + ", availability zone id:" + vm.getDataCenterId() + ", pod id:" + vm.getPodId();
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "VM (name: "
+ vm.getName() + ", id: " + vm.getId() + ") stopped unexpectedly on host "
+ hostDesc, "Virtual Machine " + vm.getName() + " (id: "
+ vm.getId() + ") running on host [" + vm.getHostId()
+ "] stopped unexpectedly.");
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "VM (name: " + vm.getName() + ", id: " + vm.getId() + ") stopped unexpectedly on host " + hostDesc,
"Virtual Machine " + vm.getName() + " (id: " + vm.getId() + ") running on host [" + vm.getHostId() + "] stopped unexpectedly.");
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM is not HA enabled so we're done.");
}
@@ -313,11 +309,10 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
}
}
if (NeedToAddNew) {
final HaWorkVO work = new HaWorkVO(vm.getId(), vm.getType(), WorkType.HA, investigate ? Step.Investigating : Step.Scheduled, hostId, vm.getState(), maxRetries + 1, vm.getUpdated());
_haDao.persist(work);
}
if (s_logger.isInfoEnabled()) {
s_logger.info("Schedule vm for HA: " + vm.toString());
}
@@ -337,16 +332,11 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
s_logger.info("HA on " + vm);
if (vm.getState() != work.getPreviousState() || vm.getUpdated() != work.getUpdateTime()) {
s_logger.info("VM " + vm + " has been changed. Current State = " + vm.getState() + " Previous State = " + work.getPreviousState() + " last updated = " + vm.getUpdated() + " previous updated = " + work.getUpdateTime());
return null;
s_logger.info("VM " + vm + " has been changed. Current State = " + vm.getState() + " Previous State = " + work.getPreviousState() + " last updated = " + vm.getUpdated()
+ " previous updated = " + work.getUpdateTime());
return null;
}
short alertType = AlertManager.ALERT_TYPE_USERVM;
if (VirtualMachine.Type.DomainRouter.equals(vm.getType())) {
alertType = AlertManager.ALERT_TYPE_DOMAIN_ROUTER;
@@ -354,82 +344,101 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
alertType = AlertManager.ALERT_TYPE_CONSOLE_PROXY;
}
HostVO host = _hostDao.findById(work.getHostId());
boolean isHostRemoved = false;
if (host == null) {
host = _hostDao.findByIdIncludingRemoved(work.getHostId());
if (host != null) {
s_logger.debug("VM " + vm.toString() + " is now no longer on host " + work.getHostId() + " as the host is removed");
isHostRemoved = true;
}
}
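// The host row may have been removed by a (force) deleteHost while this HA work
// item was pending; look it up including removed rows so the work item can still
// make progress instead of dereferencing a missing host.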
DataCenterVO dcVO = _dcDao.findById(host.getDataCenterId());
HostPodVO podVO = _podDao.findById(host.getPodId());
String hostDesc = "name: " + host.getName() + "(id:" + host.getId() + "), availability zone: " + dcVO.getName() + ", pod: " + podVO.getName();
Boolean alive = null;
if (work.getStep() == Step.Investigating) {
if (!isHostRemoved) {
if (vm.getHostId() == null || vm.getHostId() != work.getHostId()) {
s_logger.info("VM " + vm.toString() + " is now no longer on host " + work.getHostId());
return null;
}
Enumeration<Investigator> en = _investigators.enumeration();
Investigator investigator = null;
while (en.hasMoreElements()) {
investigator = en.nextElement();
alive = investigator.isVmAlive(vm, host);
s_logger.info(investigator.getName() + " found " + vm + "to be alive? " + alive);
if (alive != null) {
break;
}
}
boolean fenced = false;
if (alive == null) {
s_logger.debug("Fencing off VM that we don't know the state of");
Enumeration<FenceBuilder> enfb = _fenceBuilders.enumeration();
while (enfb.hasMoreElements()) {
FenceBuilder fb = enfb.nextElement();
Boolean result = fb.fenceOff(vm, host);
s_logger.info("Fencer " + fb.getName() + " returned " + result);
if (result != null && result) {
fenced = true;
break;
}
}
} else if (!alive) {
fenced = true;
} else {
s_logger.debug("VM " + vm.getName() + " is found to be alive by " + investigator.getName());
if (host.getStatus() == Status.Up) {
s_logger.info(vm + " is alive and host is up. No need to restart it.");
return null;
} else {
s_logger.debug("Rescheduling because the host is not up but the vm is alive");
return (System.currentTimeMillis() >> 10) + _investigateRetryInterval;
}
}
if (!fenced) {
s_logger.debug("We were unable to fence off the VM " + vm);
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc,
"Insufficient capacity to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
return (System.currentTimeMillis() >> 10) + _restartRetryInterval;
}
try {
_itMgr.advanceStop(vm, true, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount());
} catch (ResourceUnavailableException e) {
assert false : "How do we hit this when force is true?";
throw new CloudRuntimeException("Caught exception even though it should be handled.", e);
} catch (OperationTimedoutException e) {
assert false : "How do we hit this when force is true?";
throw new CloudRuntimeException("Caught exception even though it should be handled.", e);
} catch (ConcurrentOperationException e) {
assert false : "How do we hit this when force is true?";
throw new CloudRuntimeException("Caught exception even though it should be handled.", e);
}
work.setStep(Step.Scheduled);
_haDao.update(work.getId(), work);
} else {
assert false : "How come that HA step is Investigating and the host is removed?";
}
}
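// Refresh the VM record before checking whether it still qualifies for an HA restart.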
vm = _itMgr.findById(vm.getType(), vm.getId());
if (!_forceHA && !vm.isHaEnabled()) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM is not HA enabled so we're done.");
}
return null; // VM doesn't require HA
}
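// Without storage reachable from some other host, a restart elsewhere is impossible.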
if (!_storageMgr.canVmRestartOnAnotherServer(vm.getId())) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("VM can not restart on another server.");
@@ -450,20 +459,24 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
}
if (s_logger.isDebugEnabled()) {
s_logger.debug("Rescheduling VM " + vm.toString() + " to try again in " + _restartRetryInterval);
s_logger.debug("Rescheduling VM " + vm.toString() + " to try again in " + _restartRetryInterval);
}
} catch (final InsufficientCapacityException e) {
s_logger.warn("Unable to restart " + vm.toString() + " due to " + e.getMessage());
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc, "Insufficient capacity to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
s_logger.warn("Unable to restart " + vm.toString() + " due to " + e.getMessage());
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc,
"Insufficient capacity to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
} catch (final ResourceUnavailableException e) {
s_logger.warn("Unable to restart " + vm.toString() + " due to " + e.getMessage());
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc, "The Storage is unavailable for trying to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
s_logger.warn("Unable to restart " + vm.toString() + " due to " + e.getMessage());
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc,
"The Storage is unavailable for trying to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
} catch (ConcurrentOperationException e) {
s_logger.warn("Unable to restart " + vm.toString() + " due to " + e.getMessage());
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc, "The Storage is unavailable for trying to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
s_logger.warn("Unable to restart " + vm.toString() + " due to " + e.getMessage());
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc,
"The Storage is unavailable for trying to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
} catch (OperationTimedoutException e) {
s_logger.warn("Unable to restart " + vm.toString() + " due to " + e.getMessage());
_alertMgr.sendAlert(alertType, vm.getDataCenterId(), vm.getPodId(), "Unable to restart " + vm.getName() + " which was running on host " + hostDesc,
"The operation timed out while trying to restart VM, name: " + vm.getName() + ", id: " + vmId + " which was running on host " + hostDesc);
}
vm = _itMgr.findById(vm.getType(), vm.getId());
work.setUpdateTime(vm.getUpdated());
@@ -471,15 +484,14 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
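// Work times are kept in a coarse ~1-second unit (ms >> 10, i.e. divided by 1024); the worker shifts back with << 10 when scheduling.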
return (System.currentTimeMillis() >> 10) + _restartRetryInterval;
}
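// Migrates a VM off a host, typically one entering maintenance; on failure the host's maintenance is flagged as failed and the work is retried.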
public Long migrate(final HaWorkVO work) {
long vmId = work.getInstanceId();
long srcHostId = work.getHostId();
try {
work.setStep(Step.Migrating);
_haDao.update(work.getId(), work);
if (!_itMgr.migrateAway(work.getType(), vmId, srcHostId)) {
s_logger.warn("Unable to migrate vm from " + srcHostId);
_agentMgr.maintenanceFailed(srcHostId);
@@ -488,58 +500,58 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
} catch (InsufficientServerCapacityException e) {
s_logger.warn("Insufficient capacity for migrating a VM.");
_agentMgr.maintenanceFailed(srcHostId);
return (System.currentTimeMillis() >> 10) + _migrateRetryInterval;
} catch (VirtualMachineMigrationException e) {
s_logger.warn("Looks like VM is still starting, we need to retry migrating the VM later.");
_agentMgr.maintenanceFailed(srcHostId);
return (System.currentTimeMillis() >> 10) + _migrateRetryInterval;
}
}
@Override
public void scheduleDestroy(VMInstanceVO vm, long hostId) {
final HaWorkVO work = new HaWorkVO(vm.getId(), vm.getType(), WorkType.Destroy, Step.Scheduled, hostId, vm.getState(), 0, vm.getUpdated());
_haDao.persist(work);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Scheduled " + work.toString());
s_logger.debug("Scheduled " + work.toString());
}
wakeupWorkers();
}
@Override
public void cancelDestroy(VMInstanceVO vm, Long hostId) {
_haDao.delete(vm.getId(), WorkType.Destroy);
}
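// Processes a Destroy work item; returns null when done, or a time at which the destroy should be retried.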
protected Long destroyVM(HaWorkVO work) {
final VMInstanceVO vm = _itMgr.findById(work.getType(), work.getInstanceId());
s_logger.info("Destroying " + vm.toString());
try {
if (vm.getState() != State.Destroyed) {
s_logger.info("VM is no longer in Destroyed state " + vm.toString());
return null;
}
if (vm.getHostId() != null) {
if (_itMgr.destroy(vm, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount())) {
s_logger.info("Successfully destroyed " + vm);
return null;
}
s_logger.debug("Stop for " + vm + " was unsuccessful.");
} else {
if (s_logger.isDebugEnabled()) {
s_logger.debug(vm + " has already been stopped");
}
return null;
}
} catch (final AgentUnavailableException e) {
s_logger.debug("Agnet is not available" + e.getMessage());
} catch (OperationTimedoutException e) {
s_logger.debug("operation timed out: " + e.getMessage());
} catch (ConcurrentOperationException e) {
s_logger.debug("concurrent operation: " + e.getMessage());
}
work.setTimesTried(work.getTimesTried() + 1);
return (System.currentTimeMillis() >> 10) + _stopRetryInterval;
}
@@ -553,27 +565,27 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
}
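// Stop runs unconditionally; CheckStop and ForceStop first verify the VM is still in the state and on the host the work item recorded.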
s_logger.info("Stopping " + vm);
try {
if (work.getWorkType() == WorkType.Stop) {
if (vm.getHostId() == null) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(vm.toString() + " has already been stopped");
}
return null;
}
if (_itMgr.advanceStop(vm, false, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount())) {
s_logger.info("Successfully stopped " + vm);
s_logger.info("Successfully stopped " + vm);
return null;
}
} else if (work.getWorkType() == WorkType.CheckStop) {
if ((vm.getState() != work.getPreviousState()) || vm.getUpdated() != work.getUpdateTime() || vm.getHostId() == null || vm.getHostId().longValue() != work.getHostId()) {
s_logger.info(vm + " is different now. Scheduled Host: " + work.getHostId() + " Current Host: " + (vm.getHostId() != null ? vm.getHostId() : "none") + " State: " + vm.getState());
return null;
}
if (_itMgr.advanceStop(vm, false, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount())) {
s_logger.info("Stop for " + vm + " was successful");
return null;
}
} else if (work.getWorkType() == WorkType.ForceStop) {
if ((vm.getState() != work.getPreviousState()) || vm.getUpdated() != work.getUpdateTime() || vm.getHostId() == null || vm.getHostId().longValue() != work.getHostId()) {
s_logger.info(vm + " is different now. Scheduled Host: " + work.getHostId() + " Current Host: " + (vm.getHostId() != null ? vm.getHostId() : "none") + " State: " + vm.getState());
return null;
@@ -582,15 +594,15 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
s_logger.info("Stop for " + vm + " was successful");
return null;
}
} else {
assert false : "Who decided there's other steps but didn't modify the guy who does the work?";
}
} catch (final ResourceUnavailableException e) {
s_logger.debug("Agnet is not available" + e.getMessage());
} catch (OperationTimedoutException e) {
s_logger.debug("operation timed out: " + e.getMessage());
}
s_logger.debug("operation timed out: " + e.getMessage());
}
work.setTimesTried(work.getTimesTried() + 1);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Stop was unsuccessful. Rescheduling");
@@ -604,15 +616,15 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
_haDao.deleteMigrationWorkItems(host.getId(), type, _serverId);
}
@Override
public List<VMInstanceVO> findTakenMigrationWork() {
List<HaWorkVO> works = _haDao.findTakenWorkItems(WorkType.Migration);
List<VMInstanceVO> vms = new ArrayList<VMInstanceVO>(works.size());
for (HaWorkVO work : works) {
vms.add(_instanceDao.findById(work.getInstanceId()));
}
return vms;
}
@Override
@@ -620,7 +632,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
_name = name;
ComponentLocator locator = ComponentLocator.getLocator(ManagementServer.Name);
_serverId = ((ManagementServer) ComponentLocator.getComponent(ManagementServer.Name)).getId();
_investigators = locator.getAdapters(Investigator.class);
_fenceBuilders = locator.getAdapters(FenceBuilder.class);
@@ -652,16 +664,16 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
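// Retry and cleanup intervals are read from configuration in seconds, with the defaults shown below.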
value = params.get("time.between.cleanup");
_timeBetweenCleanups = NumbersUtil.parseLong(value, 3600 * 24);
value = params.get("stop.retry.interval");
_stopRetryInterval = NumbersUtil.parseInt(value, 10 * 60);
value = params.get("restart.retry.interval");
_restartRetryInterval = NumbersUtil.parseInt(value, 10 * 60);
value = params.get("investigate.retry.interval");
_investigateRetryInterval = NumbersUtil.parseInt(value, 1 * 60);
value = params.get("migrate.retry.interval");
_migrateRetryInterval = NumbersUtil.parseInt(value, 2 * 60);
@@ -669,16 +681,16 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
if (_instance == null) {
_instance = "VMOPS";
}
_haDao.releaseWorkItems(_serverId);
_stopped = true;
_executor = Executors.newScheduledThreadPool(count, new NamedThreadFactory("HA"));
return true;
}
@Override
public String getName() {
return _name;
@@ -696,7 +708,7 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
return true;
}
@Override
public boolean stop() {
_stopped = true;
@@ -714,23 +726,23 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
s_logger.info("HA Cleanup Thread Running");
try {
_haDao.cleanup(System.currentTimeMillis() - _timeBetweenFailures);
} catch (Exception e) {
s_logger.warn("Error while cleaning up", e);
} finally {
StackMaid.current().exitCleanup();
}
}
}
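// Worker threads take HA work items from the database queue, dispatch them by WorkType, and either mark them Done or reschedule them for a later attempt.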
protected class WorkerThread extends Thread {
public WorkerThread(String name) {
super(name);
}
@Override
public void run() {
s_logger.info("Starting work");
s_logger.info("Starting work");
while (!_stopped) {
HaWorkVO work = null;
try {
@@ -738,9 +750,9 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
work = _haDao.take(_serverId);
if (work == null) {
try {
synchronized (this) {
wait(_timeToSleep);
}
continue;
} catch (final InterruptedException e) {
s_logger.info("Interrupted");
@@ -761,17 +773,17 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
} else if (wt == WorkType.Stop || wt == WorkType.CheckStop || wt == WorkType.ForceStop) {
nextTime = stopVM(work);
} else if (wt == WorkType.Destroy) {
nextTime = destroyVM(work);
} else {
assert false : "How did we get here with " + wt.toString();
assert false : "How did we get here with " + wt.toString();
continue;
}
if (nextTime == null) {
s_logger.info("Completed " + work);
s_logger.info("Completed " + work);
work.setStep(Step.Done);
} else {
s_logger.info("Rescheduling " + work + " to try again at " + new Date(nextTime << 10));
s_logger.info("Rescheduling " + work + " to try again at " + new Date(nextTime << 10));
work.setTimeToTry(nextTime);
work.setServerId(null);
work.setDateTaken(null);
@@ -781,20 +793,20 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
work.setStep(Step.Error);
}
_haDao.update(work.getId(), work);
} catch (final Throwable th) {
s_logger.error("Caught this throwable, ", th);
} finally {
StackMaid.current().exitCleanup();
if (work != null) {
NDC.pop();
}
}
}
s_logger.info("Time to go home!");
}
public synchronized void wakup() {
notifyAll();
}
}
@@ -808,8 +820,8 @@ public class HighAvailabilityManagerImpl implements HighAvailabilityManager, Clu
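// When a management server leaves the cluster, release the work items it had claimed so surviving nodes can take them over.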
_haDao.releaseWorkItems(node.getMsid());
}
}
@Override
public void onManagementNodeIsolated() {
}
}

File diff suppressed because it is too large