more changes to make it work better in a cluster

This commit is contained in:
Alex Huang 2011-02-09 15:14:15 -08:00
parent db7bc893b9
commit b0f0efa29b
5 changed files with 92 additions and 113 deletions

View File

@ -88,8 +88,6 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, StateObject
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.MigrationRequested, State.Migrating);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.OperationSucceeded, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.OperationFailed, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.MigrationFailedOnSource, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.MigrationFailedOnDest, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.AgentReportRunning, State.Running);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.AgentReportStopped, State.Stopped);
s_fsm.addTransition(State.Migrating, VirtualMachine.Event.AgentReportShutdowned, State.Stopped);
@ -139,10 +137,7 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, StateObject
ExpungeOperation,
OperationSucceeded,
OperationFailed,
MigrationFailedOnSource,
MigrationFailedOnDest,
OperationRetry,
OperationCancelled,
AgentReportShutdowned
};

View File

@ -1206,6 +1206,12 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
return null;
}
@DB
protected boolean noDbTxn() {
Transaction txn = Transaction.currentTxn();
return !txn.dbTxnStarted();
}
@Override
public Answer[] send(Long hostId, Commands commands, int timeout)
@ -1214,6 +1220,8 @@ public class AgentManagerImpl implements AgentManager, HandlerFactory,
if (hostId == null) {
throw new AgentUnavailableException(-1);
}
assert noDbTxn() : "I know, I know. Why are we so strict as to not allow txn across an agent call? ... Why are we so cruel ... Why are we such a dictator .... Too bad... Sorry...but NO AGENT COMMANDS WRAPPED WITHIN DB TRANSACTIONS!";
Command[] cmds = commands.toCommands();

View File

@ -19,6 +19,8 @@ package com.cloud.vm;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.Table;
@ -30,15 +32,17 @@ import com.cloud.vm.VirtualMachine.State;
public class ItWorkVO {
enum ResourceType {
Volume,
Nic
Nic,
Host
}
enum Step {
Prepare,
Start,
Starting,
Started,
Release,
Done
Done,
Migrating
}
@Id
@ -76,6 +80,13 @@ public class ItWorkVO {
@Column(name="resource_type")
ResourceType resourceType;
@Column(name="vm_type")
@Enumerated(value=EnumType.STRING)
VirtualMachine.Type vmType;
public VirtualMachine.Type getVmType() {
return vmType;
}
public long getResourceId() {
return resourceId;
@ -96,7 +107,7 @@ public class ItWorkVO {
protected ItWorkVO() {
}
protected ItWorkVO(String id, long managementServerId, State type, long instanceId) {
protected ItWorkVO(String id, long managementServerId, State type, VirtualMachine.Type vmType, long instanceId) {
this.id = id;
this.managementServerId = managementServerId;
this.type = type;
@ -106,6 +117,7 @@ public class ItWorkVO {
this.resourceType = null;
this.createdAt = InaccurateClock.getTimeInSeconds();
this.updatedAt = createdAt;
this.vmType = vmType;
}
public String getId() {

View File

@ -125,7 +125,6 @@ import com.cloud.utils.db.DB;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.fsm.StateListener;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.vm.ItWorkVO.Step;
import com.cloud.vm.VirtualMachine.Event;
@ -261,60 +260,6 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
return vm;
}
protected void reserveNics(VirtualMachineProfile<? extends VMInstanceVO> vmProfile, DeployDestination dest, ReservationContext context) throws InsufficientCapacityException, ConcurrentOperationException, ResourceUnavailableException {
// List<NicVO> nics = _nicsDao.listBy(vmProfile.getId());
// for (NicVO nic : nics) {
// Pair<NetworkGuru, NetworkVO> implemented = _networkMgr.implementNetwork(nic.getNetworkId(), dest, context);
// NetworkGuru concierge = implemented.first();
// NetworkVO network = implemented.second();
// NicProfile profile = null;
// if (nic.getReservationStrategy() == ReservationStrategy.Start) {
// nic.setState(Resource.State.Reserving);
// nic.setReservationId(context.getReservationId());
// _nicsDao.update(nic.getId(), nic);
// URI broadcastUri = nic.getBroadcastUri();
// if (broadcastUri == null) {
// network.getBroadcastUri();
// }
//
// URI isolationUri = nic.getIsolationUri();
//
// profile = new NicProfile(nic, network, broadcastUri, isolationUri);
// concierge.reserve(profile, network, vmProfile, dest, context);
// nic.setIp4Address(profile.getIp4Address());
// nic.setIp6Address(profile.getIp6Address());
// nic.setMacAddress(profile.getMacAddress());
// nic.setIsolationUri(profile.getIsolationUri());
// nic.setBroadcastUri(profile.getBroadCastUri());
// nic.setReserver(concierge.getName());
// nic.setState(Resource.State.Reserved);
// nic.setNetmask(profile.getNetmask());
// nic.setGateway(profile.getGateway());
// nic.setAddressFormat(profile.getFormat());
// _nicsDao.update(nic.getId(), nic);
// } else {
// profile = new NicProfile(nic, network, nic.getBroadcastUri(), nic.getIsolationUri());
// }
//
// for (NetworkElement element : _networkElements) {
// if (s_logger.isDebugEnabled()) {
// s_logger.debug("Asking " + element.getName() + " to prepare for " + nic);
// }
// element.prepare(network, profile, vmProfile, dest, context);
// }
//
// vmProfile.addNic(profile);
// _networksDao.changeActiveNicsBy(network.getId(), 1);
// }
}
protected void prepareNics(VirtualMachineProfile<? extends VMInstanceVO> vmProfile, DeployDestination dest, ReservationContext context) {
}
@Override
public <T extends VMInstanceVO> T allocate(T vm,
VMTemplateVO template,
@ -353,7 +298,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
try {
if (advanceExpunge(vm, caller, account)) {
//Mark vms as removed
remove(vm, _accountMgr.getSystemUser(), account);
remove(vm, caller, account);
return true;
} else {
s_logger.info("Did not expunge " + vm);
@ -507,7 +452,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
protected <T extends VMInstanceVO> Ternary<T, ReservationContext, ItWorkVO> changeToStartState(VirtualMachineGuru<T> vmGuru, T vm, User caller, Account account) throws ConcurrentOperationException {
long vmId = vm.getId();
ItWorkVO work = new ItWorkVO(UUID.randomUUID().toString(), _nodeId, State.Starting, vm.getId());
ItWorkVO work = new ItWorkVO(UUID.randomUUID().toString(), _nodeId, State.Starting, vm.getType(), vm.getId());
int retry = _lockStateRetry;
while (retry-- != 0) {
Transaction txn = Transaction.currentTxn();
@ -643,7 +588,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
if (work == null || work.getStep() != Step.Prepare) {
throw new ConcurrentOperationException("Work steps have been changed: " + work);
}
_workDao.updateStep(work, Step.Start);
_workDao.updateStep(work, Step.Starting);
_agentMgr.send(destHostId, cmds);
_workDao.updateStep(work, Step.Started);
@ -749,12 +694,12 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
s_logger.debug("Cleaning up resources for the vm " + vm + " in " + state + " state");
if (state == State.Starting) {
Step step = work.getStep();
if (step == Step.Start && !force) {
if (step == Step.Starting && !force) {
s_logger.warn("Unable to cleanup vm " + vm + "; work state is incorrect: " + step);
return false;
}
if (step == Step.Started || step == Step.Start) {
if (step == Step.Started || step == Step.Starting) {
if (vm.getHostId() != null) {
if (!sendStop(guru, profile, force)) {
s_logger.warn("Failed to stop vm " + vm + " in " + State.Starting + " state as a part of cleanup process");
@ -763,7 +708,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
}
}
if (step != Step.Release && step != Step.Prepare && step != Step.Started && step != Step.Start) {
if (step != Step.Release && step != Step.Prepare && step != Step.Started && step != Step.Starting) {
s_logger.debug("Cleanup is not needed for vm " + vm + "; work state is incorrect: " + step);
return true;
}
@ -950,6 +895,14 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
return true;
}
protected boolean checkVmOnHost(VirtualMachine vm, long hostId) throws AgentUnavailableException, OperationTimedoutException {
CheckVirtualMachineAnswer answer = (CheckVirtualMachineAnswer)_agentMgr.send(hostId, new CheckVirtualMachineCommand(vm.getInstanceName()));
if (!answer.getResult() || answer.getState() == State.Stopped) {
return false;
}
return true;
}
@Override
public <T extends VMInstanceVO> T migrate(T vm, long srcHostId, DeployDestination dest) throws ResourceUnavailableException {
@ -964,7 +917,7 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
VirtualMachineGuru<T> vmGuru = getVmGuru(vm);
vm = vmGuru.findById(vm.getId());
if (vm == null || vm.getRemoved() != null) {
if (vm == null) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Unable to find the vm " + vm);
}
@ -986,69 +939,80 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
VirtualMachineTO to = hvGuru.implement(profile);
PrepareForMigrationCommand pfmc = new PrepareForMigrationCommand(to);
PrepareForMigrationAnswer pfma;
ItWorkVO work = new ItWorkVO(UUID.randomUUID().toString(), _nodeId, State.Migrating, vm.getType(), vm.getId());
work.setStep(Step.Prepare);
work.setResourceType(ItWorkVO.ResourceType.Host);
work.setResourceId(dstHostId);
_workDao.persist(work);
PrepareForMigrationAnswer pfma = null;
try {
pfma = (PrepareForMigrationAnswer)_agentMgr.send(dstHostId, pfmc);
if (!pfma.getResult()) {
String msg = "Unable to prepare for migration due to " + pfma.getDetails();
pfma = null;
throw new AgentUnavailableException(msg, dstHostId);
}
} catch (OperationTimedoutException e1) {
throw new AgentUnavailableException("Operation timed out", dstHostId);
} finally {
if (pfma == null) {
work.setStep(Step.Done);
_workDao.update(work.getId(), work);
}
}
if (!pfma.getResult()) {
throw new AgentUnavailableException(pfma.getDetails(), dstHostId);
}
vm.setLastHostId(srcHostId);
if (vm == null || vm.getHostId() == null || vm.getHostId() != srcHostId || !changeState(vm, Event.MigrationRequested, dstHostId, work, Step.Migrating)) {
s_logger.info("Migration cancelled because state has changed: " + vm);
return null;
}
boolean migrated = false;
try {
vm.setLastHostId(srcHostId);
if (vm == null || vm.getRemoved() != null || vm.getHostId() == null || vm.getHostId() != srcHostId || !stateTransitTo(vm, Event.MigrationRequested, dstHostId)) {
s_logger.info("Migration cancelled because state has changed: " + vm);
return null;
}
boolean isWindows = _guestOsCategoryDao.findById(_guestOsDao.findById(vm.getGuestOSId()).getCategoryId()).getName().equalsIgnoreCase("Windows");
MigrateCommand mc = new MigrateCommand(vm.getInstanceName(), dest.getHost().getPrivateIpAddress(), isWindows);
MigrateAnswer ma = (MigrateAnswer)_agentMgr.send(vm.getLastHostId(), mc);
if (!ma.getResult()) {
return null;
try {
MigrateAnswer ma = (MigrateAnswer)_agentMgr.send(vm.getLastHostId(), mc);
if (!ma.getResult()) {
s_logger.error("Unable to migrate due to " + ma.getDetails());
return null;
}
} catch (OperationTimedoutException e) {
if (e.isActive()) {
s_logger.warn("Active migration command so scheduling a restart for " + vm);
_haMgr.scheduleRestart(vm, true);
}
throw new AgentUnavailableException("Operation timed out on migrating " + vm, dstHostId);
}
Commands cmds = new Commands(OnError.Revert);
CheckVirtualMachineCommand cvm = new CheckVirtualMachineCommand(vm.getInstanceName());
cmds.addCommand(cvm);
_agentMgr.send(dstHostId, cmds);
CheckVirtualMachineAnswer answer = cmds.getAnswer(CheckVirtualMachineAnswer.class);
if (!answer.getResult()) {
s_logger.debug("Unable to complete migration for " + vm.toString());
stateTransitTo(vm, VirtualMachine.Event.AgentReportStopped, null);
return null;
changeState(vm, VirtualMachine.Event.OperationSucceeded, dstHostId, work, Step.Started);
try {
if (!checkVmOnHost(vm, dstHostId)) {
s_logger.error("Unable to complete migration for " + vm);
_agentMgr.send(srcHostId, new Commands(cleanup(vm.getInstanceName())), null);
cleanup(vmGuru, new VirtualMachineProfileImpl<T>(vm), work, Event.AgentReportStopped, true, _accountMgr.getSystemUser(), _accountMgr.getSystemAccount());
return null;
}
} catch (OperationTimedoutException e) {
}
State state = answer.getState();
if (state == State.Stopped) {
s_logger.warn("Unable to complete migration as we can not detect it on " + dest.getHost());
stateTransitTo(vm, VirtualMachine.Event.AgentReportStopped, null);
return null;
}
stateTransitTo(vm, VirtualMachine.Event.OperationSucceeded, dstHostId);
migrated = true;
return vm;
} catch (final OperationTimedoutException e) {
s_logger.debug("operation timed out");
if (e.isActive()) {
// FIXME: scheduleRestart(vm, true);
}
throw new AgentUnavailableException("Operation timed out: ", dstHostId);
} finally {
if (!migrated) {
s_logger.info("Migration was unsuccessful. Cleaning up: " + vm);
_alertMgr.sendAlert(alertType, fromHost.getDataCenterId(), fromHost.getPodId(), "Unable to migrate vm " + vm.getName() + " from host " + fromHost.getName() + " in zone " + dest.getDataCenter().getName() + " and pod " + dest.getPod().getName(), "Migrate Command failed. Please check logs.");
stateTransitTo(vm, Event.MigrationFailedOnSource, srcHostId);
_agentMgr.send(dstHostId, new Commands(cleanup(vm.getInstanceName())), null);
Command cleanup = cleanup(vm.getInstanceName());
_agentMgr.easySend(dstHostId, cleanup);
stateTransitTo(vm, Event.OperationFailed, srcHostId);
}
work.setStep(Step.Done);
_workDao.update(work.getId(), work);
}
}
@ -1199,7 +1163,6 @@ public class VirtualMachineManagerImpl implements VirtualMachineManager, Listene
return rebootedVm;
}
@Override
public VMInstanceVO findById(VirtualMachine.Type type, long vmId) {
VirtualMachineGuru<? extends VMInstanceVO> guru = _vmGurus.get(type);

View File

@ -93,11 +93,12 @@ DROP TABLE IF EXISTS `cloud`.`usage_event`;
DROP TABLE IF EXISTS `cloud`.`host_tags`;
CREATE TABLE `cloud`.`op_it_work` (
`id` char(40) COMMENT 'id',
`id` char(40) COMMENT 'reservation id',
`mgmt_server_id` bigint unsigned COMMENT 'management server id',
`created_at` bigint unsigned NOT NULL COMMENT 'when was this work detail created',
`thread` varchar(255) NOT NULL COMMENT 'thread name',
`type` char(32) NOT NULL COMMENT 'type of work',
`vm_type` char(32) NOT NULL COMMENT 'type of vm',
`step` char(32) NOT NULL COMMENT 'state',
`updated_at` bigint unsigned NOT NULL COMMENT 'time it was taken over',
`instance_id` bigint unsigned NOT NULL COMMENT 'vm instance',