cloudstack/test/integration/smoke/test_storage_policy.py
Wei Zhou fadd74aaca
network: fix vm can be deployed on L2 network of other accounts (#5784)
* Update #5769: fix domain admin can deploy vm on L2 network of other users

* test: fix test_storage_policy.py

* Update #5784: revert part of changes in #2420
2022-01-11 12:16:00 +05:30

255 lines
9.1 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" Test for VMWare storage policies
"""
# Import Local Modules
from nose.plugins.attrib import attr
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.lib.utils import cleanup_resources
from marvin.lib.base import (Account,
VirtualMachine,
ServiceOffering,
NetworkOffering,
Network,
Volume,
DiskOffering)
from marvin.lib.common import (get_zone,
get_domain,
get_test_template)
from marvin.cloudstackAPI import (importVsphereStoragePolicies,
listVsphereStoragePolicies,
listVsphereStoragePolicyCompatiblePools)
class TestVMWareStoragePolicies(cloudstackTestCase):
    """Smoke test for importing and consuming VMware (vSphere) storage policies.

    Class-level fixtures (zone, domain, template and an enabled L2 network
    offering) are created once in ``setUpClass``; every test gets its own
    account in ``setUp``. Resources are released in reverse creation order.
    """

    @classmethod
    def setUpClass(cls):
        """Create the fixtures shared by all tests of this class."""
        cls.testClient = super(TestVMWareStoragePolicies, cls).getClsTestClient()
        cls.apiclient = cls.testClient.getApiClient()
        cls.testdata = cls.testClient.getParsedTestDataConfig()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient, cls.testClient.getZoneForTests())
        cls._cleanup = []
        cls.hypervisor = cls.testClient.getHypervisorInfo()

        cls.network_offering = NetworkOffering.create(
            cls.apiclient,
            cls.testdata["l2-network_offering"],
        )
        # Track the offering as soon as it exists, before any later step
        # (enable, template lookup) gets a chance to fail.
        cls._cleanup.append(cls.network_offering)
        cls.network_offering.update(cls.apiclient, state='Enabled')

        cls.template = get_test_template(
            cls.apiclient,
            cls.zone.id,
            cls.hypervisor,
            # deploy-as-is templates are requested only on VMware
            deploy_as_is=cls.hypervisor.lower() == "vmware"
        )

    @classmethod
    def tearDownClass(cls):
        """Release class-level resources in reverse creation order.

        Raises:
            Exception: if any resource could not be cleaned up.
        """
        try:
            cleanup_resources(cls.apiclient, reversed(cls._cleanup))
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    def setUp(self):
        """Create a fresh account for the test and prime the VM test data."""
        # Initialise the cleanup list first so it exists from the very start
        # of the test, whatever happens while creating resources below.
        self.cleanup = []
        self.dbclient = self.testClient.getDbConnection()
        self.hypervisor = self.testClient.getHypervisorInfo()

        # NOTE: self.testdata is the class-level dict, so these writes are
        # shared across tests of this class.
        self.testdata["virtual_machine"]["zoneid"] = self.zone.id
        self.testdata["virtual_machine"]["template"] = self.template.id

        self.account = Account.create(
            self.apiclient,
            self.testdata["account"],
            domainid=self.domain.id
        )
        self.cleanup.append(self.account)

    def tearDown(self):
        """Release per-test resources in reverse creation order.

        Cleanup is best-effort: failures are logged, not raised.
        """
        try:
            self.debug("Cleaning up the resources")
            cleanup_resources(self.apiclient, reversed(self.cleanup))
            self.debug("Cleanup complete!")
        except Exception as e:
            self.debug("Warning! Exception in tearDown: %s" % e)

    def import_vmware_storage_policies(self, apiclient):
        """Run importVsphereStoragePolicies for the test zone and return the response."""
        cmd = importVsphereStoragePolicies.importVsphereStoragePoliciesCmd()
        cmd.zoneid = self.zone.id
        return apiclient.importVsphereStoragePolicies(cmd)

    def list_storage_policies(self, apiclient):
        """Run listVsphereStoragePolicies for the test zone and return the response."""
        cmd = listVsphereStoragePolicies.listVsphereStoragePoliciesCmd()
        cmd.zoneid = self.zone.id
        return apiclient.listVsphereStoragePolicies(cmd)

    def list_storage_policy_compatible_pools(self, apiclient, policyid):
        """Return the pools reported as compatible with ``policyid`` in the test zone."""
        cmd = listVsphereStoragePolicyCompatiblePools.listVsphereStoragePolicyCompatiblePoolsCmd()
        cmd.zoneid = self.zone.id
        cmd.policyid = policyid
        return apiclient.listVsphereStoragePolicyCompatiblePools(cmd)

    def create_volume(self, apiclient):
        """Unfinished helper, kept only so the method name stays available.

        The previous body referenced an undefined name and failed with a
        NameError when called; fail explicitly instead. The test below
        creates its volume through ``Volume.create`` directly.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError(
            "create_volume was never implemented; use Volume.create instead")

    @attr(
        tags=[
            "advanced",
            "eip",
            "advancedns",
            "basic",
            "sg"],
        required_hardware="true")
    def test_01_import_storage_policies(self):
        """Test VMWare storage policies
        """
        # Validate the following:
        # 1. Import VMWare storage policies - the command should return non-zero result
        # 2. List current VMWare storage policies - the command should return non-zero result
        # 3. Create service offering with the first imported policy that has a compatible pool
        # 4. Create disk offering with the same policy
        # 5. Create VM using the already created service offering
        # 6. Create volume using the already created disk offering
        # 7. (not implemented yet) Attach this volume to our VM
        # 8. (not implemented yet) Detach the volume from our VM

        self.hypervisor = self.testClient.getHypervisorInfo()
        if self.hypervisor.lower() != "vmware":
            self.skipTest(
                "VMWare storage policies feature is not supported on %s" %
                self.hypervisor.lower())

        self.debug("Importing VMWare storage policies")
        imported_policies = self.import_vmware_storage_policies(self.apiclient)
        # Truthiness covers both an empty list and a None response; len()
        # on None would turn an intended skip into a TypeError.
        if not imported_policies:
            self.skipTest("There are no VMWare storage policies")

        self.debug("Listing VMWare storage policies")
        listed_policies = self.list_storage_policies(self.apiclient)
        self.assertTrue(
            listed_policies,
            "Check if list of storage policies is not zero"
        )
        self.assertEqual(
            len(imported_policies),
            len(listed_policies),
            "Check if the number of imported policies is identical to the number of listed policies"
        )

        # Pick the first policy that at least one storage pool can satisfy.
        selected_policy = None
        for imported_policy in imported_policies:
            compatible_pools = self.list_storage_policy_compatible_pools(
                self.apiclient, imported_policy.id)
            if compatible_pools:
                selected_policy = imported_policy
                break
        if not selected_policy:
            self.skipTest("There are no compatible storage pools with the imported policies")

        # Create service offering with the selected storage policy
        service_offering = ServiceOffering.create(
            self.apiclient,
            self.testdata["service_offering"],
            storagepolicy=selected_policy.id
        )
        self.cleanup.append(service_offering)

        # Create disk offering with the selected storage policy
        disk_offering = DiskOffering.create(
            self.apiclient,
            self.testdata["disk_offering"],
            storagepolicy=selected_policy.id
        )
        self.cleanup.append(disk_offering)

        # The L2 network is created for the test account itself, and the VM
        # below is deployed for that same account.
        l2_network = Network.create(
            self.apiclient,
            self.testdata["l2-network"],
            zoneid=self.zone.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            networkofferingid=self.network_offering.id
        )
        self.cleanup.append(l2_network)

        virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.testdata["small"],
            templateid=self.template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            zoneid=self.zone.id,
            networkids=l2_network.id,
            serviceofferingid=service_offering.id,
        )
        self.cleanup.append(virtual_machine)

        vms = VirtualMachine.list(
            self.apiclient,
            id=virtual_machine.id,
            listall=True
        )
        self.assertIsInstance(
            vms,
            list,
            "listVirtualMachines returned invalid object in response."
        )
        self.assertNotEqual(
            len(vms),
            0,
            "listVirtualMachines returned empty list."
        )
        self.debug("Deployed VM on host: %s" % vms[0].hostid)

        volume = Volume.create(
            self.apiclient,
            self.testdata["volume"],
            account=self.account.name,
            domainid=self.account.domainid,
            zoneid=self.zone.id,
            diskofferingid=disk_offering.id
        )
        self.cleanup.append(volume)

        list_volume_response = Volume.list(
            self.apiclient,
            id=volume.id
        )
        self.assertIsInstance(
            list_volume_response,
            list,
            "Check list response returns a valid list"
        )
        # A None comparison could never fail once the response is known to
        # be a list; verify that the volume is actually present instead.
        self.assertNotEqual(
            len(list_volume_response),
            0,
            "Check if volume exists in ListVolumes"
        )