cloudstack/test/integration/component/test_ldap_auto_import.py
2023-09-18 13:16:33 +02:00

597 lines
22 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from marvin.cloudstackTestCase import cloudstackTestCase
from marvin.cloudstackAPI import (
updateConfiguration,
deleteAccount,
addLdapConfiguration,
linkDomainToLdap,
deleteLdapConfiguration,
disableAccount)
from marvin.lib.common import get_domain
from marvin.lib.base import (Account,
Configurations,
Domain)
from marvin.cloudstackAPI import login
from marvin.lib.utils import (cleanup_resources)
from nose.plugins.attrib import attr
import telnetlib
import random
import string
def randomword(length):
return ''.join(random.choice(string.lowercase) for i in range(length))
def addLdapConfiguration1(cls, ldapConfiguration):
"""
:param ldapConfiguration
"""
cls.chkConfig = checkLdapConfiguration(cls, ldapConfiguration)
if not cls.chkConfig:
return 0
# Setup Global settings
Configurations.update(
cls.apiClient,
name="ldap.basedn",
value=ldapConfiguration['basedn']
)
Configurations.update(
cls.apiClient,
name="ldap.bind.password",
value=ldapConfiguration['bindpassword']
)
Configurations.update(
cls.apiClient,
name="ldap.bind.principal",
value=ldapConfiguration['principal']
)
Configurations.update(
cls.apiClient,
name="ldap.email.attribute",
value=ldapConfiguration['emailAttribute']
)
Configurations.update(
cls.apiClient,
name="ldap.user.object",
value=ldapConfiguration['userObject']
)
Configurations.update(
cls.apiClient,
name="ldap.username.attribute",
value=ldapConfiguration['usernameAttribute']
)
Configurations.update(
cls.apiClient,
name="ldap.nested.groups.enable",
value="true"
)
ldapServer = addLdapConfiguration.addLdapConfigurationCmd()
ldapServer.hostname = ldapConfiguration['hostname']
ldapServer.port = ldapConfiguration['port']
cls.debug("calling addLdapConfiguration API command")
try:
cls.apiClient.addLdapConfiguration(ldapServer)
cls.debug("addLdapConfiguration was successful")
return 1
except Exception as e:
cls.debug(
"addLdapConfiguration failed %s Check the Passed passed"
" ldap attributes" %
e)
cls.reason = "addLdapConfiguration failed %s Check the Passed " \
"passed ldap attributes" % e
raise Exception(
"addLdapConfiguration failed %s Check the Passed passed"
" ldap attributes" %
e)
return 1
def checklogin(cls, username, password, domain, method):
"""
:param username:
:param password:
"""
cls.debug("Attempting to login.")
try:
loginParams = login.loginCmd()
loginParams.username = username
loginParams.password = password
loginParams.domain = domain
loginRes = cls.apiClient.login(loginParams, method)
cls.debug("login response %s" % loginRes)
if loginRes is None:
cls.debug("login not successful")
return 0
else:
cls.debug("login successful")
return 1
except Exception as p:
cls.debug("login operation failed %s" % p)
cls.reason = "Login operation Failed %s" % p
def checkLdapConfiguration(cls, ldapConfiguration):
"""This function checks whether the passed ldap server in
the configuration is up and running or not.
"""
flag = False
try:
tn = telnetlib.Telnet(
ldapConfiguration['hostname'],
ldapConfiguration['port'],
timeout=15)
if tn is not None:
tn.set_debuglevel(1)
print(tn.msg("Connected to the server"))
cls.debug(
"Ldap Server is Up and listening on the port %s" %
tn.msg("Connected to the server"))
flag = True
tn.close()
except Exception as e:
cls.debug(
"Not able to reach the LDAP server ,"
"please check the Services on LDAP %s and exception is %s" %
((ldapConfiguration['hostname']), e))
cls.reason = "Not able to reach the LDAP server ,please check" \
" the Services on LDAP %s and exception is %s" \
% ((ldapConfiguration['hostname']), e)
return flag
class TestLdap(cloudstackTestCase):
"""
LDAP AutoImport smoke tests
"""
@classmethod
def setUpClass(cls):
"""
:type cls: object
"""
testClient = super(TestLdap, cls).getClsTestClient()
cls.api_client = testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
cls.cleanup = []
cls.domain = get_domain(cls.api_client)
cls.delflag = 0
cls.reason = ""
cls.apiClient = cls.testClient.getApiClient()
try:
cls.ldapconfRes = addLdapConfiguration1(
cls, cls.services["configurableData"]["ldap_configuration"])
except Exception as e:
raise Exception("Configuring LDAP failed. Check attributes")
cls.cleanup.append(cls.ldapconfRes)
@classmethod
def tearDownClass(cls):
"""
#cleanup includes : delete normal account, remove ldap configuration
:type cls: object
"""
testClient = super(TestLdap, cls).getClsTestClient()
cls.api_client = testClient.getApiClient()
cls.services = testClient.getParsedTestDataConfig()
if cls.ldapconfRes == 1:
ldapserver = deleteLdapConfiguration.deleteLdapConfigurationCmd()
ldapserver.hostname = cls.services["configurableData"][
"ldap_configuration"]["hostname"]
try:
cls.apiClient.deleteLdapConfiguration(ldapserver)
cls.debug("deleteLdapConfiguration was successful")
return 1
except Exception as e:
cls.debug("deleteLdapConfiguration failed %s" % e)
return 0
def setUp(self):
self.user = self.services["configurableData"]["link_ldap_details"]["linkLdapUsername"]
self.password = self.services["configurableData"]["link_ldap_details"]["linkLdapPassword"]
self.delflag1 = 0
self.delflag2 = 0
self.delflag3 = 0
self.delflag4 = 0
self.apiclient = self.testClient.getApiClient()
self.dbclient = self.testClient.getDbConnection()
self.cleanup = []
self.parent_domain = Domain.create(
self.apiclient,
services=self.services["domain"],
parentdomainid=self.domain.id)
self.ldaplink = linkDomainToLdap.linkDomainToLdapCmd()
self.ldaplink.domainid = self.parent_domain.id
self.ldaplink.accounttype = self.services[
"configurableData"]["link_ldap_details"]["accounttype"]
self.ldaplink.name = self.services[
"configurableData"]["link_ldap_details"]["name"]
self.ldaplink.type = self.services[
"configurableData"]["link_ldap_details"]["type"]
if self.services["configurableData"][
"link_ldap_details"]["admin"] is not None:
self.ldaplink.admin = self.services[
"configurableData"]["link_ldap_details"]["admin"]
if self.ldaplink.domainid == "" or self.ldaplink.accounttype == "" \
or self.ldaplink.name == "" \
or self.ldaplink.type == "":
self.debug(
"Please rerun the test by providing "
"values in link_ldap configuration user details")
self.skipTest(
"Please rerun the test by providing "
"proper values in configuration file(link ldap)")
else:
self.delflag1 = 1
self.ldaplinkRes = self.apiClient.linkDomainToLdap(self.ldaplink)
self.assertEqual(
self.delflag1,
1,
"Linking LDAP failed,please check the configuration")
loginRes = checklogin(self,
self.user, self.password,
self.parent_domain.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, 1, self.reason)
lsap_user = Account.list(self.api_client,
domainid=self.parent_domain.id,
name=self.user
)
self.ldapacctID = lsap_user[0].id
def tearDown(self):
try:
self.parent_domain.delete(self.apiclient, cleanup=True)
except Exception as e:
raise Exception(
"Warning: Exception during cleanup of domain : %s" % e)
try:
# Clean up, terminate the created instance, volumes and snapshots
cleanup_resources(self.apiclient, self.cleanup)
pass
except Exception as e:
raise Exception("Warning: Exception during cleanup : %s" % e)
return
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_01_ldap(self):
"""Check the linkDomainToLdap functionality"""
self.domain1 = Domain.create(
self.apiclient,
services=self.services["domain"],
parentdomainid=self.domain.id)
self.ldaplink4 = linkDomainToLdap.linkDomainToLdapCmd()
self.ldaplink4.domainid = self.domain1.id
self.ldaplink4.accounttype = self.services[
"configurableData"]["link_ldap_details"]["accounttype"]
self.ldaplink4.name = self.services[
"configurableData"]["link_ldap_details"]["name"]
self.ldaplink4.type = self.services[
"configurableData"]["link_ldap_details"]["type"]
if self.services["configurableData"][
"link_ldap_details"]["admin"] is not None:
self.ldaplink4.admin = self.services[
"configurableData"]["link_ldap_details"]["admin"]
try:
self.ldaplinkRes4 = self.apiClient.linkDomainToLdap(self.ldaplink4)
except Exception as e:
raise Exception(
"Linking LDAP failed,please check the configuration")
try:
self.domain1.delete(self.apiclient)
except Exception as e:
raise Exception(
"Warning: Exception during deletion of domain : %s" % e)
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_02_ldap(self):
"""User is both in LDAP and imported into CS(i.e already logged in
once.So just check the log in again)"""
loginRes = checklogin(
self,
self.user,
self.password,
self.parent_domain.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, 1, self.reason)
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_03_ldap(self):
"""User in LDAP, wrong password --> login should fail"""
loginRes = checklogin(
self,
self.user,
randomword(8),
self.parent_domain.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, None, self.reason)
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_04_ldap(self):
"""User is only present locally, password is wrong --> login should
fail"""
loginRes = checklogin(
self,
self.services["configurableData"]["ldap_account"]["username"],
randomword(10),
"",
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, None, self.reason)
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_05_ldap(self):
"""user is not present anywhere --> login should fail"""
loginRes = checklogin(self, randomword(10), randomword(10),
self.parent_domain.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, None, self.reason)
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_06_ldap(self):
"""Delete the LDAP user from CS and try to login --> User should be
created again"""
try:
deleteAcct2 = deleteAccount.deleteAccountCmd()
deleteAcct2.id = self.ldapacctID
acct_name = self.services["configurableData"][
"link_ldap_details"]["linkLdapUsername"]
self.apiClient.deleteAccount(deleteAcct2)
self.debug(
"Deleted the following account name %s:" %
acct_name)
except Exception as e:
raise Exception(
"Warning: Exception during deleting "
"ldap imported account : %s" %
e)
loginRes = checklogin(
self,
self.user,
self.password,
self.parent_domain.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, 1, self.reason)
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_07_ldap(self):
"""Lock the user from CS and attempt to login --> login should fail"""
self.lockAcct = disableAccount.disableAccountCmd()
self.lockAcct.lock = 'true'
self.lockAcct.account = self.services["configurableData"][
"ldap_account"]["username"]
self.lockAcct.domainid = self.parent_domain.id
self.apiClient.disableAccount(self.lockAcct)
loginRes = checklogin(
self,
self.user,
self.password,
self.parent_domain.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, None, self.reason)
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_08_ldap(self):
"""Create different domains and link all of them to LDAP. Check
login in each domain --> login should be successful"""
try:
loginRes = checklogin(
self,
self.user,
self.password,
self.parent_domain.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, 1, self.reason)
self.domain2 = Domain.create(
self.apiclient,
services=self.services["domain"],
parentdomainid=self.domain.id)
# here link ldap to domain
self.ldaplink2 = linkDomainToLdap.linkDomainToLdapCmd()
self.ldaplink2.domainid = self.domain2.id
self.ldaplink2.accounttype = self.services[
"configurableData"]["link_ldap_details"]["accounttype"]
self.ldaplink2.name = self.services[
"configurableData"]["link_ldap_details"]["name"]
self.ldaplink2.type = self.services[
"configurableData"]["link_ldap_details"]["type"]
if self.services["configurableData"][
"link_ldap_details"]["admin"] is not None:
self.ldaplink2.admin = self.services[
"configurableData"]["link_ldap_details"]["admin"]
if self.ldaplink2.domainid == "" \
or self.ldaplink2.accounttype == "" \
or self.ldaplink2.name == "" \
or self.ldaplink2.type == "":
self.debug(
"Please rerun the test by providing"
" values in link_ldap configuration user details")
self.skipTest(
"Please rerun the test by providing "
"proper values in configuration file(link ldap)")
else:
self.delflag2 = 1
self.ldaplinkRes2 = self.apiClient.linkDomainToLdap(
self.ldaplink2)
self.assertEqual(
self.delflag2,
1,
"Linking LDAP failed,please check the configuration")
loginRes = checklogin(
self,
self.user,
self.password,
self.domain2.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, 1, self.reason)
self.domain3 = Domain.create(
self.apiclient,
services=self.services["domain"],
parentdomainid=self.domain.id)
# here link ldap to domain
self.ldaplink3 = linkDomainToLdap.linkDomainToLdapCmd()
self.ldaplink3.domainid = self.domain3.id
self.ldaplink3.accounttype = self.services[
"configurableData"]["link_ldap_details"]["accounttype"]
self.ldaplink3.name = self.services[
"configurableData"]["link_ldap_details"]["name"]
self.ldaplink3.type = self.services[
"configurableData"]["link_ldap_details"]["type"]
if self.services["configurableData"][
"link_ldap_details"]["admin"] is not None:
self.ldaplink3.admin = self.services[
"configurableData"]["link_ldap_details"]["admin"]
if self.ldaplink3.domainid == "" \
or self.ldaplink3.accounttype == "" \
or self.ldaplink3.name == "" \
or self.ldaplink3.type == "":
self.debug(
"Please rerun the test by providing"
" values in link_ldap configuration user details")
self.skipTest(
"Please rerun the test by providing "
"proper values in configuration file(link ldap)")
else:
self.delflag3 = 1
self.ldaplinkRes3 = self.apiClient.linkDomainToLdap(
self.ldaplink3)
self.assertEqual(
self.delflag3,
1,
"Linking LDAP failed,please check the configuration")
loginRes = checklogin(
self,
self.user,
self.password,
self.domain2.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, 1, self.reason)
finally:
try:
self.domain2.delete(self.apiclient, cleanup=True)
except Exception as e:
raise Exception(
"Warning: Exception during deletion of domain : %s" % e)
try:
self.domain3.delete(self.apiclient, cleanup=True)
except Exception as e:
raise Exception(
"Warning: Exception during deletion of domain : %s" % e)
return
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_09_ldap(self):
""" Enable nested groups and try to login with a user that is in
nested group --> login should be successful"""
if self.services["configurableData"]["link_ldap_details"]["linkLdapNestedUser"] == "":
self.skipTest("No nested user mentioned")
updateConfigurationCmd = updateConfiguration.updateConfigurationCmd()
updateConfigurationCmd.name = "ldap.nested.groups.enable"
updateConfigurationCmd.value = 'true'
self.apiClient.updateConfiguration(updateConfigurationCmd)
loginRes = checklogin(
self,
self.services["configurableData"]["link_ldap_details"]["linkLdapNestedUser"],
self.services["configurableData"]["link_ldap_details"]["linkLdapNestedPassword"],
self.parent_domain.name,
method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, 1, self.reason)
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_10_ldap(self):
"""Check db tables"""
db_check = 1
domainID = self.dbclient.execute(
"SELECT id FROM domain WHERE uuid=" + "'" +
self.parent_domain.id + "'" + ";",
db="cloud")
dbChecking = self.dbclient.execute(
"SELECT type,name,account_type "
"FROM ldap_trust_map WHERE domain_id=" + "'" +
str(domainID[0][0]) + "'" + ";",
db="cloud")
if dbChecking is not None and str(
dbChecking[0][0]) == \
self.services["configurableData"]["link_ldap_details"]["type"] \
and str(
dbChecking[0][1]) == \
self.services["configurableData"]["link_ldap_details"]["name"] \
and str(
dbChecking[0][2]) == \
self.services["configurableData"]["link_ldap_details"]["accounttype"]:
db_check = 0
self.assertEqual(db_check, 0, "DB check failed")
@attr(tags=["advanced", "basic"], required_hardware="true")
def test_11_ldap(self):
"""Password/domain empty --> login should fail"""
loginRes = checklogin(
self,
"", "", self.parent_domain.name, method="POST")
self.debug(loginRes)
self.assertEqual(loginRes, None, self.reason)