# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. #Import Local Modules from marvin.cloudstackTestCase import cloudstackTestCase from marvin.lib.base import (SecurityGroup, Account) from marvin.lib.common import (get_zone, get_domain, get_template) from marvin.lib.utils import (validateList, cleanup_resources) from marvin.codes import (PASS, EMPTY_LIST) from nose.plugins.attrib import attr class TestSecurityGroups(cloudstackTestCase): @classmethod def setUpClass(cls): try: cls._cleanup = [] cls.testClient = super(TestSecurityGroups, cls).getClsTestClient() cls.api_client = cls.testClient.getApiClient() cls.services = cls.testClient.getParsedTestDataConfig() # Get Domain, Zone, Template cls.domain = get_domain(cls.api_client) cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests()) cls.template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) cls.services['mode'] = cls.zone.networktype cls.account = Account.create( cls.api_client, cls.services["account"], domainid=cls.domain.id ) # Getting authentication for user in newly created Account cls.user = cls.account.user[0] cls.userapiclient = cls.testClient.getUserApiClient(cls.user.username, cls.domain.name) cls._cleanup.append(cls.account) except Exception as e: cls.tearDownClass() raise Exception("Warning: Exception in setup : %s" % e) return def setUp(self): self.apiClient = self.testClient.getApiClient() self.cleanup = [] def tearDown(self): #Clean up, terminate the created resources cleanup_resources(self.apiClient, self.cleanup) return @classmethod def tearDownClass(cls): try: cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return def __verify_values(self, expected_vals, actual_vals): """ @Desc: Function to verify expected and actual values @Steps: Step1: Initializing return flag to True Step1: Verifying length of expected and actual dictionaries is matching. If not matching returning false Step2: Listing all the keys from expected dictionary Step3: Looping through each key from step2 and verifying expected and actual dictionaries have same value If not making return flag to False Step4: returning the return flag after all the values are verified """ return_flag = True if len(expected_vals) != len(actual_vals): return False keys = list(expected_vals.keys()) for i in range(0, len(expected_vals)): exp_val = expected_vals[keys[i]] act_val = actual_vals[keys[i]] if exp_val == act_val: return_flag = return_flag and True else: return_flag = return_flag and False self.debug("expected Value: %s, is not matching with actual value: %s" % ( exp_val, act_val )) return return_flag @attr(tags=["basic"], required_hardware="true") def test_01_list_securitygroups_pagination(self): """ @Desc: Test to List Security Groups pagination @steps: Step1: Listing all the Security Groups for a user Step2: Verifying that list size is 1 Step3: Creating (page size) number of Security Groups Step4: Listing all the Security Groups again for a user Step5: Verifying that list size is (page size + 1) Step6: Listing all the Security Groups in page1 Step7: Verifying that list size is (page size) Step8: Listing all the Security Groups in page2 Step9: Verifying that list size is 1 Step10: Deleting the Security Group present in page 2 Step11: Listing all the Security Groups in page2 Step12: Verifying that no security groups are listed """ # Listing all the Security Groups for a User list_securitygroups_before = SecurityGroup.list( self.userapiclient, listall=self.services["listall"] ) # Verifying that default security group is created status = validateList(list_securitygroups_before) self.assertEqual( PASS, status[0], "Default Security Groups creation failed" ) # Verifying the size of the list is 1 self.assertEqual( 1, len(list_securitygroups_before), "Count of Security Groups list is not matching" ) # Creating pagesize number of security groups for i in range(0, (self.services["pagesize"])): securitygroup_created = SecurityGroup.create( self.userapiclient, self.services["security_group"], account=self.account.name, domainid=self.domain.id, description=self.services["security_group"]["name"] ) self.assertIsNotNone( securitygroup_created, "Security Group creation failed" ) if (i < self.services["pagesize"]): self.cleanup.append(securitygroup_created) # Listing all the security groups for user again list_securitygroups_after = SecurityGroup.list( self.userapiclient, listall=self.services["listall"] ) status = validateList(list_securitygroups_after) self.assertEqual( PASS, status[0], "Security Groups creation failed" ) # Verifying that list size is pagesize + 1 self.assertEqual( self.services["pagesize"] + 1, len(list_securitygroups_after), "Failed to create pagesize + 1 number of Security Groups" ) # Listing all the security groups in page 1 list_securitygroups_page1 = SecurityGroup.list( self.userapiclient, listall=self.services["listall"], page=1, pagesize=self.services["pagesize"] ) status = validateList(list_securitygroups_page1) self.assertEqual( PASS, status[0], "Failed to list security groups in page 1" ) # Verifying the list size to be equal to pagesize self.assertEqual( self.services["pagesize"], len(list_securitygroups_page1), "Size of security groups in page 1 is not matching" ) # Listing all the security groups in page 2 list_securitygroups_page2 = SecurityGroup.list( self.userapiclient, listall=self.services["listall"], page=2, pagesize=self.services["pagesize"] ) status = validateList(list_securitygroups_page2) self.assertEqual( PASS, status[0], "Failed to list security groups in page 2" ) # Verifying the list size to be equal to pagesize self.assertEqual( 1, len(list_securitygroups_page2), "Size of security groups in page 2 is not matching" ) # Deleting the security group present in page 2 SecurityGroup.delete( securitygroup_created, self.userapiclient) self.cleanup.remove(securitygroup_created) # Listing all the security groups in page 2 again list_securitygroups_page2 = SecurityGroup.list( self.userapiclient, listall=self.services["listall"], page=2, pagesize=self.services["pagesize"] ) # Verifying that there are no security groups listed self.assertIsNone( list_securitygroups_page2, "Security Groups not deleted from page 2" ) return @attr(tags=["basic"], required_hardware="true") def test_02_securitygroups_authorize_revoke_ingress(self): """ @Desc: Test to Authorize and Revoke Ingress for Security Group @steps: Step1: Listing all the Security Groups for a user Step2: Verifying that list size is 1 Step3: Creating a Security Groups Step4: Listing all the Security Groups again for a user Step5: Verifying that list size is 2 Step6: Authorizing Ingress for the security group created in step3 Step7: Listing the security groups by passing id of security group created in step3 Step8: Verifying that list size is 1 Step9: Verifying that Ingress is authorized to the security group Step10: Verifying the details of the Ingress rule are as expected Step11: Revoking Ingress for the security group created in step3 Step12: Listing the security groups by passing id of security group created in step3 Step13: Verifying that list size is 1 Step14: Verifying that Ingress is revoked from the security group """ # Listing all the Security Groups for a User list_securitygroups_before = SecurityGroup.list( self.userapiclient, listall=self.services["listall"] ) # Verifying that default security group is created status = validateList(list_securitygroups_before) self.assertEqual( PASS, status[0], "Default Security Groups creation failed" ) # Verifying the size of the list is 1 self.assertEqual( 1, len(list_securitygroups_before), "Count of Security Groups list is not matching" ) # Creating a security group securitygroup_created = SecurityGroup.create( self.userapiclient, self.services["security_group"], account=self.account.name, domainid=self.domain.id, description=self.services["security_group"]["name"] ) self.assertIsNotNone( securitygroup_created, "Security Group creation failed" ) self.cleanup.append(securitygroup_created) # Listing all the security groups for user again list_securitygroups_after = SecurityGroup.list( self.userapiclient, listall=self.services["listall"] ) status = validateList(list_securitygroups_after) self.assertEqual( PASS, status[0], "Security Groups creation failed" ) # Verifying that list size is 2 self.assertEqual( 2, len(list_securitygroups_after), "Failed to create Security Group" ) # Authorizing Ingress for the security group created in step3 securitygroup_created.authorize( self.userapiclient, self.services["ingress_rule"], self.account.name, self.domain.id, ) # Listing the security group by Id list_securitygroups_byid = SecurityGroup.list( self.userapiclient, listall=self.services["listall"], id=securitygroup_created.id, domainid=self.domain.id ) # Verifying that security group is listed status = validateList(list_securitygroups_byid) self.assertEqual( PASS, status[0], "Listing of Security Groups by id failed" ) # Verifying size of the list is 1 self.assertEqual( 1, len(list_securitygroups_byid), "Count of the listing security group by id is not matching" ) securitygroup_ingress = list_securitygroups_byid[0].ingressrule # Validating the Ingress rule status = validateList(securitygroup_ingress) self.assertEqual( PASS, status[0], "Security Groups Ingress rule authorization failed" ) self.assertEqual( 1, len(securitygroup_ingress), "Security Group Ingress rules count is not matching" ) # Verifying the details of the Ingress rule are as expected #Creating expected and actual values dictionaries expected_dict = { "cidr":self.services["ingress_rule"]["cidrlist"], "protocol":self.services["ingress_rule"]["protocol"], "startport":self.services["ingress_rule"]["startport"], "endport":self.services["ingress_rule"]["endport"], } actual_dict = { "cidr":str(securitygroup_ingress[0].cidr), "protocol":str(securitygroup_ingress[0].protocol.upper()), "startport":str(securitygroup_ingress[0].startport), "endport":str(securitygroup_ingress[0].endport), } ingress_status = self.__verify_values( expected_dict, actual_dict ) self.assertEqual( True, ingress_status, "Listed Security group Ingress rule details are not as expected" ) # Revoking the Ingress rule from Security Group securitygroup_created.revoke(self.userapiclient, securitygroup_ingress[0].ruleid) # Listing the security group by Id list_securitygroups_byid = SecurityGroup.list( self.userapiclient, listall=self.services["listall"], id=securitygroup_created.id, domainid=self.domain.id ) # Verifying that security group is listed status = validateList(list_securitygroups_byid) self.assertEqual( PASS, status[0], "Listing of Security Groups by id failed" ) # Verifying size of the list is 1 self.assertEqual( 1, len(list_securitygroups_byid), "Count of the listing security group by id is not matching" ) securitygroup_ingress = list_securitygroups_byid[0].ingressrule # Verifying that Ingress rule is empty(revoked) status = validateList(securitygroup_ingress) self.assertEqual( EMPTY_LIST, status[2], "Security Groups Ingress rule is not revoked" ) return @attr(tags=["basic"], required_hardware="true") def test_03_securitygroups_authorize_revoke_egress(self): """ @Desc: Test to Authorize and Revoke Egress for Security Group @steps: Step1: Listing all the Security Groups for a user Step2: Verifying that list size is 1 Step3: Creating a Security Groups Step4: Listing all the Security Groups again for a user Step5: Verifying that list size is 2 Step6: Authorizing Egress for the security group created in step3 Step7: Listing the security groups by passing id of security group created in step3 Step8: Verifying that list size is 1 Step9: Verifying that Egress is authorized to the security group Step10: Verifying the details of the Egress rule are as expected Step11: Revoking Egress for the security group created in step3 Step12: Listing the security groups by passing id of security group created in step3 Step13: Verifying that list size is 1 Step14: Verifying that Egress is revoked from the security group """ # Listing all the Security Groups for a User list_securitygroups_before = SecurityGroup.list( self.userapiclient, listall=self.services["listall"] ) # Verifying that default security group is created status = validateList(list_securitygroups_before) self.assertEqual( PASS, status[0], "Default Security Groups creation failed" ) # Verifying the size of the list is 1 self.assertEqual( 1, len(list_securitygroups_before), "Count of Security Groups list is not matching" ) # Creating a security group securitygroup_created = SecurityGroup.create( self.userapiclient, self.services["security_group"], account=self.account.name, domainid=self.domain.id, description=self.services["security_group"]["name"] ) self.assertIsNotNone( securitygroup_created, "Security Group creation failed" ) self.cleanup.append(securitygroup_created) # Listing all the security groups for user again list_securitygroups_after = SecurityGroup.list( self.userapiclient, listall=self.services["listall"] ) status = validateList(list_securitygroups_after) self.assertEqual( PASS, status[0], "Security Groups creation failed" ) # Verifying that list size is 2 self.assertEqual( 2, len(list_securitygroups_after), "Failed to create Security Group" ) # Authorizing Egress for the security group created in step3 securitygroup_created.authorizeEgress( self.userapiclient, self.services["ingress_rule"], self.account.name, self.domain.id, ) # Listing the security group by Id list_securitygroups_byid = SecurityGroup.list( self.userapiclient, listall=self.services["listall"], id=securitygroup_created.id, domainid=self.domain.id ) # Verifying that security group is listed status = validateList(list_securitygroups_byid) self.assertEqual( PASS, status[0], "Listing of Security Groups by id failed" ) # Verifying size of the list is 1 self.assertEqual( 1, len(list_securitygroups_byid), "Count of the listing security group by id is not matching" ) securitygroup_egress = list_securitygroups_byid[0].egressrule # Validating the Ingress rule status = validateList(securitygroup_egress) self.assertEqual( PASS, status[0], "Security Groups Egress rule authorization failed" ) self.assertEqual( 1, len(securitygroup_egress), "Security Group Egress rules count is not matching" ) # Verifying the details of the Egress rule are as expected #Creating expected and actual values dictionaries expected_dict = { "cidr":self.services["ingress_rule"]["cidrlist"], "protocol":self.services["ingress_rule"]["protocol"], "startport":self.services["ingress_rule"]["startport"], "endport":self.services["ingress_rule"]["endport"], } actual_dict = { "cidr":str(securitygroup_egress[0].cidr), "protocol":str(securitygroup_egress[0].protocol.upper()), "startport":str(securitygroup_egress[0].startport), "endport":str(securitygroup_egress[0].endport), } ingress_status = self.__verify_values( expected_dict, actual_dict ) self.assertEqual( True, ingress_status, "Listed Security group Egress rule details are not as expected" ) # Revoking the Egress rule from Security Group securitygroup_created.revokeEgress(self.userapiclient, securitygroup_egress[0].ruleid) # Listing the security group by Id list_securitygroups_byid = SecurityGroup.list( self.userapiclient, listall=self.services["listall"], id=securitygroup_created.id, domainid=self.domain.id ) # Verifying that security group is listed status = validateList(list_securitygroups_byid) self.assertEqual( PASS, status[0], "Listing of Security Groups by id failed" ) # Verifying size of the list is 1 self.assertEqual( 1, len(list_securitygroups_byid), "Count of the listing security group by id is not matching" ) securitygroup_egress = list_securitygroups_byid[0].egressrule # Verifying that Ingress rule is empty(revoked) status = validateList(securitygroup_egress) self.assertEqual( EMPTY_LIST, status[2], "Security Groups Egress rule is not revoked" ) return