Merge pull request #1023 from ekholabs/fix/egress_state-CLOUDSTACK-8925

CLOUDSTACK-8925 - Default allow for Egress rules is not being configured properly in VR iptables rulesThis PR fixes the router default policy for egress. When the default is DENY, the router still allows outgoing connections.

The test component/test_routers_network_ops.py was improved to cover that case as well. The results were:

Test redundant router internals ... === TestName: test_01_isolate_network_FW_PF_default_routes_egress_true | Status : SUCCESS ===
ok
Test redundant router internals ... === TestName: test_02_isolate_network_FW_PF_default_routes_egress_false | Status : SUCCESS ===
ok
Test redundant router internals ... === TestName: test_01_RVR_Network_FW_PF_SSH_default_routes_egress_true | Status : SUCCESS ===
ok
Test redundant router internals ... === TestName: test_02_RVR_Network_FW_PF_SSH_default_routes_egress_false | Status : SUCCESS ===
ok

----------------------------------------------------------------------
Ran 4 tests in 3636.656s

OK
/tmp//MarvinLogs/test_routers_network_ops_QDL429/results.txt (END)

* pr/1023:
  CLOUDSTACK-8925 - Implement the default egress DENY/ALLOW properly
  CLOUDSTACK-8925 - Improve the default egress tests in order to cover newly entered rules
  CLOUDSTACK-8925 - Add egress dataset to test_data.py
  CLOUDSTACK-8925 - Drop the traffic when default egress is set to false

Signed-off-by: Remi Bergsma <github@remi.nl>
This commit is contained in:
Remi Bergsma 2015-11-04 14:59:02 +01:00
commit f948e96299
3 changed files with 534 additions and 94 deletions

View File

@ -95,14 +95,15 @@ class CsAcl(CsDataBag):
if 'src_port_range' in obj:
self.rule['first_port'] = obj['src_port_range'][0]
self.rule['last_port'] = obj['src_port_range'][1]
self.rule['allowed'] = True
self.rule['allowed'] = True
self.rule['action'] = "ACCEPT"
if self.rule['type'] == 'all' and not obj['source_cidr_list']:
self.rule['cidr'] = ['0.0.0.0/0']
else:
self.rule['cidr'] = obj['source_cidr_list']
self.rule['action'] = "ACCEPT"
logging.debug("AclIP created for rule ==> %s", self.rule)
def create(self):
@ -151,7 +152,25 @@ class CsAcl(CsDataBag):
" -m %s " % rule['protocol'] +
" --icmp-type %s -j %s" % (icmp_type, self.rule['action'])])
else:
fwr = " -A FW_EGRESS_RULES"
fwr = " -I FW_EGRESS_RULES"
#In case we have a default rule (accept all or drop all), we have to evaluate the action again.
if rule['type'] == 'all' and not rule['source_cidr_list']:
fwr = " -A FW_EGRESS_RULES"
# For default egress ALLOW or DENY, the logic is inverted.
# Having default_egress_policy == True, means that the default rule should have ACCEPT,
# otherwise DROP. The rule should be appended, not inserted.
if self.rule['default_egress_policy']:
self.rule['action'] = "ACCEPT"
else:
self.rule['action'] = "DROP"
else:
# For other rules added, if default_egress_policy == True, following rules should be DROP,
# otherwise ACCEPT
if self.rule['default_egress_policy']:
self.rule['action'] = "DROP"
else:
self.rule['action'] = "ACCEPT"
if rule['protocol'] != "all":
fwr += " -s %s " % cidr + \
" -p %s " % rule['protocol'] + \
@ -226,7 +245,7 @@ class CsAcl(CsDataBag):
self.protocol = rule['protocol']
self.action = "DROP"
self.dport = ""
if 'allowed' in rule.keys() and rule['allowed'] and rule['allowed']:
if 'allowed' in rule.keys() and rule['allowed']:
self.action = "ACCEPT"
if 'first_port' in rule.keys():
self.dport = "-m %s --dport %s" % (self.protocol, rule['first_port'])

View File

@ -33,7 +33,8 @@ from marvin.lib.base import (ServiceOffering,
PublicIPAddress,
NetworkOffering,
Network,
Router)
Router,
EgressFireWallRule)
from marvin.lib.common import (get_zone,
get_template,
get_domain,
@ -50,11 +51,27 @@ from marvin.lib.common import (get_zone,
import time
import logging
def check_router_command(virtual_machine, public_ip, ssh_command, check_string, test_case, retries=5):
result = 'failed'
try:
ssh = virtual_machine.get_ssh_client(ipaddress=public_ip, retries=retries)
result = str(ssh.execute(ssh_command))
except Exception as e:
test_case.fail("Failed to SSH into the Virtual Machine: %s" % e)
logging.debug("Result from SSH into the Virtual Machine: %s" % result)
return result.count(check_string)
class TestRedundantIsolateNetworks(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.logger = logging.getLogger('TestRedundantIsolateNetworks')
cls.stream_handler = logging.StreamHandler()
cls.logger.setLevel(logging.DEBUG)
cls.logger.addHandler(cls.stream_handler)
cls.testClient = super(TestRedundantIsolateNetworks, cls).getClsTestClient()
cls.api_client = cls.testClient.getApiClient()
@ -82,25 +99,49 @@ class TestRedundantIsolateNetworks(cloudstackTestCase):
cls.services["service_offering"]
)
cls.services["nw_off_persistent_RVR"]["egress_policy"] = "true"
cls.services["nw_off_persistent_RVR_egress_true"] = cls.services["nw_off_persistent_RVR"].copy()
cls.services["nw_off_persistent_RVR_egress_true"]["egress_policy"] = "true"
cls.network_offering = NetworkOffering.create(
cls.services["nw_off_persistent_RVR_egress_false"] = cls.services["nw_off_persistent_RVR"].copy()
cls.services["nw_off_persistent_RVR_egress_false"]["egress_policy"] = "false"
cls.logger.debug("Creating Network Offering with default egress TRUE")
cls.network_offering_egress_true = NetworkOffering.create(
cls.api_client,
cls.services["nw_off_persistent_RVR"],
cls.services["nw_off_persistent_RVR_egress_true"],
conservemode=True
)
cls.network_offering.update(cls.api_client, state='Enabled')
cls.network_offering_egress_true.update(cls.api_client, state='Enabled')
cls.logger.debug("Creating Network Offering with default egress FALSE")
cls.network_offering_egress_false = NetworkOffering.create(
cls.api_client,
cls.services["nw_off_persistent_RVR_egress_false"],
conservemode=True
)
cls.network_offering_egress_false.update(cls.api_client, state='Enabled')
cls.services["egress_80"] = {
"startport": 80,
"endport": 80,
"protocol": "TCP",
"cidrlist": ["0.0.0.0/0"]
}
cls.services["egress_53"] = {
"startport": 53,
"endport": 53,
"protocol": "UDP",
"cidrlist": ["0.0.0.0/0"]
}
cls._cleanup = [
cls.service_offering,
cls.network_offering,
cls.network_offering_egress_true,
cls.network_offering_egress_false,
cls.account
]
cls.logger = logging.getLogger('TestRedundantIsolateNetworks')
cls.stream_handler = logging.StreamHandler()
cls.logger.setLevel(logging.DEBUG)
cls.logger.addHandler(cls.stream_handler)
return
@classmethod
@ -113,14 +154,7 @@ class TestRedundantIsolateNetworks(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.account = Account.create(
self.apiclient,
self.services["account"],
admin=True,
domainid=self.domain.id
)
self.cleanup = []
self.cleanup.insert(0, self.account)
return
def tearDown(self):
@ -131,17 +165,17 @@ class TestRedundantIsolateNetworks(cloudstackTestCase):
return
@attr(tags=["advanced", "advancedns", "ssh"], required_hardware="true")
def test_RVR_Network_FW_PF_SSH_default_routes(self):
def test_01_RVR_Network_FW_PF_SSH_default_routes_egress_true(self):
""" Test redundant router internals """
self.logger.debug("Starting test_RVR_Network_FW_PF_SSH_default_routes...")
self.logger.debug("Starting test_01_RVR_Network_FW_PF_SSH_default_routes_egress_true...")
self.logger.debug("Creating network with network offering: %s" % self.network_offering.id)
self.logger.debug("Creating network with network offering: %s" % self.network_offering_egress_true.id)
network = Network.create(
self.apiclient,
self.services["network"],
accountid=self.account.name,
domainid=self.account.domainid,
networkofferingid=self.network_offering.id,
networkofferingid=self.network_offering_egress_true.id,
zoneid=self.zone.id
)
self.logger.debug("Created network with ID: %s" % network.id)
@ -205,18 +239,161 @@ class TestRedundantIsolateNetworks(cloudstackTestCase):
"Length of the list router should be 2 (Backup & master)"
)
self.logger.debug("Associating public IP for network: %s" % network.name)
public_ip = PublicIPAddress.create(
public_ips = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
zoneid=self.zone.id
)
public_ip = public_ips[0]
self.assertEqual(
isinstance(public_ips, list),
True,
"Check for list public IPs response return valid data"
)
self.logger.debug("Creating Firewall rule for VM ID: %s" % virtual_machine.id)
FireWallRule.create(
self.apiclient,
ipaddressid=public_ip.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
self.logger.debug("Creating NAT rule for VM ID: %s" % virtual_machine.id)
nat_rule = NATRule.create(
self.apiclient,
virtual_machine,
self.services["natrule"],
public_ip.id
)
self.cleanup.insert(0, network)
self.cleanup.insert(0, virtual_machine)
# Test SSH after closing port 22
expected = 1
ssh_command = "ping -c 3 8.8.8.8"
check_string = "3 packets received"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Ping to outside world from VM should be successful!"
)
expected = 1
ssh_command = "wget -t 1 -T 5 www.google.com"
check_string = "HTTP request sent, awaiting response... 200 OK"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Attempt to retrieve google.com index page should be successful!"
)
EgressFireWallRule.create(
self.apiclient,
networkid=network.id,
protocol=self.services["egress_80"]["protocol"],
startport=self.services["egress_80"]["startport"],
endport=self.services["egress_80"]["endport"],
cidrlist=self.services["egress_80"]["cidrlist"]
)
expected = 0
ssh_command = "wget -t 1 -T 1 www.google.com"
check_string = "HTTP request sent, awaiting response... 200 OK"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Attempt to retrieve google.com index page should NOT be successful once rule is added!"
)
return
@attr(tags=["advanced", "advancedns", "ssh"], required_hardware="true")
def test_02_RVR_Network_FW_PF_SSH_default_routes_egress_false(self):
""" Test redundant router internals """
self.logger.debug("Starting test_02_RVR_Network_FW_PF_SSH_default_routes_egress_false...")
self.logger.debug("Creating network with network offering: %s" % self.network_offering_egress_false.id)
network = Network.create(
self.apiclient,
self.services["network"],
accountid=self.account.name,
zoneid=self.zone.id,
domainid=self.account.domainid,
networkid=network.id
networkofferingid=self.network_offering_egress_false.id,
zoneid=self.zone.id
)
self.logger.debug("Associated %s with network %s" % (
public_ip.ipaddress.ipaddress,
network.id
))
self.logger.debug("Created network with ID: %s" % network.id)
networks = Network.list(
self.apiclient,
id=network.id,
listall=True
)
self.assertEqual(
isinstance(networks, list),
True,
"List networks should return a valid response for created network"
)
nw_response = networks[0]
self.logger.debug("Deploying VM in account: %s" % self.account.name)
virtual_machine = VirtualMachine.create(
self.apiclient,
self.services["virtual_machine"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.account.domainid,
serviceofferingid=self.service_offering.id,
networkids=[str(network.id)]
)
self.logger.debug("Deployed VM in network: %s" % network.id)
vms = VirtualMachine.list(
self.apiclient,
id=virtual_machine.id,
listall=True
)
self.assertEqual(
isinstance(vms, list),
True,
"List Vms should return a valid list"
)
vm = vms[0]
self.assertEqual(
vm.state,
"Running",
"VM should be in running state after deployment"
)
self.logger.debug("Listing routers for network: %s" % network.name)
routers = Router.list(
self.apiclient,
networkid=network.id,
listall=True
)
self.assertEqual(
isinstance(routers, list),
True,
"list router should return Master and backup routers"
)
self.assertEqual(
len(routers),
2,
"Length of the list router should be 2 (Backup & master)"
)
public_ips = list_publicIP(
self.apiclient,
@ -231,12 +408,12 @@ class TestRedundantIsolateNetworks(cloudstackTestCase):
"Check for list public IPs response return valid data"
)
public_ip_1 = public_ips[0]
public_ip = public_ips[0]
self.logger.debug("Creating Firewall rule for VM ID: %s" % virtual_machine.id)
FireWallRule.create(
self.apiclient,
ipaddressid=public_ip_1.id,
ipaddressid=public_ip.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
@ -248,27 +425,61 @@ class TestRedundantIsolateNetworks(cloudstackTestCase):
self.apiclient,
virtual_machine,
self.services["natrule"],
public_ip_1.id
public_ip.id
)
self.cleanup.insert(0, network)
self.cleanup.insert(0, virtual_machine)
result = 'failed'
try:
ssh_command = "ping -c 3 8.8.8.8"
ssh = virtual_machine.get_ssh_client(ipaddress=public_ip.ipaddress.ipaddress, retries=5)
self.logger.debug("Ping to google.com from VM")
result = str(ssh.execute(ssh_command))
self.logger.debug("SSH result: %s; COUNT is ==> %s" % (result, result.count("3 packets received")))
except:
self.fail("Failed to SSH into VM - %s" % (public_ip.ipaddress.ipaddress))
expected = 0
ssh_command = "ping -c 3 8.8.8.8"
check_string = "3 packets received"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result.count("3 packets received"),
1,
"Ping to outside world from VM should be successful"
result,
expected,
"Ping to outside world from VM should NOT be successful"
)
expected = 0
ssh_command = "wget -t 1 -T 1 www.google.com"
check_string = "HTTP request sent, awaiting response... 200 OK"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Attempt to retrieve google.com index page should NOT be successful"
)
EgressFireWallRule.create(
self.apiclient,
networkid=network.id,
protocol=self.services["egress_80"]["protocol"],
startport=self.services["egress_80"]["startport"],
endport=self.services["egress_80"]["endport"],
cidrlist=self.services["egress_80"]["cidrlist"]
)
EgressFireWallRule.create(
self.apiclient,
networkid=network.id,
protocol=self.services["egress_53"]["protocol"],
startport=self.services["egress_53"]["startport"],
endport=self.services["egress_53"]["endport"],
cidrlist=self.services["egress_53"]["cidrlist"]
)
expected = 1
ssh_command = "wget -t 1 -T 5 www.google.com"
check_string = "HTTP request sent, awaiting response... 200 OK"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Attempt to retrieve google.com index page should be successful once rule is added!"
)
return
@ -279,6 +490,11 @@ class TestIsolatedNetworks(cloudstackTestCase):
@classmethod
def setUpClass(cls):
cls.logger = logging.getLogger('TestIsolatedNetworks')
cls.stream_handler = logging.StreamHandler()
cls.logger.setLevel(logging.DEBUG)
cls.logger.addHandler(cls.stream_handler)
cls.testClient = super(TestIsolatedNetworks, cls).getClsTestClient()
cls.api_client = cls.testClient.getApiClient()
@ -287,7 +503,7 @@ class TestIsolatedNetworks(cloudstackTestCase):
cls.domain = get_domain(cls.api_client)
cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
cls.services['mode'] = cls.zone.networktype
template = get_template(
cls.template = get_template(
cls.api_client,
cls.zone.id,
cls.services["ostype"]
@ -306,42 +522,40 @@ class TestIsolatedNetworks(cloudstackTestCase):
cls.services["service_offering"]
)
cls.services["network_offering"]["egress_policy"] = "true"
cls.services["network_offering_egress_true"] = cls.services["network_offering"].copy()
cls.services["network_offering_egress_true"]["egress_policy"] = "true"
cls.network_offering = NetworkOffering.create(cls.api_client,
cls.services["network_offering"],
cls.services["network_offering_egress_false"] = cls.services["network_offering"].copy()
cls.services["network_offering_egress_false"]["egress_policy"] = "false"
cls.logger.debug("Creating Network Offering with default egress TRUE")
cls.network_offering_egress_true = NetworkOffering.create(cls.api_client,
cls.services["network_offering_egress_true"],
conservemode=True)
cls.network_offering.update(cls.api_client, state='Enabled')
cls.network_offering_egress_true.update(cls.api_client, state='Enabled')
cls.network = Network.create(cls.api_client,
cls.services["network"],
accountid=cls.account.name,
domainid=cls.account.domainid,
networkofferingid=cls.network_offering.id,
zoneid=cls.zone.id)
cls.logger.debug("Creating Network Offering with default egress FALSE")
cls.network_offering_egress_false = NetworkOffering.create(cls.api_client,
cls.services["network_offering_egress_false"],
conservemode=True)
cls.vm_1 = VirtualMachine.create(cls.api_client,
cls.services["virtual_machine"],
templateid=template.id,
accountid=cls.account.name,
domainid=cls.domain.id,
serviceofferingid=cls.service_offering.id,
networkids=[str(cls.network.id)])
cls.network_offering_egress_false.update(cls.api_client, state='Enabled')
cls.services["egress_80"] = {
"startport": 80,
"endport": 80,
"protocol": "TCP",
"cidrlist": ["0.0.0.0/0"]
}
cls._cleanup = [
cls.vm_1,
cls.network,
cls.network_offering,
cls.network_offering_egress_true,
cls.network_offering_egress_false,
cls.service_offering,
cls.account
]
cls.logger = logging.getLogger('TestIsolatedNetworks')
cls.stream_handler = logging.StreamHandler()
cls.logger.setLevel(logging.DEBUG)
cls.logger.addHandler(cls.stream_handler)
return
@classmethod
@ -354,11 +568,40 @@ class TestIsolatedNetworks(cloudstackTestCase):
def setUp(self):
self.apiclient = self.testClient.getApiClient()
self.cleanup = []
return
def tearDown(self):
try:
cleanup_resources(self.apiclient, self.cleanup)
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@attr(tags=["advanced", "advancedns", "ssh"], required_hardware="true")
def test_isolate_network_FW_PF_default_routes(self):
"""Stop existing router, add a PF rule and check we can access the VM """
def test_01_isolate_network_FW_PF_default_routes_egress_true(self):
""" Test redundant router internals """
self.logger.debug("Starting test_01_isolate_network_FW_PF_default_routes_egress_true...")
self.logger.debug("Creating Network with Network Offering ID %s" % self.network_offering_egress_true.id)
network = Network.create(self.apiclient,
self.services["network"],
accountid=self.account.name,
domainid=self.account.domainid,
networkofferingid=self.network_offering_egress_true.id,
zoneid=self.zone.id)
self.logger.debug("Creating Virtual Machine on Network %s" % network.id)
virtual_machine = VirtualMachine.create(self.apiclient,
self.services["virtual_machine"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.domain.id,
serviceofferingid=self.service_offering.id,
networkids=[str(network.id)])
self.cleanup.insert(0, network)
self.cleanup.insert(0, virtual_machine)
self.logger.debug("Starting test_isolate_network_FW_PF_default_routes...")
routers = list_routers(
@ -402,7 +645,7 @@ class TestIsolatedNetworks(cloudstackTestCase):
public_ip = public_ips[0]
self.logger.debug("Creating Firewall rule for VM ID: %s" % self.vm_1.id)
self.logger.debug("Creating Firewall rule for VM ID: %s" % virtual_machine.id)
FireWallRule.create(
self.apiclient,
ipaddressid=public_ip.id,
@ -412,11 +655,11 @@ class TestIsolatedNetworks(cloudstackTestCase):
endport=self.services["natrule"]["publicport"]
)
self.logger.debug("Creating NAT rule for VM ID: %s" % self.vm_1.id)
self.logger.debug("Creating NAT rule for VM ID: %s" % virtual_machine.id)
# Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
self.vm_1,
virtual_machine,
self.services["natrule"],
public_ip.id
)
@ -436,20 +679,192 @@ class TestIsolatedNetworks(cloudstackTestCase):
"Check list port forwarding rules"
)
result = 'failed'
try:
ssh_command = "ping -c 3 8.8.8.8"
self.logger.debug("SSH into VM with IP: %s" % nat_rule.ipaddress)
ssh = self.vm_1.get_ssh_client(ipaddress=nat_rule.ipaddress, port=self.services["natrule"]["publicport"], retries=5)
result = str(ssh.execute(ssh_command))
self.logger.debug("SSH result: %s; COUNT is ==> %s" % (result, result.count("3 packets received")))
except:
self.fail("Failed to SSH into VM - %s" % (nat_rule.ipaddress))
# Test SSH after closing port 22
expected = 1
ssh_command = "ping -c 3 8.8.8.8"
check_string = "3 packets received"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result.count("3 packets received"),
1,
"Ping to outside world from VM should be successful"
result,
expected,
"Ping to outside world from VM should be successful!"
)
expected = 1
ssh_command = "wget -t 1 -T 5 www.google.com"
check_string = "HTTP request sent, awaiting response... 200 OK"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Attempt to retrieve google.com index page should be successful!"
)
EgressFireWallRule.create(
self.apiclient,
networkid=network.id,
protocol=self.services["egress_80"]["protocol"],
startport=self.services["egress_80"]["startport"],
endport=self.services["egress_80"]["endport"],
cidrlist=self.services["egress_80"]["cidrlist"]
)
expected = 0
ssh_command = "wget -t 1 -T 1 www.google.com"
check_string = "HTTP request sent, awaiting response... 200 OK"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Attempt to retrieve google.com index page should NOT be successful once rule is added!"
)
return
@attr(tags=["advanced", "advancedns", "ssh"], required_hardware="true")
def test_02_isolate_network_FW_PF_default_routes_egress_false(self):
""" Test redundant router internals """
self.logger.debug("Starting test_02_isolate_network_FW_PF_default_routes_egress_false...")
self.logger.debug("Creating Network with Network Offering ID %s" % self.network_offering_egress_false.id)
network = Network.create(self.apiclient,
self.services["network"],
accountid=self.account.name,
domainid=self.account.domainid,
networkofferingid=self.network_offering_egress_false.id,
zoneid=self.zone.id)
self.logger.debug("Creating Virtual Machine on Network %s" % network.id)
virtual_machine = VirtualMachine.create(self.apiclient,
self.services["virtual_machine"],
templateid=self.template.id,
accountid=self.account.name,
domainid=self.domain.id,
serviceofferingid=self.service_offering.id,
networkids=[str(network.id)])
self.cleanup.insert(0, network)
self.cleanup.insert(0, virtual_machine)
self.logger.debug("Starting test_isolate_network_FW_PF_default_routes...")
routers = list_routers(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid
)
self.assertEqual(
isinstance(routers, list),
True,
"Check for list routers response return valid data"
)
self.assertNotEqual(
len(routers),
0,
"Check list router response"
)
router = routers[0]
self.assertEqual(
router.state,
'Running',
"Check list router response for router state"
)
public_ips = list_publicIP(
self.apiclient,
account=self.account.name,
domainid=self.account.domainid,
zoneid=self.zone.id
)
self.assertEqual(
isinstance(public_ips, list),
True,
"Check for list public IPs response return valid data"
)
public_ip = public_ips[0]
self.logger.debug("Creating Firewall rule for VM ID: %s" % virtual_machine.id)
FireWallRule.create(
self.apiclient,
ipaddressid=public_ip.id,
protocol=self.services["natrule"]["protocol"],
cidrlist=['0.0.0.0/0'],
startport=self.services["natrule"]["publicport"],
endport=self.services["natrule"]["publicport"]
)
self.logger.debug("Creating NAT rule for VM ID: %s" % virtual_machine.id)
# Create NAT rule
nat_rule = NATRule.create(
self.apiclient,
virtual_machine,
self.services["natrule"],
public_ip.id
)
nat_rules = list_nat_rules(
self.apiclient,
id=nat_rule.id
)
self.assertEqual(
isinstance(nat_rules, list),
True,
"Check for list NAT rules response return valid data"
)
self.assertEqual(
nat_rules[0].state,
'Active',
"Check list port forwarding rules"
)
expected = 0
ssh_command = "ping -c 3 8.8.8.8"
check_string = "3 packets received"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Ping to outside world from VM should NOT be successful"
)
expected = 0
ssh_command = "wget -t 1 -T 1 www.google.com"
check_string = "HTTP request sent, awaiting response... 200 OK"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Attempt to retrieve google.com index page should NOT be successful"
)
EgressFireWallRule.create(
self.apiclient,
networkid=network.id,
protocol=self.services["egress_80"]["protocol"],
startport=self.services["egress_80"]["startport"],
endport=self.services["egress_80"]["endport"],
cidrlist=self.services["egress_80"]["cidrlist"]
)
expected = 1
ssh_command = "wget -t 1 -T 5 www.google.com"
check_string = "HTTP request sent, awaiting response... 200 OK"
result = check_router_command(virtual_machine, nat_rule.ipaddress, ssh_command, check_string, self)
self.assertEqual(
result,
expected,
"Attempt to retrieve google.com index page should be successful once rule is added!"
)
return

View File

@ -715,6 +715,12 @@ test_data = {
"publicport": 22,
"protocol": "TCP"
},
"egress_80": {
"startport": 80,
"endport": 80,
"protocol": "TCP",
"cidrlist": ["0.0.0.0/0"]
},
"lbrule": {
"name": "SSH",
"alg": "roundrobin",