volume upload: management server polling and upload status from agent

- Added config parameters for upload monitoring interval and operation timeout
- Some updates to the volume state machine
This commit is contained in:
Koushik Das 2015-01-16 15:28:17 +05:30
parent 0eee9e8138
commit 1f1c96d2ee
3 changed files with 43 additions and 19 deletions

View File

@ -106,6 +106,8 @@ public interface Volume extends ControlledEntity, Identity, InternalIdentity, Ba
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Expunged, Event.OperationFailed, Expunged, null)); s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Expunged, Event.OperationFailed, Expunged, null));
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(NotUploaded, Event.OperationTimeout, UploadAbandoned, null)); s_fsm.addTransition(new StateMachine2.Transition<State, Event>(NotUploaded, Event.OperationTimeout, UploadAbandoned, null));
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(NotUploaded, Event.UploadRequested, UploadInProgress, null)); s_fsm.addTransition(new StateMachine2.Transition<State, Event>(NotUploaded, Event.UploadRequested, UploadInProgress, null));
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(NotUploaded, Event.OperationSucceeded, Uploaded, null));
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(NotUploaded, Event.OperationFailed, UploadError, null));
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationSucceeded, Uploaded, null)); s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationSucceeded, Uploaded, null));
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationFailed, UploadError, null)); s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationFailed, UploadError, null));
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationTimeout, UploadError, null)); s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationTimeout, UploadError, null));

View File

@ -22,20 +22,29 @@ package org.apache.cloudstack.storage.command;
import com.cloud.agent.api.Command; import com.cloud.agent.api.Command;
public class UploadStatusCommand extends Command { public class UploadStatusCommand extends Command {
public enum EntityType {
Volume,
Template
}
private long entityId; private long entityId;
private EntityType entityType;
protected UploadStatusCommand() { protected UploadStatusCommand() {
} }
public UploadStatusCommand(long entityId) { public UploadStatusCommand(long entityId, EntityType entityType) {
this.entityId = entityId; this.entityId = entityId;
this.entityType = entityType;
} }
public long getEntityId() { public long getEntityId() {
return entityId; return entityId;
} }
public EntityType getEntityType() {
return entityType;
}
@Override @Override
public boolean executeInSequence() { public boolean executeInSequence() {
return false; return false;

View File

@ -32,10 +32,12 @@ import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager; import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint; import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector; import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao; import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.storage.command.UploadStatusAnswer; import org.apache.cloudstack.storage.command.UploadStatusAnswer;
import org.apache.cloudstack.storage.command.UploadStatusCommand; import org.apache.cloudstack.storage.command.UploadStatusCommand;
import org.apache.cloudstack.storage.command.UploadStatusCommand.EntityType;
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao; import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO; import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
import org.apache.cloudstack.utils.identity.ManagementServerNode; import org.apache.cloudstack.utils.identity.ManagementServerNode;
@ -65,12 +67,10 @@ import com.cloud.utils.fsm.StateMachine2;
*/ */
@Component @Component
@Local(value = {ImageStoreUploadMonitor.class}) @Local(value = {ImageStoreUploadMonitor.class})
public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageStoreUploadMonitor, Listener { public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageStoreUploadMonitor, Listener, Configurable {
static final Logger s_logger = Logger.getLogger(ImageStoreUploadMonitorImpl.class); static final Logger s_logger = Logger.getLogger(ImageStoreUploadMonitorImpl.class);
@Inject
private ConfigurationDao _configDao;
@Inject @Inject
private VolumeDao _volumeDao; private VolumeDao _volumeDao;
@Inject @Inject
@ -87,12 +87,16 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
private int _monitoringInterval; private int _monitoringInterval;
private long _uploadOperationTimeout; private long _uploadOperationTimeout;
static final ConfigKey<Integer> UploadMonitoringInterval = new ConfigKey<Integer>("Advanced", Integer.class, "upload.monitoring.interval", "60",
"Interval (in seconds) to check the status of volumes that are uploaded using HTTP POST request", true);
static final ConfigKey<Integer> UploadOperationTimeout = new ConfigKey<Integer>("Advanced", Integer.class, "upload.operation.timeout", "10",
"Time (in minutes) to wait before abandoning volume upload using HTTP POST request", true);
@Override @Override
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException { public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
_executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Upload-Monitor")); _executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Upload-Monitor"));
//TODO: use config variable _monitoringInterval = UploadMonitoringInterval.value();
_monitoringInterval = 60; _uploadOperationTimeout = UploadOperationTimeout.value() * 60 * 1000;
_uploadOperationTimeout = 10 * 60 * 1000; // 10 minutes
_nodeId = ManagementServerNode.getManagementServerId(); _nodeId = ManagementServerNode.getManagementServerId();
return true; return true;
} }
@ -176,18 +180,18 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
} }
Host host = _hostDao.findById(ep.getId()); Host host = _hostDao.findById(ep.getId());
if (host != null && host.getManagementServerId() != null && _nodeId == host.getManagementServerId().longValue()) { if (host != null && host.getManagementServerId() != null && _nodeId == host.getManagementServerId().longValue()) {
UploadStatusCommand cmd = new UploadStatusCommand(volume.getId()); UploadStatusCommand cmd = new UploadStatusCommand(volume.getId(), EntityType.Volume);
Answer answer = ep.sendMessage(cmd); Answer answer = ep.sendMessage(cmd);
if (answer == null || !(answer instanceof UploadStatusAnswer)) { if (answer == null || !(answer instanceof UploadStatusAnswer)) {
s_logger.warn("No or invalid answer corresponding to UploadStatusCommand for volume " + volumeDataStore.getVolumeId()); s_logger.warn("No or invalid answer corresponding to UploadStatusCommand for volume " + volumeDataStore.getVolumeId());
continue; continue;
} }
handleStatusResponse((UploadStatusAnswer)answer, volume, volumeDataStore); handleVolumeStatusResponse((UploadStatusAnswer)answer, volume, volumeDataStore);
} }
} }
} }
private void handleStatusResponse(final UploadStatusAnswer answer, final VolumeVO volume, final VolumeDataStoreVO volumeDataStore) { private void handleVolumeStatusResponse(final UploadStatusAnswer answer, final VolumeVO volume, final VolumeDataStoreVO volumeDataStore) {
final StateMachine2<Volume.State, Event, Volume> stateMachine = Volume.State.getStateMachine(); final StateMachine2<Volume.State, Event, Volume> stateMachine = Volume.State.getStateMachine();
Transaction.execute(new TransactionCallbackNoReturn() { Transaction.execute(new TransactionCallbackNoReturn() {
@Override @Override
@ -204,13 +208,13 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
} }
break; break;
case IN_PROGRESS: case IN_PROGRESS:
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.UPLOAD_IN_PROGRESS); if (tmpVolume.getState() == Volume.State.NotUploaded) {
// check for timeout tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.UPLOAD_IN_PROGRESS);
if (tmpVolumeDataStore.getDownloadState() == VMTemplateStorageResourceAssoc.Status.NOT_UPLOADED || stateMachine.transitTo(tmpVolume, Event.UploadRequested, null, _volumeDao);
tmpVolumeDataStore.getDownloadState() == VMTemplateStorageResourceAssoc.Status.UPLOAD_IN_PROGRESS) { } else if (tmpVolume.getState() == Volume.State.UploadInProgress) { // check for timeout
if (System.currentTimeMillis() - tmpVolumeDataStore.getCreated().getTime() > _uploadOperationTimeout) { if (System.currentTimeMillis() - tmpVolumeDataStore.getCreated().getTime() > _uploadOperationTimeout) {
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.DOWNLOAD_ERROR); tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.UPLOAD_ERROR);
stateMachine.transitTo(tmpVolume, Event.OperationTimeout, null, _volumeDao); stateMachine.transitTo(tmpVolume, Event.OperationFailed, null, _volumeDao);
if (s_logger.isDebugEnabled()) { if (s_logger.isDebugEnabled()) {
s_logger.debug("Volume " + tmpVolume.getUuid() + " failed to upload due to operation timed out"); s_logger.debug("Volume " + tmpVolume.getUuid() + " failed to upload due to operation timed out");
} }
@ -226,7 +230,7 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
break; break;
case UNKNOWN: case UNKNOWN:
// check for timeout // check for timeout
if (tmpVolumeDataStore.getDownloadState() == VMTemplateStorageResourceAssoc.Status.NOT_UPLOADED) { if (tmpVolume.getState() == Volume.State.NotUploaded) {
if (System.currentTimeMillis() - tmpVolumeDataStore.getCreated().getTime() > _uploadOperationTimeout) { if (System.currentTimeMillis() - tmpVolumeDataStore.getCreated().getTime() > _uploadOperationTimeout) {
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.ABANDONED); tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.ABANDONED);
stateMachine.transitTo(tmpVolume, Event.OperationTimeout, null, _volumeDao); stateMachine.transitTo(tmpVolume, Event.OperationTimeout, null, _volumeDao);
@ -246,5 +250,14 @@ public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageSto
} }
} }
@Override
public String getConfigComponentName() {
return ImageStoreUploadMonitor.class.getSimpleName();
}
@Override
public ConfigKey<?>[] getConfigKeys() {
return new ConfigKey<?>[] {UploadMonitoringInterval, UploadOperationTimeout};
}
} }