cloudstack/test/integration/smoke/test_network.py
Prasanna Santhanam a6d13cb799 Unwrap the test docstring for the testrunner
Signed-off-by: Prasanna Santhanam <tsp@apache.org>
2013-05-24 14:24:06 +05:30

1746 lines
72 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" BVT tests for Network Life Cycle
"""
#Import Local Modules
import marvin
from marvin.cloudstackException import cloudstackAPIException
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from marvin import remoteSSHClient
from marvin.integration.lib.utils import *
from marvin.integration.lib.base import *
from marvin.integration.lib.common import *
from nose.plugins.attrib import attr
#Import System modules
import time
_multiprocess_shared_ = True
class Services:
"""Test Network Services
"""
def __init__(self):
self.services = {
"ostype": "CentOS 5.3 (64-bit)",
# Cent OS 5.3 (64 bit)
"lb_switch_wait": 10,
# Time interval after which LB switches the requests
"sleep": 60,
"timeout":10,
"network_offering": {
"name": 'Test Network offering',
"displaytext": 'Test Network offering',
"guestiptype": 'Isolated',
"supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding',
"traffictype": 'GUEST',
"availability": 'Optional',
"serviceProviderList" : {
"Dhcp": 'VirtualRouter',
"Dns": 'VirtualRouter',
"SourceNat": 'VirtualRouter',
"PortForwarding": 'VirtualRouter',
},
},
"network": {
"name": "Test Network",
"displaytext": "Test Network",
},
"service_offering": {
"name": "Tiny Instance",
"displaytext": "Tiny Instance",
"cpunumber": 1,
"cpuspeed": 100,
# in MHz
"memory": 256,
# In MBs
},
"account": {
"email": "test@test.com",
"firstname": "Test",
"lastname": "User",
"username": "test",
"password": "password",
},
"server":
{
"displayname": "Small Instance",
"username": "root",
"password": "password",
"hypervisor": 'XenServer',
"privateport": 22,
"publicport": 22,
"ssh_port": 22,
"protocol": 'TCP',
},
"natrule":
{
"privateport": 22,
"publicport": 22,
"protocol": "TCP"
},
"lbrule":
{
"name": "SSH",
"alg": "roundrobin",
# Algorithm used for load balancing
"privateport": 22,
"publicport": 2222,
"protocol": 'TCP'
}
}
class TestPublicIP(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.services = Services().services
@classmethod
def setUpClass(cls):
cls.api_client = super(TestPublicIP, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
cls.services['mode'] = cls.zone.networktype
# Create Accounts & networks
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True,
domainid=cls.domain.id
)
cls.user = Account.create(
cls.api_client,
cls.services["account"],
domainid=cls.domain.id
)
cls.services["network"]["zoneid"] = cls.zone.id
cls.network_offering = NetworkOffering.create(
cls.api_client,
cls.services["network_offering"],
)
# Enable Network offering
cls.network_offering.update(cls.api_client, state='Enabled')
cls.services["network"]["networkoffering"] = cls.network_offering.id
cls.account_network = Network.create(
cls.api_client,
cls.services["network"],
cls.account.name,
cls.account.domainid
)
cls.user_network = Network.create(
cls.api_client,
cls.services["network"],
cls.user.name,
cls.user.domainid
)
# Create Source NAT IP addresses
account_src_nat_ip = PublicIPAddress.create(
cls.api_client,
cls.account.name,
cls.zone.id,
cls.account.domainid
)
user_src_nat_ip = PublicIPAddress.create(
cls.api_client,
cls.user.name,
cls.zone.id,
cls.user.domainid
)
cls._cleanup = [
cls.account_network,
cls.user_network,
cls.account,
cls.user,
cls.network_offering
]
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_public_ip_admin_account(self):
"""Test for Associate/Disassociate public IP address for admin account"""
# Validate the following:
# 1. listPubliIpAddresses API returns the list of acquired addresses
# 2. the returned list should contain our acquired IP address
ip_address = PublicIPAddress.create(
self.apiclient,
self.account.name,
self.zone.id,
self.account.domainid
)
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=ip_address.ipaddress.id
)
self.assertEqual(
isinstance(list_pub_ip_addr_resp, list),
True,
"Check list response returns a valid list"
)
#listPublicIpAddresses should return newly created public IP
self.assertNotEqual(
len(list_pub_ip_addr_resp),
0,
"Check if new IP Address is associated"
)
self.assertEqual(
list_pub_ip_addr_resp[0].id,
ip_address.ipaddress.id,
"Check Correct IP Address is returned in the List Cacls"
)
ip_address.delete(self.apiclient)
# Validate the following:
# 1.listPublicIpAddresses should no more return the released address
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=ip_address.ipaddress.id
)
self.assertEqual(
list_pub_ip_addr_resp,
None,
"Check if disassociated IP Address is no longer available"
)
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_public_ip_user_account(self):
"""Test for Associate/Disassociate public IP address for user account"""
# Validate the following:
# 1. listPubliIpAddresses API returns the list of acquired addresses
# 2. the returned list should contain our acquired IP address
ip_address = PublicIPAddress.create(
self.apiclient,
self.user.name,
self.zone.id,
self.user.domainid
)
#listPublicIpAddresses should return newly created public IP
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=ip_address.ipaddress.id
)
self.assertEqual(
isinstance(list_pub_ip_addr_resp, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_pub_ip_addr_resp),
0,
"Check if new IP Address is associated"
)
self.assertEqual(
list_pub_ip_addr_resp[0].id,
ip_address.ipaddress.id,
"Check Correct IP Address is returned in the List Call"
)
ip_address.delete(self.apiclient)
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=ip_address.ipaddress.id
)
self.assertEqual(
list_pub_ip_addr_resp,
None,
"Check if disassociated IP Address is no longer available"
)
return
class TestPortForwarding(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestPortForwarding, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostype"]
)
#Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True,
domainid=cls.domain.id
)
cls.services["server"]["zoneid"] = cls.zone.id
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.virtual_machine = VirtualMachine.create(
cls.api_client,
cls.services["server"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls._cleanup = [
cls.virtual_machine,
cls.account,
cls.service_offering
]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
return
@classmethod
def tearDownClass(cls):
try:
cls.api_client = super(TestPortForwarding, cls).getClsTestClient().getApiClient()
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_01_port_fwd_on_src_nat(self):
"""Test for port forwarding on source NAT"""
#Validate the following:
#1. listPortForwarding rules API should return the added PF rule
#2. attempt to do an ssh into the user VM through the sourceNAT
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(src_nat_ip_addrs, list),
True,
"Check list response returns a valid list"
)
src_nat_ip_addr = src_nat_ip_addrs[0]
# Check if VM is in Running state before creating NAT rule
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
vm_response[0].state,
'Running',
"VM state should be Running before creating a NAT rule."
)
# Open up firewall port for SSH
fw_rule = FireWallRule.create(
self.apiclient,
ipaddressid=src_nat_ip_addr.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
#Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
self.virtual_machine,
self.services["natrule"],
src_nat_ip_addr.id
)
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertEqual(
isinstance(list_nat_rule_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_nat_rule_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
list_nat_rule_response[0].id,
nat_rule.id,
"Check Correct Port forwarding Rule is returned"
)
#SSH virtual machine to test port forwarding
try:
self.debug("SSHing into VM with IP address %s with NAT IP %s" %
(
self.virtual_machine.ipaddress,
src_nat_ip_addr.ipaddress
))
self.virtual_machine.get_ssh_client(src_nat_ip_addr.ipaddress)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.virtual_machine.ipaddress, e)
)
nat_rule.delete(self.apiclient)
try:
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
except cloudstackAPIException:
self.debug("Nat Rule is deleted")
# Check if the Public SSH port is inaccessible
with self.assertRaises(Exception):
self.debug(
"SSHing into VM with IP address %s after NAT rule deletion" %
self.virtual_machine.ipaddress)
remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.virtual_machine.ssh_port,
self.virtual_machine.username,
self.virtual_machine.password
)
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_02_port_fwd_on_non_src_nat(self):
"""Test for port forwarding on non source NAT"""
#Validate the following:
#1. listPortForwardingRules should not return the deleted rule anymore
#2. attempt to do ssh should now fail
ip_address = PublicIPAddress.create(
self.apiclient,
self.account.name,
self.zone.id,
self.account.domainid,
self.services["server"]
)
self.cleanup.append(ip_address)
# Check if VM is in Running state before creating NAT rule
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
vm_response[0].state,
'Running',
"VM state should be Running before creating a NAT rule."
)
# Open up firewall port for SSH
fw_rule = FireWallRule.create(
self.apiclient,
ipaddressid=ip_address.ipaddress.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
#Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
self.virtual_machine,
self.services["natrule"],
ip_address.ipaddress.id
)
#Validate the following:
#1. listPortForwardingRules should not return the deleted rule anymore
#2. attempt to do ssh should now fail
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertEqual(
isinstance(list_nat_rule_response, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(list_nat_rule_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
list_nat_rule_response[0].id,
nat_rule.id,
"Check Correct Port forwarding Rule is returned"
)
try:
self.debug("SSHing into VM with IP address %s with NAT IP %s" %
(
self.virtual_machine.ipaddress,
ip_address.ipaddress
))
self.virtual_machine.get_ssh_client(ip_address.ipaddress)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.virtual_machine.ipaddress, e)
)
nat_rule.delete(self.apiclient)
try:
list_nat_rule_response = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
except cloudstackAPIException:
self.debug("Nat Rule is deleted")
# Check if the Public SSH port is inaccessible
with self.assertRaises(Exception):
self.debug(
"SSHing into VM with IP address %s after NAT rule deletion" %
self.virtual_machine.ipaddress)
remoteSSHClient(
ip_address.ipaddress,
self.virtual_machine.ssh_port,
self.virtual_machine.username,
self.virtual_machine.password
)
return
class TestLoadBalancingRule(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.api_client = super(TestLoadBalancingRule, cls).getClsTestClient().getApiClient()
cls.services = Services().services
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client, cls.services)
cls.zone = get_zone(cls.api_client, cls.services)
template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostype"]
)
cls.services["server"]["zoneid"] = cls.zone.id
#Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.api_client,
cls.services["account"],
admin=True,
domainid=cls.domain.id
)
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls.vm_1 = VirtualMachine.create(
cls.api_client,
cls.services["server"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls.vm_2 = VirtualMachine.create(
cls.api_client,
cls.services["server"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls.non_src_nat_ip = PublicIPAddress.create(
cls.api_client,
cls.account.name,
cls.zone.id,
cls.account.domainid,
cls.services["server"]
)
# Open up firewall port for SSH
cls.fw_rule = FireWallRule.create(
cls.api_client,
ipaddressid=cls.non_src_nat_ip.ipaddress.id,
protocol=cls.services["lbrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=cls.services["lbrule"]["publicport"],
endport=cls.services["lbrule"]["publicport"]
)
cls._cleanup = [
cls.account,
cls.service_offering
]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
return
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
@classmethod
def tearDownClass(cls):
cleanup_resources(cls.api_client, cls._cleanup)
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_01_create_lb_rule_src_nat(self):
"""Test to create Load balancing rule with source NAT"""
# Validate the Following:
#1. listLoadBalancerRules should return the added rule
#2. attempt to ssh twice on the load balanced IP
#3. verify using the hostname of the VM
# that round robin is indeed happening as expected
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(src_nat_ip_addrs, list),
True,
"Check list response returns a valid list"
)
src_nat_ip_addr = src_nat_ip_addrs[0]
# Check if VM is in Running state before creating LB rule
vm_response = VirtualMachine.list(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
for vm in vm_response:
self.assertEqual(
vm.state,
'Running',
"VM state should be Running before creating a NAT rule."
)
#Create Load Balancer rule and assign VMs to rule
lb_rule = LoadBalancerRule.create(
self.apiclient,
self.services["lbrule"],
src_nat_ip_addr.id,
accountid=self.account.name
)
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
lb_rules = list_lb_rules(
self.apiclient,
id=lb_rule.id
)
self.assertEqual(
isinstance(lb_rules, list),
True,
"Check list response returns a valid list"
)
#verify listLoadBalancerRules lists the added load balancing rule
self.assertNotEqual(
len(lb_rules),
0,
"Check Load Balancer Rule in its List"
)
self.assertEqual(
lb_rules[0].id,
lb_rule.id,
"Check List Load Balancer Rules returns valid Rule"
)
# listLoadBalancerRuleInstances should list all
# instances associated with that LB rule
lb_instance_rules = list_lb_instances(
self.apiclient,
id=lb_rule.id
)
self.assertEqual(
isinstance(lb_instance_rules, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(lb_instance_rules),
0,
"Check Load Balancer instances Rule in its List"
)
self.debug("lb_instance_rules Ids: %s, %s" % (
lb_instance_rules[0].id,
lb_instance_rules[1].id
))
self.debug("VM ids: %s, %s" % (self.vm_1.id, self.vm_2.id))
self.assertIn(
lb_instance_rules[0].id,
[self.vm_1.id, self.vm_2.id],
"Check List Load Balancer instances Rules returns valid VM ID"
)
self.assertIn(
lb_instance_rules[1].id,
[self.vm_1.id, self.vm_2.id],
"Check List Load Balancer instances Rules returns valid VM ID"
)
try:
self.debug(
"SSH into VM (IPaddress: %s) & NAT Rule (Public IP: %s)" %
(self.vm_1.ipaddress, src_nat_ip_addr.ipaddress)
)
ssh_1 = remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
# If Round Robin Algorithm is chosen,
# each ssh command should alternate between VMs
hostnames = [ssh_1.execute("hostname")[0]]
except Exception as e:
self.fail("%s: SSH failed for VM with IP Address: %s" %
(e, src_nat_ip_addr.ipaddress))
time.sleep(self.services["lb_switch_wait"])
try:
self.debug("SSHing into IP address: %s after adding VMs (ID: %s , %s)" %
(
src_nat_ip_addr.ipaddress,
self.vm_1.id,
self.vm_2.id
))
ssh_2 = remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
hostnames.append(ssh_2.execute("hostname")[0])
except Exception as e:
self.fail("%s: SSH failed for VM with IP Address: %s" %
(e, src_nat_ip_addr.ipaddress))
self.debug("Hostnames: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_2.name,
hostnames,
"Check if ssh succeeded for server2"
)
#SSH should pass till there is a last VM associated with LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
try:
self.debug("SSHing into IP address: %s after removing VM (ID: %s)" %
(
src_nat_ip_addr.ipaddress,
self.vm_2.id
))
ssh_1 = remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
hostnames.append(ssh_1.execute("hostname")[0])
except Exception as e:
self.fail("%s: SSH failed for VM with IP Address: %s" %
(e, src_nat_ip_addr.ipaddress))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
lb_rule.remove(self.apiclient, [self.vm_1])
with self.assertRaises(Exception):
self.debug("Removed all VMs, trying to SSH")
ssh_1 = remoteSSHClient(
src_nat_ip_addr.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_1.execute("hostname")[0]
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_02_create_lb_rule_non_nat(self):
"""Test to create Load balancing rule with source NAT"""
# Validate the Following:
#1. listLoadBalancerRules should return the added rule
#2. attempt to ssh twice on the load balanced IP
#3. verify using the hostname of the VM that
# round robin is indeed happening as expected
# Check if VM is in Running state before creating LB rule
vm_response = VirtualMachine.list(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
for vm in vm_response:
self.assertEqual(
vm.state,
'Running',
"VM state should be Running before creating a NAT rule."
)
#Create Load Balancer rule and assign VMs to rule
lb_rule = LoadBalancerRule.create(
self.apiclient,
self.services["lbrule"],
self.non_src_nat_ip.ipaddress.id,
accountid=self.account.name
)
self.cleanup.append(lb_rule)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
lb_rules = list_lb_rules(
self.apiclient,
id=lb_rule.id
)
self.assertEqual(
isinstance(lb_rules, list),
True,
"Check list response returns a valid list"
)
#verify listLoadBalancerRules lists the added load balancing rule
self.assertNotEqual(
len(lb_rules),
0,
"Check Load Balancer Rule in its List"
)
self.assertEqual(
lb_rules[0].id,
lb_rule.id,
"Check List Load Balancer Rules returns valid Rule"
)
# listLoadBalancerRuleInstances should list
# all instances associated with that LB rule
lb_instance_rules = list_lb_instances(
self.apiclient,
id=lb_rule.id
)
self.assertEqual(
isinstance(lb_instance_rules, list),
True,
"Check list response returns a valid list"
)
self.assertNotEqual(
len(lb_instance_rules),
0,
"Check Load Balancer instances Rule in its List"
)
self.assertIn(
lb_instance_rules[0].id,
[self.vm_1.id, self.vm_2.id],
"Check List Load Balancer instances Rules returns valid VM ID"
)
self.assertIn(
lb_instance_rules[1].id,
[self.vm_1.id, self.vm_2.id],
"Check List Load Balancer instances Rules returns valid VM ID"
)
try:
self.debug("SSHing into IP address: %s after adding VMs (ID: %s , %s)" %
(
self.non_src_nat_ip.ipaddress.ipaddress,
self.vm_1.id,
self.vm_2.id
))
ssh_1 = remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
# If Round Robin Algorithm is chosen,
# each ssh command should alternate between VMs
hostnames = [ssh_1.execute("hostname")[0]]
time.sleep(self.services["lb_switch_wait"])
self.debug("SSHing again into IP address: %s with VMs (ID: %s , %s) added to LB rule" %
(
self.non_src_nat_ip.ipaddress.ipaddress,
self.vm_1.id,
self.vm_2.id
))
ssh_2 = remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
hostnames.append(ssh_2.execute("hostname")[0])
self.debug("Hostnames after adding 2 VMs to LB rule: %s" % str(hostnames))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_2.name,
hostnames,
"Check if ssh succeeded for server2"
)
#SSH should pass till there is a last VM associated with LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
self.debug("SSHing into IP address: %s after removing VM (ID: %s) from LB rule" %
(
self.non_src_nat_ip.ipaddress.ipaddress,
self.vm_2.id
))
ssh_1 = remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
hostnames.append(ssh_1.execute("hostname")[0])
self.debug("Hostnames after removing VM2: %s" % str(hostnames))
except Exception as e:
self.fail("%s: SSH failed for VM with IP Address: %s" %
(e, self.non_src_nat_ip.ipaddress.ipaddress))
self.assertIn(
self.vm_1.name,
hostnames,
"Check if ssh succeeded for server1"
)
lb_rule.remove(self.apiclient, [self.vm_1])
with self.assertRaises(Exception):
self.debug("SSHing into IP address: %s after removing VM (ID: %s) from LB rule" %
(
self.non_src_nat_ip.ipaddress.ipaddress,
self.vm_1.id
))
ssh_1 = remoteSSHClient(
self.non_src_nat_ip.ipaddress.ipaddress,
self.services['lbrule']["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_1.execute("hostname")[0]
return
class TestRebootRouter(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.services = Services().services
# Get Zone, Domain and templates
self.domain = get_domain(self.apiclient, self.services)
self.zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
self.zone.id,
self.services["ostype"]
)
self.services["server"]["zoneid"] = self.zone.id
#Create an account, network, VM and IP addresses
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True,
domainid=self.domain.id
)
self.service_offering = ServiceOffering.create(
self.apiclient,
self.services["service_offering"]
)
self.vm_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id
)
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
try:
src_nat_ip_addr = src_nat_ip_addrs[0]
except Exception as e:
raise Exception("Warning: Exception during fetching source NAT: %s" % e)
self.public_ip = PublicIPAddress.create(
self.apiclient,
self.vm_1.account,
self.vm_1.zoneid,
self.vm_1.domainid,
self.services["server"]
)
# Open up firewall port for SSH
fw_rule = FireWallRule.create(
self.apiclient,
ipaddressid=self.public_ip.ipaddress.id,
protocol=self.services["lbrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["lbrule"]["publicport"],
endport=self.services["lbrule"]["publicport"]
)
lb_rule = LoadBalancerRule.create(
self.apiclient,
self.services["lbrule"],
src_nat_ip_addr.id,
self.account.name
)
lb_rule.assign(self.apiclient, [self.vm_1])
self.nat_rule = NATRule.create(
self.apiclient,
self.vm_1,
self.services["natrule"],
ipaddressid=self.public_ip.ipaddress.id
)
self.cleanup = [
self.vm_1,
lb_rule,
self.service_offering,
self.nat_rule,
self.account,
]
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_reboot_router(self):
"""Test for reboot router"""
#Validate the Following
#1. Post restart PF and LB rules should still function
#2. verify if the ssh into the virtual machine
# still works through the sourceNAT Ip
#Retrieve router for the user account
routers = list_routers(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(routers, list),
True,
"Check list routers returns a valid list"
)
router = routers[0]
self.debug("Rebooting the router (ID: %s)" % router.id)
cmd = rebootRouter.rebootRouterCmd()
cmd.id = router.id
self.apiclient.rebootRouter(cmd)
# Poll listVM to ensure VM is stopped properly
timeout = self.services["timeout"]
while True:
time.sleep(self.services["sleep"])
# Ensure that VM is in stopped state
list_vm_response = list_virtual_machines(
self.apiclient,
id=self.vm_1.id
)
if isinstance(list_vm_response, list):
vm = list_vm_response[0]
if vm.state == 'Running':
self.debug("VM state: %s" % vm.state)
break
if timeout == 0:
raise Exception(
"Failed to start VM (ID: %s) in change service offering" % vm.id)
timeout = timeout - 1
#we should be able to SSH after successful reboot
try:
self.debug("SSH into VM (ID : %s ) after reboot" % self.vm_1.id)
remoteSSHClient(
self.nat_rule.ipaddress.ipaddress,
self.services["natrule"]["publicport"],
self.vm_1.username,
self.vm_1.password
)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" % \
(self.vm_1.ipaddress, e))
return
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
class TestAssignRemoveLB(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.services = Services().services
# Get Zone, Domain and templates
self.domain = get_domain(self.apiclient, self.services)
self.zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
self.zone.id,
self.services["ostype"]
)
self.services["server"]["zoneid"] = self.zone.id
#Create VMs, accounts
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True,
domainid=self.domain.id
)
self.service_offering = ServiceOffering.create(
self.apiclient,
self.services["service_offering"]
)
self.vm_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id
)
self.vm_2 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id
)
self.vm_3 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id
)
self.cleanup = [
self.account,
self.service_offering
]
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_assign_and_removal_lb(self):
"""Test for assign & removing load balancing rule"""
# Validate:
#1. Verify list API - listLoadBalancerRules lists
# all the rules with the relevant ports
#2. listLoadBalancerInstances will list
# the instances associated with the corresponding rule.
#3. verify ssh attempts should pass as long as there
# is at least one instance associated with the rule
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(src_nat_ip_addrs, list),
True,
"Check list response returns a valid list"
)
self.non_src_nat_ip = src_nat_ip_addrs[0]
# Open up firewall port for SSH
fw_rule = FireWallRule.create(
self.apiclient,
ipaddressid=self.non_src_nat_ip.id,
protocol=self.services["lbrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["lbrule"]["publicport"],
endport=self.services["lbrule"]["publicport"]
)
# Check if VM is in Running state before creating LB rule
vm_response = VirtualMachine.list(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
for vm in vm_response:
self.assertEqual(
vm.state,
'Running',
"VM state should be Running before creating a NAT rule."
)
lb_rule = LoadBalancerRule.create(
self.apiclient,
self.services["lbrule"],
self.non_src_nat_ip.id,
self.account.name
)
lb_rule.assign(self.apiclient, [self.vm_1, self.vm_2])
try:
self.debug("SSHing into IP address: %s with VMs (ID: %s , %s) added to LB rule" %
(
self.non_src_nat_ip.ipaddress,
self.vm_1.id,
self.vm_2.id
))
#Create SSH client for each VM
ssh_1 = remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_1.username,
self.vm_1.password
)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.non_src_nat_ip.ipaddress)
try:
self.debug("SSHing again into IP address: %s with VMs (ID: %s , %s) added to LB rule" %
(
self.non_src_nat_ip.ipaddress,
self.vm_1.id,
self.vm_2.id
))
ssh_2 = remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_2.username,
self.vm_2.password
)
# If Round Robin Algorithm is chosen,
# each ssh command should alternate between VMs
res_1 = ssh_1.execute("hostname")[0]
self.debug(res_1)
time.sleep(self.services["lb_switch_wait"])
res_2 = ssh_2.execute("hostname")[0]
self.debug(res_2)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.non_src_nat_ip.ipaddress)
self.assertIn(
self.vm_1.name,
res_1,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_2.name,
res_2,
"Check if ssh succeeded for server2"
)
#Removing VM and assigning another VM to LB rule
lb_rule.remove(self.apiclient, [self.vm_2])
try:
self.debug("SSHing again into IP address: %s with VM (ID: %s) added to LB rule" %
(
self.non_src_nat_ip.ipaddress,
self.vm_1.id,
))
# Again make a SSH connection, as previous is not used after LB remove
ssh_1 = remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_1.username,
self.vm_1.password
)
res_1 = ssh_1.execute("hostname")[0]
self.debug(res_1)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.non_src_nat_ip.ipaddress)
self.assertIn(
self.vm_1.name,
res_1,
"Check if ssh succeeded for server1"
)
lb_rule.assign(self.apiclient, [self.vm_3])
try:
ssh_1 = remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_1.username,
self.vm_1.password
)
ssh_3 = remoteSSHClient(
self.non_src_nat_ip.ipaddress,
self.services["lbrule"]["publicport"],
self.vm_3.username,
self.vm_3.password
)
res_1 = ssh_1.execute("hostname")[0]
self.debug(res_1)
time.sleep(self.services["lb_switch_wait"])
res_3 = ssh_3.execute("hostname")[0]
self.debug(res_3)
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
self.non_src_nat_ip.ipaddress)
self.assertIn(
self.vm_1.name,
res_1,
"Check if ssh succeeded for server1"
)
self.assertIn(
self.vm_3.name,
res_3,
"Check if ssh succeeded for server3"
)
return
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
class TestReleaseIP(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.services = Services().services
# Get Zone, Domain and templates
self.domain = get_domain(self.apiclient, self.services)
self.zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
self.zone.id,
self.services["ostype"]
)
self.services["server"]["zoneid"] = self.zone.id
#Create an account, network, VM, Port forwarding rule, LB rules
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True,
domainid=self.domain.id
)
self.service_offering = ServiceOffering.create(
self.apiclient,
self.services["service_offering"]
)
self.virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id
)
self.ip_address = PublicIPAddress.create(
self.apiclient,
self.account.name,
self.zone.id,
self.account.domainid
)
ip_addrs = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
try:
self.ip_addr = ip_addrs[0]
except Exception as e:
raise Exception("Failed: During acquiring source NAT for account: %s" %
self.account.name)
self.nat_rule = NATRule.create(
self.apiclient,
self.virtual_machine,
self.services["natrule"],
self.ip_addr.id
)
self.lb_rule = LoadBalancerRule.create(
self.apiclient,
self.services["lbrule"],
self.ip_addr.id,
accountid=self.account.name
)
self.cleanup = [
self.virtual_machine,
self.account
]
return
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_releaseIP(self):
"""Test for release public IP address"""
self.debug("Deleting Public IP : %s" % self.ip_addr.id)
self.ip_address.delete(self.apiclient)
# Sleep to ensure that deleted state is reflected in other calls
time.sleep(self.services["sleep"])
# ListPublicIpAddresses should not list deleted Public IP address
list_pub_ip_addr_resp = list_publicIP(
self.apiclient,
id=self.ip_addr.id
)
self.debug("List Public IP response" + str(list_pub_ip_addr_resp))
self.assertEqual(
list_pub_ip_addr_resp,
None,
"Check if disassociated IP Address is no longer available"
)
# ListPortForwardingRules should not list
# associated rules with Public IP address
try:
list_nat_rule = list_nat_rules(
self.apiclient,
id=self.nat_rule.id
)
self.debug("List NAT Rule response" + str(list_nat_rule))
except cloudstackAPIException:
self.debug("Port Forwarding Rule is deleted")
# listLoadBalancerRules should not list
# associated rules with Public IP address
try:
list_lb_rule = list_lb_rules(
self.apiclient,
id=self.lb_rule.id
)
self.debug("List LB Rule response" + str(list_lb_rule))
except cloudstackAPIException:
self.debug("Port Forwarding Rule is deleted")
# SSH Attempt though public IP should fail
with self.assertRaises(Exception):
ssh_2 = remoteSSHClient(
self.ip_addr.ipaddress,
self.services["natrule"]["publicport"],
self.virtual_machine.username,
self.virtual_machine.password
)
return
class TestDeleteAccount(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.services = Services().services
# Get Zone, Domain and templates
self.domain = get_domain(self.apiclient, self.services)
self.zone = get_zone(self.apiclient, self.services)
template = get_template(
self.apiclient,
self.zone.id,
self.services["ostype"]
)
self.services["server"]["zoneid"] = self.zone.id
#Create an account, network, VM and IP addresses
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True,
domainid=self.domain.id
)
self.service_offering = ServiceOffering.create(
self.apiclient,
self.services["service_offering"]
)
self.vm_1 = VirtualMachine.create(
self.apiclient,
self.services["server"],
templateid=template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id
)
src_nat_ip_addrs = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
try:
src_nat_ip_addr = src_nat_ip_addrs[0]
except Exception as e:
self.fail("SSH failed for VM with IP: %s" %
src_nat_ip_addr.ipaddress)
self.lb_rule = LoadBalancerRule.create(
self.apiclient,
self.services["lbrule"],
src_nat_ip_addr.id,
self.account.name
)
self.lb_rule.assign(self.apiclient, [self.vm_1])
self.nat_rule = NATRule.create(
self.apiclient,
self.vm_1,
self.services["natrule"],
src_nat_ip_addr.id
)
self.cleanup = []
return
@attr(tags = ["advanced", "advancedns", "smoke"])
def test_delete_account(self):
"""Test for delete account"""
#Validate the Following
# 1. after account.cleanup.interval (global setting)
# time all the PF/LB rules should be deleted
# 2. verify that list(LoadBalancer/PortForwarding)Rules
# API does not return any rules for the account
# 3. The domR should have been expunged for this account
self.account.delete(self.apiclient)
interval = list_configurations(
self.apiclient,
name='account.cleanup.interval'
)
self.assertEqual(
isinstance(interval, list),
True,
"Check if account.cleanup.interval config present"
)
# Sleep to ensure that all resources are deleted
time.sleep(int(interval[0].value))
# ListLoadBalancerRules should not list
# associated rules with deleted account
# Unable to find account testuser1 in domain 1 : Exception
try:
list_lb_reponse = list_lb_rules(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
except cloudstackAPIException:
self.debug("Port Forwarding Rule is deleted")
# ListPortForwardingRules should not
# list associated rules with deleted account
try:
list_nat_reponse = list_nat_rules(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
except cloudstackAPIException:
self.debug("NATRule is deleted")
#Retrieve router for the user account
try:
routers = list_routers(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
routers,
None,
"Check routers are properly deleted."
)
except Exception as e:
raise Exception(
"Encountered %s raised while fetching routers for account: %s" % (e,
self.account.name))
return
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return