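# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" P1 for LDAP Config

Exercises the ldapConfig, ldapRemove and login APIs against an external
LDAP server, using the connection profiles defined in ``Services``.
"""

#!/usr/bin/env python
import marvin
from marvin import cloudstackTestCase
from marvin.cloudstackTestCase import *
import unittest
import hashlib
import random
from marvin.cloudstackAPI import *
from marvin.cloudstackAPI import login
from marvin.integration.lib.utils import *
from marvin.integration.lib.base import *
from marvin.integration.lib.common import *
from nose.plugins.attrib import attr
import urllib


class Services:
    """Test data for the LDAP configuration tests.

    ``self.services`` holds one CloudStack account profile ("account") and
    five LDAP connection profiles ("ldapCon_1" .. "ldapCon_5"), each one
    consumed by the test case with the matching number.
    """

    # NOTE(review): the LDAP host address, bind DN and the bind/user
    # passwords below are committed in clear text.  If any of them was ever
    # a real directory credential it should be rotated, and the values are
    # better supplied by the test environment (config file or environment
    # variable) than kept in source control.
    # NOTE(review): port 389 is the plain LDAP port, so unless the
    # connection is protected elsewhere the bind password crosses the
    # network unencrypted -- confirm whether this setup can use
    # LDAPS/StartTLS.

    def __init__(self):
        self.services = {
            "account": {
                "email": "test@test.com",
                "firstname": "test",
                "lastname": "t",
                "username": "test",
                "password": "password",
            },
            # valid values & query filter as email
            "ldapCon_1": {
                "ldapHostname": "10.147.38.163",
                "port": "389",
                "binddn": "CN=test,CN=Users,DC=hyd-qa,DC=com",
                "bindpass": "aaaa_1111",
                "queryfilter": "(&(mail=%e))",
                "searchbase": "CN=Users,DC=hyd-qa,DC=com",
                "ldapusername": "test",
                "ldappasswd": "aaaa_1111"
            },
            # valid values & query filter as displayName
            "ldapCon_2": {
                "ldapHostname": "10.147.38.163",
                "port": "389",
                "binddn": "CN=test,CN=Users,DC=hyd-qa,DC=com",
                "bindpass": "aaaa_1111",
                "queryfilter": "(&(displayName=%u))",
                "searchbase": "CN=Users,DC=hyd-qa,DC=com",
                "ldapusername": "test",
                "ldappasswd": "aaaa_1111"
            },
            # configuration with a missing parameter value (queryfilter)
            "ldapCon_3": {
                "ldapHostname": "10.147.38.163",
                "port": "389",
                "binddn": "CN=test,CN=Users,DC=hyd-qa,DC=com",
                "bindpass": "aaaa_1111",
                "queryfilter": "",
                "searchbase": "CN=Users,DC=hyd-qa,DC=com",
                "ldapusername": "test",
                "ldappasswd": "aaaa_1111"
            },
            # invalid configuration - wrong query filter
            "ldapCon_4": {
                "ldapHostname": "10.147.38.163",
                "port": "389",
                "binddn": "CN=test,CN=Users,DC=hyd-qa,DC=com",
                "bindpass": "aaaa_1111",
                "queryfilter": "(&(displayName=%p))",
                "searchbase": "CN=Users,DC=hyd-qa,DC=com",
                "ldapusername": "test",
                "ldappasswd": "aaaa_1111"
            },
            # configuration with invalid ldap user credentials
            # (valid bind password, wrong "ldappasswd")
            "ldapCon_5": {
                "ldapHostname": "10.147.38.163",
                "port": "389",
                "binddn": "CN=test,CN=Users,DC=hyd-qa,DC=com",
                "bindpass": "aaaa_1111",
                "queryfilter": "(&(displayName=%u))",
                "searchbase": "CN=Users,DC=hyd-qa,DC=com",
                "ldapusername": "test",
                "ldappasswd": "aaaa"
            }
        }


class TestLdap(cloudstackTestCase):
    """
    Registers LDAP configuration details in CloudStack, creates a CloudStack
    user that mirrors the LDAP user, and validates the user's credentials
    against the LDAP server (AD).
    """

    @classmethod
    def setUpClass(cls):
        """Fetch the class-level API client and load the test data."""
        cls.api_client = super(
            TestLdap,
            cls
        ).getClsTestClient().getApiClient()
        cls.services = Services().services
        cls.account = cls.services["account"]
        cls._cleanup = []

    @classmethod
    def tearDownClass(cls):
        """Release everything registered in ``cls._cleanup``."""
        try:
            # Cleanup resources used
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as tde:
            raise Exception("Warning: Exception during cleanup : %s" % tde)
        return

    def setUp(self):
        """Create the CloudStack account that maps to the LDAP user."""
        # Set before anything can fail so that tearDown can always read it;
        # previously a test that never reached its assignment made tearDown
        # raise AttributeError, reported as a cleanup failure.
        self.ldapconfRes = 0

        self.apiclient = self.testClient.getApiClient()
        self.acct = createAccount.createAccountCmd()
        # We need a regular user. admins have accounttype=1
        self.acct.accounttype = 0
        account = self.services["account"]
        self.acct.firstname = account["firstname"]
        self.acct.lastname = account["lastname"]
        self.acct.password = account["password"]
        self.acct.username = account["username"]
        self.acct.email = account["email"]
        self.acct.account = account["username"]
        self.acct.domainid = 1
        # mapping ldap user by creating same user in cloudstack
        self.acctRes = self.apiclient.createAccount(self.acct)
        return

    def tearDown(self):
        """Delete the per-test account and any LDAP configuration left behind.

        Both steps are always attempted: a failure while deleting the account
        no longer skips the removal of the LDAP configuration, which would
        otherwise stay registered (bind credentials included) for whatever
        runs next.  Failures are collected and re-raised together.
        """
        failures = []

        # Clean up, terminate the created accounts, domains etc
        try:
            deleteAcct = deleteAccount.deleteAccountCmd()
            deleteAcct.id = self.acctRes.id
            acct_name = self.acctRes.name
            self.apiclient.deleteAccount(deleteAcct)
            self.debug("Deleted the the following account name %s:" % acct_name)
        except Exception as e:
            failures.append(e)

        # delete only if ldapconfig registered in CS
        if self.ldapconfRes:
            try:
                self.apiclient.ldapRemove(ldapRemove.ldapRemoveCmd())
            except Exception as e:
                failures.append(e)

        if failures:
            raise Exception("Warning: Exception during cleanup : %s"
                            % "; ".join(str(f) for f in failures))
        return

    def test_01_configLDAP(self):
        '''
        Verify the ldapConfig API with valid values (query filter as email).
        '''
        # 1. This test covers ldapConfig & login API with valid ldap
        #    credentials.
        # 2. require ldap configuration:ldapCon_1
        conf = self.services["ldapCon_1"]
        self.debug("start test")
        self.ldapconfRes = self._testldapConfig(conf)
        if self.ldapconfRes != 1:
            self.debug("LDAP Configuration failed with exception")
        self.assertEqual(self.ldapconfRes, 1, "ldapConfig API failed")
        self.debug("configure ldap successful")

        # validating the user credentials with ldap Server
        loginRes = self.chkLogin(conf["ldapusername"], conf["ldappasswd"])
        self.assertEqual(loginRes, 1, "ldap Authentication failed")
        self.debug("end test")

    def test_02_configLDAP(self):
        '''
        Verify the ldapConfig API with valid values (query filter as
        displayName).
        '''
        # 1. This test covers ldapConfig & login API with valid ldap
        #    credentials.
        # 2. require ldap configuration:ldapCon_2
        conf = self.services["ldapCon_2"]
        self.debug("start test")
        self.ldapconfRes = self._testldapConfig(conf)
        self.assertEqual(self.ldapconfRes, 1, "ldapConfig API failed")
        self.debug("configure ldap successful")

        # validating the user credentials with ldap Server
        loginRes = self.chkLogin(conf["ldapusername"], conf["ldappasswd"])
        self.assertEqual(loginRes, 1, "ldap Authentication failed")
        self.debug("end test")

    def test_03_configLDAP(self):
        '''
        Verify the ldapConfig API with a missing config parameter value
        (queryfilter).
        '''
        # 1. Issue ldapConfig API with no ldap config parameter value and
        #    check behavior
        # 2. require ldap configuration:ldapCon_3
        self.debug("start test...")
        self.ldapconfRes = self._testldapConfig(self.services["ldapCon_3"])
        self.assertEqual(
            self.ldapconfRes, 0,
            "LDAP configuration successful with invalid value.API failed")
        self.debug("end test")

    def test_04_configLDAP(self):
        '''
        Verify the ldapConfig API with invalid configuration values (wrong
        query filter).
        '''
        # 1. calling ldapConfig API with invalid query filter value and
        #    check behavior
        # 2. require ldap configuration:ldapCon_4
        self.debug("start test...")
        self.ldapconfRes = self._testldapConfig(self.services["ldapCon_4"])
        self.assertEqual(self.ldapconfRes, 0, "API failed")

    def test_05_configLDAP(self):
        '''
        Verify that the login API rejects wrong ldap credentials.
        '''
        # 1. This script first configures the ldap and validates the user
        #    credentials using login API
        # 2. require ldap configuration:ldapCon_5
        conf = self.services["ldapCon_5"]
        self.debug("start test")
        self.ldapconfRes = self._testldapConfig(conf)
        self.assertEqual(self.ldapconfRes, 1, "API failed")

        # validating the cloudstack user credentials with ldap Server
        loginRes = self.chkLogin(conf["ldapusername"], conf["ldappasswd"])
        self.assertNotEqual(loginRes, 1, "login API failed")
        self.debug("end test")

    def test_06_removeLDAP(self):
        '''
        Verify the ldapRemove API.
        '''
        # 1. This script first configures ldap and removes the configured
        #    ldap values
        # 2. require ldap configuration:ldapCon_1
        self.debug("start test")
        self.ldapconfRes = self._testldapConfig(self.services["ldapCon_1"])
        if self.ldapconfRes != 1:
            self.debug("LDAP Configuration failed with exception")
        # The configuration step has to succeed for this test to mean
        # anything: it used to pass when ldapConfig failed, without ever
        # calling ldapRemove.
        self.assertEqual(self.ldapconfRes, 1, "ldapconfig API failed")
        self.debug("ldap configured successfully")

        self.apiclient.ldapRemove(ldapRemove.ldapRemoveCmd())
        self.debug("ldap removed successfully")
        # Already removed, so tearDown must not try again.
        self.ldapconfRes = 0
        self.debug("end test")

    def _testldapConfig(self, ldapSrvD):
        """Register the LDAP settings in *ldapSrvD* through the ldapConfig API.

        :param ldapSrvD: dict providing "ldapHostname", "port", "binddn",
            "bindpass", "searchbase" and "queryfilter"
        :return: 1 if the API call returned without raising, 0 otherwise
        """
        self.debug("start ldapconfig test")

        # creating the ldapconfig cmd object and copying the profile into it
        lpconfig = ldapConfig.ldapConfigCmd()
        lpconfig.hostname = ldapSrvD["ldapHostname"]
        lpconfig.port = ldapSrvD["port"]
        lpconfig.binddn = ldapSrvD["binddn"]
        lpconfig.bindpass = ldapSrvD["bindpass"]
        lpconfig.searchbase = ldapSrvD["searchbase"]
        lpconfig.queryfilter = ldapSrvD["queryfilter"]

        # calling the ldapconfig Api
        self.debug("calling ldapconfig API")
        # NOTE(review): every exception is reported as 0, so the negative
        # tests (expecting 0) cannot tell a rejected configuration from an
        # unrelated failure of the call -- the logged message is the only
        # place the real reason shows up.
        try:
            self.apiclient.ldapConfig(lpconfig)
            self.debug("ldapconfig API succesfful")
            return 1
        except Exception as e:
            self.debug("ldapconfig API failed %s" % e)
            return 0

    def chkLogin(self, username, password):
        """Try to log in through the login API.

        :param username: user name to log in with
        :param password: password to log in with
        :return: 1 when the login API returns a response; None when it
            returns None or raises
        """
        self.debug("login test")
        try:
            login1 = login.loginCmd()
            login1.username = username
            login1.password = password
            loginRes = self.apiclient.login(login1)
            # NOTE(review): the whole login response is written to the debug
            # log; if it carries a session key or similar token, that token
            # ends up in the test logs -- confirm and trim if so.
            self.debug("login response %s" % loginRes)
            if loginRes is None:
                self.debug("login not successful")
            else:
                self.debug("login successful")
                return 1
        except Exception as p:
            self.debug("login operation failed %s" % p)
        self.debug("end of Login")
        return None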