mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
This is an extension of #3732 for KVM. It is restricted to OVS > 2.9.2; since Xen uses OVS 2.6, PVLAN is unsupported there. This also fixes an issue where VMs on the same PVLAN were unable to communicate if they were on the same host.
1841 lines
63 KiB
Python
1841 lines
63 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
""" BVT tests for Network Life Cycle
|
|
"""
|
|
# Import Local Modules
|
|
from marvin.codes import (FAILED, STATIC_NAT_RULE, LB_RULE,
|
|
NAT_RULE, PASS)
|
|
from marvin.cloudstackTestCase import cloudstackTestCase
|
|
from marvin.cloudstackException import CloudstackAPIException
|
|
from marvin.cloudstackAPI import rebootRouter
|
|
from marvin.sshClient import SshClient
|
|
from marvin.lib.utils import cleanup_resources, get_process_status
|
|
from marvin.lib.base import (Account,
|
|
VirtualMachine,
|
|
ServiceOffering,
|
|
NATRule,
|
|
PublicIPAddress,
|
|
StaticNATRule,
|
|
FireWallRule,
|
|
Network,
|
|
NetworkOffering,
|
|
LoadBalancerRule,
|
|
Router,
|
|
NIC,
|
|
Cluster)
|
|
from marvin.lib.common import (get_domain,
|
|
get_zone,
|
|
get_test_template,
|
|
list_hosts,
|
|
list_publicIP,
|
|
list_nat_rules,
|
|
list_routers,
|
|
list_virtual_machines,
|
|
list_lb_rules,
|
|
list_configurations,
|
|
verifyGuestTrafficPortGroups)
|
|
from nose.plugins.attrib import attr
|
|
from marvin.lib.decoratorGenerators import skipTestIf
|
|
from ddt import ddt, data
|
|
# Import System modules
|
|
import time
|
|
import logging
|
|
|
|
# NOTE(review): presumably read by the nose multiprocess plugin - confirm.
_multiprocess_shared_ = True

# Module-wide logger for these tests: DEBUG level, echoed through a stream
# handler.
logger = logging.getLogger('TestNetworkOps')
logger.setLevel(logging.DEBUG)

stream_handler = logging.StreamHandler()
logger.addHandler(stream_handler)
|
|
|
|
class TestPublicIP(cloudstackTestCase):
    """Associate and disassociate public IP addresses.

    The class-level fixture creates an admin account and a regular user
    account, each with its own network and virtual machine; the tests then
    acquire and release an extra public IP address for each account.
    """

    def setUp(self):
        """Fetch an API client for the test about to run."""
        self.apiclient = self.testClient.getApiClient()

    @classmethod
    def setUpClass(cls):
        """Create the accounts, networks, offerings and VMs shared by all tests.

        Raises:
            AssertionError: if get_test_template() returns FAILED.
        """
        testClient = super(TestPublicIP, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.services = testClient.getParsedTestDataConfig()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        # Create Accounts & networks
        cls.account = Account.create(
            cls.apiclient,
            cls.services["account"],
            admin=True,
            domainid=cls.domain.id
        )

        cls.user = Account.create(
            cls.apiclient,
            cls.services["account"],
            domainid=cls.domain.id
        )
        cls.services["network"]["zoneid"] = cls.zone.id

        cls.network_offering = NetworkOffering.create(
            cls.apiclient,
            cls.services["network_offering"],
        )
        # Enable Network offering
        cls.network_offering.update(cls.apiclient, state='Enabled')

        cls.services["network"]["networkoffering"] = cls.network_offering.id
        cls.account_network = Network.create(
            cls.apiclient,
            cls.services["network"],
            cls.account.name,
            cls.account.domainid
        )
        cls.user_network = Network.create(
            cls.apiclient,
            cls.services["network"],
            cls.user.name,
            cls.user.domainid
        )

        cls.service_offering = ServiceOffering.create(
            cls.apiclient,
            cls.services["service_offerings"]["tiny"],
        )

        cls.hypervisor = testClient.getHypervisorInfo()
        cls.template = get_test_template(
            cls.apiclient,
            cls.zone.id,
            cls.hypervisor
        )
        if cls.template == FAILED:
            # Explicit raise: a bare ``assert`` is stripped under ``python -O``.
            raise AssertionError(
                "get_test_template() failed to return template")

        cls.services["virtual_machine"]["zoneid"] = cls.zone.id

        cls.account_vm = VirtualMachine.create(
            cls.apiclient,
            cls.services["virtual_machine"],
            templateid=cls.template.id,
            accountid=cls.account.name,
            domainid=cls.account.domainid,
            networkids=cls.account_network.id,
            serviceofferingid=cls.service_offering.id
        )

        cls.user_vm = VirtualMachine.create(
            cls.apiclient,
            cls.services["virtual_machine"],
            templateid=cls.template.id,
            accountid=cls.user.name,
            domainid=cls.user.domainid,
            networkids=cls.user_network.id,
            serviceofferingid=cls.service_offering.id
        )

        # Create Source NAT IP addresses
        PublicIPAddress.create(
            cls.apiclient,
            cls.account.name,
            cls.zone.id,
            cls.account.domainid
        )
        PublicIPAddress.create(
            cls.apiclient,
            cls.user.name,
            cls.zone.id,
            cls.user.domainid
        )

        # The existing order is kept unchanged; the service offering is
        # appended last so that it no longer leaks after the class finishes.
        cls._cleanup = [
            cls.account_vm,
            cls.user_vm,
            cls.account_network,
            cls.user_network,
            cls.account,
            cls.user,
            cls.network_offering,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Delete every resource registered in ``cls._cleanup``.

        Raises:
            Exception: wrapping whatever cleanup_resources() raised.
        """
        try:
            # Cleanup resources used
            cleanup_resources(cls.apiclient, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @attr(tags=["advanced", "advancedns", "smoke", "dvs"], required_hardware="false")
    def test_public_ip_admin_account(self):
        """Test for Associate/Disassociate public IP address for admin account"""

        # Validate the following:
        # 1. listPublicIpAddresses API returns the list of acquired addresses
        # 2. the returned list should contain our acquired IP address

        ip_address = PublicIPAddress.create(
            self.apiclient,
            self.account.name,
            self.zone.id,
            self.account.domainid
        )
        list_pub_ip_addr_resp = list_publicIP(
            self.apiclient,
            id=ip_address.ipaddress.id
        )
        self.assertIsInstance(
            list_pub_ip_addr_resp,
            list,
            "Check list response returns a valid list"
        )
        # listPublicIpAddresses should return newly created public IP
        self.assertNotEqual(
            len(list_pub_ip_addr_resp),
            0,
            "Check if new IP Address is associated"
        )
        self.assertEqual(
            list_pub_ip_addr_resp[0].id,
            ip_address.ipaddress.id,
            "Check Correct IP Address is returned in the List Call"
        )

        ip_address.delete(self.apiclient)
        # Give the release some time before listing again.
        time.sleep(30)

        # Validate the following:
        # 1.listPublicIpAddresses should no more return the released address
        list_pub_ip_addr_resp = list_publicIP(
            self.apiclient,
            id=ip_address.ipaddress.id
        )
        # None and an empty list both mean the address is gone; only a
        # non-empty list is a failure.
        if isinstance(list_pub_ip_addr_resp, list) and list_pub_ip_addr_resp:
            self.fail("list public ip response is not empty")
        return

    @attr(tags=["advanced", "advancedns", "smoke", "dvs"], required_hardware="false")
    def test_public_ip_user_account(self):
        """Test for Associate/Disassociate public IP address for user account"""

        # Validate the following:
        # 1. listPublicIpAddresses API returns the list of acquired addresses
        # 2. the returned list should contain our acquired IP address

        ip_address = PublicIPAddress.create(
            self.apiclient,
            self.user.name,
            self.zone.id,
            self.user.domainid
        )

        # listPublicIpAddresses should return newly created public IP
        list_pub_ip_addr_resp = list_publicIP(
            self.apiclient,
            id=ip_address.ipaddress.id
        )
        self.assertIsInstance(
            list_pub_ip_addr_resp,
            list,
            "Check list response returns a valid list"
        )
        self.assertNotEqual(
            len(list_pub_ip_addr_resp),
            0,
            "Check if new IP Address is associated"
        )
        self.assertEqual(
            list_pub_ip_addr_resp[0].id,
            ip_address.ipaddress.id,
            "Check Correct IP Address is returned in the List Call"
        )

        ip_address.delete(self.apiclient)

        list_pub_ip_addr_resp = list_publicIP(
            self.apiclient,
            id=ip_address.ipaddress.id
        )

        self.assertEqual(
            list_pub_ip_addr_resp,
            None,
            "Check if disassociated IP Address is no longer available"
        )
        return
|
|
|
|
|
|
class TestPortForwarding(cloudstackTestCase):
    """Port forwarding rules on source NAT and non-source-NAT public IPs."""

    @classmethod
    def setUpClass(cls):
        """Create the account, service offering and VM shared by all tests.

        Raises:
            AssertionError: if get_test_template() returns FAILED.
        """
        testClient = super(TestPortForwarding, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.services = testClient.getParsedTestDataConfig()
        cls.hypervisor = testClient.getHypervisorInfo()
        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
        template = get_test_template(
            cls.apiclient,
            cls.zone.id,
            cls.hypervisor
        )
        if template == FAILED:
            # Explicit raise: a bare ``assert`` is stripped under ``python -O``.
            raise AssertionError(
                "get_test_template() failed to return template")

        # Create an account, network, VM and IP addresses
        cls.account = Account.create(
            cls.apiclient,
            cls.services["account"],
            admin=True,
            domainid=cls.domain.id
        )
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.service_offering = ServiceOffering.create(
            cls.apiclient,
            cls.services["service_offerings"]["tiny"]
        )
        cls.virtual_machine = VirtualMachine.create(
            cls.apiclient,
            cls.services["virtual_machine"],
            templateid=template.id,
            accountid=cls.account.name,
            domainid=cls.account.domainid,
            serviceofferingid=cls.service_offering.id
        )
        cls._cleanup = [
            cls.virtual_machine,
            cls.account,
            cls.service_offering
        ]

    def setUp(self):
        """Fetch an API client and start with an empty per-test cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.cleanup = []
        return

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level resources.

        Raises:
            Exception: wrapping whatever cleanup_resources() raised.
        """
        try:
            cls.apiclient = super(
                TestPortForwarding,
                cls).getClsTestClient().getApiClient()
            cleanup_resources(cls.apiclient, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    def tearDown(self):
        """Delete the resources a test registered in ``self.cleanup``."""
        cleanup_resources(self.apiclient, self.cleanup)
        return

    @attr(tags=["advanced", "advancedns", "smoke", "dvs"], required_hardware="true")
    def test_01_port_fwd_on_src_nat(self):
        """Test for port forwarding on source NAT"""

        # Validate the following:
        # 1. listPortForwarding rules API should return the added PF rule
        # 2. attempt to do an ssh into the user VM through the sourceNAT

        src_nat_ip_addrs = list_publicIP(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )

        self.assertIsInstance(
            src_nat_ip_addrs,
            list,
            "Check list response returns a valid list"
        )
        src_nat_ip_addr = src_nat_ip_addrs[0]

        # Check if VM is in Running state before creating NAT rule
        vm_response = VirtualMachine.list(
            self.apiclient,
            id=self.virtual_machine.id
        )

        self.assertIsInstance(
            vm_response,
            list,
            "Check list VM returns a valid list"
        )

        self.assertNotEqual(
            len(vm_response),
            0,
            "Check list VM response is not empty"
        )
        self.assertEqual(
            vm_response[0].state,
            'Running',
            "VM state should be Running before creating a NAT rule."
        )
        # Open up firewall port for SSH
        FireWallRule.create(
            self.apiclient,
            ipaddressid=src_nat_ip_addr.id,
            protocol=self.services["natrule"]["protocol"],
            cidrlist=['0.0.0.0/0'],
            startport=self.services["natrule"]["publicport"],
            endport=self.services["natrule"]["publicport"]
        )

        # Create NAT rule
        nat_rule = NATRule.create(
            self.apiclient,
            self.virtual_machine,
            self.services["natrule"],
            src_nat_ip_addr.id
        )

        list_nat_rule_response = list_nat_rules(
            self.apiclient,
            id=nat_rule.id
        )
        self.assertIsInstance(
            list_nat_rule_response,
            list,
            "Check list response returns a valid list"
        )

        self.assertNotEqual(
            len(list_nat_rule_response),
            0,
            "Check Port Forwarding Rule is created"
        )
        self.assertEqual(
            list_nat_rule_response[0].id,
            nat_rule.id,
            "Check Correct Port forwarding Rule is returned"
        )
        # SSH virtual machine to test port forwarding
        try:
            logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
                         (
                             self.virtual_machine.ipaddress,
                             src_nat_ip_addr.ipaddress
                         ))

            self.virtual_machine.get_ssh_client(src_nat_ip_addr.ipaddress)
        except Exception as e:
            self.fail(
                "SSH Access failed for %s: %s" %
                (self.virtual_machine.ipaddress, e)
            )

        # The VM state check lives outside the try block so that a wrong
        # state is not caught above and misreported as an SSH failure.
        vm_response = VirtualMachine.list(
            self.apiclient,
            id=self.virtual_machine.id
        )
        self.assertIsInstance(
            vm_response,
            list,
            "Check list VM returns a valid list"
        )
        if vm_response[0].state != 'Running':
            self.fail(
                "State of VM : %s is not found to be Running" % str(
                    self.virtual_machine.ipaddress))

        try:
            nat_rule.delete(self.apiclient)
        except Exception as e:
            self.fail("NAT Rule Deletion Failed: %s" % e)

        # NAT rule listing should fail as the nat rule does not exist
        with self.assertRaises(Exception):
            list_nat_rules(self.apiclient,
                           id=nat_rule.id)

        # Check if the Public SSH port is inaccessible
        with self.assertRaises(Exception):
            logger.debug(
                "SSHing into VM with IP address %s after NAT rule deletion" %
                self.virtual_machine.ipaddress)

            SshClient(
                src_nat_ip_addr.ipaddress,
                self.virtual_machine.ssh_port,
                self.virtual_machine.username,
                self.virtual_machine.password,
                retries=2,
                delay=0
            )
        return

    @attr(tags=["advanced", "advancedns", "smoke", "dvs"], required_hardware="true")
    def test_02_port_fwd_on_non_src_nat(self):
        """Test for port forwarding on non source NAT"""

        # Validate the following:
        # 1. listPortForwardingRules should not return the deleted rule anymore
        # 2. attempt to do ssh should now fail

        ip_address = PublicIPAddress.create(
            self.apiclient,
            self.account.name,
            self.zone.id,
            self.account.domainid,
            self.services["virtual_machine"]
        )
        self.cleanup.append(ip_address)

        # Check if VM is in Running state before creating NAT rule
        vm_response = VirtualMachine.list(
            self.apiclient,
            id=self.virtual_machine.id
        )

        self.assertIsInstance(
            vm_response,
            list,
            "Check list VM returns a valid list"
        )

        self.assertNotEqual(
            len(vm_response),
            0,
            "Check list VM response is not empty"
        )
        self.assertEqual(
            vm_response[0].state,
            'Running',
            "VM state should be Running before creating a NAT rule."
        )
        # Open up firewall port for SSH
        FireWallRule.create(
            self.apiclient,
            ipaddressid=ip_address.ipaddress.id,
            protocol=self.services["natrule"]["protocol"],
            cidrlist=['0.0.0.0/0'],
            startport=self.services["natrule"]["publicport"],
            endport=self.services["natrule"]["publicport"]
        )
        # Create NAT rule
        nat_rule = NATRule.create(
            self.apiclient,
            self.virtual_machine,
            self.services["natrule"],
            ip_address.ipaddress.id
        )

        # The rule must be listed while it exists.
        list_nat_rule_response = list_nat_rules(
            self.apiclient,
            id=nat_rule.id
        )
        self.assertIsInstance(
            list_nat_rule_response,
            list,
            "Check list response returns a valid list"
        )
        self.assertNotEqual(
            len(list_nat_rule_response),
            0,
            "Check Port Forwarding Rule is created"
        )
        self.assertEqual(
            list_nat_rule_response[0].id,
            nat_rule.id,
            "Check Correct Port forwarding Rule is returned"
        )

        try:
            logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
                         (
                             self.virtual_machine.ipaddress,
                             ip_address.ipaddress.ipaddress
                         ))
            self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
        except Exception as e:
            self.fail(
                "SSH Access failed for %s: %s" %
                (self.virtual_machine.ipaddress, e)
            )

        nat_rule.delete(self.apiclient)

        # Listing the deleted rule may raise; that is tolerated and logged.
        try:
            list_nat_rules(
                self.apiclient,
                id=nat_rule.id
            )
        except CloudstackAPIException:
            logger.debug("Nat Rule is deleted")

        # Check if the Public SSH port is inaccessible
        with self.assertRaises(Exception):
            logger.debug(
                "SSHing into VM with IP address %s after NAT rule deletion" %
                self.virtual_machine.ipaddress)

            SshClient(
                ip_address.ipaddress.ipaddress,
                self.virtual_machine.ssh_port,
                self.virtual_machine.username,
                self.virtual_machine.password,
                retries=2,
                delay=0
            )
        return

    @attr(tags=["dvs"], required_hardware="true")
    def test_guest_traffic_port_groups_isolated_network(self):
        """ Verify port groups are created for guest traffic
        used by isolated network """

        # The verification only runs on VMware; other hypervisors pass
        # trivially.
        if self.hypervisor.lower() == "vmware":
            response = verifyGuestTrafficPortGroups(self.apiclient,
                                                    self.config,
                                                    self.zone)
            # unittest assertion instead of a bare ``assert`` so the check
            # also runs under ``python -O``.
            self.assertEqual(response[0], PASS, response[1])
|
|
|
|
|
|
class TestRebootRouter(cloudstackTestCase):
    """Check that NAT access to a VM still works after its router reboots."""

    def setUp(self):
        """Create an account, a VM, a public IP and LB/NAT rules for the test.

        Raises:
            Exception: if no public IP can be read for the account.
        """
        self.apiclient = self.testClient.getApiClient()
        self.services = self.testClient.getParsedTestDataConfig()

        # Get Zone, Domain and templates
        self.domain = get_domain(self.apiclient)
        self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
        self.hypervisor = self.testClient.getHypervisorInfo()
        template = get_test_template(
            self.apiclient,
            self.zone.id,
            self.hypervisor
        )
        if template == FAILED:
            self.fail("get_test_template() failed to return template")

        self.services["virtual_machine"]["zoneid"] = self.zone.id

        # Create an account, network, VM and IP addresses
        self.account = Account.create(
            self.apiclient,
            self.services["account"],
            admin=True,
            domainid=self.domain.id
        )
        self.service_offering = ServiceOffering.create(
            self.apiclient,
            self.services["service_offerings"]["tiny"]
        )
        self.vm_1 = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            templateid=template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id
        )

        # Wait for VM to come up
        time.sleep(120)

        src_nat_ip_addrs = list_publicIP(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        try:
            src_nat_ip_addr = src_nat_ip_addrs[0]
        except Exception as e:
            raise Exception(
                "Warning: Exception during fetching source NAT: %s" %
                e)

        self.public_ip = PublicIPAddress.create(
            self.apiclient,
            self.vm_1.account,
            self.vm_1.zoneid,
            self.vm_1.domainid,
            self.services["virtual_machine"]
        )
        # Open up firewall port for SSH
        FireWallRule.create(
            self.apiclient,
            ipaddressid=self.public_ip.ipaddress.id,
            protocol=self.services["lbrule"]["protocol"],
            cidrlist=['0.0.0.0/0'],
            startport=self.services["lbrule"]["publicport"],
            endport=self.services["lbrule"]["publicport"]
        )

        lb_rule = LoadBalancerRule.create(
            self.apiclient,
            self.services["lbrule"],
            src_nat_ip_addr.id,
            self.account.name
        )
        lb_rule.assign(self.apiclient, [self.vm_1])
        self.nat_rule = NATRule.create(
            self.apiclient,
            self.vm_1,
            self.services["natrule"],
            ipaddressid=self.public_ip.ipaddress.id
        )
        self.cleanup = [self.nat_rule,
                        lb_rule,
                        self.vm_1,
                        self.service_offering,
                        self.account,
                        ]
        return

    @attr(tags=["advanced", "advancedns", "smoke", "dvs"], required_hardware="true")
    def test_reboot_router(self):
        """Test for reboot router"""

        # Validate the Following
        # 1. Post restart PF and LB rules should still function
        # 2. verify if the ssh into the virtual machine
        #    still works through the sourceNAT Ip

        # Retrieve router for the user account

        logger.debug("Public IP: %s" % self.vm_1.ssh_ip)
        logger.debug("Public IP: %s" % self.public_ip.ipaddress.ipaddress)
        routers = list_routers(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertIsInstance(
            routers,
            list,
            "Check list routers returns a valid list"
        )

        router = routers[0]

        logger.debug("Rebooting the router (ID: %s)" % router.id)

        cmd = rebootRouter.rebootRouterCmd()
        cmd.id = router.id
        self.apiclient.rebootRouter(cmd)

        # Poll listVM until the VM is reported as Running
        timeout = self.services["timeout"]

        while True:
            time.sleep(self.services["sleep"])

            list_vm_response = list_virtual_machines(
                self.apiclient,
                id=self.vm_1.id
            )

            if isinstance(list_vm_response, list):

                vm = list_vm_response[0]
                if vm.state == 'Running':
                    logger.debug("VM state: %s" % vm.state)
                    break

            if timeout == 0:
                # Use self.vm_1.id: the loop-local ``vm`` is unbound when the
                # listing never returned a list.
                raise Exception(
                    "VM (ID: %s) is not Running after router reboot" %
                    self.vm_1.id)

            timeout = timeout - 1

        # we should be able to SSH after successful reboot
        try:
            logger.debug("SSH into VM (ID : %s ) after reboot" % self.vm_1.id)

            SshClient(
                self.public_ip.ipaddress.ipaddress,
                self.services["natrule"]["publicport"],
                self.vm_1.username,
                self.vm_1.password,
                retries=5
            )
        except Exception as e:
            self.fail(
                "SSH Access failed for %s: %s" %
                (self.public_ip.ipaddress.ipaddress, e))
        return

    def tearDown(self):
        """Delete the resources registered in ``self.cleanup``."""
        cleanup_resources(self.apiclient, self.cleanup)
        return
|
|
|
|
|
|
class TestReleaseIP(cloudstackTestCase):
    """Release a public IP address that carries NAT and LB rules."""

    def setUp(self):
        """Create an account, a VM, a public IP and NAT/LB rules on that IP.

        Raises:
            Exception: if no non-source-NAT public IP is listed for the
                account.
        """
        self.apiclient = self.testClient.getApiClient()
        self.services = self.testClient.getParsedTestDataConfig()

        # Get Zone, Domain and templates
        self.domain = get_domain(self.apiclient)
        self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
        self.hypervisor = self.testClient.getHypervisorInfo()
        template = get_test_template(
            self.apiclient,
            self.zone.id,
            self.hypervisor
        )
        if template == FAILED:
            # Same guard as the other classes in this file; without it the
            # failure only surfaces later as an attribute error on template.id.
            self.fail("get_test_template() failed to return template")
        self.services["virtual_machine"]["zoneid"] = self.zone.id

        # Create an account, network, VM, Port forwarding rule, LB rules
        self.account = Account.create(
            self.apiclient,
            self.services["account"],
            admin=True,
            domainid=self.domain.id
        )

        self.service_offering = ServiceOffering.create(
            self.apiclient,
            self.services["service_offerings"]["tiny"]
        )

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            templateid=template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id
        )

        self.ip_address = PublicIPAddress.create(
            self.apiclient,
            self.account.name,
            self.zone.id,
            self.account.domainid
        )

        ip_addrs = list_publicIP(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid,
            issourcenat=False
        )
        try:
            self.ip_addr = ip_addrs[0]
        except Exception as e:
            raise Exception(
                "Failed: During acquiring non source NAT IP for account: %s, :%s" %
                (self.account.name, e))

        self.nat_rule = NATRule.create(
            self.apiclient,
            self.virtual_machine,
            self.services["natrule"],
            self.ip_addr.id
        )
        self.lb_rule = LoadBalancerRule.create(
            self.apiclient,
            self.services["lbrule"],
            self.ip_addr.id,
            accountid=self.account.name
        )
        # The service offering is appended last so it is no longer leaked.
        self.cleanup = [
            self.virtual_machine,
            self.account,
            self.service_offering
        ]
        return

    def tearDown(self):
        """Delete the resources registered in ``self.cleanup``."""
        cleanup_resources(self.apiclient, self.cleanup)

    @attr(tags=["advanced", "advancedns", "smoke", "dvs"], required_hardware="false")
    def test_releaseIP(self):
        """Test for release public IP address"""

        logger.debug("Deleting Public IP : %s" % self.ip_addr.id)

        self.ip_address.delete(self.apiclient)

        # Poll up to 10 times, 60 seconds apart, until the address is no
        # longer listed.
        retriesCount = 10
        isIpAddressDisassociated = False
        while retriesCount > 0:
            listResponse = list_publicIP(
                self.apiclient,
                id=self.ip_addr.id
            )
            if listResponse is None:
                isIpAddressDisassociated = True
                break
            retriesCount -= 1
            time.sleep(60)
        # End while

        self.assertTrue(
            isIpAddressDisassociated,
            "Failed to disassociate IP address")

        # ListPortForwardingRules should not list
        # associated rules with Public IP address
        try:
            list_nat_rule = list_nat_rules(
                self.apiclient,
                id=self.nat_rule.id
            )
            logger.debug("List NAT Rule response" + str(list_nat_rule))
        except CloudstackAPIException:
            logger.debug("Port Forwarding Rule is deleted")

        # listLoadBalancerRules should not list
        # associated rules with Public IP address
        try:
            list_lb_rule = list_lb_rules(
                self.apiclient,
                id=self.lb_rule.id
            )
            logger.debug("List LB Rule response" + str(list_lb_rule))
        except CloudstackAPIException:
            logger.debug("Load Balancer Rule is deleted")

        # SSH Attempt though public IP should fail
        with self.assertRaises(Exception):
            SshClient(
                self.ip_addr.ipaddress,
                self.services["natrule"]["publicport"],
                self.virtual_machine.username,
                self.virtual_machine.password,
                retries=2,
                delay=0
            )
        return
|
|
|
|
|
|
class TestDeleteAccount(cloudstackTestCase):
    """Delete an account and check that its rules and router are removed."""

    def setUp(self):
        """Create an account with a VM plus LB and NAT rules on its public IP."""
        self.apiclient = self.testClient.getApiClient()
        self.services = self.testClient.getParsedTestDataConfig()

        # Get Zone, Domain and templates
        self.domain = get_domain(self.apiclient)
        self.zone = get_zone(self.apiclient, self.testClient.getZoneForTests())
        self.hypervisor = self.testClient.getHypervisorInfo()
        template = get_test_template(
            self.apiclient,
            self.zone.id,
            self.hypervisor
        )
        if template == FAILED:
            # Same guard as the other classes in this file; without it the
            # failure only surfaces later as an attribute error on template.id.
            self.fail("get_test_template() failed to return template")
        self.services["virtual_machine"]["zoneid"] = self.zone.id

        # Create an account, network, VM and IP addresses
        self.account = Account.create(
            self.apiclient,
            self.services["account"],
            admin=True,
            domainid=self.domain.id
        )
        self.service_offering = ServiceOffering.create(
            self.apiclient,
            self.services["service_offerings"]["tiny"]
        )
        self.vm_1 = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            templateid=template.id,
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id
        )

        src_nat_ip_addrs = list_publicIP(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )

        try:
            src_nat_ip_addr = src_nat_ip_addrs[0]
        except Exception as e:
            # The message must not reference src_nat_ip_addr: it is unbound
            # when the lookup above fails.
            self.fail("Failed to fetch source NAT IP for account: %s, :%s" %
                      (self.account.name, e))

        self.lb_rule = LoadBalancerRule.create(
            self.apiclient,
            self.services["lbrule"],
            src_nat_ip_addr.id,
            self.account.name
        )
        self.lb_rule.assign(self.apiclient, [self.vm_1])

        self.nat_rule = NATRule.create(
            self.apiclient,
            self.vm_1,
            self.services["natrule"],
            src_nat_ip_addr.id
        )
        # The test deletes the account itself, so nothing is queued here.
        self.cleanup = []
        return

    @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="false")
    def test_delete_account(self):
        """Test for delete account"""

        # Validate the Following
        # 1. after account.cleanup.interval (global setting)
        #    time all the PF/LB rules should be deleted
        # 2. verify that list(LoadBalancer/PortForwarding)Rules
        #    API does not return any rules for the account
        # 3. The domR should have been expunged for this account

        self.account.delete(self.apiclient)
        interval = list_configurations(
            self.apiclient,
            name='account.cleanup.interval'
        )
        self.assertIsInstance(
            interval,
            list,
            "Check if account.cleanup.interval config present"
        )
        # Sleep to ensure that all resources are deleted
        time.sleep(int(interval[0].value))

        # ListLoadBalancerRules should not list
        # associated rules with deleted account
        # Unable to find account testuser1 in domain 1 : Exception
        try:
            list_lb_rules(
                self.apiclient,
                account=self.account.name,
                domainid=self.account.domainid
            )
        except CloudstackAPIException:
            logger.debug("Load Balancer Rule is deleted")

        # ListPortForwardingRules should not
        # list associated rules with deleted account
        try:
            list_nat_rules(
                self.apiclient,
                account=self.account.name,
                domainid=self.account.domainid
            )
        except CloudstackAPIException:
            logger.debug("NATRule is deleted")

        # Retrieve router for the user account
        try:
            routers = list_routers(
                self.apiclient,
                account=self.account.name,
                domainid=self.account.domainid
            )
        except CloudstackAPIException:
            logger.debug("Router is deleted")
        except Exception as e:
            raise Exception(
                "Encountered %s raised while fetching routers for account: %s" %
                (e, self.account.name))
        else:
            # Asserted outside the try body so that a failed assertion is
            # reported as a test failure, not re-wrapped by the handler above.
            self.assertIsNone(
                routers,
                "Check routers are properly deleted."
            )
        return

    def tearDown(self):
        """Delete the resources registered in ``self.cleanup``."""
        cleanup_resources(self.apiclient, self.cleanup)
        return
|
|
|
|
|
|
@ddt
|
|
class TestRouterRules(cloudstackTestCase):
|
|
|
|
@classmethod
def setUpClass(cls):
    """Create the account, service offering and VM shared by all tests.

    Also stores the first configured host entry (``cls.hostConfig``) and the
    network id of the VM's first NIC (``cls.defaultNetworkId``).

    Raises:
        AssertionError: if get_test_template() returns FAILED.
    """
    testClient = super(TestRouterRules, cls).getClsTestClient()
    cls.apiclient = testClient.getApiClient()
    cls.services = testClient.getParsedTestDataConfig()
    # Get Zone, Domain and templates
    cls.domain = get_domain(cls.apiclient)
    cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
    cls.hypervisor = testClient.getHypervisorInfo()
    # First host of the first cluster/pod/zone in the test configuration.
    cls.hostConfig = cls.config.__dict__["zones"][0].__dict__["pods"][0].__dict__["clusters"][0].__dict__["hosts"][0].__dict__
    template = get_test_template(
        cls.apiclient,
        cls.zone.id,
        cls.hypervisor
    )
    if template == FAILED:
        # Explicit raise: a bare ``assert`` is stripped under ``python -O``.
        raise AssertionError("get_test_template() failed to return template")

    # Create an account, network, VM and IP addresses
    cls.account = Account.create(
        cls.apiclient,
        cls.services["account"],
        admin=True,
        domainid=cls.domain.id
    )
    cls.services["virtual_machine"]["zoneid"] = cls.zone.id
    cls.service_offering = ServiceOffering.create(
        cls.apiclient,
        cls.services["service_offerings"]["tiny"]
    )
    cls.virtual_machine = VirtualMachine.create(
        cls.apiclient,
        cls.services["virtual_machine"],
        templateid=template.id,
        accountid=cls.account.name,
        domainid=cls.account.domainid,
        serviceofferingid=cls.service_offering.id
    )
    cls.defaultNetworkId = cls.virtual_machine.nic[0].networkid

    cls._cleanup = [
        cls.virtual_machine,
        cls.account,
        cls.service_offering
    ]
|
|
|
|
def setUp(self):
    """Start each test with an empty cleanup list and an API client."""
    self.cleanup = []
    self.apiclient = self.testClient.getApiClient()
|
|
|
|
@classmethod
def tearDownClass(cls):
    """Delete the class-level resources listed in ``cls._cleanup``.

    Any error from cleanup_resources() is re-raised as a generic Exception
    carrying a warning message.
    """
    try:
        cleanup_resources(cls.apiclient, cls._cleanup)
    except Exception as err:
        warning = "Warning: Exception during cleanup : %s" % err
        raise Exception(warning)
|
|
|
|
def tearDown(self):
    """Delete whatever the finished test registered in ``self.cleanup``."""
    client, leftovers = self.apiclient, self.cleanup
    cleanup_resources(client, leftovers)
|
|
|
|
def getCommandResultFromRouter(self, router, command):
|
|
"""Run given command on router and return the result"""
|
|
|
|
if (self.hypervisor.lower() == 'vmware'
|
|
or self.hypervisor.lower() == 'hyperv'):
|
|
result = get_process_status(
|
|
self.apiclient.connection.mgtSvr,
|
|
22,
|
|
self.apiclient.connection.user,
|
|
self.apiclient.connection.passwd,
|
|
router.linklocalip,
|
|
command,
|
|
hypervisor=self.hypervisor
|
|
)
|
|
else:
|
|
hosts = list_hosts(
|
|
self.apiclient,
|
|
id=router.hostid,
|
|
)
|
|
self.assertEqual(
|
|
isinstance(hosts, list),
|
|
True,
|
|
"Check for list hosts response return valid data"
|
|
)
|
|
host = hosts[0]
|
|
host.user = self.hostConfig['username']
|
|
host.passwd = self.hostConfig['password']
|
|
|
|
result = get_process_status(
|
|
host.ipaddress,
|
|
22,
|
|
host.user,
|
|
host.passwd,
|
|
router.linklocalip,
|
|
command
|
|
)
|
|
return result
|
|
|
|
def createNetworkRules(self, rule, ipaddressobj, networkid):
|
|
""" Create specified rule on acquired public IP and
|
|
default network of virtual machine
|
|
"""
|
|
# Open up firewall port for SSH
|
|
self.fw_rule = FireWallRule.create(
|
|
self.apiclient,
|
|
ipaddressid=ipaddressobj.ipaddress.id,
|
|
protocol=self.services["fwrule"]["protocol"],
|
|
cidrlist=['0.0.0.0/0'],
|
|
startport=self.services["fwrule"]["startport"],
|
|
endport=self.services["fwrule"]["endport"]
|
|
)
|
|
|
|
if rule == STATIC_NAT_RULE:
|
|
StaticNATRule.enable(
|
|
self.apiclient,
|
|
ipaddressobj.ipaddress.id,
|
|
self.virtual_machine.id,
|
|
networkid
|
|
)
|
|
|
|
elif rule == LB_RULE:
|
|
self.lb_rule = LoadBalancerRule.create(
|
|
self.apiclient,
|
|
self.services["lbrule"],
|
|
ipaddressid=ipaddressobj.ipaddress.id,
|
|
accountid=self.account.name,
|
|
networkid=self.virtual_machine.nic[0].networkid,
|
|
domainid=self.account.domainid)
|
|
|
|
vmidipmap = [{"vmid": str(self.virtual_machine.id),
|
|
"vmip": str(self.virtual_machine.nic[0].ipaddress)}]
|
|
|
|
self.lb_rule.assign(
|
|
self.apiclient,
|
|
vmidipmap=vmidipmap
|
|
)
|
|
else:
|
|
self.nat_rule = NATRule.create(
|
|
self.apiclient,
|
|
self.virtual_machine,
|
|
self.services["natrule"],
|
|
ipaddressobj.ipaddress.id
|
|
)
|
|
return
|
|
|
|
def removeNetworkRules(self, rule):
|
|
""" Remove specified rule on acquired public IP and
|
|
default network of virtual machine
|
|
"""
|
|
self.fw_rule.delete(self.apiclient)
|
|
|
|
if rule == STATIC_NAT_RULE:
|
|
StaticNATRule.disable(
|
|
self.apiclient,
|
|
self.ipaddress.ipaddress.id)
|
|
|
|
elif rule == LB_RULE:
|
|
self.lb_rule.delete(self.apiclient)
|
|
else:
|
|
self.nat_rule.delete(self.apiclient)
|
|
|
|
logger.debug("Releasing IP %s from account %s" % (self.ipaddress.ipaddress.ipaddress, self.account.name))
|
|
self.ipaddress.delete(self.apiclient)
|
|
|
|
return
|
|
|
|
@data(STATIC_NAT_RULE, NAT_RULE, LB_RULE)
|
|
@attr(tags=["advanced", "advancedns", "smoke", "dvs"], required_hardware="true")
|
|
def test_network_rules_acquired_public_ip(self, value):
|
|
"""Test for Router rules for network rules on acquired public IP"""
|
|
|
|
# Validate the following:
|
|
# 1. listPortForwardingRules should not return the deleted rule anymore
|
|
# 2. attempt to do ssh should now fail
|
|
|
|
self.ipaddress = PublicIPAddress.create(
|
|
self.apiclient,
|
|
accountid=self.account.name,
|
|
zoneid=self.zone.id,
|
|
domainid=self.account.domainid,
|
|
networkid=self.defaultNetworkId
|
|
)
|
|
|
|
self.createNetworkRules(rule=value,
|
|
ipaddressobj=self.ipaddress,
|
|
networkid=self.defaultNetworkId)
|
|
|
|
router = Router.list(self.apiclient,
|
|
networkid=self.virtual_machine.nic[0].networkid,
|
|
listall=True)[0]
|
|
|
|
response = self.getCommandResultFromRouter(router, "ip addr")
|
|
logger.debug(response)
|
|
stringToMatch = "inet %s" % self.ipaddress.ipaddress.ipaddress
|
|
self.assertTrue(stringToMatch in str(response), "IP address is\
|
|
not added to the VR!")
|
|
|
|
try:
|
|
logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
|
|
(
|
|
self.virtual_machine.ipaddress,
|
|
self.ipaddress.ipaddress.ipaddress
|
|
))
|
|
self.virtual_machine.get_ssh_client(
|
|
self.ipaddress.ipaddress.ipaddress)
|
|
except Exception as e:
|
|
self.fail(
|
|
"SSH Access failed for %s: %s" %
|
|
(self.virtual_machine.ipaddress, e)
|
|
)
|
|
|
|
# Validate the following:
|
|
# 1. listIpForwardingRules should not return the deleted rule anymore
|
|
# 2. attempt to do ssh should now fail
|
|
|
|
self.removeNetworkRules(rule=value)
|
|
|
|
response = self.getCommandResultFromRouter(router, "ip addr")
|
|
logger.debug(response)
|
|
stringToMatch = "inet %s" % self.ipaddress.ipaddress.ipaddress
|
|
self.assertFalse(stringToMatch in str(response), "IP address is\
|
|
not removed from VR even after disabling stat in NAT")
|
|
|
|
# Check if the Public SSH port is inaccessible
|
|
with self.assertRaises(Exception):
|
|
logger.debug(
|
|
"SSHing into VM with IP address %s after NAT rule deletion" %
|
|
self.virtual_machine.ipaddress)
|
|
|
|
SshClient(
|
|
self.ipaddress.ipaddress.ipaddress,
|
|
self.virtual_machine.ssh_port,
|
|
self.virtual_machine.username,
|
|
self.virtual_machine.password,
|
|
retries=2,
|
|
delay=0
|
|
)
|
|
return
|
|
|
|
class TestL2Networks(cloudstackTestCase):
    """Smoke tests for L2 networks: VM deployment, deletion guard, restart.

    A fresh L2 network is created for every test in setUp(); the account,
    offerings and template are shared by the whole class.
    """

    def setUp(self):
        """Create the per-test L2 network and register it for cleanup."""
        self.apiclient = self.testClient.getApiClient()
        self.services["network"]["networkoffering"] = self.network_offering.id

        self.l2_network = Network.create(
            self.apiclient,
            self.services["l2-network"],
            zoneid=self.zone.id,
            networkofferingid=self.network_offering.id
        )
        self.cleanup = [
            self.l2_network]

    def tearDown(self):
        """Remove the resources registered by the test, in list order."""
        cleanup_resources(self.apiclient, self.cleanup)
        return

    @classmethod
    def setUpClass(cls):
        """Create the account and offerings shared by all tests."""
        testClient = super(TestL2Networks, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.services = testClient.getParsedTestDataConfig()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
        cls.hypervisor = testClient.getHypervisorInfo()
        cls.services['mode'] = cls.zone.networktype

        # Resolve the template before creating anything, so that a missing
        # template aborts the class with a clear message and nothing to
        # clean up.
        cls.template = get_test_template(
            cls.apiclient,
            cls.zone.id,
            cls.hypervisor
        )
        if cls.template == FAILED:
            assert False, "get_test_template() failed to return template"

        # Create Accounts & networks
        cls.account = Account.create(
            cls.apiclient,
            cls.services["account"],
            admin=True,
            domainid=cls.domain.id
        )
        cls.service_offering = ServiceOffering.create(
            cls.apiclient,
            cls.services["service_offerings"]["tiny"]
        )
        cls.services["network"]["zoneid"] = cls.zone.id

        cls.network_offering = NetworkOffering.create(
            cls.apiclient,
            cls.services["l2-network_offering"],
        )
        # Enable Network offering
        cls.network_offering.update(cls.apiclient, state='Enabled')

        cls._cleanup = [
            cls.account,
            cls.network_offering,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Remove the class-level resources created in setUpClass."""
        try:
            # Cleanup resources used
            cleanup_resources(cls.apiclient, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="false")
    def test_deploy_vm_l2network(self):
        """Creates an l2 network and verifies user is able to deploy a VM in it"""

        # Validate the following:
        # 1. Deploys a VM
        # 2. There are no network services available since this is L2 Network

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            templateid=self.template.id,
            serviceofferingid=self.service_offering.id,
            networkids=self.l2_network.id,
            zoneid=self.zone.id
        )

        # The VM must be removed before the network it is attached to.
        self.cleanup.insert(0, self.virtual_machine)

        list_vm = list_virtual_machines(
            self.apiclient,
            id=self.virtual_machine.id
        )
        self.assertEqual(
            isinstance(list_vm, list),
            True,
            "Check if virtual machine is present"
        )

        self.assertEqual(
            list_vm[0].nic[0].type,
            'L2',
            "Check Correct Network type is available"
        )

        self.assertFalse(
            'gateway' in str(list_vm[0].nic[0])
        )

        self.assertFalse(
            'ipaddress' in str(list_vm[0].nic[0])
        )

        return

    @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="false")
    def test_delete_network_while_vm_on_it(self):
        """It verifies the user is not able to delete network which has running vms"""

        # Validate the following:
        # 1. Deploys a VM
        # 2. Tries to delete network and expects exception to appear

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            templateid=self.template.id,
            serviceofferingid=self.service_offering.id,
            networkids=self.l2_network.id,
            zoneid=self.zone.id
        )

        # The VM must be removed before the network it is attached to.
        self.cleanup.insert(0, self.virtual_machine)

        list_vm = list_virtual_machines(
            self.apiclient,
            id=self.virtual_machine.id
        )
        self.assertEqual(
            isinstance(list_vm, list),
            True,
            "Check if virtual machine is present"
        )

        # Deleting a network that still has a VM on it must be rejected.
        with self.assertRaises(Exception):
            self.l2_network.delete(self.apiclient)

        return

    @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="false")
    def test_l2network_restart(self):
        """This test covers a few scenarios around restarting a network"""

        # Validate the following:
        # 1. Creates a l2 network
        # 2. Tries to restart a network with no VMs, which throws error 'not in the right state'
        # 3. Deploys a VM
        # 4. Restarts the network without cleanup
        # 5. Restarts the network with cleanup

        with self.assertRaises(Exception):
            self.l2_network.restart(self.apiclient, cleanup=True)

        # Look the network up by id: an unfiltered list may return some
        # other network first.
        li_net = self.l2_network.list(self.apiclient, id=self.l2_network.id)[0]

        # assertEqual (not assertTrue) so the state is really compared.
        self.assertEqual(
            li_net.state,
            'Allocated',
            "Not the correct state"
        )

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            templateid=self.template.id,
            serviceofferingid=self.service_offering.id,
            networkids=self.l2_network.id,
            zoneid=self.zone.id
        )

        # The VM must be removed before the network it is attached to.
        self.cleanup.insert(0, self.virtual_machine)

        list_vm = list_virtual_machines(
            self.apiclient,
            id=self.virtual_machine.id
        )
        self.assertEqual(
            isinstance(list_vm, list),
            True,
            "Check if virtual machine is present"
        )

        self.l2_network.restart(self.apiclient, cleanup=False)

        li_net = self.l2_network.list(self.apiclient, id=self.l2_network.id)[0]

        self.assertEqual(
            li_net.state,
            'Implemented',
            "Not the correct state"
        )

        self.l2_network.restart(self.apiclient, cleanup=True)

        li_net = self.l2_network.list(self.apiclient, id=self.l2_network.id)[0]

        self.assertEqual(
            li_net.state,
            'Implemented',
            "Not the correct state"
        )

        return
|
|
|
|
class TestPrivateVlansL2Networks(cloudstackTestCase):
    """Layer-2 isolation tests for community/isolated/promiscuous PVLANs.

    The class only provisions resources when the environment is supported:
    VMware with a dvSwitch for guest traffic, or KVM with Open vSwitch
    enabled on every host. Otherwise ``unsupportedHardware`` is set and the
    test is skipped.
    """

    def setUp(self):
        """Start every test with a fresh API client and an empty cleanup list."""
        self.apiclient = self.testClient.getApiClient()

        self.cleanup = [
        ]

    def tearDown(self):
        """Remove whatever the test registered in self.cleanup."""
        cleanup_resources(self.apiclient, self.cleanup)
        return

    @classmethod
    def setUpClass(cls):
        """Detect hardware support and, if supported, create the networks."""
        testClient = super(TestPrivateVlansL2Networks, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.services = testClient.getParsedTestDataConfig()

        cls.domain = get_domain(cls.apiclient)
        cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
        cls.hypervisor = testClient.getHypervisorInfo()
        cls.services['mode'] = cls.zone.networktype

        # Supported hypervisor = Vmware using dvSwitches for guest traffic
        isVmware = cls.hypervisor.lower() == "vmware"
        isDvSwitch = False
        if isVmware:
            clusters = Cluster.list(cls.apiclient, zoneid=cls.zone.id, hypervisor=cls.hypervisor)
            # "or []": tolerate a non-list (empty) response from the API.
            for cluster in clusters or []:
                if cluster.resourcedetails.guestvswitchtype == "vmwaredvs":
                    # Test only if cluster uses dvSwitch
                    isDvSwitch = True
                    break

        # Supported hypervisor = KVM using OVS
        isKVM = cls.hypervisor.lower() == "kvm"
        isOVSEnabled = False
        hostConfig = cls.config.__dict__["zones"][0].__dict__["pods"][0].__dict__["clusters"][0].__dict__["hosts"][0].__dict__
        if isKVM:
            # Test only if all the hosts use OVS
            grepCmd = 'grep "network.bridge.type=openvswitch" /etc/cloudstack/agent/agent.properties'
            hosts = list_hosts(cls.apiclient, type='Routing', hypervisor='kvm')
            # Truthiness check: covers an empty list as well as a non-list
            # (empty) response, on which len() would raise.
            if hosts:
                isOVSEnabled = True
                for host in hosts:
                    ssh = SshClient(host.ipaddress, port=22, user=hostConfig["username"],
                                    passwd=hostConfig["password"])
                    if len(ssh.execute(grepCmd)) == 0:
                        # One host without OVS is enough to disqualify.
                        isOVSEnabled = False
                        break

        supported = (isVmware and isDvSwitch) or (isKVM and isOVSEnabled)
        cls.unsupportedHardware = not supported

        cls._cleanup = []

        if supported:

            cls.account = Account.create(
                cls.apiclient,
                cls.services["account"],
                admin=True,
                domainid=cls.domain.id
            )
            cls.template = get_test_template(
                cls.apiclient,
                cls.zone.id,
                cls.hypervisor
            )
            cls.service_offering = ServiceOffering.create(
                cls.apiclient,
                cls.services["service_offerings"]["tiny"]
            )
            cls.services["network"]["zoneid"] = cls.zone.id
            cls.services["small"]["zoneid"] = cls.zone.id
            cls.services["small"]["template"] = cls.template.id
            # All four PVLAN networks share primary VLAN 900 and differ in
            # their secondary VLAN id and PVLAN type.
            cls.services["l2-network-pvlan-community-1"] = {
                "name": "Test Network L2 PVLAN Community 1",
                "displaytext": "Test Network L2 PVLAN Community 1",
                "vlan": 900,
                "isolatedpvlan": "901",
                "isolatedpvlantype": "community"
            }
            cls.services["l2-network-pvlan-community-2"] = {
                "name": "Test Network L2 PVLAN Community 2",
                "displaytext": "Test Network L2 PVLAN Community 2",
                "vlan": 900,
                "isolatedpvlan": "902",
                "isolatedpvlantype": "community"
            }
            cls.services["l2-network-pvlan-promiscuous"] = {
                "name": "Test Network L2 PVLAN Promiscuous",
                "displaytext": "Test Network L2 PVLAN Promiscuous",
                "vlan": 900,
                "isolatedpvlan": "900",
                "isolatedpvlantype": "promiscuous"
            }
            cls.services["l2-network-pvlan-isolated"] = {
                "name": "Test Network L2 PVLAN Isolated",
                "displaytext": "Test Network L2 PVLAN Isolated",
                "vlan": 900,
                "isolatedpvlan": "903",
                "isolatedpvlantype": "isolated"
            }

            cls.l2_network_offering = NetworkOffering.create(
                cls.apiclient,
                cls.services["l2-network_offering"],
                specifyvlan=True
            )
            cls.isolated_network_offering = NetworkOffering.create(
                cls.apiclient,
                cls.services["network_offering"]
            )
            cls.l2_network_offering.update(cls.apiclient, state='Enabled')
            cls.isolated_network_offering.update(cls.apiclient, state='Enabled')

            cls.l2_pvlan_community1 = Network.create(
                cls.apiclient,
                cls.services["l2-network-pvlan-community-1"],
                zoneid=cls.zone.id,
                networkofferingid=cls.l2_network_offering.id
            )
            cls.l2_pvlan_community2 = Network.create(
                cls.apiclient,
                cls.services["l2-network-pvlan-community-2"],
                zoneid=cls.zone.id,
                networkofferingid=cls.l2_network_offering.id
            )
            cls.l2_pvlan_isolated = Network.create(
                cls.apiclient,
                cls.services["l2-network-pvlan-isolated"],
                zoneid=cls.zone.id,
                networkofferingid=cls.l2_network_offering.id
            )
            cls.l2_pvlan_promiscuous = Network.create(
                cls.apiclient,
                cls.services["l2-network-pvlan-promiscuous"],
                zoneid=cls.zone.id,
                networkofferingid=cls.l2_network_offering.id
            )
            cls.isolated_network = Network.create(
                cls.apiclient,
                cls.services["isolated_network"],
                zoneid=cls.zone.id,
                networkofferingid=cls.isolated_network_offering.id,
                accountid=cls.account.name,
                domainid=cls.account.domainid
            )

            # Cleanup runs in list order: networks, then offerings, then the
            # account.
            cls._cleanup = [
                cls.l2_pvlan_promiscuous,
                cls.l2_pvlan_isolated,
                cls.l2_pvlan_community1,
                cls.l2_pvlan_community2,
                cls.isolated_network,
                cls.l2_network_offering,
                cls.isolated_network_offering,
                cls.service_offering,
                cls.account,
            ]

        return

    @classmethod
    def tearDownClass(cls):
        """Remove the class-level resources created in setUpClass."""
        try:
            cleanup_resources(cls.apiclient, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    def deploy_vm_multiple_nics(self, name, l2net):
        """
        Deploy VM on L2 network and isolated network so VM can get an IP, to use with arping command for isolation test

        The isolated network is listed first in ``networkids``, followed by
        ``l2net``. The created VM is returned; registering it for cleanup is
        left to the caller.
        """
        self.services["small"]["name"] = name

        vm = VirtualMachine.create(
            self.apiclient,
            self.services["small"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            networkids=[self.isolated_network.id, l2net.id],
            mode=self.services["mode"]
        )

        return vm

    def is_vm_l2_isolated_from_dest(self, vm, eth_device, dest_ip):
        """
        True if VM is isolated from dest IP - using arping through the NIC on L2 network.

        Runs ``arping -c 5`` inside the VM over ``eth_device`` and inspects
        the number of output lines: exactly 3 lines is taken to mean that no
        reply was received (isolated); anything else is taken to mean that
        the destination answered.
        NOTE(review): the 3-line threshold presumably depends on the arping
        build shipped in the test template - confirm when changing templates.
        """
        ssh_client = vm.get_ssh_client()
        response = ssh_client.execute("/usr/sbin/arping -c 5 -I %s %s" % (eth_device, str(dest_ip)))
        return len(response) == 3

    def enable_l2_nic(self, vm):
        """Bring up the VM's NIC on the L2 network.

        Returns a tuple ``(vm_ip, eth_device)``: the VM's IP address on the
        isolated network and the name of the interface that was brought up.
        The L2 interface is the one of eth0/eth1 that does not carry that IP.
        """
        vm_ip = list(filter(lambda x: x['networkid'] == self.isolated_network.id, vm.nic))[0]['ipaddress']
        ssh_client = vm.get_ssh_client()
        eth_device = "eth0"
        # If eth0 holds the isolated-network IP, the L2 NIC must be eth1.
        if len(ssh_client.execute("/sbin/ifconfig %s | grep %s" % (eth_device, vm_ip))) > 0:
            eth_device = "eth1"
        ssh_client.execute("/sbin/ifconfig %s up" % eth_device)
        return vm_ip, eth_device

    @attr(tags=["advanced", "advancedns", "smoke", "pvlan"], required_hardware="true")
    @skipTestIf("unsupportedHardware")
    def test_l2_network_pvlan_connectivity(self):
        """Verify layer-2 reachability rules between PVLAN types.

        Expected: same community and anything to/from promiscuous can talk;
        different communities, isolated-to-isolated and community-to-isolated
        cannot.
        """

        def deploy(name, l2net):
            # Register each VM as soon as it exists, so that a failure while
            # deploying a later VM does not leave earlier ones behind.
            vm = self.deploy_vm_multiple_nics(name, l2net)
            self.cleanup.append(vm)
            return vm

        try:
            vm_community1_one = deploy("vmcommunity1one", self.l2_pvlan_community1)
            vm_community1_two = deploy("vmcommunity1two", self.l2_pvlan_community1)
            vm_community2 = deploy("vmcommunity2", self.l2_pvlan_community2)

            vm_isolated1 = deploy("vmisolated1", self.l2_pvlan_isolated)
            vm_isolated2 = deploy("vmisolated2", self.l2_pvlan_isolated)

            vm_promiscuous1 = deploy("vmpromiscuous1", self.l2_pvlan_promiscuous)
            vm_promiscuous2 = deploy("vmpromiscuous2", self.l2_pvlan_promiscuous)

            vm_community1_one_ip, vm_community1_one_eth = self.enable_l2_nic(vm_community1_one)
            vm_community1_two_ip, vm_community1_two_eth = self.enable_l2_nic(vm_community1_two)
            vm_community2_ip, vm_community2_eth = self.enable_l2_nic(vm_community2)
            vm_isolated1_ip, vm_isolated1_eth = self.enable_l2_nic(vm_isolated1)
            vm_isolated2_ip, vm_isolated2_eth = self.enable_l2_nic(vm_isolated2)
            vm_promiscuous1_ip, vm_promiscuous1_eth = self.enable_l2_nic(vm_promiscuous1)
            vm_promiscuous2_ip, vm_promiscuous2_eth = self.enable_l2_nic(vm_promiscuous2)

            # Community PVLAN checks
            different_community_isolated = self.is_vm_l2_isolated_from_dest(vm_community1_one, vm_community1_one_eth, vm_community2_ip)
            same_community_isolated = self.is_vm_l2_isolated_from_dest(vm_community1_one, vm_community1_one_eth, vm_community1_two_ip)
            community_to_promiscuous_isolated = self.is_vm_l2_isolated_from_dest(vm_community1_one, vm_community1_one_eth, vm_promiscuous1_ip)
            community_to_isolated = self.is_vm_l2_isolated_from_dest(vm_community1_one, vm_community1_one_eth, vm_isolated1_ip)

            self.assertTrue(
                different_community_isolated,
                "VMs on different community PVLANs must be isolated on layer 2"
            )

            self.assertFalse(
                same_community_isolated,
                "VMs on the same community PVLAN must not be isolated on layer 2"
            )

            self.assertFalse(
                community_to_promiscuous_isolated,
                "VMs on community PVLANs must not be isolated on layer 2 to VMs on promiscuous PVLAN"
            )

            self.assertTrue(
                community_to_isolated,
                "VMs on community PVLANs must be isolated on layer 2 to Vms on isolated PVLAN"
            )

            # Isolated PVLAN checks
            same_isolated = self.is_vm_l2_isolated_from_dest(vm_isolated1, vm_isolated1_eth, vm_isolated2_ip)
            isolated_to_community_isolated = self.is_vm_l2_isolated_from_dest(vm_isolated1, vm_isolated1_eth, vm_community1_one_ip)
            isolated_to_promiscuous_isolated = self.is_vm_l2_isolated_from_dest(vm_isolated1, vm_isolated1_eth, vm_promiscuous1_ip)

            self.assertTrue(
                same_isolated,
                "VMs on isolated PVLANs must be isolated on layer 2"
            )
            self.assertTrue(
                isolated_to_community_isolated,
                "VMs on isolated PVLANs must be isolated on layer 2 to Vms on community PVLAN"
            )
            self.assertFalse(
                isolated_to_promiscuous_isolated,
                "VMs on isolated PVLANs must not be isolated on layer 2 to Vms on promiscuous PVLAN",
            )

            # Promiscuous PVLAN checks
            same_promiscuous = self.is_vm_l2_isolated_from_dest(vm_promiscuous1, vm_promiscuous1_eth, vm_promiscuous2_ip)
            prom_to_community_isolated = self.is_vm_l2_isolated_from_dest(vm_promiscuous1, vm_promiscuous1_eth, vm_community1_one_ip)
            prom_to_isolated = self.is_vm_l2_isolated_from_dest(vm_promiscuous1, vm_promiscuous1_eth, vm_isolated1_ip)

            self.assertFalse(
                same_promiscuous,
                "VMs on promiscuous PVLANs must not be isolated on layer 2"
            )
            # Each message names the destination PVLAN type actually probed.
            self.assertFalse(
                prom_to_community_isolated,
                "VMs on promiscuous PVLANs must not be isolated on layer 2 to Vms on community PVLAN"
            )
            self.assertFalse(
                prom_to_isolated,
                "VMs on promiscuous PVLANs must not be isolated on layer 2 to Vms on isolated PVLAN"
            )
        except Exception as e:
            self.fail("Failing test. Error: %s" % e)

        return
|