mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-10-26 08:42:29 +01:00 
			
		
		
		
	CLOUDSTACK-9782: Improve scheduling of jobs
- Removed three background thread tasks; scheduling is now triggered by FSM state-transition events - On successful recovery, kicks off VM HA - Improves overall HA scheduling and task submission, with less DB access Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
This commit is contained in:
		
							parent
							
								
									c0b33db5ce
								
							
						
					
					
						commit
						d2c3408da7
					
				| @ -47,8 +47,10 @@ public interface HAConfig extends StateObject<HAConfig.HAState>, InternalIdentit | ||||
|         ActivityCheckFailureUnderThresholdRatio, | ||||
|         PowerCycle, | ||||
|         Recovered, | ||||
|         RetryRecovery, | ||||
|         RecoveryWaitPeriodTimeout, | ||||
|         RecoveryOperationThresholdExceeded, | ||||
|         RetryFencing, | ||||
|         Fenced; | ||||
| 
 | ||||
|         public Long getServerId() { | ||||
| @ -123,6 +125,7 @@ public interface HAConfig extends StateObject<HAConfig.HAState>, InternalIdentit | ||||
| 
 | ||||
|             FSM.addTransition(Recovering, Event.Disabled, Disabled); | ||||
|             FSM.addTransition(Recovering, Event.Ineligible, Ineligible); | ||||
|             FSM.addTransition(Recovering, Event.RetryRecovery, Recovering); | ||||
|             FSM.addTransition(Recovering, Event.Recovered, Recovered); | ||||
|             FSM.addTransition(Recovering, Event.RecoveryOperationThresholdExceeded, Fencing); | ||||
| 
 | ||||
| @ -132,6 +135,7 @@ public interface HAConfig extends StateObject<HAConfig.HAState>, InternalIdentit | ||||
| 
 | ||||
|             FSM.addTransition(Fencing, Event.Disabled, Disabled); | ||||
|             FSM.addTransition(Fencing, Event.Ineligible, Ineligible); | ||||
|             FSM.addTransition(Fencing, Event.RetryFencing, Fencing); | ||||
|             FSM.addTransition(Fencing, Event.Fenced, Fenced); | ||||
| 
 | ||||
|             FSM.addTransition(Fenced, Event.Disabled, Disabled); | ||||
|  | ||||
| @ -54,7 +54,7 @@ public class KVMHAChecker extends KVMHABase implements Callable<Boolean> { | ||||
|             OutputInterpreter.OneLineParser parser = new OutputInterpreter.OneLineParser(); | ||||
|             String result = cmd.execute(parser); | ||||
|             s_logger.debug("KVMHAChecker pool: " + pool._poolIp); | ||||
|             s_logger.debug("KVMHAChecker reture: " + result); | ||||
|             s_logger.debug("KVMHAChecker result: " + result); | ||||
|             s_logger.debug("KVMHAChecker parser: " + parser.getLine()); | ||||
|             if (result == null && parser.getLine().contains("> DEAD <")) { | ||||
|                 s_logger.debug("read heartbeat failed: "); | ||||
|  | ||||
| @ -72,6 +72,9 @@ public class SimulatorHAProvider extends HAAbstractHostProvider implements HAPro | ||||
| 
 | ||||
|     @Override | ||||
|     public boolean isEligible(final Host host) { | ||||
|         if (host == null) { | ||||
|             return false; | ||||
|         } | ||||
|         final SimulatorHAState haState = hostHAStateMap.get(host.getId()); | ||||
|         return !isInMaintenanceMode(host) && !isDisabled(host) && haState != null | ||||
|                 && Hypervisor.HypervisorType.Simulator.equals(host.getHypervisorType()); | ||||
|  | ||||
| @ -40,7 +40,7 @@ | ||||
|     <dependency> | ||||
|         <groupId>br.com.autonomiccs</groupId> | ||||
|         <artifactId>apache-cloudstack-java-client</artifactId> | ||||
|         <version>1.0.4</version> | ||||
|         <version>1.0.5</version> | ||||
|     </dependency> | ||||
|   </dependencies> | ||||
| </project> | ||||
|  | ||||
| @ -17,32 +17,20 @@ | ||||
| 
 | ||||
| package org.apache.cloudstack.ha; | ||||
| 
 | ||||
| import com.cloud.cluster.ClusterManagerListener; | ||||
| import com.cloud.cluster.ManagementServerHost; | ||||
| import com.cloud.dc.ClusterDetailsDao; | ||||
| import com.cloud.dc.ClusterDetailsVO; | ||||
| import com.cloud.dc.DataCenter; | ||||
| import com.cloud.dc.DataCenterDetailVO; | ||||
| import com.cloud.dc.dao.DataCenterDetailsDao; | ||||
| import com.cloud.domain.Domain; | ||||
| import com.cloud.event.ActionEvent; | ||||
| import com.cloud.event.ActionEventUtils; | ||||
| import com.cloud.event.EventTypes; | ||||
| import com.cloud.ha.Investigator; | ||||
| import com.cloud.host.Host; | ||||
| import com.cloud.host.Status; | ||||
| import com.cloud.host.dao.HostDao; | ||||
| import com.cloud.org.Cluster; | ||||
| import com.cloud.utils.component.ComponentContext; | ||||
| import com.cloud.utils.component.ManagerBase; | ||||
| import com.cloud.utils.component.PluggableService; | ||||
| import com.cloud.utils.db.Transaction; | ||||
| import com.cloud.utils.db.TransactionCallback; | ||||
| import com.cloud.utils.db.TransactionStatus; | ||||
| import com.cloud.utils.exception.CloudRuntimeException; | ||||
| import com.cloud.utils.fsm.NoTransitionException; | ||||
| import com.google.common.base.Preconditions; | ||||
| import com.google.common.base.Strings; | ||||
| import java.util.ArrayList; | ||||
| import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.concurrent.ArrayBlockingQueue; | ||||
| import java.util.concurrent.ConcurrentHashMap; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.concurrent.Future; | ||||
| import java.util.concurrent.ThreadPoolExecutor; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| 
 | ||||
| import javax.inject.Inject; | ||||
| import javax.naming.ConfigurationException; | ||||
| 
 | ||||
| import org.apache.cloudstack.api.ApiErrorCode; | ||||
| import org.apache.cloudstack.api.ServerApiException; | ||||
| import org.apache.cloudstack.api.command.admin.ha.ConfigureHAForHostCmd; | ||||
| @ -71,20 +59,36 @@ import org.apache.cloudstack.poll.BackgroundPollTask; | ||||
| import org.apache.cloudstack.utils.identity.ManagementServerNode; | ||||
| import org.apache.log4j.Logger; | ||||
| 
 | ||||
| import javax.inject.Inject; | ||||
| import javax.naming.ConfigurationException; | ||||
| import java.util.ArrayList; | ||||
| import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.concurrent.ArrayBlockingQueue; | ||||
| import java.util.concurrent.ConcurrentHashMap; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.concurrent.Future; | ||||
| import java.util.concurrent.ThreadPoolExecutor; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import com.cloud.cluster.ClusterManagerListener; | ||||
| import com.cloud.cluster.ManagementServerHost; | ||||
| import com.cloud.dc.ClusterDetailsDao; | ||||
| import com.cloud.dc.ClusterDetailsVO; | ||||
| import com.cloud.dc.DataCenter; | ||||
| import com.cloud.dc.DataCenterDetailVO; | ||||
| import com.cloud.dc.dao.DataCenterDetailsDao; | ||||
| import com.cloud.domain.Domain; | ||||
| import com.cloud.event.ActionEvent; | ||||
| import com.cloud.event.ActionEventUtils; | ||||
| import com.cloud.event.EventTypes; | ||||
| import com.cloud.ha.Investigator; | ||||
| import com.cloud.host.Host; | ||||
| import com.cloud.host.Status; | ||||
| import com.cloud.host.dao.HostDao; | ||||
| import com.cloud.org.Cluster; | ||||
| import com.cloud.utils.component.ComponentContext; | ||||
| import com.cloud.utils.component.ManagerBase; | ||||
| import com.cloud.utils.component.PluggableService; | ||||
| import com.cloud.utils.db.Transaction; | ||||
| import com.cloud.utils.db.TransactionCallback; | ||||
| import com.cloud.utils.db.TransactionStatus; | ||||
| import com.cloud.utils.exception.CloudRuntimeException; | ||||
| import com.cloud.utils.fsm.NoTransitionException; | ||||
| import com.cloud.utils.fsm.StateListener; | ||||
| import com.cloud.utils.fsm.StateMachine2; | ||||
| import com.google.common.base.Preconditions; | ||||
| import com.google.common.base.Strings; | ||||
| 
 | ||||
| public final class HAManagerImpl extends ManagerBase implements HAManager, ClusterManagerListener, PluggableService, Configurable { | ||||
| public final class HAManagerImpl extends ManagerBase implements HAManager, ClusterManagerListener, PluggableService, Configurable, StateListener<HAConfig.HAState, HAConfig.Event, HAConfig> { | ||||
|     public static final Logger LOG = Logger.getLogger(HAManagerImpl.class); | ||||
| 
 | ||||
|     @Inject | ||||
| @ -307,7 +311,7 @@ public final class HAManagerImpl extends ManagerBase implements HAManager, Clust | ||||
|                     LOG.debug("HA: Agent is available/suspect/checking Up " + host.getId()); | ||||
|                 } | ||||
|                 return Status.Down; | ||||
|             } else if (haConfig.getState() == HAConfig.HAState.Degraded || haConfig.getState() == HAConfig.HAState.Recovering || haConfig.getState() == HAConfig.HAState.Recovered || haConfig.getState() == HAConfig.HAState.Fencing) { | ||||
|             } else if (haConfig.getState() == HAConfig.HAState.Degraded || haConfig.getState() == HAConfig.HAState.Recovering || haConfig.getState() == HAConfig.HAState.Fencing) { | ||||
|                 if (LOG.isDebugEnabled()){ | ||||
|                     LOG.debug("HA: Agent is disconnected " + host.getId()); | ||||
|                 } | ||||
| @ -455,23 +459,84 @@ public final class HAManagerImpl extends ManagerBase implements HAManager, Clust | ||||
|         return cmdList; | ||||
|     } | ||||
| 
 | ||||
|     ////////////////////////////////////////////////////////////////// | ||||
|     //////////////// Clustered Manager Listeners ///////////////////// | ||||
|     ////////////////////////////////////////////////////////////////// | ||||
|     ////////////////////////////////////////////////////// | ||||
|     //////////////// Event Listeners ///////////////////// | ||||
|     ////////////////////////////////////////////////////// | ||||
| 
 | ||||
|     @Override | ||||
|     public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) { | ||||
| 
 | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) { | ||||
| 
 | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public void onManagementNodeIsolated() { | ||||
|     } | ||||
| 
 | ||||
|     private boolean processHAStateChange(final HAConfig haConfig, final boolean status) { | ||||
          // Submits the task that matches the HA config's current state (activity check,
          // recovery or fencing) to the corresponding executor.
          // Returns false when status is false, the ownership check fails, the resource or
          // provider lookup yields null, or the recovery attempt limit has been reached;
          // returns true in every other case, including states that need no task.
|         if (!status || !checkHAOwnership(haConfig)) { | ||||
|             return false; | ||||
|         } | ||||
| 
 | ||||
|         final HAResource resource = validateAndFindHAResource(haConfig); | ||||
|         if (resource == null) { | ||||
|             return false; | ||||
|         } | ||||
| 
 | ||||
|         final HAProvider<HAResource> haProvider = validateAndFindHAProvider(haConfig, resource); | ||||
|         if (haProvider == null) { | ||||
|             return false; | ||||
|         } | ||||
| 
 | ||||
|         final HAResourceCounter counter = getHACounter(haConfig.getResourceId(), haConfig.getResourceType()); | ||||
| 
 | ||||
|         // Perform activity checks | ||||
          // The counter's suspect timestamp is handed to the task as its disconnect time.
|         if (haConfig.getState() == HAConfig.HAState.Checking) { | ||||
|             final ActivityCheckTask job = ComponentContext.inject(new ActivityCheckTask(resource, haProvider, haConfig, | ||||
|                     HAProviderConfig.ActivityCheckTimeout, activityCheckExecutor, counter.getSuspectTimeStamp())); | ||||
|             activityCheckExecutor.submit(job); | ||||
|         } | ||||
| 
 | ||||
|         // Attempt recovery | ||||
          // No task is submitted once the recovery counter reaches MaxRecoveryAttempts.
          // NOTE(review): the recovery counter is not incremented in this method;
          // presumably RecoveryTask does it - confirm.
|         if (haConfig.getState() == HAConfig.HAState.Recovering) { | ||||
|             if (counter.getRecoveryCounter() >= (Long) (haProvider.getConfigValue(HAProviderConfig.MaxRecoveryAttempts, resource))) { | ||||
|                 return false; | ||||
|             } | ||||
|             final RecoveryTask task = ComponentContext.inject(new RecoveryTask(resource, haProvider, haConfig, | ||||
|                     HAProviderConfig.RecoveryTimeout, recoveryExecutor)); | ||||
|             final Future<Boolean> recoveryFuture = recoveryExecutor.submit(task); | ||||
              // Keep the in-flight future on the counter.
|             counter.setRecoveryFuture(recoveryFuture); | ||||
|         } | ||||
| 
 | ||||
|         // Fencing | ||||
          // The fence future is stored on the counter as well.
|         if (haConfig.getState() == HAConfig.HAState.Fencing) { | ||||
|             final FenceTask task = ComponentContext.inject(new FenceTask(resource, haProvider, haConfig, | ||||
|                     HAProviderConfig.FenceTimeout, fenceExecutor)); | ||||
|             final Future<Boolean> fenceFuture = fenceExecutor.submit(task); | ||||
|             counter.setFenceFuture(fenceFuture); | ||||
|         } | ||||
|         return true; | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public boolean preStateTransitionEvent(final HAConfig.HAState oldState, final HAConfig.Event event, final HAConfig.HAState newState, final HAConfig haConfig, final boolean status, final Object opaque) { | ||||
          // Pre-transition hook: only self-transitions (oldState == newState) into a state
          // other than Suspect or Checking are processed here; everything else returns
          // false without scheduling anything.
          // NOTE(review): how the state machine uses the returned boolean is not visible
          // here - confirm.
|         if (oldState != newState || newState == HAConfig.HAState.Suspect || newState == HAConfig.HAState.Checking) { | ||||
|             return false; | ||||
|         } | ||||
|         if (LOG.isTraceEnabled()) { | ||||
|             LOG.trace("HA state pre-transition:: new state=" + newState + ", old state=" + oldState + ", for resource id=" + haConfig.getResourceId() + ", status=" + status + ", ha config state=" + haConfig.getState()); | ||||
|         } | ||||
          // Task submission is decided from haConfig.getState(), not from newState.
|         return processHAStateChange(haConfig, status); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public boolean postStateTransitionEvent(final StateMachine2.Transition<HAConfig.HAState, HAConfig.Event> transition, final HAConfig haConfig, final boolean status, final Object opaque) { | ||||
          // Post-transition hook: traces the transition and then always delegates to
          // processHAStateChange, which returns false when status is false.
          // NOTE(review): for a self-transition the pre-transition hook also delegates to
          // processHAStateChange; check whether both hooks fire for the same event and
          // could submit the same task twice.
|         if (LOG.isTraceEnabled()) { | ||||
|             LOG.trace("HA state post-transition:: new state=" + transition.getToState() + ", old state=" + transition.getCurrentState() + ", for resource id=" + haConfig.getResourceId() + ", status=" + status + ", ha config state=" + haConfig.getState()); | ||||
|         } | ||||
|         return processHAStateChange(haConfig, status); | ||||
|     } | ||||
| 
 | ||||
|     /////////////////////////////////////////////////// | ||||
| @ -523,10 +588,8 @@ public final class HAManagerImpl extends ManagerBase implements HAManager, Clust | ||||
|                 0L, TimeUnit.MILLISECONDS, | ||||
|                 new ArrayBlockingQueue<Runnable>(fenceOperationQueueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); | ||||
| 
 | ||||
|         pollManager.submitTask(new HealthCheckPollTask()); | ||||
|         pollManager.submitTask(new ActivityCheckPollTask()); | ||||
|         pollManager.submitTask(new RecoveryPollTask()); | ||||
|         pollManager.submitTask(new FencingPollTask()); | ||||
|         pollManager.submitTask(new HAManagerBgPollTask()); | ||||
|         HAConfig.HAState.getStateMachine().registerListener(this); | ||||
| 
 | ||||
|         LOG.debug("HA manager has been configured"); | ||||
|         return true; | ||||
| @ -559,7 +622,7 @@ public final class HAManagerImpl extends ManagerBase implements HAManager, Clust | ||||
|     //////////////// Poll Tasks ///////////////////// | ||||
|     ///////////////////////////////////////////////// | ||||
| 
 | ||||
|     private final class HealthCheckPollTask extends ManagedContextRunnable implements BackgroundPollTask { | ||||
|     private final class HAManagerBgPollTask extends ManagedContextRunnable implements BackgroundPollTask { | ||||
|         @Override | ||||
|         protected void runInContext() { | ||||
|             try { | ||||
| @ -582,6 +645,19 @@ public final class HAManagerImpl extends ManagerBase implements HAManager, Clust | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     switch (haConfig.getState()) { | ||||
|                         case Available: | ||||
|                         case Suspect: | ||||
|                         case Degraded: | ||||
|                         case Fenced: | ||||
|                             final HealthCheckTask task = ComponentContext.inject(new HealthCheckTask(resource, haProvider, haConfig, | ||||
|                                     HAProviderConfig.HealthCheckTimeout, healthCheckExecutor)); | ||||
|                             healthCheckExecutor.submit(task); | ||||
|                             break; | ||||
|                     default: | ||||
|                         break; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAResourceCounter counter = getHACounter(haConfig.getResourceId(), haConfig.getResourceType()); | ||||
| 
 | ||||
|                     if (haConfig.getState() == HAConfig.HAState.Suspect) { | ||||
| @ -596,17 +672,25 @@ public final class HAManagerImpl extends ManagerBase implements HAManager, Clust | ||||
|                         } | ||||
|                     } | ||||
| 
 | ||||
|                     switch (haConfig.getState()) { | ||||
|                         case Available: | ||||
|                         case Suspect: | ||||
|                         case Degraded: | ||||
|                         case Fenced: | ||||
|                             final HealthCheckTask task = ComponentContext.inject(new HealthCheckTask(resource, haProvider, haConfig, | ||||
|                                     HAProviderConfig.HealthCheckTimeout, healthCheckExecutor)); | ||||
|                             healthCheckExecutor.submit(task); | ||||
|                             break; | ||||
|                     default: | ||||
|                         break; | ||||
|                     if (haConfig.getState() == HAConfig.HAState.Recovering) { | ||||
|                         if (counter.getRecoveryCounter() >= (Long) (haProvider.getConfigValue(HAProviderConfig.MaxRecoveryAttempts, resource))) { | ||||
|                             transitionHAState(HAConfig.Event.RecoveryOperationThresholdExceeded, haConfig); | ||||
|                         } else { | ||||
|                             transitionHAState(HAConfig.Event.RetryRecovery, haConfig); | ||||
|                         } | ||||
|                     } | ||||
| 
 | ||||
|                     if (haConfig.getState() == HAConfig.HAState.Recovered) { | ||||
|                         counter.markRecoveryStarted(); | ||||
|                         if (counter.canExitRecovery((Long)(haProvider.getConfigValue(HAProviderConfig.RecoveryWaitTimeout, resource)))) { | ||||
|                             if (transitionHAState(HAConfig.Event.RecoveryWaitPeriodTimeout, haConfig)) { | ||||
|                                 counter.markRecoveryCompleted(); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
| 
 | ||||
|                     if (haConfig.getState() == HAConfig.HAState.Fencing && counter.canAttemptFencing()) { | ||||
|                         transitionHAState(HAConfig.Event.RetryFencing, haConfig); | ||||
|                     } | ||||
|                 } | ||||
|             } catch (Throwable t) { | ||||
| @ -614,131 +698,4 @@ public final class HAManagerImpl extends ManagerBase implements HAManager, Clust | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private final class ActivityCheckPollTask extends ManagedContextRunnable implements BackgroundPollTask { | ||||
          // Poll task that walks every HA config returned by haConfigDao.listAll() and
          // submits an ActivityCheckTask for each config in the Checking state. Configs
          // are skipped when the ownership check fails or when the resource or provider
          // lookup returns null.
|         @Override | ||||
|         protected void runInContext() { | ||||
|             try { | ||||
|                 if (LOG.isTraceEnabled()) { | ||||
|                     LOG.trace("HA activity check task is running..."); | ||||
|                 } | ||||
                  // Iterate over a copy of the DAO result.
|                 final List<HAConfig> haConfigList = new ArrayList<HAConfig>(haConfigDao.listAll()); | ||||
|                 for (final HAConfig haConfig : haConfigList) { | ||||
|                     if (!checkHAOwnership(haConfig)) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAResource resource = validateAndFindHAResource(haConfig); | ||||
|                     if (resource == null) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAProvider<HAResource> haProvider = validateAndFindHAProvider(haConfig, resource); | ||||
|                     if (haProvider == null) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     if (haConfig.getState() == HAConfig.HAState.Checking) { | ||||
                          // The counter's suspect timestamp becomes the task's disconnect time.
|                         final HAResourceCounter counter = getHACounter(haConfig.getResourceId(), haConfig.getResourceType()); | ||||
|                         final ActivityCheckTask job = ComponentContext.inject(new ActivityCheckTask(resource, haProvider, haConfig, | ||||
|                                 HAProviderConfig.ActivityCheckTimeout, activityCheckExecutor, counter.getSuspectTimeStamp())); | ||||
|                         activityCheckExecutor.submit(job); | ||||
|                     } | ||||
|                 } | ||||
|             } catch (Throwable t) { | ||||
                  // Any failure is logged and not rethrown.
|                 LOG.error("Error trying to perform activity checks in HA manager", t); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private final class RecoveryPollTask extends ManagedContextRunnable implements BackgroundPollTask { | ||||
          // Poll task that walks every HA config returned by haConfigDao.listAll() and
          // handles the Recovering and Recovered states. Configs are skipped when the
          // ownership check fails or when the resource or provider lookup returns null.
|         @Override | ||||
|         protected void runInContext() { | ||||
|             try { | ||||
|                 if (LOG.isTraceEnabled()) { | ||||
|                     LOG.trace("HA recovery task is running..."); | ||||
|                 } | ||||
|                 final List<HAConfig> haConfigList = new ArrayList<HAConfig>(haConfigDao.listAll()); | ||||
|                 for (final HAConfig haConfig : haConfigList) { | ||||
|                     if (!checkHAOwnership(haConfig)) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAResource resource = validateAndFindHAResource(haConfig); | ||||
|                     if (resource == null) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAProvider<HAResource> haProvider = validateAndFindHAProvider(haConfig, resource); | ||||
|                     if (haProvider == null) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAResourceCounter counter = getHACounter(haConfig.getResourceId(), haConfig.getResourceType()); | ||||
                      // Recovering: when the counter allows another attempt, either fire
                      // RecoveryOperationThresholdExceeded (counter reached MaxRecoveryAttempts)
                      // or submit a new RecoveryTask.
|                     if (haConfig.getState() == HAConfig.HAState.Recovering) { | ||||
|                         if (counter.canAttemptRecovery()) { | ||||
|                             if (counter.getRecoveryCounter() >= (Long)(haProvider.getConfigValue(HAProviderConfig.MaxRecoveryAttempts, resource))) { | ||||
|                                 transitionHAState(HAConfig.Event.RecoveryOperationThresholdExceeded, haConfig); | ||||
|                                 continue; | ||||
|                             } | ||||
| 
 | ||||
|                             final RecoveryTask task = ComponentContext.inject(new RecoveryTask(resource, haProvider, haConfig, | ||||
|                                     HAProviderConfig.RecoveryTimeout, recoveryExecutor)); | ||||
|                             final Future<Boolean> recoveryFuture = recoveryExecutor.submit(task); | ||||
                              // Record the in-flight future and count the attempt at submission time.
|                             counter.setRecoveryFuture(recoveryFuture); | ||||
|                             counter.incrRecoveryCounter(); | ||||
|                         } | ||||
|                     } | ||||
                      // Recovered: mark the recovery as started and, once canExitRecovery(...)
                      // holds for RecoveryWaitTimeout, fire RecoveryWaitPeriodTimeout.
|                     if (haConfig.getState() == HAConfig.HAState.Recovered) { | ||||
|                         counter.markRecoveryStarted(); | ||||
|                         if (counter.canExitRecovery((Long)(haProvider.getConfigValue(HAProviderConfig.RecoveryWaitTimeout, resource)))) { | ||||
                              // The transition's result is not checked before marking completion.
|                             transitionHAState(HAConfig.Event.RecoveryWaitPeriodTimeout, haConfig); | ||||
|                             counter.markRecoveryCompleted(); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } catch (Throwable t) { | ||||
                  // Any failure is logged and not rethrown.
|                 LOG.error("Error trying to perform recovery operation in HA manager", t); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private final class FencingPollTask extends ManagedContextRunnable implements BackgroundPollTask { | ||||
          // Poll task that walks every HA config returned by haConfigDao.listAll() and
          // submits a FenceTask for each config in the Fencing state, provided
          // counter.lastFencingCompleted() is true. Configs are skipped when the ownership
          // check fails or when the resource or provider lookup returns null.
|         @Override | ||||
|         protected void runInContext() { | ||||
|             try { | ||||
|                 if (LOG.isTraceEnabled()) { | ||||
|                     LOG.trace("HA fencing task is running..."); | ||||
|                 } | ||||
|                 final List<HAConfig> haConfigList = new ArrayList<HAConfig>(haConfigDao.listAll()); | ||||
|                 for (final HAConfig haConfig : haConfigList) { | ||||
|                     if (!checkHAOwnership(haConfig)) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAResource resource = validateAndFindHAResource(haConfig); | ||||
|                     if (resource == null) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAProvider<HAResource> haProvider = validateAndFindHAProvider(haConfig, resource); | ||||
|                     if (haProvider == null) { | ||||
|                         continue; | ||||
|                     } | ||||
| 
 | ||||
|                     final HAResourceCounter counter = getHACounter(haConfig.getResourceId(), haConfig.getResourceType()); | ||||
|                     if (counter.lastFencingCompleted()) { | ||||
|                         if (haConfig.getState() == HAConfig.HAState.Fencing) { | ||||
|                             final FenceTask task = ComponentContext.inject(new FenceTask(resource, haProvider, haConfig, | ||||
|                                     HAProviderConfig.FenceTimeout, fenceExecutor)); | ||||
|                             final Future<Boolean> fenceFuture = fenceExecutor.submit(task); | ||||
                              // Store the new future on the counter.
|                             counter.setFenceFuture(fenceFuture); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|             } catch (Throwable t) { | ||||
                  // Any failure is logged and not rethrown.
|                 LOG.error("Error trying to perform fencing operation in HA manager", t); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -41,7 +41,6 @@ public final class HAResourceCounter { | ||||
|     } | ||||
| 
 | ||||
|     public synchronized void incrActivityCounter(final boolean isFailure) { | ||||
|         lastActivityCheckTimestamp = System.currentTimeMillis(); | ||||
|         activityCheckCounter.incrementAndGet(); | ||||
|         if (isFailure) { | ||||
|             activityCheckFailureCounter.incrementAndGet(); | ||||
| @ -71,8 +70,12 @@ public final class HAResourceCounter { | ||||
|         return activityCheckFailureCounter.get() > (activityCheckCounter.get() * failureRatio); | ||||
|     } | ||||
| 
 | ||||
|     public boolean canPerformActivityCheck(final Long activityCheckInterval) { | ||||
|         return lastActivityCheckTimestamp == null || (System.currentTimeMillis() - lastActivityCheckTimestamp) > (activityCheckInterval * 1000); | ||||
|     public synchronized boolean canPerformActivityCheck(final Long activityCheckInterval) { | ||||
|         if (lastActivityCheckTimestamp == null || (System.currentTimeMillis() - lastActivityCheckTimestamp) > (activityCheckInterval * 1000)) { | ||||
|             lastActivityCheckTimestamp = System.currentTimeMillis(); | ||||
|             return true; | ||||
|         } | ||||
|         return false; | ||||
|     } | ||||
| 
 | ||||
|     public boolean canRecheckActivity(final Long maxDegradedPeriod) { | ||||
| @ -121,7 +124,7 @@ public final class HAResourceCounter { | ||||
|         fenceFuture = future; | ||||
|     } | ||||
| 
 | ||||
|     public boolean lastFencingCompleted() { | ||||
|     public boolean canAttemptFencing() { | ||||
|         return fenceFuture == null || fenceFuture.isDone(); | ||||
|     } | ||||
| 
 | ||||
|  | ||||
| @ -17,12 +17,11 @@ | ||||
| 
 | ||||
| package org.apache.cloudstack.ha.provider; | ||||
| 
 | ||||
| import com.cloud.utils.component.Adapter; | ||||
| 
 | ||||
| import org.apache.cloudstack.ha.HAConfig; | ||||
| import org.apache.cloudstack.ha.HAResource; | ||||
| import org.joda.time.DateTime; | ||||
| 
 | ||||
| import org.apache.cloudstack.ha.HAResource; | ||||
| import com.cloud.utils.component.Adapter; | ||||
| 
 | ||||
| public interface HAProvider<R extends HAResource> extends Adapter { | ||||
| 
 | ||||
| @ -57,7 +56,9 @@ public interface HAProvider<R extends HAResource> extends Adapter { | ||||
| 
 | ||||
|     boolean fence(R r) throws HAFenceException; | ||||
| 
 | ||||
|     void setFenced(R r); | ||||
|     void fenceSubResources(R r); | ||||
| 
 | ||||
|     void enableMaintenance(R r); | ||||
| 
 | ||||
|     void sendAlert(R r, HAConfig.HAState nextState); | ||||
| 
 | ||||
|  | ||||
| @ -71,7 +71,7 @@ public abstract class HAAbstractHostProvider extends AdapterBase implements HAPr | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public void setFenced(final Host r) { | ||||
|     public void fenceSubResources(final Host r) { | ||||
|         if (r.getState() != Status.Down) { | ||||
|             try { | ||||
|                 LOG.debug("Trying to disconnect the host without investigation and scheduling HA for the VMs on host id=" + r.getId()); | ||||
| @ -80,11 +80,15 @@ public abstract class HAAbstractHostProvider extends AdapterBase implements HAPr | ||||
|             } catch (Exception e) { | ||||
|                 LOG.error("Failed to disconnect host and schedule HA restart of VMs after fencing the host: ", e); | ||||
|             } | ||||
|             try { | ||||
|                 resourceManager.resourceStateTransitTo(r, ResourceState.Event.InternalEnterMaintenance, ManagementServerNode.getManagementServerId()); | ||||
|             } catch (NoTransitionException e) { | ||||
|                 LOG.error("Failed to put host in maintenance mode after host-ha fencing and scheduling VM-HA: ", e); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public void enableMaintenance(final Host r) { | ||||
          // Requests the InternalEnterMaintenance resource-state transition for the given
          // host, passing this management server's id. A NoTransitionException is logged
          // and not propagated to the caller.
|         try { | ||||
|             resourceManager.resourceStateTransitTo(r, ResourceState.Event.InternalEnterMaintenance, ManagementServerNode.getManagementServerId()); | ||||
|         } catch (NoTransitionException e) { | ||||
|             LOG.error("Failed to put host in maintenance mode after host-ha fencing and scheduling VM-HA: ", e); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|  | ||||
| @ -17,6 +17,10 @@ | ||||
| 
 | ||||
| package org.apache.cloudstack.ha.task; | ||||
| 
 | ||||
| import java.util.concurrent.ExecutorService; | ||||
| 
 | ||||
| import javax.inject.Inject; | ||||
| 
 | ||||
| import org.apache.cloudstack.ha.HAConfig; | ||||
| import org.apache.cloudstack.ha.HAManager; | ||||
| import org.apache.cloudstack.ha.HAResource; | ||||
| @ -25,11 +29,7 @@ import org.apache.cloudstack.ha.provider.HACheckerException; | ||||
| import org.apache.cloudstack.ha.provider.HAProvider; | ||||
| import org.apache.cloudstack.ha.provider.HAProvider.HAProviderConfig; | ||||
| import org.apache.log4j.Logger; | ||||
| 
 | ||||
| import javax.inject.Inject; | ||||
| 
 | ||||
| import org.joda.time.DateTime; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| 
 | ||||
| public class ActivityCheckTask extends BaseHATask { | ||||
| 
 | ||||
| @ -38,22 +38,24 @@ public class ActivityCheckTask extends BaseHATask { | ||||
|     @Inject | ||||
|     private HAManager haManager; | ||||
| 
 | ||||
|     private final long disconnectTime; | ||||
|     private long disconnectTime; | ||||
|     private long maxActivityChecks; | ||||
|     private double activityCheckFailureRatio; | ||||
| 
 | ||||
|     public ActivityCheckTask(final HAResource resource, final HAProvider<HAResource> haProvider, final HAConfig haConfig, final HAProvider.HAProviderConfig haProviderConfig, | ||||
|             final ExecutorService executor, final long disconnectTime) { | ||||
          // Common task state (resource, provider, config, executor) is handled by BaseHATask.
|         super(resource, haProvider, haConfig, haProviderConfig, executor); | ||||
          // Kept so performAction() can hand it to the provider's hasActivity() check.
|         this.disconnectTime = disconnectTime; | ||||
          // Both settings are looked up from the provider once, at construction time, and
          // stored in fields. The casts assume getConfigValue() yields a non-null Long and
          // Double for these keys - NOTE(review): confirm against the provider implementations,
          // since a null or differently typed value would fail here.
|         this.maxActivityChecks = (Long)haProvider.getConfigValue(HAProviderConfig.MaxActivityChecks, resource); | ||||
|         this.activityCheckFailureRatio = (Double)haProvider.getConfigValue(HAProviderConfig.ActivityCheckFailureRatio, resource); | ||||
|     } | ||||
| 
 | ||||
|     public boolean performAction() throws HACheckerException { | ||||
          // Delegates the activity check to the HA provider for this task's resource.
          // disconnectTime is a long, so the DateTime(long) constructor reads it as
          // milliseconds since the epoch. The provider's boolean answer is returned unchanged.
|         return getHaProvider().hasActivity(getResource(), new DateTime(disconnectTime)); | ||||
|     } | ||||
| 
 | ||||
|     public void processResult(boolean result, Throwable t) { | ||||
|     public synchronized void processResult(boolean result, Throwable t) { | ||||
|         final HAConfig haConfig = getHaConfig(); | ||||
|         final HAProvider<HAResource> haProvider = getHaProvider(); | ||||
|         final HAResource resource = getResource(); | ||||
|         final HAResourceCounter counter = haManager.getHACounter(haConfig.getResourceId(), haConfig.getResourceType()); | ||||
| 
 | ||||
|         if (t != null && t instanceof HACheckerException) { | ||||
| @ -64,18 +66,17 @@ public class ActivityCheckTask extends BaseHATask { | ||||
| 
 | ||||
|         counter.incrActivityCounter(!result); | ||||
| 
 | ||||
|         long maxActivityChecks = (Long)haProvider.getConfigValue(HAProviderConfig.MaxActivityChecks, resource); | ||||
|         if (counter.getActivityCheckCounter() < maxActivityChecks) { | ||||
|             haManager.transitionHAState(HAConfig.Event.TooFewActivityCheckSamples, haConfig); | ||||
|             return; | ||||
|         } | ||||
| 
 | ||||
|         double activityCheckFailureRatio = (Double)haProvider.getConfigValue(HAProviderConfig.ActivityCheckFailureRatio, resource); | ||||
|         if (counter.hasActivityThresholdExceeded(activityCheckFailureRatio)) { | ||||
|             haManager.transitionHAState(HAConfig.Event.ActivityCheckFailureOverThresholdRatio, haConfig); | ||||
|         } else { | ||||
|             haManager.transitionHAState(HAConfig.Event.ActivityCheckFailureUnderThresholdRatio, haConfig); | ||||
|             counter.markResourceDegraded(); | ||||
|             if (haManager.transitionHAState(HAConfig.Event.ActivityCheckFailureUnderThresholdRatio, haConfig)) { | ||||
|                 counter.markResourceDegraded(); | ||||
|             } | ||||
|         } | ||||
|         counter.resetActivityCounter(); | ||||
|     } | ||||
|  | ||||
| @ -17,6 +17,13 @@ | ||||
| 
 | ||||
| package org.apache.cloudstack.ha.task; | ||||
| 
 | ||||
| import java.util.concurrent.Callable; | ||||
| import java.util.concurrent.ExecutionException; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.concurrent.Future; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import java.util.concurrent.TimeoutException; | ||||
| 
 | ||||
| import org.apache.cloudstack.ha.HAConfig; | ||||
| import org.apache.cloudstack.ha.HAResource; | ||||
| import org.apache.cloudstack.ha.provider.HACheckerException; | ||||
| @ -24,13 +31,7 @@ import org.apache.cloudstack.ha.provider.HAFenceException; | ||||
| import org.apache.cloudstack.ha.provider.HAProvider; | ||||
| import org.apache.cloudstack.ha.provider.HARecoveryException; | ||||
| import org.apache.log4j.Logger; | ||||
| 
 | ||||
| import java.util.concurrent.Callable; | ||||
| import java.util.concurrent.ExecutionException; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| import java.util.concurrent.Future; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import java.util.concurrent.TimeoutException; | ||||
| import org.joda.time.DateTime; | ||||
| 
 | ||||
| public abstract class BaseHATask implements Callable<Boolean> { | ||||
|     public static final Logger LOG = Logger.getLogger(BaseHATask.class); | ||||
| @ -40,6 +41,7 @@ public abstract class BaseHATask implements Callable<Boolean> { | ||||
|     private final HAConfig haConfig; | ||||
|     private final ExecutorService executor; | ||||
|     private Long timeout; | ||||
|     private DateTime created; | ||||
| 
 | ||||
|     public BaseHATask(final HAResource resource, final HAProvider<HAResource> haProvider, final HAConfig haConfig, final HAProvider.HAProviderConfig haProviderConfig, | ||||
|             final ExecutorService executor) { | ||||
| @ -48,6 +50,7 @@ public abstract class BaseHATask implements Callable<Boolean> { | ||||
|         this.haConfig = haConfig; | ||||
|         this.executor = executor; | ||||
|         this.timeout = (Long)haProvider.getConfigValue(haProviderConfig, resource); | ||||
|         this.created = new DateTime(); | ||||
|     } | ||||
| 
 | ||||
|     public HAProvider<HAResource> getHaProvider() { | ||||
| @ -74,6 +77,9 @@ public abstract class BaseHATask implements Callable<Boolean> { | ||||
| 
 | ||||
|     @Override | ||||
|     public Boolean call() { | ||||
|         if (new DateTime().minusHours(1).isAfter(getCreated())) { | ||||
|             return false; | ||||
|         } | ||||
|         final Future<Boolean> future = executor.submit(new Callable<Boolean>() { | ||||
|             @Override | ||||
|             public Boolean call() throws HACheckerException, HAFenceException, HARecoveryException { | ||||
| @ -99,4 +105,7 @@ public abstract class BaseHATask implements Callable<Boolean> { | ||||
|         return result; | ||||
|     } | ||||
| 
 | ||||
|     public DateTime getCreated() { | ||||
          // Timestamp captured with new DateTime() when the task was constructed; call()
          // compares it against the current time and returns false without running the
          // action once the task is more than one hour old.
|         return created; | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -48,7 +48,8 @@ public class FenceTask extends BaseHATask { | ||||
|         if (result) { | ||||
|             counter.resetRecoveryCounter(); | ||||
|             haManager.transitionHAState(HAConfig.Event.Fenced, haConfig); | ||||
|             getHaProvider().setFenced(getResource()); | ||||
|             getHaProvider().fenceSubResources(getResource()); | ||||
|             getHaProvider().enableMaintenance(getResource()); | ||||
|         } | ||||
|         getHaProvider().sendAlert(getResource(), HAConfig.HAState.Fencing); | ||||
|     } | ||||
|  | ||||
| @ -17,16 +17,18 @@ | ||||
| 
 | ||||
| package org.apache.cloudstack.ha.task; | ||||
| 
 | ||||
| import java.util.concurrent.ExecutorService; | ||||
| 
 | ||||
| import javax.inject.Inject; | ||||
| 
 | ||||
| import org.apache.cloudstack.ha.HAConfig; | ||||
| import org.apache.cloudstack.ha.HAManager; | ||||
| import org.apache.cloudstack.ha.HAResource; | ||||
| import org.apache.cloudstack.ha.HAResourceCounter; | ||||
| import org.apache.cloudstack.ha.provider.HACheckerException; | ||||
| import org.apache.cloudstack.ha.provider.HAProvider; | ||||
| import org.apache.cloudstack.ha.provider.HARecoveryException; | ||||
| 
 | ||||
| import javax.inject.Inject; | ||||
| import java.util.concurrent.ExecutorService; | ||||
| 
 | ||||
| public class RecoveryTask extends BaseHATask { | ||||
| 
 | ||||
|     @Inject | ||||
| @ -43,8 +45,13 @@ public class RecoveryTask extends BaseHATask { | ||||
| 
 | ||||
|     public void processResult(boolean result, Throwable e) { | ||||
|         final HAConfig haConfig = getHaConfig(); | ||||
|         final HAResourceCounter counter = haManager.getHACounter(haConfig.getResourceId(), haConfig.getResourceType()); | ||||
|         counter.incrRecoveryCounter(); | ||||
|         counter.resetActivityCounter(); | ||||
| 
 | ||||
|         if (result) { | ||||
|             haManager.transitionHAState(HAConfig.Event.Recovered, haConfig); | ||||
|             getHaProvider().fenceSubResources(getResource()); | ||||
|         } | ||||
|         getHaProvider().sendAlert(getResource(), HAConfig.HAState.Recovering); | ||||
|     } | ||||
|  | ||||
| @ -267,7 +267,7 @@ public class OutOfBandManagementServiceImpl extends ManagerBase implements OutOf | ||||
|     } | ||||
| 
 | ||||
|     public boolean isOutOfBandManagementEnabled(final Host host) { | ||||
|         return isOutOfBandManagementEnabledForZone(host.getDataCenterId()) | ||||
|         return host != null && isOutOfBandManagementEnabledForZone(host.getDataCenterId()) | ||||
|                 && isOutOfBandManagementEnabledForCluster(host.getClusterId()) | ||||
|                 && isOutOfBandManagementEnabledForHost(host.getId()); | ||||
|     } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user