mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
-when processing static nat rule, add a mangle table rule, to mark the traffic from the guest vm when it has associated static nat rule so that traffic gets routed using the route tabe of the device which has public ip associated -fix the case where nic_device_id is empty when ip is getting disassociated resulting in empty deviceid in ips.json -add utility methods in CsRule, and CsRoute to add 'ip rule' and 'ip route' rules respectivley -ensure traffic from all public interfaces are connection marked with device number, and restored for the reverse traffic. use the connection marked number to do device specific routing table lookup fill the device specific routing table with default route -component tests for testing multiple public interfaces of VR
1313 lines
54 KiB
Python
1313 lines
54 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
""" BVT tests for network services on public IP's from different public IP
|
|
range than that of associated source NAT IP of the network. Each IP associated
|
|
with network from a different public IP range results in a new public
|
|
interface on VR (eth3, eth4 etc) and iptable
|
|
"""
|
|
# Import Local Modules
|
|
from marvin.codes import (FAILED, STATIC_NAT_RULE, LB_RULE,
|
|
NAT_RULE, PASS)
|
|
from marvin.cloudstackTestCase import cloudstackTestCase
|
|
from marvin.cloudstackException import CloudstackAPIException
|
|
from marvin.cloudstackAPI import rebootRouter
|
|
from marvin.sshClient import SshClient
|
|
from marvin.lib.utils import cleanup_resources, get_process_status
|
|
from marvin.lib.base import (Account,
|
|
VirtualMachine,
|
|
ServiceOffering,
|
|
NATRule,
|
|
PublicIPAddress,
|
|
StaticNATRule,
|
|
FireWallRule,
|
|
Network,
|
|
NetworkOffering,
|
|
LoadBalancerRule,
|
|
PublicIpRange,
|
|
Router,
|
|
VpcOffering,
|
|
VPC,
|
|
NetworkACLList,
|
|
NetworkACL)
|
|
from marvin.lib.common import (get_domain,
|
|
get_zone,
|
|
get_template,
|
|
list_hosts,
|
|
list_publicIP,
|
|
list_nat_rules,
|
|
list_routers,
|
|
list_virtual_machines,
|
|
list_lb_rules,
|
|
list_configurations,
|
|
verifyGuestTrafficPortGroups)
|
|
from nose.plugins.attrib import attr
|
|
from ddt import ddt, data
|
|
# Import System modules
|
|
import socket
|
|
import time
|
|
import logging
|
|
|
|
_multiprocess_shared_ = True
|
|
|
|
logger = logging.getLogger('TestNetworkOps')
|
|
stream_handler = logging.StreamHandler()
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.addHandler(stream_handler)
|
|
|
|
class TestPortForwarding(cloudstackTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
testClient = super(TestPortForwarding, cls).getClsTestClient()
|
|
cls.apiclient = testClient.getApiClient()
|
|
cls.services = testClient.getParsedTestDataConfig()
|
|
cls.hypervisor = testClient.getHypervisorInfo()
|
|
# Get Zone, Domain and templates
|
|
cls.domain = get_domain(cls.apiclient)
|
|
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
|
|
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
|
|
cls.services["zoneid"] = cls.zone.id
|
|
template = get_template(
|
|
cls.apiclient,
|
|
cls.zone.id,
|
|
cls.services["ostype"]
|
|
)
|
|
if template == FAILED:
|
|
assert False, "get_template() failed to return template with description %s" % cls.services[
|
|
"ostype"]
|
|
|
|
# Create an account, network, VM and IP addresses
|
|
cls.account = Account.create(
|
|
cls.apiclient,
|
|
cls.services["account"],
|
|
admin=True,
|
|
domainid=cls.domain.id
|
|
)
|
|
cls.services["publiciprange"]["zoneid"] = cls.zone.id
|
|
cls.service_offering = ServiceOffering.create(
|
|
cls.apiclient,
|
|
cls.services["service_offerings"]["tiny"]
|
|
)
|
|
cls.virtual_machine = VirtualMachine.create(
|
|
cls.apiclient,
|
|
cls.services["virtual_machine"],
|
|
templateid=template.id,
|
|
accountid=cls.account.name,
|
|
domainid=cls.account.domainid,
|
|
serviceofferingid=cls.service_offering.id
|
|
)
|
|
cls._cleanup = [
|
|
cls.virtual_machine,
|
|
cls.account,
|
|
cls.service_offering
|
|
]
|
|
|
|
def setUp(self):
|
|
self.apiclient = self.testClient.getApiClient()
|
|
self.cleanup = []
|
|
return
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
try:
|
|
cls.apiclient = super(
|
|
TestPortForwarding,
|
|
cls).getClsTestClient().getApiClient()
|
|
cleanup_resources(cls.apiclient, cls._cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
|
|
def tearDown(self):
|
|
cleanup_resources(self.apiclient, self.cleanup)
|
|
return
|
|
|
|
@attr(tags=["advanced", "smoke"], required_hardware="true")
|
|
def test_port_forwarding_on_ip_from_non_src_nat_ip_range(self):
|
|
"""Test for port forwarding on a IP which is in pubic IP range different
|
|
from public IP range that has source NAT IP associated with network
|
|
"""
|
|
|
|
# Validate the following:
|
|
# 1. Create a new public IP range and dedicate to a account
|
|
# 2. Acquire a IP from new public range
|
|
# 3. create a port forwarding on acquired IP from new range
|
|
# 4. Create a firewall rule to open up the port
|
|
# 5. Test SSH works to the VM
|
|
|
|
self.public_ip_range = PublicIpRange.create(
|
|
self.apiclient,
|
|
self.services["publiciprange"]
|
|
)
|
|
|
|
logger.debug("Dedicating Public IP range to the account");
|
|
dedicate_public_ip_range_response = PublicIpRange.dedicate(
|
|
self.apiclient,
|
|
self.public_ip_range.vlan.id,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid
|
|
)
|
|
ip_address = PublicIPAddress.create(
|
|
self.apiclient,
|
|
self.account.name,
|
|
self.zone.id,
|
|
self.account.domainid,
|
|
self.services["virtual_machine"]
|
|
)
|
|
self.cleanup.append(ip_address)
|
|
self.cleanup.append(self.public_ip_range)
|
|
# Check if VM is in Running state before creating NAT and firewall rules
|
|
vm_response = VirtualMachine.list(
|
|
self.apiclient,
|
|
id=self.virtual_machine.id
|
|
)
|
|
|
|
self.assertEqual(
|
|
isinstance(vm_response, list),
|
|
True,
|
|
"Check list VM returns a valid list"
|
|
)
|
|
|
|
self.assertNotEqual(
|
|
len(vm_response),
|
|
0,
|
|
"Check Port Forwarding Rule is created"
|
|
)
|
|
self.assertEqual(
|
|
vm_response[0].state,
|
|
'Running',
|
|
"VM state should be Running before creating a NAT rule."
|
|
)
|
|
|
|
# Open up firewall port for SSH
|
|
FireWallRule.create(
|
|
self.apiclient,
|
|
ipaddressid=ip_address.ipaddress.id,
|
|
protocol=self.services["natrule"]["protocol"],
|
|
cidrlist=['0.0.0.0/0'],
|
|
startport=self.services["natrule"]["publicport"],
|
|
endport=self.services["natrule"]["publicport"]
|
|
)
|
|
|
|
# Create PF rule
|
|
nat_rule = NATRule.create(
|
|
self.apiclient,
|
|
self.virtual_machine,
|
|
self.services["natrule"],
|
|
ip_address.ipaddress.id
|
|
)
|
|
|
|
try:
|
|
logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
|
|
(
|
|
self.virtual_machine.ipaddress,
|
|
ip_address.ipaddress.ipaddress
|
|
))
|
|
self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
|
|
except Exception as e:
|
|
self.fail(
|
|
"SSH Access failed for %s: %s" %
|
|
(self.virtual_machine.ipaddress, e)
|
|
)
|
|
|
|
nat_rule.delete(self.apiclient)
|
|
|
|
class TestStaticNat(cloudstackTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
testClient = super(TestStaticNat, cls).getClsTestClient()
|
|
cls.apiclient = testClient.getApiClient()
|
|
cls.services = testClient.getParsedTestDataConfig()
|
|
cls.hypervisor = testClient.getHypervisorInfo()
|
|
# Get Zone, Domain and templates
|
|
cls.domain = get_domain(cls.apiclient)
|
|
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
|
|
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
|
|
cls.services["zoneid"] = cls.zone.id
|
|
template = get_template(
|
|
cls.apiclient,
|
|
cls.zone.id,
|
|
cls.services["ostype"]
|
|
)
|
|
if template == FAILED:
|
|
assert False, "get_template() failed to return template with description %s" % cls.services[
|
|
"ostype"]
|
|
|
|
# Create an account, network, VM and IP addresses
|
|
cls.account = Account.create(
|
|
cls.apiclient,
|
|
cls.services["account"],
|
|
admin=True,
|
|
domainid=cls.domain.id
|
|
)
|
|
cls.services["publiciprange"]["zoneid"] = cls.zone.id
|
|
cls.service_offering = ServiceOffering.create(
|
|
cls.apiclient,
|
|
cls.services["service_offerings"]["tiny"]
|
|
)
|
|
cls.virtual_machine = VirtualMachine.create(
|
|
cls.apiclient,
|
|
cls.services["virtual_machine"],
|
|
templateid=template.id,
|
|
accountid=cls.account.name,
|
|
domainid=cls.account.domainid,
|
|
serviceofferingid=cls.service_offering.id
|
|
)
|
|
cls.defaultNetworkId = cls.virtual_machine.nic[0].networkid
|
|
cls._cleanup = [
|
|
cls.virtual_machine,
|
|
cls.account,
|
|
cls.service_offering
|
|
]
|
|
|
|
def setUp(self):
|
|
self.apiclient = self.testClient.getApiClient()
|
|
self.cleanup = []
|
|
return
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
try:
|
|
cls.apiclient = super(
|
|
TestStaticNat,
|
|
cls).getClsTestClient().getApiClient()
|
|
cleanup_resources(cls.apiclient, cls._cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
|
|
def tearDown(self):
|
|
cleanup_resources(self.apiclient, self.cleanup)
|
|
return
|
|
|
|
@attr(tags=["advanced", "smoke"], required_hardware="true")
|
|
def test_static_nat_on_ip_from_non_src_nat_ip_range(self):
|
|
"""Test for static nat on a IP which is in pubic IP range different
|
|
from public IP range that has source NAT IP associated with network
|
|
"""
|
|
|
|
# Validate the following:
|
|
# 1. Create a new public IP range and dedicate to a account
|
|
# 2. Acquire a IP from new public range
|
|
# 3. Enable static NAT on acquired IP from new range
|
|
# 4. Create a firewall rule to open up the port
|
|
# 5. Test SSH works to the VM
|
|
|
|
self.public_ip_range = PublicIpRange.create(
|
|
self.apiclient,
|
|
self.services["publiciprange"]
|
|
)
|
|
|
|
logger.debug("Dedicating Public IP range to the account");
|
|
dedicate_public_ip_range_response = PublicIpRange.dedicate(
|
|
self.apiclient,
|
|
self.public_ip_range.vlan.id,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid
|
|
)
|
|
ip_address = PublicIPAddress.create(
|
|
self.apiclient,
|
|
self.account.name,
|
|
self.zone.id,
|
|
self.account.domainid,
|
|
self.services["virtual_machine"]
|
|
)
|
|
self.cleanup.append(ip_address)
|
|
self.cleanup.append(self.public_ip_range)
|
|
# Check if VM is in Running state before creating NAT and firewall rules
|
|
vm_response = VirtualMachine.list(
|
|
self.apiclient,
|
|
id=self.virtual_machine.id
|
|
)
|
|
|
|
self.assertEqual(
|
|
isinstance(vm_response, list),
|
|
True,
|
|
"Check list VM returns a valid list"
|
|
)
|
|
|
|
self.assertNotEqual(
|
|
len(vm_response),
|
|
0,
|
|
"Check Port Forwarding Rule is created"
|
|
)
|
|
self.assertEqual(
|
|
vm_response[0].state,
|
|
'Running',
|
|
"VM state should be Running before creating a NAT rule."
|
|
)
|
|
|
|
# Open up firewall port for SSH
|
|
FireWallRule.create(
|
|
self.apiclient,
|
|
ipaddressid=ip_address.ipaddress.id,
|
|
protocol=self.services["natrule"]["protocol"],
|
|
cidrlist=['0.0.0.0/0'],
|
|
startport=self.services["natrule"]["publicport"],
|
|
endport=self.services["natrule"]["publicport"]
|
|
)
|
|
|
|
# Create Static NAT rule
|
|
StaticNATRule.enable(
|
|
self.apiclient,
|
|
ip_address.ipaddress.id,
|
|
self.virtual_machine.id,
|
|
self.defaultNetworkId
|
|
)
|
|
|
|
try:
|
|
logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
|
|
(
|
|
self.virtual_machine.ipaddress,
|
|
ip_address.ipaddress.ipaddress
|
|
))
|
|
self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
|
|
except Exception as e:
|
|
self.fail(
|
|
"SSH Access failed for %s: %s" %
|
|
(self.virtual_machine.ipaddress, e)
|
|
)
|
|
|
|
StaticNATRule.disable(
|
|
self.apiclient,
|
|
ip_address.ipaddress.id,
|
|
self.virtual_machine.id
|
|
)
|
|
|
|
class TestRouting(cloudstackTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
testClient = super(TestRouting, cls).getClsTestClient()
|
|
cls.apiclient = testClient.getApiClient()
|
|
cls.services = testClient.getParsedTestDataConfig()
|
|
cls.hypervisor = testClient.getHypervisorInfo()
|
|
# Get Zone, Domain and templates
|
|
cls.domain = get_domain(cls.apiclient)
|
|
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
|
|
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
|
|
cls.services["zoneid"] = cls.zone.id
|
|
template = get_template(
|
|
cls.apiclient,
|
|
cls.zone.id,
|
|
cls.services["ostype"]
|
|
)
|
|
if template == FAILED:
|
|
assert False, "get_template() failed to return template with description %s" % cls.services[
|
|
"ostype"]
|
|
|
|
# Create an account, network, VM and IP addresses
|
|
cls.account = Account.create(
|
|
cls.apiclient,
|
|
cls.services["account"],
|
|
admin=True,
|
|
domainid=cls.domain.id
|
|
)
|
|
cls.services["publiciprange"]["zoneid"] = cls.zone.id
|
|
cls.service_offering = ServiceOffering.create(
|
|
cls.apiclient,
|
|
cls.services["service_offerings"]["tiny"]
|
|
)
|
|
cls.hostConfig = cls.config.__dict__["zones"][0].__dict__["pods"][0].__dict__["clusters"][0].__dict__["hosts"][0].__dict__
|
|
cls.virtual_machine = VirtualMachine.create(
|
|
cls.apiclient,
|
|
cls.services["virtual_machine"],
|
|
templateid=template.id,
|
|
accountid=cls.account.name,
|
|
domainid=cls.account.domainid,
|
|
serviceofferingid=cls.service_offering.id
|
|
)
|
|
cls._cleanup = [
|
|
cls.virtual_machine,
|
|
cls.account,
|
|
cls.service_offering
|
|
]
|
|
|
|
def setUp(self):
|
|
self.apiclient = self.testClient.getApiClient()
|
|
self.cleanup = []
|
|
return
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
try:
|
|
cls.apiclient = super(
|
|
TestRouting,
|
|
cls).getClsTestClient().getApiClient()
|
|
cleanup_resources(cls.apiclient, cls._cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
|
|
def tearDown(self):
|
|
cleanup_resources(self.apiclient, self.cleanup)
|
|
return
|
|
|
|
@attr(tags=["advanced", "smoke"], required_hardware="true")
|
|
def test_routing_tables(self):
|
|
"""Test routing table in case we have IP associated with a network which is in
|
|
different pubic IP range from that of public IP range that has source NAT IP.
|
|
When IP is associated we should see a new route table created.
|
|
When IP is associated we should see a that route table is deleted.
|
|
"""
|
|
|
|
# Validate the following:
|
|
# 1. Create a new public IP range and dedicate to a account
|
|
# 2. Acquire a IP from new public range
|
|
# 3. Create a firewall rule to open up the port, so that IP is associated with network
|
|
# 5. Login to VR and verify routing tables, there should be Table_eth3
|
|
# 6. Delete firewall rule, since its last IP, routing table Table_eth3 should be deleted
|
|
|
|
self.public_ip_range = PublicIpRange.create(
|
|
self.apiclient,
|
|
self.services["publiciprange"]
|
|
)
|
|
|
|
logger.debug("Dedicating Public IP range to the account");
|
|
dedicate_public_ip_range_response = PublicIpRange.dedicate(
|
|
self.apiclient,
|
|
self.public_ip_range.vlan.id,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid
|
|
)
|
|
ip_address = PublicIPAddress.create(
|
|
self.apiclient,
|
|
self.account.name,
|
|
self.zone.id,
|
|
self.account.domainid,
|
|
self.services["virtual_machine"]
|
|
)
|
|
self.cleanup.append(ip_address)
|
|
self.cleanup.append(self.public_ip_range)
|
|
# Check if VM is in Running state before creating NAT and firewall rules
|
|
vm_response = VirtualMachine.list(
|
|
self.apiclient,
|
|
id=self.virtual_machine.id
|
|
)
|
|
|
|
self.assertEqual(
|
|
isinstance(vm_response, list),
|
|
True,
|
|
"Check list VM returns a valid list"
|
|
)
|
|
|
|
self.assertNotEqual(
|
|
len(vm_response),
|
|
0,
|
|
"Check Port Forwarding Rule is created"
|
|
)
|
|
self.assertEqual(
|
|
vm_response[0].state,
|
|
'Running',
|
|
"VM state should be Running before creating Firewall rule."
|
|
)
|
|
|
|
# Open up firewall port for SSH, this will associate IP with VR
|
|
firewall_rule = FireWallRule.create(
|
|
self.apiclient,
|
|
ipaddressid=ip_address.ipaddress.id,
|
|
protocol=self.services["natrule"]["protocol"],
|
|
cidrlist=['0.0.0.0/0'],
|
|
startport=self.services["natrule"]["publicport"],
|
|
endport=self.services["natrule"]["publicport"]
|
|
)
|
|
|
|
# Get the router details associated with account
|
|
routers = list_routers(
|
|
self.apiclient,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid,
|
|
)
|
|
router = routers[0]
|
|
|
|
if (self.hypervisor.lower() == 'vmware'
|
|
or self.hypervisor.lower() == 'hyperv'):
|
|
result = get_process_status(
|
|
self.apiclient.connection.mgtSvr,
|
|
22,
|
|
self.apiclient.connection.user,
|
|
self.apiclient.connection.passwd,
|
|
router.linklocalip,
|
|
'ip route list table Table_eth3',
|
|
hypervisor=self.hypervisor
|
|
)
|
|
else:
|
|
hosts = list_hosts(
|
|
self.apiclient,
|
|
id=router.hostid,
|
|
)
|
|
self.assertEqual(
|
|
isinstance(hosts, list),
|
|
True,
|
|
"Check for list hosts response return valid data"
|
|
)
|
|
host = hosts[0]
|
|
host.user = self.hostConfig['username']
|
|
host.passwd = self.hostConfig['password']
|
|
try:
|
|
result = get_process_status(
|
|
host.ipaddress,
|
|
22,
|
|
host.user,
|
|
host.passwd,
|
|
router.linklocalip,
|
|
'ip route list table Table_eth3'
|
|
)
|
|
except KeyError:
|
|
self.skipTest(
|
|
"Provide a marvin config file with host\
|
|
credentials to run %s" %
|
|
self._testMethodName)
|
|
|
|
logger.debug("ip route list table Table_eth3: %s" % result)
|
|
public_range_gateway = self.services["publiciprange"]["gateway"]
|
|
default_route_rule = "default via " + public_range_gateway + " dev eth3 proto static"
|
|
logger.debug("default route result: %s" % str(result[0]))
|
|
self.assertEqual(
|
|
default_route_rule,
|
|
str(result[0]),
|
|
"Check default route table entry for public ip range"
|
|
)
|
|
|
|
res = str(result)
|
|
self.assertEqual(
|
|
res.count("throw") == 2,
|
|
True,
|
|
"Check routing rules to throw rest of the traffic. Count shoule be Atleast 2 for the control and guest traffic "
|
|
)
|
|
|
|
firewall_rule.delete(self.apiclient)
|
|
|
|
if (self.hypervisor.lower() == 'vmware'
|
|
or self.hypervisor.lower() == 'hyperv'):
|
|
result = get_process_status(
|
|
self.apiclient.connection.mgtSvr,
|
|
22,
|
|
self.apiclient.connection.user,
|
|
self.apiclient.connection.passwd,
|
|
router.linklocalip,
|
|
'ip route list table Table_eth3',
|
|
hypervisor=self.hypervisor
|
|
)
|
|
else:
|
|
hosts = list_hosts(
|
|
self.apiclient,
|
|
id=router.hostid,
|
|
)
|
|
self.assertEqual(
|
|
isinstance(hosts, list),
|
|
True,
|
|
"Check for list hosts response return valid data"
|
|
)
|
|
host = hosts[0]
|
|
host.user = self.hostConfig['username']
|
|
host.passwd = self.hostConfig['password']
|
|
try:
|
|
result = get_process_status(
|
|
host.ipaddress,
|
|
22,
|
|
host.user,
|
|
host.passwd,
|
|
router.linklocalip,
|
|
'ip route list table Table_eth3'
|
|
)
|
|
except KeyError:
|
|
self.skipTest(
|
|
"Provide a marvin config file with host\
|
|
credentials to run %s" %
|
|
self._testMethodName)
|
|
|
|
logger.debug("ip route list table Table_eth3: %s" % result)
|
|
res = str(result)
|
|
self.assertEqual(
|
|
res.count("default via"),
|
|
0,
|
|
"Check to ensure there should not be any default rule"
|
|
)
|
|
|
|
self.assertEqual(
|
|
res.count("throw"),
|
|
0,
|
|
"Check to ensure there should not be any throw rule"
|
|
)
|
|
|
|
class TestIptables(cloudstackTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
testClient = super(TestIptables, cls).getClsTestClient()
|
|
cls.apiclient = testClient.getApiClient()
|
|
cls.services = testClient.getParsedTestDataConfig()
|
|
cls.hypervisor = testClient.getHypervisorInfo()
|
|
# Get Zone, Domain and templates
|
|
cls.domain = get_domain(cls.apiclient)
|
|
cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
|
|
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
|
|
cls.services["zoneid"] = cls.zone.id
|
|
template = get_template(
|
|
cls.apiclient,
|
|
cls.zone.id,
|
|
cls.services["ostype"]
|
|
)
|
|
if template == FAILED:
|
|
assert False, "get_template() failed to return template with description %s" % cls.services[
|
|
"ostype"]
|
|
|
|
# Create an account, network, VM and IP addresses
|
|
cls.account = Account.create(
|
|
cls.apiclient,
|
|
cls.services["account"],
|
|
admin=True,
|
|
domainid=cls.domain.id
|
|
)
|
|
cls.services["publiciprange"]["zoneid"] = cls.zone.id
|
|
cls.service_offering = ServiceOffering.create(
|
|
cls.apiclient,
|
|
cls.services["service_offerings"]["tiny"]
|
|
)
|
|
cls.hostConfig = cls.config.__dict__["zones"][0].__dict__["pods"][0].__dict__["clusters"][0].__dict__["hosts"][0].__dict__
|
|
cls.virtual_machine = VirtualMachine.create(
|
|
cls.apiclient,
|
|
cls.services["virtual_machine"],
|
|
templateid=template.id,
|
|
accountid=cls.account.name,
|
|
domainid=cls.account.domainid,
|
|
serviceofferingid=cls.service_offering.id
|
|
)
|
|
cls._cleanup = [
|
|
cls.virtual_machine,
|
|
cls.account,
|
|
cls.service_offering
|
|
]
|
|
|
|
def setUp(self):
|
|
self.apiclient = self.testClient.getApiClient()
|
|
self.cleanup = []
|
|
return
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
try:
|
|
cls.apiclient = super(
|
|
TestIptables,
|
|
cls).getClsTestClient().getApiClient()
|
|
cleanup_resources(cls.apiclient, cls._cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
|
|
def tearDown(self):
|
|
cleanup_resources(self.apiclient, self.cleanup)
|
|
return
|
|
|
|
@attr(tags=["advanced", "smoke"], required_hardware="true")
|
|
def test_iptable_rules(self):
|
|
"""Test iptable rules in case we have IP associated with a network which is in
|
|
different pubic IP range from that of public IP range that has source NAT IP.
|
|
When IP is associated we should see a rule '-i eth3 -o eth0 -m state --state RELATED,ESTABLISHED -j ACCEPT' in FORWARD table.
|
|
When IP is dis-associated we should see a rule in the FORWARD table is deleted.
|
|
"""
|
|
|
|
# Validate the following:
|
|
# 1. Create a new public IP range and dedicate to a account
|
|
# 2. Acquire a IP from new public range
|
|
# 3. Create a firewall rule to open up the port, so that IP is associated with network
|
|
# 5. Login to VR and verify routing tables, there should be Table_eth3
|
|
# 6. Delete firewall rule, since its last IP, routing table Table_eth3 should be deleted
|
|
|
|
self.public_ip_range = PublicIpRange.create(
|
|
self.apiclient,
|
|
self.services["publiciprange"]
|
|
)
|
|
|
|
logger.debug("Dedicating Public IP range to the account");
|
|
dedicate_public_ip_range_response = PublicIpRange.dedicate(
|
|
self.apiclient,
|
|
self.public_ip_range.vlan.id,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid
|
|
)
|
|
ip_address = PublicIPAddress.create(
|
|
self.apiclient,
|
|
self.account.name,
|
|
self.zone.id,
|
|
self.account.domainid,
|
|
self.services["virtual_machine"]
|
|
)
|
|
self.cleanup.append(ip_address)
|
|
self.cleanup.append(self.public_ip_range)
|
|
# Check if VM is in Running state before creating NAT and firewall rules
|
|
vm_response = VirtualMachine.list(
|
|
self.apiclient,
|
|
id=self.virtual_machine.id
|
|
)
|
|
|
|
self.assertEqual(
|
|
isinstance(vm_response, list),
|
|
True,
|
|
"Check list VM returns a valid list"
|
|
)
|
|
|
|
self.assertNotEqual(
|
|
len(vm_response),
|
|
0,
|
|
"Check Port Forwarding Rule is created"
|
|
)
|
|
self.assertEqual(
|
|
vm_response[0].state,
|
|
'Running',
|
|
"VM state should be Running before creating a NAT rule."
|
|
)
|
|
|
|
# Open up firewall port for SSH
|
|
firewall_rule = FireWallRule.create(
|
|
self.apiclient,
|
|
ipaddressid=ip_address.ipaddress.id,
|
|
protocol=self.services["natrule"]["protocol"],
|
|
cidrlist=['0.0.0.0/0'],
|
|
startport=self.services["natrule"]["publicport"],
|
|
endport=self.services["natrule"]["publicport"]
|
|
)
|
|
|
|
# Get the router details associated with account
|
|
routers = list_routers(
|
|
self.apiclient,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid,
|
|
)
|
|
router = routers[0]
|
|
|
|
if (self.hypervisor.lower() == 'vmware'
|
|
or self.hypervisor.lower() == 'hyperv'):
|
|
result = get_process_status(
|
|
self.apiclient.connection.mgtSvr,
|
|
22,
|
|
self.apiclient.connection.user,
|
|
self.apiclient.connection.passwd,
|
|
router.linklocalip,
|
|
'iptables -t filter -L FORWARD -v',
|
|
hypervisor=self.hypervisor
|
|
)
|
|
else:
|
|
hosts = list_hosts(
|
|
self.apiclient,
|
|
id=router.hostid,
|
|
)
|
|
self.assertEqual(
|
|
isinstance(hosts, list),
|
|
True,
|
|
"Check for list hosts response return valid data"
|
|
)
|
|
host = hosts[0]
|
|
host.user = self.hostConfig['username']
|
|
host.passwd = self.hostConfig['password']
|
|
try:
|
|
result = get_process_status(
|
|
host.ipaddress,
|
|
22,
|
|
host.user,
|
|
host.passwd,
|
|
router.linklocalip,
|
|
'iptables -t filter -L FORWARD -v'
|
|
)
|
|
except KeyError:
|
|
self.skipTest(
|
|
"Provide a marvin config file with host\
|
|
credentials to run %s" %
|
|
self._testMethodName)
|
|
|
|
logger.debug("iptables -t filter -L FORWARD -v: %s" % result)
|
|
res = str(result)
|
|
self.assertEqual(
|
|
res.count("eth3 eth0 anywhere anywhere state RELATED,ESTABLISHED"),
|
|
1,
|
|
"Check to ensure there is a iptable rule to accept the RELATED,ESTABLISHED traffic"
|
|
)
|
|
|
|
|
|
class TestVPCPortForwarding(cloudstackTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
socket.setdefaulttimeout(60)
|
|
|
|
testClient = super(TestVPCPortForwarding, cls).getClsTestClient()
|
|
cls.api_client = cls.testClient.getApiClient()
|
|
|
|
cls.services = testClient.getParsedTestDataConfig()
|
|
|
|
# Get Zone, Domain and templates
|
|
cls.domain = get_domain(cls.api_client)
|
|
cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
|
|
cls.template = get_template(
|
|
cls.api_client,
|
|
cls.zone.id,
|
|
cls.services["ostype"]
|
|
)
|
|
cls.services["vpc_offering"] = { "name": 'VPC off',
|
|
"displaytext": 'VPC off',
|
|
"supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding,Vpn,Lb,UserData,StaticNat',
|
|
}
|
|
cls.services["network_offering"] = {
|
|
"name": 'VPC Network offering',
|
|
"displaytext": 'VPC Network off',
|
|
"guestiptype": 'Isolated',
|
|
"supportedservices": 'Vpn,Dhcp,Dns,SourceNat,PortForwarding,Lb,UserData,StaticNat,NetworkACL',
|
|
"traffictype": 'GUEST',
|
|
"availability": 'Optional',
|
|
"useVpc": 'on',
|
|
"serviceProviderList": {
|
|
"Vpn": 'VpcVirtualRouter',
|
|
"Dhcp": 'VpcVirtualRouter',
|
|
"Dns": 'VpcVirtualRouter',
|
|
"SourceNat": 'VpcVirtualRouter',
|
|
"PortForwarding": 'VpcVirtualRouter',
|
|
"Lb": 'VpcVirtualRouter',
|
|
"UserData": 'VpcVirtualRouter',
|
|
"StaticNat": 'VpcVirtualRouter',
|
|
"NetworkACL": 'VpcVirtualRouter'
|
|
},
|
|
}
|
|
cls.services["network"] = {
|
|
"name": "Test Network",
|
|
"displaytext": "Test Network",
|
|
"netmask": '255.255.255.0'
|
|
}
|
|
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
|
|
cls.services["virtual_machine"]["template"] = cls.template.id
|
|
cls.services["publiciprange"]["zoneid"] = cls.zone.id
|
|
|
|
cls.service_offering = ServiceOffering.create(
|
|
cls.api_client,
|
|
cls.services["service_offering"]
|
|
)
|
|
cls._cleanup = [cls.service_offering]
|
|
return
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
try:
|
|
#Cleanup resources used
|
|
cleanup_resources(cls.api_client, cls._cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
return
|
|
|
|
|
|
def setUp(self):
|
|
self.apiclient = self.testClient.getApiClient()
|
|
self.account = Account.create(
|
|
self.apiclient,
|
|
self.services["account"],
|
|
admin=True,
|
|
domainid=self.domain.id
|
|
)
|
|
self.cleanup = [self.account]
|
|
logger.debug("Creating a VPC offering..")
|
|
self.vpc_off = VpcOffering.create(
|
|
self.apiclient,
|
|
self.services["vpc_offering"]
|
|
)
|
|
self._cleanup.append(self.vpc_off)
|
|
logger.debug("Enabling the VPC offering created")
|
|
self.vpc_off.update(self.apiclient, state='Enabled')
|
|
|
|
logger.debug("Creating a VPC network in the account: %s" % self.account.name)
|
|
self.services["vpc"]["cidr"] = '10.1.0.0/16'
|
|
self.vpc = VPC.create(
|
|
self.apiclient,
|
|
self.services["vpc"],
|
|
vpcofferingid=self.vpc_off.id,
|
|
zoneid=self.zone.id,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid
|
|
)
|
|
return
|
|
|
|
def tearDown(self):
|
|
try:
|
|
#Clean up, terminate the created network offerings
|
|
cleanup_resources(self.apiclient, self.cleanup)
|
|
except Exception as e:
|
|
logger.debug("Warning: Exception during cleanup : %s" % e)
|
|
return
|
|
|
|
def check_ssh_into_vm(self, vm, public_ip, testnegative=False):
|
|
logger.debug("Checking if we can SSH into VM=%s on public_ip=%s" % (vm.name, public_ip.ipaddress.ipaddress))
|
|
try:
|
|
vm.get_ssh_client(ipaddress=public_ip.ipaddress.ipaddress)
|
|
if not testnegative:
|
|
logger.debug("SSH into VM=%s on public_ip=%s is successfully" % (vm.name, public_ip.ipaddress.ipaddress))
|
|
else:
|
|
self.fail("SSH into VM=%s on public_ip=%s is successfully" % (vm.name, public_ip.ipaddress.ipaddress))
|
|
except:
|
|
if not testnegative:
|
|
self.fail("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
|
|
else:
|
|
logger.debug("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
|
|
|
|
def create_natrule(self, vm, public_ip, network, services=None):
|
|
logger.debug("Creating NAT rule in network for vm with public IP")
|
|
if not services:
|
|
services = self.services["natrule"]
|
|
nat_rule = NATRule.create(self.apiclient,
|
|
vm,
|
|
services,
|
|
ipaddressid=public_ip.ipaddress.id,
|
|
openfirewall=False,
|
|
networkid=network.id,
|
|
vpcid=self.vpc.id
|
|
)
|
|
return nat_rule
|
|
|
|
def acquire_publicip(self, network):
|
|
logger.debug("Associating public IP for network: %s" % network.name)
|
|
public_ip = PublicIPAddress.create(self.apiclient,
|
|
accountid=self.account.name,
|
|
zoneid=self.zone.id,
|
|
domainid=self.account.domainid,
|
|
networkid=network.id,
|
|
vpcid=self.vpc.id
|
|
)
|
|
logger.debug("Associated %s with network %s" % (public_ip.ipaddress.ipaddress,
|
|
network.id
|
|
))
|
|
return public_ip
|
|
|
|
def create_network(self, net_offerring, gateway='10.1.1.1',vpc=None):
|
|
try:
|
|
logger.debug('Create NetworkOffering')
|
|
net_offerring["name"] = "NET_OFF-" + str(gateway)
|
|
nw_off = NetworkOffering.create(self.apiclient,
|
|
net_offerring,
|
|
conservemode=False
|
|
)
|
|
# Enable Network offering
|
|
nw_off.update(self.apiclient, state='Enabled')
|
|
self._cleanup.append(nw_off)
|
|
logger.debug('Created and Enabled NetworkOffering')
|
|
|
|
self.services["network"]["name"] = "NETWORK-" + str(gateway)
|
|
logger.debug('Adding Network=%s' % self.services["network"])
|
|
default_acl = NetworkACLList.list(self.apiclient, name="default_allow")[0]
|
|
obj_network = Network.create(self.apiclient,
|
|
self.services["network"],
|
|
accountid=self.account.name,
|
|
domainid=self.account.domainid,
|
|
networkofferingid=nw_off.id,
|
|
zoneid=self.zone.id,
|
|
gateway=gateway,
|
|
aclid=default_acl.id,
|
|
vpcid=vpc.id if vpc else self.vpc.id
|
|
)
|
|
logger.debug("Created network with ID: %s" % obj_network.id)
|
|
return obj_network
|
|
except Exception, e:
|
|
self.fail('Unable to create a Network with offering=%s because of %s ' % (net_offerring, e))
|
|
|
|
def deployvm_in_network(self, network, host_id=None):
|
|
try:
|
|
logger.debug('Creating VM in network=%s' % network.name)
|
|
vm = VirtualMachine.create(
|
|
self.apiclient,
|
|
self.services["virtual_machine"],
|
|
accountid=self.account.name,
|
|
domainid=self.account.domainid,
|
|
serviceofferingid=self.service_offering.id,
|
|
networkids=[str(network.id)],
|
|
hostid=host_id
|
|
)
|
|
logger.debug('Created VM=%s in network=%s' % (vm.id, network.name))
|
|
|
|
return vm
|
|
except:
|
|
self.fail('Unable to create VM in a Network=%s' % network.name)
|
|
|
|
@attr(tags=["advanced", "intervlan"], required_hardware="true")
|
|
def test_network_services_VPC_CreatePF(self):
|
|
""" Test Create VPC PF rules on acquired public ip when VpcVirtualRouter is Running
|
|
"""
|
|
|
|
# Validate the following
|
|
# 1. Create a VPC with cidr - 10.1.1.1/16
|
|
# 2. Create a Network offering - NO1 with all supported services
|
|
# 3. Add network1(10.1.1.1/24) using N01 to this VPC.
|
|
# 4. Deploy vm1 in network1.
|
|
# 5. Use the Create PF rule for vm in network1.
|
|
# 6. Successfully ssh into the Guest VM using the PF rule
|
|
|
|
network_1 = self.create_network(self.services["network_offering"])
|
|
vm_1 = self.deployvm_in_network(network_1)
|
|
self.public_ip_range = PublicIpRange.create(
|
|
self.apiclient,
|
|
self.services["publiciprange"]
|
|
)
|
|
self._cleanup.append(self.public_ip_range)
|
|
logger.debug("Dedicating Public IP range to the account");
|
|
dedicate_public_ip_range_response = PublicIpRange.dedicate(
|
|
self.apiclient,
|
|
self.public_ip_range.vlan.id,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid
|
|
)
|
|
public_ip_1 = self.acquire_publicip(network_1)
|
|
self.create_natrule( vm_1, public_ip_1, network_1)
|
|
self.check_ssh_into_vm(vm_1, public_ip_1, testnegative=False)
|
|
self.public_ip_range.release(self.apiclient)
|
|
return
|
|
|
|
class TestVPCStaticNat(cloudstackTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
socket.setdefaulttimeout(60)
|
|
|
|
testClient = super(TestVPCStaticNat, cls).getClsTestClient()
|
|
cls.api_client = cls.testClient.getApiClient()
|
|
|
|
cls.services = testClient.getParsedTestDataConfig()
|
|
|
|
# Get Zone, Domain and templates
|
|
cls.domain = get_domain(cls.api_client)
|
|
cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
|
|
cls.template = get_template(
|
|
cls.api_client,
|
|
cls.zone.id,
|
|
cls.services["ostype"]
|
|
)
|
|
cls.services["vpc_offering"] = { "name": 'VPC off',
|
|
"displaytext": 'VPC off',
|
|
"supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding,Vpn,Lb,UserData,StaticNat',
|
|
}
|
|
cls.services["network_offering"] = {
|
|
"name": 'VPC Network offering',
|
|
"displaytext": 'VPC Network off',
|
|
"guestiptype": 'Isolated',
|
|
"supportedservices": 'Vpn,Dhcp,Dns,SourceNat,PortForwarding,Lb,UserData,StaticNat,NetworkACL',
|
|
"traffictype": 'GUEST',
|
|
"availability": 'Optional',
|
|
"useVpc": 'on',
|
|
"serviceProviderList": {
|
|
"Vpn": 'VpcVirtualRouter',
|
|
"Dhcp": 'VpcVirtualRouter',
|
|
"Dns": 'VpcVirtualRouter',
|
|
"SourceNat": 'VpcVirtualRouter',
|
|
"PortForwarding": 'VpcVirtualRouter',
|
|
"Lb": 'VpcVirtualRouter',
|
|
"UserData": 'VpcVirtualRouter',
|
|
"StaticNat": 'VpcVirtualRouter',
|
|
"NetworkACL": 'VpcVirtualRouter'
|
|
},
|
|
}
|
|
cls.services["network"] = {
|
|
"name": "Test Network",
|
|
"displaytext": "Test Network",
|
|
"netmask": '255.255.255.0'
|
|
}
|
|
cls.services["virtual_machine"]["zoneid"] = cls.zone.id
|
|
cls.services["virtual_machine"]["template"] = cls.template.id
|
|
cls.services["publiciprange"]["zoneid"] = cls.zone.id
|
|
|
|
cls.service_offering = ServiceOffering.create(
|
|
cls.api_client,
|
|
cls.services["service_offering"]
|
|
)
|
|
cls._cleanup = [cls.service_offering]
|
|
return
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
try:
|
|
#Cleanup resources used
|
|
cleanup_resources(cls.api_client, cls._cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
return
|
|
|
|
|
|
def setUp(self):
|
|
self.apiclient = self.testClient.getApiClient()
|
|
self.account = Account.create(
|
|
self.apiclient,
|
|
self.services["account"],
|
|
admin=True,
|
|
domainid=self.domain.id
|
|
)
|
|
self.cleanup = [self.account]
|
|
logger.debug("Creating a VPC offering..")
|
|
self.vpc_off = VpcOffering.create(
|
|
self.apiclient,
|
|
self.services["vpc_offering"]
|
|
)
|
|
self._cleanup.append(self.vpc_off)
|
|
logger.debug("Enabling the VPC offering created")
|
|
self.vpc_off.update(self.apiclient, state='Enabled')
|
|
|
|
logger.debug("Creating a VPC network in the account: %s" % self.account.name)
|
|
self.services["vpc"]["cidr"] = '10.1.0.0/16'
|
|
self.vpc = VPC.create(
|
|
self.apiclient,
|
|
self.services["vpc"],
|
|
vpcofferingid=self.vpc_off.id,
|
|
zoneid=self.zone.id,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid
|
|
)
|
|
return
|
|
|
|
def tearDown(self):
|
|
try:
|
|
#Clean up, terminate the created network offerings
|
|
cleanup_resources(self.apiclient, self.cleanup)
|
|
except Exception as e:
|
|
logger.debug("Warning: Exception during cleanup : %s" % e)
|
|
return
|
|
|
|
def check_ssh_into_vm(self, vm, public_ip, testnegative=False):
|
|
logger.debug("Checking if we can SSH into VM=%s on public_ip=%s" % (vm.name, public_ip.ipaddress.ipaddress))
|
|
try:
|
|
vm.get_ssh_client(ipaddress=public_ip.ipaddress.ipaddress)
|
|
if not testnegative:
|
|
logger.debug("SSH into VM=%s on public_ip=%s is successfully" % (vm.name, public_ip.ipaddress.ipaddress))
|
|
else:
|
|
self.fail("SSH into VM=%s on public_ip=%s is successfully" % (vm.name, public_ip.ipaddress.ipaddress))
|
|
except:
|
|
if not testnegative:
|
|
self.fail("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
|
|
else:
|
|
logger.debug("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
|
|
|
|
|
|
def acquire_publicip(self, network):
|
|
logger.debug("Associating public IP for network: %s" % network.name)
|
|
public_ip = PublicIPAddress.create(self.apiclient,
|
|
accountid=self.account.name,
|
|
zoneid=self.zone.id,
|
|
domainid=self.account.domainid,
|
|
networkid=network.id,
|
|
vpcid=self.vpc.id
|
|
)
|
|
logger.debug("Associated %s with network %s" % (public_ip.ipaddress.ipaddress,
|
|
network.id
|
|
))
|
|
return public_ip
|
|
|
|
def create_network(self, net_offerring, gateway='10.1.1.1',vpc=None):
|
|
try:
|
|
logger.debug('Create NetworkOffering')
|
|
net_offerring["name"] = "NET_OFF-" + str(gateway)
|
|
nw_off = NetworkOffering.create(self.apiclient,
|
|
net_offerring,
|
|
conservemode=False
|
|
)
|
|
# Enable Network offering
|
|
nw_off.update(self.apiclient, state='Enabled')
|
|
self._cleanup.append(nw_off)
|
|
logger.debug('Created and Enabled NetworkOffering')
|
|
|
|
self.services["network"]["name"] = "NETWORK-" + str(gateway)
|
|
logger.debug('Adding Network=%s' % self.services["network"])
|
|
default_acl = NetworkACLList.list(self.apiclient, name="default_allow")[0]
|
|
obj_network = Network.create(self.apiclient,
|
|
self.services["network"],
|
|
accountid=self.account.name,
|
|
domainid=self.account.domainid,
|
|
networkofferingid=nw_off.id,
|
|
zoneid=self.zone.id,
|
|
gateway=gateway,
|
|
aclid=default_acl.id,
|
|
vpcid=vpc.id if vpc else self.vpc.id
|
|
)
|
|
logger.debug("Created network with ID: %s" % obj_network.id)
|
|
return obj_network
|
|
except Exception, e:
|
|
self.fail('Unable to create a Network with offering=%s because of %s ' % (net_offerring, e))
|
|
|
|
def deployvm_in_network(self, network, host_id=None):
|
|
try:
|
|
logger.debug('Creating VM in network=%s' % network.name)
|
|
vm = VirtualMachine.create(
|
|
self.apiclient,
|
|
self.services["virtual_machine"],
|
|
accountid=self.account.name,
|
|
domainid=self.account.domainid,
|
|
serviceofferingid=self.service_offering.id,
|
|
networkids=[str(network.id)],
|
|
hostid=host_id
|
|
)
|
|
logger.debug('Created VM=%s in network=%s' % (vm.id, network.name))
|
|
|
|
return vm
|
|
except:
|
|
self.fail('Unable to create VM in a Network=%s' % network.name)
|
|
|
|
def create_StaticNatRule_For_VM(self, vm, public_ip, network, services=None):
|
|
logger.debug("Enabling static NAT for IP: %s" %public_ip.ipaddress.ipaddress)
|
|
if not services:
|
|
services = self.services["natrule"]
|
|
try:
|
|
StaticNATRule.enable(
|
|
self.apiclient,
|
|
ipaddressid=public_ip.ipaddress.id,
|
|
virtualmachineid=vm.id,
|
|
networkid=network.id
|
|
)
|
|
logger.debug("Static NAT enabled for IP: %s" %
|
|
public_ip.ipaddress.ipaddress)
|
|
logger.debug("Adding NetworkACL rules to make NAT rule accessible")
|
|
except Exception as e:
|
|
self.fail("Failed to enable static NAT on IP: %s - %s" % (
|
|
public_ip.ipaddress.ipaddress, e))
|
|
|
|
@attr(tags=["advanced", "intervlan"], required_hardware="true")
|
|
def test_network_services_VPC_CreatePF(self):
|
|
""" Test Create VPC PF rules on acquired public ip when VpcVirtualRouter is Running
|
|
"""
|
|
|
|
# Validate the following
|
|
# 1. Create a VPC with cidr - 10.1.1.1/16
|
|
# 2. Create a Network offering - NO1 with all supported services
|
|
# 3. Add network1(10.1.1.1/24) using N01 to this VPC.
|
|
# 4. Deploy vm1 in network1.
|
|
# 5. Use the Create PF rule for vm in network1.
|
|
# 6. Successfully ssh into the Guest VM using the PF rule
|
|
|
|
network_1 = self.create_network(self.services["network_offering"])
|
|
vm_1 = self.deployvm_in_network(network_1)
|
|
self.public_ip_range = PublicIpRange.create(
|
|
self.apiclient,
|
|
self.services["publiciprange"]
|
|
)
|
|
self._cleanup.append(self.public_ip_range)
|
|
logger.debug("Dedicating Public IP range to the account");
|
|
dedicate_public_ip_range_response = PublicIpRange.dedicate(
|
|
self.apiclient,
|
|
self.public_ip_range.vlan.id,
|
|
account=self.account.name,
|
|
domainid=self.account.domainid
|
|
)
|
|
public_ip_1 = self.acquire_publicip(network_1)
|
|
self.create_StaticNatRule_For_VM( vm_1, public_ip_1, network_1)
|
|
self.check_ssh_into_vm(vm_1, public_ip_1, testnegative=False)
|
|
self.public_ip_range.release(self.apiclient)
|
|
return
|