mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-10-26 08:42:29 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			247 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			247 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Licensed to the Apache Software Foundation (ASF) under one
 | |
| # or more contributor license agreements.  See the NOTICE file
 | |
| # distributed with this work for additional information
 | |
| # regarding copyright ownership.  The ASF licenses this file
 | |
| # to you under the Apache License, Version 2.0 (the
 | |
| # "License"); you may not use this file except in compliance
 | |
| # with the License.  You may obtain a copy of the License at
 | |
| #
 | |
| #   http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing,
 | |
| # software distributed under the License is distributed on an
 | |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | |
| # KIND, either express or implied.  See the License for the
 | |
| # specific language governing permissions and limitations
 | |
| # under the License.
 | |
| from marvin.cloudstackTestCase import cloudstackTestCase
 | |
| from marvin.lib.utils import cleanup_resources
 | |
| from marvin.lib.base import (Network, NetworkACLList, NetworkOffering, VpcOffering, VPC, NetworkACL)
 | |
| from marvin.lib.common import (get_domain, get_zone)
 | |
| from nose.plugins.attrib import attr
 | |
| from marvin.cloudstackException import CloudstackAPIException
 | |
| 
 | |
| 
 | |
| class Services:
 | |
|     """Test Global ACLs
 | |
|     """
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.services = {
 | |
|             "root_domain": {
 | |
|                 "name": "ROOT",
 | |
|             },
 | |
|             "domain": {
 | |
|                 "name": "Domain",
 | |
|             },
 | |
|             "user": {
 | |
|                 "username": "user",
 | |
|                 "roletype": 0,
 | |
|             },
 | |
|             "domain_admin": {
 | |
|                 "username": "Domain admin",
 | |
|                 "roletype": 2,
 | |
|             },
 | |
|             "root_admin": {
 | |
|                 "username": "Root admin",
 | |
|                 "roletype": 1,
 | |
|             },
 | |
|             "vpc": {
 | |
|                 "name": "vpc-networkacl",
 | |
|                 "displaytext": "vpc-networkacl",
 | |
|                 "cidr": "10.1.1.0/24",
 | |
|             },
 | |
|             "vpcnetwork": {
 | |
|                 "name": "vpcnetwork",
 | |
|                 "displaytext": "vpcnetwork",
 | |
|             },
 | |
|             "rule": {
 | |
|                 "protocol": "all",
 | |
|                 "traffictype": "ingress",
 | |
|             }
 | |
|         }
 | |
| 
 | |
| 
 | |
| class TestGlobalACLs(cloudstackTestCase):
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         cls.testClient = super(TestGlobalACLs, cls).getClsTestClient()
 | |
|         cls.apiclient = cls.testClient.getApiClient()
 | |
| 
 | |
|         cls.services = Services().services
 | |
|         cls.domain = get_domain(cls.apiclient)
 | |
|         cls.zone = get_zone(cls.apiclient, cls.testClient.getZoneForTests())
 | |
|         return
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.user_apiclient = self.testClient.getUserApiClient(self.services["user"]["username"],
 | |
|                                                                self.services["domain"]["name"],
 | |
|                                                                self.services["user"]["roletype"])
 | |
| 
 | |
|         self.domain_admin_apiclient = self.testClient.getUserApiClient(self.services["domain_admin"]["username"],
 | |
|                                                                        self.services["domain"]["name"],
 | |
|                                                                        self.services["domain_admin"]["roletype"])
 | |
| 
 | |
|         self.admin_apiclient = self.testClient.getUserApiClient(self.services["root_admin"]["username"],
 | |
|                                                                 self.services["root_domain"]["name"],
 | |
|                                                                 self.services["root_admin"]["roletype"])
 | |
| 
 | |
|         self.cleanup = []
 | |
|         return
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestGlobalACLs, self).tearDown()
 | |
| 
 | |
|     @attr(tags=["advanced", "basic"], required_hardware="false")
 | |
|     def test_create_global_acl(self):
 | |
|         """ Test create global ACL as a normal user, domain admin and root admin users.
 | |
|         """
 | |
| 
 | |
|         self.debug("Creating ACL list as a normal user, should raise exception.")
 | |
|         self.assertRaisesRegex(CloudstackAPIException, "Only Root Admin can create global ACLs.",
 | |
|                                NetworkACLList.create, apiclient=self.user_apiclient, services={},
 | |
|                                name="acl", description="acl")
 | |
| 
 | |
|         self.debug("Creating ACL list as a domain admin, should raise exception.")
 | |
|         self.assertRaisesRegex(CloudstackAPIException, "Only Root Admin can create global ACLs.",
 | |
|                                NetworkACLList.create, apiclient=self.domain_admin_apiclient, services={},
 | |
|                                name="acl", description="acl")
 | |
| 
 | |
|         self.debug("Creating ACL list as a root admin, should work.")
 | |
|         acl = NetworkACLList.create(apiclient=self.admin_apiclient, services={}, name="acl", description="acl")
 | |
|         self.cleanup.append(acl)
 | |
|         self.assertIsNotNone(acl, "A root admin user should be able to create a global ACL.")
 | |
| 
 | |
|         return
 | |
| 
 | |
|     @attr(tags=["advanced", "basic"], required_hardware="false")
 | |
|     def test_replace_acl_of_network(self):
 | |
|         """ Test to replace ACL of a VPC as a normal user, domain admin and root admin users.
 | |
|         """
 | |
|         # Get network offering
 | |
|         networkOffering = NetworkOffering.list(self.apiclient, name="DefaultIsolatedNetworkOfferingForVpcNetworks")
 | |
|         self.assertTrue(networkOffering is not None and len(networkOffering) > 0, "No VPC network offering")
 | |
| 
 | |
|         # Getting VPC offering
 | |
|         vpcOffering = VpcOffering.list(self.apiclient, name="Default VPC offering")
 | |
|         self.assertTrue(vpcOffering is not None and len(vpcOffering) > 0, "No VPC offerings found")
 | |
| 
 | |
|         # Creating VPC
 | |
|         vpc = VPC.create(
 | |
|             apiclient=self.apiclient,
 | |
|             services=self.services["vpc"],
 | |
|             networkDomain="vpc.networkacl",
 | |
|             vpcofferingid=vpcOffering[0].id,
 | |
|             zoneid=self.zone.id,
 | |
|             domainid=self.domain.id
 | |
|         )
 | |
|         self.cleanup.append(vpc)
 | |
|         self.assertTrue(vpc is not None, "VPC creation failed")
 | |
| 
 | |
|         # Creating ACL list
 | |
|         acl = NetworkACLList.create(apiclient=self.apiclient, services={}, name="acl", description="acl")
 | |
|         self.cleanup.append(acl)
 | |
| 
 | |
|         # Creating tier on VPC with ACL list
 | |
|         network = Network.create(
 | |
|             apiclient=self.apiclient,
 | |
|             services=self.services["vpcnetwork"],
 | |
|             accountid="Admin",
 | |
|             domainid=self.domain.id,
 | |
|             networkofferingid=networkOffering[0].id,
 | |
|             zoneid=self.zone.id,
 | |
|             vpcid=vpc.id,
 | |
|             aclid=acl.id,
 | |
|             gateway="10.1.1.1",
 | |
|             netmask="255.255.255.192"
 | |
|         )
 | |
|         self.cleanup.append(network)
 | |
| 
 | |
|         # User should be able to replace ACL
 | |
|         network.replaceACLList(apiclient=self.user_apiclient, aclid=acl.id)
 | |
|         # Domain Admin should be able to replace ACL
 | |
|         network.replaceACLList(apiclient=self.domain_admin_apiclient, aclid=acl.id)
 | |
|         # Admin should be able to replace ACL
 | |
|         network.replaceACLList(apiclient=self.admin_apiclient, aclid=acl.id)
 | |
| 
 | |
|         return
 | |
| 
 | |
|     @attr(tags=["advanced", "basic"], required_hardware="false")
 | |
|     def test_create_acl_rule(self):
 | |
|         """ Test to create ACL rule as a normal user, domain admin and root admin users.
 | |
|         """
 | |
|         # Creating ACL list
 | |
|         acl = NetworkACLList.create(apiclient=self.admin_apiclient, services={}, name="acl", description="acl")
 | |
|         self.cleanup.append(acl)
 | |
| 
 | |
|         self.debug("Creating ACL rule as a user, should raise exception.")
 | |
|         self.assertRaisesRegex(CloudstackAPIException, "Only Root Admins can create rules for a global ACL.",
 | |
|                                NetworkACL.create, self.user_apiclient, services=self.services["rule"], aclid=acl.id)
 | |
|         self.debug("Creating ACL rule as a domain admin, should raise exception.")
 | |
|         self.assertRaisesRegex(CloudstackAPIException, "Only Root Admins can create rules for a global ACL.",
 | |
|                                NetworkACL.create, self.domain_admin_apiclient, services=self.services["rule"], aclid=acl.id)
 | |
|         self.debug("Creating ACL rule as a root admin, should work.")
 | |
|         acl_rule = NetworkACL.create(self.admin_apiclient, services=self.services["rule"], aclid=acl.id)
 | |
|         self.cleanup.append(acl_rule)
 | |
| 
 | |
|         return
 | |
| 
 | |
|     @attr(tags=["advanced", "basic"], required_hardware="false")
 | |
|     def test_delete_acl_rule(self):
 | |
|         """ Test to delete ACL rule as a normal user, domain admin and root admin users.
 | |
|         """
 | |
|         # Creating ACL list
 | |
|         acl = NetworkACLList.create(apiclient=self.apiclient, services={}, name="acl", description="acl")
 | |
|         self.cleanup.append(acl)
 | |
| 
 | |
|         # Creating ACL rule
 | |
|         acl_rule = NetworkACL.create(self.apiclient, services=self.services["rule"], aclid=acl.id)
 | |
|         self.cleanup.append(acl_rule)
 | |
| 
 | |
|         self.debug("Deleting ACL rule as a user, should raise exception.")
 | |
|         self.assertRaisesRegex(Exception, "Only Root Admin can delete global ACL rules.",
 | |
|                                NetworkACL.delete, acl_rule, self.user_apiclient)
 | |
|         self.debug("Deleting ACL rule as a domain admin, should raise exception.")
 | |
|         self.assertRaisesRegex(Exception, "Only Root Admin can delete global ACL rules.",
 | |
|                                NetworkACL.delete, acl_rule, self.domain_admin_apiclient)
 | |
| 
 | |
|         self.debug("Deleting ACL rule as a root admin, should work.")
 | |
|         NetworkACL.delete(acl_rule, self.admin_apiclient)
 | |
|         self.cleanup.remove(acl_rule)
 | |
| 
 | |
|         # Verify if the number of ACL rules is equal to four, i.e. the number of rules
 | |
|         # for the default ACLs `default_allow` (2 rules) and `default_deny` (2 rules) ACLs
 | |
|         number_of_acl_rules = acl_rule.list(apiclient=self.admin_apiclient)
 | |
|         self.assertEqual(len(number_of_acl_rules), 4)
 | |
| 
 | |
|         return
 | |
| 
 | |
| 
 | |
|     @attr(tags=["advanced", "basic"], required_hardware="false")
 | |
|     def test_delete_global_acl(self):
 | |
|         """ Test delete global ACL as a normal user, domain admin and root admin users.
 | |
|         """
 | |
| 
 | |
|         # Creating ACL list. Not adding to cleanup as it will be deleted in this method
 | |
|         acl = NetworkACLList.create(apiclient=self.apiclient, services={}, name="acl", description="acl")
 | |
|         self.cleanup.append(acl)
 | |
| 
 | |
|         self.debug("Deleting ACL list as a normal user, should raise exception.")
 | |
|         self.assertRaisesRegex(Exception, "Only Root Admin can delete global ACLs.",
 | |
|                                NetworkACLList.delete, acl, apiclient=self.user_apiclient)
 | |
| 
 | |
|         self.debug("Deleting ACL list as a domain admin, should raise exception.")
 | |
|         self.assertRaisesRegex(Exception, "Only Root Admin can delete global ACLs.",
 | |
|                                NetworkACLList.delete, acl, apiclient=self.domain_admin_apiclient)
 | |
| 
 | |
|         self.debug("Deleting ACL list as a root admin, should work.")
 | |
|         acl.delete(apiclient=self.admin_apiclient)
 | |
|         self.cleanup.remove(acl)
 | |
| 
 | |
|         # Verify if number of ACLs is equal to two, i.e. the number of default ACLs `default_allow` and `default_deny`
 | |
|         number_of_acls = NetworkACLList.list(apiclient=self.admin_apiclient)
 | |
|         self.assertEqual(len(number_of_acls), 2)
 | |
| 
 | |
|         return
 |