mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-11-04 00:02:37 +01:00 
			
		
		
		
	As of now, CloudStack can automatically import LDAP users based on the configuration to a domain or an account. However, any new users in LDAP aren't automatically reflected. The admin has to manually import them again. This feature enables admin to map LDAP group/OU to a CloudStack domain and any changes are reflected in ACS as well.
		
			
				
	
	
		
			599 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			599 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Licensed to the Apache Software Foundation (ASF) under one
 | 
						|
# or more contributor license agreements.  See the NOTICE file
 | 
						|
# distributed with this work for additional information
 | 
						|
# regarding copyright ownership.  The ASF licenses this file
 | 
						|
# to you under the Apache License, Version 2.0 (the
 | 
						|
# "License"); you may not use this file except in compliance
 | 
						|
# with the License.  You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#   http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing,
 | 
						|
# software distributed under the License is distributed on an
 | 
						|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | 
						|
# KIND, either express or implied.  See the License for the
 | 
						|
# specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
 | 
						|
from marvin.cloudstackTestCase import cloudstackTestCase
 | 
						|
from marvin.cloudstackAPI import (
 | 
						|
    updateConfiguration,
 | 
						|
    deleteAccount,
 | 
						|
    addLdapConfiguration,
 | 
						|
    linkDomainToLdap,
 | 
						|
    deleteLdapConfiguration,
 | 
						|
    disableAccount)
 | 
						|
from marvin.lib.common import get_domain
 | 
						|
from marvin.lib.base import (Account,
 | 
						|
                             Configurations,
 | 
						|
                             Domain)
 | 
						|
from marvin.cloudstackAPI import login
 | 
						|
from marvin.lib.utils import (cleanup_resources)
 | 
						|
from nose.plugins.attrib import attr
 | 
						|
import telnetlib
 | 
						|
import random
 | 
						|
import string
 | 
						|
 | 
						|
 | 
						|
def randomword(length):
 | 
						|
    return ''.join(random.choice(string.lowercase) for i in range(length))
 | 
						|
 | 
						|
 | 
						|
def addLdapConfiguration1(cls, ldapConfiguration):
 | 
						|
    """
 | 
						|
            :param ldapConfiguration
 | 
						|
            """
 | 
						|
    cls.chkConfig = checkLdapConfiguration(cls, ldapConfiguration)
 | 
						|
    if not cls.chkConfig:
 | 
						|
        return 0
 | 
						|
 | 
						|
    # Setup Global settings
 | 
						|
    Configurations.update(
 | 
						|
        cls.apiClient,
 | 
						|
        name="ldap.basedn",
 | 
						|
        value=ldapConfiguration['basedn']
 | 
						|
    )
 | 
						|
    Configurations.update(
 | 
						|
        cls.apiClient,
 | 
						|
        name="ldap.bind.password",
 | 
						|
        value=ldapConfiguration['bindpassword']
 | 
						|
    )
 | 
						|
    Configurations.update(
 | 
						|
        cls.apiClient,
 | 
						|
        name="ldap.bind.principal",
 | 
						|
        value=ldapConfiguration['principal']
 | 
						|
    )
 | 
						|
    Configurations.update(
 | 
						|
        cls.apiClient,
 | 
						|
        name="ldap.email.attribute",
 | 
						|
        value=ldapConfiguration['emailAttribute']
 | 
						|
    )
 | 
						|
    Configurations.update(
 | 
						|
        cls.apiClient,
 | 
						|
        name="ldap.user.object",
 | 
						|
        value=ldapConfiguration['userObject']
 | 
						|
    )
 | 
						|
    Configurations.update(
 | 
						|
        cls.apiClient,
 | 
						|
        name="ldap.username.attribute",
 | 
						|
        value=ldapConfiguration['usernameAttribute']
 | 
						|
    )
 | 
						|
    Configurations.update(
 | 
						|
        cls.apiClient,
 | 
						|
        name="ldap.nested.groups.enable",
 | 
						|
        value="true"
 | 
						|
    )
 | 
						|
 | 
						|
    ldapServer = addLdapConfiguration.addLdapConfigurationCmd()
 | 
						|
    ldapServer.hostname = ldapConfiguration['hostname']
 | 
						|
    ldapServer.port = ldapConfiguration['port']
 | 
						|
 | 
						|
    cls.debug("calling addLdapConfiguration API command")
 | 
						|
    try:
 | 
						|
        cls.apiClient.addLdapConfiguration(ldapServer)
 | 
						|
        cls.debug("addLdapConfiguration was successful")
 | 
						|
        return 1
 | 
						|
    except Exception as e:
 | 
						|
        cls.debug(
 | 
						|
            "addLdapConfiguration failed %s Check the Passed passed"
 | 
						|
            " ldap attributes" %
 | 
						|
            e)
 | 
						|
        cls.reason = "addLdapConfiguration failed %s Check the Passed " \
 | 
						|
                     "passed ldap attributes" % e
 | 
						|
        raise Exception(
 | 
						|
            "addLdapConfiguration failed %s Check the Passed passed"
 | 
						|
            " ldap attributes" %
 | 
						|
            e)
 | 
						|
        return 1
 | 
						|
 | 
						|
 | 
						|
def checklogin(cls, username, password, domain, method):
 | 
						|
    """
 | 
						|
    :param username:
 | 
						|
    :param password:
 | 
						|
    """
 | 
						|
    cls.debug("Attempting to login.")
 | 
						|
    try:
 | 
						|
        loginParams = login.loginCmd()
 | 
						|
        loginParams.username = username
 | 
						|
        loginParams.password = password
 | 
						|
        loginParams.domain = domain
 | 
						|
        loginRes = cls.apiClient.login(loginParams, method)
 | 
						|
        cls.debug("login response %s" % loginRes)
 | 
						|
        if loginRes is None:
 | 
						|
            cls.debug("login not successful")
 | 
						|
            return 0
 | 
						|
        else:
 | 
						|
            cls.debug("login successful")
 | 
						|
            return 1
 | 
						|
    except Exception as p:
 | 
						|
        cls.debug("login operation failed %s" % p)
 | 
						|
        cls.reason = "Login operation Failed %s" % p
 | 
						|
 | 
						|
 | 
						|
def checkLdapConfiguration(cls, ldapConfiguration):
 | 
						|
    """This function checks whether the passed ldap server in
 | 
						|
        the configuration is up and running or not.
 | 
						|
                 """
 | 
						|
    flag = False
 | 
						|
    try:
 | 
						|
        tn = telnetlib.Telnet(
 | 
						|
            ldapConfiguration['hostname'],
 | 
						|
            ldapConfiguration['port'],
 | 
						|
            timeout=15)
 | 
						|
        if tn is not None:
 | 
						|
            tn.set_debuglevel(1)
 | 
						|
            print tn.msg("Connected to the server")
 | 
						|
            cls.debug(
 | 
						|
                "Ldap Server is Up and listening on the port %s" %
 | 
						|
                tn.msg("Connected to the server"))
 | 
						|
            flag = True
 | 
						|
            tn.close()
 | 
						|
    except Exception as e:
 | 
						|
        cls.debug(
 | 
						|
            "Not able to reach the LDAP server ,"
 | 
						|
            "please check the Services on LDAP %s and  exception is %s" %
 | 
						|
            ((ldapConfiguration['hostname']), e))
 | 
						|
        cls.reason = "Not able to reach the LDAP server ,please check" \
 | 
						|
                     " the Services on LDAP %s and exception is %s" \
 | 
						|
                     % ((ldapConfiguration['hostname']), e)
 | 
						|
    return flag
 | 
						|
 | 
						|
 | 
						|
class TestLdap(cloudstackTestCase):
 | 
						|
    """
 | 
						|
    LDAP AutoImport smoke tests
 | 
						|
    """
 | 
						|
    @classmethod
 | 
						|
    def setUpClass(cls):
 | 
						|
        """
 | 
						|
        :type cls: object
 | 
						|
        """
 | 
						|
        testClient = super(TestLdap, cls).getClsTestClient()
 | 
						|
        cls.api_client = testClient.getApiClient()
 | 
						|
        cls.services = testClient.getParsedTestDataConfig()
 | 
						|
        cls.cleanup = []
 | 
						|
        cls.domain = get_domain(cls.api_client)
 | 
						|
        cls.delflag = 0
 | 
						|
        cls.reason = ""
 | 
						|
 | 
						|
        cls.apiClient = cls.testClient.getApiClient()
 | 
						|
        try:
 | 
						|
            cls.ldapconfRes = addLdapConfiguration1(
 | 
						|
                cls, cls.services["configurableData"]["ldap_configuration"])
 | 
						|
        except Exception as e:
 | 
						|
            raise Exception("Configuring LDAP failed. Check attributes")
 | 
						|
 | 
						|
        cls.cleanup.append(cls.ldapconfRes)
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def tearDownClass(cls):
 | 
						|
        """
 | 
						|
        #cleanup includes : delete normal account, remove ldap configuration
 | 
						|
        :type cls: object
 | 
						|
        """
 | 
						|
        testClient = super(TestLdap, cls).getClsTestClient()
 | 
						|
        cls.api_client = testClient.getApiClient()
 | 
						|
        cls.services = testClient.getParsedTestDataConfig()
 | 
						|
 | 
						|
        if cls.ldapconfRes == 1:
 | 
						|
            ldapserver = deleteLdapConfiguration.deleteLdapConfigurationCmd()
 | 
						|
            ldapserver.hostname = cls.services["configurableData"][
 | 
						|
                "ldap_configuration"]["hostname"]
 | 
						|
 | 
						|
            try:
 | 
						|
                cls.apiClient.deleteLdapConfiguration(ldapserver)
 | 
						|
                cls.debug("deleteLdapConfiguration was successful")
 | 
						|
                return 1
 | 
						|
            except Exception as e:
 | 
						|
                cls.debug("deleteLdapConfiguration failed %s" % e)
 | 
						|
                return 0
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.user = self.services["configurableData"]["link_ldap_details"]["linkLdapUsername"]
 | 
						|
        self.password = self.services["configurableData"]["link_ldap_details"]["linkLdapPassword"]
 | 
						|
        self.delflag1 = 0
 | 
						|
        self.delflag2 = 0
 | 
						|
        self.delflag3 = 0
 | 
						|
        self.delflag4 = 0
 | 
						|
 | 
						|
        self.apiclient = self.testClient.getApiClient()
 | 
						|
        self.dbclient = self.testClient.getDbConnection()
 | 
						|
        self.cleanup = []
 | 
						|
 | 
						|
        self.parent_domain = Domain.create(
 | 
						|
            self.apiclient,
 | 
						|
            services=self.services["domain"],
 | 
						|
            parentdomainid=self.domain.id)
 | 
						|
 | 
						|
        self.ldaplink = linkDomainToLdap.linkDomainToLdapCmd()
 | 
						|
        self.ldaplink.domainid = self.parent_domain.id
 | 
						|
        self.ldaplink.accounttype = self.services[
 | 
						|
            "configurableData"]["link_ldap_details"]["accounttype"]
 | 
						|
        self.ldaplink.name = self.services[
 | 
						|
            "configurableData"]["link_ldap_details"]["name"]
 | 
						|
        self.ldaplink.type = self.services[
 | 
						|
            "configurableData"]["link_ldap_details"]["type"]
 | 
						|
        if self.services["configurableData"][
 | 
						|
            "link_ldap_details"]["admin"] is not None:
 | 
						|
            self.ldaplink.admin = self.services[
 | 
						|
                "configurableData"]["link_ldap_details"]["admin"]
 | 
						|
 | 
						|
        if self.ldaplink.domainid == "" or self.ldaplink.accounttype == "" \
 | 
						|
                or self.ldaplink.name == "" \
 | 
						|
                or self.ldaplink.type == "":
 | 
						|
            self.debug(
 | 
						|
                "Please rerun the test by providing "
 | 
						|
                "values in link_ldap configuration user details")
 | 
						|
            self.skipTest(
 | 
						|
                "Please rerun the test by providing "
 | 
						|
                "proper values in configuration file(link ldap)")
 | 
						|
        else:
 | 
						|
            self.delflag1 = 1
 | 
						|
            self.ldaplinkRes = self.apiClient.linkDomainToLdap(self.ldaplink)
 | 
						|
        self.assertEquals(
 | 
						|
            self.delflag1,
 | 
						|
            1,
 | 
						|
            "Linking LDAP failed,please check the configuration")
 | 
						|
        loginRes = checklogin(self,
 | 
						|
                              self.user, self.password,
 | 
						|
                              self.parent_domain.name,
 | 
						|
                              method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, 1, self.reason)
 | 
						|
 | 
						|
        lsap_user = Account.list(self.api_client,
 | 
						|
                                 domainid=self.parent_domain.id,
 | 
						|
                                 name=self.user
 | 
						|
                                 )
 | 
						|
        self.ldapacctID = lsap_user[0].id
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
 | 
						|
        try:
 | 
						|
            self.parent_domain.delete(self.apiclient, cleanup=True)
 | 
						|
        except Exception as e:
 | 
						|
            raise Exception(
 | 
						|
                "Warning: Exception during cleanup of domain : %s" % e)
 | 
						|
        try:
 | 
						|
            # Clean up, terminate the created instance, volumes and snapshots
 | 
						|
            cleanup_resources(self.apiclient, self.cleanup)
 | 
						|
            pass
 | 
						|
        except Exception as e:
 | 
						|
            raise Exception("Warning: Exception during cleanup : %s" % e)
 | 
						|
        return
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_01_ldap(self):
 | 
						|
        """Check the linkDomainToLdap functionality"""
 | 
						|
        self.domain1 = Domain.create(
 | 
						|
            self.apiclient,
 | 
						|
            services=self.services["domain"],
 | 
						|
            parentdomainid=self.domain.id)
 | 
						|
 | 
						|
        self.ldaplink4 = linkDomainToLdap.linkDomainToLdapCmd()
 | 
						|
        self.ldaplink4.domainid = self.domain1.id
 | 
						|
        self.ldaplink4.accounttype = self.services[
 | 
						|
            "configurableData"]["link_ldap_details"]["accounttype"]
 | 
						|
        self.ldaplink4.name = self.services[
 | 
						|
            "configurableData"]["link_ldap_details"]["name"]
 | 
						|
        self.ldaplink4.type = self.services[
 | 
						|
            "configurableData"]["link_ldap_details"]["type"]
 | 
						|
        if self.services["configurableData"][
 | 
						|
            "link_ldap_details"]["admin"] is not None:
 | 
						|
            self.ldaplink4.admin = self.services[
 | 
						|
                "configurableData"]["link_ldap_details"]["admin"]
 | 
						|
 | 
						|
        try:
 | 
						|
            self.ldaplinkRes4 = self.apiClient.linkDomainToLdap(self.ldaplink4)
 | 
						|
 | 
						|
        except Exception as e:
 | 
						|
            raise Exception(
 | 
						|
                "Linking LDAP failed,please check the configuration")
 | 
						|
 | 
						|
        try:
 | 
						|
            self.domain1.delete(self.apiclient)
 | 
						|
        except Exception as e:
 | 
						|
            raise Exception(
 | 
						|
                "Warning: Exception during deletion of domain : %s" % e)
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_02_ldap(self):
 | 
						|
        """User is both in LDAP and imported into CS(i.e already logged in
 | 
						|
           once.So just check the log in again)"""
 | 
						|
        loginRes = checklogin(
 | 
						|
            self,
 | 
						|
            self.user,
 | 
						|
            self.password,
 | 
						|
            self.parent_domain.name,
 | 
						|
            method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, 1, self.reason)
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_03_ldap(self):
 | 
						|
        """User in LDAP, wrong password --> login should fail"""
 | 
						|
        loginRes = checklogin(
 | 
						|
            self,
 | 
						|
            self.user,
 | 
						|
            randomword(8),
 | 
						|
            self.parent_domain.name,
 | 
						|
            method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, None, self.reason)
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_04_ldap(self):
 | 
						|
        """User is only present locally, password is wrong --> login should
 | 
						|
          fail"""
 | 
						|
        loginRes = checklogin(
 | 
						|
            self,
 | 
						|
            self.services["configurableData"]["ldap_account"]["username"],
 | 
						|
            randomword(10),
 | 
						|
            "",
 | 
						|
            method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, None, self.reason)
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_05_ldap(self):
 | 
						|
        """user is not present anywhere --> login should fail"""
 | 
						|
        loginRes = checklogin(self, randomword(10), randomword(10),
 | 
						|
                              self.parent_domain.name,
 | 
						|
                              method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, None, self.reason)
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_06_ldap(self):
 | 
						|
        """Delete the LDAP user from CS and try to login --> User should be
 | 
						|
          created again"""
 | 
						|
        try:
 | 
						|
 | 
						|
            deleteAcct2 = deleteAccount.deleteAccountCmd()
 | 
						|
            deleteAcct2.id = self.ldapacctID
 | 
						|
 | 
						|
            acct_name = self.services["configurableData"][
 | 
						|
                "link_ldap_details"]["linkLdapUsername"]
 | 
						|
 | 
						|
            self.apiClient.deleteAccount(deleteAcct2)
 | 
						|
 | 
						|
            self.debug(
 | 
						|
                "Deleted the the following account name %s:" %
 | 
						|
                acct_name)
 | 
						|
 | 
						|
        except Exception as e:
 | 
						|
            raise Exception(
 | 
						|
                "Warning: Exception during deleting "
 | 
						|
                "ldap imported account : %s" %
 | 
						|
                e)
 | 
						|
 | 
						|
        loginRes = checklogin(
 | 
						|
            self,
 | 
						|
            self.user,
 | 
						|
            self.password,
 | 
						|
            self.parent_domain.name,
 | 
						|
            method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, 1, self.reason)
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_07_ldap(self):
 | 
						|
        """Lock the user from CS and attempt to login --> login should fail"""
 | 
						|
        self.lockAcct = disableAccount.disableAccountCmd()
 | 
						|
        self.lockAcct.lock = 'true'
 | 
						|
        self.lockAcct.account = self.services["configurableData"][
 | 
						|
            "ldap_account"]["username"]
 | 
						|
        self.lockAcct.domainid = self.parent_domain.id
 | 
						|
        self.apiClient.disableAccount(self.lockAcct)
 | 
						|
 | 
						|
        loginRes = checklogin(
 | 
						|
            self,
 | 
						|
            self.user,
 | 
						|
            self.password,
 | 
						|
            self.parent_domain.name,
 | 
						|
            method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, None, self.reason)
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_08_ldap(self):
 | 
						|
        """Create different domains and link all of them to LDAP. Check
 | 
						|
                        login in each domain --> login should be successful"""
 | 
						|
        try:
 | 
						|
            loginRes = checklogin(
 | 
						|
                self,
 | 
						|
                self.user,
 | 
						|
                self.password,
 | 
						|
                self.parent_domain.name,
 | 
						|
                method="POST")
 | 
						|
            self.debug(loginRes)
 | 
						|
            self.assertEquals(loginRes, 1, self.reason)
 | 
						|
 | 
						|
            self.domain2 = Domain.create(
 | 
						|
                self.apiclient,
 | 
						|
                services=self.services["domain"],
 | 
						|
                parentdomainid=self.domain.id)
 | 
						|
 | 
						|
            # here link ldap to domain
 | 
						|
            self.ldaplink2 = linkDomainToLdap.linkDomainToLdapCmd()
 | 
						|
            self.ldaplink2.domainid = self.domain2.id
 | 
						|
            self.ldaplink2.accounttype = self.services[
 | 
						|
                "configurableData"]["link_ldap_details"]["accounttype"]
 | 
						|
            self.ldaplink2.name = self.services[
 | 
						|
                "configurableData"]["link_ldap_details"]["name"]
 | 
						|
            self.ldaplink2.type = self.services[
 | 
						|
                "configurableData"]["link_ldap_details"]["type"]
 | 
						|
 | 
						|
            if self.services["configurableData"][
 | 
						|
                "link_ldap_details"]["admin"] is not None:
 | 
						|
                self.ldaplink2.admin = self.services[
 | 
						|
                    "configurableData"]["link_ldap_details"]["admin"]
 | 
						|
 | 
						|
            if self.ldaplink2.domainid == "" \
 | 
						|
                    or self.ldaplink2.accounttype == "" \
 | 
						|
                    or self.ldaplink2.name == "" \
 | 
						|
                    or self.ldaplink2.type == "":
 | 
						|
                self.debug(
 | 
						|
                    "Please rerun the test by providing"
 | 
						|
                    " values in link_ldap configuration user details")
 | 
						|
                self.skipTest(
 | 
						|
                    "Please rerun the test by providing "
 | 
						|
                    "proper values in configuration file(link ldap)")
 | 
						|
 | 
						|
            else:
 | 
						|
                self.delflag2 = 1
 | 
						|
                self.ldaplinkRes2 = self.apiClient.linkDomainToLdap(
 | 
						|
                    self.ldaplink2)
 | 
						|
            self.assertEquals(
 | 
						|
                self.delflag2,
 | 
						|
                1,
 | 
						|
                "Linking LDAP failed,please check the configuration")
 | 
						|
 | 
						|
            loginRes = checklogin(
 | 
						|
                self,
 | 
						|
                self.user,
 | 
						|
                self.password,
 | 
						|
                self.domain2.name,
 | 
						|
                method="POST")
 | 
						|
            self.debug(loginRes)
 | 
						|
            self.assertEquals(loginRes, 1, self.reason)
 | 
						|
 | 
						|
            self.domain3 = Domain.create(
 | 
						|
                self.apiclient,
 | 
						|
                services=self.services["domain"],
 | 
						|
                parentdomainid=self.domain.id)
 | 
						|
            # here link ldap to domain
 | 
						|
            self.ldaplink3 = linkDomainToLdap.linkDomainToLdapCmd()
 | 
						|
            self.ldaplink3.domainid = self.domain3.id
 | 
						|
            self.ldaplink3.accounttype = self.services[
 | 
						|
                "configurableData"]["link_ldap_details"]["accounttype"]
 | 
						|
            self.ldaplink3.name = self.services[
 | 
						|
                "configurableData"]["link_ldap_details"]["name"]
 | 
						|
            self.ldaplink3.type = self.services[
 | 
						|
                "configurableData"]["link_ldap_details"]["type"]
 | 
						|
            if self.services["configurableData"][
 | 
						|
                "link_ldap_details"]["admin"] is not None:
 | 
						|
                self.ldaplink3.admin = self.services[
 | 
						|
                    "configurableData"]["link_ldap_details"]["admin"]
 | 
						|
 | 
						|
            if self.ldaplink3.domainid == "" \
 | 
						|
                    or self.ldaplink3.accounttype == "" \
 | 
						|
                    or self.ldaplink3.name == "" \
 | 
						|
                    or self.ldaplink3.type == "":
 | 
						|
                self.debug(
 | 
						|
                    "Please rerun the test by providing"
 | 
						|
                    " values in link_ldap configuration user details")
 | 
						|
                self.skipTest(
 | 
						|
                    "Please rerun the test by providing "
 | 
						|
                    "proper values in configuration file(link ldap)")
 | 
						|
            else:
 | 
						|
                self.delflag3 = 1
 | 
						|
                self.ldaplinkRes3 = self.apiClient.linkDomainToLdap(
 | 
						|
                    self.ldaplink3)
 | 
						|
            self.assertEquals(
 | 
						|
                self.delflag3,
 | 
						|
                1,
 | 
						|
                "Linking LDAP failed,please check the configuration")
 | 
						|
            loginRes = checklogin(
 | 
						|
                self,
 | 
						|
                self.user,
 | 
						|
                self.password,
 | 
						|
                self.domain2.name,
 | 
						|
                method="POST")
 | 
						|
            self.debug(loginRes)
 | 
						|
            self.assertEquals(loginRes, 1, self.reason)
 | 
						|
 | 
						|
        finally:
 | 
						|
            try:
 | 
						|
                self.domain2.delete(self.apiclient, cleanup=True)
 | 
						|
            except Exception as e:
 | 
						|
                raise Exception(
 | 
						|
                    "Warning: Exception during deletion of domain : %s" % e)
 | 
						|
 | 
						|
            try:
 | 
						|
                self.domain3.delete(self.apiclient, cleanup=True)
 | 
						|
            except Exception as e:
 | 
						|
                raise Exception(
 | 
						|
                    "Warning: Exception during deletion of domain : %s" % e)
 | 
						|
 | 
						|
        return
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_09_ldap(self):
 | 
						|
        """ Enable nested groups and try to login with a user that is in
 | 
						|
             nested group --> login should be successful"""
 | 
						|
        if self.services["configurableData"]["link_ldap_details"]["linkLdapNestedUser"] == "":
 | 
						|
            self.skipTest("No nested user mentioned")
 | 
						|
        updateConfigurationCmd = updateConfiguration.updateConfigurationCmd()
 | 
						|
        updateConfigurationCmd.name = "ldap.nested.groups.enable"
 | 
						|
        updateConfigurationCmd.value = 'true'
 | 
						|
        self.apiClient.updateConfiguration(updateConfigurationCmd)
 | 
						|
        loginRes = checklogin(
 | 
						|
            self,
 | 
						|
            self.services["configurableData"]["link_ldap_details"]["linkLdapNestedUser"],
 | 
						|
            self.services["configurableData"]["link_ldap_details"]["linkLdapNestedPassword"],
 | 
						|
            self.parent_domain.name,
 | 
						|
            method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, 1, self.reason)
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_10_ldap(self):
 | 
						|
        """Check db tables"""
 | 
						|
        db_check = 1
 | 
						|
 | 
						|
        domainID = self.dbclient.execute(
 | 
						|
            "SELECT id FROM domain WHERE uuid=" + "'" +
 | 
						|
            self.parent_domain.id + "'" + ";",
 | 
						|
            db="cloud")
 | 
						|
 | 
						|
        dbChecking = self.dbclient.execute(
 | 
						|
            "SELECT type,name,account_type "
 | 
						|
            "FROM ldap_trust_map WHERE domain_id=" + "'" +
 | 
						|
            str(domainID[0][0]) + "'" + ";",
 | 
						|
            db="cloud")
 | 
						|
 | 
						|
        if dbChecking is not None and str(
 | 
						|
                dbChecking[0][0]) == \
 | 
						|
                self.services["configurableData"]["link_ldap_details"]["type"] \
 | 
						|
                and str(
 | 
						|
                    dbChecking[0][1]) == \
 | 
						|
                        self.services["configurableData"]["link_ldap_details"]["name"] \
 | 
						|
                and str(
 | 
						|
                    dbChecking[0][2]) == \
 | 
						|
                        self.services["configurableData"]["link_ldap_details"]["accounttype"]:
 | 
						|
            db_check = 0
 | 
						|
        self.assertEquals(db_check, 0, "DB check failed")
 | 
						|
 | 
						|
    @attr(tags=["advanced", "basic"], required_hardware="true")
 | 
						|
    def test_11_ldap(self):
 | 
						|
        """Password/domain empty --> login should fail"""
 | 
						|
        loginRes = checklogin(
 | 
						|
            self,
 | 
						|
            "", "", self.parent_domain.name, method="POST")
 | 
						|
        self.debug(loginRes)
 | 
						|
        self.assertEquals(loginRes, None, self.reason)
 | 
						|
 | 
						|
 |