diff --git a/test/integration/component/test_egress_rules.py b/test/integration/component/test_egress_rules.py index 590f72a7cd3..a2443d49402 100644 --- a/test/integration/component/test_egress_rules.py +++ b/test/integration/component/test_egress_rules.py @@ -18,19 +18,18 @@ """ P1 for Egresss & Ingress rules """ #Import Local Modules -import marvin from nose.plugins.attrib import attr -from marvin.cloudstackTestCase import * -from marvin.cloudstackAPI import * -from marvin.sshClient import SshClient -from marvin.integration.lib.utils import * -from marvin.integration.lib.base import * -from marvin.integration.lib.common import * - -#Import System modules -import time -import subprocess - +from marvin.cloudstackTestCase import cloudstackTestCase +from marvin.integration.lib.utils import (random_gen, + cleanup_resources) +from marvin.integration.lib.base import (SecurityGroup, + VirtualMachine, + Account, + ServiceOffering) +from marvin.integration.lib.common import (get_domain, + get_zone, + get_template, + list_virtual_machines) class Services: """Test Security groups Services @@ -78,6 +77,12 @@ class Services: "endport": 22, "cidrlist": '0.0.0.0/0', }, + "egress_icmp": { + "protocol": 'ICMP', + "icmptype": '-1', + "icmpcode": '-1', + "cidrlist": '0.0.0.0/0', + }, "sg_invalid_port": { "name": 'SSH', "protocol": 'TCP', @@ -124,7 +129,6 @@ class Services: "timeout": 10, } - class TestDefaultSecurityGroupEgress(cloudstackTestCase): def setUp(self): @@ -283,7 +287,6 @@ class TestDefaultSecurityGroupEgress(cloudstackTestCase): ) return - class TestAuthorizeIngressRule(cloudstackTestCase): def setUp(self): @@ -567,14 +570,12 @@ class TestDefaultGroupEgress(cloudstackTestCase): "Check ingress rule created properly" ) - ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ - # Authorize Security group to SSH to VM self.debug("Authorizing egress rule for sec group ID: %s for ssh access" % security_group.id) egress_rule = security_group.authorizeEgress( self.apiclient, - self.services["security_group"], + self.services["egress_icmp"], account=self.account.name, domainid=self.account.domainid ) @@ -584,7 +585,6 @@ class TestDefaultGroupEgress(cloudstackTestCase): True, "Check egress rule created properly" ) - ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ self.virtual_machine = VirtualMachine.create( self.apiclient, @@ -768,8 +768,6 @@ class TestDefaultGroupEgressAfterDeploy(cloudstackTestCase): "Check ingress rule created properly" ) - ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ - self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], @@ -786,7 +784,7 @@ class TestDefaultGroupEgressAfterDeploy(cloudstackTestCase): % security_group.id) egress_rule = security_group.authorizeEgress( self.apiclient, - self.services["security_group"], + self.services["egress_icmp"], account=self.account.name, domainid=self.account.domainid ) @@ -796,7 +794,6 @@ class TestDefaultGroupEgressAfterDeploy(cloudstackTestCase): True, "Check egress rule created properly" ) - ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ # Should be able to SSH VM try: @@ -825,7 +822,6 @@ class TestDefaultGroupEgressAfterDeploy(cloudstackTestCase): ) return - class TestRevokeEgressRule(cloudstackTestCase): def setUp(self): @@ -954,13 +950,29 @@ class TestRevokeEgressRule(cloudstackTestCase): "Check ingress rule created properly" ) - ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ - - # Authorize Security group to SSH to VM + # Authorize Security group to ping outside world self.debug( - "Authorizing egress rule for sec group ID: %s for ssh access" + "Authorizing egress rule with ICMP protocol for sec group ID: %s for ssh access" % security_group.id) - egress_rule = security_group.authorizeEgress( + egress_rule_icmp = security_group.authorizeEgress( + self.apiclient, + self.services["egress_icmp"], + account=self.account.name, + domainid=self.account.domainid + ) + + self.assertEqual( + isinstance(egress_rule_icmp, dict), + True, + "Check egress rule created properly" + ) + ssh_egress_rule_icmp = (egress_rule_icmp["egressrule"][0]).__dict__ + + # Authorize Security group to SSH to other VM + self.debug( + "Authorizing egress rule with TCP protocol for sec group ID: %s for ssh access" + % security_group.id) + egress_rule_tcp = security_group.authorizeEgress( self.apiclient, self.services["security_group"], account=self.account.name, @@ -968,11 +980,11 @@ class TestRevokeEgressRule(cloudstackTestCase): ) self.assertEqual( - isinstance(egress_rule, dict), + isinstance(egress_rule_tcp, dict), True, "Check egress rule created properly" ) - ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + ssh_egress_rule_tcp = (egress_rule_tcp["egressrule"][0]).__dict__ self.virtual_machine = VirtualMachine.create( self.apiclient, @@ -1030,7 +1042,7 @@ class TestRevokeEgressRule(cloudstackTestCase): ) self.debug( - "Revoke Egress Rule for Security Group %s for account: %s" \ + "Revoke Egress Rules for Security Group %s for account: %s" \ % ( security_group.id, self.account.name @@ -1038,9 +1050,15 @@ class TestRevokeEgressRule(cloudstackTestCase): result = security_group.revokeEgress( self.apiclient, - id=ssh_egress_rule["ruleid"] + id=ssh_egress_rule_icmp["ruleid"] ) - self.debug("Revoke egress rule result: %s" % result) + self.debug("Revoked egress rule result: %s" % result) + + result = security_group.revokeEgress( + self.apiclient, + id=ssh_egress_rule_tcp["ruleid"] + ) + self.debug("Revoked egress rule result: %s" % result) # Should be able to SSH VM try: @@ -1062,9 +1080,9 @@ class TestRevokeEgressRule(cloudstackTestCase): result = str(res) self.assertEqual( - result.count("0 received"), + result.count("1 received"), 1, - "Ping to outside world from VM should fail" + "Ping to outside world from VM should be successful" ) try: @@ -1087,7 +1105,6 @@ class TestRevokeEgressRule(cloudstackTestCase): ) return - class TestInvalidAccountAuthroize(cloudstackTestCase): def setUp(self): @@ -1201,7 +1218,7 @@ class TestInvalidAccountAuthroize(cloudstackTestCase): "Authorizing egress rule for sec group ID: %s for ssh access" % security_group.id) with self.assertRaises(Exception): - egress_rule = security_group.authorizeEgress( + security_group.authorizeEgress( self.apiclient, self.services["security_group"], account=random_gen(), @@ -1209,7 +1226,6 @@ class TestInvalidAccountAuthroize(cloudstackTestCase): ) return - class TestMultipleAccountsEgressRuleNeg(cloudstackTestCase): def setUp(self): @@ -1350,7 +1366,7 @@ class TestMultipleAccountsEgressRuleNeg(cloudstackTestCase): True, "Check egress rule created properly" ) - ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ + # Authorize Security group to SSH to VM self.debug( @@ -1369,7 +1385,7 @@ class TestMultipleAccountsEgressRuleNeg(cloudstackTestCase): "Check ingress rule created properly" ) - ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ + self.virtual_machineA = VirtualMachine.create( self.apiclient, @@ -1448,7 +1464,6 @@ class TestMultipleAccountsEgressRuleNeg(cloudstackTestCase): self.fail("SSH Access failed for %s: %s" % \ (self.virtual_machineA.ipaddress, e) ) - result = str(res) # SSH failure may result in one of the following three error messages ssh_failure_result_set = ["ssh: connect to host %s port 22: No route to host" % self.virtual_machineB.ssh_ip, @@ -1460,7 +1475,6 @@ class TestMultipleAccountsEgressRuleNeg(cloudstackTestCase): ) return - class TestMultipleAccountsEgressRule(cloudstackTestCase): def setUp(self): @@ -1628,7 +1642,6 @@ class TestMultipleAccountsEgressRule(cloudstackTestCase): True, "Check egress rule created properly" ) - ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ # Authorize Security group to SSH to VM self.debug( @@ -1647,8 +1660,6 @@ class TestMultipleAccountsEgressRule(cloudstackTestCase): "Check ingress rule created properly" ) - ssh_ruleA = (ingress_ruleA["ingressrule"][0]).__dict__ - self.virtual_machineA = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], @@ -1695,8 +1706,6 @@ class TestMultipleAccountsEgressRule(cloudstackTestCase): "Check ingress rule created properly" ) - ssh_ruleB = (ingress_ruleB["ingressrule"][0]).__dict__ - self.virtual_machineB = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], @@ -1757,7 +1766,6 @@ class TestMultipleAccountsEgressRule(cloudstackTestCase): ) return - class TestStartStopVMWithEgressRule(cloudstackTestCase): def setUp(self): @@ -1884,8 +1892,6 @@ class TestStartStopVMWithEgressRule(cloudstackTestCase): "Check ingress rule created properly" ) - ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ - self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], @@ -1912,7 +1918,6 @@ class TestStartStopVMWithEgressRule(cloudstackTestCase): True, "Check egress rule created properly" ) - ssh_egress_rule = (egress_rule["egressrule"][0]).__dict__ # Stop virtual machine self.debug("Stopping virtual machine: %s" % self.virtual_machine.id) @@ -1961,14 +1966,13 @@ class TestStartStopVMWithEgressRule(cloudstackTestCase): # Should be able to SSH VM try: self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) - ssh = self.virtual_machine.get_ssh_client() + self.virtual_machine.get_ssh_client() except Exception as e: self.fail("SSH Access failed for %s: %s" % \ (self.virtual_machine.ipaddress, e) ) return - class TestInvalidParametersForEgress(cloudstackTestCase): def setUp(self): @@ -2084,7 +2088,7 @@ class TestInvalidParametersForEgress(cloudstackTestCase): "Authorizing egress rule for sec group ID: %s with invalid port" % security_group.id) with self.assertRaises(Exception): - egress_rule = security_group.authorizeEgress( + security_group.authorizeEgress( self.apiclient, self.services["sg_invalid_port"], account=self.account.name, @@ -2094,7 +2098,7 @@ class TestInvalidParametersForEgress(cloudstackTestCase): "Authorizing egress rule for sec group ID: %s with invalid cidr" % security_group.id) with self.assertRaises(Exception): - egress_rule = security_group.authorizeEgress( + security_group.authorizeEgress( self.apiclient, self.services["sg_invalid_cidr"], account=self.account.name, @@ -2104,7 +2108,7 @@ class TestInvalidParametersForEgress(cloudstackTestCase): "Authorizing egress rule for sec group ID: %s with invalid account" % security_group.id) with self.assertRaises(Exception): - egress_rule = security_group.authorizeEgress( + security_group.authorizeEgress( self.apiclient, self.services["security_group"], account=random_gen(),