# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ P1 for Security groups """ #Import Local Modules import marvin from nose.plugins.attrib import attr from marvin.cloudstackTestCase import * from marvin.cloudstackAPI import * from marvin.lib.utils import * from marvin.lib.base import * from marvin.lib.common import * from marvin.sshClient import SshClient #Import System modules import time import subprocess class Services: """Test Security groups Services """ def __init__(self): self.services = { "disk_offering": { "displaytext": "Small", "name": "Small", "disksize": 1 }, "account": { "email": "test@test.com", "firstname": "Test", "lastname": "User", "username": "test", # Random characters are appended in create account to # ensure unique username generated each time "password": "password", }, "virtual_machine": { # Create a small virtual machine instance with disk offering "displayname": "Test VM", "username": "root", # VM creds for SSH "password": "password", "ssh_port": 22, "hypervisor": 'XenServer', "privateport": 22, "publicport": 22, "protocol": 'TCP', "userdata": 'This is sample data', }, "host": { "publicport": 22, "username": "root", # Host creds for SSH "password": "password", }, "service_offering": { "name": "Tiny Instance", "displaytext": "Tiny Instance", "cpunumber": 1, "cpuspeed": 100, # in MHz "memory": 128, # In MBs }, "security_group": { "name": 'SSH', "protocol": 'TCP', "startport": 22, "endport": 22, "cidrlist": '0.0.0.0/0', }, "security_group_2": { "name": 'ICMP', "protocol": 'ICMP', "startport": -1, "endport": -1, "cidrlist": '0.0.0.0/0', }, "ostype": 'CentOS 5.3 (64-bit)', # CentOS 5.3 (64-bit) "sleep": 60, "timeout": 10, } class TestDefaultSecurityGroup(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.cleanup = [] return def tearDown(self): try: #Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @classmethod def setUpClass(cls): cls.services = Services().services cls.api_client = super(TestDefaultSecurityGroup, cls).getClsTestClient().getApiClient() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client, cls.services) cls.zone = get_zone(cls.api_client, cls.services) cls.services['mode'] = cls.zone.networktype template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) cls.services["domainid"] = cls.domain.id cls.services["virtual_machine"]["zoneid"] = cls.zone.id cls.services["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( cls.api_client, cls.services["service_offering"] ) cls.account = Account.create( cls.api_client, cls.services["account"], admin=True, domainid=cls.domain.id ) cls.services["account"] = cls.account.name cls._cleanup = [ cls.account, cls.service_offering ] return @classmethod def tearDownClass(cls): try: cls.api_client = super(TestDefaultSecurityGroup, cls).getClsTestClient().getApiClient() #Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_01_deployVM_InDefaultSecurityGroup(self): """Test deploy VM in default security group """ # Validate the following: # 1. deploy Virtual machine using admin user # 2. listVM should show a VM in Running state # 3. listRouters should show one router running self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id ) self.debug("Deployed VM with ID: %s" % self.virtual_machine.id) self.cleanup.append(self.virtual_machine) list_vm_response = list_virtual_machines( self.apiclient, id=self.virtual_machine.id ) self.debug( "Verify listVirtualMachines response for virtual machine: %s" \ % self.virtual_machine.id ) self.assertEqual( isinstance(list_vm_response, list), True, "Check for list VM response" ) vm_response = list_vm_response[0] self.assertNotEqual( len(list_vm_response), 0, "Check VM available in List Virtual Machines" ) self.assertEqual( vm_response.id, self.virtual_machine.id, "Check virtual machine id in listVirtualMachines" ) self.assertEqual( vm_response.displayname, self.virtual_machine.displayname, "Check virtual machine displayname in listVirtualMachines" ) # Verify List Routers response for account self.debug( "Verify list routers response for account: %s" \ % self.account.name ) routers = list_routers( self.apiclient, zoneid=self.zone.id, listall=True ) self.assertEqual( isinstance(routers, list), True, "Check for list Routers response" ) self.debug("Router Response: %s" % routers) self.assertEqual( len(routers), 1, "Check virtual router is created for account or not" ) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_02_listSecurityGroups(self): """Test list security groups for admin account """ # Validate the following: # 1. listSecurityGroups in admin account # 2. There should be one security group (default) listed for the admin account # 3. No Ingress Rules should be part of the default security group sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertNotEqual( len(sercurity_groups), 0, "Check List Security groups response" ) self.debug("List Security groups response: %s" % str(sercurity_groups)) self.assertEqual( hasattr(sercurity_groups, 'ingressrule'), False, "Check ingress rule attribute for default security group" ) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_03_accessInDefaultSecurityGroup(self): """Test access in default security group """ # Validate the following: # 1. deploy Virtual machine using admin user # 2. listVM should show a VM in Running state # 3. listRouters should show one router running self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id ) self.debug("Deployed VM with ID: %s" % self.virtual_machine.id) self.cleanup.append(self.virtual_machine) list_vm_response = list_virtual_machines( self.apiclient, id=self.virtual_machine.id ) self.assertEqual( isinstance(list_vm_response, list), True, "Check for list VM response" ) self.debug( "Verify listVirtualMachines response for virtual machine: %s" \ % self.virtual_machine.id ) vm_response = list_vm_response[0] self.assertNotEqual( len(list_vm_response), 0, "Check VM available in List Virtual Machines" ) self.assertEqual( vm_response.id, self.virtual_machine.id, "Check virtual machine id in listVirtualMachines" ) self.assertEqual( vm_response.displayname, self.virtual_machine.displayname, "Check virtual machine displayname in listVirtualMachines" ) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.debug("List Security groups response: %s" % str(sercurity_groups)) self.assertNotEqual( len(sercurity_groups), 0, "Check List Security groups response" ) self.assertEqual( hasattr(sercurity_groups, 'ingressrule'), False, "Check ingress rule attribute for default security group" ) # SSH Attempt to VM should fail with self.assertRaises(Exception): self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip) ssh = SshClient( self.virtual_machine.ssh_ip, self.virtual_machine.ssh_port, self.virtual_machine.username, self.virtual_machine.password ) return class TestAuthorizeIngressRule(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.cleanup = [] return def tearDown(self): try: #Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @classmethod def setUpClass(cls): cls.services = Services().services cls.api_client = super(TestAuthorizeIngressRule, cls).getClsTestClient().getApiClient() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client, cls.services) cls.zone = get_zone(cls.api_client, cls.services) cls.services['mode'] = cls.zone.networktype template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) cls.services["domainid"] = cls.domain.id cls.services["virtual_machine"]["zoneid"] = cls.zone.id cls.services["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( cls.api_client, cls.services["service_offering"] ) cls.account = Account.create( cls.api_client, cls.services["account"], domainid=cls.domain.id ) cls.services["account"] = cls.account.name cls._cleanup = [ cls.account, cls.service_offering ] return @classmethod def tearDownClass(cls): try: cls.api_client = super(TestAuthorizeIngressRule, cls).getClsTestClient().getApiClient() #Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_01_authorizeIngressRule(self): """Test authorize ingress rule """ # Validate the following: #1. Create Security group for the account. #2. Createsecuritygroup (ssh-incoming) for this account #3. authorizeSecurityGroupIngress to allow ssh access to the VM #4. deployVirtualMachine into this security group (ssh-incoming) security_group = SecurityGroup.create( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertEqual( len(sercurity_groups), 2, "Check List Security groups response" ) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule, dict), True, "Check ingress rule created properly" ) self.debug("Authorizing ingress rule for sec group ID: %s for ssh access" % security_group.id) self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, securitygroupids=[security_group.id] ) self.debug("Deploying VM in account: %s" % self.account.name) # Should be able to SSH VM try: self.debug("SSH into VM: %s" % self.virtual_machine.id) self.virtual_machine.get_ssh_client() except Exception as e: self.fail("SSH Access failed for %s: %s" % \ (self.virtual_machine.ipaddress, e) ) return class TestRevokeIngressRule(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.cleanup = [] return def tearDown(self): try: #Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @classmethod def setUpClass(cls): cls.services = Services().services cls.api_client = super(TestRevokeIngressRule, cls).getClsTestClient().getApiClient() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client, cls.services) cls.zone = get_zone(cls.api_client, cls.services) cls.services['mode'] = cls.zone.networktype template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) cls.services["domainid"] = cls.domain.id cls.services["virtual_machine"]["zoneid"] = cls.zone.id cls.services["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( cls.api_client, cls.services["service_offering"] ) cls.account = Account.create( cls.api_client, cls.services["account"], domainid=cls.domain.id ) cls.services["account"] = cls.account.name cls._cleanup = [ cls.account, cls.service_offering ] return @classmethod def tearDownClass(cls): try: cls.api_client = super(TestRevokeIngressRule, cls).getClsTestClient().getApiClient() #Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_01_revokeIngressRule(self): """Test revoke ingress rule """ # Validate the following: #1. Create Security group for the account. #2. Createsecuritygroup (ssh-incoming) for this account #3. authorizeSecurityGroupIngress to allow ssh access to the VM #4. deployVirtualMachine into this security group (ssh-incoming) #5. Revoke the ingress rule, SSH access should fail security_group = SecurityGroup.create( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertEqual( len(sercurity_groups), 2, "Check List Security groups response" ) # Authorize Security group to SSH to VM self.debug("Authorizing ingress rule for sec group ID: %s for ssh access" % security_group.id) ingress_rule = security_group.authorize( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule, dict), True, "Check ingress rule created properly" ) ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, securitygroupids=[security_group.id] ) self.debug("Deploying VM in account: %s" % self.account.name) # Should be able to SSH VM try: self.debug("SSH into VM: %s" % self.virtual_machine.id) self.virtual_machine.get_ssh_client() except Exception as e: self.fail("SSH Access failed for %s: %s" % \ (self.virtual_machine.ipaddress, e) ) self.debug("Revoking ingress rule for sec group ID: %s for ssh access" % security_group.id) # Revoke Security group to SSH to VM result = security_group.revoke( self.apiclient, id=ssh_rule["ruleid"] ) # SSH Attempt to VM should fail with self.assertRaises(Exception): self.debug("SSH into VM: %s" % self.virtual_machine.id) SshClient(self.virtual_machine.ssh_ip, self.virtual_machine.ssh_port, self.virtual_machine.username, self.virtual_machine.password ) return class TestDhcpOnlyRouter(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.cleanup = [] return def tearDown(self): try: #Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @classmethod def setUpClass(cls): cls.services = Services().services cls.api_client = super(TestDhcpOnlyRouter, cls).getClsTestClient().getApiClient() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client, cls.services) cls.zone = get_zone(cls.api_client, cls.services) cls.services['mode'] = cls.zone.networktype template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) cls.services["domainid"] = cls.domain.id cls.services["virtual_machine"]["zoneid"] = cls.zone.id cls.services["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( cls.api_client, cls.services["service_offering"] ) cls.account = Account.create( cls.api_client, cls.services["account"], domainid=cls.domain.id ) cls.services["account"] = cls.account.name cls.virtual_machine = VirtualMachine.create( cls.api_client, cls.services["virtual_machine"], accountid=cls.account.name, domainid=cls.account.domainid, serviceofferingid=cls.service_offering.id ) cls._cleanup = [ cls.account, cls.service_offering ] return @classmethod def tearDownClass(cls): try: cls.api_client = super(TestDhcpOnlyRouter, cls).getClsTestClient().getApiClient() #Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags = ["sg", "eip", "basic"]) def test_01_dhcpOnlyRouter(self): """Test router services for user account """ # Validate the following #1. List routers for any user account #2. The only service supported by this router should be dhcp # Find router associated with user account list_router_response = list_routers( self.apiclient, zoneid=self.zone.id, listall=True ) self.assertEqual( isinstance(list_router_response, list), True, "Check list response returns a valid list" ) router = list_router_response[0] hosts = list_hosts( self.apiclient, zoneid=router.zoneid, type='Routing', state='Up', id=router.hostid ) self.assertEqual( isinstance(hosts, list), True, "Check list host returns a valid list" ) host = hosts[0] self.debug("Router ID: %s, state: %s" % (router.id, router.state)) self.assertEqual( router.state, 'Running', "Check list router response for router state" ) result = get_process_status( host.ipaddress, self.services['host']["publicport"], self.services['host']["username"], self.services['host']["password"], router.linklocalip, "service dnsmasq status" ) res = str(result) self.debug("Dnsmasq process status: %s" % res) self.assertEqual( res.count("running"), 1, "Check dnsmasq service is running or not" ) return class TestdeployVMWithUserData(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.cleanup = [] return def tearDown(self): try: #Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @classmethod def setUpClass(cls): cls.services = Services().services cls.api_client = super(TestdeployVMWithUserData, cls).getClsTestClient().getApiClient() # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client, cls.services) cls.zone = get_zone(cls.api_client, cls.services) cls.services['mode'] = cls.zone.networktype template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) cls.services["domainid"] = cls.domain.id cls.services["virtual_machine"]["zoneid"] = cls.zone.id cls.services["virtual_machine"]["template"] = template.id cls.service_offering = ServiceOffering.create( cls.api_client, cls.services["service_offering"] ) cls.account = Account.create( cls.api_client, cls.services["account"], domainid=cls.domain.id ) cls.services["account"] = cls.account.name cls._cleanup = [ cls.account, cls.service_offering ] return @classmethod def tearDownClass(cls): try: cls.api_client = super(TestdeployVMWithUserData, cls).getClsTestClient().getApiClient() #Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_01_deployVMWithUserData(self): """Test Deploy VM with User data""" # Validate the following # 1. CreateAccount of type user # 2. CreateSecurityGroup ssh-incoming # 3. authorizeIngressRule to allow ssh-access # 4. deployVirtualMachine into this group with some base64 encoded user-data # 5. wget http://10.1.1.1/latest/user-data to get the latest userdata from the # router for this VM # Find router associated with user account list_router_response = list_routers( self.apiclient, zoneid=self.zone.id, listall=True ) self.assertEqual( isinstance(list_router_response, list), True, "Check list response returns a valid list" ) router = list_router_response[0] security_group = SecurityGroup.create( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertEqual( len(sercurity_groups), 2, "Check List Security groups response" ) self.debug( "Authorize Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule, dict), True, "Check ingress rule created properly" ) self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, securitygroupids=[security_group.id] ) self.debug("Deploying VM in account: %s" % self.account.name) # Should be able to SSH VM try: self.debug( "SSH to VM with IP Address: %s"\ % self.virtual_machine.ssh_ip ) ssh = self.virtual_machine.get_ssh_client() except Exception as e: self.fail("SSH Access failed for %s: %s" % \ (self.virtual_machine.ipaddress, e) ) cmds = [ "wget http://%s/latest/user-data" % router.guestipaddress, "cat user-data", ] for c in cmds: result = ssh.execute(c) self.debug("%s: %s" % (c, result)) res = str(result) self.assertEqual( res.count(self.services["virtual_machine"]["userdata"]), 1, "Verify user data" ) return class TestDeleteSecurityGroup(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.services = Services().services # Get Zone, Domain and templates self.domain = get_domain(self.apiclient, self.services) self.zone = get_zone(self.apiclient, self.services) self.services['mode'] = self.zone.networktype template = get_template( self.apiclient, self.zone.id, self.services["ostype"] ) self.services["domainid"] = self.domain.id self.services["virtual_machine"]["zoneid"] = self.zone.id self.services["virtual_machine"]["template"] = template.id self.service_offering = ServiceOffering.create( self.apiclient, self.services["service_offering"] ) self.account = Account.create( self.apiclient, self.services["account"], domainid=self.domain.id ) self.services["account"] = self.account.name self.cleanup = [ self.account, self.service_offering ] return def tearDown(self): try: #Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @classmethod def setUpClass(cls): cls.services = Services().services cls.api_client = super(TestDeleteSecurityGroup, cls).getClsTestClient().getApiClient() cls._cleanup = [] return @classmethod def tearDownClass(cls): try: cls.api_client = super(TestDeleteSecurityGroup, cls).getClsTestClient().getApiClient() #Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_01_delete_security_grp_running_vm(self): """Test delete security group with running VM""" # Validate the following # 1. createsecuritygroup (ssh-incoming) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM # 3. deployVirtualMachine into this security group (ssh-incoming) # 4. deleteSecurityGroup created in step 1. Deletion should fail # complaining there are running VMs in this group security_group = SecurityGroup.create( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertEqual( len(sercurity_groups), 2, "Check List Security groups response" ) self.debug( "Authorize Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule, dict), True, "Check ingress rule created properly" ) self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, securitygroupids=[security_group.id] ) self.debug("Deploying VM in account: %s" % self.account.name) # Deleting Security group should raise exception with self.assertRaises(Exception): security_group.delete(self.apiclient) #sleep to ensure that Security group is deleted properly time.sleep(self.services["sleep"]) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, id=security_group.id ) self.assertNotEqual( sercurity_groups, None, "Check List Security groups response" ) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_02_delete_security_grp_withoout_running_vm(self): """Test delete security group without running VM""" # Validate the following # 1. createsecuritygroup (ssh-incoming) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM # 3. deployVirtualMachine into this security group (ssh-incoming) # 4. deleteSecurityGroup created in step 1. Deletion should fail # complaining there are running VMs in this group security_group = SecurityGroup.create( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertEqual( len(sercurity_groups), 2, "Check List Security groups response" ) self.debug( "Authorize Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule, dict), True, "Check ingress rule created properly" ) self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, securitygroupids=[security_group.id] ) self.debug("Deploying VM in account: %s" % self.account.name) # Destroy the VM self.virtual_machine.delete(self.apiclient) config = list_configurations( self.apiclient, name='expunge.delay' ) self.assertEqual( isinstance(config, list), True, "Check list configurations response" ) response = config[0] self.debug("expunge.delay: %s" % response.value) # Wait for some time more than expunge.delay time.sleep(int(response.value) * 2) # Deleting Security group should raise exception try: self.debug("Deleting Security Group: %s" % security_group.id) security_group.delete(self.apiclient) except Exception as e: self.fail("Failed to delete security group - ID: %s" \ % security_group.id ) return class TestIngressRule(cloudstackTestCase): def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.cleanup = [] self.services = Services().services # Get Zone, Domain and templates self.domain = get_domain(self.apiclient, self.services) self.zone = get_zone(self.apiclient, self.services) self.services['mode'] = self.zone.networktype template = get_template( self.apiclient, self.zone.id, self.services["ostype"] ) self.services["domainid"] = self.domain.id self.services["virtual_machine"]["zoneid"] = self.zone.id self.services["virtual_machine"]["template"] = template.id self.service_offering = ServiceOffering.create( self.apiclient, self.services["service_offering"] ) self.account = Account.create( self.apiclient, self.services["account"], domainid=self.domain.id ) self.services["account"] = self.account.name self.cleanup = [ self.account, self.service_offering ] return def tearDown(self): try: #Clean up, terminate the created templates cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @classmethod def setUpClass(cls): cls.services = Services().services cls.api_client = super(TestIngressRule, cls).getClsTestClient().getApiClient() cls._cleanup = [] return @classmethod def tearDownClass(cls): try: cls.api_client = super(TestIngressRule, cls).getClsTestClient().getApiClient() #Cleanup resources used cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_01_authorizeIngressRule_AfterDeployVM(self): """Test delete security group with running VM""" # Validate the following # 1. createsecuritygroup (ssh-incoming, 22via22) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM # 3. deployVirtualMachine into this security group (ssh-incoming) # 4. authorizeSecurityGroupIngress to allow ssh access (startport:222 to # endport:22) to the VM security_group = SecurityGroup.create( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertEqual( len(sercurity_groups), 2, "Check List Security groups response" ) self.debug( "Authorize Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) # Authorize Security group to SSH to VM ingress_rule_1 = security_group.authorize( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule_1, dict), True, "Check ingress rule created properly" ) self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, securitygroupids=[security_group.id] ) self.debug("Deploying VM in account: %s" % self.account.name) self.debug( "Authorize Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) # Authorize Security group to SSH to VM ingress_rule_2 = security_group.authorize( self.apiclient, self.services["security_group_2"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule_2, dict), True, "Check ingress rule created properly" ) # SSH should be allowed on 22 & 2222 ports try: self.debug("Trying to SSH into VM %s on port %s" % ( self.virtual_machine.ssh_ip, self.services["security_group"]["endport"] )) self.virtual_machine.get_ssh_client() except Exception as e: self.fail("SSH access failed for ingress rule ID: %s, %s" \ % (ingress_rule_1["id"], e)) # User should be able to ping VM try: self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip) result = subprocess.call(['ping', '-c 1', self.virtual_machine.ssh_ip]) self.debug("Ping result: %s" % result) # if ping successful, then result should be 0 self.assertEqual( result, 0, "Check if ping is successful or not" ) except Exception as e: self.fail("Ping failed for ingress rule ID: %s, %s" \ % (ingress_rule_2["id"], e)) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_02_revokeIngressRule_AfterDeployVM(self): """Test Revoke ingress rule after deploy VM""" # Validate the following # 1. createsecuritygroup (ssh-incoming, 22via22) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM # 3. deployVirtualMachine into this security group (ssh-incoming) # 4. authorizeSecurityGroupIngress to allow ssh access (startport:222 # to endport:22) to the VM # 5. check ssh access via port 222 # 6. revokeSecurityGroupIngress to revoke rule added in step 5. verify # that ssh-access into the VM is now NOT allowed through ports 222 # but allowed through port 22 security_group = SecurityGroup.create( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertEqual( len(sercurity_groups), 2, "Check List Security groups response" ) self.debug( "Authorize Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule, dict), True, "Check ingress rule created properly" ) self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, securitygroupids=[security_group.id] ) self.debug("Deploying VM in account: %s" % self.account.name) self.debug( "Authorize Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) # Authorize Security group to SSH to VM ingress_rule_2 = security_group.authorize( self.apiclient, self.services["security_group_2"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule_2, dict), True, "Check ingress rule created properly" ) ssh_rule = (ingress_rule["ingressrule"][0]).__dict__ icmp_rule = (ingress_rule_2["ingressrule"][0]).__dict__ # SSH should be allowed on 22 try: self.debug("Trying to SSH into VM %s on port %s" % ( self.virtual_machine.ssh_ip, self.services["security_group"]["endport"] )) self.virtual_machine.get_ssh_client() except Exception as e: self.fail("SSH access failed for ingress rule ID: %s, %s" \ % (ssh_rule["ruleid"], e)) # User should be able to ping VM try: self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip) result = subprocess.call(['ping', '-c 1', self.virtual_machine.ssh_ip]) self.debug("Ping result: %s" % result) # if ping successful, then result should be 0 self.assertEqual( result, 0, "Check if ping is successful or not" ) except Exception as e: self.fail("Ping failed for ingress rule ID: %s, %s" \ % (icmp_rule["ruleid"], e)) self.debug( "Revoke Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) result = security_group.revoke( self.apiclient, id=icmp_rule["ruleid"] ) self.debug("Revoke ingress rule result: %s" % result) time.sleep(self.services["sleep"]) # User should not be able to ping VM try: self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip) result = subprocess.call(['ping', '-c 1', self.virtual_machine.ssh_ip]) self.debug("Ping result: %s" % result) # if ping successful, then result should be 0 self.assertNotEqual( result, 0, "Check if ping is successful or not" ) except Exception as e: self.fail("Ping failed for ingress rule ID: %s, %s" \ % (icmp_rule["ruleid"], e)) return @attr(tags = ["sg", "eip", "advancedsg"]) def test_03_stopStartVM_verifyIngressAccess(self): """Test Start/Stop VM and Verify ingress rule""" # Validate the following # 1. createsecuritygroup (ssh-incoming, 22via22) for this account # 2. authorizeSecurityGroupIngress to allow ssh access to the VM # 3. deployVirtualMachine into this security group (ssh-incoming) # 4. once the VM is running and ssh-access is available, # stopVirtualMachine # 5. startVirtualMachine. After stop start of the VM is complete # verify that ssh-access to the VM is allowed security_group = SecurityGroup.create( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.debug("Created security group with ID: %s" % security_group.id) # Default Security group should not have any ingress rule sercurity_groups = SecurityGroup.list( self.apiclient, account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(sercurity_groups, list), True, "Check for list security groups response" ) self.assertEqual( len(sercurity_groups), 2, "Check List Security groups response" ) self.debug( "Authorize Ingress Rule for Security Group %s for account: %s" \ % ( security_group.id, self.account.name )) # Authorize Security group to SSH to VM ingress_rule = security_group.authorize( self.apiclient, self.services["security_group"], account=self.account.name, domainid=self.account.domainid ) self.assertEqual( isinstance(ingress_rule, dict), True, "Check ingress rule created properly" ) self.virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, securitygroupids=[security_group.id] ) self.debug("Deploying VM in account: %s" % self.account.name) # SSH should be allowed on 22 port try: self.debug("Trying to SSH into VM %s" % self.virtual_machine.ssh_ip) self.virtual_machine.get_ssh_client() except Exception as e: self.fail("SSH access failed for ingress rule ID: %s" \ % ingress_rule["id"] ) self.virtual_machine.stop(self.apiclient) # Sleep to ensure that VM is in stopped state time.sleep(self.services["sleep"]) self.virtual_machine.start(self.apiclient) # Sleep to ensure that VM is in running state time.sleep(self.services["sleep"]) # SSH should be allowed on 22 port after restart try: self.debug("Trying to SSH into VM %s" % self.virtual_machine.ssh_ip) self.virtual_machine.get_ssh_client() except Exception as e: self.fail("SSH access failed for ingress rule ID: %s" \ % ingress_rule["id"] ) return