# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """ P1 tests for reset SSH keypair """ #Import Local Modules from marvin.cloudstackTestCase import * from marvin.integration.lib.base import * from marvin.integration.lib.common import * #Import System modules import tempfile import os from nose.plugins.attrib import attr class Services: """Test remote SSH client Services """ def __init__(self): self.services = { "account": { "email": "test@test.com", "firstname": "Test", "lastname": "User", "username": "test", # Random characters are appended in create account to # ensure unique username generated each time "password": "password", }, "virtual_machine": { "displayname": "VM", "username": "root", # VM creds for SSH "password": "password", "ssh_port": 22, "hypervisor": 'XenServer', "privateport": 22, "publicport": 22, "protocol": 'TCP', }, "service_offering": { "name": "Tiny Instance", "displaytext": "Tiny Instance", "cpunumber": 1, "cpuspeed": 100, "memory": 128, }, "egress": { "name": 'web', "protocol": 'TCP', "startport": 80, "endport": 80, "cidrlist": '0.0.0.0/0', }, "template": { "displaytext": "Cent OS Template", "name": "Cent OS Template", "passwordenabled": True, "ispublic": True, }, "ostype": 'CentOS 5.3 (64-bit)', # Cent OS 5.3 (64 bit) "SSHEnabledTemplate": "SSHkey", "SSHPasswordEnabledTemplate": "SSHKeyPassword", "sleep": 60, "timeout": 10, "mode": 'advanced', } def wait_vm_start(apiclient, account, timeout, sleep): while timeout: vms = VirtualMachine.list( apiclient, account=account.name, domainid=account.domainid, listall=True ) if vms and vms[0].state == "Running": return timeout time.sleep(sleep) timeout = timeout - 1 return timeout class TestResetSSHKeypair(cloudstackTestCase): @classmethod def setUpClass(cls): cls.api_client = super( TestResetSSHKeypair, cls).getClsTestClient().getApiClient() cls.services = Services().services # Get Zone, Domain and templates domain = get_domain(cls.api_client, cls.services) cls.zone = get_zone(cls.api_client, cls.services) cls.services['mode'] = cls.zone.networktype # Set Zones and disk offerings cls.services["virtual_machine"]["zoneid"] = cls.zone.id # Create Account, VMs, NAT Rules etc template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) # Set Zones and disk offerings cls.services["virtual_machine"]["zoneid"] = cls.zone.id cls.services["virtual_machine"]["template"] = template.id # Create VMs, NAT Rules etc cls.account = Account.create( cls.api_client, cls.services["account"], domainid=domain.id ) cls.service_offering = ServiceOffering.create( cls.api_client, cls.services["service_offering"] ) cls.virtual_machine = VirtualMachine.create( cls.api_client, cls.services["virtual_machine"], accountid=cls.account.name, domainid=cls.account.domainid, serviceofferingid=cls.service_offering.id, mode=cls.services["mode"] ) networkid = cls.virtual_machine.nic[0].networkid # create egress rule to allow wget of my cloud-set-guest-password script if cls.zone.networktype.lower() == 'advanced': EgressFireWallRule.create(cls.api_client, networkid=networkid, protocol=cls.services["egress"]["protocol"], startport=cls.services["egress"]["startport"], endport=cls.services["egress"]["endport"], cidrlist=cls.services["egress"]["cidrlist"]) cls.virtual_machine.password = cls.services["virtual_machine"]["password"] ssh = cls.virtual_machine.get_ssh_client() # below steps are required to get the new password from VR(reset password) # http://cloudstack.org/dl/cloud-set-guest-password # Copy this file to /etc/init.d # chmod +x /etc/init.d/cloud-set-guest-password # chkconfig --add cloud-set-guest-password # similar steps to get SSH key from web so as to make it ssh enabled cmds = [ "cd /etc/init.d;wget http://people.apache.org/~tsp/cloud-set-guest-password", "chmod +x /etc/init.d/cloud-set-guest-password", "chkconfig --add cloud-set-guest-password", "cd /etc/init.d;wget http://downloads.sourceforge.net/project/cloudstack/SSH%20Key%20Gen%20Script/" + \ "cloud-set-guest-sshkey.in?r=http%3A%2F%2Fsourceforge" + \ ".net%2Fprojects%2Fcloudstack%2Ffiles%2FSSH%2520Key%2520Gen%2520Script%2F&ts=1331225219&use_mirror=iweb", "chmod +x /etc/init.d/cloud-set-guest-sshkey.in", "chkconfig --add cloud-set-guest-sshkey.in" ] for c in cmds: result = ssh.execute(c) #Stop virtual machine cls.virtual_machine.stop(cls.api_client) # Poll listVM to ensure VM is stopped properly timeout = cls.services["timeout"] while True: time.sleep(cls.services["sleep"]) # Ensure that VM is in stopped state list_vm_response = list_virtual_machines( cls.api_client, id=cls.virtual_machine.id ) if isinstance(list_vm_response, list): vm = list_vm_response[0] if vm.state == 'Stopped': break if timeout == 0: raise Exception( "Failed to stop VM (ID: %s) " % vm.id) timeout = timeout - 1 list_volume = list_volumes( cls.api_client, virtualmachineid=cls.virtual_machine.id, type='ROOT', listall=True ) if isinstance(list_volume, list): cls.volume = list_volume[0] else: raise Exception( "Exception: Unable to find root volume for VM: %s" % cls.virtual_machine.id) cls.services["template"]["ostype"] = cls.services["ostype"] #Create templates for Edit, Delete & update permissions testcases cls.pw_ssh_enabled_template = Template.create( cls.api_client, cls.services["template"], cls.volume.id, account=cls.account.name, domainid=cls.account.domainid ) # Delete the VM - No longer needed cls.virtual_machine.delete(cls.api_client) cls._cleanup = [ cls.service_offering, cls.pw_ssh_enabled_template, cls.account ] @classmethod def tearDownClass(cls): # Cleanup VMs, templates etc. cleanup_resources(cls.api_client, cls._cleanup) return def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.services = Services().services self.keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) # Cleanup self.cleanup = [] self.tmp_files = [] def tearDown(self): try: #Clean up, terminate the created accounts, domains etc #cleanup_resources(self.apiclient, self.cleanup) for tmp_file in self.tmp_files: os.remove(tmp_file) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags=["simulator", "basic", "advanced"]) def test_01_reset_ssh_keys(self): """Test Reset SSH keys for VM already having SSH key""" # Validate the following # 1. Create a VM having SSH keyPair # 2. Stop the VM # 3. Reset SSH key pair. Verify VM is not restarted automatically as # result of API execution. User should be able to ssh into the VM # using new keypair when VM is restarted # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, zoneid=self.zone.id, serviceofferingid=self.service_offering.id, keypair=self.keypair.name, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Stopping the virtual machine") try: virtual_machine.stop(self.apiclient) except Exception as e: self.fail("Failed to stop virtual machine: %s, %s" % (virtual_machine.id, e)) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name # Clenaup at end of execution self.tmp_files.append(keyPairFilePath) self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) try: virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) except Exception as e: self.fail("Failed to reset SSH key: %s, %s" % (virtual_machine.name, e)) self.debug("Starting the virtual machine after resetting the keypair") try: virtual_machine.start(self.apiclient) except Exception as e: self.fail("Failed to start virtual machine: %s, %s" % (virtual_machine.name, e)) timeout = wait_vm_start(self.apiclient, self.account, self.services["timeout"], self.services["sleep"]) if timeout == 0: self.fail("The virtual machine %s failed to start even after %s minutes" % (virtual_machine.name, self.services["timeout"])) self.debug("SSH key path: %s" % str(keyPairFilePath)) try: virtual_machine.get_ssh_client(keyPairFileLocation=str(keyPairFilePath)) except Exception as e: self.fail("Failed to SSH into VM with new keypair: %s, %s" % (virtual_machine.name, e)) virtual_machine.delete(self.apiclient) return @attr(tags=["simulator", "basic", "advanced"]) def test_02_reset_ssh_key_password_enabled_template(self): """Reset SSH keys for VM created from password enabled template and already having SSH key """ # Validate the following # 1. Create VM from password enabled template and having SSH keyPair # 2. Stop the VM # 3. Reset SSH key pair. Verify VM is not restarted automatically # as a result of API execution # User should be able to ssh into the VM using new keypair # User should be able to login into VM using new password # returned by the API self.debug("Deploying the virtual machine in default network offering") # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, zoneid=self.zone.id, serviceofferingid=self.service_offering.id, keypair=self.keypair.name, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Stopping the virtual machine") try: virtual_machine.stop(self.apiclient) except Exception as e: self.fail("Failed to stop virtual machine: %s, %s" % (virtual_machine.id, e)) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) try: virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) except Exception as e: self.fail("Failed to reset SSH key: %s, %s" % (virtual_machine.name, e)) self.debug("Starting the virtual machine after resetting the keypair") try: virtual_machine.start(self.apiclient) except Exception as e: self.fail("Failed to start virtual machine: %s, %s" % (virtual_machine.name, e)) timeout = wait_vm_start(self.apiclient, self.account, self.services["timeout"], self.services["sleep"]) if timeout == 0: self.fail("The virtual machine %s failed to start even after %s minutes" % (virtual_machine.name, self.services["timeout"])) self.debug("SSHing with new keypair") try: virtual_machine.get_ssh_client( keyPairFileLocation=str(keyPairFilePath)) except Exception as e: self.fail("Failed to SSH into VM with new keypair: %s, %s" % (virtual_machine.name, e)) try: self.debug("SSHing using password") virtual_machine.get_ssh_client() except Exception as e: self.fail("Failed to SSH into VM with password: %s, %s" % (virtual_machine.name, e)) virtual_machine.delete(self.apiclient) return @attr(tags=["simulator", "basic", "advanced"]) def test_03_reset_ssh_with_no_key(self): """Reset SSH key for VM having no SSH key""" # Validate the following # 1.Create a VM # 2. Stop the VM # 3. Reset SSH key pair # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, zoneid=self.zone.id, serviceofferingid=self.service_offering.id, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Stopping the virtual machine") try: virtual_machine.stop(self.apiclient) except Exception as e: self.fail("Failed to stop virtual machine: %s, %s" % (virtual_machine.id, e)) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) try: virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) except Exception as e: self.fail("Failed to reset SSH key: %s, %s" % (virtual_machine.name, e)) self.debug("Starting the virtual machine after resetting the keypair") try: virtual_machine.start(self.apiclient) except Exception as e: self.fail("Failed to start virtual machine: %s, %s" % (virtual_machine.name, e)) timeout = wait_vm_start(self.apiclient, self.account, self.services["timeout"], self.services["sleep"]) if timeout == 0: self.fail("The virtual machine %s failed to start even after %s minutes" % (virtual_machine.name, self.services["timeout"])) self.debug("SSHing with new keypair") try: virtual_machine.get_ssh_client( keyPairFileLocation=str(keyPairFilePath)) except Exception as e: self.fail("Failed to SSH into VM with new keypair: %s, %s" % (virtual_machine.name, e)) virtual_machine.delete(self.apiclient) return @attr(tags=["simulator", "basic", "advanced"]) def test_04_reset_key_passwd_enabled_no_key(self): """Reset SSH keys for VM created from password enabled template and have no previous SSH key""" # Validate the following # 1.Create a VM from password enabled template # 2. Stop the VM # 3. Reset SSH key pair. Verify VM is not restarted automatically as a # result of API execution # User should be able to ssh into the VM using new keypair when # VM is restarted # User should be able to login into VM using new password returned # by the API self.debug("Deploying the virtual machine in default network offering") # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, zoneid=self.zone.id, serviceofferingid=self.service_offering.id, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Stopping the virtual machine") try: virtual_machine.stop(self.apiclient) except Exception as e: self.fail("Failed to stop virtual machine: %s, %s" % (virtual_machine.id, e)) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) try: virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) except Exception as e: self.fail("Failed to reset SSH key: %s, %s" % (virtual_machine.name, e)) self.debug("Starting the virtual machine after resetting the keypair") try: virtual_machine.start(self.apiclient) except Exception as e: self.fail("Failed to start virtual machine: %s, %s" % (virtual_machine.name, e)) timeout = wait_vm_start(self.apiclient, self.account, self.services["timeout"], self.services["sleep"]) if timeout == 0: self.fail("The virtual machine %s failed to start even after %s minutes" % (virtual_machine.name, self.services["timeout"])) self.debug("SSHing with new keypair") try: virtual_machine.get_ssh_client( keyPairFileLocation=str(keyPairFilePath)) except Exception as e: self.fail("Failed to SSH into VM with new keypair: %s, %s" % (virtual_machine.name, e)) virtual_machine.delete(self.apiclient) return @attr(tags=["simulator", "basic", "advanced"]) def test_05_reset_key_in_running_state(self): """Reset SSH keys for VM already having SSH key when VM is in running state""" # Validate the following # 1.Create a VM having SSH keyPair # 2. Reset SSH key pair. Api returns error message VM is running # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, zoneid=self.zone.id, serviceofferingid=self.service_offering.id, keypair=self.keypair.name, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) with self.assertRaises(Exception): virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) virtual_machine.delete(self.apiclient) return @attr(tags=["simulator", "basic", "advanced"]) def test_06_reset_key_passwd_enabled_vm_running(self): """Reset SSH keys for VM created from password enabled template and already having SSH key and VM is in running state""" # Validate the following # 1. Reset SSH keys for VM created from password enabled template # and already having SSH key and VM is in running state # 2. APi returns error message Vm is running self.debug("Deploying the virtual machine in default network offering") # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, zoneid=self.zone.id, serviceofferingid=self.service_offering.id, keypair=self.keypair.name, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) with self.assertRaises(Exception): virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) virtual_machine.delete(self.apiclient) return @attr(tags=["simulator", "basic", "advanced"]) def test_07_reset_keypair_invalid_params(self): """Verify API resetSSHKeyForVirtualMachine with incorrect parameters""" # Validate the following # 1. Create the VM # 2. Stop the VM # 3. Call resetSSHKeyForVirtualMachine API with incorrect parameter # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, zoneid=self.zone.id, serviceofferingid=self.service_offering.id, keypair=self.keypair.name, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) with self.assertRaises(Exception): virtual_machine.resetSshKey( self.apiclient, keypair=random_gen() + ".pem", name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) self.debug("Reset SSH key pair failed due to invalid parameters") virtual_machine.delete(self.apiclient) return class TestResetSSHKeyUserRights(cloudstackTestCase): @classmethod def setUpClass(cls): cls.api_client = super( TestResetSSHKeyUserRights, cls).getClsTestClient().getApiClient() cls.services = Services().services # Get Zone, Domain and templates cls.domain = get_domain(cls.api_client, cls.services) cls.zone = get_zone(cls.api_client, cls.services) cls.services['mode'] = cls.zone.networktype # Set Zones and disk offerings cls.services["virtual_machine"]["zoneid"] = cls.zone.id # Create Account, VMs, NAT Rules etc template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) # Set Zones and disk offerings cls.services["virtual_machine"]["zoneid"] = cls.zone.id cls.services["virtual_machine"]["template"] = template.id # Create VMs, NAT Rules etc cls.account = Account.create( cls.api_client, cls.services["account"], domainid=cls.domain.id ) cls.service_offering = ServiceOffering.create( cls.api_client, cls.services["service_offering"] ) cls.virtual_machine = VirtualMachine.create( cls.api_client, cls.services["virtual_machine"], accountid=cls.account.name, domainid=cls.account.domainid, serviceofferingid=cls.service_offering.id, mode=cls.services["mode"] ) networkid = cls.virtual_machine.nic[0].networkid # create egress rule to allow wget of my cloud-set-guest-password script if cls.zone.networktype.lower() == 'advanced': EgressFireWallRule.create(cls.api_client, networkid=networkid, protocol=cls.services["egress"]["protocol"], startport=cls.services["egress"]["startport"], endport=cls.services["egress"]["endport"], cidrlist=cls.services["egress"]["cidrlist"]) cls.virtual_machine.password = cls.services["virtual_machine"]["password"] ssh = cls.virtual_machine.get_ssh_client() #below steps are required to get the new password from VR(reset password) #http://cloudstack.org/dl/cloud-set-guest-password #Copy this file to /etc/init.d #chmod +x /etc/init.d/cloud-set-guest-password #chkconfig --add cloud-set-guest-password # Do similar steps to get SSH key from web so as to make it ssh enabled cmds = [ "cd /etc/init.d;wget http://downloads.sourceforge.net/project/cloudstack/SSH%20Key%20Gen%20Script/" + \ "cloud-set-guest-sshkey.in?r=http%3A%2F%2Fsourceforge" + \ ".net%2Fprojects%2Fcloudstack%2Ffiles%2FSSH%2520Key%2520Gen%2520Script%2F&ts=1331225219&use_mirror=iweb", "chmod +x /etc/init.d/cloud-set-guest-sshkey.in", "chkconfig --add cloud-set-guest-sshkey.in" ] for c in cmds: result = ssh.execute(c) #Stop virtual machine cls.virtual_machine.stop(cls.api_client) # Poll listVM to ensure VM is stopped properly timeout = cls.services["timeout"] while True: time.sleep(cls.services["sleep"]) # Ensure that VM is in stopped state list_vm_response = list_virtual_machines( cls.api_client, id=cls.virtual_machine.id ) if isinstance(list_vm_response, list): vm = list_vm_response[0] if vm.state == 'Stopped': break if timeout == 0: raise Exception( "Failed to stop VM (ID: %s) " % vm.id) timeout = timeout - 1 list_volume = list_volumes( cls.api_client, virtualmachineid=cls.virtual_machine.id, type='ROOT', listall=True ) if isinstance(list_volume, list): cls.volume = list_volume[0] else: raise Exception( "Exception: Unable to find root volume for VM: %s" % cls.virtual_machine.id) cls.services["template"]["ostype"] = cls.services["ostype"] #Create templates for Edit, Delete & update permissions testcases cls.pw_ssh_enabled_template = Template.create( cls.api_client, cls.services["template"], cls.volume.id ) # Delete the VM - No longer needed cls.virtual_machine.delete(cls.api_client) cls._cleanup = [ cls.service_offering, cls.pw_ssh_enabled_template, cls.account ] @classmethod def tearDownClass(cls): # Cleanup VMs, templates etc. cleanup_resources(cls.api_client, cls._cleanup) return def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.services = Services().services # Set Zones and disk offerings self.services["virtual_machine"]["zoneid"] = self.zone.id self.service_offering = ServiceOffering.create( self.apiclient, self.services["service_offering"] ) # Cleanup self.cleanup = [self.service_offering] def tearDown(self): try: #Clean up, terminate the created accounts, domains etc cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags=["simulator", "basic", "advanced"]) def test_01_reset_keypair_normal_user(self): """Verify API resetSSHKeyForVirtualMachine for non admin non root domain user""" # Validate the following # 1. Create a VM for non admin non root domain user # 2. Stop the VM # 3. Reset SSH key pair for non admin non root domain user. Verify VM # is not restarted automatically as a result of API execution # User should be able to ssh into the VM using new keypair when # VM is restarted # Create account, SSH key pair etc. self.debug("Creating a normal user account") self.user_account = Account.create( self.apiclient, self.services["account"], admin=False ) self.cleanup.append(self.user_account) self.debug("Account created: %s" % self.user_account.name) self.services = Services().services # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.user_account.name, domainid=self.user_account.domainid, zoneid=self.zone.id, serviceofferingid=self.service_offering.id, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Stopping the virtual machine") try: virtual_machine.stop(self.apiclient) except Exception as e: self.fail("Failed to stop virtual machine: %s, %s" % (virtual_machine.id, e)) self.debug("Creating a new SSH keypair for account: %s" % self.user_account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.user_account.name, domainid=self.user_account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) try: virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.user_account.name, domainid=self.user_account.domainid ) except Exception as e: self.fail("Failed to reset SSH key: %s, %s" % (virtual_machine.name, e)) self.debug("Starting the virtual machine after resetting the keypair") try: virtual_machine.start(self.apiclient) except Exception as e: self.fail("Failed to start virtual machine: %s, %s" % (virtual_machine.name, e)) timeout = wait_vm_start(self.apiclient, self.account, self.services["timeout"], self.services["sleep"]) if timeout == 0: self.fail("The virtual machine %s failed to start even after %s minutes" % (vms[0].name, self.services["timeout"])) self.debug("SSHing with new keypair") try: virtual_machine.get_ssh_client( keyPairFileLocation=str(keyPairFilePath)) except Exception as e: self.fail("Failed to SSH into VM with new keypair: %s, %s" % (virtual_machine.name, e)) virtual_machine.delete(self.apiclient) return @attr(tags=["simulator", "basic", "advanced"]) def test_02_reset_keypair_domain_admin(self): """Verify API resetSSHKeyForVirtualMachine for domain admin non root domain user""" # Validate the following # 1.Create a VM for domain admin non root domain user # 2. Stop the VM # 3. Reset SSH key pair for domain admin non root domain user. Verify # VM is not restarted automatically as a result of API execution # User should be able to ssh into the VM using new keypair when # VM is restarted # Create account, SSH key pair etc. self.debug("Creating a domain admin account") self.account = Account.create( self.apiclient, self.services["account"], admin=True, domainid=self.domain.id ) self.cleanup.append(self.account) self.debug("Account created: %s" % self.account.name) self.debug("Generating SSH keypair for the account: %s" % self.account.name) self.keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + self.keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(self.keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, keypair=self.keypair.name, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Stopping the virtual machine") try: virtual_machine.stop(self.apiclient) except Exception as e: self.fail("Failed to stop virtual machine: %s, %s" % (virtual_machine.id, e)) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) try: virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) except Exception as e: self.fail("Failed to reset SSH key: %s, %s" % (virtual_machine.name, e)) self.debug("Starting the virtual machine after resetting the keypair") try: virtual_machine.start(self.apiclient) except Exception as e: self.fail("Failed to start virtual machine: %s, %s" % (virtual_machine.name, e)) timeout = wait_vm_start(self.apiclient, self.account, self.services["timeout"], self.services["sleep"]) if timeout == 0: self.fail("The virtual machine %s failed to start even after %s minutes" % (virtual_machine.name, self.services["timeout"])) self.debug("SSHing with new keypair") try: virtual_machine.get_ssh_client( keyPairFileLocation=str(keyPairFilePath)) except Exception as e: self.fail("Failed to SSH into VM with new keypair: %s, %s" % (virtual_machine.name, e)) virtual_machine.delete(self.apiclient) return @attr(tags=["simulator", "basic", "advanced"]) def test_03_reset_keypair_root_admin(self): """Verify API resetSSHKeyForVirtualMachine for domain admin root domain user""" # Validate the following # 1.Create a VM for domain admin, root domain user # 2. Stop the VM # 3. Reset SSH key pair for domain admin non root domain user. Verify # VM is not restarted automatically as a result of API execution # User should be able to ssh into the VM using new keypair when # VM is restarted # Create account, SSH key pair etc. self.debug("Creating a ROOT admin account") self.account = Account.create( self.apiclient, self.services["account"], admin=True ) self.cleanup.append(self.account) self.debug("Account created: %s" % self.account.name) self.debug("Generating SSH keypair for the account: %s" % self.account.name) self.keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + self.keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(self.keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Deploying the virtual machine in default network offering") # Spawn an instance virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], templateid=self.pw_ssh_enabled_template.id, accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, keypair=self.keypair.name, mode=self.services["mode"] ) self.debug("Check if the VM is properly deployed or not?") vms = VirtualMachine.list( self.apiclient, id=virtual_machine.id, listall=True ) self.assertEqual( isinstance(vms, list), True, "List VMs should return the valid list" ) vm = vms[0] self.assertEqual( vm.state, "Running", "VM state should be running after deployment" ) self.debug("Stopping the virtual machine") try: virtual_machine.stop(self.apiclient) except Exception as e: self.fail("Failed to stop virtual machine: %s, %s" % (virtual_machine.id, e)) self.debug("Creating a new SSH keypair for account: %s" % self.account.name) new_keypair = SSHKeyPair.create( self.apiclient, name=random_gen() + ".pem", account=self.account.name, domainid=self.account.domainid ) self.debug("Created a new keypair with name: %s" % new_keypair.name) self.debug("Writing the private key to local file") keyPairFilePath = tempfile.gettempdir() + os.sep + new_keypair.name self.debug("File path: %s" % keyPairFilePath) f = open(keyPairFilePath, "w+") f.write(new_keypair.privatekey) f.close() os.system("chmod 400 " + keyPairFilePath) self.debug("Resetting the SSH key pair for instance: %s" % virtual_machine.name) try: virtual_machine.resetSshKey( self.apiclient, keypair=new_keypair.name, name=new_keypair.name, account=self.account.name, domainid=self.account.domainid ) except Exception as e: self.fail("Failed to reset SSH key: %s, %s" % (virtual_machine.name, e)) self.debug("Starting the virtual machine after resetting the keypair") try: virtual_machine.start(self.apiclient) except Exception as e: self.fail("Failed to start virtual machine: %s, %s" % (virtual_machine.name, e)) timeout = wait_vm_start(self.apiclient, self.account, self.services["timeout"], self.services["sleep"]) if timeout == 0: self.fail("The virtual machine %s failed to start even after %s minutes" % (virtual_machine.name, self.services["timeout"])) self.debug("SSHing with new keypair") try: virtual_machine.get_ssh_client( keyPairFileLocation=str(keyPairFilePath)) except Exception as e: self.fail("Failed to SSH into VM with new keypair: %s, %s" % (virtual_machine.name, e)) virtual_machine.delete(self.apiclient) return