# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ BVT tests for Account User Access """ # Import Local Modules from marvin.cloudstackTestCase import cloudstackTestCase from marvin.lib.utils import * from marvin.lib.base import (Account, User, Domain) from marvin.lib.common import (get_domain) from marvin.cloudstackAPI import (getUserKeys) from marvin.cloudstackException import CloudstackAPIException from nose.plugins.attrib import attr _multiprocess_shared_ = True class TestAccountAccess(cloudstackTestCase): @classmethod def setUpClass(cls): testClient = super(TestAccountAccess, cls).getClsTestClient() cls.apiclient = testClient.getApiClient() cls.services = testClient.getParsedTestDataConfig() cls.hypervisor = testClient.getHypervisorInfo() cls._cleanup = [] # Get Zone, Domain and templates cls.domain = get_domain(cls.apiclient) cls.domains = [] cls.domain_admins = {} cls.domain_users = {} cls.account_users = {} domain_data = { "name": "domain_1" } cls.domain_1 = Domain.create( cls.apiclient, domain_data, ) cls._cleanup.append(cls.domain_1) cls.domains.append(cls.domain_1) domain_data["name"] = "domain_11" cls.domain_11 = Domain.create( cls.apiclient, domain_data, parentdomainid=cls.domain_1.id ) cls._cleanup.append(cls.domain_11) cls.domains.append(cls.domain_11) domain_data["name"] = "domain_12" cls.domain_12 = Domain.create( cls.apiclient, domain_data, parentdomainid=cls.domain_1.id ) cls._cleanup.append(cls.domain_12) cls.domains.append(cls.domain_12) domain_data["name"] = "domain_2" cls.domain_2 = Domain.create( cls.apiclient, domain_data, ) cls._cleanup.append(cls.domain_2) cls.domains.append(cls.domain_2) for d in cls.domains: cls.create_domainadmin_and_user(d) @classmethod def tearDownClass(cls): super(TestAccountAccess, cls).tearDownClass() @classmethod def create_account(cls, domain, is_admin): cls.debug(f"Creating account for domain {domain.name}, admin: {is_admin}") data = { "email": "admin-" + domain.name + "@test.com", "firstname": "Admin", "lastname": domain.name, "username": "admin-" + domain.name, "password": "password" } if is_admin == False: data["email"] = "user-" + domain.name + "@test.com" data["firstname"] = "User" data["username"] = "user-" + domain.name account = Account.create( cls.apiclient, data, admin=is_admin, domainid=domain.id ) cls._cleanup.append(account) if is_admin == True: cls.domain_admins[domain.id] = account else: cls.domain_users[domain.id] = account user = User.create( cls.apiclient, data, account=account.name, domainid=account.domainid) cls._cleanup.append(user) cls.account_users[account.id] = user @classmethod def create_domainadmin_and_user(cls, domain): cls.debug(f"Creating accounts for domain #{domain.id} {domain.name}") cls.create_account(domain, True) cls.create_account(domain, False) def get_user_keys(self, api_client, user_id): getUserKeysCmd = getUserKeys.getUserKeysCmd() getUserKeysCmd.id = user_id return api_client.getUserKeys(getUserKeysCmd) def is_child_domain(self, parent_domain, child_domain): if not parent_domain or not child_domain: return False parent_domain_prefix = parent_domain.split('-')[0] child_domain_prefix = child_domain.split('-')[0] if not parent_domain_prefix or not child_domain_prefix: return False return child_domain_prefix.startswith(parent_domain_prefix) @attr(tags=["advanced", "advancedns", "smoke", "sg"], required_hardware="false") def test_01_user_access(self): """ Test user account is not accessing any other account """ domain_user_accounts = [value for value in self.domain_users.values()] all_account_users = [value for value in self.account_users.values()] for user_account in domain_user_accounts: current_account_user = self.account_users[user_account.id] self.debug(f"Check for account {user_account.name} with user {current_account_user.username}") user_api_client = self.testClient.getUserApiClient( UserName=user_account.name, DomainName=user_account.domain ) for user in all_account_users: self.debug(f"Checking access for user {user.username} associated with account {user.account}") try: self.get_user_keys(user_api_client, user.id) self.debug(f"API successful") if user.id != current_account_user.id: self.fail(f"User account #{user_account.id} was able to access another account #{user.id}") except CloudstackAPIException as e: self.debug(f"Exception occurred: {e}") if user.id == current_account_user.id: self.fail(f"User account #{user_account.id} not able to access own account") @attr(tags=["advanced", "advancedns", "smoke", "sg"], required_hardware="false") def test_02_domain_admin_access(self): """ Test domain admin account is not accessing any other account from unauthorized domain """ domain_admin_accounts = [value for value in self.domain_admins.values()] all_account_users = [value for value in self.account_users.values()] for admin_account in domain_admin_accounts: current_account_user = self.account_users[admin_account.id] self.debug(f"Check for domain admin {admin_account.name} with user {current_account_user.username}, {current_account_user.domain}") admin_api_client = self.testClient.getUserApiClient( UserName=admin_account.name, DomainName=admin_account.domain ) for user in all_account_users: self.debug(f"Checking access for user {user.username}, {user.domain} associated with account {user.account}") try: self.get_user_keys(admin_api_client, user.id) self.debug(f"API successful") if self.is_child_domain(current_account_user.domain, user.domain) == False: self.fail(f"User account #{admin_account.id} was able to access another account #{user.id}") except CloudstackAPIException as e: self.debug(f"Exception occurred: {e}") if self.is_child_domain(current_account_user.domain, user.domain) == True: self.fail(f"User account #{admin_account.id} not able to access own account")