mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
Fixes the domain-admin access check to prevent unauthorized access. Introduces a new non-dynamic global setting, api.allow.internal.db.ids, which controls whether internal DB IDs may be used as API parameters. The default value of the setting is false. Co-authored-by: Fabricio Duarte <fabricio.duarte.jr@gmail.com> Co-authored-by: nvazquez <nicovazquez90@gmail.com> Co-authored-by: Abhishek Kumar <abhishek.mrt22@gmail.com>
199 lines
8.0 KiB
Python
199 lines
8.0 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
""" BVT tests for Account User Access
|
|
"""
|
|
# Import Local Modules
|
|
from marvin.cloudstackTestCase import cloudstackTestCase
|
|
from marvin.lib.utils import *
|
|
from marvin.lib.base import (Account,
|
|
User,
|
|
Domain)
|
|
from marvin.lib.common import (get_domain)
|
|
from marvin.cloudstackAPI import (getUserKeys)
|
|
from marvin.cloudstackException import CloudstackAPIException
|
|
from nose.plugins.attrib import attr
|
|
|
|
_multiprocess_shared_ = True
|
|
|
|
class TestAccountAccess(cloudstackTestCase):
    """Check whose API keys a regular user and a domain admin may read.

    ``setUpClass`` creates four domains: ``domain_1`` and ``domain_2``
    without a parent domain id, and ``domain_11`` and ``domain_12`` with
    ``domain_1`` as parent.  In every domain it creates one admin account
    and one regular account, and adds a user to each through
    ``User.create``.  The tests then call ``getUserKeys`` for every one of
    those users on behalf of each account.
    """

    @classmethod
    def setUpClass(cls):
        """Create the domains, accounts and users shared by all tests."""
        testClient = super(TestAccountAccess, cls).getClsTestClient()
        cls.apiclient = testClient.getApiClient()
        cls.services = testClient.getParsedTestDataConfig()
        cls.hypervisor = testClient.getHypervisorInfo()
        cls._cleanup = []

        # Domain the test client is configured with
        cls.domain = get_domain(cls.apiclient)

        cls.domains = []
        cls.domain_admins = {}   # domain id  -> admin account
        cls.domain_users = {}    # domain id  -> regular account
        cls.account_users = {}   # account id -> user added to that account

        try:
            cls.domain_1 = cls._create_domain("domain_1")
            cls.domain_11 = cls._create_domain("domain_11", cls.domain_1)
            cls.domain_12 = cls._create_domain("domain_12", cls.domain_1)
            cls.domain_2 = cls._create_domain("domain_2")

            for d in cls.domains:
                cls.create_domainadmin_and_user(d)
        except Exception:
            # unittest does not run tearDownClass when setUpClass raises, so
            # release whatever was already registered before re-raising.
            cls.tearDownClass()
            raise

    @classmethod
    def _create_domain(cls, name, parent=None):
        """Create a domain and register it for cleanup.

        :param name: value passed as the ``name`` entry of the domain data
        :param parent: optional parent domain object; when given, its ``id``
            is passed as ``parentdomainid``
        :return: the object returned by ``Domain.create``
        """
        extra = {}
        if parent is not None:
            extra["parentdomainid"] = parent.id
        domain = Domain.create(cls.apiclient, {"name": name}, **extra)
        cls._cleanup.append(domain)
        cls.domains.append(domain)
        return domain

    @classmethod
    def tearDownClass(cls):
        """Delegate cleanup to the base test case."""
        super(TestAccountAccess, cls).tearDownClass()

    @classmethod
    def create_account(cls, domain, is_admin):
        """Create an account in ``domain`` and add a user to it.

        The account is stored in ``cls.domain_admins`` or
        ``cls.domain_users`` (keyed by domain id) and the user in
        ``cls.account_users`` (keyed by account id).  Both are registered
        for cleanup.

        :param domain: domain object providing ``id`` and ``name``
        :param is_admin: True for an admin account, False for a regular one
        """
        cls.debug(f"Creating account for domain {domain.name}, admin: {is_admin}")
        prefix = "admin" if is_admin else "user"
        data = {
            "email": prefix + "-" + domain.name + "@test.com",
            "firstname": "Admin" if is_admin else "User",
            "lastname": domain.name,
            "username": prefix + "-" + domain.name,
            "password": "password"
        }
        account = Account.create(
            cls.apiclient,
            data,
            admin=is_admin,
            domainid=domain.id
        )
        cls._cleanup.append(account)
        if is_admin:
            cls.domain_admins[domain.id] = account
        else:
            cls.domain_users[domain.id] = account

        user = User.create(
            cls.apiclient,
            data,
            account=account.name,
            domainid=account.domainid)
        cls._cleanup.append(user)
        cls.account_users[account.id] = user

    @classmethod
    def create_domainadmin_and_user(cls, domain):
        """Create one admin account and one regular account in ``domain``."""
        cls.debug(f"Creating accounts for domain #{domain.id} {domain.name}")
        cls.create_account(domain, True)
        cls.create_account(domain, False)

    def get_user_keys(self, api_client, user_id):
        """Call ``getUserKeys`` for ``user_id`` through ``api_client``.

        :return: whatever ``api_client.getUserKeys`` returns
        """
        getUserKeysCmd = getUserKeys.getUserKeysCmd()
        getUserKeysCmd.id = user_id
        return api_client.getUserKeys(getUserKeysCmd)

    def _can_get_user_keys(self, api_client, user_id):
        """Return True if ``getUserKeys`` succeeds, False if the API rejects it.

        Only ``CloudstackAPIException`` is treated as a rejection; any other
        exception propagates.
        """
        try:
            self.get_user_keys(api_client, user_id)
        except CloudstackAPIException as e:
            self.debug(f"Exception occurred: {e}")
            return False
        self.debug("API successful")
        return True

    def is_child_domain(self, parent_domain, child_domain):
        """Tell, from the names alone, whether one domain is under another.

        Only the part of each name before the first ``-`` is compared
        (presumably a generated suffix follows it; verify against
        ``Domain.create``).  The result is True when the child's prefix
        starts with the parent's prefix, so ``domain_11`` counts as a child
        of ``domain_1`` and a domain counts as a child of itself.  This
        relies on the naming scheme used in ``setUpClass``.

        :param parent_domain: name of the candidate parent domain
        :param child_domain: name of the candidate child domain
        :return: False if either name or either prefix is empty, otherwise
            the result of the prefix comparison
        """
        if not parent_domain or not child_domain:
            return False
        parent_domain_prefix = parent_domain.split('-')[0]
        child_domain_prefix = child_domain.split('-')[0]
        if not parent_domain_prefix or not child_domain_prefix:
            return False
        return child_domain_prefix.startswith(parent_domain_prefix)

    @attr(tags=["advanced", "advancedns", "smoke", "sg"], required_hardware="false")
    def test_01_user_access(self):
        """
        Test user account is not accessing any other account
        """

        all_account_users = list(self.account_users.values())
        for user_account in list(self.domain_users.values()):
            current_account_user = self.account_users[user_account.id]
            self.debug(
                f"Check for account {user_account.name} "
                f"with user {current_account_user.username}"
            )
            user_api_client = self.testClient.getUserApiClient(
                UserName=user_account.name,
                DomainName=user_account.domain
            )
            for user in all_account_users:
                self.debug(
                    f"Checking access for user {user.username} "
                    f"associated with account {user.account}"
                )
                is_own_user = user.id == current_account_user.id
                accessible = self._can_get_user_keys(user_api_client, user.id)
                if accessible and not is_own_user:
                    self.fail(
                        f"User account #{user_account.id} was able to "
                        f"access another account #{user.id}"
                    )
                if is_own_user and not accessible:
                    self.fail(
                        f"User account #{user_account.id} not able to "
                        f"access own account"
                    )

    @attr(tags=["advanced", "advancedns", "smoke", "sg"], required_hardware="false")
    def test_02_domain_admin_access(self):
        """
        Test domain admin account is not accessing any other account from unauthorized domain
        """

        all_account_users = list(self.account_users.values())
        for admin_account in list(self.domain_admins.values()):
            current_account_user = self.account_users[admin_account.id]
            self.debug(
                f"Check for domain admin {admin_account.name} "
                f"with user {current_account_user.username}, "
                f"{current_account_user.domain}"
            )
            admin_api_client = self.testClient.getUserApiClient(
                UserName=admin_account.name,
                DomainName=admin_account.domain
            )
            for user in all_account_users:
                self.debug(
                    f"Checking access for user {user.username}, "
                    f"{user.domain} associated with account {user.account}"
                )
                # Access is expected only for users whose domain is the
                # admin's own domain or one below it (judged by name).
                in_admin_scope = self.is_child_domain(
                    current_account_user.domain, user.domain
                )
                accessible = self._can_get_user_keys(admin_api_client, user.id)
                if accessible and not in_admin_scope:
                    self.fail(
                        f"User account #{admin_account.id} was able to "
                        f"access another account #{user.id}"
                    )
                if in_admin_scope and not accessible:
                    self.fail(
                        f"Domain admin account #{admin_account.id} not able "
                        f"to access user #{user.id} in its own domain or "
                        f"sub-domain"
                    )
|