mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-11-04 00:02:37 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			708 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			708 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Licensed to the Apache Software Foundation (ASF) under one
 | 
						|
# or more contributor license agreements.  See the NOTICE file
 | 
						|
# distributed with this work for additional information
 | 
						|
# regarding copyright ownership.  The ASF licenses this file
 | 
						|
# to you under the Apache License, Version 2.0 (the
 | 
						|
# "License"); you may not use this file except in compliance
 | 
						|
# with the License.  You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#   http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing,
 | 
						|
# software distributed under the License is distributed on an
 | 
						|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | 
						|
# KIND, either express or implied.  See the License for the
 | 
						|
# specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
 | 
						|
from marvin.cloudstackAPI import *
 | 
						|
from marvin.cloudstackTestCase import cloudstackTestCase
 | 
						|
from marvin.cloudstackException import CloudstackAPIException
 | 
						|
from marvin.lib.base import Account, Role, RolePermission, Configurations
 | 
						|
from marvin.lib.utils import cleanup_resources
 | 
						|
from nose.plugins.attrib import attr
 | 
						|
from random import shuffle
 | 
						|
 | 
						|
import copy
 | 
						|
import random
 | 
						|
import re
 | 
						|
import time
 | 
						|
 | 
						|
 | 
						|
class TestData(object):
    """Holds the fixture dictionaries the role tests use to create resources.

    Every instance builds its own ``testdata`` dict, so a test may mutate
    its copy (for example to append a random suffix to a role name).
    """

    def __init__(self):
        # Account that gets bound to the role under test.
        account = {
            "email": "mtu@test.cloud",
            "firstname": "Marvin",
            "lastname": "TestUser",
            "username": "roletest",
            "password": "password",
        }

        # Role definitions, one per role type exercised by the tests.
        role = {
            "name": "MarvinFake Role ",
            "type": "User",
            "description": "Fake Role created by Marvin test"
        }
        import_rules = [
            {"rule": "list*", "permission": "allow", "description": "Listing apis"},
            {"rule": "get*", "permission": "allow", "description": "Get apis"},
            {"rule": "update*", "permission": "deny", "description": "Update apis"},
        ]
        import_role = {
            "name": "MarvinFake Import Role ",
            "type": "User",
            "description": "Fake Import User Role created by Marvin test",
            "rules": import_rules
        }
        role_admin = {
            "name": "MarvinFake Admin Role ",
            "type": "Admin",
            "description": "Fake Admin Role created by Marvin test"
        }
        role_domain_admin = {
            "name": "MarvinFake DomainAdmin Role ",
            "type": "DomainAdmin",
            "description": "Fake Domain-Admin Role created by Marvin test"
        }

        # Template for a single role permission; "roleid" is a placeholder
        # that tests overwrite with the id of the role they created.
        role_permission = {
            "roleid": 1,
            "rule": "listVirtualMachines",
            "permission": "allow",
            "description": "Fake role permission created by Marvin test"
        }

        # Rule -> permission map; insertion order is kept because the ACL
        # tests iterate it to create rules one after another.
        api_config = {
            "listApis": "allow",
            "listAccounts": "allow",
            "listClusters": "deny",
            "*VM*": "allow",
            "*Host*": "deny"
        }

        self.testdata = {
            "account": account,
            "role": role,
            "importrole": import_role,
            "roleadmin": role_admin,
            "roledomainadmin": role_domain_admin,
            "rolepermission": role_permission,
            "apiConfig": api_config
        }
 | 
						|
 | 
						|
 | 
						|
class TestDynamicRoles(cloudstackTestCase):
 | 
						|
    """Tests dynamic role and role permission management in CloudStack
 | 
						|
    """
 | 
						|
 | 
						|
    def setUp(self):
        """Create the role, role permission and account shared by every test.

        Skips the test when the listCapabilities response reports that
        dynamic roles are not enabled. Also reads the
        'dynamic.apichecker.cache.period' configuration into
        ``self.cache_period`` and registers the created resources in
        ``self.cleanup`` for tearDown.
        """
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.testdata = TestData().testdata

        capabilities = self.apiclient.listCapabilities(listCapabilities.listCapabilitiesCmd())
        if not capabilities.dynamicrolesenabled:
            self.skipTest("Dynamic Role-Based API checker not enabled, skipping test")

        # Randomise the role name so each test creates a distinctly named role.
        role_data = self.testdata["role"]
        role_data["name"] += self.getRandomString()
        self.role = Role.create(self.apiclient, role_data)

        # Bind the permission template to the freshly created role.
        permission_data = self.testdata["rolepermission"]
        permission_data["roleid"] = self.role.id
        self.rolepermission = RolePermission.create(self.apiclient, permission_data)

        self.account = Account.create(
            self.apiclient,
            self.testdata["account"],
            roleid=self.role.id
        )

        matching_configs = Configurations.list(
            self.apiclient,
            name='dynamic.apichecker.cache.period'
        )
        self.cache_period = int(matching_configs[0].value)

        # Order matters: other tests index into this list (rolepermission is at 1).
        self.cleanup = [self.account, self.rolepermission, self.role]
 | 
						|
 | 
						|
 | 
						|
    def tearDown(self):
        """Delete the resources registered in ``self.cleanup``.

        Any exception raised during cleanup is reported through
        ``self.debug`` instead of being propagated.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as cleanup_error:
            warning = "Warning! Exception in tearDown: %s" % cleanup_error
            self.debug(warning)
 | 
						|
 | 
						|
 | 
						|
    def translateRoleToAccountType(self, role_type):
        """Return the numeric account type for a role type name.

        "User" -> 0, "Admin" -> 1, "DomainAdmin" -> 2, "ResourceAdmin" -> 3;
        any other value yields -1.
        """
        known_types = (
            ("User", 0),
            ("Admin", 1),
            ("DomainAdmin", 2),
            ("ResourceAdmin", 3),
        )
        for type_name, account_type in known_types:
            if role_type == type_name:
                return account_type
        return -1
 | 
						|
 | 
						|
 | 
						|
    def getUserApiClient(self, username, domain='ROOT', role_type='User'):
        """Return an API client for the given user, also stored on the test.

        :param username: passed to the test client as ``UserName``
        :param domain: passed to the test client as ``DomainName``
                       (defaults to 'ROOT')
        :param role_type: role type name, translated to a numeric account
                          type with translateRoleToAccountType()
        :return: the client returned by ``self.testClient.getUserApiClient``;
                 it is also kept in ``self.user_apiclient``
        """
        account_type = self.translateRoleToAccountType(role_type)
        # Honour the caller-supplied domain; previously the argument was
        # ignored and 'ROOT' was always sent.
        self.user_apiclient = self.testClient.getUserApiClient(
            UserName=username,
            DomainName=domain,
            type=account_type
        )
        return self.user_apiclient
 | 
						|
 | 
						|
 | 
						|
    def getRandomString(self):
        """Return 10 random characters drawn from lowercase letters and digits."""
        alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
        picked = [random.choice(alphabet) for _ in range(10)]
        return "".join(picked)
 | 
						|
 | 
						|
 | 
						|
    def getRandomRoleName(self):
        """Return "MarvinFakeRoleNewName-" followed by a getRandomString() suffix."""
        suffix = self.getRandomString()
        return "MarvinFakeRoleNewName-" + suffix
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_lifecycle_list(self):
        """
            Checks that roles with ids 1 to 4 can be listed and have the
            types Admin, ResourceAdmin, DomainAdmin and User respectively
        """
        expected_types = ("Admin", "ResourceAdmin", "DomainAdmin", "User")
        for role_id, expected_type in enumerate(expected_types, 1):
            listed = Role.list(self.apiclient, id=role_id)
            self.assertEqual(
                isinstance(listed, list),
                True,
                "List Roles response was not a valid list"
            )
            self.assertEqual(
                len(listed),
                1,
                "List Roles response size was not 1"
            )
            self.assertEqual(
                listed[0].type,
                expected_type,
                msg="Default role type differs from expectation"
            )
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_lifecycle_create(self):
        """
            Checks that a duplicate of the setUp() role is rejected and
            that the original role is listed with the expected name and type
        """
        expected = self.testdata["role"]

        # self.role was already created from the same data in setUp(), so
        # a second create with identical data must raise.
        try:
            Role.create(self.apiclient, expected)
        except CloudstackAPIException:
            pass
        else:
            self.fail("An exception was expected when creating duplicate roles")

        listed = Role.list(self.apiclient, id=self.role.id)
        self.assertEqual(
            isinstance(listed, list),
            True,
            "List Roles response was not a valid list"
        )
        self.assertEqual(
            len(listed),
            1,
            "List Roles response size was not 1"
        )

        found = listed[0]
        self.assertEqual(
            found.name,
            expected["name"],
            msg="Role name does not match the test data"
        )
        self.assertEqual(
            found.type,
            expected["type"],
            msg="Role type does not match the test data"
        )
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_lifecycle_clone(self):
        """
            Creates a role from the setUp() role (passed as "roleid") and
            checks its name, type and permissions against the source role
        """
        clone_request = {
            "name": "MarvinFake Clone Role ",
            "roleid": self.role.id,
            "description": "Fake Role cloned by Marvin test"
        }

        try:
            clone = Role.create(self.apiclient, clone_request)
        except CloudstackAPIException as e:
            self.fail("Failed to create the role: %s" % e)
        # Only reached when creation succeeded (self.fail raises).
        self.cleanup.append(clone)

        listed = Role.list(self.apiclient, id=clone.id)
        self.assertEqual(
            isinstance(listed, list),
            True,
            "List Roles response was not a valid list"
        )
        self.assertEqual(
            len(listed),
            1,
            "List Roles response size was not 1"
        )

        listed_clone = listed[0]
        self.assertEqual(
            listed_clone.name,
            clone_request["name"],
            msg="Role name does not match the test data"
        )
        self.assertEqual(
            listed_clone.type,
            self.testdata["role"]["type"],
            msg="Role type does not match the test data"
        )

        # The clone's permissions are compared with those of the source role.
        source_permissions = RolePermission.list(self.apiclient, roleid=self.role.id)
        self.validate_permissions_list(source_permissions, clone.id)
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_lifecycle_import(self):
        """
            Imports the "importrole" test data (with a randomised name) and
            checks the resulting role's name, type and rules
        """
        import_data = self.testdata["importrole"]
        import_data["name"] += self.getRandomString()

        try:
            imported = Role.importRole(self.apiclient, import_data)
        except CloudstackAPIException as e:
            self.fail("Failed to import the role: %s" % e)
        # Only reached when the import succeeded (self.fail raises).
        self.cleanup.append(imported)

        listed = Role.list(self.apiclient, id=imported.id)
        self.assertEqual(
            isinstance(listed, list),
            True,
            "List Roles response was not a valid list"
        )
        self.assertEqual(
            len(listed),
            1,
            "List Roles response size was not 1"
        )

        listed_role = listed[0]
        self.assertEqual(
            listed_role.name,
            self.testdata["importrole"]["name"],
            msg="Role name does not match the test data"
        )
        self.assertEqual(
            listed_role.type,
            self.testdata["importrole"]["type"],
            msg="Role type does not match the test data"
        )

        self.validate_permissions_dict(self.testdata["importrole"]["rules"], imported.id)
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_lifecycle_update(self):
        """
            Tests role update

            Deletes the account created in setUp(), then updates the role's
            name, type and description and verifies them through Role.list
        """
        self.account.delete(self.apiclient)
        # The account is gone already; drop it from the cleanup list so
        # tearDown does not try to delete it a second time.
        # NOTE(review): a failing delete presumably stops cleanup_resources
        # before it reaches the role, leaking it - confirm against marvin.
        self.cleanup = [res for res in self.cleanup if res is not self.account]

        new_role_name = self.getRandomRoleName()
        new_role_description = "Fake role description created after update"
        self.role.update(self.apiclient, name=new_role_name, type='Admin', description=new_role_description)

        update_role = Role.list(self.apiclient, id=self.role.id)[0]
        self.assertEqual(
            update_role.name,
            new_role_name,
            msg="Role name does not match updated role name"
        )
        self.assertEqual(
            update_role.type,
            'Admin',
            msg="Role type does not match updated role type"
        )
        self.assertEqual(
            update_role.description,
            new_role_description,
            msg="Role description does not match updated role description"
        )
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_lifecycle_update_role_inuse(self):
        """
            Checks that changing the type of the setUp() role (which has an
            account) is rejected, while a name-only update succeeds
        """
        renamed_to = self.getRandomRoleName()

        try:
            self.role.update(self.apiclient, name=renamed_to, type='Admin')
        except CloudstackAPIException:
            pass
        else:
            self.fail("Updation of role type is not allowed when role is in use")

        self.role.update(self.apiclient, name=renamed_to)
        refreshed = Role.list(self.apiclient, id=self.role.id)[0]
        self.assertEqual(
            refreshed.name,
            renamed_to,
            msg="Role name does not match updated role name"
        )
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_lifecycle_delete(self):
        """
            Tests role deletion

            Deletes the account and then the role created in setUp() and
            verifies that listing the role by id returns None
        """
        self.account.delete(self.apiclient)
        self.role.delete(self.apiclient)
        # Both resources are deleted here; keep tearDown from deleting them
        # again. The role permission stays registered because this test does
        # not delete it explicitly.
        self.cleanup = [
            res for res in self.cleanup
            if res is not self.account and res is not self.role
        ]

        list_roles = Role.list(self.apiclient, id=self.role.id)
        self.assertEqual(
            list_roles,
            None,
            "List Roles response should be empty"
        )
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_inuse_deletion(self):
        """
            Checks that deleting the setUp() role, which still has an
            account, raises CloudstackAPIException
        """
        try:
            self.role.delete(self.apiclient)
        except CloudstackAPIException:
            pass
        else:
            self.fail("Role with any account should not be allowed to be deleted")
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_default_role_deletion(self):
        """
            Test to ensure 4 default roles cannot be deleted

            Issues deleteRole for ids 1 to 4 and expects each call to raise
            CloudstackAPIException
        """
        for idx in range(1, 5):
            cmd = deleteRole.deleteRoleCmd()
            cmd.id = idx
            try:
                self.apiclient.deleteRole(cmd)
            except CloudstackAPIException:
                continue
            # idx is an int: format it rather than concatenating, which
            # raised TypeError instead of reporting the intended failure.
            self.fail("Default role got deleted with id: %d" % idx)
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_rolepermission_lifecycle_list(self):
        """
            Checks that listing permissions for role ids 1 to 4 returns a
            non-empty list each time
        """
        for default_role_id in (1, 2, 3, 4):
            listed = RolePermission.list(self.apiclient, roleid=default_role_id)
            self.assertEqual(
                isinstance(listed, list),
                True,
                "List rolepermissions response was not a valid list"
            )
            self.assertTrue(
                len(listed) > 0,
                "List rolepermissions response was empty"
            )
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_rolepermission_lifecycle_create(self):
        """
            Checks that a duplicate of the setUp() role permission is
            rejected and that the first listed permission matches the test data
        """
        expected = self.testdata["rolepermission"]

        # self.rolepermission was created from the same data in setUp().
        try:
            RolePermission.create(self.apiclient, expected)
        except CloudstackAPIException:
            pass
        else:
            self.fail("An exception was expected when creating duplicate role permissions")

        listed = RolePermission.list(self.apiclient, roleid=self.role.id)
        self.assertEqual(
            isinstance(listed, list),
            True,
            "List rolepermissions response was not a valid list"
        )
        self.assertNotEqual(
            len(listed),
            0,
            "List rolepermissions response was empty"
        )

        first = listed[0]
        self.assertEqual(
            first.rule,
            expected["rule"],
            msg="Role permission rule does not match the test data"
        )
        self.assertEqual(
            first.permission,
            expected["permission"],
            msg="Role permission permission-type does not match the test data"
        )
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_rolepermission_lifecycle_update(self):
        """
            Creates three extra permissions, then reorders all permissions
            (last to top, top to bottom, three random shuffles) and
            validates the listed order after each update
        """
        permissions = [self.rolepermission]
        for rule_pattern in ('list*', '*Vol*', 'listCapabilities'):
            data = copy.deepcopy(self.testdata["rolepermission"])
            data['rule'] = rule_pattern
            created = RolePermission.create(self.apiclient, data)
            self.cleanup.append(created)
            permissions.append(created)

        def apply_order_and_verify(actor, ordering):
            # Send the comma-separated ids as the new rule order, then
            # check that listing returns the same sequence.
            actor.update(self.apiclient, ruleorder=",".join([p.id for p in ordering]))
            self.validate_permissions_list(ordering, self.role.id)

        # Move last item to the top
        moved = permissions.pop()
        permissions.insert(0, moved)
        apply_order_and_verify(moved, permissions)

        # Move to the bottom
        moved = permissions.pop(0)
        permissions.append(moved)
        apply_order_and_verify(moved, permissions)

        # Random shuffles
        for _ in range(3):
            shuffle(permissions)
            apply_order_and_verify(moved, permissions)
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_rolepermission_lifecycle_update_permission(self):
        """
            Switches the setUp() rule to 'deny' and back to 'allow',
            checking the first listed permission after each update
        """
        rule = self.rolepermission

        for new_permission in ('deny', 'allow'):
            rule.update(self.apiclient, ruleid=rule.id, permission=new_permission)

            listed = RolePermission.list(self.apiclient, roleid=self.role.id)
            self.assertEqual(
                listed[0].permission,
                new_permission,
                msg="List of role permissions do not match created list of permissions"
            )
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_rolepermission_lifecycle_update_permission_negative(self):
        """
            Checks that updating the setUp() rule with the permission
            value 'some_other_value' raises an exception
        """
        rule = self.rolepermission

        rejected = False
        try:
            rule.update(self.apiclient, ruleid=rule.id, permission='some_other_value')
        except Exception:
            # Any exception counts as the expected rejection.
            rejected = True

        if not rejected:
            self.fail("Negative test: Setting permission to 'some_other_value' should not be successful, failing")
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_rolepermission_lifecycle_concurrent_updates(self):
        """
            Checks that a rule-order update is rejected when the submitted
            order omits a permission that exists on the role
        """
        def create_permission(rule_pattern):
            # Build a permission from the template with the given rule and
            # register it for cleanup.
            data = copy.deepcopy(self.testdata["rolepermission"])
            data['rule'] = rule_pattern
            created = RolePermission.create(self.apiclient, data)
            self.cleanup.append(created)
            return created

        known_permissions = [self.rolepermission]
        for rule_pattern in ('list*', '*Vol*', 'listCapabilities'):
            known_permissions.append(create_permission(rule_pattern))

        # The following rule is considered to be created by another mgmt server,
        # so it is deliberately left out of the order submitted below.
        foreign_permission = create_permission("someRule*")

        shuffle(known_permissions)
        stale_order = ",".join([p.id for p in known_permissions])
        try:
            foreign_permission.update(self.apiclient, ruleorder=stale_order)
        except CloudstackAPIException:
            pass
        else:
            self.fail("Reordering should fail in case of concurrent updates by other user")
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_rolepermission_lifecycle_delete(self):
        """
            Deletes the role permission registered at index 1 of the
            cleanup list and checks that the role lists no permissions
        """
        # Taking it out of self.cleanup keeps tearDown from deleting it again.
        doomed = self.cleanup.pop(1)
        doomed.delete(self.apiclient)

        remaining = RolePermission.list(self.apiclient, roleid=self.role.id)
        self.assertEqual(
            remaining,
            None,
            "List rolepermissions response should be empty"
        )
 | 
						|
 | 
						|
 | 
						|
    def checkApiAvailability(self, apiConfig, userApiClient):
        """
            Checks available APIs based on api map

            For every API name returned by listApis for the user, the first
            rule in apiConfig that matches decides the outcome: 'allow'
            accepts the API, anything else fails the test. APIs matching
            no rule are ignored.

            :param apiConfig: mapping of rule (API name, '*' as wildcard)
                              to permission string
            :param userApiClient: API client of the user under test
        """
        response = userApiClient.listApis(listApis.listApisCmd())

        # Translate each rule once instead of once per listed API. Patterns
        # are anchored at both ends so that an exact rule such as
        # "listClusters" does not also match "listClustersMetrics".
        compiled_rules = [
            (re.compile('^' + rule.replace('*', '.*') + '$'), perm.lower())
            for rule, perm in apiConfig.items()
        ]

        for api in [x.name for x in response]:
            for pattern, perm in compiled_rules:
                if not pattern.match(api):
                    continue
                if perm != 'allow':
                    self.fail('Denied API found to be allowed: ' + api)
                # First matching rule wins.
                break
 | 
						|
 | 
						|
 | 
						|
    def checkApiCall(self, apiConfig, userApiClient):
        """
            Makes real API calls as the user: listAccounts must return a
            non-empty list, while listHosts and listZones must raise
            CloudstackAPIException. apiConfig is accepted but not used here.
        """
        accounts = userApiClient.listAccounts(listAccounts.listAccountsCmd())
        self.assertEqual(
            isinstance(accounts, list),
            True,
            "List accounts response was not a valid list"
        )
        self.assertNotEqual(
            len(accounts),
            0,
            "List accounts response was empty"
        )

        # Perform actual API call for deny API
        try:
            userApiClient.listHosts(listHosts.listHostsCmd())
        except CloudstackAPIException:
            pass
        else:
            self.fail("API call succeeded which is denied for the role")

        # Perform actual API call for API with no allow/deny rule
        try:
            userApiClient.listZones(listZones.listZonesCmd())
        except CloudstackAPIException:
            pass
        else:
            self.fail("API call succeeded which has no allow/deny rule for the role")
 | 
						|
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_account_acls(self):
        """
            Creates one role permission per apiConfig entry through the API,
            waits for the cache period plus 5 seconds, then verifies the
            account's API listing and real API calls
        """
        apiConfig = self.testdata['apiConfig']

        # The same template dict is updated in place for every rule.
        permission_data = self.testdata['rolepermission']
        for api, perm in apiConfig.items():
            permission_data.update(
                roleid=self.role.id,
                rule=api,
                permission=perm.lower()
            )
            RolePermission.create(self.apiclient, permission_data)

        time.sleep(self.cache_period + 5)

        userApiClient = self.getUserApiClient(
            self.account.name,
            domain=self.account.domain,
            role_type=self.account.roletype
        )

        # Perform listApis check
        self.checkApiAvailability(apiConfig, userApiClient)

        # Perform actual API call for allow API
        self.checkApiCall(apiConfig, userApiClient)
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
    def test_role_account_acls_multiple_mgmt_servers(self):
        """
            Test for role-rule enforcement in case of multiple mgmt servers
            Inserts the apiConfig rules directly into the role_permissions
            table, waits for the cache period plus 5 seconds, then runs the
            same checks as the API-driven ACL test
        """
        apiConfig = self.testdata["apiConfig"]

        # Resolve the role's database id from the uuid the API returned.
        role_rows = self.dbclient.execute("select id from roles where uuid='%s'" % self.role.id)
        role_db_id = role_rows[0][0]

        insert_sql = "insert into role_permissions (uuid, role_id, rule, permission, sort_order) values (UUID(), %d, '%s', '%s', %d)"
        for sort_order, (rule, perm) in enumerate(apiConfig.items(), 1):
            self.dbclient.execute(insert_sql % (role_db_id, rule, perm.upper(), sort_order))

        time.sleep(self.cache_period + 5)

        userApiClient = self.getUserApiClient(
            self.account.name,
            domain=self.account.domain,
            role_type=self.account.roletype
        )

        # Perform listApis check
        self.checkApiAvailability(apiConfig, userApiClient)

        # Perform actual API call for allow API
        self.checkApiCall(apiConfig, userApiClient)
 | 
						|
 | 
						|
    def validate_permissions_list(self, permissions, roleid):
        """Assert that the role's listed permissions match ``permissions``.

        :param permissions: expected permissions, in order; each item
                            exposes ``rule`` and ``permission`` attributes
        :param roleid: id of the role whose permissions are listed
        """
        listed = RolePermission.list(self.apiclient, roleid=roleid)
        self.assertEqual(
            len(listed),
            len(permissions),
            msg="List of role permissions do not match created list of permissions"
        )

        # Lengths are equal at this point, so pairing covers every item.
        for actual, expected in zip(listed, permissions):
            self.assertEqual(
                actual.rule,
                expected.rule,
                msg="Rule permission don't match with expected item at the index"
            )
            self.assertEqual(
                actual.permission,
                expected.permission,
                msg="Rule permission don't match with expected item at the index"
            )
 | 
						|
 | 
						|
    def validate_permissions_dict(self, permissions, roleid):
        """Assert that the role's listed permissions match ``permissions``.

        :param permissions: expected permissions, in order; each item is a
                            dict with "rule" and "permission" keys
        :param roleid: id of the role whose permissions are listed
        """
        listed = RolePermission.list(self.apiclient, roleid=roleid)
        self.assertEqual(
            len(listed),
            len(permissions),
            msg="List of role permissions do not match created list of permissions"
        )

        # Lengths are equal at this point, so pairing covers every item.
        for actual, expected in zip(listed, permissions):
            self.assertEqual(
                actual.rule,
                expected["rule"],
                msg="Rule permission don't match with expected item at the index"
            )
            self.assertEqual(
                actual.permission,
                expected["permission"],
                msg="Rule permission don't match with expected item at the index"
            )
 |