mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
volume upload: management server polling and upload status from agent
MS polling logic to query status for volumes that are uploaded
This commit is contained in:
parent
c2cf2503c1
commit
627f5a62dc
@ -45,9 +45,12 @@ public interface Volume extends ControlledEntity, Identity, InternalIdentity, Ba
|
||||
Destroy("The volume is destroyed, and can't be recovered."),
|
||||
Destroying("The volume is destroying, and can't be recovered."),
|
||||
UploadOp("The volume upload operation is in progress or in short the volume is on secondary storage"),
|
||||
Uploading("volume is uploading"),
|
||||
Copying("volume is copying from image store to primary, in case it's an uploaded volume"),
|
||||
Uploaded("volume is uploaded");
|
||||
Copying("Volume is copying from image store to primary, in case it's an uploaded volume"),
|
||||
Uploaded("Volume is uploaded"),
|
||||
NotUploaded("The volume entry is just created in DB, not yet uploaded"),
|
||||
UploadInProgress("Volume upload is in progress"),
|
||||
UploadError("Volume upload encountered some error"),
|
||||
UploadAbandoned("Volume upload is abandoned since the upload was never initiated within a specificed time");
|
||||
|
||||
String _description;
|
||||
|
||||
@ -100,7 +103,12 @@ public interface Volume extends ControlledEntity, Identity, InternalIdentity, Ba
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Uploaded, Event.DestroyRequested, Destroy, null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Expunged, Event.ExpungingRequested, Expunged, null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Expunged, Event.OperationSucceeded, Expunged, null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Expunged, Event.OperationFailed, Expunged,null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(Expunged, Event.OperationFailed, Expunged, null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(NotUploaded, Event.OperationTimeout, UploadAbandoned, null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(NotUploaded, Event.UploadRequested, UploadInProgress, null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationSucceeded, Uploaded, null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationFailed, UploadError, null));
|
||||
s_fsm.addTransition(new StateMachine2.Transition<State, Event>(UploadInProgress, Event.OperationTimeout, UploadError, null));
|
||||
}
|
||||
}
|
||||
|
||||
@ -120,7 +128,8 @@ public interface Volume extends ControlledEntity, Identity, InternalIdentity, Ba
|
||||
SnapshotRequested,
|
||||
DestroyRequested,
|
||||
ExpungingRequested,
|
||||
ResizeRequested;
|
||||
ResizeRequested,
|
||||
OperationTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -0,0 +1,53 @@
|
||||
//
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
//
|
||||
|
||||
package org.apache.cloudstack.storage.command;
|
||||
|
||||
import com.cloud.agent.api.Answer;
|
||||
|
||||
public class UploadStatusAnswer extends Answer {
|
||||
public static enum UploadStatus {
|
||||
UNKNOWN, IN_PROGRESS, COMPLETED, ERROR
|
||||
}
|
||||
|
||||
private UploadStatus status;
|
||||
|
||||
protected UploadStatusAnswer() {
|
||||
}
|
||||
|
||||
public UploadStatusAnswer(UploadStatusCommand cmd, UploadStatus status, String msg) {
|
||||
super(cmd, false, msg);
|
||||
this.status = UploadStatus.ERROR;
|
||||
}
|
||||
|
||||
public UploadStatusAnswer(UploadStatusCommand cmd, Exception e) {
|
||||
super(cmd, false, e.getMessage());
|
||||
this.status = UploadStatus.ERROR;
|
||||
}
|
||||
|
||||
public UploadStatusAnswer(UploadStatusCommand cmd, UploadStatus status) {
|
||||
super(cmd, true, null);
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public UploadStatus getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,44 @@
|
||||
//
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
//
|
||||
|
||||
package org.apache.cloudstack.storage.command;
|
||||
|
||||
import com.cloud.agent.api.Command;
|
||||
|
||||
public class UploadStatusCommand extends Command {
|
||||
|
||||
private long entityId;
|
||||
|
||||
protected UploadStatusCommand() {
|
||||
}
|
||||
|
||||
public UploadStatusCommand(long entityId) {
|
||||
this.entityId = entityId;
|
||||
}
|
||||
|
||||
public long getEntityId() {
|
||||
return entityId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean executeInSequence() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
@ -21,6 +21,7 @@ import java.util.List;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreStateMachine;
|
||||
|
||||
import com.cloud.storage.Volume;
|
||||
import com.cloud.utils.db.GenericDao;
|
||||
import com.cloud.utils.fsm.StateDao;
|
||||
|
||||
@ -48,4 +49,6 @@ public interface VolumeDataStoreDao extends GenericDao<VolumeDataStoreVO, Long>,
|
||||
void expireDnldUrlsForZone(Long dcId);
|
||||
|
||||
List<VolumeDataStoreVO> listUploadedVolumesByStoreId(long id);
|
||||
|
||||
List<VolumeDataStoreVO> listByVolumeState(Volume.State... states);
|
||||
}
|
||||
|
||||
@ -27,7 +27,6 @@ import javax.naming.ConfigurationException;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.DataObjectInStore;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
|
||||
@ -36,7 +35,11 @@ import org.apache.cloudstack.engine.subsystem.api.storage.ObjectInDataStoreState
|
||||
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
|
||||
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
|
||||
|
||||
import com.cloud.storage.Volume;
|
||||
import com.cloud.storage.VolumeVO;
|
||||
import com.cloud.storage.dao.VolumeDao;
|
||||
import com.cloud.utils.db.GenericDaoBase;
|
||||
import com.cloud.utils.db.JoinBuilder.JoinType;
|
||||
import com.cloud.utils.db.SearchBuilder;
|
||||
import com.cloud.utils.db.SearchCriteria;
|
||||
import com.cloud.utils.db.SearchCriteria.Op;
|
||||
@ -53,11 +56,15 @@ public class VolumeDataStoreDaoImpl extends GenericDaoBase<VolumeDataStoreVO, Lo
|
||||
private SearchBuilder<VolumeDataStoreVO> storeVolumeSearch;
|
||||
private SearchBuilder<VolumeDataStoreVO> downloadVolumeSearch;
|
||||
private SearchBuilder<VolumeDataStoreVO> uploadVolumeSearch;
|
||||
private SearchBuilder<VolumeVO> volumeOnlySearch;
|
||||
private SearchBuilder<VolumeDataStoreVO> uploadVolumeStateSearch;
|
||||
private static final String EXPIRE_DOWNLOAD_URLS_FOR_ZONE = "update volume_store_ref set download_url_created=? where store_id in (select id from image_store where data_center_id=?)";
|
||||
|
||||
|
||||
@Inject
|
||||
DataStoreManager storeMgr;
|
||||
@Inject
|
||||
VolumeDao volumeDao;
|
||||
|
||||
@Override
|
||||
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
|
||||
@ -102,6 +109,13 @@ public class VolumeDataStoreDaoImpl extends GenericDaoBase<VolumeDataStoreVO, Lo
|
||||
uploadVolumeSearch.and("destroyed", uploadVolumeSearch.entity().getDestroyed(), SearchCriteria.Op.EQ);
|
||||
uploadVolumeSearch.done();
|
||||
|
||||
volumeOnlySearch = volumeDao.createSearchBuilder();
|
||||
volumeOnlySearch.and("state", volumeOnlySearch.entity().getState(), Op.IN);
|
||||
uploadVolumeStateSearch = createSearchBuilder();
|
||||
uploadVolumeStateSearch.join("volumeOnlySearch", volumeOnlySearch, volumeOnlySearch.entity().getId(), uploadVolumeStateSearch.entity().getVolumeId(), JoinType.LEFT);
|
||||
uploadVolumeStateSearch.and("destroyed", uploadVolumeStateSearch.entity().getDestroyed(), SearchCriteria.Op.EQ);
|
||||
uploadVolumeStateSearch.done();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -304,4 +318,13 @@ public class VolumeDataStoreDaoImpl extends GenericDaoBase<VolumeDataStoreVO, Lo
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<VolumeDataStoreVO> listByVolumeState(Volume.State... states) {
|
||||
SearchCriteria<VolumeDataStoreVO> sc = uploadVolumeStateSearch.create();
|
||||
sc.setJoinParameters("volumeOnlySearch", "state", (Object[])states);
|
||||
sc.setParameters("destroyed", false);
|
||||
return listIncludingRemovedBy(sc);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -19,6 +19,8 @@ package com.cloud.agent.manager;
|
||||
import org.apache.cloudstack.storage.command.DeleteCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadProgressCommand;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusAnswer;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusCommand;
|
||||
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.AttachIsoCommand;
|
||||
@ -106,4 +108,7 @@ public interface MockStorageManager extends Manager {
|
||||
StoragePoolInfo getLocalStorage(String hostGuid, Long storageSize);
|
||||
|
||||
CopyVolumeAnswer CopyVolume(CopyVolumeCommand cmd);
|
||||
|
||||
public UploadStatusAnswer getUploadStatus(UploadStatusCommand cmd);
|
||||
|
||||
}
|
||||
|
||||
@ -33,10 +33,12 @@ import javax.naming.ConfigurationException;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import org.apache.cloudstack.storage.command.DeleteCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadProgressCommand;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusAnswer;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusAnswer.UploadStatus;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusCommand;
|
||||
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.AttachIsoCommand;
|
||||
@ -1255,4 +1257,9 @@ public class MockStorageManagerImpl extends ManagerBase implements MockStorageMa
|
||||
return new CopyVolumeAnswer(cmd, true, null, primaryStorage.getMountPoint(), vol.getPath());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public UploadStatusAnswer getUploadStatus(UploadStatusCommand cmd) {
|
||||
return new UploadStatusAnswer(cmd, UploadStatus.UNKNOWN);
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,6 +35,7 @@ import org.apache.cloudstack.storage.command.DeleteCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadProgressCommand;
|
||||
import org.apache.cloudstack.storage.command.StorageSubSystemCommand;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusCommand;
|
||||
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.AttachIsoCommand;
|
||||
@ -371,6 +372,8 @@ public class SimulatorManagerImpl extends ManagerBase implements SimulatorManage
|
||||
answer = _mockStorageMgr.ComputeChecksum((ComputeChecksumCommand)cmd);
|
||||
} else if (cmd instanceof CreatePrivateTemplateFromVolumeCommand) {
|
||||
answer = _mockStorageMgr.CreatePrivateTemplateFromVolume((CreatePrivateTemplateFromVolumeCommand)cmd);
|
||||
} else if (cmd instanceof UploadStatusCommand) {
|
||||
answer = _mockStorageMgr.getUploadStatus((UploadStatusCommand)cmd);
|
||||
} else if (cmd instanceof MaintainCommand) {
|
||||
answer = _mockAgentMgr.maintain((MaintainCommand)cmd);
|
||||
} else if (cmd instanceof GetVmStatsCommand) {
|
||||
|
||||
@ -264,4 +264,5 @@
|
||||
<property name="gslbServiceProviders" value="#{gslbServiceProvidersRegistry.registered}" />
|
||||
</bean>
|
||||
<bean id="certServiceImpl" class="org.apache.cloudstack.network.lb.CertServiceImpl" />
|
||||
<bean id="imageStoreUploadMonitorImpl" class="com.cloud.storage.ImageStoreUploadMonitorImpl" />
|
||||
</beans>
|
||||
|
||||
27
server/src/com/cloud/storage/ImageStoreUploadMonitor.java
Executable file
27
server/src/com/cloud/storage/ImageStoreUploadMonitor.java
Executable file
@ -0,0 +1,27 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package com.cloud.storage;
|
||||
|
||||
import com.cloud.utils.component.Manager;
|
||||
|
||||
/**
|
||||
* Monitor upload progress of all entities.
|
||||
*
|
||||
*/
|
||||
public interface ImageStoreUploadMonitor extends Manager {
|
||||
|
||||
}
|
||||
250
server/src/com/cloud/storage/ImageStoreUploadMonitorImpl.java
Executable file
250
server/src/com/cloud/storage/ImageStoreUploadMonitorImpl.java
Executable file
@ -0,0 +1,250 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package com.cloud.storage;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.ejb.Local;
|
||||
import javax.inject.Inject;
|
||||
import javax.naming.ConfigurationException;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
|
||||
import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
|
||||
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
|
||||
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusAnswer;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusCommand;
|
||||
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreDao;
|
||||
import org.apache.cloudstack.storage.datastore.db.VolumeDataStoreVO;
|
||||
import org.apache.cloudstack.utils.identity.ManagementServerNode;
|
||||
|
||||
import com.cloud.agent.Listener;
|
||||
import com.cloud.agent.api.AgentControlAnswer;
|
||||
import com.cloud.agent.api.AgentControlCommand;
|
||||
import com.cloud.agent.api.Answer;
|
||||
import com.cloud.agent.api.Command;
|
||||
import com.cloud.agent.api.StartupCommand;
|
||||
import com.cloud.exception.ConnectionException;
|
||||
import com.cloud.host.Host;
|
||||
import com.cloud.host.Status;
|
||||
import com.cloud.host.dao.HostDao;
|
||||
import com.cloud.storage.Volume.Event;
|
||||
import com.cloud.storage.dao.VolumeDao;
|
||||
import com.cloud.utils.component.ManagerBase;
|
||||
import com.cloud.utils.concurrency.NamedThreadFactory;
|
||||
import com.cloud.utils.db.Transaction;
|
||||
import com.cloud.utils.db.TransactionCallbackNoReturn;
|
||||
import com.cloud.utils.db.TransactionStatus;
|
||||
import com.cloud.utils.fsm.NoTransitionException;
|
||||
import com.cloud.utils.fsm.StateMachine2;
|
||||
|
||||
/**
|
||||
* Monitors the progress of upload.
|
||||
*/
|
||||
@Component
|
||||
@Local(value = {ImageStoreUploadMonitor.class})
|
||||
public class ImageStoreUploadMonitorImpl extends ManagerBase implements ImageStoreUploadMonitor, Listener {
|
||||
|
||||
static final Logger s_logger = Logger.getLogger(ImageStoreUploadMonitorImpl.class);
|
||||
|
||||
@Inject
|
||||
private ConfigurationDao _configDao;
|
||||
@Inject
|
||||
private VolumeDao _volumeDao;
|
||||
@Inject
|
||||
private VolumeDataStoreDao _volumeDataStoreDao;
|
||||
@Inject
|
||||
private HostDao _hostDao;
|
||||
@Inject
|
||||
private EndPointSelector _epSelector;
|
||||
@Inject
|
||||
private DataStoreManager storeMgr;
|
||||
|
||||
private long _nodeId;
|
||||
private ScheduledExecutorService _executor = null;
|
||||
private int _monitoringInterval;
|
||||
private long _uploadOperationTimeout;
|
||||
|
||||
@Override
|
||||
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
|
||||
_executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Upload-Monitor"));
|
||||
_monitoringInterval = 60;
|
||||
_uploadOperationTimeout = 10 * 60 * 1000; // 10 minutes
|
||||
_nodeId = ManagementServerNode.getManagementServerId();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean start() {
|
||||
_executor.scheduleWithFixedDelay(new UploadStatusCheck(), _monitoringInterval, _monitoringInterval, TimeUnit.SECONDS);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean stop() {
|
||||
_executor.shutdownNow();
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processAnswers(long agentId, long seq, Answer[] answers) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processCommands(long agentId, long seq, Command[] commands) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AgentControlAnswer processControlCommand(long agentId, AgentControlCommand cmd) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processDisconnect(long agentId, Status state) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRecurring() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean processTimeout(long agentId, long seq) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processConnect(Host host, StartupCommand cmd, boolean forRebalance) throws ConnectionException {
|
||||
}
|
||||
|
||||
protected class UploadStatusCheck extends ManagedContextRunnable {
|
||||
|
||||
public UploadStatusCheck() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runInContext() {
|
||||
// 1. Select all entries with download_state = Not_Downloaded or Download_In_Progress
|
||||
// 2. Get corresponding volume
|
||||
// 3. Get EP using _epSelector
|
||||
// 4. Check if SSVM is owned by this MS
|
||||
// 5. If owned by MS then send command to appropriate SSVM
|
||||
// 6. In listener check for the answer and update DB accordingly
|
||||
List<VolumeDataStoreVO> volumeDataStores = _volumeDataStoreDao.listByVolumeState(Volume.State.NotUploaded, Volume.State.UploadInProgress);
|
||||
for (VolumeDataStoreVO volumeDataStore : volumeDataStores) {
|
||||
DataStore dataStore = storeMgr.getDataStore(volumeDataStore.getDataStoreId(), DataStoreRole.Image);
|
||||
EndPoint ep = _epSelector.select(dataStore, volumeDataStore.getExtractUrl());
|
||||
if (ep == null) {
|
||||
s_logger.warn("There is no secondary storage VM for image store " + dataStore.getName());
|
||||
continue;
|
||||
}
|
||||
VolumeVO volume = _volumeDao.findById(volumeDataStore.getVolumeId());
|
||||
if (volume == null) {
|
||||
s_logger.warn("Volume with id " + volumeDataStore.getVolumeId() + " not found");
|
||||
continue;
|
||||
}
|
||||
Host host = _hostDao.findById(ep.getId());
|
||||
if (host != null && host.getManagementServerId() != null && _nodeId == host.getManagementServerId().longValue()) {
|
||||
UploadStatusCommand cmd = new UploadStatusCommand(volume.getId());
|
||||
Answer answer = ep.sendMessage(cmd);
|
||||
if (answer == null || !(answer instanceof UploadStatusAnswer)) {
|
||||
s_logger.warn("No or invalid answer corresponding to UploadStatusCommand for volume " + volumeDataStore.getVolumeId());
|
||||
continue;
|
||||
}
|
||||
handleStatusResponse((UploadStatusAnswer)answer, volume, volumeDataStore);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleStatusResponse(final UploadStatusAnswer answer, final VolumeVO volume, final VolumeDataStoreVO volumeDataStore) {
|
||||
final StateMachine2<Volume.State, Event, Volume> stateMachine = Volume.State.getStateMachine();
|
||||
Transaction.execute(new TransactionCallbackNoReturn() {
|
||||
@Override
|
||||
public void doInTransactionWithoutResult(TransactionStatus status) {
|
||||
VolumeVO tmpVolume = _volumeDao.findById(volume.getId());
|
||||
VolumeDataStoreVO tmpVolumeDataStore = _volumeDataStoreDao.findById(volumeDataStore.getId());
|
||||
try {
|
||||
switch (answer.getStatus()) {
|
||||
case COMPLETED:
|
||||
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.UPLOADED);
|
||||
stateMachine.transitTo(tmpVolume, Event.OperationSucceeded, null, _volumeDao);
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Volume " + tmpVolume.getUuid() + " uploaded successfully");
|
||||
}
|
||||
break;
|
||||
case IN_PROGRESS:
|
||||
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.UPLOAD_IN_PROGRESS);
|
||||
// check for timeout
|
||||
if (tmpVolumeDataStore.getDownloadState() == VMTemplateStorageResourceAssoc.Status.NOT_UPLOADED ||
|
||||
tmpVolumeDataStore.getDownloadState() == VMTemplateStorageResourceAssoc.Status.UPLOAD_IN_PROGRESS) {
|
||||
if (System.currentTimeMillis() - tmpVolumeDataStore.getCreated().getTime() > _uploadOperationTimeout) {
|
||||
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.ABANDONED);
|
||||
stateMachine.transitTo(tmpVolume, Event.OperationTimeout, null, _volumeDao);
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Volume " + tmpVolume.getUuid() + " failed to upload due to operation timed out");
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
case ERROR:
|
||||
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.UPLOAD_ERROR);
|
||||
stateMachine.transitTo(tmpVolume, Event.OperationFailed, null, _volumeDao);
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Volume " + tmpVolume.getUuid() + " failed to upload. Error details: " + answer.getDetails());
|
||||
}
|
||||
break;
|
||||
case UNKNOWN:
|
||||
// check for timeout
|
||||
if (tmpVolumeDataStore.getDownloadState() == VMTemplateStorageResourceAssoc.Status.NOT_UPLOADED ||
|
||||
tmpVolumeDataStore.getDownloadState() == VMTemplateStorageResourceAssoc.Status.UPLOAD_IN_PROGRESS) {
|
||||
if (System.currentTimeMillis() - tmpVolumeDataStore.getCreated().getTime() > _uploadOperationTimeout) {
|
||||
tmpVolumeDataStore.setDownloadState(VMTemplateStorageResourceAssoc.Status.ABANDONED);
|
||||
stateMachine.transitTo(tmpVolume, Event.OperationTimeout, null, _volumeDao);
|
||||
if (s_logger.isDebugEnabled()) {
|
||||
s_logger.debug("Volume " + tmpVolume.getUuid() + " failed to upload due to operation timed out");
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
_volumeDataStoreDao.update(tmpVolumeDataStore.getId(), tmpVolumeDataStore);
|
||||
} catch (NoTransitionException e) {
|
||||
s_logger.error("Unexpected error " + e.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -50,6 +50,7 @@ import com.cloud.utils.nio.HandlerFactory;
|
||||
import com.cloud.utils.nio.Link;
|
||||
import com.cloud.utils.nio.NioServer;
|
||||
import com.cloud.utils.nio.Task;
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
@ -70,6 +71,9 @@ import org.apache.cloudstack.storage.command.CopyCommand;
|
||||
import org.apache.cloudstack.storage.command.DeleteCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadCommand;
|
||||
import org.apache.cloudstack.storage.command.DownloadProgressCommand;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusAnswer;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusAnswer.UploadStatus;
|
||||
import org.apache.cloudstack.storage.command.UploadStatusCommand;
|
||||
import org.apache.cloudstack.storage.template.DownloadManager;
|
||||
import org.apache.cloudstack.storage.template.DownloadManagerImpl;
|
||||
import org.apache.cloudstack.storage.template.DownloadManagerImpl.ZfsPathParser;
|
||||
@ -238,6 +242,8 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
||||
return execute((CopyCommand)cmd);
|
||||
} else if (cmd instanceof DeleteCommand) {
|
||||
return execute((DeleteCommand)cmd);
|
||||
} else if (cmd instanceof UploadStatusCommand) {
|
||||
return execute((UploadStatusCommand)cmd);
|
||||
} else {
|
||||
return Answer.createUnsupportedCommandAnswer(cmd);
|
||||
}
|
||||
@ -1616,6 +1622,10 @@ public class NfsSecondaryStorageResource extends ServerResourceBase implements S
|
||||
return new Answer(cmd, success, result);
|
||||
}
|
||||
|
||||
private UploadStatusAnswer execute(UploadStatusCommand cmd) {
|
||||
return new UploadStatusAnswer(cmd, UploadStatus.COMPLETED);
|
||||
}
|
||||
|
||||
protected GetStorageStatsAnswer execute(final GetStorageStatsCommand cmd) {
|
||||
DataStoreTO store = cmd.getStore();
|
||||
if (store instanceof S3TO || store instanceof SwiftTO) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user