# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ P1 for LDAP Config """ #!/usr/bin/env python import marvin from marvin import cloudstackTestCase from marvin.cloudstackTestCase import * import unittest import hashlib import random from marvin.cloudstackAPI import * from marvin.cloudstackAPI import login from marvin.lib.utils import * from marvin.lib.base import * from marvin.lib.common import * from nose.plugins.attrib import attr import urllib class TestLdap(cloudstackTestCase): """ This tests attempts to register a LDAP server and authenticate as an LDAP user. """ @classmethod def setUpClass(cls): testClient = super(TestLdap, cls).getClsTestClient() cls.api_client = testClient.getApiClient() cls.services = testClient.getParsedTestDataConfig() cls.account = cls.services["ldap_account"] cls._cleanup = [] @classmethod def tearDownClass(cls): try: cleanup_resources(cls.api_client, cls._cleanup) except Exception as tde: raise Exception("Warning: Exception during cleanup : %s" % tde) return def setUp(self): self.apiClient = self.testClient.getApiClient() self.acct = createAccount.createAccountCmd() self.acct.accounttype = 0 self.acct.firstname = self.services["ldap_account"]["firstname"] self.acct.lastname = self.services["ldap_account"]["lastname"] self.acct.password = self.services["ldap_account"]["password"] self.acct.username = self.services["ldap_account"]["username"] self.acct.email = self.services["ldap_account"]["email"] self.acct.account = self.services["ldap_account"]["username"] self.acct.domainid = 1 self.acctRes = self.apiClient.createAccount(self.acct) return def tearDown(self): try: deleteAcct = deleteAccount.deleteAccountCmd() deleteAcct.id = self.acctRes.id acct_name=self.acctRes.name self.apiClient.deleteAccount(deleteAcct) self.debug("Deleted the the following account name %s:" %acct_name) if(self.ldapconfRes==1): self._deleteLdapConfiguration(self.services["ldapConfiguration_1"]) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags=["advanced", "basic"], required_hardware="false") def test_01_addLdapConfiguration(self): """ This test configures LDAP and attempts to authenticate as a user. """ self.debug("start test") self.ldapconfRes=self._addLdapConfiguration(self.services["ldapConfiguration_1"]) if(self.ldapconfRes==1): self.debug("Ldap Configuration was succcessful") loginRes = self._checkLogin(self.services["ldapConfiguration_1"]["ldapUsername"],self.services["ldapConfiguration_1"]["ldapPassword"]) self.debug(loginRes) self.assertEquals(loginRes,1,"Ldap Authentication") else: self.debug("LDAP Configuration failed with exception") self.assertEquals(self.ldapconfRes,1,"addLdapConfiguration failed") self.debug("end test") def _addLdapConfiguration(self,ldapConfiguration): """ :param ldapConfiguration """ # Setup Global settings updateConfigurationCmd = updateConfiguration.updateConfigurationCmd() updateConfigurationCmd.name = "ldap.basedn" updateConfigurationCmd.value = ldapConfiguration['basedn'] updateConfigurationResponse = self.apiClient.updateConfiguration(updateConfigurationCmd) self.debug("updated the parameter %s with value %s"%(updateConfigurationResponse.name, updateConfigurationResponse.value)) updateConfigurationCmd = updateConfiguration.updateConfigurationCmd() updateConfigurationCmd.name = "ldap.email.attribute" updateConfigurationCmd.value = ldapConfiguration['emailAttribute'] updateConfigurationResponse = self.apiClient.updateConfiguration(updateConfigurationCmd) self.debug("updated the parameter %s with value %s"%(updateConfigurationResponse.name, updateConfigurationResponse.value)) updateConfigurationCmd = updateConfiguration.updateConfigurationCmd() updateConfigurationCmd.name = "ldap.user.object" updateConfigurationCmd.value = ldapConfiguration['userObject'] updateConfigurationResponse = self.apiClient.updateConfiguration(updateConfigurationCmd) self.debug("updated the parameter %s with value %s"%(updateConfigurationResponse.name, updateConfigurationResponse.value)) updateConfigurationCmd = updateConfiguration.updateConfigurationCmd() updateConfigurationCmd.name = "ldap.username.attribute" updateConfigurationCmd.value = ldapConfiguration['usernameAttribute'] updateConfigurationResponse = self.apiClient.updateConfiguration(updateConfigurationCmd) self.debug("updated the parameter %s with value %s"%(updateConfigurationResponse.name, updateConfigurationResponse.value)) self.debug("start addLdapConfiguration test") ldapServer = addLdapConfiguration.addLdapConfigurationCmd() ldapServer.hostname = ldapConfiguration['hostname'] ldapServer.port = ldapConfiguration['port'] self.debug("calling addLdapConfiguration API command") try: self.apiClient.addLdapConfiguration(ldapServer) self.debug("addLdapConfiguration was successful") return 1 except Exception, e: self.debug("addLdapConfiguration failed %s" %e) return 0 def _deleteLdapConfiguration(self,ldapConfiguration): """ :param ldapConfiguration """ ldapServer = deleteLdapConfiguration.deleteLdapConfigurationCmd() ldapServer.hostname = ldapConfiguration["hostname"] try: self.apiClient.deleteLdapConfiguration(ldapServer) self.debug("deleteLdapConfiguration was successful") return 1 except Exception, e: self.debug("deleteLdapConfiguration failed %s" %e) return 0 def _checkLogin(self, username, password): """ :param username: :param password: """ self.debug("Attempting to login.") try: loginParams = login.loginCmd() loginParams.username = username loginParams.password = password loginRes = self.apiClient.login(loginParams) self.debug("login response %s" % loginRes) if loginRes is None: self.debug("login not successful") return 0 else: self.debug("login successful") return 1 except Exception, p: self.debug("login operation failed %s" %p) self.debug("end of Login")