mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
565 lines
28 KiB
Python
565 lines
28 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
#Import Local Modules
|
|
from marvin.cloudstackTestCase import cloudstackTestCase
|
|
from marvin.lib.base import (SecurityGroup,
|
|
Account)
|
|
from marvin.lib.common import (get_zone,
|
|
get_domain,
|
|
get_template)
|
|
from marvin.lib.utils import (validateList,
|
|
cleanup_resources)
|
|
from marvin.codes import (PASS, EMPTY_LIST)
|
|
from nose.plugins.attrib import attr
|
|
|
|
class TestSecurityGroups(cloudstackTestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
try:
|
|
cls._cleanup = []
|
|
cls.testClient = super(TestSecurityGroups, cls).getClsTestClient()
|
|
cls.api_client = cls.testClient.getApiClient()
|
|
cls.services = cls.testClient.getParsedTestDataConfig()
|
|
# Get Domain, Zone, Template
|
|
cls.domain = get_domain(cls.api_client)
|
|
cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
|
|
cls.template = get_template(
|
|
cls.api_client,
|
|
cls.zone.id,
|
|
cls.services["ostype"]
|
|
)
|
|
cls.services['mode'] = cls.zone.networktype
|
|
cls.account = Account.create(
|
|
cls.api_client,
|
|
cls.services["account"],
|
|
domainid=cls.domain.id
|
|
)
|
|
# Getting authentication for user in newly created Account
|
|
cls.user = cls.account.user[0]
|
|
cls.userapiclient = cls.testClient.getUserApiClient(cls.user.username, cls.domain.name)
|
|
cls._cleanup.append(cls.account)
|
|
except Exception as e:
|
|
cls.tearDownClass()
|
|
raise Exception("Warning: Exception in setup : %s" % e)
|
|
return
|
|
|
|
def setUp(self):
|
|
|
|
self.apiClient = self.testClient.getApiClient()
|
|
self.cleanup = []
|
|
|
|
def tearDown(self):
|
|
#Clean up, terminate the created resources
|
|
cleanup_resources(self.apiClient, self.cleanup)
|
|
return
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
try:
|
|
cleanup_resources(cls.api_client, cls._cleanup)
|
|
except Exception as e:
|
|
raise Exception("Warning: Exception during cleanup : %s" % e)
|
|
|
|
return
|
|
|
|
def __verify_values(self, expected_vals, actual_vals):
|
|
"""
|
|
@Desc: Function to verify expected and actual values
|
|
@Steps:
|
|
Step1: Initializing return flag to True
|
|
Step1: Verifying length of expected and actual dictionaries is matching.
|
|
If not matching returning false
|
|
Step2: Listing all the keys from expected dictionary
|
|
Step3: Looping through each key from step2 and verifying expected and actual dictionaries have same value
|
|
If not making return flag to False
|
|
Step4: returning the return flag after all the values are verified
|
|
"""
|
|
return_flag = True
|
|
|
|
if len(expected_vals) != len(actual_vals):
|
|
return False
|
|
|
|
keys = expected_vals.keys()
|
|
for i in range(0, len(expected_vals)):
|
|
exp_val = expected_vals[keys[i]]
|
|
act_val = actual_vals[keys[i]]
|
|
if exp_val == act_val:
|
|
return_flag = return_flag and True
|
|
else:
|
|
return_flag = return_flag and False
|
|
self.debug("expected Value: %s, is not matching with actual value: %s" % (
|
|
exp_val,
|
|
act_val
|
|
))
|
|
return return_flag
|
|
|
|
@attr(tags=["basic"], required_hardware="true")
|
|
def test_01_list_securitygroups_pagination(self):
|
|
"""
|
|
@Desc: Test to List Security Groups pagination
|
|
@steps:
|
|
Step1: Listing all the Security Groups for a user
|
|
Step2: Verifying that list size is 1
|
|
Step3: Creating (page size) number of Security Groups
|
|
Step4: Listing all the Security Groups again for a user
|
|
Step5: Verifying that list size is (page size + 1)
|
|
Step6: Listing all the Security Groups in page1
|
|
Step7: Verifying that list size is (page size)
|
|
Step8: Listing all the Security Groups in page2
|
|
Step9: Verifying that list size is 1
|
|
Step10: Deleting the Security Group present in page 2
|
|
Step11: Listing all the Security Groups in page2
|
|
Step12: Verifying that no security groups are listed
|
|
"""
|
|
# Listing all the Security Groups for a User
|
|
list_securitygroups_before = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"]
|
|
)
|
|
# Verifying that default security group is created
|
|
status = validateList(list_securitygroups_before)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Default Security Groups creation failed"
|
|
)
|
|
# Verifying the size of the list is 1
|
|
self.assertEquals(
|
|
1,
|
|
len(list_securitygroups_before),
|
|
"Count of Security Groups list is not matching"
|
|
)
|
|
# Creating pagesize number of security groups
|
|
for i in range(0, (self.services["pagesize"])):
|
|
securitygroup_created = SecurityGroup.create(
|
|
self.userapiclient,
|
|
self.services["security_group"],
|
|
account=self.account.name,
|
|
domainid=self.domain.id,
|
|
description=self.services["security_group"]["name"]
|
|
)
|
|
self.assertIsNotNone(
|
|
securitygroup_created,
|
|
"Security Group creation failed"
|
|
)
|
|
if (i < self.services["pagesize"]):
|
|
self.cleanup.append(securitygroup_created)
|
|
|
|
# Listing all the security groups for user again
|
|
list_securitygroups_after = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"]
|
|
)
|
|
status = validateList(list_securitygroups_after)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Security Groups creation failed"
|
|
)
|
|
# Verifying that list size is pagesize + 1
|
|
self.assertEquals(
|
|
self.services["pagesize"] + 1,
|
|
len(list_securitygroups_after),
|
|
"Failed to create pagesize + 1 number of Security Groups"
|
|
)
|
|
# Listing all the security groups in page 1
|
|
list_securitygroups_page1 = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"],
|
|
page=1,
|
|
pagesize=self.services["pagesize"]
|
|
)
|
|
status = validateList(list_securitygroups_page1)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Failed to list security groups in page 1"
|
|
)
|
|
# Verifying the list size to be equal to pagesize
|
|
self.assertEquals(
|
|
self.services["pagesize"],
|
|
len(list_securitygroups_page1),
|
|
"Size of security groups in page 1 is not matching"
|
|
)
|
|
# Listing all the security groups in page 2
|
|
list_securitygroups_page2 = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"],
|
|
page=2,
|
|
pagesize=self.services["pagesize"]
|
|
)
|
|
status = validateList(list_securitygroups_page2)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Failed to list security groups in page 2"
|
|
)
|
|
# Verifying the list size to be equal to pagesize
|
|
self.assertEquals(
|
|
1,
|
|
len(list_securitygroups_page2),
|
|
"Size of security groups in page 2 is not matching"
|
|
)
|
|
# Deleting the security group present in page 2
|
|
SecurityGroup.delete(
|
|
securitygroup_created,
|
|
self.userapiclient)
|
|
self.cleanup.remove(securitygroup_created)
|
|
# Listing all the security groups in page 2 again
|
|
list_securitygroups_page2 = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"],
|
|
page=2,
|
|
pagesize=self.services["pagesize"]
|
|
)
|
|
# Verifying that there are no security groups listed
|
|
self.assertIsNone(
|
|
list_securitygroups_page2,
|
|
"Security Groups not deleted from page 2"
|
|
)
|
|
return
|
|
|
|
@attr(tags=["basic"], required_hardware="true")
|
|
def test_02_securitygroups_authorize_revoke_ingress(self):
|
|
"""
|
|
@Desc: Test to Authorize and Revoke Ingress for Security Group
|
|
@steps:
|
|
Step1: Listing all the Security Groups for a user
|
|
Step2: Verifying that list size is 1
|
|
Step3: Creating a Security Groups
|
|
Step4: Listing all the Security Groups again for a user
|
|
Step5: Verifying that list size is 2
|
|
Step6: Authorizing Ingress for the security group created in step3
|
|
Step7: Listing the security groups by passing id of security group created in step3
|
|
Step8: Verifying that list size is 1
|
|
Step9: Verifying that Ingress is authorized to the security group
|
|
Step10: Verifying the details of the Ingress rule are as expected
|
|
Step11: Revoking Ingress for the security group created in step3
|
|
Step12: Listing the security groups by passing id of security group created in step3
|
|
Step13: Verifying that list size is 1
|
|
Step14: Verifying that Ingress is revoked from the security group
|
|
"""
|
|
# Listing all the Security Groups for a User
|
|
list_securitygroups_before = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"]
|
|
)
|
|
# Verifying that default security group is created
|
|
status = validateList(list_securitygroups_before)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Default Security Groups creation failed"
|
|
)
|
|
# Verifying the size of the list is 1
|
|
self.assertEquals(
|
|
1,
|
|
len(list_securitygroups_before),
|
|
"Count of Security Groups list is not matching"
|
|
)
|
|
# Creating a security group
|
|
securitygroup_created = SecurityGroup.create(
|
|
self.userapiclient,
|
|
self.services["security_group"],
|
|
account=self.account.name,
|
|
domainid=self.domain.id,
|
|
description=self.services["security_group"]["name"]
|
|
)
|
|
self.assertIsNotNone(
|
|
securitygroup_created,
|
|
"Security Group creation failed"
|
|
)
|
|
self.cleanup.append(securitygroup_created)
|
|
|
|
# Listing all the security groups for user again
|
|
list_securitygroups_after = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"]
|
|
)
|
|
status = validateList(list_securitygroups_after)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Security Groups creation failed"
|
|
)
|
|
# Verifying that list size is 2
|
|
self.assertEquals(
|
|
2,
|
|
len(list_securitygroups_after),
|
|
"Failed to create Security Group"
|
|
)
|
|
# Authorizing Ingress for the security group created in step3
|
|
securitygroup_created.authorize(
|
|
self.userapiclient,
|
|
self.services["ingress_rule"],
|
|
self.account.name,
|
|
self.domain.id,
|
|
)
|
|
# Listing the security group by Id
|
|
list_securitygroups_byid = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"],
|
|
id=securitygroup_created.id,
|
|
domainid=self.domain.id
|
|
)
|
|
# Verifying that security group is listed
|
|
status = validateList(list_securitygroups_byid)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Listing of Security Groups by id failed"
|
|
)
|
|
# Verifying size of the list is 1
|
|
self.assertEquals(
|
|
1,
|
|
len(list_securitygroups_byid),
|
|
"Count of the listing security group by id is not matching"
|
|
)
|
|
securitygroup_ingress = list_securitygroups_byid[0].ingressrule
|
|
# Validating the Ingress rule
|
|
status = validateList(securitygroup_ingress)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Security Groups Ingress rule authorization failed"
|
|
)
|
|
self.assertEquals(
|
|
1,
|
|
len(securitygroup_ingress),
|
|
"Security Group Ingress rules count is not matching"
|
|
)
|
|
# Verifying the details of the Ingress rule are as expected
|
|
#Creating expected and actual values dictionaries
|
|
expected_dict = {
|
|
"cidr":self.services["ingress_rule"]["cidrlist"],
|
|
"protocol":self.services["ingress_rule"]["protocol"],
|
|
"startport":self.services["ingress_rule"]["startport"],
|
|
"endport":self.services["ingress_rule"]["endport"],
|
|
}
|
|
actual_dict = {
|
|
"cidr":str(securitygroup_ingress[0].cidr),
|
|
"protocol":str(securitygroup_ingress[0].protocol.upper()),
|
|
"startport":str(securitygroup_ingress[0].startport),
|
|
"endport":str(securitygroup_ingress[0].endport),
|
|
}
|
|
ingress_status = self.__verify_values(
|
|
expected_dict,
|
|
actual_dict
|
|
)
|
|
self.assertEqual(
|
|
True,
|
|
ingress_status,
|
|
"Listed Security group Ingress rule details are not as expected"
|
|
)
|
|
# Revoking the Ingress rule from Security Group
|
|
securitygroup_created.revoke(self.userapiclient, securitygroup_ingress[0].ruleid)
|
|
# Listing the security group by Id
|
|
list_securitygroups_byid = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"],
|
|
id=securitygroup_created.id,
|
|
domainid=self.domain.id
|
|
)
|
|
# Verifying that security group is listed
|
|
status = validateList(list_securitygroups_byid)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Listing of Security Groups by id failed"
|
|
)
|
|
# Verifying size of the list is 1
|
|
self.assertEquals(
|
|
1,
|
|
len(list_securitygroups_byid),
|
|
"Count of the listing security group by id is not matching"
|
|
)
|
|
securitygroup_ingress = list_securitygroups_byid[0].ingressrule
|
|
# Verifying that Ingress rule is empty(revoked)
|
|
status = validateList(securitygroup_ingress)
|
|
self.assertEquals(
|
|
EMPTY_LIST,
|
|
status[2],
|
|
"Security Groups Ingress rule is not revoked"
|
|
)
|
|
return
|
|
|
|
@attr(tags=["basic"], required_hardware="true")
|
|
def test_03_securitygroups_authorize_revoke_egress(self):
|
|
"""
|
|
@Desc: Test to Authorize and Revoke Egress for Security Group
|
|
@steps:
|
|
Step1: Listing all the Security Groups for a user
|
|
Step2: Verifying that list size is 1
|
|
Step3: Creating a Security Groups
|
|
Step4: Listing all the Security Groups again for a user
|
|
Step5: Verifying that list size is 2
|
|
Step6: Authorizing Egress for the security group created in step3
|
|
Step7: Listing the security groups by passing id of security group created in step3
|
|
Step8: Verifying that list size is 1
|
|
Step9: Verifying that Egress is authorized to the security group
|
|
Step10: Verifying the details of the Egress rule are as expected
|
|
Step11: Revoking Egress for the security group created in step3
|
|
Step12: Listing the security groups by passing id of security group created in step3
|
|
Step13: Verifying that list size is 1
|
|
Step14: Verifying that Egress is revoked from the security group
|
|
"""
|
|
# Listing all the Security Groups for a User
|
|
list_securitygroups_before = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"]
|
|
)
|
|
# Verifying that default security group is created
|
|
status = validateList(list_securitygroups_before)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Default Security Groups creation failed"
|
|
)
|
|
# Verifying the size of the list is 1
|
|
self.assertEquals(
|
|
1,
|
|
len(list_securitygroups_before),
|
|
"Count of Security Groups list is not matching"
|
|
)
|
|
# Creating a security group
|
|
securitygroup_created = SecurityGroup.create(
|
|
self.userapiclient,
|
|
self.services["security_group"],
|
|
account=self.account.name,
|
|
domainid=self.domain.id,
|
|
description=self.services["security_group"]["name"]
|
|
)
|
|
self.assertIsNotNone(
|
|
securitygroup_created,
|
|
"Security Group creation failed"
|
|
)
|
|
self.cleanup.append(securitygroup_created)
|
|
|
|
# Listing all the security groups for user again
|
|
list_securitygroups_after = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"]
|
|
)
|
|
status = validateList(list_securitygroups_after)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Security Groups creation failed"
|
|
)
|
|
# Verifying that list size is 2
|
|
self.assertEquals(
|
|
2,
|
|
len(list_securitygroups_after),
|
|
"Failed to create Security Group"
|
|
)
|
|
# Authorizing Egress for the security group created in step3
|
|
securitygroup_created.authorizeEgress(
|
|
self.userapiclient,
|
|
self.services["ingress_rule"],
|
|
self.account.name,
|
|
self.domain.id,
|
|
)
|
|
# Listing the security group by Id
|
|
list_securitygroups_byid = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"],
|
|
id=securitygroup_created.id,
|
|
domainid=self.domain.id
|
|
)
|
|
# Verifying that security group is listed
|
|
status = validateList(list_securitygroups_byid)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Listing of Security Groups by id failed"
|
|
)
|
|
# Verifying size of the list is 1
|
|
self.assertEquals(
|
|
1,
|
|
len(list_securitygroups_byid),
|
|
"Count of the listing security group by id is not matching"
|
|
)
|
|
securitygroup_egress = list_securitygroups_byid[0].egressrule
|
|
# Validating the Ingress rule
|
|
status = validateList(securitygroup_egress)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Security Groups Egress rule authorization failed"
|
|
)
|
|
self.assertEquals(
|
|
1,
|
|
len(securitygroup_egress),
|
|
"Security Group Egress rules count is not matching"
|
|
)
|
|
# Verifying the details of the Egress rule are as expected
|
|
#Creating expected and actual values dictionaries
|
|
expected_dict = {
|
|
"cidr":self.services["ingress_rule"]["cidrlist"],
|
|
"protocol":self.services["ingress_rule"]["protocol"],
|
|
"startport":self.services["ingress_rule"]["startport"],
|
|
"endport":self.services["ingress_rule"]["endport"],
|
|
}
|
|
actual_dict = {
|
|
"cidr":str(securitygroup_egress[0].cidr),
|
|
"protocol":str(securitygroup_egress[0].protocol.upper()),
|
|
"startport":str(securitygroup_egress[0].startport),
|
|
"endport":str(securitygroup_egress[0].endport),
|
|
}
|
|
ingress_status = self.__verify_values(
|
|
expected_dict,
|
|
actual_dict
|
|
)
|
|
self.assertEqual(
|
|
True,
|
|
ingress_status,
|
|
"Listed Security group Egress rule details are not as expected"
|
|
)
|
|
# Revoking the Egress rule from Security Group
|
|
securitygroup_created.revokeEgress(self.userapiclient, securitygroup_egress[0].ruleid)
|
|
# Listing the security group by Id
|
|
list_securitygroups_byid = SecurityGroup.list(
|
|
self.userapiclient,
|
|
listall=self.services["listall"],
|
|
id=securitygroup_created.id,
|
|
domainid=self.domain.id
|
|
)
|
|
# Verifying that security group is listed
|
|
status = validateList(list_securitygroups_byid)
|
|
self.assertEquals(
|
|
PASS,
|
|
status[0],
|
|
"Listing of Security Groups by id failed"
|
|
)
|
|
# Verifying size of the list is 1
|
|
self.assertEquals(
|
|
1,
|
|
len(list_securitygroups_byid),
|
|
"Count of the listing security group by id is not matching"
|
|
)
|
|
securitygroup_egress = list_securitygroups_byid[0].egressrule
|
|
# Verifying that Ingress rule is empty(revoked)
|
|
status = validateList(securitygroup_egress)
|
|
self.assertEquals(
|
|
EMPTY_LIST,
|
|
status[2],
|
|
"Security Groups Egress rule is not revoked"
|
|
)
|
|
return
|