Fix tab/space issues

Several test failures occurred due to tab issues

Signed-off-by: Prasanna Santhanam <tsp@apache.org>
This commit is contained in:
Prasanna Santhanam 2013-06-30 13:20:27 +05:30
parent 2a51c3e2c9
commit c7315975d2
8 changed files with 409 additions and 482 deletions

View File

@ -753,7 +753,7 @@ class TestServiceOfferingHierarchy(cloudstackTestCase):
domainid=cls.domain_2.id
)
cls._cleanup = [
cls._cleanup = [
cls.account_2,
cls.domain_2,
cls.service_offering,

View File

@ -304,15 +304,14 @@ class TestListAffinityGroups(cloudstackTestCase):
def tearDown(self):
try:
cls.api_client = super(TestListAffinityGroups, cls).getClsTestClient().getApiClient()
self.api_client = super(TestListAffinityGroups, self).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
cleanup_resources(cls.api_client, cls.cleanup)
cleanup_resources(self.api_client, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
@classmethod
def tearDownClass(cls):
try:
cls.api_client = super(TestListAffinityGroups, cls).getClsTestClient().getApiClient()
#Clean up, terminate the created templates
@ -327,10 +326,6 @@ class TestListAffinityGroups(cloudstackTestCase):
api_client = self.api_client
if aff_grp == None:
self.services["host_anti_affinity_0"]
#if acc == None:
# acc = self.account.name
#if domainid == None:
# domainid = self.domain.id
try:
self.aff_grp.append(AffinityGroup.create(api_client,
@ -339,34 +334,25 @@ class TestListAffinityGroups(cloudstackTestCase):
raise Exception("Error: Creation of Affinity Group failed : %s" %e)
def create_vm_in_aff_grps(self, ag_list):
#try:
self.debug('Creating VM in AffinityGroup=%s' % ag_list[0])
vm = VirtualMachine.create(
self.api_client,
self.services["virtual_machine"],
templateid=self.template.id,
#accountid=self.account.name,
#domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
affinitygroupnames=ag_list
)
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list[0]))
#except Exception:
#self.debug('Unable to create VM in a Affinity Group=%s'
# % ag_list[0])
list_vm = list_virtual_machines(self.api_client, id=vm.id)
self.debug('Creating VM in AffinityGroup=%s' % ag_list[0])
vm = VirtualMachine.create(
self.api_client,
self.services["virtual_machine"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
affinitygroupnames=ag_list
)
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list[0]))
list_vm = list_virtual_machines(self.api_client, id=vm.id)
self.assertEqual(isinstance(list_vm, list), True,
"Check list response returns a valid list")
self.assertNotEqual(len(list_vm),0,
"Check VM available in List Virtual Machines")
vm_response = list_vm[0]
self.assertEqual(vm_response.state, 'Running',
msg="VM is not in Running state")
return vm, vm_response.hostid
def test_01_list_aff_grps_for_vm(self):
@ -543,11 +529,6 @@ class TestDeleteAffinityGroups(cloudstackTestCase):
api_client = self.api_client
if aff_grp == None:
self.services["host_anti_affinity_0"]
#if acc == None:
# acc = self.account.name
#if domainid == None:
# domainid = self.domain.id
try:
self.aff_grp.append(AffinityGroup.create(api_client,
aff_grp, acc, domainid))
@ -555,24 +536,18 @@ class TestDeleteAffinityGroups(cloudstackTestCase):
raise Exception("Error: Creation of Affinity Group failed : %s" %e)
def create_vm_in_aff_grps(self, ag_list):
#try:
self.debug('Creating VM in AffinityGroup=%s' % ag_list[0])
vm = VirtualMachine.create(
self.api_client,
self.services["virtual_machine"],
templateid=self.template.id,
#accountid=self.account.name,
#domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
affinitygroupnames=ag_list
)
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list[0]))
#except Exception:
#self.debug('Unable to create VM in a Affinity Group=%s'
# % ag_list[0])
self.debug('Creating VM in AffinityGroup=%s' % ag_list[0])
vm = VirtualMachine.create(
self.api_client,
self.services["virtual_machine"],
templateid=self.template.id,
serviceofferingid=self.service_offering.id,
affinitygroupnames=ag_list
)
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list[0]))
list_vm = list_virtual_machines(self.api_client, id=vm.id)
list_vm = list_virtual_machines(self.api_client, id=vm.id)
self.assertEqual(isinstance(list_vm, list), True,
"Check list response returns a valid list")
@ -817,11 +792,6 @@ class TestUpdateVMAffinityGroups(cloudstackTestCase):
api_client = self.api_client
if aff_grp == None:
self.services["host_anti_affinity_0"]
#if acc == None:
# acc = self.account.name
#if domainid == None:
# domainid = self.domain.id
try:
self.aff_grp.append(AffinityGroup.create(api_client,
aff_grp, acc, domainid))
@ -829,24 +799,18 @@ class TestUpdateVMAffinityGroups(cloudstackTestCase):
raise Exception("Error: Creation of Affinity Group failed : %s" %e)
def create_vm_in_aff_grps(self, ag_list):
#try:
self.debug('Creating VM in AffinityGroup=%s' % ag_list[0])
vm = VirtualMachine.create(
self.api_client,
self.debug('Creating VM in AffinityGroup=%s' % ag_list[0])
vm = VirtualMachine.create(
self.api_client,
self.services["virtual_machine"],
templateid=self.template.id,
#accountid=self.account.name,
#domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
affinitygroupnames=ag_list
)
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list[0]))
#except Exception:
#self.debug('Unable to create VM in a Affinity Group=%s'
# % ag_list[0])
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list[0]))
list_vm = list_virtual_machines(self.api_client, id=vm.id)
list_vm = list_virtual_machines(self.api_client, id=vm.id)
self.assertEqual(isinstance(list_vm, list), True,
"Check list response returns a valid list")
@ -996,7 +960,7 @@ class TestUpdateVMAffinityGroups(cloudstackTestCase):
vm1.start(self.api_client)
list_aff_grps = AffinityGroup.list(self.api_client,
virtualmachineid=vm.id)
virtualmachineid=vm1.id)
self.assertEqual(list_aff_grps, [], "The affinity groups list is not empyty")
vm1.delete(self.api_client)
@ -1096,10 +1060,6 @@ class TestDeployVMAffinityGroups(cloudstackTestCase):
api_client = self.api_client
if aff_grp == None:
self.services["host_anti_affinity_0"]
#if acc == None:
# acc = self.account.name
#if domainid == None:
# domainid = self.domain.id
try:
self.aff_grp.append(AffinityGroup.create(api_client,
@ -1111,21 +1071,19 @@ class TestDeployVMAffinityGroups(cloudstackTestCase):
if api_client == None:
api_client = self.api_client
self.debug('Creating VM in AffinityGroup=%s' % ag_list)
vm = VirtualMachine.create(
self.debug('Creating VM in AffinityGroup=%s' % ag_list)
vm = VirtualMachine.create(
api_client,
self.services["virtual_machine"],
templateid=self.template.id,
#accountid=self.account.name,
#domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
affinitygroupnames=ag_list,
affinitygroupids=ag_ids
)
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list))
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list))
list_vm = list_virtual_machines(self.api_client, id=vm.id)
list_vm = list_virtual_machines(self.api_client, id=vm.id)
self.assertEqual(isinstance(list_vm, list), True,
"Check list response returns a valid list")
@ -1143,7 +1101,6 @@ class TestDeployVMAffinityGroups(cloudstackTestCase):
"""
Deploy VM without affinity group
"""
vm1, hostid1 = self.create_vm_in_aff_grps()
vm1.delete(self.api_client)
@ -1441,10 +1398,6 @@ class TestAffinityGroupsAdminUser(cloudstackTestCase):
api_client = self.api_client
if aff_grp == None:
self.services["host_anti_affinity_0"]
#if acc == None:
# acc = self.account.name
#if domainid == None:
# domainid = self.domain.id
try:
self.aff_grp.append(AffinityGroup.create(api_client,
@ -1456,21 +1409,19 @@ class TestAffinityGroupsAdminUser(cloudstackTestCase):
if api_client == None:
api_client = self.api_client
self.debug('Creating VM in AffinityGroup=%s' % ag_list)
vm = VirtualMachine.create(
self.debug('Creating VM in AffinityGroup=%s' % ag_list)
vm = VirtualMachine.create(
api_client,
self.services["virtual_machine"],
templateid=self.template.id,
#accountid=self.account.name,
#domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
affinitygroupnames=ag_list,
affinitygroupids=ag_ids
)
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list))
self.debug('Created VM=%s in Affinity Group=%s' %
(vm.id, ag_list))
list_vm = list_virtual_machines(self.api_client, id=vm.id)
list_vm = list_virtual_machines(self.api_client, id=vm.id)
self.assertEqual(isinstance(list_vm, list), True,
"Check list response returns a valid list")

View File

@ -730,9 +730,9 @@ class TestNetScalerDedicated(cloudstackTestCase):
networkofferingid=self.network_offering.id,
zoneid=self.zone.id
)
self.debug("Deploying an instance in account: %s" % self.account_2.account.name)
self.debug("Deploying an instance in account: %s" % self.account_2.account.name)
with self.assertRaises(Exception):
VirtualMachine.create(
VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
accountid=self.account_2.account.name,
@ -740,7 +740,7 @@ class TestNetScalerDedicated(cloudstackTestCase):
serviceofferingid=self.service_offering.id,
networkids=[str(self.network.id)]
)
self.debug("Deply instacne in dedicated Network offering mode failed")
self.debug("Deply instance in dedicated Network offering mode failed")
return
@ -1285,7 +1285,7 @@ class TestNetScalerNoCapacity(cloudstackTestCase):
)
if isinstance(physical_networks, list):
physical_network = physical_networks[0]
cls.services["netscaler"]["lbdevicecapacity"] = 2
cls.services["netscaler"]["lbdevicecapacity"] = 2
cls.netscaler = NetScaler.add(
cls.api_client,
cls.services["netscaler"],

View File

@ -2098,7 +2098,7 @@ class TestSharedNetworks(cloudstackTestCase):
networkofferingid=self.shared_network_offering.id,
zoneid=self.zone.id,
)
self.cleanup_networks.append(self.network1)
self.cleanup_networks.append(self.network1)
self.fail("Network got created with used vlan id, which is invalid")
except Exception as e:
self.debug("Network creation failed because the valn id being used by another network.")

View File

@ -424,9 +424,9 @@ class TestVPCRoutersBasic(cloudstackTestCase):
@attr(tags=["advanced", "intervlan"])
def test_02_reboot_router_after_creating_vpc(self):
""" Test to reboot the router after creating a VPC
"""
# Validate the following
""" Test to reboot the router after creating a VPC
"""
# Validate the following
# 1. Create a VPC with cidr - 10.1.1.1/16
# 2. Reboot the VPC Virtual Router which is created as a result of VPC creation.
# Stop the VPC Router
@ -473,9 +473,9 @@ class TestVPCRoutersBasic(cloudstackTestCase):
@attr(tags=["advanced", "intervlan"])
def test_03_destroy_router_after_creating_vpc(self):
""" Test to destroy the router after creating a VPC
"""
# Validate the following
""" Test to destroy the router after creating a VPC
"""
# Validate the following
# 1. Create a VPC with cidr - 10.1.1.1/16
# 2. Destroy the VPC Virtual Router which is created as a result of VPC creation.
self.validate_vpc_offering(self.vpc_off)
@ -528,15 +528,15 @@ class TestVPCRoutersBasic(cloudstackTestCase):
"List Routers should return a valid list"
)
self.migrate_router(routers[0])
return
return
@attr(tags=["advanced", "intervlan"])
def test_05_change_service_offerring_vpc(self):
""" Tests to change service offering of the Router after
creating a vpc
"""
""" Tests to change service offering of the Router after
creating a vpc
"""
# Validate the following
# Validate the following
# 1. Create a VPC with cidr - 10.1.1.1/16
# 2. Change the service offerings of the VPC Virtual Router which is created as a result of VPC creation.
@ -568,7 +568,7 @@ class TestVPCRoutersBasic(cloudstackTestCase):
)
self.debug("Changing service offering for the Router %s" % router.id)
try:
router = Router.change_service_offering(self.apiclient,
router = Router.change_service_offering(self.apiclient,
router.id,
service_offering.id
)
@ -589,7 +589,7 @@ class TestVPCRoutersBasic(cloudstackTestCase):
"Changing service offering failed as id is %s and expected"
"is %s" % (router.serviceofferingid, service_offering.id)
)
return
return
class TestVPCRouterOneNetwork(cloudstackTestCase):
@ -748,18 +748,6 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
account=cls.account.name,
domainid=cls.account.domainid
)
# cls.assertEqual(
# isinstance(public_ips, list),
# True,
# "List public Ip for network should list the Ip addr"
# )
# cls.assertEqual(
# public_ips[0].ipaddress,
# public_ip_2.ipaddress.ipaddress,
# "List public Ip for network should list the Ip addr"
# )
#
public_ip_3 = PublicIPAddress.create(
cls.apiclient,
accountid=cls.account.name,
@ -917,8 +905,8 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
return
def validate_network_rules(self):
""" Validate network rules
"""
""" Validate network rules
"""
vms = VirtualMachine.list(
self.apiclient,
account=self.account.name,
@ -1014,8 +1002,8 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
@attr(tags=["advanced", "intervlan"])
def test_01_start_stop_router_after_addition_of_one_guest_network(self):
""" Test start/stop of router after addition of one guest network
"""
""" Test start/stop of router after addition of one guest network
"""
# Validations
#1. Create a VPC with cidr - 10.1.1.1/16
#2. Add network1(10.1.1.1/24) to this VPC.
@ -1031,7 +1019,6 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
self.validate_vpc_offering(self.vpc_off)
self.validate_vpc_network(self.vpc)
#self.validate_network_rules()
self.assertEqual(
isinstance(self.gateways, list),
True,
@ -1063,7 +1050,7 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
cmd.id = router.id
self.apiclient.stopRouter(cmd)
#List routers to check state of router
#List routers to check state of router
router_response = list_routers(
self.apiclient,
id=router.id
@ -1082,13 +1069,13 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
self.debug("Stopped the router with ID: %s" % router.id)
# Start The Router
# Start The Router
self.debug("Starting the router with ID: %s" % router.id)
cmd = startRouter.startRouterCmd()
cmd.id = router.id
self.apiclient.startRouter(cmd)
#List routers to check state of router
#List routers to check state of router
router_response = list_routers(
self.apiclient,
id=router.id
@ -1110,8 +1097,8 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
@attr(tags=["advanced", "intervlan"])
def test_02_reboot_router_after_addition_of_one_guest_network(self):
""" Test reboot of router after addition of one guest network
"""
""" Test reboot of router after addition of one guest network
"""
# Validations
#1. Create a VPC with cidr - 10.1.1.1/16
#2. Add network1(10.1.1.1/24) to this VPC.
@ -1177,8 +1164,8 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
@attr(tags=["advanced", "intervlan"])
def test_03_destroy_router_after_addition_of_one_guest_network(self):
""" Test destroy of router after addition of one guest network
"""
""" Test destroy of router after addition of one guest network
"""
# Validations
#1. Create a VPC with cidr - 10.1.1.1/16
#2. Add network1(10.1.1.1/24) to this VPC.
@ -1236,8 +1223,8 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
@attr(tags=["advanced", "intervlan"])
def test_04_migrate_router_after_addition_of_one_guest_network(self):
""" Test migrate of router after addition of one guest network
"""
""" Test migrate of router after addition of one guest network
"""
# Validations
#1. Create a VPC with cidr - 10.1.1.1/16
#2. Add network1(10.1.1.1/24) to this VPC.
@ -1275,12 +1262,12 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
"List Routers should return a valid list"
)
self.migrate_router(routers[0])
return
return
@attr(tags=["advanced", "intervlan"])
def test_05_chg_srv_off_router_after_addition_of_one_guest_network(self):
""" Test to change service offering of router after addition of one guest network
"""
""" Test to change service offering of router after addition of one guest network
"""
# Validations
#1. Create a VPC with cidr - 10.1.1.1/16
#2. Add network1(10.1.1.1/24) to this VPC.
@ -1332,7 +1319,7 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
)
self.debug("Changing service offering for the Router %s" % router.id)
try:
router = Router.change_service_offering(self.apiclient,
router = Router.change_service_offering(self.apiclient,
router.id,
service_offering.id
)
@ -1353,5 +1340,4 @@ class TestVPCRouterOneNetwork(cloudstackTestCase):
"Changing service offering failed as id is %s and expected"
"is %s" % (router.serviceofferingid, service_offering.id)
)
return
return

View File

@ -23,30 +23,30 @@ from nose.plugins.attrib import attr
class Services:
def __init__(self):
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "password",
},
"service_offering": {
"name": "Planner Service Offering",
"displaytext": "Planner Service Offering",
"cpunumber": 1,
"cpuspeed": 100,
# in MHz
"memory": 128,
# In MBs
},
"ostype": 'CentOS 5.3 (64-bit)',
"virtual_machine": {
"hypervisor": "XenServer",
}
}
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "password",
},
"service_offering": {
"name": "Planner Service Offering",
"displaytext": "Planner Service Offering",
"cpunumber": 1,
"cpuspeed": 100,
# in MHz
"memory": 128,
# In MBs
},
"ostype": 'CentOS 5.3 (64-bit)',
"virtual_machine": {
"hypervisor": "XenServer",
}
}
class TestDeployVmWithVariedPlanners(cloudstackTestCase):

View File

@ -669,7 +669,7 @@ class TestLoadBalancingRule(cloudstackTestCase):
self.debug(
"SSH into VM (IPaddress: %s) & NAT Rule (Public IP: %s)" %
(self.vm_1.ipaddress, src_nat_ip_addr.ipaddress)
)
)
ssh_1 = remoteSSHClient(
src_nat_ip_addr.ipaddress,
@ -804,20 +804,20 @@ class TestLoadBalancingRule(cloudstackTestCase):
)
hostnames = []
self.try_ssh(src_nat_ip_addr, hostnames)
self.try_ssh(src_nat_ip_addr, hostnames)
self.try_ssh(src_nat_ip_addr, hostnames)
self.try_ssh(src_nat_ip_addr, hostnames)
self.try_ssh(src_nat_ip_addr, hostnames)
hostnames = []
self.try_ssh(src_nat_ip_addr, hostnames)
self.try_ssh(src_nat_ip_addr, hostnames)
self.try_ssh(src_nat_ip_addr, hostnames)
self.try_ssh(src_nat_ip_addr, hostnames)
self.try_ssh(src_nat_ip_addr, hostnames)
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.assertIn(
self.vm_2.name,
hostnames,
"Check if ssh succeeded for server2"
@ -826,8 +826,8 @@ class TestLoadBalancingRule(cloudstackTestCase):
#SSH should pass till there is a last VM associated with LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
# making hostnames list empty
hostnames[:] = []
# making hostnames list empty
hostnames[:] = []
try:
self.debug("SSHing into IP address: %s after removing VM (ID: %s)" %
@ -837,13 +837,11 @@ class TestLoadBalancingRule(cloudstackTestCase):
))
self.try_ssh(src_nat_ip_addr, hostnames)
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
except Exception as e:
self.fail("%s: SSH failed for VM with IP Address: %s" %
(e, src_nat_ip_addr.ipaddress))
@ -958,23 +956,23 @@ class TestLoadBalancingRule(cloudstackTestCase):
)
try:
hostnames = []
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_2.name,
hostnames,
"Check if ssh succeeded for server2"
)
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_2.name,
hostnames,
"Check if ssh succeeded for server2"
)
#SSH should pass till there is a last VM associated with LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
@ -984,19 +982,16 @@ class TestLoadBalancingRule(cloudstackTestCase):
self.non_src_nat_ip.ipaddress.ipaddress,
self.vm_2.id
))
# Making host list empty
# Making host list empty
hostnames[:] = []
self.try_ssh(self.non_src_nat_ip, hostnames)
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.debug("Hostnames after removing VM2: %s" % str(hostnames))
except Exception as e:
self.fail("%s: SSH failed for VM with IP Address: %s" %
(e, self.non_src_nat_ip.ipaddress.ipaddress))
@ -1017,7 +1012,6 @@ class TestLoadBalancingRule(cloudstackTestCase):
ssh_1.execute("hostname")[0]
return
class TestRebootRouter(cloudstackTestCase):
def setUp(self):
@ -1336,31 +1330,29 @@ class TestAssignRemoveLB(cloudstackTestCase):
)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
hostnames = []
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_2.name,
hostnames,
"Check if ssh succeeded for server2"
)
hostnames = []
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_2.name,
hostnames,
"Check if ssh succeeded for server2"
)
#Removing VM and assigning another VM to LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
# making hostnames list empty
hostnames[:] = []
# making hostnames list empty
hostnames[:] = []
try:
self.debug("SSHing again into IP address: %s with VM (ID: %s) added to LB rule" %
@ -1370,38 +1362,35 @@ class TestAssignRemoveLB(cloudstackTestCase):
))
self.try_ssh(self.non_src_nat_ip, hostnames)
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.non_src_nat_ip.ipaddress)
lb_rule.assign(self.apiclient, [self.vm_3])
# Making hostnames list empty
# Making hostnames list empty
hostnames[:] = []
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_3.name,
hostnames,
"Check if ssh succeeded for server3"
)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.try_ssh(self.non_src_nat_ip, hostnames)
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_3.name,
hostnames,
"Check if ssh succeeded for server3"
)
return
class TestReleaseIP(cloudstackTestCase):

View File

@ -29,282 +29,283 @@ class Services:
"""
def __init__(self):
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "password",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 200, # in MHz
"memory": 256, # In MBs
},
"server": {
"displayname": "TestVM",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"mgmt_server": {
"ipaddress": '1.2.2.152',
"username": "root",
"password": "password",
"port": 22,
},
"templates": {
"displaytext": 'Template',
"name": 'Template',
"ostype": "CentOS 5.3 (64-bit)",
"templatefilter": 'self',
},
"test_dir": "/tmp",
"random_data": "random.data",
"snapshot_name":"TestSnapshot",
"snapshot_displaytext":"Test",
"ostype": "CentOS 5.3 (64-bit)",
"sleep": 60,
"timeout": 10,
"mode": 'advanced', # Networking mode: Advanced, Basic
}
self.services = {
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
# Random characters are appended for unique
# username
"password": "password",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 200, # in MHz
"memory": 256, # In MBs
},
"server": {
"displayname": "TestVM",
"username": "root",
"password": "password",
"ssh_port": 22,
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"protocol": 'TCP',
},
"mgmt_server": {
"ipaddress": '1.2.2.152',
"username": "root",
"password": "password",
"port": 22,
},
"templates": {
"displaytext": 'Template',
"name": 'Template',
"ostype": "CentOS 5.3 (64-bit)",
"templatefilter": 'self',
},
"test_dir": "/tmp",
"random_data": "random.data",
"snapshot_name": "TestSnapshot",
"snapshot_displaytext": "Test",
"ostype": "CentOS 5.3 (64-bit)",
"sleep": 60,
"timeout": 10,
"mode": 'advanced', # Networking mode: Advanced, Basic
}
class TestVmSnapshot(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestVmSnapshot, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
cls.api_client = super(TestVmSnapshot, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostype"]
)
cls.services["domainid"] = cls.domain.id
cls.services["server"]["zoneid"] = cls.zone.id
cls.services["templates"]["ostypeid"] = template.ostypeid
cls.services["zoneid"] = cls.zone.id
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostype"]
)
cls.services["domainid"] = cls.domain.id
cls.services["server"]["zoneid"] = cls.zone.id
cls.services["templates"]["ostypeid"] = template.ostypeid
cls.services["zoneid"] = cls.zone.id
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
# Create VMs, NAT Rules etc
cls.account = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
cls.services["account"] = cls.account.name
cls.services["account"] = cls.account.name
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["server"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls.random_data_0 = random_gen(100)
cls._cleanup = [
cls.service_offering,
cls.account,
]
return
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["server"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id,
mode=cls.services["mode"]
)
cls.random_data_0 = random_gen(100)
cls._cleanup = [
cls.service_offering,
cls.account,
]
return
@classmethod
def tearDownClass(cls):
try:
# Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
try:
# Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
return
def tearDown(self):
try:
# Clean up, terminate the created instance, volumes and snapshots
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
try:
# Clean up, terminate the created instance, volumes and snapshots
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@attr(tags=["advanced", "advancedns", "smoke"])
def test_01_create_vm_snapshots(self):
"""Test to create VM snapshots
"""
"""Test to create VM snapshots
"""
try:
# Login to VM and write data to file system
ssh_client = self.virtual_machine.get_ssh_client()
try:
# Login to VM and write data to file system
ssh_client = self.virtual_machine.get_ssh_client()
cmds = [
"echo %s > %s/%s" % (self.random_data_0, self.services["test_dir"], self.services["random_data"]),
"cat %s/%s" % (self.services["test_dir"], self.services["random_data"])
]
cmds = [
"echo %s > %s/%s" % (self.random_data_0, self.services["test_dir"], self.services["random_data"]),
"cat %s/%s" % (self.services["test_dir"], self.services["random_data"])
]
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
except Exception:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine.ipaddress)
self.assertEqual(
self.random_data_0,
result[0],
"Check the random data has be write into temp file!"
)
except Exception:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine.ipaddress)
self.assertEqual(
self.random_data_0,
result[0],
"Check the random data has be write into temp file!"
)
time.sleep(self.services["sleep"])
time.sleep(self.services["sleep"])
vm_snapshot = VmSnapshot.create(
self.apiclient,
self.virtual_machine.id,
"false",
self.services["snapshot_name"],
self.services["snapshot_displaytext"]
)
self.assertEqual(
vm_snapshot.state,
"Ready",
"Check the snapshot of vm is ready!"
)
return
vm_snapshot = VmSnapshot.create(
self.apiclient,
self.virtual_machine.id,
"false",
self.services["snapshot_name"],
self.services["snapshot_displaytext"]
)
self.assertEqual(
vm_snapshot.state,
"Ready",
"Check the snapshot of vm is ready!"
)
return
@attr(tags=["advanced", "advancedns", "smoke"])
def test_02_revert_vm_snapshots(self):
"""Test to revert VM snapshots
"""
"""Test to revert VM snapshots
"""
try:
ssh_client = self.virtual_machine.get_ssh_client()
try:
ssh_client = self.virtual_machine.get_ssh_client()
cmds = [
"rm -rf %s/%s" % (self.services["test_dir"], self.services["random_data"]),
"ls %s/%s" % (self.services["test_dir"], self.services["random_data"])
]
cmds = [
"rm -rf %s/%s" % (self.services["test_dir"], self.services["random_data"]),
"ls %s/%s" % (self.services["test_dir"], self.services["random_data"])
]
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
except Exception:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine.ipaddress)
except Exception:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine.ipaddress)
if str(result[0]).index("No such file or directory") == -1:
self.fail("Check the random data has be delete from temp file!")
if str(result[0]).index("No such file or directory") == -1:
self.fail("Check the random data has be delete from temp file!")
time.sleep(self.services["sleep"])
time.sleep(self.services["sleep"])
list_snapshot_response = VmSnapshot.list(self.apiclient,vmid=self.virtual_machine.id,listall=True)
list_snapshot_response = VmSnapshot.list(self.apiclient, vmid=self.virtual_machine.id, listall=True)
self.assertEqual(
isinstance(list_snapshot_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_snapshot_response,
None,
"Check if snapshot exists in ListSnapshot"
)
self.assertEqual(
isinstance(list_snapshot_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_snapshot_response,
None,
"Check if snapshot exists in ListSnapshot"
)
self.assertEqual(
list_snapshot_response[0].state,
"Ready",
"Check the snapshot of vm is ready!"
)
self.assertEqual(
list_snapshot_response[0].state,
"Ready",
"Check the snapshot of vm is ready!"
)
VmSnapshot.revertToSnapshot(self.apiclient,list_snapshot_response[0].id)
VmSnapshot.revertToSnapshot(self.apiclient, list_snapshot_response[0].id)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.virtual_machine.id
)
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check the state of vm is Stopped!"
)
self.assertEqual(
list_vm_response[0].state,
"Stopped",
"Check the state of vm is Stopped!"
)
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = list_vm_response[0].id
self.apiclient.startVirtualMachine(cmd)
cmd = startVirtualMachine.startVirtualMachineCmd()
cmd.id = list_vm_response[0].id
self.apiclient.startVirtualMachine(cmd)
time.sleep(self.services["sleep"])
time.sleep(self.services["sleep"])
try:
ssh_client = self.virtual_machine.get_ssh_client(reconnect=True)
try:
ssh_client = self.virtual_machine.get_ssh_client(reconnect=True)
cmds = [
"cat %s/%s" % (self.services["test_dir"], self.services["random_data"])
]
cmds = [
"cat %s/%s" % (self.services["test_dir"], self.services["random_data"])
]
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
for c in cmds:
self.debug(c)
result = ssh_client.execute(c)
self.debug(result)
except Exception:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine.ipaddress)
except Exception:
self.fail("SSH failed for Virtual machine: %s" %
self.virtual_machine.ipaddress)
self.assertEqual(
self.random_data_0,
result[0],
"Check the random data is equal with the ramdom file!"
)
self.assertEqual(
self.random_data_0,
result[0],
"Check the random data is equal with the ramdom file!"
)
@attr(tags=["advanced", "advancedns", "smoke"])
def test_03_delete_vm_snapshots(self):
"""Test to delete vm snapshots
"""
"""Test to delete vm snapshots
"""
list_snapshot_response = VmSnapshot.list(self.apiclient,vmid=self.virtual_machine.id,listall=True)
list_snapshot_response = VmSnapshot.list(self.apiclient, vmid=self.virtual_machine.id, listall=True)
self.assertEqual(
isinstance(list_snapshot_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_snapshot_response,
None,
"Check if snapshot exists in ListSnapshot"
)
VmSnapshot.deleteVMSnapshot(self.apiclient,list_snapshot_response[0].id)
self.assertEqual(
isinstance(list_snapshot_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
list_snapshot_response,
None,
"Check if snapshot exists in ListSnapshot"
)
VmSnapshot.deleteVMSnapshot(self.apiclient, list_snapshot_response[0].id)
time.sleep(self.services["sleep"]*3)
time.sleep(self.services["sleep"] * 3)
list_snapshot_response = VmSnapshot.list(self.apiclient,vmid=self.virtual_machine.id,listall=True)
list_snapshot_response = VmSnapshot.list(self.apiclient, vmid=self.virtual_machine.id, listall=True)
self.assertEqual(
list_snapshot_response,
None,
"Check list vm snapshot has be deleted"
)
self.assertEqual(
list_snapshot_response,
None,
"Check list vm snapshot has be deleted"
)