mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-10-26 08:42:29 +01:00 
			
		
		
		
	* cleanup plan * more robust cleanup within member method * ss_domain_limits fixed * egress 'icmp' instead of 'all' and cleanup * configdrive syntax, fixed to be up to par with py2 * cleanup fixed the lb secondary ip tests? * deal with different base64 encoding in py3 for userdata * cleanup of multiple_ips_per_nic * cleanup and reformat of test_volumes * cleanup and fixes for test_ps_domain_limits.py * cleanup and fix of test_ps_limits.py * fix occasional match of float against int * cleanup and fix test_ps_resize_volume.py * cleanup and fix test_snapshots * cleanup ss_max_limits and fix for float vs int problem in API * mere cleanup of test_volume_destroy_recover * add missing command creation * cleanup of test_vpc_on_host_maintenance * cleanup test_vpc_network * cleanup, comments and logging in test_vpc_network_lbrules * cleanup of test_vpc_network_pfrules * cleanup and format code for test_vpc_network_staticnatrule * f string instead of conversion specifiers * check http and ssh fix and cleanup (for vpc pfrules tests) * generalise create network method * make ip optional in creating webserver * remove unused code and add rules to enable ssh * more cleanup * remove unused code and cleanup * small cleanup, mostly precarous run environment required * cleanup and removed unused code * advancedsg only, cleanup, pulled in services * reformat/cleanup * log result of error after verify rule * add nw_off_no_services * tags=["TODO"] for escalations_networks * tags=["TODO"] for organization_states * tags=["TODO"] for browse_templates * tags=["TODO"] for configdrive * tags=["TODO"] for vpc_vms_deployment * add remove network cleanup and fixes * move tests that fail on all platforms out of the way Co-authored-by: Daan Hoogland <dahn@onecht.net>
		
			
				
	
	
		
			1228 lines
		
	
	
		
			48 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1228 lines
		
	
	
		
			48 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Licensed to the Apache Software Foundation (ASF) under one
 | |
| # or more contributor license agreements.  See the NOTICE file
 | |
| # distributed with this work for additional information
 | |
| # regarding copyright ownership.  The ASF licenses this file
 | |
| # to you under the Apache License, Version 2.0 (the
 | |
| # "License"); you may not use this file except in compliance
 | |
| # with the License.  You may obtain a copy of the License at
 | |
| #
 | |
| #   http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing,
 | |
| # software distributed under the License is distributed on an
 | |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | |
| # KIND, either express or implied.  See the License for the
 | |
| # specific language governing permissions and limitations
 | |
| # under the License.
 | |
| """ BVT tests for network services on public IP's from different public IP
 | |
|   range than that of associated source NAT IP of the network. Each IP associated
 | |
|   with network from a different public IP range results in a new public
 | |
|   interface on VR (eth3, eth4 etc) and iptable
 | |
| """
 | |
| # Import Local Modules
 | |
| from marvin.codes import (FAILED)
 | |
| from marvin.cloudstackTestCase import cloudstackTestCase
 | |
| from marvin.lib.utils import cleanup_resources, get_process_status
 | |
| from marvin.lib.base import (Account,
 | |
|                              VirtualMachine,
 | |
|                              ServiceOffering,
 | |
|                              NATRule,
 | |
|                              PublicIPAddress,
 | |
|                              StaticNATRule,
 | |
|                              FireWallRule,
 | |
|                              Network,
 | |
|                              NetworkOffering,
 | |
|                              LoadBalancerRule,
 | |
|                              PublicIpRange,
 | |
|                              Router,
 | |
|                              VpcOffering,
 | |
|                              VPC,
 | |
|                              NetworkACLList,
 | |
|                              NetworkACL)
 | |
| from marvin.lib.common import (get_domain,
 | |
|                                get_zone,
 | |
|                                get_template,
 | |
|                                list_hosts,
 | |
|                                list_routers)
 | |
| from nose.plugins.attrib import attr
 | |
| 
 | |
| # Import System modules
 | |
| import socket
 | |
| import logging
 | |
| 
 | |
| _multiprocess_shared_ = True
 | |
| 
 | |
| logger = logging.getLogger('TestNetworkOps')
 | |
| stream_handler = logging.StreamHandler()
 | |
| logger.setLevel(logging.DEBUG)
 | |
| logger.addHandler(stream_handler)
 | |
| 
 | |
| class Services:
 | |
|     """Test multiple public interfaces
 | |
|     """
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.services = {
 | |
|             "account": {
 | |
|                 "email": "test@test.com",
 | |
|                 "firstname": "Test",
 | |
|                 "lastname": "User",
 | |
|                 "username": "test",
 | |
|                 # Random characters are appended for unique
 | |
|                 # username
 | |
|                 "password": "password",
 | |
|             },
 | |
|             "domain_admin": {
 | |
|                 "email": "domain@admin.com",
 | |
|                 "firstname": "Domain",
 | |
|                 "lastname": "Admin",
 | |
|                 "username": "DoA",
 | |
|                 # Random characters are appended for unique
 | |
|                 # username
 | |
|                 "password": "password",
 | |
|             },
 | |
|             "service_offering": {
 | |
|                 "name": "Tiny Instance",
 | |
|                 "displaytext": "Tiny Instance",
 | |
|                 "cpunumber": 1,
 | |
|                 "cpuspeed": 100,
 | |
|                 "memory": 128,
 | |
|             },
 | |
|             "publiciprange": {
 | |
|                 "gateway": "10.6.0.254",
 | |
|                 "netmask": "255.255.255.0",
 | |
|                 "startip": "10.6.0.2",
 | |
|                 "endip": "10.6.0.20",
 | |
|                 "forvirtualnetwork": "true",
 | |
|                 "vlan": "300"
 | |
|             },
 | |
|             "extrapubliciprange": {
 | |
|                 "gateway": "10.200.100.1",
 | |
|                 "netmask": "255.255.255.0",
 | |
|                 "startip": "10.200.100.101",
 | |
|                 "endip": "10.200.100.105",
 | |
|                 "forvirtualnetwork": "false",
 | |
|                 "vlan": "301"
 | |
|             },
 | |
|             "network_offering": {
 | |
|                 "name": 'VPC Network offering',
 | |
|                 "displaytext": 'VPC Network off',
 | |
|                 "guestiptype": 'Isolated',
 | |
|                 "supportedservices": 'Vpn,Dhcp,Dns,SourceNat,PortForwarding,Lb,UserData,StaticNat,NetworkACL',
 | |
|                 "traffictype": 'GUEST',
 | |
|                 "availability": 'Optional',
 | |
|                 "useVpc": 'on',
 | |
|                 "serviceProviderList": {
 | |
|                     "Vpn": 'VpcVirtualRouter',
 | |
|                     "Dhcp": 'VpcVirtualRouter',
 | |
|                     "Dns": 'VpcVirtualRouter',
 | |
|                     "SourceNat": 'VpcVirtualRouter',
 | |
|                     "PortForwarding": 'VpcVirtualRouter',
 | |
|                     "Lb": 'VpcVirtualRouter',
 | |
|                     "UserData": 'VpcVirtualRouter',
 | |
|                     "StaticNat": 'VpcVirtualRouter',
 | |
|                     "NetworkACL": 'VpcVirtualRouter'
 | |
|                 },
 | |
|             },
 | |
|             "virtual_machine": {
 | |
|                 "displayname": "Test VM",
 | |
|                 "username": "root",
 | |
|                 "password": "password",
 | |
|                 "ssh_port": 22,
 | |
|                 "privateport": 22,
 | |
|                 "publicport": 22,
 | |
|                 "protocol": "TCP",
 | |
|                 "affinity": {
 | |
|                     "name": "webvms",
 | |
|                     "type": "host anti-affinity",
 | |
|                 }
 | |
|             },
 | |
|             "vpc_offering": {
 | |
|                 "name": 'VPC off',
 | |
|                 "displaytext": 'VPC off',
 | |
|                 "supportedservices": 'Dhcp,Dns,SourceNat,PortForwarding,Vpn,Lb,UserData,StaticNat',
 | |
|             },
 | |
|             "vpc": {
 | |
|                 "name": "TestVPC",
 | |
|                 "displaytext": "TestVPC",
 | |
|                 "cidr": '10.0.0.1/24'
 | |
|             },
 | |
|             "network": {
 | |
|                 "name": "Test Network",
 | |
|                 "displaytext": "Test Network",
 | |
|                 "netmask": '255.255.255.0'
 | |
|             },
 | |
|             "natrule": {
 | |
|                 "privateport": 22,
 | |
|                 "publicport": 22,
 | |
|                 "startport": 22,
 | |
|                 "endport": 22,
 | |
|                 "protocol": "TCP",
 | |
|                 "cidrlist": '0.0.0.0/0',
 | |
|             },
 | |
|             "ostype": "CentOS 5.6 (64-bit)",
 | |
|             "sleep": 60,
 | |
|             "timeout": 10,
 | |
|             "vlan": "10",
 | |
|             "zoneid": '',
 | |
|             "mode": 'advanced'
 | |
|         }
 | |
| 
 | |
| 
 | |
| class TestPortForwarding(cloudstackTestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
| 
 | |
|         testClient = super(TestPortForwarding, cls).getClsTestClient()
 | |
|         cls.apiclient = testClient.getApiClient()
 | |
|         cls.services = Services().services
 | |
|         cls.hypervisor = testClient.getHypervisorInfo()
 | |
|         # Get Zone, Domain and templates
 | |
|         cls.domain = get_domain(cls.apiclient)
 | |
|         cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
 | |
|         # cls.services["virtual_machine"]["zoneid"] = cls.zone.id
 | |
|         cls.services["zoneid"] = cls.zone.id
 | |
|         cls.services["publiciprange"]["zoneid"] = cls.zone.id
 | |
|         cls._cleanup = []
 | |
| 
 | |
|         template = get_template(
 | |
|             cls.apiclient,
 | |
|             cls.zone.id,
 | |
|             cls.services["ostype"]
 | |
|         )
 | |
|         if template == FAILED:
 | |
|             assert False, "get_template() failed to return template with description %s" % cls.services[
 | |
|                 "ostype"]
 | |
| 
 | |
|         cls.account = Account.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["account"],
 | |
|             admin=True,
 | |
|             domainid=cls.domain.id
 | |
|         )
 | |
|         cls._cleanup.append(cls.account)
 | |
|         cls.service_offering = ServiceOffering.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["service_offering"]
 | |
|         )
 | |
|         cls._cleanup.append(cls.service_offering)
 | |
|         cls.virtual_machine = VirtualMachine.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["virtual_machine"],
 | |
|             zoneid = cls.services["zoneid"],
 | |
|             templateid=template.id,
 | |
|             accountid=cls.account.name,
 | |
|             domainid=cls.account.domainid,
 | |
|             serviceofferingid=cls.service_offering.id
 | |
|         )
 | |
|         cls._cleanup.append(cls.virtual_machine)
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.apiclient = self.testClient.getApiClient()
 | |
|         self.cleanup = []
 | |
|         return
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         super(TestPortForwarding, cls).tearDownClass()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestPortForwarding, self).tearDown()
 | |
| 
 | |
|     @attr(tags=["advancedsg", "smoke"], required_hardware="true")
 | |
|     def test_port_forwarding_on_ip_from_non_src_nat_ip_range(self):
 | |
|         """Test for port forwarding on a IP which is in pubic IP range different
 | |
|            from public IP range that has source NAT IP associated with network
 | |
|         """
 | |
| 
 | |
|         # Validate the following:
 | |
|         # 1. Create a new public IP range and dedicate to a account
 | |
|         # 2. Acquire a IP from new public range
 | |
|         # 3. create a port forwarding on acquired IP from new range
 | |
|         # 4. Create a firewall rule to open up the port
 | |
|         # 5. Test SSH works to the VM
 | |
| 
 | |
|         self.services["extrapubliciprange"]["zoneid"] = self.services["zoneid"]
 | |
|         self.public_ip_range = PublicIpRange.create(
 | |
|                                     self.apiclient,
 | |
|                                     self.services["extrapubliciprange"]
 | |
|                                )
 | |
|         self.cleanup.append(self.public_ip_range)
 | |
| 
 | |
|         logger.debug("Dedicating Public IP range to the account");
 | |
|         dedicate_public_ip_range_response = PublicIpRange.dedicate(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.public_ip_range.vlan.id,
 | |
|                                                 account=self.account.name,
 | |
|                                                 domainid=self.account.domainid
 | |
|                                             )
 | |
|         ip_address = PublicIPAddress.create(
 | |
|             self.apiclient,
 | |
|             self.account.name,
 | |
|             self.zone.id,
 | |
|             self.account.domainid,
 | |
|             self.services["virtual_machine"]
 | |
|         )
 | |
|         self.cleanup.append(ip_address)
 | |
|         # Check if VM is in Running state before creating NAT and firewall rules
 | |
|         vm_response = VirtualMachine.list(
 | |
|             self.apiclient,
 | |
|             id=self.virtual_machine.id
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(
 | |
|             isinstance(vm_response, list),
 | |
|             True,
 | |
|             "Check list VM returns a valid list"
 | |
|         )
 | |
| 
 | |
|         self.assertNotEqual(
 | |
|             len(vm_response),
 | |
|             0,
 | |
|             "Check Port Forwarding Rule is created"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             vm_response[0].state,
 | |
|             'Running',
 | |
|             "VM state should be Running before creating a NAT rule."
 | |
|         )
 | |
| 
 | |
|         # Open up firewall port for SSH
 | |
|         fwr = FireWallRule.create(
 | |
|             self.apiclient,
 | |
|             ipaddressid=ip_address.ipaddress.id,
 | |
|             protocol=self.services["natrule"]["protocol"],
 | |
|             cidrlist=['0.0.0.0/0'],
 | |
|             startport=self.services["natrule"]["publicport"],
 | |
|             endport=self.services["natrule"]["publicport"]
 | |
|         )
 | |
|         self.cleanup.append(fwr)
 | |
| 
 | |
|         # Create PF rule
 | |
|         nat_rule = NATRule.create(
 | |
|             self.apiclient,
 | |
|             self.virtual_machine,
 | |
|             self.services["natrule"],
 | |
|             ip_address.ipaddress.id
 | |
|         )
 | |
| 
 | |
|         try:
 | |
|             logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
 | |
|                        (
 | |
|                            self.virtual_machine.ipaddress,
 | |
|                            ip_address.ipaddress.ipaddress
 | |
|                        ))
 | |
|             self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
 | |
|         except Exception as e:
 | |
|             self.fail(
 | |
|                 "SSH Access failed for %s: %s" %
 | |
|                 (self.virtual_machine.ipaddress, e)
 | |
|             )
 | |
| 
 | |
|         nat_rule.delete(self.apiclient)
 | |
| 
 | |
| class TestStaticNat(cloudstackTestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         testClient = super(TestStaticNat, cls).getClsTestClient()
 | |
|         cls.apiclient = testClient.getApiClient()
 | |
|         cls.services = Services().services
 | |
|         cls.hypervisor = testClient.getHypervisorInfo()
 | |
|         # Get Zone, Domain and templates
 | |
|         cls.domain = get_domain(cls.apiclient)
 | |
|         cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
 | |
|         # cls.services["virtual_machine"]["zoneid"] = cls.zone.id
 | |
|         cls.services["zoneid"] = cls.zone.id
 | |
|         template = get_template(
 | |
|             cls.apiclient,
 | |
|             cls.zone.id,
 | |
|             cls.services["ostype"]
 | |
|         )
 | |
|         if template == FAILED:
 | |
|             assert False, "get_template() failed to return template with description %s" % cls.services[
 | |
|                 "ostype"]
 | |
|         cls._cleanup = []
 | |
| 
 | |
|         cls.account = Account.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["account"],
 | |
|             admin=True,
 | |
|             domainid=cls.domain.id
 | |
|         )
 | |
|         cls._cleanup.append(cls.account)
 | |
|         cls.services["publiciprange"]["zoneid"] = cls.zone.id
 | |
|         cls.service_offering = ServiceOffering.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["service_offering"]
 | |
|         )
 | |
|         cls._cleanup.append(cls.service_offering)
 | |
|         cls.virtual_machine = VirtualMachine.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["virtual_machine"],
 | |
|             zoneid = cls.services["zoneid"],
 | |
|             templateid=template.id,
 | |
|             accountid=cls.account.name,
 | |
|             domainid=cls.account.domainid,
 | |
|             serviceofferingid=cls.service_offering.id
 | |
|         )
 | |
|         cls._cleanup.append(cls.virtual_machine)
 | |
|         cls.defaultNetworkId = cls.virtual_machine.nic[0].networkid
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.apiclient = self.testClient.getApiClient()
 | |
|         self.cleanup = []
 | |
|         return
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         super(TestStaticNat, cls).tearDownClass()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestStaticNat, self).tearDown()
 | |
| 
 | |
|     @attr(tags=["advancedsg", "smoke"], required_hardware="true")
 | |
|     def test_static_nat_on_ip_from_non_src_nat_ip_range(self):
 | |
|         """Test for static nat on a IP which is in pubic IP range different
 | |
|            from public IP range that has source NAT IP associated with network
 | |
|         """
 | |
| 
 | |
|         # Validate the following:
 | |
|         # 1. Create a new public IP range and dedicate to a account
 | |
|         # 2. Acquire a IP from new public range
 | |
|         # 3. Enable static NAT on acquired IP from new range
 | |
|         # 4. Create a firewall rule to open up the port
 | |
|         # 5. Test SSH works to the VM
 | |
| 
 | |
|         self.services["extrapubliciprange"]["zoneid"] = self.services["zoneid"]
 | |
|         self.public_ip_range = PublicIpRange.create(
 | |
|                                     self.apiclient,
 | |
|                                     self.services["extrapubliciprange"]
 | |
|                                )
 | |
|         self.cleanup.append(self.public_ip_range)
 | |
|         logger.debug("Dedicating Public IP range to the account");
 | |
|         dedicate_public_ip_range_response = PublicIpRange.dedicate(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.public_ip_range.vlan.id,
 | |
|                                                 account=self.account.name,
 | |
|                                                 domainid=self.account.domainid
 | |
|                                             )
 | |
|         ip_address = PublicIPAddress.create(
 | |
|             self.apiclient,
 | |
|             self.account.name,
 | |
|             self.zone.id,
 | |
|             self.account.domainid,
 | |
|             self.services["virtual_machine"]
 | |
|         )
 | |
|         self.cleanup.append(ip_address)
 | |
|         # Check if VM is in Running state before creating NAT and firewall rules
 | |
|         vm_response = VirtualMachine.list(
 | |
|             self.apiclient,
 | |
|             id=self.virtual_machine.id
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(
 | |
|             isinstance(vm_response, list),
 | |
|             True,
 | |
|             "Check list VM returns a valid list"
 | |
|         )
 | |
| 
 | |
|         self.assertNotEqual(
 | |
|             len(vm_response),
 | |
|             0,
 | |
|             "Check Port Forwarding Rule is created"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             vm_response[0].state,
 | |
|             'Running',
 | |
|             "VM state should be Running before creating a NAT rule."
 | |
|         )
 | |
| 
 | |
|         # Open up firewall port for SSH
 | |
|         fwr = FireWallRule.create(
 | |
|             self.apiclient,
 | |
|             ipaddressid=ip_address.ipaddress.id,
 | |
|             protocol=self.services["natrule"]["protocol"],
 | |
|             cidrlist=['0.0.0.0/0'],
 | |
|             startport=self.services["natrule"]["publicport"],
 | |
|             endport=self.services["natrule"]["publicport"]
 | |
|         )
 | |
|         self.cleanup.append(fwr)
 | |
| 
 | |
|         # Create Static NAT rule
 | |
|         StaticNATRule.enable(
 | |
|             self.apiclient,
 | |
|             ip_address.ipaddress.id,
 | |
|             self.virtual_machine.id,
 | |
|             self.defaultNetworkId
 | |
|         )
 | |
| 
 | |
|         try:
 | |
|             logger.debug("SSHing into VM with IP address %s with NAT IP %s" %
 | |
|                        (
 | |
|                            self.virtual_machine.ipaddress,
 | |
|                            ip_address.ipaddress.ipaddress
 | |
|                        ))
 | |
|             self.virtual_machine.get_ssh_client(ip_address.ipaddress.ipaddress)
 | |
|         except Exception as e:
 | |
|             self.fail(
 | |
|                 "SSH Access failed for %s: %s" %
 | |
|                 (self.virtual_machine.ipaddress, e)
 | |
|             )
 | |
| 
 | |
|         StaticNATRule.disable(
 | |
|             self.apiclient,
 | |
|             ip_address.ipaddress.id,
 | |
|             self.virtual_machine.id
 | |
|         )
 | |
| 
 | |
| class TestRouting(cloudstackTestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
| 
 | |
|         testClient = super(TestRouting, cls).getClsTestClient()
 | |
|         cls.apiclient = testClient.getApiClient()
 | |
|         cls.services = Services().services
 | |
|         cls.hypervisor = testClient.getHypervisorInfo()
 | |
|         # Get Zone, Domain and templates
 | |
|         cls.domain = get_domain(cls.apiclient)
 | |
|         cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
 | |
|         # cls.services["virtual_machine"]["zoneid"] = cls.zone.id
 | |
|         cls.services["zoneid"] = cls.zone.id
 | |
|         cls._cleanup = []
 | |
|         template = get_template(
 | |
|             cls.apiclient,
 | |
|             cls.zone.id,
 | |
|             cls.services["ostype"]
 | |
|         )
 | |
|         if template == FAILED:
 | |
|             assert False, "get_template() failed to return template with description %s" % cls.services[
 | |
|                 "ostype"]
 | |
| 
 | |
|         cls.account = Account.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["account"],
 | |
|             admin=True,
 | |
|             domainid=cls.domain.id
 | |
|         )
 | |
|         cls._cleanup.append(cls.account)
 | |
|         cls.services["publiciprange"]["zoneid"] = cls.zone.id
 | |
|         cls.service_offering = ServiceOffering.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["service_offering"]
 | |
|         )
 | |
|         cls._cleanup.append(cls.service_offering)
 | |
|         cls.hostConfig = cls.config.__dict__["zones"][0].__dict__["pods"][0].__dict__["clusters"][0].__dict__["hosts"][0].__dict__
 | |
|         cls.virtual_machine = VirtualMachine.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["virtual_machine"],
 | |
|             zoneid = cls.services["zoneid"],
 | |
|             templateid=template.id,
 | |
|             accountid=cls.account.name,
 | |
|             domainid=cls.account.domainid,
 | |
|             serviceofferingid=cls.service_offering.id
 | |
|         )
 | |
|         cls._cleanup.append(cls.virtual_machine)
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.apiclient = self.testClient.getApiClient()
 | |
|         self.cleanup = []
 | |
|         return
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         super(TestRouting, cls).tearDownClass()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestRouting, self).tearDown()
 | |
| 
 | |
|     @attr(tags=["advancedsg", "smoke"], required_hardware="true")
 | |
|     def test_routing_tables(self):
 | |
|         """Test routing table in case we have IP associated with a network which is in
 | |
|             different pubic IP range from that of public IP range that has source NAT IP.
 | |
|             When IP is associated we should see a new route table created.
 | |
|             When IP is associated we should see a that route table is deleted.
 | |
|         """
 | |
| 
 | |
|         # Validate the following:
 | |
|         # 1. Create a new public IP range and dedicate to a account
 | |
|         # 2. Acquire a IP from new public range
 | |
|         # 3. Create a firewall rule to open up the port, so that IP is associated with network
 | |
|         # 5. Login to VR and verify routing tables, there should be Table_eth3
 | |
|         # 6. Delete firewall rule, since its last IP, routing table Table_eth3 should be deleted
 | |
| 
 | |
|         self.services["extrapubliciprange"]["zoneid"] = self.services["zoneid"]
 | |
|         self.public_ip_range = PublicIpRange.create(
 | |
|                                     self.apiclient,
 | |
|                                     self.services["extrapubliciprange"]
 | |
|                                )
 | |
|         self.cleanup.append(self.public_ip_range)
 | |
| 
 | |
|         logger.debug("Dedicating Public IP range to the account");
 | |
|         dedicate_public_ip_range_response = PublicIpRange.dedicate(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.public_ip_range.vlan.id,
 | |
|                                                 account=self.account.name,
 | |
|                                                 domainid=self.account.domainid
 | |
|                                             )
 | |
|         ip_address = PublicIPAddress.create(
 | |
|             self.apiclient,
 | |
|             self.account.name,
 | |
|             self.zone.id,
 | |
|             self.account.domainid,
 | |
|             self.services["virtual_machine"]
 | |
|         )
 | |
|         self.cleanup.append(ip_address)
 | |
| 
 | |
|         # Check if VM is in Running state before creating NAT and firewall rules
 | |
|         vm_response = VirtualMachine.list(
 | |
|             self.apiclient,
 | |
|             id=self.virtual_machine.id
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(
 | |
|             isinstance(vm_response, list),
 | |
|             True,
 | |
|             "Check list VM returns a valid list"
 | |
|         )
 | |
| 
 | |
|         self.assertNotEqual(
 | |
|             len(vm_response),
 | |
|             0,
 | |
|             "Check Port Forwarding Rule is created"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             vm_response[0].state,
 | |
|             'Running',
 | |
|             "VM state should be Running before creating Firewall rule."
 | |
|         )
 | |
| 
 | |
|         # Open up firewall port for SSH, this will associate IP with VR
 | |
|         firewall_rule = FireWallRule.create(
 | |
|             self.apiclient,
 | |
|             ipaddressid=ip_address.ipaddress.id,
 | |
|             protocol=self.services["natrule"]["protocol"],
 | |
|             cidrlist=['0.0.0.0/0'],
 | |
|             startport=self.services["natrule"]["publicport"],
 | |
|             endport=self.services["natrule"]["publicport"]
 | |
|         )
 | |
|         self.cleanup.append(firewall_rule)
 | |
| 
 | |
|         # Get the router details associated with account
 | |
|         routers = list_routers(
 | |
|             self.apiclient,
 | |
|             account=self.account.name,
 | |
|             domainid=self.account.domainid,
 | |
|         )
 | |
|         router = routers[0]
 | |
| 
 | |
|         if (self.hypervisor.lower() == 'vmware'
 | |
|                 or self.hypervisor.lower() == 'hyperv'):
 | |
|             result = get_process_status(
 | |
|                 self.apiclient.connection.mgtSvr,
 | |
|                 22,
 | |
|                 self.apiclient.connection.user,
 | |
|                 self.apiclient.connection.passwd,
 | |
|                 router.linklocalip,
 | |
|                 'ip route list table Table_eth3',
 | |
|                 hypervisor=self.hypervisor
 | |
|             )
 | |
|         else:
 | |
|             hosts = list_hosts(
 | |
|                 self.apiclient,
 | |
|                 id=router.hostid,
 | |
|             )
 | |
|             self.assertEqual(
 | |
|                 isinstance(hosts, list),
 | |
|                 True,
 | |
|                 "Check for list hosts response return valid data"
 | |
|             )
 | |
|             host = hosts[0]
 | |
|             host.user = self.hostConfig['username']
 | |
|             host.passwd = self.hostConfig['password']
 | |
|             try:
 | |
|                 result = get_process_status(
 | |
|                     host.ipaddress,
 | |
|                     22,
 | |
|                     host.user,
 | |
|                     host.passwd,
 | |
|                     router.linklocalip,
 | |
|                     'ip route list table Table_eth3'
 | |
|                 )
 | |
|             except KeyError:
 | |
|                 self.skipTest(
 | |
|                     "Provide a marvin config file with host\
 | |
|                             credentials to run %s" %
 | |
|                     self._testMethodName)
 | |
| 
 | |
|         logger.debug("ip route list table Table_eth3: %s" % result)
 | |
|         public_range_gateway = self.services["publiciprange"]["gateway"]
 | |
|         default_route_rule = "default via " + public_range_gateway + " dev eth3  proto static"
 | |
|         logger.debug("default route result: %s" % str(result[0]))
 | |
|         self.assertEqual(
 | |
|             default_route_rule,
 | |
|             str(result[0]),
 | |
|             "Check default route table entry for public ip range"
 | |
|         )
 | |
| 
 | |
|         res = str(result)
 | |
|         self.assertEqual(
 | |
|             res.count("throw") == 2,
 | |
|             True,
 | |
|             "Check routing rules to throw rest of the traffic. Count shoule be Atleast 2 for the control and guest traffic "
 | |
|         )
 | |
| 
 | |
|         firewall_rule.delete(self.apiclient)
 | |
|         self.cleanup.remove(firewall_rule)
 | |
| 
 | |
|         if (self.hypervisor.lower() == 'vmware'
 | |
|                 or self.hypervisor.lower() == 'hyperv'):
 | |
|             result = get_process_status(
 | |
|                 self.apiclient.connection.mgtSvr,
 | |
|                 22,
 | |
|                 self.apiclient.connection.user,
 | |
|                 self.apiclient.connection.passwd,
 | |
|                 router.linklocalip,
 | |
|                 'ip route list table Table_eth3',
 | |
|                 hypervisor=self.hypervisor
 | |
|             )
 | |
|         else:
 | |
|             hosts = list_hosts(
 | |
|                 self.apiclient,
 | |
|                 id=router.hostid,
 | |
|             )
 | |
|             self.assertEqual(
 | |
|                 isinstance(hosts, list),
 | |
|                 True,
 | |
|                 "Check for list hosts response return valid data"
 | |
|             )
 | |
|             host = hosts[0]
 | |
|             host.user = self.hostConfig['username']
 | |
|             host.passwd = self.hostConfig['password']
 | |
|             try:
 | |
|                 result = get_process_status(
 | |
|                     host.ipaddress,
 | |
|                     22,
 | |
|                     host.user,
 | |
|                     host.passwd,
 | |
|                     router.linklocalip,
 | |
|                     'ip route list table Table_eth3'
 | |
|                 )
 | |
|             except KeyError:
 | |
|                 self.skipTest(
 | |
|                     "Provide a marvin config file with host\
 | |
|                             credentials to run %s" %
 | |
|                     self._testMethodName)
 | |
| 
 | |
|         logger.debug("ip route list table Table_eth3: %s" % result)
 | |
|         res = str(result)
 | |
|         self.assertEqual(
 | |
|             res.count("default via"),
 | |
|             0,
 | |
|             "Check to ensure there should not be any default rule"
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(
 | |
|             res.count("throw"),
 | |
|             0,
 | |
|             "Check to ensure there should not be any throw rule"
 | |
|         )
 | |
| 
 | |
| class TestIptables(cloudstackTestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
| 
 | |
|         testClient = super(TestIptables, cls).getClsTestClient()
 | |
|         cls.apiclient = testClient.getApiClient()
 | |
|         cls.services = Services().services
 | |
|         cls.hypervisor = testClient.getHypervisorInfo()
 | |
|         # Get Zone, Domain and templates
 | |
|         cls.domain = get_domain(cls.apiclient)
 | |
|         cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests())
 | |
|         # cls.services["virtual_machine"]["zoneid"] = cls.zone.id
 | |
|         cls.services["zoneid"] = cls.zone.id
 | |
| 
 | |
|         template = get_template(
 | |
|             cls.apiclient,
 | |
|             cls.zone.id,
 | |
|             cls.services["ostype"]
 | |
|         )
 | |
|         if template == FAILED:
 | |
|             assert False, "get_template() failed to return template with description %s" % cls.services[
 | |
|                 "ostype"]
 | |
| 
 | |
|         cls._cleanup = []
 | |
|         cls.account = Account.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["account"],
 | |
|             admin=True,
 | |
|             domainid=cls.domain.id
 | |
|         )
 | |
|         cls._cleanup.append(cls.account)
 | |
|         cls.services["publiciprange"]["zoneid"] = cls.zone.id
 | |
|         cls.service_offering = ServiceOffering.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["service_offering"]
 | |
|         )
 | |
|         cls._cleanup.append(cls.service_offering)
 | |
|         cls.hostConfig = cls.config.__dict__["zones"][0].__dict__["pods"][0].__dict__["clusters"][0].__dict__["hosts"][0].__dict__
 | |
|         cls.virtual_machine = VirtualMachine.create(
 | |
|             cls.apiclient,
 | |
|             cls.services["virtual_machine"],
 | |
|             zoneid = cls.services["zoneid"],
 | |
|             templateid=template.id,
 | |
|             accountid=cls.account.name,
 | |
|             domainid=cls.account.domainid,
 | |
|             serviceofferingid=cls.service_offering.id
 | |
|         )
 | |
|         cls._cleanup.append(cls.virtual_machine)
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.apiclient = self.testClient.getApiClient()
 | |
|         self.cleanup = []
 | |
|         return
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         super(TestIptables, cls).tearDownClass()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestIptables, self).tearDown()
 | |
| 
 | |
|     @attr(tags=["advancedsg", "smoke"], required_hardware="true")
 | |
|     def test_iptable_rules(self):
 | |
|         """Test iptable rules in case we have IP associated with a network which is in
 | |
|             different pubic IP range from that of public IP range that has source NAT IP.
 | |
|             When IP is associated we should see a rule '-i eth3 -o eth0 -m state --state RELATED,ESTABLISHED -j ACCEPT' in FORWARD table.
 | |
|             When IP is dis-associated we should see a rule in the FORWARD table is deleted.
 | |
|         """
 | |
| 
 | |
|         # Validate the following:
 | |
|         # 1. Create a new public IP range and dedicate to a account
 | |
|         # 2. Acquire a IP from new public range
 | |
|         # 3. Create a firewall rule to open up the port, so that IP is associated with network
 | |
|         # 5. Login to VR and verify routing tables, there should be Table_eth3
 | |
|         # 6. Delete firewall rule, since its last IP, routing table Table_eth3 should be deleted
 | |
| 
 | |
|         self.services["extrapubliciprange"]["zoneid"] = self.services["zoneid"]
 | |
|         self.public_ip_range = PublicIpRange.create(
 | |
|                                     self.apiclient,
 | |
|                                     self.services["extrapubliciprange"]
 | |
|                                )
 | |
|         self.cleanup.append(self.public_ip_range)
 | |
| 
 | |
|         logger.debug("Dedicating Public IP range to the account");
 | |
|         dedicate_public_ip_range_response = PublicIpRange.dedicate(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.public_ip_range.vlan.id,
 | |
|                                                 account=self.account.name,
 | |
|                                                 domainid=self.account.domainid
 | |
|                                             )
 | |
|         ip_address = PublicIPAddress.create(
 | |
|             self.apiclient,
 | |
|             self.account.name,
 | |
|             self.zone.id,
 | |
|             self.account.domainid,
 | |
|             self.services["virtual_machine"]
 | |
|         )
 | |
|         self.cleanup.append(ip_address)
 | |
|         # Check if VM is in Running state before creating NAT and firewall rules
 | |
|         vm_response = VirtualMachine.list(
 | |
|             self.apiclient,
 | |
|             id=self.virtual_machine.id
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(
 | |
|             isinstance(vm_response, list),
 | |
|             True,
 | |
|             "Check list VM returns a valid list"
 | |
|         )
 | |
| 
 | |
|         self.assertNotEqual(
 | |
|             len(vm_response),
 | |
|             0,
 | |
|             "Check Port Forwarding Rule is created"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             vm_response[0].state,
 | |
|             'Running',
 | |
|             "VM state should be Running before creating a NAT rule."
 | |
|         )
 | |
| 
 | |
|         # Open up firewall port for SSH
 | |
|         firewall_rule = FireWallRule.create(
 | |
|             self.apiclient,
 | |
|             ipaddressid=ip_address.ipaddress.id,
 | |
|             protocol=self.services["natrule"]["protocol"],
 | |
|             cidrlist=['0.0.0.0/0'],
 | |
|             startport=self.services["natrule"]["publicport"],
 | |
|             endport=self.services["natrule"]["publicport"]
 | |
|         )
 | |
|         self.cleanup.append(firewall_rule)
 | |
|         # Get the router details associated with account
 | |
|         routers = list_routers(
 | |
|             self.apiclient,
 | |
|             account=self.account.name,
 | |
|             domainid=self.account.domainid,
 | |
|         )
 | |
|         router = routers[0]
 | |
| 
 | |
|         if (self.hypervisor.lower() == 'vmware'
 | |
|                 or self.hypervisor.lower() == 'hyperv'):
 | |
|             result = get_process_status(
 | |
|                 self.apiclient.connection.mgtSvr,
 | |
|                 22,
 | |
|                 self.apiclient.connection.user,
 | |
|                 self.apiclient.connection.passwd,
 | |
|                 router.linklocalip,
 | |
|                 'iptables -t filter -L FORWARD  -v',
 | |
|                 hypervisor=self.hypervisor
 | |
|             )
 | |
|         else:
 | |
|             hosts = list_hosts(
 | |
|                 self.apiclient,
 | |
|                 id=router.hostid,
 | |
|             )
 | |
|             self.assertEqual(
 | |
|                 isinstance(hosts, list),
 | |
|                 True,
 | |
|                 "Check for list hosts response return valid data"
 | |
|             )
 | |
|             host = hosts[0]
 | |
|             host.user = self.hostConfig['username']
 | |
|             host.passwd = self.hostConfig['password']
 | |
|             try:
 | |
|                 result = get_process_status(
 | |
|                     host.ipaddress,
 | |
|                     22,
 | |
|                     host.user,
 | |
|                     host.passwd,
 | |
|                     router.linklocalip,
 | |
|                     'iptables -t filter -L FORWARD  -v'
 | |
|                 )
 | |
|             except KeyError:
 | |
|                 self.skipTest(
 | |
|                     "Provide a marvin config file with host\
 | |
|                             credentials to run %s" %
 | |
|                     self._testMethodName)
 | |
| 
 | |
|         logger.debug("iptables -t filter -L FORWARD  -v: %s" % result)
 | |
|         res = str(result)
 | |
|         self.assertEqual(
 | |
|             res.count("eth3   eth0    anywhere             anywhere             state RELATED,ESTABLISHED"),
 | |
|             1,
 | |
|             "Check to ensure there is a iptable rule to accept the RELATED,ESTABLISHED traffic"
 | |
|         )
 | |
|         firewall_rule.delete(self.apiclient)
 | |
|         self.cleanup.remove(firewall_rule)
 | |
| 
 | |
| class TestVPCPortForwarding(cloudstackTestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         socket.setdefaulttimeout(60)
 | |
|         cls.api_client = cls.testClient.getApiClient()
 | |
|         cls.services = Services().services
 | |
| 
 | |
|         # Get Zone, Domain and templates
 | |
|         cls.domain = get_domain(cls.api_client)
 | |
|         cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
 | |
|         cls.template = get_template(
 | |
|                                     cls.api_client,
 | |
|                                     cls.zone.id,
 | |
|                                     cls.services["ostype"]
 | |
|                                     )
 | |
|         cls.services["virtual_machine"]["zoneid"] = cls.zone.id
 | |
|         cls.services["virtual_machine"]["template"] = cls.template.id
 | |
|         cls.services["publiciprange"]["zoneid"] = cls.zone.id
 | |
| 
 | |
|         cls.service_offering = ServiceOffering.create(
 | |
|                                                         cls.api_client,
 | |
|                                                         cls.services["service_offering"]
 | |
|                                                         )
 | |
|         cls._cleanup = [cls.service_offering]
 | |
|         return
 | |
| 
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         super(TestVPCPortForwarding, cls).tearDownClass()
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.apiclient = self.testClient.getApiClient()
 | |
|         self.cleanup = []
 | |
|         self.account = Account.create(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.services["account"],
 | |
|                                                 admin=True,
 | |
|                                                 domainid=self.domain.id
 | |
|                                                 )
 | |
|         self.cleanup.append(self.account)
 | |
|         logger.debug("Creating a VPC offering..")
 | |
|         self.vpc_off = VpcOffering.create(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.services["vpc_offering"]
 | |
|                                                 )
 | |
|         self.cleanup.append(self.vpc_off)
 | |
|         logger.debug("Enabling the VPC offering created")
 | |
|         self.vpc_off.update(self.apiclient, state='Enabled')
 | |
| 
 | |
|         logger.debug("Creating a VPC network in the account: %s" % self.account.name)
 | |
|         self.services["vpc"]["cidr"] = '10.1.0.0/16'
 | |
|         self.vpc = VPC.create(
 | |
|                                 self.apiclient,
 | |
|                                 self.services["vpc"],
 | |
|                                 vpcofferingid=self.vpc_off.id,
 | |
|                                 zoneid=self.zone.id,
 | |
|                                 account=self.account.name,
 | |
|                                 domainid=self.account.domainid
 | |
|                                 )
 | |
|         self.cleanup.append(self.vpc)
 | |
|         return
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestVPCPortForwarding, self).tearDown()
 | |
| 
 | |
|     def create_natrule(self, vm, public_ip, network, services=None):
 | |
|         logger.debug("Creating NAT rule in network for vm with public IP")
 | |
|         if not services:
 | |
|             services = self.services["natrule"]
 | |
|         nat_rule = NATRule.create(self.apiclient,
 | |
|                                             vm,
 | |
|                                             services,
 | |
|                                             ipaddressid=public_ip.ipaddress.id,
 | |
|                                             openfirewall=False,
 | |
|                                             networkid=network.id,
 | |
|                                             vpcid=self.vpc.id
 | |
|                                             )
 | |
|         self.cleanup.append(nat_rule)
 | |
|         return nat_rule
 | |
| 
 | |
|     def acquire_publicip(self, network):
 | |
|         logger.debug("Associating public IP for network: %s" % network.name)
 | |
|         public_ip = PublicIPAddress.create(self.apiclient,
 | |
|                                         accountid=self.account.name,
 | |
|                                         zoneid=self.zone.id,
 | |
|                                         domainid=self.account.domainid,
 | |
|                                         networkid=network.id,
 | |
|                                         vpcid=self.vpc.id
 | |
|                                         )
 | |
|         self.cleanup.append(public_ip)
 | |
|         logger.debug("Associated %s with network %s" % (public_ip.ipaddress.ipaddress,
 | |
|                                                     network.id
 | |
|                                                     ))
 | |
|         return public_ip
 | |
| 
 | |
|     def deployvm_in_network(self, network, host_id=None):
 | |
|         try:
 | |
|                 logger.debug('Creating VM in network=%s' % network.name)
 | |
|                 vm = VirtualMachine.create(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.services["virtual_machine"],
 | |
|                                                 accountid=self.account.name,
 | |
|                                                 domainid=self.account.domainid,
 | |
|                                                 serviceofferingid=self.service_offering.id,
 | |
|                                                 networkids=[str(network.id)],
 | |
|                                                 hostid=host_id
 | |
|                                                 )
 | |
|                 self.cleanup.append(vm)
 | |
|                 logger.debug('Created VM=%s in network=%s' % (vm.id, network.name))
 | |
| 
 | |
|                 return vm
 | |
|         except:
 | |
|                 self.fail('Unable to create VM in a Network=%s' % network.name)
 | |
| 
 | |
|     @attr(tags=["advancedsg", "intervlan"], required_hardware="true")
 | |
|     def test_network_services_VPC_CreatePF(self):
 | |
|         """ Test Create VPC PF rules on acquired public ip when VpcVirtualRouter is Running
 | |
|         """
 | |
| 
 | |
|         # Validate the following
 | |
|         # 1. Create a VPC with cidr - 10.1.1.1/16
 | |
|         # 2. Create a Network offering - NO1 with all supported services
 | |
|         # 3. Add network1(10.1.1.1/24) using N01 to this VPC.
 | |
|         # 4. Deploy vm1 in network1.
 | |
|         # 5. Use the Create PF rule for vm in network1.
 | |
|         # 6. Successfully ssh into the Guest VM using the PF rule
 | |
| 
 | |
|         network_1 = self.create_network(self.services["network_offering"])
 | |
|         vm_1 = self.deployvm_in_network(network_1)
 | |
|         self.services["extrapubliciprange"]["zoneid"] = self.services["zoneid"]
 | |
|         self.public_ip_range = PublicIpRange.create(
 | |
|                                     self.apiclient,
 | |
|                                     self.services["extrapubliciprange"]
 | |
|                                )
 | |
|         self.cleanup.append(self.public_ip_range)
 | |
|         logger.debug("Dedicating Public IP range to the account");
 | |
|         dedicate_public_ip_range_response = PublicIpRange.dedicate(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.public_ip_range.vlan.id,
 | |
|                                                 account=self.account.name,
 | |
|                                                 domainid=self.account.domainid
 | |
|                                             )
 | |
|         public_ip_1 = self.acquire_publicip(network_1)
 | |
|         self.create_natrule( vm_1, public_ip_1, network_1)
 | |
|         self.check_ssh_into_vm(vm_1, public_ip_1, testnegative=False)
 | |
|         self.public_ip_range.release(self.apiclient)
 | |
|         self.cleanup.remove(self.public_ip_range)
 | |
|         return
 | |
| 
 | |
| class TestVPCStaticNat(cloudstackTestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
| 
 | |
|         socket.setdefaulttimeout(60)
 | |
| 
 | |
|         testClient = super(TestVPCStaticNat, cls).getClsTestClient()
 | |
|         cls.api_client = cls.testClient.getApiClient()
 | |
|         cls.services = Services().services
 | |
| 
 | |
|         # Get Zone, Domain and templates
 | |
|         cls.domain = get_domain(cls.api_client)
 | |
|         cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
 | |
|         cls.template = get_template(
 | |
|                                     cls.api_client,
 | |
|                                     cls.zone.id,
 | |
|                                     cls.services["ostype"]
 | |
|                                     )
 | |
|         cls.services["virtual_machine"]["zoneid"] = cls.zone.id
 | |
|         cls.services["virtual_machine"]["template"] = cls.template.id
 | |
|         cls.services["publiciprange"]["zoneid"] = cls.zone.id
 | |
| 
 | |
|         cls.service_offering = ServiceOffering.create(
 | |
|             cls.api_client,
 | |
|             cls.services["service_offering"]
 | |
|         )
 | |
|         cls._cleanup = [cls.service_offering]
 | |
|         return
 | |
| 
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         super(TestVPCStaticNat, cls).tearDownClass()
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.apiclient = self.testClient.getApiClient()
 | |
|         self.cleanup = []
 | |
|         self.account = Account.create(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.services["account"],
 | |
|                                                 admin=True,
 | |
|                                                 domainid=self.domain.id
 | |
|                                                 )
 | |
|         self.cleanup.append(self.account)
 | |
|         logger.debug("Creating a VPC offering..")
 | |
|         self.vpc_off = VpcOffering.create(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.services["vpc_offering"]
 | |
|                                                 )
 | |
|         self.cleanup.append(self.vpc_off)
 | |
|         logger.debug("Enabling the VPC offering created")
 | |
|         self.vpc_off.update(self.apiclient, state='Enabled')
 | |
| 
 | |
|         logger.debug("Creating a VPC network in the account: %s" % self.account.name)
 | |
|         self.services["vpc"]["cidr"] = '10.1.0.0/16'
 | |
|         self.vpc = VPC.create(
 | |
|                                 self.apiclient,
 | |
|                                 self.services["vpc"],
 | |
|                                 vpcofferingid=self.vpc_off.id,
 | |
|                                 zoneid=self.zone.id,
 | |
|                                 account=self.account.name,
 | |
|                                 domainid=self.account.domainid
 | |
|                                 )
 | |
|         self.cleanup.append(self.vpc)
 | |
|         return
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestVPCStaticNat, self).tearDown()
 | |
| 
 | |
|     def acquire_publicip(self, network):
 | |
|         logger.debug("Associating public IP for network: %s" % network.name)
 | |
|         public_ip = PublicIPAddress.create(self.apiclient,
 | |
|                                         accountid=self.account.name,
 | |
|                                         zoneid=self.zone.id,
 | |
|                                         domainid=self.account.domainid,
 | |
|                                         networkid=network.id,
 | |
|                                         vpcid=self.vpc.id
 | |
|                                         )
 | |
|         self.cleanup.append(public_ip)
 | |
|         logger.debug("Associated %s with network %s" % (public_ip.ipaddress.ipaddress,
 | |
|                                                     network.id
 | |
|                                                     ))
 | |
|         return public_ip
 | |
| 
 | |
|     def deployvm_in_network(self, network, host_id=None):
 | |
|         try:
 | |
|                 logger.debug('Creating VM in network=%s' % network.name)
 | |
|                 vm = VirtualMachine.create(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.services["virtual_machine"],
 | |
|                                                 accountid=self.account.name,
 | |
|                                                 domainid=self.account.domainid,
 | |
|                                                 serviceofferingid=self.service_offering.id,
 | |
|                                                 networkids=[str(network.id)],
 | |
|                                                 hostid=host_id
 | |
|                                                 )
 | |
|                 self.cleanup.append(vm)
 | |
|                 logger.debug('Created VM=%s in network=%s' % (vm.id, network.name))
 | |
| 
 | |
|                 return vm
 | |
|         except:
 | |
|                 self.fail('Unable to create VM in a Network=%s' % network.name)
 | |
| 
 | |
|     def create_StaticNatRule_For_VM(self, vm, public_ip, network, services=None):
 | |
|         logger.debug("Enabling static NAT for IP: %s" %public_ip.ipaddress.ipaddress)
 | |
|         if not services:
 | |
|             services = self.services["natrule"]
 | |
|         try:
 | |
|                 StaticNATRule.enable(
 | |
|                                         self.apiclient,
 | |
|                                         ipaddressid=public_ip.ipaddress.id,
 | |
|                                         virtualmachineid=vm.id,
 | |
|                                         networkid=network.id
 | |
|                                         )
 | |
|                 logger.debug("Static NAT enabled for IP: %s" %
 | |
|                                                         public_ip.ipaddress.ipaddress)
 | |
|                 logger.debug("Adding NetworkACL rules to make NAT rule accessible")
 | |
|         except Exception as e:
 | |
|                 self.fail("Failed to enable static NAT on IP: %s - %s" % (
 | |
|                                                     public_ip.ipaddress.ipaddress, e))
 | |
| 
 | |
|     @attr(tags=["advancedsg", "intervlan"], required_hardware="true")
 | |
|     def test_network_services_VPC_CreatePF(self):
 | |
|         """ Test Create VPC PF rules on acquired public ip when VpcVirtualRouter is Running
 | |
|         """
 | |
| 
 | |
|         # Validate the following
 | |
|         # 1. Create a VPC with cidr - 10.1.1.1/16
 | |
|         # 2. Create a Network offering - NO1 with all supported services
 | |
|         # 3. Add network1(10.1.1.1/24) using N01 to this VPC.
 | |
|         # 4. Deploy vm1 in network1.
 | |
|         # 5. Use the Create PF rule for vm in network1.
 | |
|         # 6. Successfully ssh into the Guest VM using the PF rule
 | |
| 
 | |
|         network_1 = self.create_network(self.services["network_offering"])
 | |
|         vm_1 = self.deployvm_in_network(network_1)
 | |
|         self.public_ip_range = PublicIpRange.create(
 | |
|                                     self.apiclient,
 | |
|                                     self.services["publiciprange"]
 | |
|                                )
 | |
|         self.cleanup.append(self.public_ip_range)
 | |
|         logger.debug("Dedicating Public IP range to the account");
 | |
|         dedicate_public_ip_range_response = PublicIpRange.dedicate(
 | |
|                                                 self.apiclient,
 | |
|                                                 self.public_ip_range.vlan.id,
 | |
|                                                 account=self.account.name,
 | |
|                                                 domainid=self.account.domainid
 | |
|                                             )
 | |
|         public_ip_1 = self.acquire_publicip(network_1)
 | |
|         self.create_StaticNatRule_For_VM( vm_1, public_ip_1, network_1)
 | |
|         self.check_ssh_into_vm(vm_1, public_ip_1, testnegative=False)
 | |
|         self.public_ip_range.release(self.apiclient)
 | |
|         self.cleanup.remove(self.public_ip_range)
 | |
|         return
 |