mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-11-04 00:02:37 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			367 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			367 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Licensed to the Apache Software Foundation (ASF) under one
 | 
						|
# or more contributor license agreements.  See the NOTICE file
 | 
						|
# distributed with this work for additional information
 | 
						|
# regarding copyright ownership.  The ASF licenses this file
 | 
						|
# to you under the Apache License, Version 2.0 (the
 | 
						|
# "License"); you may not use this file except in compliance
 | 
						|
# with the License.  You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#   http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing,
 | 
						|
# software distributed under the License is distributed on an
 | 
						|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | 
						|
# KIND, either express or implied.  See the License for the
 | 
						|
# specific language governing permissions and limitations
 | 
						|
# under the License.
 | 
						|
 | 
						|
"""P1 tests for LDAP configuration (ldapConfig, ldapRemove and login APIs)."""
 | 
						|
 | 
						|
 | 
						|
#!/usr/bin/env python
 | 
						|
 | 
						|
import marvin
 | 
						|
from marvin import cloudstackTestCase
 | 
						|
from marvin.cloudstackTestCase import *
 | 
						|
import unittest
 | 
						|
import hashlib
 | 
						|
import random
 | 
						|
from marvin.cloudstackAPI import *
 | 
						|
from marvin.cloudstackAPI import login
 | 
						|
from marvin.integration.lib.utils import *
 | 
						|
from marvin.integration.lib.base import *
 | 
						|
from marvin.integration.lib.common import *
 | 
						|
from nose.plugins.attrib import attr
 | 
						|
import urllib
 | 
						|
 | 
						|
 | 
						|
 | 
						|
class Services:
    """Static test data for the LDAP configuration tests.

    ``self.services`` holds the CloudStack account to create ("account")
    and five LDAP configurations ("ldapCon_1" .. "ldapCon_5") that differ
    only in their query filter or in the LDAP user's password.
    """

    def __init__(self):
        def ldap_settings(queryfilter, ldappasswd="aaaa_1111"):
            # Every configuration targets the same server, bind DN and
            # search base; only the query filter and (for ldapCon_5) the
            # LDAP user's password vary.  A fresh dict is returned on each
            # call so the entries never share state.
            return {
                "ldapHostname": "10.147.38.163",
                "port": "389",
                "binddn": "CN=test,CN=Users,DC=hyd-qa,DC=com",
                "bindpass": "aaaa_1111",
                "queryfilter": queryfilter,
                "searchbase": "CN=Users,DC=hyd-qa,DC=com",
                "ldapusername": "test",
                "ldappasswd": ldappasswd,
            }

        account = {
            "email": "test@test.com",
            "firstname": "test",
            "lastname": "t",
            "username": "test",
            "password": "password",
        }

        self.services = {
            "account": account,
            # Valid values, query filter on e-mail.
            "ldapCon_1": ldap_settings("(&(mail=%e))"),
            # Valid values, query filter on displayName.
            "ldapCon_2": ldap_settings("(&(displayName=%u))"),
            # Missing parameter value (empty query filter).
            "ldapCon_3": ldap_settings(""),
            # Invalid configuration: wrong query filter.
            "ldapCon_4": ldap_settings("(&(displayName=%p))"),
            # Valid configuration but invalid LDAP user credentials.
            "ldapCon_5": ldap_settings("(&(displayName=%u))", ldappasswd="aaaa"),
        }
 | 
						|
 | 
						|
 | 
						|
class TestLdap(cloudstackTestCase):
    """Exercise the ldapConfig, ldapRemove and login APIs.

    Each test registers an LDAP configuration in CloudStack; setUp creates
    a CloudStack user with the same username as the LDAP user so that the
    login API can validate the credentials against the LDAP server (AD).
    """

    @classmethod
    def setUpClass(cls):
        """Fetch the class-level API client and load the test data."""
        cls.api_client = super(
            TestLdap,
            cls
        ).getClsTestClient().getApiClient()
        cls.services = Services().services
        cls.account = cls.services["account"]
        cls._cleanup = []

    @classmethod
    def tearDownClass(cls):
        """Release every resource registered in ``cls._cleanup``.

        Raises:
            Exception: wrapping any error raised by cleanup_resources.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as tde:
            raise Exception("Warning: Exception during cleanup : %s" % tde)

    def setUp(self):
        """Create the CloudStack account that mirrors the LDAP user."""
        self.apiclient = self.testClient.getApiClient()

        # Start from "no LDAP configuration registered" so that tearDown
        # can always read the flag, even if a test aborts before setting it.
        self.ldapconfRes = 0

        account = self.services["account"]
        self.acct = createAccount.createAccountCmd()
        self.acct.accounttype = 0  # We need a regular user. admins have accounttype=1
        self.acct.firstname = account["firstname"]
        self.acct.lastname = account["lastname"]
        self.acct.password = account["password"]
        self.acct.username = account["username"]
        self.acct.email = account["email"]
        self.acct.account = account["username"]
        self.acct.domainid = 1

        # Map the LDAP user by creating the same user in CloudStack.
        self.acctRes = self.apiclient.createAccount(self.acct)

    def tearDown(self):
        """Delete the account created in setUp and any registered LDAP config.

        The LDAP configuration is removed in a ``finally`` clause so that a
        failing account deletion cannot leave it registered for later tests.

        Raises:
            Exception: wrapping any error raised during cleanup.
        """
        try:
            try:
                deleteAcct = deleteAccount.deleteAccountCmd()
                deleteAcct.id = self.acctRes.id
                acct_name = self.acctRes.name
                self.apiclient.deleteAccount(deleteAcct)
                self.debug("Deleted the the following account name %s:" % acct_name)
            finally:
                # Remove only if an LDAP configuration is registered in CS.
                if getattr(self, "ldapconfRes", 0):
                    self.apiclient.ldapRemove(ldapRemove.ldapRemoveCmd())
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    def _verifyConfigAndLogin(self, ldapSrvD):
        """Register ``ldapSrvD`` and assert that its LDAP user can log in."""
        self.ldapconfRes = self._testldapConfig(ldapSrvD)
        if self.ldapconfRes != 1:
            self.debug("LDAP Configuration failed with exception")
        self.assertEqual(self.ldapconfRes, 1, "ldapConfig API failed")
        self.debug("configure ldap successful")

        # Validate the user credentials with the LDAP server.
        loginRes = self.chkLogin(ldapSrvD["ldapusername"], ldapSrvD["ldappasswd"])
        self.assertEqual(loginRes, 1, "ldap Authentication failed")

    @attr(tags=["advanced", "basic"])
    def test_01_configLDAP(self):
        '''
        Verify the ldapConfig API with valid values (query filter as email).
        '''
        # 1. Covers the ldapConfig & login APIs with valid ldap credentials.
        # 2. Requires ldap configuration: ldapCon_1
        self.debug("start test")
        self._verifyConfigAndLogin(self.services["ldapCon_1"])
        self.debug("end test")

    @attr(tags=["advanced", "basic"])
    def test_02_configLDAP(self):
        '''
        Verify the ldapConfig API with valid values (query filter as displayName).
        '''
        # 1. Covers the ldapConfig & login APIs with valid ldap credentials.
        # 2. Requires ldap configuration: ldapCon_2
        self.debug("start test")
        self._verifyConfigAndLogin(self.services["ldapCon_2"])
        self.debug("end test")

    @attr(tags=["advanced", "basic"])
    def test_03_configLDAP(self):
        '''
        Verify the ldapConfig API with a missing parameter value (queryfilter).
        '''
        # 1. Issue the ldapConfig API with an empty query filter and check
        #    that the call is rejected.
        # 2. Requires ldap configuration: ldapCon_3
        self.debug("start test...")
        self.ldapconfRes = self._testldapConfig(self.services["ldapCon_3"])
        self.assertEqual(self.ldapconfRes, 0, "LDAP configuration successful with invalid value.API failed")
        self.debug("end test")

    @attr(tags=["advanced", "basic"])
    def test_04_configLDAP(self):
        '''
        Verify the ldapConfig API with an invalid query filter.
        '''
        # 1. Call the ldapConfig API with an invalid query filter value and
        #    check that the call is rejected.
        # 2. Requires ldap configuration: ldapCon_4
        self.debug("start test...")
        self.ldapconfRes = self._testldapConfig(self.services["ldapCon_4"])
        self.assertEqual(self.ldapconfRes, 0, "API failed")

    @attr(tags=["advanced", "basic"])
    def test_05_configLDAP(self):
        '''
        Verify that the login API rejects wrong ldap credentials.
        '''
        # 1. Configure ldap, then validate the user credentials using the
        #    login API; the login must not succeed.
        # 2. Requires ldap configuration: ldapCon_5
        self.debug("start test")
        ldapSrvD = self.services["ldapCon_5"]
        self.ldapconfRes = self._testldapConfig(ldapSrvD)
        self.assertEqual(self.ldapconfRes, 1, "API failed")

        # Validate the cloudstack user credentials with the LDAP server.
        loginRes = self.chkLogin(ldapSrvD["ldapusername"], ldapSrvD["ldappasswd"])
        self.assertNotEqual(loginRes, 1, "login API failed")
        self.debug("end test")

    @attr(tags=["advanced", "basic"])
    def test_06_removeLDAP(self):
        '''
        Verify the ldapRemove API.
        '''
        # 1. Configure ldap, then remove the configured ldap values.
        # 2. Requires ldap configuration: ldapCon_1
        self.debug("start test")
        self.ldapconfRes = self._testldapConfig(self.services["ldapCon_1"])
        if self.ldapconfRes != 1:
            self.debug("LDAP Configuration failed with exception")
        # The removal can only be exercised once configuration succeeded,
        # so a failed configuration must fail the test rather than pass it.
        self.assertEqual(self.ldapconfRes, 1, "ldapconfig API failed")
        self.debug("ldap configured successfully")

        self.apiclient.ldapRemove(ldapRemove.ldapRemoveCmd())
        self.debug("ldap removed successfully")
        # Already removed here; stop tearDown from removing it a second time.
        self.ldapconfRes = 0
        self.debug("end test")

    def _testldapConfig(self, ldapSrvD):
        """Register an LDAP configuration in CloudStack.

        :param ldapSrvD: dict providing the keys "ldapHostname", "port",
            "binddn", "bindpass", "searchbase" and "queryfilter".
        :return: 1 if the ldapConfig API call succeeded, 0 if it raised.
        """
        self.debug("start ldapconfig  test")

        # Copy the configuration values onto the ldapConfig command object.
        lpconfig = ldapConfig.ldapConfigCmd()
        lpconfig.hostname = ldapSrvD["ldapHostname"]
        lpconfig.port = ldapSrvD["port"]
        lpconfig.binddn = ldapSrvD["binddn"]
        lpconfig.bindpass = ldapSrvD["bindpass"]
        lpconfig.searchbase = ldapSrvD["searchbase"]
        lpconfig.queryfilter = ldapSrvD["queryfilter"]

        self.debug("calling ldapconfig API")
        try:
            self.apiclient.ldapConfig(lpconfig)
        except Exception as e:
            # Negative tests rely on this: a rejected configuration is
            # reported through the return value, not by raising.
            self.debug("ldapconfig API failed %s" % e)
            return 0
        self.debug("ldapconfig API succesfful")
        return 1

    def chkLogin(self, username, password):
        """Try to log in through the login API.

        :param username: user name passed to the login command.
        :param password: password passed to the login command.
        :return: 1 if the login API returned a response; None if it
            returned None or raised.
        """
        self.debug("login test")

        try:
            login1 = login.loginCmd()
            login1.username = username
            login1.password = password
            loginRes = self.apiclient.login(login1)
            self.debug("login response %s" % loginRes)
            if loginRes is not None:
                self.debug("login successful")
                return 1
            self.debug("login not successful")
        except Exception as p:
            self.debug("login operation failed %s" % p)
        self.debug("end of Login")
        return None
 |