cloudstack/test/integration/component/test_escalations_securitygroups.py
dahn a1f825e7c7
python3: Migrate Marvin and smoketests to python3 (#4727)
This PR prepares Marvin and the tests for Python 3. It was part of #4479 until it was decided to abandon nose2 in that PR.

Re-PR of #4543 and #3730 to enable cooperation

Co-authored-by: Daan Hoogland <dahn@onecht.net>
Co-authored-by: Gabriel Beims Bräscher <gabriel@apache.org>
Co-authored-by: Rohit Yadav <rohit.yadav@shapeblue.com>
2021-05-04 23:19:37 +05:30

565 lines
28 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#Import Local Modules
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.lib.base import (SecurityGroup,
Account)
from marvin.lib.common import (get_zone,
get_domain,
get_template)
from marvin.lib.utils import (validateList,
cleanup_resources)
from marvin.codes import (PASS, EMPTY_LIST)
from nose.plugins.attrib import attr
class TestSecurityGroups(cloudstackTestCase):
    """Escalation tests for security groups.

    Covers pagination of the security group listing and the
    authorize/revoke cycle for ingress and egress rules. All API calls in the
    tests are made through the API client of a user belonging to an account
    that is created once for the whole class.
    """

    @classmethod
    def setUpClass(cls):
        """Create the account and the user API client shared by all tests.

        Raises:
            Exception: if any setup step fails; resources registered so far
                are cleaned up first and the original error is chained.
        """
        # Initialised outside the try block so tearDownClass can always rely
        # on it, even when the very first setup call fails.
        cls._cleanup = []
        try:
            cls.testClient = super(TestSecurityGroups, cls).getClsTestClient()
            cls.api_client = cls.testClient.getApiClient()
            cls.services = cls.testClient.getParsedTestDataConfig()
            # Get Domain, Zone, Template
            cls.domain = get_domain(cls.api_client)
            cls.zone = get_zone(
                cls.api_client,
                cls.testClient.getZoneForTests()
            )
            cls.template = get_template(
                cls.api_client,
                cls.zone.id,
                cls.services["ostype"]
            )
            cls.services['mode'] = cls.zone.networktype
            cls.account = Account.create(
                cls.api_client,
                cls.services["account"],
                domainid=cls.domain.id
            )
            # Register the account for cleanup immediately: if obtaining the
            # user API client below fails, the account must still be removed.
            cls._cleanup.append(cls.account)
            # Getting authentication for user in newly created Account
            cls.user = cls.account.user[0]
            cls.userapiclient = cls.testClient.getUserApiClient(
                cls.user.username,
                cls.domain.name
            )
        except Exception as e:
            cls.tearDownClass()
            raise Exception("Warning: Exception in setup : %s" % e) from e

    def setUp(self):
        """Prepare a fresh API client and an empty per-test cleanup list."""
        self.apiClient = self.testClient.getApiClient()
        self.cleanup = []

    def tearDown(self):
        """Clean up, terminate the resources created by the test."""
        cleanup_resources(self.apiClient, self.cleanup)

    @classmethod
    def tearDownClass(cls):
        """Remove the class-level resources registered in ``cls._cleanup``.

        Raises:
            Exception: if removing a registered resource fails.
        """
        resources = getattr(cls, "_cleanup", None)
        if not resources:
            # Nothing was registered (e.g. setUpClass failed before creating
            # anything); returning here avoids touching attributes that may
            # not exist yet and masking the original setup error.
            return
        try:
            cleanup_resources(cls.api_client, resources)
        except Exception as e:
            raise Exception(
                "Warning: Exception during cleanup : %s" % e
            ) from e

    def __verify_values(self, expected_vals, actual_vals):
        """
        @Desc: Function to verify expected and actual values
        @Steps:
        Step1: Verifying length of expected and actual dictionaries is matching.
               If not matching returning False
        Step2: Looping through each key of the expected dictionary and verifying
               the actual dictionary has the same value for it. A missing key or
               a differing value is logged and makes the result False
        Step3: Returning True only if every value matched
        """
        if len(expected_vals) != len(actual_vals):
            self.debug("expected %s values, but got %s" % (
                len(expected_vals),
                len(actual_vals)
            ))
            return False
        all_match = True
        for key, exp_val in expected_vals.items():
            if key not in actual_vals:
                # Same length but different keys: report a mismatch instead
                # of failing with a KeyError.
                self.debug("expected key: %s is missing in actual values" % key)
                all_match = False
                continue
            act_val = actual_vals[key]
            if exp_val != act_val:
                all_match = False
                self.debug("expected Value: %s, is not matching with actual value: %s" % (
                    exp_val,
                    act_val
                ))
        return all_match

    def _list_security_groups(self, **filters):
        """List security groups as the test user, applying optional filters."""
        return SecurityGroup.list(
            self.userapiclient,
            listall=self.services["listall"],
            **filters
        )

    def _assert_valid_list(self, items, expected_size, invalid_message,
                           size_message):
        """Assert that items passes validateList and has expected_size entries."""
        status = validateList(items)
        self.assertEqual(PASS, status[0], invalid_message)
        self.assertEqual(expected_size, len(items), size_message)

    def _assert_only_default_group_exists(self):
        """Assert that the user currently sees exactly one security group."""
        self._assert_valid_list(
            self._list_security_groups(),
            1,
            "Default Security Groups creation failed",
            "Count of Security Groups list is not matching"
        )

    def _create_security_group(self):
        """Create a security group for the test account and register it for
        per-test cleanup."""
        security_group = SecurityGroup.create(
            self.userapiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.domain.id,
            description=self.services["security_group"]["name"]
        )
        self.assertIsNotNone(
            security_group,
            "Security Group creation failed"
        )
        self.cleanup.append(security_group)
        return security_group

    def _get_security_group_by_id(self, group_id):
        """List by id, assert exactly one match and return that entry."""
        groups = self._list_security_groups(
            id=group_id,
            domainid=self.domain.id
        )
        self._assert_valid_list(
            groups,
            1,
            "Listing of Security Groups by id failed",
            "Count of the listing security group by id is not matching"
        )
        return groups[0]

    def _assert_rule_matches_config(self, rule, failure_message):
        """Assert that a listed rule matches the "ingress_rule" test data.

        Both the ingress and the egress test authorize their rule from the
        "ingress_rule" test data, so it is the expectation for both.
        """
        rule_config = self.services["ingress_rule"]
        expected_dict = {
            "cidr": rule_config["cidrlist"],
            "protocol": rule_config["protocol"],
            "startport": rule_config["startport"],
            "endport": rule_config["endport"],
        }
        actual_dict = {
            "cidr": str(rule.cidr),
            "protocol": str(rule.protocol.upper()),
            "startport": str(rule.startport),
            "endport": str(rule.endport),
        }
        self.assertTrue(
            self.__verify_values(expected_dict, actual_dict),
            failure_message
        )

    @attr(tags=["basic"], required_hardware="true")
    def test_01_list_securitygroups_pagination(self):
        """
        @Desc: Test to List Security Groups pagination
        @steps:
        Step1: Listing all the Security Groups for a user
        Step2: Verifying that list size is 1
        Step3: Creating (page size) number of Security Groups
        Step4: Listing all the Security Groups again for a user
        Step5: Verifying that list size is (page size + 1)
        Step6: Listing all the Security Groups in page1
        Step7: Verifying that list size is (page size)
        Step8: Listing all the Security Groups in page2
        Step9: Verifying that list size is 1
        Step10: Deleting the Security Group present in page 2
        Step11: Listing all the Security Groups in page2
        Step12: Verifying that no security groups are listed
        """
        page_size = self.services["pagesize"]
        # Verifying that only the default security group is listed
        self._assert_only_default_group_exists()
        # Creating pagesize number of security groups; every one of them is
        # registered for cleanup by the helper
        securitygroup_created = None
        for _ in range(page_size):
            securitygroup_created = self._create_security_group()
        # Listing all the security groups for user again and verifying that
        # list size is pagesize + 1
        self._assert_valid_list(
            self._list_security_groups(),
            page_size + 1,
            "Security Groups creation failed",
            "Failed to create pagesize + 1 number of Security Groups"
        )
        # Listing all the security groups in page 1 and verifying the list
        # size to be equal to pagesize
        self._assert_valid_list(
            self._list_security_groups(page=1, pagesize=page_size),
            page_size,
            "Failed to list security groups in page 1",
            "Size of security groups in page 1 is not matching"
        )
        # Listing all the security groups in page 2 and verifying that it
        # holds exactly one security group
        self._assert_valid_list(
            self._list_security_groups(page=2, pagesize=page_size),
            1,
            "Failed to list security groups in page 2",
            "Size of security groups in page 2 is not matching"
        )
        # Deleting the last created security group so that page 2 becomes
        # empty; it is removed from cleanup only after the delete succeeded
        securitygroup_created.delete(self.userapiclient)
        self.cleanup.remove(securitygroup_created)
        # Listing all the security groups in page 2 again
        list_securitygroups_page2 = self._list_security_groups(
            page=2,
            pagesize=page_size
        )
        # Verifying that there are no security groups listed
        self.assertIsNone(
            list_securitygroups_page2,
            "Security Groups not deleted from page 2"
        )

    @attr(tags=["basic"], required_hardware="true")
    def test_02_securitygroups_authorize_revoke_ingress(self):
        """
        @Desc: Test to Authorize and Revoke Ingress for Security Group
        @steps:
        Step1: Listing all the Security Groups for a user
        Step2: Verifying that list size is 1
        Step3: Creating a Security Groups
        Step4: Listing all the Security Groups again for a user
        Step5: Verifying that list size is 2
        Step6: Authorizing Ingress for the security group created in step3
        Step7: Listing the security groups by passing id of security group created in step3
        Step8: Verifying that list size is 1
        Step9: Verifying that Ingress is authorized to the security group
        Step10: Verifying the details of the Ingress rule are as expected
        Step11: Revoking Ingress for the security group created in step3
        Step12: Listing the security groups by passing id of security group created in step3
        Step13: Verifying that list size is 1
        Step14: Verifying that Ingress is revoked from the security group
        """
        # Verifying that only the default security group is listed
        self._assert_only_default_group_exists()
        # Creating a security group
        securitygroup_created = self._create_security_group()
        # Listing all the security groups for user again and verifying that
        # list size is 2
        self._assert_valid_list(
            self._list_security_groups(),
            2,
            "Security Groups creation failed",
            "Failed to create Security Group"
        )
        # Authorizing Ingress for the security group created in step3
        securitygroup_created.authorize(
            self.userapiclient,
            self.services["ingress_rule"],
            self.account.name,
            self.domain.id,
        )
        # Listing the security group by Id and validating the Ingress rule
        securitygroup_ingress = self._get_security_group_by_id(
            securitygroup_created.id
        ).ingressrule
        self._assert_valid_list(
            securitygroup_ingress,
            1,
            "Security Groups Ingress rule authorization failed",
            "Security Group Ingress rules count is not matching"
        )
        # Verifying the details of the Ingress rule are as expected
        self._assert_rule_matches_config(
            securitygroup_ingress[0],
            "Listed Security group Ingress rule details are not as expected"
        )
        # Revoking the Ingress rule from Security Group
        securitygroup_created.revoke(
            self.userapiclient,
            securitygroup_ingress[0].ruleid
        )
        # Listing the security group by Id again
        securitygroup_ingress = self._get_security_group_by_id(
            securitygroup_created.id
        ).ingressrule
        # Verifying that Ingress rule is empty(revoked)
        status = validateList(securitygroup_ingress)
        self.assertEqual(
            EMPTY_LIST,
            status[2],
            "Security Groups Ingress rule is not revoked"
        )

    @attr(tags=["basic"], required_hardware="true")
    def test_03_securitygroups_authorize_revoke_egress(self):
        """
        @Desc: Test to Authorize and Revoke Egress for Security Group
        @steps:
        Step1: Listing all the Security Groups for a user
        Step2: Verifying that list size is 1
        Step3: Creating a Security Groups
        Step4: Listing all the Security Groups again for a user
        Step5: Verifying that list size is 2
        Step6: Authorizing Egress for the security group created in step3
        Step7: Listing the security groups by passing id of security group created in step3
        Step8: Verifying that list size is 1
        Step9: Verifying that Egress is authorized to the security group
        Step10: Verifying the details of the Egress rule are as expected
        Step11: Revoking Egress for the security group created in step3
        Step12: Listing the security groups by passing id of security group created in step3
        Step13: Verifying that list size is 1
        Step14: Verifying that Egress is revoked from the security group
        """
        # Verifying that only the default security group is listed
        self._assert_only_default_group_exists()
        # Creating a security group
        securitygroup_created = self._create_security_group()
        # Listing all the security groups for user again and verifying that
        # list size is 2
        self._assert_valid_list(
            self._list_security_groups(),
            2,
            "Security Groups creation failed",
            "Failed to create Security Group"
        )
        # Authorizing Egress for the security group created in step3; the
        # rule is built from the "ingress_rule" test data
        securitygroup_created.authorizeEgress(
            self.userapiclient,
            self.services["ingress_rule"],
            self.account.name,
            self.domain.id,
        )
        # Listing the security group by Id and validating the Egress rule
        securitygroup_egress = self._get_security_group_by_id(
            securitygroup_created.id
        ).egressrule
        self._assert_valid_list(
            securitygroup_egress,
            1,
            "Security Groups Egress rule authorization failed",
            "Security Group Egress rules count is not matching"
        )
        # Verifying the details of the Egress rule are as expected
        self._assert_rule_matches_config(
            securitygroup_egress[0],
            "Listed Security group Egress rule details are not as expected"
        )
        # Revoking the Egress rule from Security Group
        securitygroup_created.revokeEgress(
            self.userapiclient,
            securitygroup_egress[0].ruleid
        )
        # Listing the security group by Id again
        securitygroup_egress = self._get_security_group_by_id(
            securitygroup_created.id
        ).egressrule
        # Verifying that Egress rule is empty(revoked)
        status = validateList(securitygroup_egress)
        self.assertEqual(
            EMPTY_LIST,
            status[2],
            "Security Groups Egress rule is not revoked"
        )