# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
""" Test case for Data Disk Attach to VM on ZWPS Test Path

Contains two test classes: one attaches a data disk to a VM deployed on
zone wide primary storage (ZWPS), the other attaches a data disk that is
placed on cluster wide primary storage (CWPS).
"""

from nose.plugins.attrib import attr
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.lib.utils import (cleanup_resources,
                              validateList)
from marvin.lib.base import (Account,
                             ServiceOffering,
                             DiskOffering,
                             Volume,
                             VirtualMachine,
                             StoragePool
                             )
from marvin.lib.common import (get_domain,
                               get_zone,
                               get_template
                               )

from marvin.codes import (PASS,
                          ZONETAG1,
                          CLUSTERTAG1)


class TestAttachDataDisk(cloudstackTestCase):
    """Attach a data disk to a VM deployed through a ZONETAG1 offering."""

    @classmethod
    def setUpClass(cls):
        """Create the account, offerings and clients shared by the tests.

        If the storage pools cannot be listed, ``cls.skiptest`` is set and
        nothing else is created; ``setUp`` then skips every test.
        """
        testClient = super(TestAttachDataDisk, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.testdata = testClient.getParsedTestDataConfig()
        cls.hypervisor = testClient.getHypervisorInfo()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
        cls._cleanup = []
        cls.template = get_template(
            cls.apiclient,
            cls.zone.id,
            cls.testdata["ostype"])

        cls.skiptest = False
        # Always defined so that tests and tearDown can iterate over it.
        cls.pools = []

        try:
            # NOTE(review): the list call presumably returns None when
            # nothing matches; normalise to an empty list.
            cls.pools = StoragePool.list(
                cls.apiclient, zoneid=cls.zone.id) or []
        except Exception:
            cls.skiptest = True
            return

        try:
            # Create an account
            cls.account = Account.create(
                cls.apiclient,
                cls.testdata["account"],
                domainid=cls.domain.id
            )
            cls._cleanup.append(cls.account)

            # Create user api client of the account
            cls.userapiclient = testClient.getUserApiClient(
                UserName=cls.account.name,
                DomainName=cls.account.domain
            )
            # Create Service offering
            cls.service_offering_zone1 = ServiceOffering.create(
                cls.apiclient,
                cls.testdata["service_offering"],
                tags=ZONETAG1
            )
            cls._cleanup.append(cls.service_offering_zone1)

            # Create Disk offering
            cls.disk_offering = DiskOffering.create(
                cls.apiclient,
                cls.testdata["disk_offering"]
            )
            cls._cleanup.append(cls.disk_offering)
        except Exception:
            # Release whatever was created before propagating the failure.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delete the class level resources registered in ``cls._cleanup``."""
        try:
            cleanup_resources(cls.apiclient, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    def setUp(self):
        """Skip when class setup failed, else prepare per-test clients."""
        if self.skiptest:
            self.skipTest("Storage pools could not be listed in setUpClass")
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.cleanup = []

    def tearDown(self):
        """Clear storage pool tags, detach the data disk and clean up."""
        try:
            for storagePool in self.pools:
                StoragePool.update(self.apiclient, id=storagePool.id, tags="")

            if hasattr(self, "data_volume_created"):
                data_volumes_list = Volume.list(
                    self.userapiclient,
                    id=self.data_volume_created.id,
                    virtualmachineid=self.vm.id
                )
                if data_volumes_list:
                    self.vm.detach_volume(
                        self.userapiclient,
                        data_volumes_list[0]
                    )

                status = validateList(data_volumes_list)
                self.assertEqual(
                    status[0],
                    PASS,
                    "DATA Volume List Validation Failed")

            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @attr(tags=["basic", "advanced"], required_hardware="true")
    def test_01_attach_datadisk_to_vm_on_zwps(self):
        """ Attach Data Disk To VM on ZWPS
            1. Check if zwps storage pool exists.
            2. Adding tag to zone wide primary storage
            3. Launch a VM on ZWPS
            4. Attach data disk to vm which is on zwps.
            5. Verify disk is attached.
        """

        # Step 1
        zone_pools = [storagePool for storagePool in self.pools
                      if storagePool.scope == "ZONE"]
        if not zone_pools:
            self.skipTest("There must be at least one zone wide "
                          "storage pools available in the setup")

        # Step 2
        # Adding tags to Storage Pools: the n-th zone wide pool gets
        # ZONETAG1 with its last character replaced by n.
        for zone_no, storagePool in enumerate(zone_pools, 1):
            StoragePool.update(
                self.apiclient,
                id=storagePool.id,
                tags=[ZONETAG1[:-1] + str(zone_no)])

        # Step 3
        self.vm = VirtualMachine.create(
            self.apiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering_zone1.id,
            zoneid=self.zone.id
        )

        self.data_volume_created = Volume.create(
            self.userapiclient,
            self.testdata["volume"],
            zoneid=self.zone.id,
            account=self.account.name,
            domainid=self.account.domainid,
            diskofferingid=self.disk_offering.id
        )

        self.cleanup.append(self.data_volume_created)

        # Step 4
        self.vm.attach_volume(
            self.userapiclient,
            self.data_volume_created
        )

        # Step 5
        # Filtering on both the volume id and the VM id means a valid,
        # non-empty list is returned only if the disk is attached to the VM.
        data_volumes_list = Volume.list(
            self.userapiclient,
            id=self.data_volume_created.id,
            virtualmachineid=self.vm.id
        )

        # Validate the list itself, not one of its elements.
        status = validateList(data_volumes_list)
        self.assertEqual(
            status[0],
            PASS,
            "Check: Data if Disk is attached to VM")


class TestAttachDataDiskOnCWPS(cloudstackTestCase):
    """Attach a CLUSTERTAG1 data disk, on cluster wide storage, to a VM."""

    @classmethod
    def setUpClass(cls):
        """Create the account, offerings and clients shared by the tests.

        If the storage pools cannot be listed, ``cls.skiptest`` is set and
        nothing else is created; ``setUp`` then skips every test.
        """
        testClient = super(TestAttachDataDiskOnCWPS, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.testdata = testClient.getParsedTestDataConfig()
        cls.hypervisor = testClient.getHypervisorInfo()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
        cls._cleanup = []
        cls.template = get_template(
            cls.apiclient,
            cls.zone.id,
            cls.testdata["ostype"])

        cls.skiptest = False
        # Always defined so that tests and tearDown can iterate over it.
        cls.pools = []

        try:
            # NOTE(review): the list call presumably returns None when
            # nothing matches; normalise to an empty list.
            cls.pools = StoragePool.list(
                cls.apiclient,
                zoneid=cls.zone.id,
                scope="CLUSTER") or []
        except Exception:
            cls.skiptest = True
            return

        try:
            # Create an account
            cls.account = Account.create(
                cls.apiclient,
                cls.testdata["account"],
                domainid=cls.domain.id
            )
            cls._cleanup.append(cls.account)

            # Create user api client of the account
            cls.userapiclient = testClient.getUserApiClient(
                UserName=cls.account.name,
                DomainName=cls.account.domain
            )
            # Create Service offering
            cls.service_offering = ServiceOffering.create(
                cls.apiclient,
                cls.testdata["service_offering"],
            )
            cls._cleanup.append(cls.service_offering)

            # Create Disk offering
            cls.disk_offering = DiskOffering.create(
                cls.apiclient,
                cls.testdata["disk_offering"],
                custom=True,
                tags=CLUSTERTAG1,
            )
            cls._cleanup.append(cls.disk_offering)
        except Exception:
            # Release whatever was created before propagating the failure.
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delete the class level resources registered in ``cls._cleanup``."""
        try:
            cleanup_resources(cls.apiclient, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    def setUp(self):
        """Skip when class setup failed, else prepare per-test clients."""
        if self.skiptest:
            self.skipTest("Storage pools could not be listed in setUpClass")
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.cleanup = []

    def tearDown(self):
        """Clear storage pool tags, detach the data disk and clean up."""
        try:
            for storagePool in self.pools:
                StoragePool.update(self.apiclient, id=storagePool.id, tags="")

            if hasattr(self, "data_volume_created"):
                data_volumes_list = Volume.list(
                    self.userapiclient,
                    id=self.data_volume_created.id,
                    virtualmachineid=self.vm.id
                )
                if data_volumes_list:
                    self.vm.detach_volume(
                        self.userapiclient,
                        data_volumes_list[0]
                    )

                status = validateList(data_volumes_list)
                self.assertEqual(
                    status[0],
                    PASS,
                    "DATA Volume List Validation Failed")

            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @attr(tags=["basic", "advanced"], required_hardware="true")
    def test_01_attach_datadisk_to_vm_on_zwps(self):
        """ Attach Data Disk on CWPS To VM
            1. Check if cwps storage pool exists.
            2. Adding tag to cluster wide primary storage
            3. Launch a VM
            4. Attach data disk to vm.
            5. Verify disk is attached and in correct storage pool.
        """

        # Step 1
        if not self.pools:
            self.skipTest("There must be at least one cluster wide "
                          "storage pools available in the setup")

        # Step 2
        # Adding tags to Storage Pools
        cluster_pool = self.pools[0]
        StoragePool.update(
            self.apiclient,
            id=cluster_pool.id,
            tags=[CLUSTERTAG1])

        # Step 3
        # Launch VM with the (untagged) offering created in setUpClass
        self.vm = VirtualMachine.create(
            self.apiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            zoneid=self.zone.id
        )

        # Work on a copy so the shared test data is not modified.
        volume_config = dict(
            self.testdata["volume"],
            zoneid=self.zone.id,
            customdisksize=1
        )
        self.data_volume_created = Volume.create_custom_disk(
            self.userapiclient,
            volume_config,
            account=self.account.name,
            domainid=self.account.domainid,
            diskofferingid=self.disk_offering.id,
        )

        self.cleanup.append(self.data_volume_created)

        # Step 4
        self.vm.attach_volume(
            self.userapiclient,
            self.data_volume_created
        )

        # Step 5
        attached_volumes = Volume.list(
            self.userapiclient,
            virtualmachineid=self.vm.id,
            type="DATA",
            listall=True
        )
        # Validate before indexing so a missing disk is reported as a
        # test failure rather than a TypeError/IndexError.
        status = validateList(attached_volumes)
        self.assertEqual(
            status[0],
            PASS,
            "Check: DATA volume is listed for the VM")
        self.debug("list volumes using vm id %s" % dir(attached_volumes[0]))

        data_volumes_list = Volume.list(
            self.apiclient,
            id=self.data_volume_created.id,
            listall=True)
        status = validateList(data_volumes_list)
        self.assertEqual(
            status[0],
            PASS,
            "Check: volume list is valid")

        data_volume = data_volumes_list[0]
        self.assertEqual(
            data_volume.state,
            "Ready",
            "Check: Data volume is attached to VM")

        self.assertEqual(
            data_volume.storage,
            cluster_pool.name,
            "check if volume is created in correct storage pool")