From 3574d8d20be01b90ca09e58ab4b492604c1a8d1f Mon Sep 17 00:00:00 2001 From: dahn Date: Mon, 4 Oct 2021 17:21:21 +0200 Subject: [PATCH] parallel nic adding (#5541) * trace nics additions * work queue patch for network to add * add secondary key to job * logging improvements and naming of field(s) * several naming corrections * extra check if net already exists for vm * placeholder job with secondary object * constraint on entering the same job multiple times * error handling/warning message * review comments applied Co-authored-by: Daan Hoogland Co-authored-by: Wei Zhou --- .../cloud/vm/VirtualMachineManagerImpl.java | 93 ++++++++++++++----- .../META-INF/db/schema-41520to41600.sql | 3 + .../framework/jobs/dao/VmWorkJobDao.java | 2 + .../framework/jobs/dao/VmWorkJobDaoImpl.java | 15 +++ .../framework/jobs/impl/AsyncJobVO.java | 2 +- .../framework/jobs/impl/VmWorkJobVO.java | 24 +++++ .../java/com/cloud/vm/UserVmManagerImpl.java | 21 +++-- 7 files changed, 129 insertions(+), 31 deletions(-) diff --git a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java index 3aa8a5026c0..5f6f883624a 100755 --- a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java +++ b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import javax.inject.Inject; import javax.naming.ConfigurationException; +import javax.persistence.EntityExistsException; import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao; import org.apache.cloudstack.annotation.AnnotationService; @@ -3981,7 +3982,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac if (jobContext.isJobDispatchedBy(VmWorkConstants.VM_WORK_JOB_DISPATCHER)) { // avoid re-entrance VmWorkJobVO placeHolder = null; - placeHolder = createPlaceHolderWork(vm.getId()); + placeHolder = 
createPlaceHolderWork(vm.getId(), network.getUuid()); try { return orchestrateAddVmToNetwork(vm, network, requested); } finally { @@ -4021,10 +4022,23 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } } + /** + * Duplicated in {@code UserVmManagerImpl} for a {@code UserVmVO}. + */ + private void checkIfNetworkExistsForVM(VirtualMachine virtualMachine, Network network) { + List allNics = _nicsDao.listByVmId(virtualMachine.getId()); + for (NicVO nic : allNics) { + if (nic.getNetworkId() == network.getId()) { + throw new CloudRuntimeException("A NIC already exists for VM:" + virtualMachine.getInstanceName() + " in network: " + network.getUuid()); + } + } + } + private NicProfile orchestrateAddVmToNetwork(final VirtualMachine vm, final Network network, final NicProfile requested) throws ConcurrentOperationException, ResourceUnavailableException, InsufficientCapacityException { final CallContext cctx = CallContext.current(); + checkIfNetworkExistsForVM(vm, network); s_logger.debug("Adding vm " + vm + " to network " + network + "; requested nic profile " + requested); final VMInstanceVO vmVO = _vmDao.findById(vm.getId()); final ReservationContext context = new ReservationContextImpl(null, null, cctx.getCallingUser(), cctx.getCallingAccount()); @@ -5375,7 +5389,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Map volumeStorageMap = dest.getStorageForDisks(); if (volumeStorageMap != null) { for (Volume vol : volumeStorageMap.keySet()) { - checkConcurrentJobsPerDatastoreThreshhold(volumeStorageMap.get(vol)); + checkConcurrentJobsPerDatastoreThreshold(volumeStorageMap.get(vol)); } } @@ -5540,7 +5554,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } - private void checkConcurrentJobsPerDatastoreThreshhold(final StoragePool destPool) { + private void checkConcurrentJobsPerDatastoreThreshold(final StoragePool 
destPool) { final Long threshold = VolumeApiService.ConcurrentMigrationsThresholdPerDatastore.value(); if (threshold != null && threshold > 0) { long count = _jobMgr.countPendingJobs("\"storageid\":\"" + destPool.getUuid() + "\"", MigrateVMCmd.class.getName(), MigrateVolumeCmd.class.getName(), MigrateVolumeCmdByAdmin.class.getName()); @@ -5561,7 +5575,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac Set uniquePoolIds = new HashSet<>(poolIds); for (Long poolId : uniquePoolIds) { StoragePoolVO pool = _storagePoolDao.findById(poolId); - checkConcurrentJobsPerDatastoreThreshhold(pool); + checkConcurrentJobsPerDatastoreThreshold(pool); } final VMInstanceVO vm = _vmDao.findByUuid(vmUuid); @@ -5608,37 +5622,63 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac final List pendingWorkJobs = _workJobDao.listPendingWorkJobs( VirtualMachine.Type.Instance, vm.getId(), - VmWorkAddVmToNetwork.class.getName()); + VmWorkAddVmToNetwork.class.getName(), network.getUuid()); VmWorkJobVO workJob = null; if (pendingWorkJobs != null && pendingWorkJobs.size() > 0) { - assert pendingWorkJobs.size() == 1; + if (pendingWorkJobs.size() > 1) { + s_logger.warn(String.format("The number of jobs to add network %s to vm %s are %d", network.getUuid(), vm.getInstanceName(), pendingWorkJobs.size())); + } workJob = pendingWorkJobs.get(0); } else { + if (s_logger.isTraceEnabled()) { + s_logger.trace(String.format("no jobs to add network %s for vm %s yet", network, vm)); + } - workJob = new VmWorkJobVO(context.getContextId()); - - workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); - workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); - - workJob.setAccountId(account.getId()); - workJob.setUserId(user.getId()); - workJob.setVmType(VirtualMachine.Type.Instance); - workJob.setVmInstanceId(vm.getId()); - workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); - - // save work context info (there are some duplications) 
- final VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), - VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network.getId(), requested); - workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); - - _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + workJob = createVmWorkJobToAddNetwork(vm, network, requested, context, user, account); } AsyncJobExecutionContext.getCurrentExecutionContext().joinJob(workJob.getId()); return new VmJobVirtualMachineOutcome(workJob, vm.getId()); } + private VmWorkJobVO createVmWorkJobToAddNetwork( + VirtualMachine vm, + Network network, + NicProfile requested, + CallContext context, + User user, + Account account) { + VmWorkJobVO workJob; + workJob = new VmWorkJobVO(context.getContextId()); + + workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_DISPATCHER); + workJob.setCmd(VmWorkAddVmToNetwork.class.getName()); + + workJob.setAccountId(account.getId()); + workJob.setUserId(user.getId()); + workJob.setVmType(VirtualMachine.Type.Instance); + workJob.setVmInstanceId(vm.getId()); + workJob.setRelated(AsyncJobExecutionContext.getOriginJobId()); + workJob.setSecondaryObjectIdentifier(network.getUuid()); + + // save work context info (there are some duplications) + final VmWorkAddVmToNetwork workInfo = new VmWorkAddVmToNetwork(user.getId(), account.getId(), vm.getId(), + VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, network.getId(), requested); + workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo)); + + try { + _jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId()); + } catch (CloudRuntimeException e) { + if (e.getCause() instanceof EntityExistsException) { + String msg = String.format("A job to add a nic for network %s to vm %s already exists", network.getUuid(), vm.getUuid()); + s_logger.warn(msg, e); + } + throw e; + } + return workJob; + } + public Outcome removeNicFromVmThroughJobQueue( final VirtualMachine vm, final Nic nic) { @@ 
-5945,6 +5985,10 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac } private VmWorkJobVO createPlaceHolderWork(final long instanceId) { + return createPlaceHolderWork(instanceId, null); + } + + private VmWorkJobVO createPlaceHolderWork(final long instanceId, String secondaryObjectIdentifier) { final VmWorkJobVO workJob = new VmWorkJobVO(""); workJob.setDispatcher(VmWorkConstants.VM_WORK_JOB_PLACEHOLDER); @@ -5956,6 +6000,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac workJob.setStep(VmWorkJobVO.Step.Starting); workJob.setVmType(VirtualMachine.Type.Instance); workJob.setVmInstanceId(instanceId); + if(StringUtils.isNotBlank(secondaryObjectIdentifier)) { + workJob.setSecondaryObjectIdentifier(secondaryObjectIdentifier); + } workJob.setInitMsid(ManagementServerNode.getManagementServerId()); _workJobDao.persist(workJob); diff --git a/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql b/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql index 64c381e0e7a..abbf4a03634 100644 --- a/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql +++ b/engine/schema/src/main/resources/META-INF/db/schema-41520to41600.sql @@ -791,3 +791,6 @@ ALTER TABLE cloud.user_vm_details MODIFY value varchar(5120) NOT NULL; ALTER TABLE cloud_usage.usage_network DROP PRIMARY KEY, ADD PRIMARY KEY (`account_id`,`zone_id`,`host_id`,`network_id`,`event_time_millis`); ALTER TABLE `cloud`.`user_statistics` DROP INDEX `account_id`, ADD UNIQUE KEY `account_id` (`account_id`,`data_center_id`,`public_ip_address`,`device_id`,`device_type`, `network_id`); ALTER TABLE `cloud_usage`.`user_statistics` DROP INDEX `account_id`, ADD UNIQUE KEY `account_id` (`account_id`,`data_center_id`,`public_ip_address`,`device_id`,`device_type`, `network_id`); + +ALTER TABLE `cloud`.`vm_work_job` ADD COLUMN `secondary_object` char(100) COMMENT 'any additional item that must be checked during queueing' AFTER 
`vm_instance_id`; +ALTER TABLE cloud.vm_work_job ADD CONSTRAINT vm_work_job_step_and_objects UNIQUE KEY (step,vm_instance_id,secondary_object); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java index 44e39e40291..89601e6b5d2 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDao.java @@ -32,6 +32,8 @@ public interface VmWorkJobDao extends GenericDao { List listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd); + List listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd, String secondaryObjectIdentifier); + void updateStep(long workJobId, Step step); void expungeCompletedWorkJobs(Date cutDate); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java index e81ab1ebbf7..497f12d7366 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/dao/VmWorkJobDaoImpl.java @@ -67,6 +67,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase implemen PendingWorkJobByCommandSearch.and("jobStatus", PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ); PendingWorkJobByCommandSearch.and("vmType", PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ); PendingWorkJobByCommandSearch.and("vmInstanceId", PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ); + PendingWorkJobByCommandSearch.and("secondaryObjectIdentifier", PendingWorkJobByCommandSearch.entity().getSecondaryObjectIdentifier(), Op.EQ); PendingWorkJobByCommandSearch.and("step", 
PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ); PendingWorkJobByCommandSearch.and("cmd", PendingWorkJobByCommandSearch.entity().getCmd(), Op.EQ); PendingWorkJobByCommandSearch.done(); @@ -119,6 +120,20 @@ public class VmWorkJobDaoImpl extends GenericDaoBase implemen return this.listBy(sc, filter); } + @Override + public List listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd, String secondaryObjectIdentifier) { + + SearchCriteria sc = PendingWorkJobByCommandSearch.create(); + sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS); + sc.setParameters("vmType", type); + sc.setParameters("vmInstanceId", instanceId); + sc.setParameters("secondaryObjectIdentifier", secondaryObjectIdentifier); + sc.setParameters("cmd", jobCmd); + + Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null); + return this.listBy(sc, filter); + } + @Override public void updateStep(long workJobId, Step step) { VmWorkJobVO jobVo = findById(workJobId); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java index 9d30c2c87b9..777fcba5a3d 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/AsyncJobVO.java @@ -384,7 +384,7 @@ public class AsyncJobVO implements AsyncJob, JobInfo { @Override public String toString() { StringBuffer sb = new StringBuffer(); - sb.append("AsyncJobVO {id:").append(getId()); + sb.append("AsyncJobVO : {id:").append(getId()); sb.append(", userId: ").append(getUserId()); sb.append(", accountId: ").append(getAccountId()); sb.append(", instanceType: ").append(getInstanceType()); diff --git a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java 
index ef0ac7daddf..a8a05d483dc 100644 --- a/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java +++ b/framework/jobs/src/main/java/org/apache/cloudstack/framework/jobs/impl/VmWorkJobVO.java @@ -58,6 +58,9 @@ public class VmWorkJobVO extends AsyncJobVO { @Column(name = "vm_instance_id") long vmInstanceId; + @Column(name = "secondary_object") + String secondaryObjectIdentifier; + protected VmWorkJobVO() { } @@ -89,4 +92,25 @@ public class VmWorkJobVO extends AsyncJobVO { public void setVmInstanceId(long vmInstanceId) { this.vmInstanceId = vmInstanceId; } + + public String getSecondaryObjectIdentifier() { + return secondaryObjectIdentifier; + } + + public void setSecondaryObjectIdentifier(String secondaryObjectIdentifier) { + this.secondaryObjectIdentifier = secondaryObjectIdentifier; + } + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("VmWorkJobVO : {"). + append(", step: ").append(getStep()). + append(", vmType: ").append(getVmType()). + append(", vmInstanceId: ").append(getVmInstanceId()). + append(", secondaryObjectIdentifier: ").append(getSecondaryObjectIdentifier()). + append(super.toString()). 
+ append("}"); + return sb.toString(); + } + } diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java index d371e58cd67..42496927fad 100644 --- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java +++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java @@ -1384,12 +1384,7 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir Account vmOwner = _accountMgr.getAccount(vmInstance.getAccountId()); _networkModel.checkNetworkPermissions(vmOwner, network); - List allNics = _nicDao.listByVmId(vmInstance.getId()); - for (NicVO nic : allNics) { - if (nic.getNetworkId() == network.getId()) { - throw new CloudRuntimeException("A NIC already exists for VM:" + vmInstance.getInstanceName() + " in network: " + network.getUuid()); - } - } + checkIfNetExistsForVM(vmInstance, network); macAddress = validateOrReplaceMacAddress(macAddress, network.getId()); @@ -1456,10 +1451,22 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir } } CallContext.current().putContextParameter(Nic.class, guestNic.getUuid()); - s_logger.debug("Successful addition of " + network + " from " + vmInstance); + s_logger.debug(String.format("Successful addition of %s from %s through %s", network, vmInstance, guestNic)); return _vmDao.findById(vmInstance.getId()); } + /** + * Duplicated in {@code VirtualMachineManagerImpl} for a {@code VMInstanceVO}. + */ + private void checkIfNetExistsForVM(VirtualMachine virtualMachine, Network network) { + List allNics = _nicDao.listByVmId(virtualMachine.getId()); + for (NicVO nic : allNics) { + if (nic.getNetworkId() == network.getId()) { + throw new CloudRuntimeException("A NIC already exists for VM:" + virtualMachine.getInstanceName() + " in network: " + network.getUuid()); + } + } + } + /** * If the given MAC address is invalid it replaces the given MAC with the next available MAC address */