mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-11-04 00:02:37 +01:00 
			
		
		
		
	This PR prepares marvin and tests for python3. it was part of #4479, until nose2 was decided to be abandoned from that PR. Re-PR of #4543 and #3730 to enable cooperation Co-authored-by: Daan Hoogland <dahn@onecht.net> Co-authored-by: Gabriel Beims Bräscher <gabriel@apache.org> Co-authored-by: Rohit Yadav <rohit.yadav@shapeblue.com>
		
			
				
	
	
		
			135 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			135 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Licensed to the Apache Software Foundation (ASF) under one
 | 
						|
# or more contributor license agreements.  See the NOTICE file
 | 
						|
# distributed with this work for additional information
 | 
						|
# regarding copyright ownership.  The ASF licenses this file
 | 
						|
# to you under the Apache License, Version 2.0 (the
 | 
						|
# "License"); you may not use this file except in compliance
 | 
						|
# with the License.  You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#   http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing,
 | 
						|
# software distributed under the License is distributed on an
 | 
						|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | 
						|
# KIND, either express or implied.  See the License for the
 | 
						|
# specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
 | 
						|
from marvin.cloudstackAPI import *
 | 
						|
from marvin.cloudstackTestCase import cloudstackTestCase
 | 
						|
from marvin.cloudstackException import CloudstackAPIException
 | 
						|
from marvin.lib.base import Account
 | 
						|
from marvin.lib.utils import cleanup_resources
 | 
						|
from marvin.sshClient import SshClient
 | 
						|
from nose.plugins.attrib import attr
 | 
						|
 | 
						|
import inspect
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import re
 | 
						|
 | 
						|
 | 
						|
class TestStaticRoles(cloudstackTestCase):
 | 
						|
    """Tests static role api-checker
 | 
						|
    """
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.apiclient = self.testClient.getApiClient()
 | 
						|
        self.dbclient = self.testClient.getDbConnection()
 | 
						|
        self.mgtSvrDetails = self.config.__dict__["mgtSvr"][0].__dict__
 | 
						|
        self.cleanup = []
 | 
						|
        self.testdata = {
 | 
						|
            "account": {
 | 
						|
                "email": "mtu@test.cloud",
 | 
						|
                "firstname": "Marvin",
 | 
						|
                "lastname": "TestUser",
 | 
						|
                "username": "staticrole_acctest-",
 | 
						|
                "password": "password",
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        feature_enabled = self.apiclient.listCapabilities(listCapabilities.listCapabilitiesCmd()).dynamicrolesenabled
 | 
						|
        if feature_enabled:
 | 
						|
            self.skipTest("Dynamic role-based API checker is enabled, skipping tests for static role-base API checker")
 | 
						|
 | 
						|
        commandsProperties = []
 | 
						|
        try:
 | 
						|
            sshClient = SshClient(
 | 
						|
                self.mgtSvrDetails["mgtSvrIp"],
 | 
						|
                22,
 | 
						|
                self.mgtSvrDetails["user"],
 | 
						|
                self.mgtSvrDetails["passwd"],
 | 
						|
                retries=1,
 | 
						|
                log_lvl=logging.INFO
 | 
						|
            )
 | 
						|
            result = sshClient.runCommand("cat /etc/cloudstack/management/commands.properties")
 | 
						|
            if 'status' in result and result['status'] == 'SUCCESS' and 'stdout' in result and len(result['stdout']) > 0:
 | 
						|
                commandsProperties = result['stdout']
 | 
						|
        except Exception:
 | 
						|
            self.debug("Failed to ssh into mgmt server host and grab commands.properties file")
 | 
						|
            testDir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
 | 
						|
            localFileName = os.path.abspath(testDir + "/../../../client/conf/commands.properties.in")
 | 
						|
            if os.path.isfile(localFileName):
 | 
						|
                self.info("Detected that we're running in developer mode with maven, using file at:" + localFileName)
 | 
						|
                with open(localFileName) as f:
 | 
						|
                    commandsProperties = f.readlines()
 | 
						|
 | 
						|
        if len(commandsProperties) < 1:
 | 
						|
            self.skipTest("Unable to find commands.properties, skipping this test")
 | 
						|
 | 
						|
        apiMap = {}
 | 
						|
        for line in commandsProperties:
 | 
						|
            if not line or line == '' or line == '\n' or line.startswith('#'):
 | 
						|
                continue
 | 
						|
            name, value = line.split('=')
 | 
						|
            apiMap[name.strip()] = value.strip()
 | 
						|
 | 
						|
        self.roleApiMap = {} # role to list of apis allowed
 | 
						|
        octetKey = {'Admin':1, 'DomainAdmin':4, 'User':8}
 | 
						|
        for role in list(octetKey.keys()):
 | 
						|
            for api in sorted(apiMap.keys()):
 | 
						|
                if (octetKey[role] & int(apiMap[api])) > 0:
 | 
						|
                    if role not in self.roleApiMap:
 | 
						|
                        self.roleApiMap[role] = []
 | 
						|
                    self.roleApiMap[role].append(api)
 | 
						|
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        try:
 | 
						|
           cleanup_resources(self.apiclient, self.cleanup)
 | 
						|
        except Exception as e:
 | 
						|
            self.debug("Warning! Exception in tearDown: %s" % e)
 | 
						|
 | 
						|
 | 
						|
    def translateRoleToAccountType(self, role_type):
 | 
						|
        if role_type == 'User':
 | 
						|
            return 0
 | 
						|
        elif role_type == 'Admin':
 | 
						|
            return 1
 | 
						|
        elif role_type == 'DomainAdmin':
 | 
						|
            return 2
 | 
						|
        return -1
 | 
						|
 | 
						|
 | 
						|
    @attr(tags=['advanced', 'simulator', 'basic', 'sg'], required_hardware=False)
 | 
						|
    def test_static_role_account_acls(self):
 | 
						|
        """
 | 
						|
            Tests allowed APIs for common account types
 | 
						|
        """
 | 
						|
        for role in ['Admin', 'DomainAdmin', 'User']:
 | 
						|
            accountType = self.translateRoleToAccountType(role)
 | 
						|
            account = Account.create(
 | 
						|
                self.apiclient,
 | 
						|
                self.testdata['account'],
 | 
						|
                admin=accountType
 | 
						|
            )
 | 
						|
            self.cleanup.append(account)
 | 
						|
            userApiClient = self.testClient.getUserApiClient(UserName=account.name, DomainName=account.domain, type=accountType)
 | 
						|
            allowedApis = [x.name for x in userApiClient.listApis(listApis.listApisCmd())]
 | 
						|
            allApis = [x.name for x in self.apiclient.listApis(listApis.listApisCmd())]
 | 
						|
            for api in self.roleApiMap[role]:
 | 
						|
                if api not in allApis:
 | 
						|
                    continue
 | 
						|
                if api not in allowedApis:
 | 
						|
                    self.fail("API configured in commands.properties not returned by listApis: " + api + " for role: " + role)
 |