CLOUDSTACK-7946:

remove leftover state in volume and snapshot table in case of mgt server
shutdown during storage operation.
Reviewed-by: Min
This commit is contained in:
Edison Su 2014-11-11 16:02:20 -08:00
parent c05cda0d28
commit 67113ff0b2
17 changed files with 147 additions and 20 deletions

View File

@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.cloudstack.engine.subsystem.api.storage.DataObject; import org.apache.cloudstack.engine.subsystem.api.storage.DataObject;
import org.apache.cloudstack.engine.subsystem.api.storage.DataStore; import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo; import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
import org.apache.cloudstack.framework.config.ConfigKey;
import com.cloud.agent.api.to.VirtualMachineTO; import com.cloud.agent.api.to.VirtualMachineTO;
import com.cloud.dc.DataCenter; import com.cloud.dc.DataCenter;
@ -46,7 +47,6 @@ import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.vm.DiskProfile; import com.cloud.vm.DiskProfile;
import com.cloud.vm.VirtualMachine; import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachineProfile; import com.cloud.vm.VirtualMachineProfile;
import org.apache.cloudstack.framework.config.ConfigKey;
/** /**
* VolumeOrchestrationService is a PURE orchestration service on CloudStack * VolumeOrchestrationService is a PURE orchestration service on CloudStack
@ -86,6 +86,8 @@ public interface VolumeOrchestrationService {
Volume migrateVolume(Volume volume, StoragePool destPool) throws StorageUnavailableException; Volume migrateVolume(Volume volume, StoragePool destPool) throws StorageUnavailableException;
void cleanupStorageJobs();
void destroyVolume(Volume volume); void destroyVolume(Volume volume);
DiskProfile allocateRawVolume(Type type, String name, DiskOffering offering, Long size, Long minIops, Long maxIops, VirtualMachine vm, VirtualMachineTemplate template, Account owner); DiskProfile allocateRawVolume(Type type, String name, DiskOffering offering, Long size, Long minIops, Long maxIops, VirtualMachine vm, VirtualMachineTemplate template, Account owner);

View File

@ -27,4 +27,6 @@ public interface SnapshotService {
boolean revertSnapshot(Long snapshotId); boolean revertSnapshot(Long snapshotId);
void syncVolumeSnapshotsToRegionStore(long volumeId, DataStore store); void syncVolumeSnapshotsToRegionStore(long volumeId, DataStore store);
void cleanupVolumeDuringSnapshotFailure(Long volumeId, Long snapshotId);
} }

View File

@ -14,9 +14,7 @@
// KIND, either express or implied. See the License for the // KIND, either express or implied. See the License for the
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
package com.cloud.storage; package com.cloud.vm;
import com.cloud.vm.VmWork;
public class VmWorkAttachVolume extends VmWork { public class VmWorkAttachVolume extends VmWork {
private static final long serialVersionUID = 553291814854451740L; private static final long serialVersionUID = 553291814854451740L;

View File

@ -14,9 +14,7 @@
// KIND, either express or implied. See the License for the // KIND, either express or implied. See the License for the
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
package com.cloud.storage; package com.cloud.vm;
import com.cloud.vm.VmWork;
public class VmWorkDetachVolume extends VmWork { public class VmWorkDetachVolume extends VmWork {
private static final long serialVersionUID = -8722243207385263101L; private static final long serialVersionUID = -8722243207385263101L;

View File

@ -14,9 +14,7 @@
// KIND, either express or implied. See the License for the // KIND, either express or implied. See the License for the
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
package com.cloud.storage; package com.cloud.vm;
import com.cloud.vm.VmWork;
public class VmWorkMigrateVolume extends VmWork { public class VmWorkMigrateVolume extends VmWork {
private static final long serialVersionUID = -565778516928408602L; private static final long serialVersionUID = -565778516928408602L;

View File

@ -14,9 +14,7 @@
// KIND, either express or implied. See the License for the // KIND, either express or implied. See the License for the
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
package com.cloud.storage; package com.cloud.vm;
import com.cloud.vm.VmWork;
public class VmWorkResizeVolume extends VmWork { public class VmWorkResizeVolume extends VmWork {
private static final long serialVersionUID = 6112366316907642498L; private static final long serialVersionUID = 6112366316907642498L;

View File

@ -14,9 +14,7 @@
// KIND, either express or implied. See the License for the // KIND, either express or implied. See the License for the
// specific language governing permissions and limitations // specific language governing permissions and limitations
// under the License. // under the License.
package com.cloud.storage; package com.cloud.vm;
import com.cloud.vm.VmWork;
public class VmWorkTakeVolumeSnapshot extends VmWork { public class VmWorkTakeVolumeSnapshot extends VmWork {

View File

@ -566,6 +566,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
_executor.scheduleAtFixedRate(new TransitionTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS); _executor.scheduleAtFixedRate(new TransitionTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
cancelWorkItems(_nodeId); cancelWorkItems(_nodeId);
volumeMgr.cleanupStorageJobs();
// cleanup left over place holder works // cleanup left over place holder works
_workJobDao.expungeLeftoverWorkJobs(ManagementServerNode.getManagementServerId()); _workJobDao.expungeLeftoverWorkJobs(ManagementServerNode.getManagementServerId());
return true; return true;

View File

@ -54,6 +54,8 @@ import org.apache.cloudstack.framework.async.AsyncCallFuture;
import org.apache.cloudstack.framework.config.ConfigDepot; import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable; import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.framework.jobs.AsyncJobManager;
import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
import org.apache.cloudstack.storage.command.CommandResult; import org.apache.cloudstack.storage.command.CommandResult;
import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao; import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO; import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
@ -64,6 +66,7 @@ import com.cloud.agent.api.to.DataTO;
import com.cloud.agent.api.to.DiskTO; import com.cloud.agent.api.to.DiskTO;
import com.cloud.agent.api.to.VirtualMachineTO; import com.cloud.agent.api.to.VirtualMachineTO;
import com.cloud.agent.manager.allocator.PodAllocator; import com.cloud.agent.manager.allocator.PodAllocator;
import com.cloud.cluster.ClusterManager;
import com.cloud.configuration.Resource.ResourceType; import com.cloud.configuration.Resource.ResourceType;
import com.cloud.dc.DataCenter; import com.cloud.dc.DataCenter;
import com.cloud.dc.Pod; import com.cloud.dc.Pod;
@ -117,6 +120,10 @@ import com.cloud.vm.VirtualMachine;
import com.cloud.vm.VirtualMachine.State; import com.cloud.vm.VirtualMachine.State;
import com.cloud.vm.VirtualMachineProfile; import com.cloud.vm.VirtualMachineProfile;
import com.cloud.vm.VirtualMachineProfileImpl; import com.cloud.vm.VirtualMachineProfileImpl;
import com.cloud.vm.VmWorkAttachVolume;
import com.cloud.vm.VmWorkMigrateVolume;
import com.cloud.vm.VmWorkSerializer;
import com.cloud.vm.VmWorkTakeVolumeSnapshot;
import com.cloud.vm.dao.UserVmDao; import com.cloud.vm.dao.UserVmDao;
public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrationService, Configurable { public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrationService, Configurable {
@ -156,6 +163,10 @@ public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrati
SnapshotService _snapshotSrv; SnapshotService _snapshotSrv;
@Inject @Inject
protected UserVmDao _userVmDao; protected UserVmDao _userVmDao;
@Inject
protected AsyncJobManager _jobMgr;
@Inject
ClusterManager clusterManager;
private final StateMachine2<Volume.State, Volume.Event, Volume> _volStateMachine; private final StateMachine2<Volume.State, Volume.Event, Volume> _volStateMachine;
protected List<StoragePoolAllocator> _storagePoolAllocators; protected List<StoragePoolAllocator> _storagePoolAllocators;
@ -1355,9 +1366,71 @@ public class VolumeOrchestrator extends ManagerBase implements VolumeOrchestrati
return true; return true;
} }
private void cleanupVolumeDuringAttachFailure(Long volumeId) {
VolumeVO volume = _volsDao.findById(volumeId);
if (volume == null) {
return;
}
if (volume.getState().equals(Volume.State.Creating)) {
s_logger.debug("Remove volume: " + volume.getId() + ", as it's leftover from last mgt server stop");
_volsDao.remove(volume.getId());
}
}
private void cleanupVolumeDuringMigrationFailure(Long volumeId, Long destPoolId) {
StoragePool destPool = (StoragePool)dataStoreMgr.getDataStore(destPoolId, DataStoreRole.Primary);
if (destPool == null) {
return;
}
VolumeVO volume = _volsDao.findById(volumeId);
if (volume.getState() == Volume.State.Migrating) {
VolumeVO duplicateVol = _volsDao.findByPoolIdName(destPoolId, volume.getName());
if (duplicateVol != null) {
s_logger.debug("Remove volume " + duplicateVol.getId() + " on storage pool " + destPoolId);
_volsDao.remove(duplicateVol.getId());
}
s_logger.debug("change volume state to ready from migrating in case migration failure for vol: " + volumeId);
volume.setState(Volume.State.Ready);
_volsDao.update(volumeId, volume);
}
}
private void cleanupVolumeDuringSnapshotFailure(Long volumeId, Long snapshotId) {
_snapshotSrv.cleanupVolumeDuringSnapshotFailure(volumeId, snapshotId);
VolumeVO volume = _volsDao.findById(volumeId);
if (volume.getState() == Volume.State.Snapshotting) {
s_logger.debug("change volume state back to Ready: " + volume.getId());
volume.setState(Volume.State.Ready);
_volsDao.update(volume.getId(), volume);
}
}
@Override @Override
public boolean start() { public void cleanupStorageJobs() {
return true; //clean up failure jobs related to volume
List<AsyncJobVO> jobs = _jobMgr.findFailureAsyncJobs(VmWorkAttachVolume.class.getName(),
VmWorkMigrateVolume.class.getName(), VmWorkTakeVolumeSnapshot.class.getName());
for (AsyncJobVO job : jobs) {
try {
if (job.getCmd().equalsIgnoreCase(VmWorkAttachVolume.class.getName())) {
VmWorkAttachVolume work = VmWorkSerializer.deserialize(VmWorkAttachVolume.class, job.getCmdInfo());
cleanupVolumeDuringAttachFailure(work.getVolumeId());
} else if (job.getCmd().equalsIgnoreCase(VmWorkMigrateVolume.class.getName())) {
VmWorkMigrateVolume work = VmWorkSerializer.deserialize(VmWorkMigrateVolume.class, job.getCmdInfo());
cleanupVolumeDuringMigrationFailure(work.getVolumeId(), work.getDestPoolId());
} else if (job.getCmd().equalsIgnoreCase(VmWorkTakeVolumeSnapshot.class.getName())) {
VmWorkTakeVolumeSnapshot work = VmWorkSerializer.deserialize(VmWorkTakeVolumeSnapshot.class, job.getCmdInfo());
cleanupVolumeDuringSnapshotFailure(work.getVolumeId(), work.getSnapshotId());
}
} catch (Exception e) {
s_logger.debug("clean up job failure, will continue", e);
}
}
} }
@Override @Override

View File

@ -61,6 +61,8 @@ public interface VolumeDao extends GenericDao<VolumeVO, Long>, StateDao<Volume.S
List<VolumeVO> findByPoolId(long poolId); List<VolumeVO> findByPoolId(long poolId);
VolumeVO findByPoolIdName(long poolId, String name);
List<VolumeVO> findByPoolId(long poolId, Volume.Type volumeType); List<VolumeVO> findByPoolId(long poolId, Volume.Type volumeType);
List<VolumeVO> findByInstanceAndDeviceId(long instanceId, long deviceId); List<VolumeVO> findByInstanceAndDeviceId(long instanceId, long deviceId);

View File

@ -125,6 +125,14 @@ public class VolumeDaoImpl extends GenericDaoBase<VolumeVO, Long> implements Vol
return listBy(sc); return listBy(sc);
} }
@Override
public VolumeVO findByPoolIdName(long poolId, String name) {
SearchCriteria<VolumeVO> sc = AllFieldsSearch.create();
sc.setParameters("poolId", poolId);
sc.setParameters("name", name);
return findOneBy(sc);
}
@Override @Override
public List<VolumeVO> findByPoolId(long poolId, Volume.Type volumeType) { public List<VolumeVO> findByPoolId(long poolId, Volume.Type volumeType) {
SearchCriteria<VolumeVO> sc = AllFieldsSearch.create(); SearchCriteria<VolumeVO> sc = AllFieldsSearch.create();
@ -306,6 +314,7 @@ public class VolumeDaoImpl extends GenericDaoBase<VolumeVO, Long> implements Vol
AllFieldsSearch.and("destroyed", AllFieldsSearch.entity().getState(), Op.EQ); AllFieldsSearch.and("destroyed", AllFieldsSearch.entity().getState(), Op.EQ);
AllFieldsSearch.and("notDestroyed", AllFieldsSearch.entity().getState(), Op.NEQ); AllFieldsSearch.and("notDestroyed", AllFieldsSearch.entity().getState(), Op.NEQ);
AllFieldsSearch.and("updatedCount", AllFieldsSearch.entity().getUpdatedCount(), Op.EQ); AllFieldsSearch.and("updatedCount", AllFieldsSearch.entity().getUpdatedCount(), Op.EQ);
AllFieldsSearch.and("name", AllFieldsSearch.entity().getName(), Op.EQ);
AllFieldsSearch.done(); AllFieldsSearch.done();
DetachedAccountIdSearch = createSearchBuilder(); DetachedAccountIdSearch = createSearchBuilder();

View File

@ -446,6 +446,28 @@ public class SnapshotServiceImpl implements SnapshotService {
} }
} }
@Override
public void cleanupVolumeDuringSnapshotFailure(Long volumeId, Long snapshotId) {
SnapshotVO snaphsot = _snapshotDao.findById(snapshotId);
if (snaphsot != null) {
if (snaphsot.getState() != Snapshot.State.BackedUp) {
List<SnapshotDataStoreVO> snapshotDataStoreVOs = _snapshotStoreDao.findBySnapshotId(snapshotId);
for (SnapshotDataStoreVO snapshotDataStoreVO : snapshotDataStoreVOs) {
s_logger.debug("Remove snapshot " + snapshotId + ", status " + snapshotDataStoreVO.getState() +
" on snapshot_store_ref table with id: " + snapshotDataStoreVO.getId());
_snapshotStoreDao.remove(snapshotDataStoreVO.getId());
}
s_logger.debug("Remove snapshot " + snapshotId + " status " + snaphsot.getState() + " from snapshot table");
_snapshotDao.remove(snapshotId);
}
}
}
// push one individual snapshots currently on cache store to region store if it is not there already // push one individual snapshots currently on cache store to region store if it is not there already
private void syncSnapshotToRegionStore(long snapshotId, DataStore store){ private void syncSnapshotToRegionStore(long snapshotId, DataStore store){
// if snapshot is already on region wide object store, check if it is really downloaded there (by checking install_path). Sync snapshot to region // if snapshot is already on region wide object store, check if it is really downloaded there (by checking install_path). Sync snapshot to region

View File

@ -41,12 +41,13 @@ import javax.ejb.Local;
import javax.inject.Inject; import javax.inject.Inject;
import javax.naming.ConfigurationException; import javax.naming.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.config.ConfigDepot; import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey; import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable; import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.managed.context.ManagedContextRunnable; import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.utils.identity.ManagementServerNode; import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.log4j.Logger;
import com.cloud.cluster.dao.ManagementServerHostDao; import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.cluster.dao.ManagementServerHostPeerDao; import com.cloud.cluster.dao.ManagementServerHostPeerDao;
@ -958,7 +959,6 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
mshost.setAlertCount(0); mshost.setAlertCount(0);
mshost.setState(ManagementServerHost.State.Up); mshost.setState(ManagementServerHost.State.Up);
_mshostDao.persist(mshost); _mshostDao.persist(mshost);
if (s_logger.isInfoEnabled()) { if (s_logger.isInfoEnabled()) {
s_logger.info("New instance of management server msid " + _msId + ", runId " + _runId + " is being started"); s_logger.info("New instance of management server msid " + _msId + ", runId " + _runId + " is being started");
} }

View File

@ -129,4 +129,6 @@ public interface AsyncJobManager extends Manager {
String marshallResultObject(Serializable obj); String marshallResultObject(Serializable obj);
Object unmarshallResultObject(AsyncJob job); Object unmarshallResultObject(AsyncJob job);
List<AsyncJobVO> findFailureAsyncJobs(String... cmds);
} }

View File

@ -41,4 +41,6 @@ public interface AsyncJobDao extends GenericDao<AsyncJobVO, Long> {
List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit); List<AsyncJobVO> getExpiredCompletedJobs(Date cutTime, int limit);
List<AsyncJobVO> getResetJobs(long msid); List<AsyncJobVO> getResetJobs(long msid);
List<AsyncJobVO> getFailureJobsSinceLastMsStart(long msId, String... cmds);
} }

View File

@ -44,6 +44,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
private final SearchBuilder<AsyncJobVO> pseudoJobCleanupSearch; private final SearchBuilder<AsyncJobVO> pseudoJobCleanupSearch;
private final SearchBuilder<AsyncJobVO> expiringUnfinishedAsyncJobSearch; private final SearchBuilder<AsyncJobVO> expiringUnfinishedAsyncJobSearch;
private final SearchBuilder<AsyncJobVO> expiringCompletedAsyncJobSearch; private final SearchBuilder<AsyncJobVO> expiringCompletedAsyncJobSearch;
private final SearchBuilder<AsyncJobVO> failureMsidAsyncJobSearch;
public AsyncJobDaoImpl() { public AsyncJobDaoImpl() {
pendingAsyncJobSearch = createSearchBuilder(); pendingAsyncJobSearch = createSearchBuilder();
@ -84,6 +85,13 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
pseudoJobCleanupSearch.and("initMsid", pseudoJobCleanupSearch.entity().getInitMsid(), Op.EQ); pseudoJobCleanupSearch.and("initMsid", pseudoJobCleanupSearch.entity().getInitMsid(), Op.EQ);
pseudoJobCleanupSearch.done(); pseudoJobCleanupSearch.done();
failureMsidAsyncJobSearch = createSearchBuilder();
failureMsidAsyncJobSearch.and("initMsid", failureMsidAsyncJobSearch.entity().getInitMsid(), Op.EQ);
failureMsidAsyncJobSearch.and("instanceType", failureMsidAsyncJobSearch.entity().getInstanceType(), SearchCriteria.Op.EQ);
failureMsidAsyncJobSearch.and("status", failureMsidAsyncJobSearch.entity().getStatus(), SearchCriteria.Op.EQ);
failureMsidAsyncJobSearch.and("job_cmd", failureMsidAsyncJobSearch.entity().getCmd(), Op.IN);
failureMsidAsyncJobSearch.done();
} }
@Override @Override
@ -206,4 +214,13 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
return listIncludingRemovedBy(sc, filter); return listIncludingRemovedBy(sc, filter);
} }
@Override
public List<AsyncJobVO> getFailureJobsSinceLastMsStart(long msId, String... cmds) {
SearchCriteria<AsyncJobVO> sc = failureMsidAsyncJobSearch.create();
sc.setParameters("initMsid", msId);
sc.setParameters("status", AsyncJobVO.Status.FAILED);
sc.setParameters("job_cmd", (Object[])cmds);
return listBy(sc);
}
} }

View File

@ -1055,4 +1055,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
_messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL, _messageBus.publish(null, AsyncJob.Topics.JOB_EVENT_PUBLISH, PublishScope.LOCAL,
new Pair<AsyncJob, String>(job, jobEvent)); new Pair<AsyncJob, String>(job, jobEvent));
} }
@Override
public List<AsyncJobVO> findFailureAsyncJobs(String... cmds) {
return _jobDao.getFailureJobsSinceLastMsStart(getMsid(), cmds);
}
} }