cloudstack/test/integration/component/test_hypervisor_capabilities.py
John Bampton 8e538028b2
Enable Flake8 rule W391 (#6606)
Best practice is to have one blank line at the end of Python files.

Remove unneeded blank lines from the end of files
2022-11-13 11:00:07 +01:00

289 lines
11 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" Test cases for Testing Hypervisor Capabilities
"""
from nose.plugins.attrib import attr
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.lib.utils import (cleanup_resources,
validateList)
from marvin.lib.base import (Account,
ServiceOffering,
DiskOffering,
VirtualMachine,
Volume,
Host,
VmSnapshot)
from marvin.lib.common import (get_domain,
get_zone,
get_template,
list_virtual_machines,
list_ssvms,
list_routers)
from marvin.lib.decoratorGenerators import skipTestIf
from marvin.cloudstackAPI import (updateHypervisorCapabilities,
listHypervisorCapabilities)
from marvin.codes import PASS
class TestHypervisorCapabilities(cloudstackTestCase):
@classmethod
def setUpClass(cls):
testClient = super(TestHypervisorCapabilities, cls).getClsTestClient()
cls.apiclient = testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
cls.domain = get_domain(cls.apiclient)
cls.hypervisor = cls.testClient.getHypervisorInfo()
# Get Zone, Domain and templates
cls.notSupported = True
cls._cleanup = []
cls.hosts = Host.list(cls.apiclient, zoneid=cls.zone.id, type='Routing')
if isinstance(cls.hosts, list) and len(cls.hosts) > 0:
cls.host = cls.hosts[0]
cls.notSupported = False
cls.hypervisorversion = "default"
if hasattr(cls.host, 'hypervisorversion'):
cls.hypervisorversion = cls.host.hypervisorversion
if cls.notSupported == False:
cls.notSupported = True
cmdList = listHypervisorCapabilities.listHypervisorCapabilitiesCmd()
cmdList.hypervisor = cls.hypervisor
capabilities = cls.apiclient.listHypervisorCapabilities(cmdList)
for capability in capabilities:
if capability.hypervisorversion == cls.hypervisorversion:
cls.hostCapability = capability
cls.notSupported = False
break
if cls.notSupported == True:
cls.hypervisorversion = "default"
for capability in capabilities:
if capability.hypervisorversion == cls.hypervisorversion:
cls.hostCapability = capability
cls.notSupported = False
break
if cls.notSupported == False:
cls.template = get_template(
cls.apiclient,
cls.zone.id,
cls.services["ostype"])
# Create an account
cls.account = Account.create(
cls.apiclient,
cls.services["account"],
domainid=cls.domain.id
)
cls._cleanup.append(cls.account)
# Create user api client of the account
cls.userapiclient = testClient.getUserApiClient(
UserName=cls.account.name,
DomainName=cls.account.domain
)
# Create Service offering
cls.service_offering = ServiceOffering.create(
cls.apiclient,
cls.services["service_offering"],
hosttags="host1"
)
cls._cleanup.append(cls.service_offering)
cls.disk_offering = DiskOffering.create(
cls.apiclient,
cls.services["disk_offering"]
)
cls._cleanup.append(cls.disk_offering)
return
@classmethod
def tearDownClass(cls):
super(TestHypervisorCapabilities, cls).tearDownClass()
def setUp(self):
self.cleanup = []
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
def tearDown(self):
try:
if self.notSupported == False:
Host.update(self.apiclient, id=self.host.id, hosttags="")
self.updateHostHypervisorCapability(self.hostCapability.id,
self.hostCapability.maxdatavolumeslimit,
self.hostCapability.vmsnapshotenabled)
super(TestHypervisorCapabilities, self).tearDown()
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@skipTestIf("notSupported")
@attr(tags=["advanced", "basic"], required_hardware="false")
def test_01_check_hypervisor_max_data_volume_limit(self):
""" Test hypervisor maxdatavolumeslimit effect
# 1. Set maxdatavolumeslimit to 1 for hypervisor
# 2. List capabilities and verify value
# 3. Deploy a VM and attach a volume to it
# 4. Try attach another volume, it should fail
# 5. Set maxdatavolumeslimit to 32 for hypervisor
# 6. Try attach second volume, it should succeed
"""
self.updateHostHypervisorCapability(self.hostCapability.id, 1)
capabilities = self.listHostHypervisorCapabilities(self.hostCapability.id)
self.assertTrue(isinstance(capabilities, list), "listHypervisorCapabilities response not a valid list")
self.assertEqual(len(capabilities), 1, "listHypervisorCapabilities response not valid")
self.assertEqual(capabilities[0].maxdatavolumeslimit,
1,
"listHypervisorCapabilities response maxdatavolumeslimit value not 1")
Host.update(self.apiclient, id=self.host.id, hosttags="host1")
volume_created1 = Volume.create(
self.userapiclient,
self.services["volume"],
zoneid=self.zone.id,
diskofferingid=self.disk_offering.id
)
self.cleanup.append(volume_created1)
volume_created2 = Volume.create(
self.userapiclient,
self.services["volume"],
zoneid=self.zone.id,
diskofferingid=self.disk_offering.id
)
self.cleanup.append(volume_created2)
vm = VirtualMachine.create(
self.userapiclient,
self.services["small"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
zoneid=self.zone.id
)
self.cleanup.append(vm)
vm.attach_volume(
self.userapiclient,
volume_created1
)
try:
vm.attach_volume(
self.userapiclient,
volume_created2
)
vm.detach_volume(self.userapiclient, volume_created1)
vm.detach_volume(self.userapiclient, volume_created2)
self.fail("Successful to attach 2 DATA disks when max DATA disk limit was set to 1")
except Exception as e:
self.debug("Failed to attach 2nd DATA disk when max DATA disk limit was set to 1: %s" % e)
self.updateHostHypervisorCapability(self.hostCapability.id, 32)
vm.attach_volume(
self.userapiclient,
volume_created2
)
vm.stop(self.userapiclient, forced=True)
vm.detach_volume(self.userapiclient, volume_created1)
vm.detach_volume(self.userapiclient, volume_created2)
@skipTestIf("notSupported")
@attr(tags=["advanced", "basic"], required_hardware="false")
def test_02_check_hypervisor_vm_snapshot(self):
""" Test hypervisor vmsnapshotenabled effect
# 1. Set vmsnapshotenabled to false for hypervisor
# 2. List capabilities and verify value
# 3. Deploy a VM
# 4. Try VM snapshot, it should fail
# 5. Set vmsnapshotenabled to true for hypervisor
# 6. Try VM snapshot again, it should succeed
"""
if self.hypervisor == "KVM":
self.skipTest("Skipping test: Reason - VM Snapshot of running VM is not supported for KVM")
self.updateHostHypervisorCapability(self.hostCapability.id, None, False)
capabilities = self.listHostHypervisorCapabilities(self.hostCapability.id)
self.assertTrue(isinstance(capabilities, list), "listHypervisorCapabilities response not a valid list")
self.assertEqual(len(capabilities), 1, "listHypervisorCapabilities response not valid")
self.assertEqual(capabilities[0].vmsnapshotenabled,
False,
"listHypervisorCapabilities response vmsnapshotenabled value not False")
Host.update(self.apiclient, id=self.host.id, hosttags="host1")
vm = VirtualMachine.create(
self.userapiclient,
self.services["small"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
zoneid=self.zone.id
)
self.cleanup.append(vm)
try:
fail_snapshot = VmSnapshot.create(
self.userapiclient,
vmid=vm.id,
snapshotmemory="false",
name="Test Snapshot",
description="Test Snapshot Desc"
)
self.cleanup.append(fail_snapshot)
self.fail("Successful to take VM snapshot even when vmsnapshotenabled was set to False")
except Exception as e:
self.debug("Failed to take VM snapshot even vmsnapshotenabled was set to False: %s" % e)
self.updateHostHypervisorCapability(self.hostCapability.id, None, True)
vm_snapshot = VmSnapshot.create(
self.userapiclient,
vmid=vm.id,
snapshotmemory="false",
name="Test Snapshot",
description="Test Snapshot Desc"
)
self.cleanup.append(vm_snapshot)
def updateHostHypervisorCapability(self, id, maxDataVolumes, vmSnapshotEnabled=None):
cmd = updateHypervisorCapabilities.updateHypervisorCapabilitiesCmd()
cmd.id = id
if maxDataVolumes != None:
cmd.maxdatavolumeslimit = maxDataVolumes
if vmSnapshotEnabled != None:
cmd.vmsnapshotenabled = vmSnapshotEnabled
self.apiclient.updateHypervisorCapabilities(cmd)
def listHostHypervisorCapabilities(self, id, hypervisor=None):
cmd = listHypervisorCapabilities.listHypervisorCapabilitiesCmd()
cmd.id = id
return self.apiclient.listHypervisorCapabilities(cmd)