cloudstack/test/integration/component/test_multiple_public_interfaces.py
Boris Stoyanov - a.k.a Bobby 67f509dc57 CLOUDSTACK-10163: Component tests sanity (#2344)
Fixing some component tests and adding them in travis.

Signed-off-by: Rohit Yadav <rohit.yadav@shapeblue.com>
2018-01-01 16:44:18 +05:30

1305 lines
54 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" BVT tests for network services on public IP's from different public IP
range than that of associated source NAT IP of the network. Each IP associated
with network from a different public IP range results in a new public
interface on VR (eth3, eth4 etc) and iptable
"""
# Import Local Modules
from marvin.codes import (FAILED)
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.cloudstackException import CloudstackAPIException
from marvin.cloudstackAPI import rebootRouter
from marvin.sshClient import SshClient
from marvin.lib.utils import cleanup_resources, get_process_status
from marvin.lib.base import (Account,
VirtualMachine,
ServiceOffering,
NATRule,
PublicIPAddress,
StaticNATRule,
FireWallRule,
Network,
NetworkOffering,
LoadBalancerRule,
PublicIpRange,
Router,
VpcOffering,
VPC,
NetworkACLList,
NetworkACL)
from marvin.lib.common import (get_domain,
get_zone,
get_template,
list_hosts,
list_routers)
from nose.plugins.attrib import attr
from ddt import ddt, data
# Import System modules
import socket
import time
import logging
_multiprocess_shared_ = True
logger = logging.getLogger('TestNetworkOps')
stream_handler = logging.StreamHandler()
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)
class TestPortForwarding(cloudstackTestCase):
@classmethod
def setUpClass(cls):
testClient = super(TestPortForwarding, cls).getClsTestClient()
cls.apiclient = testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
cls.hypervisor = testClient.getHypervisorInfo()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.apiclient)
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["zoneid"] = cls.zone.id
template = get_template(
cls.apiclient,
cls.zone.id,
cls.services["ostype"]
)
if template == FAILED:
assert False, "get_template() failed to return template with description %s" % cls.services[
"ostype"]
# Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.apiclient,
cls.services["account"],
admin=True,
domainid=cls.domain.id
)
cls.services["publiciprange"]["zoneid"] = cls.zone.id
cls.service_offering = ServiceOffering.create(
cls.apiclient,
cls.services["service_offerings"]["tiny"]
)
cls.virtual_machine = VirtualMachine.create(
cls.apiclient,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls._cleanup = [
cls.virtual_machine,
cls.account,
cls.service_offering
]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
return
@classmethod
def tearDownClass(cls):
try:
cls.apiclient = super(
TestPortForwarding,
cls).getClsTestClient().getApiClient()
cleanup_resources(cls.apiclient, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
@attr(tags=["advanced", "smoke"], required_hardware="true")
def test_port_forwarding_on_ip_from_non_src_nat_ip_range(self):
"""Test for port forwarding on a IP which is in pubic IP range different
from public IP range that has source NAT IP associated with network
"""
# Validate the following:
# 1. Create a new public IP range and dedicate to a account
# 2. Acquire a IP from new public range
# 3. create a port forwarding on acquired IP from new range
# 4. Create a firewall rule to open up the port
# 5. Test SSH works to the VM
self.public_ip_range = PublicIpRange.create(
self.apiclient,
self.services["publiciprange"]
)
logger.debug("Dedicating Public IP range to the account");
dedicate_public_ip_range_response = PublicIpRange.dedicate(
self.apiclient,
self.public_ip_range.vlan.id,
account=self.account.name,
domainid=self.account.domainid
)
ip_address = PublicIPAddress.create(
self.apiclient,
self.account.name,
self.zone.id,
self.account.domainid,
self.services["virtual_machine"]
)
self.cleanup.append(ip_address)
self.cleanup.append(self.public_ip_range)
# Check if VM is in Running state before creating NAT and firewall rules
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
vm_response[0].state,
'Running',
"VM state should be Running before creating a NAT rule."
)
# Open up firewall port for SSH
FireWallRule.create(
self.apiclient,
ipaddressid=ip_address.ipaddress.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
# Create PF rule
nat_rule = NATRule.create(
self.apiclient,
self.virtual_machine,
self.services["natrule"],
ip_address.ipaddress.id
)
try:
logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
(
self.virtual_machine.ipaddress,
ip_address.ipaddress.ipaddress
))
self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" %
(self.virtual_machine.ipaddress, e)
)
nat_rule.delete(self.apiclient)
class TestStaticNat(cloudstackTestCase):
@classmethod
def setUpClass(cls):
testClient = super(TestStaticNat, cls).getClsTestClient()
cls.apiclient = testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
cls.hypervisor = testClient.getHypervisorInfo()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.apiclient)
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["zoneid"] = cls.zone.id
template = get_template(
cls.apiclient,
cls.zone.id,
cls.services["ostype"]
)
if template == FAILED:
assert False, "get_template() failed to return template with description %s" % cls.services[
"ostype"]
# Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.apiclient,
cls.services["account"],
admin=True,
domainid=cls.domain.id
)
cls.services["publiciprange"]["zoneid"] = cls.zone.id
cls.service_offering = ServiceOffering.create(
cls.apiclient,
cls.services["service_offerings"]["tiny"]
)
cls.virtual_machine = VirtualMachine.create(
cls.apiclient,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls.defaultNetworkId = cls.virtual_machine.nic[0].networkid
cls._cleanup = [
cls.virtual_machine,
cls.account,
cls.service_offering
]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
return
@classmethod
def tearDownClass(cls):
try:
cls.apiclient = super(
TestStaticNat,
cls).getClsTestClient().getApiClient()
cleanup_resources(cls.apiclient, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
@attr(tags=["advanced", "smoke"], required_hardware="true")
def test_static_nat_on_ip_from_non_src_nat_ip_range(self):
"""Test for static nat on a IP which is in pubic IP range different
from public IP range that has source NAT IP associated with network
"""
# Validate the following:
# 1. Create a new public IP range and dedicate to a account
# 2. Acquire a IP from new public range
# 3. Enable static NAT on acquired IP from new range
# 4. Create a firewall rule to open up the port
# 5. Test SSH works to the VM
self.public_ip_range = PublicIpRange.create(
self.apiclient,
self.services["publiciprange"]
)
logger.debug("Dedicating Public IP range to the account");
dedicate_public_ip_range_response = PublicIpRange.dedicate(
self.apiclient,
self.public_ip_range.vlan.id,
account=self.account.name,
domainid=self.account.domainid
)
ip_address = PublicIPAddress.create(
self.apiclient,
self.account.name,
self.zone.id,
self.account.domainid,
self.services["virtual_machine"]
)
self.cleanup.append(ip_address)
self.cleanup.append(self.public_ip_range)
# Check if VM is in Running state before creating NAT and firewall rules
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
vm_response[0].state,
'Running',
"VM state should be Running before creating a NAT rule."
)
# Open up firewall port for SSH
FireWallRule.create(
self.apiclient,
ipaddressid=ip_address.ipaddress.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
# Create Static NAT rule
StaticNATRule.enable(
self.apiclient,
ip_address.ipaddress.id,
self.virtual_machine.id,
self.defaultNetworkId
)
try:
logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
(
self.virtual_machine.ipaddress,
ip_address.ipaddress.ipaddress
))
self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
except Exception as e:
self.fail(
"SSH Access failed for %s: %s" %
(self.virtual_machine.ipaddress, e)
)
StaticNATRule.disable(
self.apiclient,
ip_address.ipaddress.id,
self.virtual_machine.id
)
class TestRouting(cloudstackTestCase):
@classmethod
def setUpClass(cls):
testClient = super(TestRouting, cls).getClsTestClient()
cls.apiclient = testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
cls.hypervisor = testClient.getHypervisorInfo()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.apiclient)
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["zoneid"] = cls.zone.id
template = get_template(
cls.apiclient,
cls.zone.id,
cls.services["ostype"]
)
if template == FAILED:
assert False, "get_template() failed to return template with description %s" % cls.services[
"ostype"]
# Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.apiclient,
cls.services["account"],
admin=True,
domainid=cls.domain.id
)
cls.services["publiciprange"]["zoneid"] = cls.zone.id
cls.service_offering = ServiceOffering.create(
cls.apiclient,
cls.services["service_offerings"]["tiny"]
)
cls.hostConfig = cls.config.__dict__["zones"][0].__dict__["pods"][0].__dict__["clusters"][0].__dict__["hosts"][0].__dict__
cls.virtual_machine = VirtualMachine.create(
cls.apiclient,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls._cleanup = [
cls.virtual_machine,
cls.account,
cls.service_offering
]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
return
@classmethod
def tearDownClass(cls):
try:
cls.apiclient = super(
TestRouting,
cls).getClsTestClient().getApiClient()
cleanup_resources(cls.apiclient, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
@attr(tags=["advanced", "smoke"], required_hardware="true")
def test_routing_tables(self):
"""Test routing table in case we have IP associated with a network which is in
different pubic IP range from that of public IP range that has source NAT IP.
When IP is associated we should see a new route table created.
When IP is associated we should see a that route table is deleted.
"""
# Validate the following:
# 1. Create a new public IP range and dedicate to a account
# 2. Acquire a IP from new public range
# 3. Create a firewall rule to open up the port, so that IP is associated with network
# 5. Login to VR and verify routing tables, there should be Table_eth3
# 6. Delete firewall rule, since its last IP, routing table Table_eth3 should be deleted
self.public_ip_range = PublicIpRange.create(
self.apiclient,
self.services["publiciprange"]
)
self._cleanup.append(self.public_ip_range)
logger.debug("Dedicating Public IP range to the account");
dedicate_public_ip_range_response = PublicIpRange.dedicate(
self.apiclient,
self.public_ip_range.vlan.id,
account=self.account.name,
domainid=self.account.domainid
)
ip_address = PublicIPAddress.create(
self.apiclient,
self.account.name,
self.zone.id,
self.account.domainid,
self.services["virtual_machine"]
)
self.cleanup.append(ip_address)
self.cleanup.append(self.public_ip_range)
# Check if VM is in Running state before creating NAT and firewall rules
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
vm_response[0].state,
'Running',
"VM state should be Running before creating Firewall rule."
)
# Open up firewall port for SSH, this will associate IP with VR
firewall_rule = FireWallRule.create(
self.apiclient,
ipaddressid=ip_address.ipaddress.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
# Get the router details associated with account
routers = list_routers(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
)
router = routers[0]
if (self.hypervisor.lower() == 'vmware'
or self.hypervisor.lower() == 'hyperv'):
result = get_process_status(
self.apiclient.connection.mgtSvr,
22,
self.apiclient.connection.user,
self.apiclient.connection.passwd,
router.linklocalip,
'ip route list table Table_eth3',
hypervisor=self.hypervisor
)
else:
hosts = list_hosts(
self.apiclient,
id=router.hostid,
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check for list hosts response return valid data"
)
host = hosts[0]
host.user = self.hostConfig['username']
host.passwd = self.hostConfig['password']
try:
result = get_process_status(
host.ipaddress,
22,
host.user,
host.passwd,
router.linklocalip,
'ip route list table Table_eth3'
)
except KeyError:
self.skipTest(
"Provide a marvin config file with host\
credentials to run %s" %
self._testMethodName)
logger.debug("ip route list table Table_eth3: %s" % result)
public_range_gateway = self.services["publiciprange"]["gateway"]
default_route_rule = "default via " + public_range_gateway + " dev eth3 proto static"
logger.debug("default route result: %s" % str(result[0]))
self.assertEqual(
default_route_rule,
str(result[0]),
"Check default route table entry for public ip range"
)
res = str(result)
self.assertEqual(
res.count("throw") == 2,
True,
"Check routing rules to throw rest of the traffic. Count shoule be Atleast 2 for the control and guest traffic "
)
firewall_rule.delete(self.apiclient)
if (self.hypervisor.lower() == 'vmware'
or self.hypervisor.lower() == 'hyperv'):
result = get_process_status(
self.apiclient.connection.mgtSvr,
22,
self.apiclient.connection.user,
self.apiclient.connection.passwd,
router.linklocalip,
'ip route list table Table_eth3',
hypervisor=self.hypervisor
)
else:
hosts = list_hosts(
self.apiclient,
id=router.hostid,
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check for list hosts response return valid data"
)
host = hosts[0]
host.user = self.hostConfig['username']
host.passwd = self.hostConfig['password']
try:
result = get_process_status(
host.ipaddress,
22,
host.user,
host.passwd,
router.linklocalip,
'ip route list table Table_eth3'
)
except KeyError:
self.skipTest(
"Provide a marvin config file with host\
credentials to run %s" %
self._testMethodName)
logger.debug("ip route list table Table_eth3: %s" % result)
res = str(result)
self.assertEqual(
res.count("default via"),
0,
"Check to ensure there should not be any default rule"
)
self.assertEqual(
res.count("throw"),
0,
"Check to ensure there should not be any throw rule"
)
class TestIptables(cloudstackTestCase):
@classmethod
def setUpClass(cls):
testClient = super(TestIptables, cls).getClsTestClient()
cls.apiclient = testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
cls.hypervisor = testClient.getHypervisorInfo()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.apiclient)
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["zoneid"] = cls.zone.id
template = get_template(
cls.apiclient,
cls.zone.id,
cls.services["ostype"]
)
if template == FAILED:
assert False, "get_template() failed to return template with description %s" % cls.services[
"ostype"]
# Create an account, network, VM and IP addresses
cls.account = Account.create(
cls.apiclient,
cls.services["account"],
admin=True,
domainid=cls.domain.id
)
cls.services["publiciprange"]["zoneid"] = cls.zone.id
cls.service_offering = ServiceOffering.create(
cls.apiclient,
cls.services["service_offerings"]["tiny"]
)
cls.hostConfig = cls.config.__dict__["zones"][0].__dict__["pods"][0].__dict__["clusters"][0].__dict__["hosts"][0].__dict__
cls.virtual_machine = VirtualMachine.create(
cls.apiclient,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.account.domainid,
serviceofferingid=cls.service_offering.id
)
cls._cleanup = [
cls.virtual_machine,
cls.account,
cls.service_offering
]
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
return
@classmethod
def tearDownClass(cls):
try:
cls.apiclient = super(
TestIptables,
cls).getClsTestClient().getApiClient()
cleanup_resources(cls.apiclient, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
def tearDown(self):
cleanup_resources(self.apiclient, self.cleanup)
return
@attr(tags=["advanced", "smoke"], required_hardware="true")
def test_iptable_rules(self):
"""Test iptable rules in case we have IP associated with a network which is in
different pubic IP range from that of public IP range that has source NAT IP.
When IP is associated we should see a rule '-i eth3 -o eth0 -m state --state RELATED,ESTABLISHED -j ACCEPT' in FORWARD table.
When IP is dis-associated we should see a rule in the FORWARD table is deleted.
"""
# Validate the following:
# 1. Create a new public IP range and dedicate to a account
# 2. Acquire a IP from new public range
# 3. Create a firewall rule to open up the port, so that IP is associated with network
# 5. Login to VR and verify routing tables, there should be Table_eth3
# 6. Delete firewall rule, since its last IP, routing table Table_eth3 should be deleted
self.public_ip_range = PublicIpRange.create(
self.apiclient,
self.services["publiciprange"]
)
self._cleanup.append(self.public_ip_range)
logger.debug("Dedicating Public IP range to the account");
dedicate_public_ip_range_response = PublicIpRange.dedicate(
self.apiclient,
self.public_ip_range.vlan.id,
account=self.account.name,
domainid=self.account.domainid
)
ip_address = PublicIPAddress.create(
self.apiclient,
self.account.name,
self.zone.id,
self.account.domainid,
self.services["virtual_machine"]
)
self.cleanup.append(ip_address)
# Check if VM is in Running state before creating NAT and firewall rules
vm_response = VirtualMachine.list(
self.apiclient,
id=self.virtual_machine.id
)
self.assertEqual(
isinstance(vm_response, list),
True,
"Check list VM returns a valid list"
)
self.assertNotEqual(
len(vm_response),
0,
"Check Port Forwarding Rule is created"
)
self.assertEqual(
vm_response[0].state,
'Running',
"VM state should be Running before creating a NAT rule."
)
# Open up firewall port for SSH
firewall_rule = FireWallRule.create(
self.apiclient,
ipaddressid=ip_address.ipaddress.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
# Get the router details associated with account
routers = list_routers(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
)
router = routers[0]
if (self.hypervisor.lower() == 'vmware'
or self.hypervisor.lower() == 'hyperv'):
result = get_process_status(
self.apiclient.connection.mgtSvr,
22,
self.apiclient.connection.user,
self.apiclient.connection.passwd,
router.linklocalip,
'iptables -t filter -L FORWARD -v',
hypervisor=self.hypervisor
)
else:
hosts = list_hosts(
self.apiclient,
id=router.hostid,
)
self.assertEqual(
isinstance(hosts, list),
True,
"Check for list hosts response return valid data"
)
host = hosts[0]
host.user = self.hostConfig['username']
host.passwd = self.hostConfig['password']
try:
result = get_process_status(
host.ipaddress,
22,
host.user,
host.passwd,
router.linklocalip,
'iptables -t filter -L FORWARD -v'
)
except KeyError:
self.skipTest(
"Provide a marvin config file with host\
credentials to run %s" %
self._testMethodName)
logger.debug("iptables -t filter -L FORWARD -v: %s" % result)
res = str(result)
self.assertEqual(
res.count("eth3 eth0 anywhere anywhere state RELATED,ESTABLISHED"),
1,
"Check to ensure there is a iptable rule to accept the RELATED,ESTABLISHED traffic"
)
firewall_rule.delete(self.apiclient)
class TestVPCPortForwarding(cloudstackTestCase):
@classmethod
def setUpClass(cls):
socket.setdefaulttimeout(60)
testClient = super(TestVPCPortForwarding, cls).getClsTestClient()
cls.api_client = cls.testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
cls.template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostype"]
)
cls.services["vpc_offering"] = { "name": 'VPC off',
"displaytext": 'VPC off',
"supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding,Vpn,Lb,UserData,StaticNat',
}
cls.services["network_offering"] = {
"name": 'VPC Network offering',
"displaytext": 'VPC Network off',
"guestiptype": 'Isolated',
"supportedservices": 'Vpn,Dhcp,Dns,SourceNat,PortForwarding,Lb,UserData,StaticNat,NetworkACL',
"traffictype": 'GUEST',
"availability": 'Optional',
"useVpc": 'on',
"serviceProviderList": {
"Vpn": 'VpcVirtualRouter',
"Dhcp": 'VpcVirtualRouter',
"Dns": 'VpcVirtualRouter',
"SourceNat": 'VpcVirtualRouter',
"PortForwarding": 'VpcVirtualRouter',
"Lb": 'VpcVirtualRouter',
"UserData": 'VpcVirtualRouter',
"StaticNat": 'VpcVirtualRouter',
"NetworkACL": 'VpcVirtualRouter'
},
}
cls.services["network"] = {
"name": "Test Network",
"displaytext": "Test Network",
"netmask": '255.255.255.0'
}
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["virtual_machine"]["template"] = cls.template.id
cls.services["publiciprange"]["zoneid"] = cls.zone.id
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls._cleanup = [cls.service_offering]
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True,
domainid=self.domain.id
)
self.cleanup = [self.account]
logger.debug("Creating a VPC offering..")
self.vpc_off = VpcOffering.create(
self.apiclient,
self.services["vpc_offering"]
)
self._cleanup.append(self.vpc_off)
logger.debug("Enabling the VPC offering created")
self.vpc_off.update(self.apiclient, state='Enabled')
logger.debug("Creating a VPC network in the account: %s" % self.account.name)
self.services["vpc"]["cidr"] = '10.1.0.0/16'
self.vpc = VPC.create(
self.apiclient,
self.services["vpc"],
vpcofferingid=self.vpc_off.id,
zoneid=self.zone.id,
account=self.account.name,
domainid=self.account.domainid
)
return
def tearDown(self):
try:
#Clean up, terminate the created network offerings
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
logger.debug("Warning: Exception during cleanup : %s" % e)
return
def check_ssh_into_vm(self, vm, public_ip, testnegative=False):
logger.debug("Checking if we can SSH into VM=%s on public_ip=%s" % (vm.name, public_ip.ipaddress.ipaddress))
try:
vm.get_ssh_client(ipaddress=public_ip.ipaddress.ipaddress)
if not testnegative:
logger.debug("SSH into VM=%s on public_ip=%s is successfully" % (vm.name, public_ip.ipaddress.ipaddress))
else:
self.fail("SSH into VM=%s on public_ip=%s is successfully" % (vm.name, public_ip.ipaddress.ipaddress))
except:
if not testnegative:
self.fail("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
else:
logger.debug("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
def create_natrule(self, vm, public_ip, network, services=None):
logger.debug("Creating NAT rule in network for vm with public IP")
if not services:
services = self.services["natrule"]
nat_rule = NATRule.create(self.apiclient,
vm,
services,
ipaddressid=public_ip.ipaddress.id,
openfirewall=False,
networkid=network.id,
vpcid=self.vpc.id
)
return nat_rule
def acquire_publicip(self, network):
logger.debug("Associating public IP for network: %s" % network.name)
public_ip = PublicIPAddress.create(self.apiclient,
accountid=self.account.name,
zoneid=self.zone.id,
domainid=self.account.domainid,
networkid=network.id,
vpcid=self.vpc.id
)
logger.debug("Associated %s with network %s" % (public_ip.ipaddress.ipaddress,
network.id
))
return public_ip
def create_network(self, net_offerring, gateway='10.1.1.1',vpc=None):
try:
logger.debug('Create NetworkOffering')
net_offerring["name"] = "NET_OFF-" + str(gateway)
nw_off = NetworkOffering.create(self.apiclient,
net_offerring,
conservemode=False
)
# Enable Network offering
nw_off.update(self.apiclient, state='Enabled')
self._cleanup.append(nw_off)
logger.debug('Created and Enabled NetworkOffering')
self.services["network"]["name"] = "NETWORK-" + str(gateway)
logger.debug('Adding Network=%s' % self.services["network"])
default_acl = NetworkACLList.list(self.apiclient, name="default_allow")[0]
obj_network = Network.create(self.apiclient,
self.services["network"],
accountid=self.account.name,
domainid=self.account.domainid,
networkofferingid=nw_off.id,
zoneid=self.zone.id,
gateway=gateway,
aclid=default_acl.id,
vpcid=vpc.id if vpc else self.vpc.id
)
logger.debug("Created network with ID: %s" % obj_network.id)
return obj_network
except Exception, e:
self.fail('Unable to create a Network with offering=%s because of %s ' % (net_offerring, e))
def deployvm_in_network(self, network, host_id=None):
try:
logger.debug('Creating VM in network=%s' % network.name)
vm = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
networkids=[str(network.id)],
hostid=host_id
)
logger.debug('Created VM=%s in network=%s' % (vm.id, network.name))
return vm
except:
self.fail('Unable to create VM in a Network=%s' % network.name)
@attr(tags=["advanced", "intervlan"], required_hardware="true")
def test_network_services_VPC_CreatePF(self):
""" Test Create VPC PF rules on acquired public ip when VpcVirtualRouter is Running
"""
# Validate the following
# 1. Create a VPC with cidr - 10.1.1.1/16
# 2. Create a Network offering - NO1 with all supported services
# 3. Add network1(10.1.1.1/24) using N01 to this VPC.
# 4. Deploy vm1 in network1.
# 5. Use the Create PF rule for vm in network1.
# 6. Successfully ssh into the Guest VM using the PF rule
network_1 = self.create_network(self.services["network_offering"])
vm_1 = self.deployvm_in_network(network_1)
self.public_ip_range = PublicIpRange.create(
self.apiclient,
self.services["publiciprange"]
)
self._cleanup.append(self.public_ip_range)
logger.debug("Dedicating Public IP range to the account");
dedicate_public_ip_range_response = PublicIpRange.dedicate(
self.apiclient,
self.public_ip_range.vlan.id,
account=self.account.name,
domainid=self.account.domainid
)
public_ip_1 = self.acquire_publicip(network_1)
self.create_natrule( vm_1, public_ip_1, network_1)
self.check_ssh_into_vm(vm_1, public_ip_1, testnegative=False)
self.public_ip_range.release(self.apiclient)
return
class TestVPCStaticNat(cloudstackTestCase):
@classmethod
def setUpClass(cls):
socket.setdefaulttimeout(60)
testClient = super(TestVPCStaticNat, cls).getClsTestClient()
cls.api_client = cls.testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
# Get Zone, Domain and templates
cls.domain = get_domain(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
cls.template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostype"]
)
cls.services["vpc_offering"] = { "name": 'VPC off',
"displaytext": 'VPC off',
"supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding,Vpn,Lb,UserData,StaticNat',
}
cls.services["network_offering"] = {
"name": 'VPC Network offering',
"displaytext": 'VPC Network off',
"guestiptype": 'Isolated',
"supportedservices": 'Vpn,Dhcp,Dns,SourceNat,PortForwarding,Lb,UserData,StaticNat,NetworkACL',
"traffictype": 'GUEST',
"availability": 'Optional',
"useVpc": 'on',
"serviceProviderList": {
"Vpn": 'VpcVirtualRouter',
"Dhcp": 'VpcVirtualRouter',
"Dns": 'VpcVirtualRouter',
"SourceNat": 'VpcVirtualRouter',
"PortForwarding": 'VpcVirtualRouter',
"Lb": 'VpcVirtualRouter',
"UserData": 'VpcVirtualRouter',
"StaticNat": 'VpcVirtualRouter',
"NetworkACL": 'VpcVirtualRouter'
},
}
cls.services["network"] = {
"name": "Test Network",
"displaytext": "Test Network",
"netmask": '255.255.255.0'
}
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
cls.services["virtual_machine"]["template"] = cls.template.id
cls.services["publiciprange"]["zoneid"] = cls.zone.id
cls.service_offering = ServiceOffering.create(
cls.api_client,
cls.services["service_offering"]
)
cls._cleanup = [cls.service_offering]
return
@classmethod
def tearDownClass(cls):
try:
#Cleanup resources used
cleanup_resources(cls.api_client, cls._cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True,
domainid=self.domain.id
)
self.cleanup = [self.account]
logger.debug("Creating a VPC offering..")
self.vpc_off = VpcOffering.create(
self.apiclient,
self.services["vpc_offering"]
)
self._cleanup.append(self.vpc_off)
logger.debug("Enabling the VPC offering created")
self.vpc_off.update(self.apiclient, state='Enabled')
logger.debug("Creating a VPC network in the account: %s" % self.account.name)
self.services["vpc"]["cidr"] = '10.1.0.0/16'
self.vpc = VPC.create(
self.apiclient,
self.services["vpc"],
vpcofferingid=self.vpc_off.id,
zoneid=self.zone.id,
account=self.account.name,
domainid=self.account.domainid
)
return
def tearDown(self):
try:
#Clean up, terminate the created network offerings
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
logger.debug("Warning: Exception during cleanup : %s" % e)
return
def check_ssh_into_vm(self, vm, public_ip, testnegative=False):
logger.debug("Checking if we can SSH into VM=%s on public_ip=%s" % (vm.name, public_ip.ipaddress.ipaddress))
try:
vm.get_ssh_client(ipaddress=public_ip.ipaddress.ipaddress)
if not testnegative:
logger.debug("SSH into VM=%s on public_ip=%s is successfully" % (vm.name, public_ip.ipaddress.ipaddress))
else:
self.fail("SSH into VM=%s on public_ip=%s is successfully" % (vm.name, public_ip.ipaddress.ipaddress))
except:
if not testnegative:
self.fail("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
else:
logger.debug("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
def acquire_publicip(self, network):
logger.debug("Associating public IP for network: %s" % network.name)
public_ip = PublicIPAddress.create(self.apiclient,
accountid=self.account.name,
zoneid=self.zone.id,
domainid=self.account.domainid,
networkid=network.id,
vpcid=self.vpc.id
)
logger.debug("Associated %s with network %s" % (public_ip.ipaddress.ipaddress,
network.id
))
return public_ip
def create_network(self, net_offerring, gateway='10.1.1.1',vpc=None):
try:
logger.debug('Create NetworkOffering')
net_offerring["name"] = "NET_OFF-" + str(gateway)
nw_off = NetworkOffering.create(self.apiclient,
net_offerring,
conservemode=False
)
# Enable Network offering
nw_off.update(self.apiclient, state='Enabled')
self._cleanup.append(nw_off)
logger.debug('Created and Enabled NetworkOffering')
self.services["network"]["name"] = "NETWORK-" + str(gateway)
logger.debug('Adding Network=%s' % self.services["network"])
default_acl = NetworkACLList.list(self.apiclient, name="default_allow")[0]
obj_network = Network.create(self.apiclient,
self.services["network"],
accountid=self.account.name,
domainid=self.account.domainid,
networkofferingid=nw_off.id,
zoneid=self.zone.id,
gateway=gateway,
aclid=default_acl.id,
vpcid=vpc.id if vpc else self.vpc.id
)
logger.debug("Created network with ID: %s" % obj_network.id)
return obj_network
except Exception, e:
self.fail('Unable to create a Network with offering=%s because of %s ' % (net_offerring, e))
def deployvm_in_network(self, network, host_id=None):
try:
logger.debug('Creating VM in network=%s' % network.name)
vm = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
networkids=[str(network.id)],
hostid=host_id
)
logger.debug('Created VM=%s in network=%s' % (vm.id, network.name))
return vm
except:
self.fail('Unable to create VM in a Network=%s' % network.name)
def create_StaticNatRule_For_VM(self, vm, public_ip, network, services=None):
logger.debug("Enabling static NAT for IP: %s" %public_ip.ipaddress.ipaddress)
if not services:
services = self.services["natrule"]
try:
StaticNATRule.enable(
self.apiclient,
ipaddressid=public_ip.ipaddress.id,
virtualmachineid=vm.id,
networkid=network.id
)
logger.debug("Static NAT enabled for IP: %s" %
public_ip.ipaddress.ipaddress)
logger.debug("Adding NetworkACL rules to make NAT rule accessible")
except Exception as e:
self.fail("Failed to enable static NAT on IP: %s - %s" % (
public_ip.ipaddress.ipaddress, e))
@attr(tags=["advanced", "intervlan"], required_hardware="true")
def test_network_services_VPC_CreatePF(self):
""" Test Create VPC PF rules on acquired public ip when VpcVirtualRouter is Running
"""
# Validate the following
# 1. Create a VPC with cidr - 10.1.1.1/16
# 2. Create a Network offering - NO1 with all supported services
# 3. Add network1(10.1.1.1/24) using N01 to this VPC.
# 4. Deploy vm1 in network1.
# 5. Use the Create PF rule for vm in network1.
# 6. Successfully ssh into the Guest VM using the PF rule
network_1 = self.create_network(self.services["network_offering"])
vm_1 = self.deployvm_in_network(network_1)
self.public_ip_range = PublicIpRange.create(
self.apiclient,
self.services["publiciprange"]
)
self._cleanup.append(self.public_ip_range)
logger.debug("Dedicating Public IP range to the account");
dedicate_public_ip_range_response = PublicIpRange.dedicate(
self.apiclient,
self.public_ip_range.vlan.id,
account=self.account.name,
domainid=self.account.domainid
)
public_ip_1 = self.acquire_publicip(network_1)
self.create_StaticNatRule_For_VM( vm_1, public_ip_1, network_1)
self.check_ssh_into_vm(vm_1, public_ip_1, testnegative=False)
self.public_ip_range.release(self.apiclient)
return