mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-10-26 08:42:29 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			2560 lines
		
	
	
		
			105 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			2560 lines
		
	
	
		
			105 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Licensed to the Apache Software Foundation (ASF) under one
 | |
| # or more contributor license agreements.  See the NOTICE file
 | |
| # distributed with this work for additional information
 | |
| # regarding copyright ownership.  The ASF licenses this file
 | |
| # to you under the Apache License, Version 2.0 (the
 | |
| # "License"); you may not use this file except in compliance
 | |
| # with the License.  You may obtain a copy of the License at
 | |
| #
 | |
| #   http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing,
 | |
| # software distributed under the License is distributed on an
 | |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | |
| # KIND, either express or implied.  See the License for the
 | |
| # specific language governing permissions and limitations
 | |
| # under the License.
 | |
| 
 | |
| """ Component tests for user data, meta data, ssh keys
 | |
|     and password reset functionality with
 | |
|     ConfigDrive
 | |
| """
 | |
| 
 | |
| # Import Local Modules
 | |
| from marvin.cloudstackAPI import (restartVPC)
 | |
| from marvin.cloudstackTestCase import cloudstackTestCase
 | |
| from marvin.lib.base import (
 | |
|     Account,
 | |
|     createVlanIpRange,
 | |
|     Configurations,
 | |
|     FireWallRule,
 | |
|     Host,
 | |
|     listVlanIpRanges,
 | |
|     Network,
 | |
|     NetworkACL,
 | |
|     NetworkACLList,
 | |
|     NetworkOffering,
 | |
|     NetworkServiceProvider,
 | |
|     PublicIPAddress,
 | |
|     Router,
 | |
|     ServiceOffering,
 | |
|     createSSHKeyPair,
 | |
|     deleteSSHKeyPair,
 | |
|     StaticNATRule,
 | |
|     VirtualMachine,
 | |
|     VPC,
 | |
|     VpcOffering,
 | |
|     Hypervisor, Template)
 | |
| from marvin.lib.common import (
 | |
|     get_domain,
 | |
|     get_zone, get_test_template,
 | |
|     is_config_suitable)
 | |
| from marvin.lib.utils import random_gen
 | |
| 
 | |
| # Import System Modules
 | |
| import base64
 | |
| import os
 | |
| import socket
 | |
| import subprocess
 | |
| import tempfile
 | |
| import time
 | |
| from contextlib import contextmanager
 | |
| from nose.plugins.attrib import attr
 | |
| 
 | |
| VPC_SERVICES = 'Dhcp,StaticNat,SourceNat,NetworkACL,UserData,Dns'
 | |
| ISO_SERVICES = 'Dhcp,SourceNat,StaticNat,UserData,Firewall,Dns'
 | |
| 
 | |
| NO_SUCH_FILE = "No such file or directory"
 | |
| 
 | |
| 
 | |
| class MySSHKeyPair:
 | |
|     """Manage SSH Key pairs"""
 | |
| 
 | |
|     def __init__(self, items):
 | |
|         self.private_key_file = None
 | |
|         self.__dict__.update(items)
 | |
| 
 | |
|     @classmethod
 | |
|     def create(cls, apiclient, name=None, account=None,
 | |
|                domainid=None, projectid=None):
 | |
|         """Creates SSH keypair"""
 | |
|         cmd = createSSHKeyPair.createSSHKeyPairCmd()
 | |
|         cmd.name = name
 | |
|         if account is not None:
 | |
|             cmd.account = account
 | |
|         if domainid is not None:
 | |
|             cmd.domainid = domainid
 | |
|         if projectid is not None:
 | |
|             cmd.projectid = projectid
 | |
|         return MySSHKeyPair(apiclient.createSSHKeyPair(cmd).__dict__)
 | |
| 
 | |
|     def delete(self, apiclient):
 | |
|         """Delete SSH key pair"""
 | |
|         cmd = deleteSSHKeyPair.deleteSSHKeyPairCmd()
 | |
|         cmd.name = self.name
 | |
|         cmd.account = self.account
 | |
|         cmd.domainid = self.domainid
 | |
|         apiclient.deleteSSHKeyPair(cmd)
 | |
| 
 | |
| 
 | |
| class Services:
 | |
|     """Test Add Remove Network Services
 | |
|     """
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.services = {
 | |
|             "test_templates": {
 | |
|                 "kvm": {
 | |
|                     "name": "Centos-5.5-sshkey-and-configdrive",
 | |
|                     "displaytext": "SSHkey and ConfigDrive enabled CentOS",
 | |
|                     "format": "qcow2",
 | |
|                     "hypervisor": "kvm",
 | |
|                     "ostype": "CentOS 5.5 (64-bit)",
 | |
|                     "url": "http://people.apache.org/~weizhou/centos55-sshkey-configdrive.qcow2.bz2",
 | |
|                     "requireshvm": "False",
 | |
|                     "ispublic": "True",
 | |
|                     "isextractable": "True"
 | |
|                 }
 | |
|             },
 | |
|             "vpc_offering_configdrive": {
 | |
|                 "name": 'VPC offering ConfigDrive',
 | |
|                 "displaytext": 'VPC offering ConfigDrive',
 | |
|                 "supportedservices": VPC_SERVICES,
 | |
|                 "serviceProviderList": {
 | |
|                     "Dhcp": "VpcVirtualRouter",
 | |
|                     "StaticNat": "VpcVirtualRouter",
 | |
|                     "SourceNat": "VpcVirtualRouter",
 | |
|                     "NetworkACL": "VpcVirtualRouter",
 | |
|                     "UserData": "ConfigDrive",
 | |
|                     "Dns": "VpcVirtualRouter"
 | |
|                 }
 | |
|             },
 | |
|             "vpc_network_offering_configdrive": {
 | |
|                 "name": 'vpc_net_off_marvin_configdrive',
 | |
|                 "displaytext": 'vpc_net_off_marvin_configdrive',
 | |
|                 "guestiptype": 'Isolated',
 | |
|                 "supportedservices": VPC_SERVICES,
 | |
|                 "traffictype": 'GUEST',
 | |
|                 "availability": 'Optional',
 | |
|                 "useVpc": 'on',
 | |
|                 "ispersistent": 'True',
 | |
|                 "serviceProviderList": {
 | |
|                     "Dhcp": "VpcVirtualRouter",
 | |
|                     "StaticNat": "VpcVirtualRouter",
 | |
|                     "SourceNat": "VpcVirtualRouter",
 | |
|                     "NetworkACL": "VpcVirtualRouter",
 | |
|                     "UserData": "ConfigDrive",
 | |
|                     "Dns": "VpcVirtualRouter"
 | |
|                 }
 | |
|             },
 | |
|             "isolated_configdrive_network_offering": {
 | |
|                 "name": 'isolated_configdrive_net_off_marvin',
 | |
|                 "displaytext": 'isolated_configdrive_net_off_marvin',
 | |
|                 "guestiptype": 'Isolated',
 | |
|                 "supportedservices": ISO_SERVICES,
 | |
|                 "traffictype": 'GUEST',
 | |
|                 "availability": 'Optional',
 | |
|                 "tags": 'native',
 | |
|                 "serviceProviderList": {
 | |
|                     "Dhcp": 'VirtualRouter',
 | |
|                     "StaticNat": 'VirtualRouter',
 | |
|                     "SourceNat": 'VirtualRouter',
 | |
|                     "Firewall": 'VirtualRouter',
 | |
|                     "UserData": 'ConfigDrive',
 | |
|                     "Dns": 'VirtualRouter'
 | |
|                 }
 | |
|             },
 | |
|             "shared_network_config_drive_offering": {
 | |
|                 "name": 'shared_network_config_drive_offering',
 | |
|                 "displaytext": 'shared_network_config_drive_offering',
 | |
|                 "guestiptype": 'shared',
 | |
|                 "supportedservices": 'Dhcp,UserData',
 | |
|                 "traffictype": 'GUEST',
 | |
|                 "specifyVlan": "True",
 | |
|                 "specifyIpRanges": "True",
 | |
|                 "availability": 'Optional',
 | |
|                 "serviceProviderList": {
 | |
|                     "Dhcp": "VirtualRouter",
 | |
|                     "UserData": 'ConfigDrive'
 | |
|                 }
 | |
|             },
 | |
|             "publiciprange2": {
 | |
|                 "gateway": "10.219.1.1",
 | |
|                 "netmask": "255.255.255.0",
 | |
|                 "startip": "10.219.1.2",
 | |
|                 "endip": "10.219.1.5",
 | |
|                 "forvirtualnetwork": "false"
 | |
|             },
 | |
|             "acl": {
 | |
|                 "network_all_1": {
 | |
|                     "name": "SharedNetwork-All-1",
 | |
|                     "displaytext": "SharedNetwork-All-1",
 | |
|                     "vlan": "4001",
 | |
|                     "gateway": "10.200.100.1",
 | |
|                     "netmask": "255.255.255.0",
 | |
|                     "startip": "10.200.100.21",
 | |
|                     "endip": "10.200.100.100",
 | |
|                     "acltype": "Domain"
 | |
|                 },
 | |
|                 "network_all_2": {
 | |
|                     "name": "SharedNetwork2-All-2",
 | |
|                     "displaytext": "SharedNetwork2-All-2",
 | |
|                     "vlan": "4002",
 | |
|                     "gateway": "10.200.200.1",
 | |
|                     "netmask": "255.255.255.0",
 | |
|                     "startip": "10.200.200.21",
 | |
|                     "endip": "10.200.200.100",
 | |
|                     "acltype": "Domain"
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
| 
 | |
| class ConfigDriveUtils:
 | |
|     template = None
 | |
| 
 | |
|     class CreateResult:
 | |
|         def __init__(self, success, offering=None, network=None, vpc=None):
 | |
|             self.success = success
 | |
|             self.network = network
 | |
|             self.offering = offering
 | |
|             self.vpc = vpc
 | |
| 
 | |
|     class PasswordTest:
 | |
|         def __init__(self, vm=None, expect_pw=None):
 | |
|             """
 | |
|             :param vm: vm
 | |
|             :param expect_pw: Is a password expected
 | |
|             """
 | |
|             self.presence = expect_pw
 | |
|             self.password = None
 | |
| 
 | |
|             if vm:
 | |
|                 self.password = vm.password
 | |
|                 self.presence = True
 | |
| 
 | |
|             self.test_presence = self.presence is not None
 | |
| 
 | |
|         def __str__(self):
 | |
|             if self.test_presence:
 | |
|                 return "PasswordTest(presence=%s, password=%s)" % \
 | |
|                        (self.presence, self.password)
 | |
|             else:
 | |
|                 return "NoPasswordTest()"
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.offering = None
 | |
|         self.vpc = None
 | |
|         self.vpc_acl_list = None
 | |
|         self.vpc_acl_rule = None
 | |
| 
 | |
|     @contextmanager
 | |
|     def stopped_vm(self, vm):
 | |
|         was_running = (vm.state == VirtualMachine.RUNNING)
 | |
| 
 | |
|         if was_running:
 | |
|             vm.stop(self.api_client)
 | |
|             vm.state = VirtualMachine.STOPPED
 | |
| 
 | |
|         yield
 | |
| 
 | |
|         if was_running:
 | |
|             vm.start(self.api_client)
 | |
|             vm.state = VirtualMachine.RUNNING
 | |
|             vm.ssh_client = None
 | |
| 
 | |
|     def update_template(self, **kwargs):
 | |
|         """Updates value of the guest VM template's password enabled setting
 | |
|         :param passwordenabled:
 | |
|         :type passwordenabled: bool
 | |
|         """
 | |
|         self.debug("Updating value of guest VM template's password enabled "
 | |
|                    "setting")
 | |
|         if not isinstance(self.template, Template):
 | |
|             self.template = Template(self.template.__dict__)
 | |
| 
 | |
|         Template.update(self.template, self.api_client, **kwargs)
 | |
|         response = Template.list(self.api_client,
 | |
|                                  templatefilter="all",
 | |
|                                  id=self.template.id)
 | |
|         self.template = Template(response[0].__dict__)
 | |
|         self.debug("Updated guest VM template")
 | |
| 
 | |
|     def get_userdata_url(self, vm):
 | |
|         """Returns user data URL for the given VM object"""
 | |
|         self.debug("Getting user data url")
 | |
|         nic = vm.nic[0]
 | |
|         gateway = str(nic.gateway)
 | |
|         self.debug("Gateway: " + gateway)
 | |
|         user_data_url = 'curl "http://' + gateway + ':80/latest/user-data"'
 | |
|         return user_data_url
 | |
| 
 | |
|     def generate_ssh_keys(self):
 | |
|         """Generates ssh key pair
 | |
| 
 | |
|         Writes the private key into a temp file and returns the file name
 | |
| 
 | |
|         :returns: generated keypair
 | |
|         :rtype: MySSHKeyPair
 | |
|         """
 | |
|         self.keypair = MySSHKeyPair.create(
 | |
|             self.api_client,
 | |
|             name=random_gen() + ".pem",
 | |
|             account=self.account.user[0].account,
 | |
|             domainid=self.account.domainid)
 | |
| 
 | |
|         self.cleanup.append(self.keypair)
 | |
|         self.debug("Created keypair with name: %s" % self.keypair.name)
 | |
|         self.debug("Writing the private key to local file")
 | |
|         pkfile = tempfile.gettempdir() + os.sep + self.keypair.name
 | |
|         self.keypair.private_key_file = pkfile
 | |
|         self.tmp_files.append(pkfile)
 | |
|         self.debug("File path: %s" % pkfile)
 | |
|         with open(pkfile, "w+") as f:
 | |
|             f.write(self.keypair.privatekey)
 | |
|         os.chmod(pkfile, 0o400)
 | |
| 
 | |
|         return self.keypair
 | |
| 
 | |
|     def validate_acl_rule(self, fw_rule):
 | |
|         pass
 | |
| 
 | |
|     def validate_vm_networking(self, vm):
 | |
|         pass
 | |
| 
 | |
|     def validate_network_networking(self, network, vpc):
 | |
|         pass
 | |
| 
 | |
|     def validate_shared_networking(self, network, vm):
 | |
|         pass
 | |
| 
 | |
|     def validate_StaticNat_rule_For_VM(self, public_ip, network, vm):
 | |
|         self.validate_PublicIPAddress(
 | |
|             public_ip, network, static_nat=True, vm=vm)
 | |
| 
 | |
|     # =========================================================================
 | |
|     # ---             Config Drive Validation helper methods                ---
 | |
|     # =========================================================================
 | |
| 
 | |
|     def _mount_config_drive(self, ssh):
 | |
|         """
 | |
|         This method is to verify whether configdrive iso
 | |
|         is attached to vm or not
 | |
|         Returns mount path if config drive is attached else None
 | |
|         """
 | |
|         mountdir = "/root/iso"
 | |
|         cmd = "blkid -t LABEL='config-2' " \
 | |
|               "/dev/sr? /dev/hd? /dev/sd? /dev/xvd? -o device"
 | |
|         tmp_cmd = [
 | |
|             'bash -c "if [ ! -d {0} ]; then mkdir {0}; fi"'.format(mountdir),
 | |
|             "umount %s" % mountdir]
 | |
|         self.debug("Unmounting drive from %s" % mountdir)
 | |
|         for tcmd in tmp_cmd:
 | |
|             ssh.execute(tcmd)
 | |
| 
 | |
|         self.debug("Trying to find ConfigDrive device")
 | |
|         configDrive = ssh.execute(cmd)
 | |
|         if not configDrive:
 | |
|             self.warn("ConfigDrive is not attached")
 | |
|             return None
 | |
| 
 | |
|         res = ssh.execute("mount {} {}".format(str(configDrive[0]), mountdir))
 | |
|         if str(res).lower().find("mounting read-only") > -1:
 | |
|             self.debug("ConfigDrive iso is mounted at location %s" % mountdir)
 | |
|             return mountdir
 | |
|         else:
 | |
|             return None
 | |
| 
 | |
|     def _get_config_drive_data(self, ssh, file, name, fail_on_missing=True):
 | |
|         """Fetches the content of a file on the config drive
 | |
| 
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param file: path to the file to fetch
 | |
|         :param name: description of the file
 | |
|         :param fail_on_missing:
 | |
|                  whether the test should fail if the file is missing
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type file: str
 | |
|         :type name: str
 | |
|         :type fail_on_missing: bool
 | |
|         :returns: the content of the file
 | |
|         :rtype: str
 | |
|         """
 | |
|         cmd = "cat %s" % file
 | |
|         res = ssh.execute(cmd)
 | |
|         content = '\n'.join(res)
 | |
| 
 | |
|         if fail_on_missing and NO_SUCH_FILE in content:
 | |
|             self.debug("{} is not found".format(name))
 | |
|             self.fail("{} is not found".format(name))
 | |
| 
 | |
|         return content
 | |
| 
 | |
|     def _verify_config_drive_data(self, ssh, file, expected_content, name):
 | |
|         """Verifies that the file contains the expected content
 | |
| 
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param file: path to the file to verify
 | |
|         :param expected_content:
 | |
|         :param name:
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type file: str
 | |
|         :type expected_content: str
 | |
|         :type name: str
 | |
|         """
 | |
|         actual_content = self._get_config_drive_data(ssh, file, name)
 | |
| 
 | |
|         self.debug("Expected {}: {}".format(name, expected_content))
 | |
|         self.debug("Actual {}: {}".format(name, actual_content))
 | |
| 
 | |
|         self.assertEqual(expected_content, actual_content,
 | |
|                          'Userdata found: %s is not equal to expected: %s'
 | |
|                          % (actual_content, expected_content))
 | |
| 
 | |
|     def _verify_userdata(self, ssh, mount_path, userdata):
 | |
|         """
 | |
|         verify Userdata
 | |
| 
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param mount_path: mount point of the config drive
 | |
|         :param userdata: Expected userdata
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type mount_path: str
 | |
|         :type userdata: str
 | |
|         """
 | |
|         self._verify_config_drive_data(
 | |
|             ssh,
 | |
|             mount_path + "/cloudstack/userdata/user_data.txt",
 | |
|             userdata,
 | |
|             "userdata (ACS)"
 | |
|         )
 | |
| 
 | |
|     def _verify_openstack_userdata(self, ssh, mount_path, userdata):
 | |
|         """
 | |
|         verify Userdata in Openstack format
 | |
| 
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param mount_path: mount point of the config drive
 | |
|         :param userdata: Expected userdata
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type mount_path: str
 | |
|         :type userdata: str
 | |
|         """
 | |
|         self._verify_config_drive_data(
 | |
|             ssh,
 | |
|             mount_path + "/openstack/latest/user_data",
 | |
|             userdata,
 | |
|             "userdata (Openstack)"
 | |
|         )
 | |
| 
 | |
|     def _verifyPassword(self, ssh, mount_path, password_test):
 | |
|         """
 | |
|         Verify Password
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param mount_path: Mount path of the config drive disk
 | |
|         :param password_test: expected Password behavior
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type mount_path: str
 | |
|         :type password_test: ConfigDriveUtils.PasswordTest
 | |
|         """
 | |
| 
 | |
|         if not password_test.test_presence:
 | |
|             return
 | |
| 
 | |
|         if password_test.password is not None:
 | |
|             self.debug("Expected VM password is %s " % password_test.password)
 | |
| 
 | |
|         password_file = mount_path + "/cloudstack/password/vm_password.txt"
 | |
|         vmpassword = self._get_config_drive_data(ssh, password_file,
 | |
|                                                  "ConfigDrive password",
 | |
|                                                  fail_on_missing=False)
 | |
| 
 | |
|         password_found = NO_SUCH_FILE not in vmpassword
 | |
| 
 | |
|         self.assertEqual(password_found, password_test.presence,
 | |
|                          "Expected is that password is present: %s "
 | |
|                          " but found is: %s"
 | |
|                          % (password_test.presence, password_found))
 | |
| 
 | |
|         if password_test.password is not None:
 | |
|             self.debug("ConfigDrive password is %s " % vmpassword)
 | |
|             self.debug("Expected Password for vm is %s" %
 | |
|                        password_test.password)
 | |
|             self.assertTrue(password_test.password in vmpassword,
 | |
|                             "Password value test failed, expected %s, was %s" %
 | |
|                             (password_test.password, vmpassword))
 | |
| 
 | |
|     def _verify_ssh_key(self, ssh, mount_path, ssh_key):
 | |
|         """
 | |
|         Verify SSH Key
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param mount_path: Mount path of the config drive disk
 | |
|         :param ssh_key: expected SSH key
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type mount_path: str
 | |
|         :type ssh_key: MySSHKeyPair
 | |
|         """
 | |
| 
 | |
|         self.debug("Fingerprint of Expected sshkey %s is %s " %
 | |
|                    (ssh_key.name, ssh_key.fingerprint))
 | |
|         publicKey_file = mount_path + "/cloudstack/metadata/public-keys.txt"
 | |
|         cmd = "ssh-keygen -lf %s | cut -f2 -d' '" % publicKey_file
 | |
|         res = ssh.execute(cmd)
 | |
|         vmsshkey = str(res[0])
 | |
| 
 | |
|         self.debug("ConfigDrive ssh key is %s " % vmsshkey)
 | |
| 
 | |
|         if NO_SUCH_FILE in vmsshkey:
 | |
|             self.fail("SSH keyfile is not found")
 | |
| 
 | |
|         self.assertEqual(
 | |
|             vmsshkey,
 | |
|             ssh_key.fingerprint,
 | |
|             "Fingerprint of authorized key does not match ssh key fingerprint"
 | |
|         )
 | |
| 
 | |
|     def _verify_metadata(self, vm, ssh, mount_path):
 | |
|         """
 | |
|         verify metadata files in CloudStack format
 | |
| 
 | |
|         :param vm: the VM
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param mount_path: mount point of the config drive
 | |
|         :type vm: VirtualMachine
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type mount_path: str
 | |
|         """
 | |
| 
 | |
|         metadata_dir = mount_path + "/cloudstack/metadata/"
 | |
|         vm_files = ["availability-zone.txt",
 | |
|                     "service-offering.txt",
 | |
|                     "instance-id.txt",
 | |
|                     "vm-id.txt",
 | |
|                     "local-hostname.txt",
 | |
|                     "local-ipv4.txt",
 | |
|                     "public-ipv4.txt"]
 | |
|         # Verify hostname if the appropriate settings are true
 | |
|         configs = Configurations.list(
 | |
|             self.api_client,
 | |
|             name="global.allow.expose.host.hostname",
 | |
|             listall=True
 | |
|         )
 | |
|         exposeHypevisorHostnameGS = configs[0].value
 | |
| 
 | |
|         configs = Configurations.list(
 | |
|             self.api_client,
 | |
|             name="account.allow.expose.host.hostname",
 | |
|             listall=True
 | |
|         )
 | |
| 
 | |
|         exposeHypevisorHostnameAcc = configs[0].value
 | |
| 
 | |
|         if exposeHypevisorHostnameGS == 'true' and exposeHypevisorHostnameAcc == 'true':
 | |
|             vm_files.append("hypervisor-host-name.txt")
 | |
| 
 | |
|         configs = Configurations.list(
 | |
|             self.api_client,
 | |
|             name="metadata.allow.expose.domain",
 | |
|             listall=True
 | |
|         )
 | |
| 
 | |
|         exposeDomain = configs[0].value
 | |
| 
 | |
|         if exposeDomain == 'true':
 | |
|             vm_files.append("cloud-domain.txt")
 | |
|             vm_files.append("cloud-domain-id.txt")
 | |
| 
 | |
|         def get_name(vm_file):
 | |
|             return "{} metadata".format(
 | |
|                 vm_file.split('.'[-1].replace('-', ' '))
 | |
|             )
 | |
| 
 | |
|         metadata = {vm_file:
 | |
|                         self._get_config_drive_data(ssh,
 | |
|                                                     metadata_dir + vm_file,
 | |
|                                                     get_name(vm_file))
 | |
|                     for vm_file in vm_files}
 | |
| 
 | |
|         self.assertEqual(
 | |
|             str(metadata["availability-zone.txt"]),
 | |
|             self.zone.name,
 | |
|             "Zone name inside metadata does not match with the zone"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             str(metadata["local-hostname.txt"]),
 | |
|             vm.name,
 | |
|             "vm name inside metadata does not match with the "
 | |
|             "instance name"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             str(metadata["vm-id.txt"]),
 | |
|             vm.id,
 | |
|             "vm name inside metadata does not match with the "
 | |
|             "instance name"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             str(metadata["instance-id.txt"]),
 | |
|             vm.id,
 | |
|             "vm name inside metadata does not match with the "
 | |
|             "instance name"
 | |
|         )
 | |
|         self.assertEqual(
 | |
|             str(metadata["service-offering.txt"]),
 | |
|             vm.serviceofferingname,
 | |
|             "Service offering inside metadata does not match "
 | |
|             "with the instance offering"
 | |
|         )
 | |
| 
 | |
|         if exposeHypevisorHostnameGS == 'true' and exposeHypevisorHostnameAcc == 'true':
 | |
|             hostname = vm.hostname
 | |
| 
 | |
|             self.debug("Verifying hypervisor hostname of the VM: %s" % vm.name)
 | |
|             self.assertEqual(
 | |
|                 str(metadata["hypervisor-host-name.txt"]),
 | |
|                 hostname,
 | |
|                 "Hostname in the metadata file does not match the host "
 | |
|                 "on which the VM is spawned"
 | |
|             )
 | |
| 
 | |
|         if exposeDomain == 'true':
 | |
|             self.assertEqual(
 | |
|                 str(metadata["cloud-domain-id.txt"]),
 | |
|                 self.domain.id,
 | |
|                 "Domain name in the metadata file does not match expected"
 | |
|             )
 | |
|             self.assertEqual(
 | |
|                 str(metadata["cloud-domain.txt"]),
 | |
|                 self.domain.name,
 | |
|                 "Domain name in the metadata file does not match expected"
 | |
|             )
 | |
| 
 | |
|         return
 | |
| 
 | |
|     def _verify_openstack_metadata(self, ssh, mount_path):
 | |
|         """
 | |
|         verify existence of metadata and user data files
 | |
|         in OpenStack format
 | |
| 
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param mount_path: mount point of the config drive
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type mount_path: str
 | |
|         """
 | |
|         openstackdata_dir = mount_path + "/openstack/latest/"
 | |
|         openstackdata_files = ["meta_data.json",
 | |
|                                "vendor_data.json",
 | |
|                                "network_data.json"]
 | |
|         for file in openstackdata_files:
 | |
|             res = ssh.execute("cat %s" % openstackdata_dir + file)
 | |
|             if NO_SUCH_FILE in res[0]:
 | |
|                 self.fail("{} file not found in vm openstack".format(file))
 | |
| 
 | |
|     def _umount_config_drive(self, ssh, mount_path):
 | |
|         """unmount config drive inside guest vm
 | |
| 
 | |
|         :param ssh: SSH connection to the VM
 | |
|         :param mount_path: mount point of the config drive
 | |
|         :type ssh: marvin.sshClient.SshClient
 | |
|         :type mount_path: str
 | |
|         """
 | |
|         ssh.execute("umount -d %s" % mount_path)
 | |
|         # Give the VM time to unlock the iso device
 | |
|         time.sleep(0.5)
 | |
|         # Verify umount
 | |
|         result = ssh.execute("ls %s" % mount_path)
 | |
|         self.assertTrue(len(result) == 0,
 | |
|                         "After umount directory should be empty "
 | |
|                         "but contains: %s" % result)
 | |
| 
 | |
|     # =========================================================================
 | |
|     # ---                    Gherkin style helper methods                   ---
 | |
|     # =========================================================================
 | |
| 
 | |
|     def given_template_password_enabled_is(self, new_state):
 | |
|         """Updates value of the guest VM template's password enabled setting
 | |
|         :param new_state:
 | |
|         :type new_state: bool
 | |
|         """
 | |
|         orig_state = self.template.passwordenabled
 | |
|         self.debug("Updating guest VM template to password enabled "
 | |
|                    "from %s to %s" % (orig_state, new_state))
 | |
|         self.update_template(passwordenabled=new_state)
 | |
|         self.assertEqual(self.template.passwordenabled, new_state,
 | |
|                          "Guest VM template is not password enabled")
 | |
|         return orig_state
 | |
| 
 | |
|     def given_config_drive_provider_is(self, new_state):
 | |
|         """
 | |
|         Enables or disables the ConfigDrive Service Provider
 | |
| 
 | |
|         :param new_state: "Enabled" | "Disabled"
 | |
|         :type new_state: str
 | |
|         :return: original state
 | |
|         :rtype: str
 | |
|         """
 | |
|         self.debug("Updating Service Provider ConfigDrive to %s" % new_state)
 | |
|         configdriveprovider = self.get_configdrive_provider()
 | |
|         orig_state = configdriveprovider.state
 | |
|         NetworkServiceProvider.update(self.api_client,
 | |
|                                       configdriveprovider.id,
 | |
|                                       state=new_state)
 | |
|         self.validate_NetworkServiceProvider("ConfigDrive", state=new_state)
 | |
|         return orig_state
 | |
| 
 | |
|     def given_a_network_offering(self, offering_name):
 | |
|         self.offering = self.create_NetworkOffering(self._get_test_data(
 | |
|             offering_name))
 | |
| 
 | |
|     def given_a_network_offering_with_configdrive(self):
 | |
|         self.given_a_network_offering(self.get_network_offering_name())
 | |
| 
 | |
|     def given_a_network_offering_for_vpc_with_configdrive(self):
 | |
|         self.given_a_network_offering(self.get_network_offering_name_for_vpc())
 | |
| 
 | |
|     def given_a_vpc_with_offering(self, offering_name):
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
|         create_vpc = self.verify_vpc_creation(
 | |
|             offering_name=offering_name)
 | |
|         self.assertTrue(create_vpc.success,
 | |
|                         "Vpc found success = %s, expected success = %s"
 | |
|                         % (str(create_vpc.success), 'True'))
 | |
|         self.vpc_acl_list = self.create_NetworkAclList(
 | |
|             name="acl", description="acl", vpc=create_vpc.vpc)
 | |
|         self.vpc_acl_rule = self.create_NetworkAclRule(
 | |
|             self.test_data["ingress_rule"], acl_list=self.vpc_acl_list)
 | |
| 
 | |
|         self.vpc = create_vpc.vpc
 | |
| 
 | |
|     def given_a_vpc(self):
 | |
|         self.given_a_vpc_with_offering(self.get_vpc_offering_name())
 | |
| 
 | |
|     def when_I_create_a_network_with_that_offering(self, gateway='10.1.1.1'):
 | |
|         return self.verify_network_creation(
 | |
|             offering=self.offering,
 | |
|             gateway=gateway)
 | |
| 
 | |
|     def when_I_create_a_vpc_tier_with_that_offering(self, gateway='10.1.1.1'):
 | |
|         return self.verify_network_creation(
 | |
|             offering=self.offering,
 | |
|             gateway=gateway,
 | |
|             vpc=self.vpc,
 | |
|             acl_list=self.vpc_acl_list)
 | |
| 
 | |
|     def when_I_restart_the_vpc_with(self, cleanup=True):
 | |
|         self.restart_Vpc(self.vpc, cleanup=cleanup)
 | |
|         self.validate_vpc(self.vpc, state="Enabled")
 | |
| 
 | |
|     def when_I_restart_the_network_with(self, network, cleanup):
 | |
|         network.restart(self.api_client, cleanup=cleanup)
 | |
|         self.validate_Network(network, state="Implemented")
 | |
| 
 | |
|     def when_I_deploy_a_vm(self, networks, acl_item=None,
 | |
|                            vpc=None, keypair=None):
 | |
| 
 | |
|         test_data = self.test_data["virtual_machine_userdata"]
 | |
| 
 | |
|         vm = self.create_VM(
 | |
|             networks,
 | |
|             testdata=test_data,
 | |
|             keypair=keypair)
 | |
| 
 | |
|         # Check VM
 | |
|         self.check_VM_state(vm, state="Running")
 | |
|         self.validate_vm_networking(vm)
 | |
| 
 | |
|         if keypair and vm.passwordenabled:
 | |
|             self._decrypt_password(vm)
 | |
| 
 | |
|         vm.key_pair = self.keypair if keypair else None
 | |
|         vm.user_data = test_data["userdata"]
 | |
|         vm.password_test = self.PasswordTest(vm=vm) \
 | |
|             if vm.passwordenabled \
 | |
|             else self.PasswordTest(expect_pw=False)
 | |
| 
 | |
|         # Check networks
 | |
|         network_list = networks \
 | |
|             if hasattr(networks, "__iter__") \
 | |
|             else [networks]
 | |
| 
 | |
|         for network in network_list:
 | |
|             self.validate_Network(network, state="Implemented")
 | |
|             if network.type == "Shared":
 | |
|                 self.validate_shared_networking(network, self.vpc)
 | |
|             else:
 | |
|                 self.validate_network_networking(network, self.vpc)
 | |
| 
 | |
|         if self.vpc_acl_rule is not None:
 | |
|             self.validate_acl_rule(self.vpc_acl_rule)
 | |
| 
 | |
|         return vm
 | |
| 
 | |
|     def when_I_deploy_a_vm_with_keypair_in(self, network):
 | |
|         return self.when_I_deploy_a_vm(network,
 | |
|                                        keypair=self.keypair.name)
 | |
| 
 | |
|     def when_I_create_a_static_nat_ip_to(self, vm, network, public_ip=None):
 | |
|         """
 | |
|         Creates and verifies (Ingress) firewall rule
 | |
|         with a Static NAT rule enabled public IP
 | |
|         :type vm: VirtualMachine
 | |
|         :type network: Network
 | |
|         :type public_ip: PublicIPAddress
 | |
|         :rtype: PublicIPAddress
 | |
|         """
 | |
| 
 | |
|         if not public_ip:
 | |
|             public_ip = self.acquire_PublicIPAddress(network, vpc=self.vpc)
 | |
| 
 | |
|         self.debug("Creating and verifying firewall rule")
 | |
|         self.create_StaticNatRule_For_VM(vm, public_ip, network)
 | |
| 
 | |
|         # Verification
 | |
| 
 | |
|         # self.validate_StaticNat_rule_For_VM(public_ip, network, vm)
 | |
| 
 | |
|         if not self.vpc:
 | |
|             fw_rule = self.create_FirewallRule(public_ip,
 | |
|                                                self.test_data["ingress_rule"])
 | |
|             self.validate_acl_rule(fw_rule)
 | |
|             self.debug("Successfully created and verified firewall rule")
 | |
| 
 | |
|         self.cleanup.append(public_ip)
 | |
| 
 | |
|         return public_ip
 | |
| 
 | |
|     def then_creating_a_network_with_that_offering_fails(self):
 | |
|         create_network = self.verify_network_creation(
 | |
|             offering=self.offering,
 | |
|             gateway='10.6.6.6')
 | |
|         self.assertFalse(create_network.success,
 | |
|                          'Network found success = %s, expected success =%s'
 | |
|                          % (str(create_network.success), 'False'))
 | |
| 
 | |
|     def then_creating_a_vpc_tier_with_that_offering_fails(self):
 | |
|         create_network = self.verify_network_creation(
 | |
|             offering=self.offering,
 | |
|             gateway='10.6.6.6',
 | |
|             vpc=self.vpc,
 | |
|             acl_list=self.vpc_acl_list)
 | |
|         self.assertFalse(create_network.success,
 | |
|                          'Create Tier success = %s, expected success =%s'
 | |
|                          % (str(create_network.success), 'False'))
 | |
| 
 | |
|     def then_the_network_has(self, network_result, state):
 | |
|         self.validate_Network(network_result.network, state=state)
 | |
| 
 | |
|     def then_the_network_is_successfully_created(self, network):
 | |
|         self.assertTrue(network.success,
 | |
|                         'Network found success = %s, expected success = %s'
 | |
|                         % (str(network.success), 'True'))
 | |
| 
 | |
|     def then_vr_is_as_expected(self, network):
 | |
|         self.check_Router_state(network=network, state="Running")
 | |
| 
 | |
|     def then_config_drive_is_as_expected(self, vm,
 | |
|                                          public_ip,
 | |
|                                          metadata=False,
 | |
|                                          reconnect=True):
 | |
|         """Verify Config Drive Content
 | |
| 
 | |
|         :param vm: Virtual Machine
 | |
|         :param public_ip: public IP
 | |
|         :param metadata: whether to verify metadata
 | |
|         :param reconnect: recreate SSH Connection
 | |
|         :type vm: VirtualMachine
 | |
|         :type public_ip: PublicIPAddress
 | |
|         :type metadata: bool
 | |
|         :type reconnect: bool
 | |
|         """
 | |
| 
 | |
|         if self.isSimulator:
 | |
|             self.debug("Simulator Environment: "
 | |
|                        "Skipping Config Drive content verification")
 | |
|             return
 | |
| 
 | |
|         self.debug("SSHing into the VM %s" % vm.name)
 | |
| 
 | |
|         ssh = self.ssh_into_VM(vm, public_ip, reconnect=reconnect, keypair=vm.key_pair)
 | |
| 
 | |
|         d = {x.name: x for x in ssh.logger.handlers}
 | |
|         ssh.logger.handlers = list(d.values())
 | |
| 
 | |
|         mount_path = self._mount_config_drive(ssh)
 | |
|         self.assertIsNotNone(mount_path,
 | |
|                              'ConfigdriveIso is not attached to vm')
 | |
|         if metadata:
 | |
|             self.debug("Verifying metadata for vm: %s" % vm.name)
 | |
|             self._verify_metadata(vm, ssh, mount_path)
 | |
|             self.debug("Verifying openstackdata for vm: %s" % vm.name)
 | |
|             self._verify_openstack_metadata(ssh, mount_path)
 | |
| 
 | |
|         if hasattr(vm, "user_data") and vm.user_data is not None:
 | |
|             self.debug("Verifying userdata for vm: %s" % vm.name)
 | |
|             self._verify_userdata(ssh, mount_path, vm.user_data)
 | |
|             self._verify_openstack_userdata(ssh, mount_path, vm.user_data)
 | |
| 
 | |
|         if hasattr(vm, "password_test") \
 | |
|                 and vm.password_test is not None \
 | |
|                 and vm.password_test.test_presence:
 | |
|             self.debug("Verifying password for vm: %s" % vm.name)
 | |
|             self._verifyPassword(ssh, mount_path, vm.password_test)
 | |
| 
 | |
|         if hasattr(vm, "key_pair") and vm.key_pair is not None:
 | |
|             self.debug("Verifying sshkey for vm: %s" % vm.name)
 | |
|             self._verify_ssh_key(ssh, mount_path, vm.key_pair)
 | |
| 
 | |
|         self._umount_config_drive(ssh, mount_path)
 | |
| 
 | |
|     # =========================================================================
 | |
| 
 | |
|     def _get_test_data(self, key):
 | |
|         return self.test_data[key]
 | |
| 
 | |
|     def get_vpc_offering_name(self):
 | |
|         return "vpc_offering_configdrive"
 | |
| 
 | |
|     def get_network_offering_name(self):
 | |
|         return "isolated_configdrive_network_offering"
 | |
| 
 | |
|     def get_network_offering_name_for_vpc(self):
 | |
|         return "vpc_network_offering_configdrive"
 | |
| 
 | |
|     def get_configdrive_provider(self):
 | |
|         return NetworkServiceProvider.list(
 | |
|             self.api_client,
 | |
|             name="ConfigDrive")[0]
 | |
| 
 | |
|     def verify_network_creation(self, offering=None,
 | |
|                                 offering_name=None,
 | |
|                                 gateway=None,
 | |
|                                 vpc=None, acl_list=None, testdata=None):
 | |
|         """
 | |
|         Creates a network
 | |
| 
 | |
|         :param offering: Network Offering
 | |
|         :type offering: NetworkOffering
 | |
|         :param offering_name: Offering name
 | |
|         :type offering_name: Optional[str]
 | |
|         :param gateway: gateway
 | |
|         :type gateway: str
 | |
|         :param vpc: in case of a VPC tier, the parent VPC
 | |
|         :type vpc: VPC
 | |
|         :param acl_list: in case of a VPC tier, the acl list
 | |
|         :type acl_list: NetworkACLList
 | |
|         :param testdata: Test data
 | |
|         :type testdata: dict
 | |
|         :return: Network Creation Result
 | |
|         :rtype: ConfigDriveUtils.CreateResult
 | |
|         """
 | |
|         if offering is None:
 | |
|             self.debug("Creating network offering...")
 | |
|             offering = self.create_NetworkOffering(
 | |
|                 self._get_test_data(offering_name))
 | |
|             self.validate_NetworkOffering(offering, state="Enabled")
 | |
|         try:
 | |
|             network = self.create_Network(offering,
 | |
|                                           gateway=gateway,
 | |
|                                           vpc=vpc,
 | |
|                                           acl_list=acl_list,
 | |
|                                           testdata=testdata)
 | |
|             return self.CreateResult(True, offering=offering, network=network)
 | |
|         except Exception as e:
 | |
|             self.debug("Exception: %s" % e)
 | |
|             return self.CreateResult(False, offering=offering)
 | |
| 
 | |
|     def verify_vpc_creation(self, offering=None, offering_name=None):
 | |
|         """
 | |
|         Creates a VPC
 | |
| 
 | |
|         :param offering: VPC Offering
 | |
|         :type offering: VpcOffering
 | |
|         :param offering_name: Offering name
 | |
|         :type offering_name: Optional[str]
 | |
|         :return: VPC Creation Result
 | |
|         :rtype: CreateResult
 | |
|         """
 | |
|         if offering is None:
 | |
|             self.debug("Creating VPC offering...")
 | |
|             offering = self.create_VpcOffering(
 | |
|                 self._get_test_data(offering_name))
 | |
|             self.validate_VpcOffering(offering, state="Enabled")
 | |
|         try:
 | |
|             vpc = self.create_vpc(offering, cidr='10.1.0.0/16')
 | |
|             self.validate_vpc(vpc, state="Enabled")
 | |
|             return self.CreateResult(True, offering=offering, vpc=vpc)
 | |
|         except Exception as e:
 | |
|             self.debug("Exception: %s" % e)
 | |
|             return self.CreateResult(False, offering=offering)
 | |
| 
 | |
|     def _find_nic(self, vm, network):
 | |
|         vm = VirtualMachine.list(self.api_client, id=vm.id)[0]
 | |
|         return next(nic for nic in vm.nic if nic.networkid == network.id)
 | |
| 
 | |
|     def get_public_shared_ip(self, vm, network):
 | |
|         nic = self._find_nic(vm, network)
 | |
|         return PublicIPAddress({"ipaddress": nic})
 | |
| 
 | |
|     def plug_nic(self, vm, network):
 | |
|         vm.add_nic(self.api_client, network.id)
 | |
|         self.debug("Added NIC in VM with ID - %s and network with ID - %s"
 | |
|                    % (vm.id, network.id))
 | |
|         vm.password_test = ConfigDriveUtils.PasswordTest(expect_pw=False)
 | |
| 
 | |
|     def unplug_nic(self, vm, network):
 | |
|         nic = self._find_nic(vm, network)
 | |
|         vm.remove_nic(self.api_client, nic.id)
 | |
|         self.debug("Removed NIC with ID - %s in VM with ID - %s and "
 | |
|                    "network with ID - %s" % (nic.id, vm.id, network.id))
 | |
| 
 | |
|     def update_default_nic(self, vm, network):
 | |
|         nic = self._find_nic(vm, network)
 | |
|         vm.update_default_nic(self.api_client, nic.id)
 | |
|         self.debug("Removed NIC with ID - %s in VM with ID - %s and "
 | |
|                    "network with ID - %s" % (nic.id, vm.id, network.id))
 | |
| 
 | |
|     def when_I_update_userdata(self, vm, new_user_data):
 | |
|         """Updates the user data of a VM
 | |
| 
 | |
|         :param public_ip: Public ip of the VM
 | |
|         :param vm: the Virtual Machine
 | |
|         :param new_user_data: UserData to set
 | |
|         :type public_ip: PublicIPAddress
 | |
|         :type vm: VirtualMachine
 | |
|         :type new_user_data: str
 | |
|         :returns: User data in base64 format
 | |
|         :rtype: str
 | |
|         """
 | |
|         self.debug("Updating userdata for VM - %s" % vm.name)
 | |
|         updated_user_data = base64.encodebytes(new_user_data.encode()).decode()
 | |
|         with self.stopped_vm(vm):
 | |
|             vm.update(self.api_client, userdata=updated_user_data)
 | |
| 
 | |
|         vm.user_data = new_user_data
 | |
|         if vm.state == VirtualMachine.RUNNING:
 | |
|             vm.password_test = ConfigDriveUtils.PasswordTest(expect_pw=False)
 | |
| 
 | |
|     def update_and_validate_userdata(self, vm, new_user_data, public_ip=None,
 | |
|                                      **kwargs):
 | |
|         """Updates the user data of a VM
 | |
| 
 | |
|         :param public_ip: Public ip of the VM
 | |
|         :param vm: the Virtual Machine
 | |
|         :param new_user_data: UserData to set
 | |
|         :type public_ip: PublicIPAddress
 | |
|         :type vm: VirtualMachine
 | |
|         :type new_user_data: str
 | |
|         :returns: User data in base64 format
 | |
|         :rtype: str
 | |
|         """
 | |
| 
 | |
|         self.when_I_update_userdata(vm, new_user_data)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip, **kwargs)
 | |
| 
 | |
|     def when_I_reset_the_password(self, vm):
 | |
|         """Resets the password of a VM
 | |
| 
 | |
|         :param vm: the Virtual Machine
 | |
|         :type vm: VirtualMachine
 | |
|         :returns: The new password
 | |
|         :rtype: str
 | |
|         """
 | |
| 
 | |
|         self.debug("Resetting password for VM - %s" % vm.name)
 | |
|         with self.stopped_vm(vm):
 | |
|             vm.password = vm.resetPassword(self.api_client)
 | |
| 
 | |
|         self.debug("Password reset to - %s" % vm.password)
 | |
|         self.debug("VM - %s password - %s !" %
 | |
|                    (vm.name, vm.password))
 | |
| 
 | |
|         vm.password_test = ConfigDriveUtils.PasswordTest(vm=vm)
 | |
| 
 | |
|         return vm.password
 | |
| 
 | |
|     def stop_and_start_vm(self, vm):
 | |
|         self.debug("+++ Verify userdata after stopstartVM - %s" % vm.name)
 | |
|         with self.stopped_vm(vm):
 | |
|             pass
 | |
| 
 | |
|         vm.password_test = ConfigDriveUtils.PasswordTest(expect_pw=False)
 | |
| 
 | |
|     def delete_and_recover_vm(self, vm):
 | |
|         self.debug("+++ Verify userdata after VM recover- %s" % vm.name)
 | |
|         vm.delete(self.api_client, expunge=False)
 | |
| 
 | |
|         self.debug("Recover VM - %s" % vm.name)
 | |
|         vm.recover(self.api_client)
 | |
|         vm.start(self.api_client)
 | |
|         vm.state = VirtualMachine.RUNNING
 | |
| 
 | |
|         vm.password_test = ConfigDriveUtils.PasswordTest(expect_pw=False)
 | |
| 
 | |
|     def wait_until_done(self, thread_list, name):
 | |
|         for aThread in thread_list:
 | |
|             self.debug("[Concurrency]Join %s for vm %s" % (name,
 | |
|                                                            aThread.get_vm()))
 | |
|             aThread.join()
 | |
| 
 | |
|     def update_and_validate_sshkeypair(self, vm, public_ip=None):
 | |
|         """
 | |
| 
 | |
|         :type vm: VirtualMachine
 | |
|         """
 | |
| 
 | |
|         self.generate_ssh_keys()
 | |
| 
 | |
|         with self.stopped_vm(vm):
 | |
|             vm_new_ssh = vm.resetSshKey(self.api_client,
 | |
|                                         keypair=self.keypair.name,
 | |
|                                         account=self.account.user[0].account,
 | |
|                                         domainid=self.account.domainid)
 | |
| 
 | |
|         self.debug("Sshkey reset to - %s" % self.keypair.name)
 | |
| 
 | |
|         vm.details = vm_new_ssh.details
 | |
| 
 | |
|         # reset SSH key also removes the password (see https://github.com/apache/cloudstack/pull/4819)
 | |
|         vm.password_test = ConfigDriveUtils.PasswordTest(expect_pw=False)
 | |
|         vm.key_pair = self.keypair
 | |
| 
 | |
|         if public_ip:
 | |
|             self.then_config_drive_is_as_expected(vm, public_ip, metadata=True)
 | |
| 
 | |
|     def _decrypt_password(self, vm):
 | |
|         """Decrypt VM password
 | |
| 
 | |
|         the new password is available in VM detail,
 | |
|         named "Encrypted.Password".
 | |
|         It is encrypted using the SSH Public Key,
 | |
|         and thus can be decrypted using the SSH Private Key
 | |
| 
 | |
|         :type vm: VirtualMachine
 | |
|         """
 | |
|         password_ = vm.details['Encrypted.Password']
 | |
|         if password_ is not None:
 | |
|             from base64 import b64decode
 | |
|             try:
 | |
|                 from Crypto.PublicKey import RSA
 | |
|                 from Crypto.Cipher import PKCS1_v1_5
 | |
|                 with open(self.keypair.private_key_file, "r") as pkfile:
 | |
|                     key = RSA.importKey(pkfile.read())
 | |
|                     cipher = PKCS1_v1_5.new(key)
 | |
|                 new_password = cipher.decrypt(b64decode(password_), None)
 | |
|                 if new_password:
 | |
|                     vm.password = new_password.decode()
 | |
|                 else:
 | |
|                     self.fail("Failed to decrypt new password")
 | |
|             except ImportError:
 | |
|                 # No pycrypto, fallback to openssl
 | |
|                 cmd = ["echo " + password_ +
 | |
|                        " | base64 -d"
 | |
|                        " | openssl rsautl -decrypt -inkey "
 | |
|                        + self.keypair.private_key_file
 | |
|                        + " 2> /dev/null"
 | |
|                        ]
 | |
| 
 | |
|                 new_password = subprocess.check_output(cmd, shell=True)
 | |
|                 self.debug("Decrypted password %s" % new_password)
 | |
|                 if new_password:
 | |
|                     vm.password = new_password
 | |
|                 else:
 | |
|                     self.fail("Failed to decrypt new password")
 | |
| 
 | |
|     def add_subnet_to_shared_network_and_verify(self, network, services):
 | |
|         """verify required nic is present in the VM"""
 | |
| 
 | |
|         self.debug("Going to add new ip range in shared network %s" %
 | |
|                    network.name)
 | |
|         cmd = createVlanIpRange.createVlanIpRangeCmd()
 | |
|         cmd.networkid = network.id
 | |
|         cmd.gateway = services["gateway"]
 | |
|         cmd.netmask = services["netmask"]
 | |
|         cmd.startip = services["startip"]
 | |
|         cmd.endip = services["endip"]
 | |
|         cmd.forVirtualNetwork = services["forvirtualnetwork"]
 | |
|         addedsubnet = self.api_client.createVlanIpRange(cmd)
 | |
| 
 | |
|         self.debug("verify above iprange is successfully added in shared "
 | |
|                    "network %s or not" % network.name)
 | |
| 
 | |
|         cmd1 = listVlanIpRanges.listVlanIpRangesCmd()
 | |
|         cmd1.networkid = network.id
 | |
|         cmd1.id = addedsubnet.vlan.id
 | |
| 
 | |
|         allsubnets = self.api_client.listVlanIpRanges(cmd1)
 | |
|         self.assertEqual(
 | |
|             allsubnets[0].id,
 | |
|             addedsubnet.vlan.id,
 | |
|             "Check New subnet is successfully added to the shared Network"
 | |
|         )
 | |
|         return addedsubnet
 | |
| 
 | |
|     def ssh_into_VM(self, vm, public_ip, reconnect=True, keypair=None):
 | |
|         pass
 | |
| 
 | |
|     def delete(self, obj, **kwargs):
 | |
|         if isinstance(obj, VirtualMachine) and "expunge" not in kwargs:
 | |
|             kwargs["expunge"] = True
 | |
| 
 | |
|         obj.delete(self.api_client, **kwargs)
 | |
| 
 | |
|         if obj in self.cleanup:
 | |
|             self.cleanup.remove(obj)
 | |
| 
 | |
| 
 | |
| class TestConfigDrive(cloudstackTestCase, ConfigDriveUtils):
 | |
|     """Test user data and password reset functionality
 | |
|     using configDrive
 | |
|     """
 | |
| 
 | |
|     def __init__(self, methodName='runTest'):
 | |
|         super(TestConfigDrive, self).__init__(methodName)
 | |
|         ConfigDriveUtils.__init__(self)
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls):
 | |
|         # We want to fail quicker, if it's a failure
 | |
|         socket.setdefaulttimeout(60)
 | |
| 
 | |
|         test_client = super(TestConfigDrive, cls).getClsTestClient()
 | |
|         cls.api_client = test_client.getApiClient()
 | |
|         cls.db_client = test_client.getDbConnection()
 | |
|         cls.test_data = test_client.getParsedTestDataConfig()
 | |
|         cls.test_data.update(Services().services)
 | |
|         cls._cleanup = []
 | |
| 
 | |
|         # Get Zone, Domain and templates
 | |
|         cls.zone = get_zone(cls.api_client)
 | |
|         cls.domain = get_domain(cls.api_client)
 | |
| 
 | |
|         cls.hypervisor = cls.testClient.getHypervisorInfo()
 | |
|         cls.template = get_test_template(cls.api_client,
 | |
|                                          cls.zone.id,
 | |
|                                          cls.hypervisor,
 | |
|                                          cls.test_data["test_templates"])
 | |
|         cls.test_data["virtual_machine"]["zoneid"] = cls.zone.id
 | |
|         cls.test_data["virtual_machine"]["template"] = cls.template.id
 | |
| 
 | |
|         # Create service offering
 | |
|         cls.service_offering = ServiceOffering.create(
 | |
|             cls.api_client,
 | |
|             cls.test_data["service_offering"])
 | |
|         cls._cleanup.append(cls.service_offering)
 | |
| 
 | |
|         hypervisors = Hypervisor.list(cls.api_client, zoneid=cls.zone.id)
 | |
|         cls.isSimulator = any(h.name == "Simulator" for h in hypervisors)
 | |
|         return
 | |
| 
 | |
|     def setUp(self):
 | |
|         # Create an account
 | |
|         self.cleanup = []
 | |
|         self.account = Account.create(self.api_client,
 | |
|                                       self.test_data["account"],
 | |
|                                       admin=True,
 | |
|                                       domainid=self.domain.id
 | |
|                                       )
 | |
|         self.cleanup.append(self.account)
 | |
|         self.tmp_files = []
 | |
|         self.generate_ssh_keys()
 | |
|         return
 | |
| 
 | |
|     @classmethod
 | |
|     def tearDownClass(cls):
 | |
|         super(TestConfigDrive, cls).tearDownClass()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         super(TestConfigDrive,self).tearDown()
 | |
| 
 | |
|         for tmp_file in self.tmp_files:
 | |
|             os.remove(tmp_file)
 | |
|         self.debug("Cleanup complete!")
 | |
| 
 | |
|     # create_StaticNatRule_For_VM - Creates Static NAT rule on the given
 | |
|     # public IP for the given VM in the given network
 | |
|     def create_StaticNatRule_For_VM(self, vm, public_ip, network,
 | |
|                                     vmguestip=None):
 | |
|         self.debug("Enabling Static NAT rule on public IP - %s for VM with ID "
 | |
|                    "- %s in network with ID - %s" %
 | |
|                    (public_ip.ipaddress.ipaddress, vm.id, network.id))
 | |
|         static_nat_rule = StaticNATRule.enable(
 | |
|             self.api_client,
 | |
|             ipaddressid=public_ip.ipaddress.id,
 | |
|             virtualmachineid=vm.id,
 | |
|             networkid=network.id,
 | |
|             vmguestip=vmguestip
 | |
|         )
 | |
|         self.debug("Static NAT rule enabled on public IP - %s for VM with ID "
 | |
|                    "- %s in network with ID - %s" %
 | |
|                    (public_ip.ipaddress.ipaddress, vm.id, network.id))
 | |
|         return static_nat_rule
 | |
| 
 | |
|     def create_FirewallRule(self, public_ip, rule=None):
 | |
|         """Creates an Ingress Firewall Rule on the given public IP
 | |
|            to allow traffic to a VM in an isolated network
 | |
|         :param public_ip: Static NAT rule enabled public IP
 | |
|         :param rule: (optional) Rule to add, defaults to test_data
 | |
|         :type public_ip: PublicIPAddress
 | |
|         """
 | |
|         if not rule:
 | |
|             rule = self.test_data["ingress_rule"]
 | |
|         self.debug("Adding an (Ingress) Firewall rule to make Guest VMs "
 | |
|                    "accessible through Static NAT rule - %s" % rule)
 | |
|         return FireWallRule.create(self.api_client,
 | |
|                                    ipaddressid=public_ip.ipaddress.id,
 | |
|                                    protocol=rule["protocol"],
 | |
|                                    cidrlist=rule["cidrlist"],
 | |
|                                    startport=rule["startport"],
 | |
|                                    endport=rule["endport"]
 | |
|                                    )
 | |
| 
 | |
|     # validate_NetworkServiceProvider - Validates the given Network Service
 | |
|     # Provider in the Physical Network, matches the given provider
 | |
|     # name and state against the list of providers fetched
 | |
|     def validate_NetworkServiceProvider(self, provider_name, state=None):
 | |
|         """Validates the Network Service Provider in the Physical
 | |
|         Network"""
 | |
|         self.debug("Validating the creation and state of Network Service "
 | |
|                    "Provider - %s" % provider_name)
 | |
|         providers = NetworkServiceProvider.list(
 | |
|             self.api_client,
 | |
|             name=provider_name
 | |
|         )
 | |
|         self.assertEqual(isinstance(providers, list), True,
 | |
|                          "List Network Service Provider should return a "
 | |
|                          "valid list"
 | |
|                          )
 | |
|         self.assertEqual(provider_name, providers[0].name,
 | |
|                          "Name of the Network Service Provider should match "
 | |
|                          "with the returned list data"
 | |
|                          )
 | |
|         if state:
 | |
|             self.assertEqual(providers[0].state, state,
 | |
|                              "Network Service Provider state should be '%s'" %
 | |
|                              state
 | |
|                              )
 | |
|         self.debug("Successfully validated the creation and state of Network "
 | |
|                    "Service Provider - %s" % provider_name)
 | |
| 
 | |
|     # validate_PublicIPAddress - Validates if the given public IP address is in
 | |
|     # the expected state form the list of fetched public IP addresses
 | |
|     def validate_PublicIPAddress(self, public_ip, network, static_nat=False,
 | |
|                                  vm=None):
 | |
|         """Validates the Public IP Address"""
 | |
|         self.debug("Validating the assignment and state of public IP address "
 | |
|                    "- %s" % public_ip.ipaddress.ipaddress)
 | |
|         public_ips = PublicIPAddress.list(self.api_client,
 | |
|                                           id=public_ip.ipaddress.id,
 | |
|                                           networkid=network.id,
 | |
|                                           isstaticnat=static_nat,
 | |
|                                           listall=True
 | |
|                                           )
 | |
|         self.debug("PUBLIC IP = " + public_ips[0])
 | |
|         self.assertEqual(isinstance(public_ips, list), True,
 | |
|                          "List public IP for network should return a "
 | |
|                          "valid list"
 | |
|                          )
 | |
|         self.assertEqual(public_ips[0].ipaddress,
 | |
|                          public_ip.ipaddress.ipaddress,
 | |
|                          "List public IP for network should list the assigned "
 | |
|                          "public IP address"
 | |
|                          )
 | |
|         self.assertEqual(public_ips[0].state, "Allocated",
 | |
|                          "Assigned public IP is not in the allocated state"
 | |
|                          )
 | |
|         if static_nat and vm:
 | |
|             self.assertEqual(public_ips[0].virtualmachineid, vm.id,
 | |
|                              "Static NAT rule is not enabled for the VM on "
 | |
|                              "the assigned public IP"
 | |
|                              )
 | |
|         self.debug("Successfully validated the assignment and state of public "
 | |
|                    "IP address - %s" % public_ip.ipaddress.ipaddress)
 | |
| 
 | |
|     # create_NetworkOffering - Creates Network offering
 | |
|     def create_NetworkOffering(self, net_offering, suffix=None,
 | |
|                                conserve_mode=False):
 | |
|         self.debug("Creating Network offering")
 | |
|         if suffix:
 | |
|             net_offering["name"] = "NET_OFF-" + str(suffix)
 | |
|         nw_off = NetworkOffering.create(self.api_client,
 | |
|                                         net_offering,
 | |
|                                         conservemode=conserve_mode
 | |
|                                         )
 | |
|         # Enable Network offering
 | |
|         nw_off.update(self.api_client, state="Enabled")
 | |
|         self.debug("Created and Enabled Network offering")
 | |
|         self.cleanup.append(nw_off)
 | |
|         return nw_off
 | |
| 
 | |
|     # validate_NetworkOffering - Validates the given Network offering, matches
 | |
|     # the given network offering name and state against the list of network
 | |
|     # offerings fetched
 | |
|     def validate_NetworkOffering(self, net_offering, state=None):
 | |
|         """Validates the Network offering.
 | |
|         :param net_offering: Network Offering to validate
 | |
|         :param state: expected state
 | |
|         :type net_offering: NetworkOffering
 | |
|         :type state: str
 | |
|         """
 | |
|         self.debug("Validating the creation and state of Network offering - %s"
 | |
|                    % net_offering.name)
 | |
|         net_offs = NetworkOffering.list(self.api_client,
 | |
|                                         id=net_offering.id
 | |
|                                         )
 | |
|         self.assertEqual(isinstance(net_offs, list), True,
 | |
|                          "List Network offering should return a valid list"
 | |
|                          )
 | |
|         self.assertEqual(net_offering.name, net_offs[0].name,
 | |
|                          "Name of the Network offering should match with the "
 | |
|                          "returned list data"
 | |
|                          )
 | |
|         if state:
 | |
|             self.assertEqual(net_offs[0].state, state,
 | |
|                              "Network offering state should be '%s'" % state
 | |
|                              )
 | |
|         self.debug("Successfully validated the creation and state of Network "
 | |
|                    "offering - %s" % net_offering.name)
 | |
| 
 | |
|     # create_Network - Creates network with the given Network offering
 | |
|     def create_Network(self, nw_off, gateway="10.1.1.1",
 | |
|                        netmask="255.255.255.0", vpc=None, acl_list=None,
 | |
|                        testdata=None, account=None):
 | |
|         if not account:
 | |
|             account = self.account
 | |
|         self.debug("Creating a network in the account - %s" % account.name)
 | |
|         if not testdata:
 | |
|             testdata = self.test_data["network"]
 | |
|             testdata["name"] = "TestNet-" + gateway + "-" + str(nw_off.name)
 | |
|             testdata["displaytext"] = "Test Network"
 | |
|             testdata["gateway"] = gateway
 | |
|             testdata["netmask"] = netmask
 | |
|         network = Network.create(self.api_client,
 | |
|                                  testdata,
 | |
|                                  accountid=account.name,
 | |
|                                  domainid=account.domainid,
 | |
|                                  networkofferingid=nw_off.id,
 | |
|                                  zoneid=self.zone.id,
 | |
|                                  vpcid=vpc.id if vpc
 | |
|                                  else self.vpc.id if self.vpc
 | |
|                                  else None,
 | |
|                                  aclid=acl_list.id if acl_list else None
 | |
|                                  )
 | |
|         self.debug("Created network with ID - %s" % network.id)
 | |
|         self.cleanup.append(network)
 | |
|         return network
 | |
| 
 | |
|     # create_VpcOffering - Creates VPC offering
 | |
|     def create_VpcOffering(self, vpc_offering, suffix=None):
 | |
|         self.debug("Creating VPC offering")
 | |
|         if suffix:
 | |
|             vpc_offering["name"] = "VPC_OFF-" + str(suffix)
 | |
|         vpc_off = VpcOffering.create(self.api_client,
 | |
|                                      vpc_offering
 | |
|                                      )
 | |
|         # Enable VPC offering
 | |
|         vpc_off.update(self.api_client, state="Enabled")
 | |
|         self.debug("Created and Enabled VPC offering")
 | |
|         self.cleanup.append(vpc_off)
 | |
|         return vpc_off
 | |
| 
 | |
|     # create_Vpc - Creates VPC with the given VPC offering
 | |
|     def create_vpc(self, vpc_offering, cidr='10.1.0.0/16', testdata=None,
 | |
|                    account=None, networkDomain=None):
 | |
|         if not account:
 | |
|             account = self.account
 | |
|         self.debug("Creating a VPC in the account - %s" % account.name)
 | |
|         if not testdata:
 | |
|             testdata = self.test_data["vpc"]
 | |
|             testdata["name"] = "TestVPC-" + cidr + "-" + str(vpc_offering.name)
 | |
|             testdata["displaytext"] = "Test VPC"
 | |
|             testdata["cidr"] = cidr
 | |
|         vpc = VPC.create(self.api_client,
 | |
|                          testdata,
 | |
|                          vpcofferingid=vpc_offering.id,
 | |
|                          zoneid=self.zone.id,
 | |
|                          account=account.name,
 | |
|                          domainid=account.domainid,
 | |
|                          networkDomain=networkDomain
 | |
|                          )
 | |
|         self.debug("Created VPC with ID - %s" % vpc.id)
 | |
|         self.cleanup.append(vpc)
 | |
|         return vpc
 | |
| 
 | |
|     # validate_VpcOffering - Validates the given VPC offering, matches the
 | |
|     # given VPC offering name and state against the list of VPC offerings
 | |
|     # fetched
 | |
|     def validate_VpcOffering(self, vpc_offering, state=None):
 | |
|         """Validates the VPC offering"""
 | |
|         self.debug("Validating the creation and state of VPC offering - %s" %
 | |
|                    vpc_offering.name)
 | |
|         vpc_offs = VpcOffering.list(self.api_client,
 | |
|                                     id=vpc_offering.id
 | |
|                                     )
 | |
|         self.assertEqual(isinstance(vpc_offs, list), True,
 | |
|                          "List VPC offering should return a valid list"
 | |
|                          )
 | |
|         self.assertEqual(vpc_offering.name, vpc_offs[0].name,
 | |
|                          "Name of the VPC offering should match with the "
 | |
|                          "returned list data"
 | |
|                          )
 | |
|         if state:
 | |
|             self.assertEqual(vpc_offs[0].state, state,
 | |
|                              "VPC offering state should be '%s'" % state
 | |
|                              )
 | |
|         self.debug("Successfully validated the creation and state of VPC "
 | |
|                    "offering - %s" % vpc_offering.name)
 | |
| 
 | |
|     # validate_Vpc - Validates the given VPC, matches the given VPC name and
 | |
|     # state against the list of VPCs fetched
 | |
|     def validate_vpc(self, vpc, state=None):
 | |
|         """Validates the VPC"""
 | |
|         self.debug("Validating the creation and state of VPC - %s" % vpc.name)
 | |
|         vpcs = VPC.list(self.api_client,
 | |
|                         id=vpc.id
 | |
|                         )
 | |
|         self.assertEqual(isinstance(vpcs, list), True,
 | |
|                          "List VPC should return a valid list"
 | |
|                          )
 | |
|         self.assertEqual(vpc.name, vpcs[0].name,
 | |
|                          "Name of the VPC should match with the returned "
 | |
|                          "list data"
 | |
|                          )
 | |
|         if state:
 | |
|             self.assertEqual(vpcs[0].state, state,
 | |
|                              "VPC state should be '%s'" % state
 | |
|                              )
 | |
|         self.debug("Successfully validated the creation and state of VPC - %s"
 | |
|                    % vpc.name)
 | |
| 
 | |
|     # ssh_into_VM - Gets into the shell of the given VM using its public IP
 | |
|     def ssh_into_VM(self, vm, public_ip, reconnect=True,
 | |
|                     negative_test=False, keypair=None):
 | |
|         self.debug("SSH into VM with ID - %s on public IP address - %s" %
 | |
|                    (vm.id, public_ip.ipaddress.ipaddress))
 | |
|         tries = 1 if negative_test else 3
 | |
|         private_key_file_location = keypair.private_key_file if keypair else None
 | |
| 
 | |
|         def retry_ssh():
 | |
|             ssh_client = vm.get_ssh_client(
 | |
|                 ipaddress=public_ip.ipaddress.ipaddress,
 | |
|                 reconnect=reconnect,
 | |
|                 keyPairFileLocation=private_key_file_location,
 | |
|                 retries=3 if negative_test else 30
 | |
|             )
 | |
|             self.debug("Successful to SSH into VM with ID - %s on "
 | |
|                        "public IP address - %s" %
 | |
|                        (vm.id, public_ip.ipaddress.ipaddress))
 | |
|             return ssh_client
 | |
| 
 | |
|         return retry_ssh()
 | |
| 
 | |
|     # create_VM - Creates VM in the givsen network(s)
 | |
|     def create_VM(self, network_list, host_id=None, start_vm=True,
 | |
|                   testdata=None, account=None, keypair=None):
 | |
|         network_ids = []
 | |
|         if isinstance(network_list, list):
 | |
|             for network in network_list:
 | |
|                 network_ids.append(str(network.id))
 | |
|         else:
 | |
|             network_ids.append(str(network_list.id))
 | |
|         if not account:
 | |
|             account = self.account
 | |
|         self.debug("Creating VM in network(s) with ID(s) - %s in the "
 | |
|                    "account - %s" % (network_ids, account.name))
 | |
|         if not testdata:
 | |
|             testdata = self.test_data["virtual_machine"]
 | |
|         vm = VirtualMachine.create(self.api_client,
 | |
|                                    testdata,
 | |
|                                    accountid=account.name,
 | |
|                                    domainid=account.domainid,
 | |
|                                    serviceofferingid=self.service_offering.id,
 | |
|                                    templateid=self.template.id,
 | |
|                                    zoneid=self.zone.id,
 | |
|                                    networkids=network_ids,
 | |
|                                    startvm=start_vm,
 | |
|                                    hostid=host_id,
 | |
|                                    keypair=keypair
 | |
|                                    )
 | |
|         self.debug("Created VM with ID - %s in network(s) with ID(s) - %s"
 | |
|                    % (vm.id, network_ids))
 | |
|         self.cleanup.append(vm)
 | |
|         return vm
 | |
| 
 | |
|     def check_VM_state(self, vm, state=None):
 | |
|         """Validates the VM state
 | |
|         Checks if the given VM is in the expected state
 | |
|         from the list of fetched VMs
 | |
|         """
 | |
|         self.debug("Validating the deployment and state of VM - %s" % vm.name)
 | |
|         vms = VirtualMachine.list(self.api_client,
 | |
|                                   id=vm.id,
 | |
|                                   listall=True
 | |
|                                   )
 | |
|         self.assertEqual(isinstance(vms, list), True,
 | |
|                          "List virtual machine should return a valid list"
 | |
|                          )
 | |
|         if state:
 | |
|             self.assertEqual(vms[0].state, state,
 | |
|                              "Virtual machine is not in the expected state"
 | |
|                              )
 | |
|         self.debug("Successfully validated the deployment and state of VM - %s"
 | |
|                    % vm.name)
 | |
| 
 | |
|     def validate_Network(self, network, state=None):
 | |
|         """Validates the network
 | |
|            matches the given network name and state
 | |
|            against the list of networks fetched
 | |
|         """
 | |
|         self.debug("Validating the creation and state of Network - %s" %
 | |
|                    network.name)
 | |
|         networks = Network.list(self.api_client,
 | |
|                                 id=network.id
 | |
|                                 )
 | |
|         self.assertEqual(isinstance(networks, list), True,
 | |
|                          "List network should return a valid list"
 | |
|                          )
 | |
|         self.assertEqual(network.name, networks[0].name,
 | |
|                          "Name of the network should match with the "
 | |
|                          "returned list data"
 | |
|                          )
 | |
|         if state:
 | |
|             self.assertEqual(networks[0].state, state,
 | |
|                              "Network state should be '%s'" % state
 | |
|                              )
 | |
|         self.debug("Successfully validated the creation and state of Network "
 | |
|                    "- %s" % network.name)
 | |
| 
 | |
|     def get_Router(self, network):
 | |
|         """Returns router for the given network"""
 | |
|         self.debug("Finding the virtual router for network with ID - %s" %
 | |
|                    network.id)
 | |
|         routers = Router.list(self.api_client,
 | |
|                               networkid=network.id,
 | |
|                               listall=True
 | |
|                               )
 | |
|         self.assertEqual(isinstance(routers, list), True,
 | |
|                          "List routers should return a valid virtual router "
 | |
|                          "for network"
 | |
|                          )
 | |
|         return routers[0]
 | |
| 
 | |
|     # check_Router_state - Checks if the given router is in the expected state
 | |
|     # form the list of fetched routers
 | |
|     def check_Router_state(self, router=None, network=None, state=None):
 | |
|         """Validates the Router state"""
 | |
| 
 | |
|         if router:
 | |
|             self.debug("Validating the deployment and state of Router - %s" %
 | |
|                        router.name)
 | |
|             routers = Router.list(self.api_client, id=router.id,
 | |
|                                   listall=True)
 | |
|         elif network:
 | |
|             self.debug("Validating the deployment and state of Router "
 | |
|                        "in network - %s" % network.name)
 | |
|             routers = Router.list(self.api_client, networkid=network.id,
 | |
|                                   listall=True)
 | |
|         else:
 | |
|             raise AttributeError("Either router or network "
 | |
|                                  "has to be specified")
 | |
| 
 | |
|         self.assertTrue(isinstance(routers, list),
 | |
|                         "List router should return a valid list")
 | |
|         self.assertTrue(len(routers) > 0,
 | |
|                         "List routers should not return an empty list")
 | |
| 
 | |
|         if state:
 | |
|             self.assertEqual(routers[0].state, state,
 | |
|                              "Virtual router is not in the expected state"
 | |
|                              )
 | |
|         self.debug("Successfully validated the deployment and state of Router "
 | |
|                    "- %s" % routers[0].name)
 | |
| 
 | |
|     def acquire_PublicIPAddress(self, network, vpc=None, account=None):
 | |
|         """Acquires public IP address for the given network/VPC"""
 | |
|         if not account:
 | |
|             account = self.account
 | |
|         self.debug("Associating public IP for network with ID - %s in the "
 | |
|                    "account - %s" % (network.id, account.name))
 | |
|         public_ip = PublicIPAddress.create(self.api_client,
 | |
|                                            accountid=account.name,
 | |
|                                            domainid=account.domainid,
 | |
|                                            zoneid=self.zone.id,
 | |
|                                            networkid=network.id if vpc is None
 | |
|                                            else None,
 | |
|                                            vpcid=vpc.id if vpc
 | |
|                                            else self.vpc.id if self.vpc
 | |
|                                            else None
 | |
|                                            )
 | |
|         self.debug("Associated public IP address - %s with network with ID - "
 | |
|                    "%s" % (public_ip.ipaddress.ipaddress, network.id))
 | |
|         self.cleanup.append(public_ip)
 | |
|         return public_ip
 | |
| 
 | |
|     def migrate_VM(self, vm):
 | |
|         """Migrates VM to another host, if available"""
 | |
|         self.debug("+++ Migrating one of the VMs in the created "
 | |
|                    "VPC Tier network to another host, if available...")
 | |
|         self.debug("Checking if a host is available for migration...")
 | |
|         hosts = Host.listForMigration(self.api_client, virtualmachineid=vm.id)
 | |
|         if hosts:
 | |
|             self.assertEqual(isinstance(hosts, list), True,
 | |
|                              "List hosts should return a valid list"
 | |
|                              )
 | |
|             host = hosts[0]
 | |
|             self.debug("Migrating VM with ID: "
 | |
|                        "%s to Host: %s" % (vm.id, host.id))
 | |
|             try:
 | |
|                 vm.migrate(self.api_client, hostid=host.id)
 | |
|                 vm.password_test = ConfigDriveUtils.PasswordTest(expect_pw=False)
 | |
|             except Exception as e:
 | |
|                 self.fail("Failed to migrate instance, %s" % e)
 | |
|             self.debug("Migrated VM with ID: "
 | |
|                        "%s to Host: %s" % (vm.id, host.id))
 | |
|         else:
 | |
|             self.debug("No host available for migration. "
 | |
|                        "Test requires at-least 2 hosts")
 | |
|         return host
 | |
| 
 | |
|     # create_NetworkAclList - Creates network ACL list in the given VPC
 | |
|     def create_NetworkAclList(self, name, description, vpc):
 | |
|         self.debug("Adding NetworkACL list in VPC with ID - %s" % vpc.id)
 | |
|         return NetworkACLList.create(self.api_client,
 | |
|                                      services={},
 | |
|                                      name=name,
 | |
|                                      description=description,
 | |
|                                      vpcid=vpc.id
 | |
|                                      )
 | |
| 
 | |
|     # create_NetworkAclRule - Creates Ingress/Egress Network ACL rule in the
 | |
|     # given VPC network/acl list
 | |
|     def create_NetworkAclRule(self, rule, traffic_type="Ingress", network=None,
 | |
|                               acl_list=None):
 | |
|         self.debug("Adding NetworkACL rule - %s" % rule)
 | |
|         if acl_list:
 | |
|             return NetworkACL.create(self.api_client,
 | |
|                                      networkid=network.id if network else None,
 | |
|                                      services=rule,
 | |
|                                      traffictype=traffic_type,
 | |
|                                      aclid=acl_list.id
 | |
|                                      )
 | |
|         else:
 | |
|             return NetworkACL.create(self.api_client,
 | |
|                                      networkid=network.id if network else None,
 | |
|                                      services=rule,
 | |
|                                      traffictype=traffic_type
 | |
|                                      )
 | |
| 
 | |
|     # restart_Vpc - Restarts the given VPC with/without cleanup
 | |
|     def restart_Vpc(self, vpc, cleanup=False):
 | |
|         self.debug("Restarting VPC with ID - %s" % vpc.id)
 | |
|         cmd = restartVPC.restartVPCCmd()
 | |
|         cmd.id = vpc.id
 | |
|         cmd.cleanup = cleanup
 | |
|         cmd.makeredundant = False
 | |
|         self.api_client.restartVPC(cmd)
 | |
|         self.debug("Restarted VPC with ID - %s" % vpc.id)
 | |
| 
 | |
|     @attr(tags=["advanced", "isonw"], required_hardware="true")
 | |
|     def test_configdrive_isolated_network(self):
 | |
|         """Test Configdrive as provider for isolated Networks
 | |
|            to provide userdata and password reset functionality
 | |
|         """
 | |
| 
 | |
|         # 1. Given ConfigDrive provider is disabled in zone
 | |
|         #    And a network offering which has
 | |
|         #      user data provided by ConfigDrive
 | |
|         #    Then creating an Isolated Network
 | |
|         #    using that network offering fails
 | |
| 
 | |
|         # 2. Given ConfigDrive provider is enabled in zone
 | |
|         #    And a network offering which has
 | |
|         #    * user data provided by ConfigDrive
 | |
|         #    When I create an Isolated Network using that network offering
 | |
|         #    Then the network is successfully created,
 | |
|         #    And is in the "Allocated" state.
 | |
| 
 | |
|         # 3. When I deploy a VM in the created Isolated network with user data,
 | |
|         #    Then the Isolated network state is changed to "Implemented"
 | |
|         #    And the VM is successfully deployed and is in the "Running" state
 | |
|         #    And there is no VR is deployed.
 | |
|         # 4. And the user data in the ConfigDrive device is as expected
 | |
|         # 5. And the vm password in the ConfigDrive device is as expected
 | |
| 
 | |
|         # 6. When I stop, reset the password, and start the VM
 | |
|         # 7. Then I can login into the VM using the new password.
 | |
|         # 8. And the vm password in the ConfigDrive device is the new one
 | |
| 
 | |
|         # 9. Verify various scenarios and check the data in configdriveIso
 | |
|         # 10. Delete all the created objects (cleanup).
 | |
| 
 | |
|         self.debug("+++ Scenario: creating an Isolated network with "
 | |
|                    "config drive fails when config drive provider is "
 | |
|                    "disabled.")
 | |
|         self.given_config_drive_provider_is("Disabled")
 | |
|         self.given_a_network_offering_with_configdrive()
 | |
|         self.then_creating_a_network_with_that_offering_fails()
 | |
| 
 | |
|         self.debug("+++ Preparation Scenario: "
 | |
|                    "creating an Isolated networks with "
 | |
|                    "config drive when config drive provider is "
 | |
|                    "enabled.")
 | |
| 
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
| 
 | |
|         create_network1 = self.when_I_create_a_network_with_that_offering()
 | |
|         self.then_the_network_is_successfully_created(create_network1)
 | |
|         self.then_the_network_has(create_network1, state="Allocated")
 | |
| 
 | |
|         create_network2 = self.when_I_create_a_network_with_that_offering()
 | |
|         self.then_the_network_is_successfully_created(create_network2)
 | |
|         self.then_the_network_has(create_network2, state="Allocated")
 | |
| 
 | |
|         network1 = create_network1.network
 | |
|         network2 = create_network2.network
 | |
| 
 | |
|         self.given_template_password_enabled_is(True)
 | |
| 
 | |
|         self.debug("+++Deploy VM in the created Isolated network "
 | |
|                    "with user data provider as configdrive")
 | |
| 
 | |
|         vm1 = self.when_I_deploy_a_vm_with_keypair_in(network1)
 | |
|         public_ip_1 = \
 | |
|             self.when_I_create_a_static_nat_ip_to(vm1, network1)
 | |
| 
 | |
|         self.then_vr_is_as_expected(network1)
 | |
|         self.then_config_drive_is_as_expected(
 | |
|             vm1, public_ip_1,
 | |
|             metadata=True)
 | |
| 
 | |
|         self.update_and_validate_userdata(vm1, "helloworld vm1", public_ip_1)
 | |
|         self.update_and_validate_sshkeypair(vm1, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
| 
 | |
|         self.debug("Adding a non-default nic to the VM "
 | |
|                    "making it a multi-nic VM...")
 | |
|         self.plug_nic(vm1, network2)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
| 
 | |
|         with self.stopped_vm(vm1):
 | |
|             self.when_I_reset_the_password(vm1)
 | |
|             self.when_I_update_userdata(vm1, "hellomultinicvm1")
 | |
| 
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
|         # Test using network2 as default network
 | |
|         # =====================================================================
 | |
| 
 | |
|         self.debug("updating non-default nic as the default nic "
 | |
|                    "of the multi-nic VM and enable staticnat...")
 | |
|         self.update_default_nic(vm1, network2)
 | |
| 
 | |
|         public_ip_2 = \
 | |
|             self.when_I_create_a_static_nat_ip_to(vm1, network2)
 | |
|         self.stop_and_start_vm(vm1)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_2, metadata=True)
 | |
| 
 | |
|         self.when_I_reset_the_password(vm1)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_2)
 | |
| 
 | |
|         user_data = "hellomultinicvm1again"
 | |
|         self.update_and_validate_userdata(vm1, user_data, public_ip_2)
 | |
| 
 | |
|         self.debug("Updating the default nic of the multi-nic VM, "
 | |
|                    "deleting the non-default nic...")
 | |
|         self.update_default_nic(vm1, network1)
 | |
|         self.stop_and_start_vm(vm1)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
| 
 | |
|         self.delete(public_ip_2)
 | |
|         self.unplug_nic(vm1, network2)
 | |
| 
 | |
|         # =====================================================================
 | |
|         # Another Multinic VM
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "Reset password and update userdata on a multi nic vm")
 | |
|         multinicvm1 = self.when_I_deploy_a_vm([network2, network1])
 | |
|         self.when_I_reset_the_password(multinicvm1)
 | |
|         public_ip_3 = self.when_I_create_a_static_nat_ip_to(multinicvm1,
 | |
|                                                             network2)
 | |
|         self.then_config_drive_is_as_expected(
 | |
|             multinicvm1, public_ip_3,
 | |
|             metadata=True)
 | |
| 
 | |
|         user_data2 = "hello multinicvm1"
 | |
|         self.update_and_validate_userdata(multinicvm1, user_data2, public_ip_3)
 | |
| 
 | |
|         self.delete(multinicvm1, expunge=True)
 | |
|         self.delete(public_ip_3)
 | |
|         self.delete(network2)
 | |
| 
 | |
|         # =====================================================================
 | |
|         # Network restart tests
 | |
|         # =====================================================================
 | |
| 
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after restart Isolated network without"
 | |
|                    " cleanup...")
 | |
|         self.when_I_reset_the_password(vm1)
 | |
|         self.when_I_restart_the_network_with(network1, cleanup=False)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after restart Isolated network with"
 | |
|                    " cleanup...")
 | |
|         self.when_I_restart_the_network_with(network1, cleanup=True)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "update userdata and reset password after reboot")
 | |
|         vm1.reboot(self.api_client)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
|         self.update_and_validate_userdata(vm1, "hello afterboot", public_ip_1)
 | |
|         self.when_I_reset_the_password(vm1)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "update userdata and reset password after migrate")
 | |
|         host = self.migrate_VM(vm1)
 | |
|         vm1.hostname = host.name
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
|         self.debug("Updating userdata after migrating VM - %s" % vm1.name)
 | |
|         self.update_and_validate_userdata(vm1, "hello after migrate",
 | |
|                                           public_ip_1)
 | |
|         self.when_I_reset_the_password(vm1)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "update userdata and reset password after stop/start")
 | |
|         self.stop_and_start_vm(vm1)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
|         self.update_and_validate_userdata(vm1, "hello afterstopstart",
 | |
|                                           public_ip_1)
 | |
|         self.when_I_reset_the_password(vm1)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after delete/recover")
 | |
|         self.delete_and_recover_vm(vm1)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1,
 | |
|                                               metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "Start VM fails when ConfigDrive provider is disabled")
 | |
|         self.given_config_drive_provider_is("Disabled")
 | |
|         with self.assertRaises(Exception):
 | |
|             self.when_I_update_userdata(vm1, "hi with provider state Disabled")
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
| 
 | |
|         self.delete(vm1, expunge=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "Update Userdata on a VM that is not password enabled")
 | |
|         self.given_template_password_enabled_is(False)
 | |
|         vm1 = self.when_I_deploy_a_vm_with_keypair_in(network1)
 | |
| 
 | |
|         public_ip_1 = \
 | |
|             self.when_I_create_a_static_nat_ip_to(vm1, network1)
 | |
| 
 | |
|         self.update_and_validate_userdata(vm1,
 | |
|                                           "This is sample data",
 | |
|                                           public_ip_1,
 | |
|                                           metadata=True)
 | |
| 
 | |
|         self.delete(vm1, expunge=True)
 | |
|         self.delete(network1)
 | |
| 
 | |
|     @attr(tags=["advanced", "vpc"], required_hardware="true")
 | |
|     def test_configdrive_vpc_network(self):
 | |
|         """Test Configdrive for VPC Networks
 | |
|            choose user data with configDrive as service provider
 | |
|            and test password reset functionality using ConfigDrive
 | |
|         """
 | |
| 
 | |
|         # 1. Given ConfigDrive provider is disabled in zone
 | |
|         #    And a network offering for VPC which has
 | |
|         #      user data provided by ConfigDrive
 | |
|         #    And a VPC
 | |
|         #    Then creating an VPC Tier in the VPC
 | |
|         #    using that network offering fails
 | |
| 
 | |
|         # 2. Given ConfigDrive provider is enabled in zone
 | |
|         #    And a network offering for VPC which has
 | |
|         #      user data provided by ConfigDrive
 | |
|         #    And a VPC
 | |
|         #    When I create an VPC Tier in the VPC  using that network offering
 | |
|         #    Then the network is successfully created,
 | |
|         #    And is in the "Allocated" state.
 | |
| 
 | |
|         # 3. When I deploy a VM in the created VPC tier with user data,
 | |
|         #    Then the network state is changed to "Implemented"
 | |
|         #    And the VM is successfully deployed and is in the "Running" state
 | |
| 
 | |
|         # 4. And the user data in the ConfigDrive device is as expected
 | |
|         # 5. And the vm password in the ConfigDrive device is as expected
 | |
| 
 | |
|         # 6. When I stop, reset the password, and start the VM
 | |
|         # 7. Then I can login into the VM using the new password.
 | |
|         # 8. And the vm password in the ConfigDrive device is the new one
 | |
| 
 | |
|         # 9. Verify various scenarios and check the data in configdriveIso
 | |
|         # 10. Delete all the created objects (cleanup).
 | |
| 
 | |
|         self.debug("+++ Scenario: creating an VPC tier with "
 | |
|                    "config drive fails when config drive provider is "
 | |
|                    "disabled.")
 | |
|         self.given_a_vpc()
 | |
|         self.given_config_drive_provider_is("Disabled")
 | |
|         self.given_a_network_offering_for_vpc_with_configdrive()
 | |
|         self.then_creating_a_vpc_tier_with_that_offering_fails()
 | |
| 
 | |
|         self.debug("+++ Preparation Scenario: "
 | |
|                    "Create 2 tier with config drive "
 | |
|                    "when config drive provider is enabled.")
 | |
| 
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
| 
 | |
|         create_network1 = self.when_I_create_a_vpc_tier_with_that_offering(
 | |
|             gateway='10.1.1.1')
 | |
|         self.then_the_network_is_successfully_created(create_network1)
 | |
|         self.then_the_network_has(create_network1, state="Implemented")
 | |
| 
 | |
|         create_network2 = self.when_I_create_a_vpc_tier_with_that_offering(
 | |
|             gateway='10.1.2.1')
 | |
|         self.then_the_network_is_successfully_created(create_network2)
 | |
|         self.then_the_network_has(create_network2, state="Implemented")
 | |
| 
 | |
|         network1 = create_network1.network
 | |
|         network2 = create_network2.network
 | |
| 
 | |
|         self.given_template_password_enabled_is(True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "Deploy VM in the Tier 1 with user data")
 | |
|         vm = self.when_I_deploy_a_vm(network1,
 | |
|                                      keypair=self.keypair.name)
 | |
|         public_ip_1 = \
 | |
|             self.when_I_create_a_static_nat_ip_to(vm, network1)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
| 
 | |
|         self.update_and_validate_userdata(vm, "helloworld vm1",
 | |
|                                           public_ip_1,
 | |
|                                           metadata=True)
 | |
| 
 | |
|         self.when_I_reset_the_password(vm)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1)
 | |
| 
 | |
|         self.update_and_validate_sshkeypair(vm, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "Restarting the created vpc without cleanup...")
 | |
|         self.restart_Vpc(self.vpc, cleanup=False)
 | |
|         self.validate_vpc(self.vpc, state="Enabled")
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("Adding a non-default nic to the VM "
 | |
|                    "making it a multi-nic VM...")
 | |
|         self.plug_nic(vm, network2)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
| 
 | |
|         with self.stopped_vm(vm):
 | |
|             self.when_I_reset_the_password(vm)
 | |
|             self.when_I_update_userdata(vm, "hellomultinicvm1")
 | |
| 
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
| 
 | |
|         self.unplug_nic(vm, network2)
 | |
|         self.delete(network2)
 | |
| 
 | |
|         # =====================================================================
 | |
|         # Network restart tests
 | |
|         # =====================================================================
 | |
| 
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after Restart VPC with cleanup...")
 | |
|         self.when_I_restart_the_vpc_with(cleanup=True)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after Restart VPC without cleanup...")
 | |
|         self.when_I_restart_the_network_with(network1, cleanup=False)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after restart tier with cleanup...")
 | |
|         self.when_I_restart_the_network_with(network1, cleanup=True)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "update userdata and reset password after reboot")
 | |
|         vm.reboot(self.api_client)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
|         self.update_and_validate_userdata(vm, "hello reboot", public_ip_1)
 | |
| 
 | |
|         self.when_I_reset_the_password(vm)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "update userdata and reset password after migrate")
 | |
|         host = self.migrate_VM(vm)
 | |
|         vm.hostname = host.name
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
|         self.update_and_validate_userdata(vm, "hello migrate", public_ip_1)
 | |
| 
 | |
|         self.when_I_reset_the_password(vm)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "update userdata and reset password after stop/start")
 | |
|         self.stop_and_start_vm(vm)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
| 
 | |
|         self.update_and_validate_userdata(vm, "hello stop/start", public_ip_1)
 | |
| 
 | |
|         self.when_I_reset_the_password(vm)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after delete/recover")
 | |
|         self.delete_and_recover_vm(vm)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "Verify configdrive when template is not password enabled")
 | |
|         self.given_config_drive_provider_is("Disabled")
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
| 
 | |
|         self.delete(vm, expunge=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "Update Userdata on a VM that is not password enabled")
 | |
| 
 | |
|         self.given_template_password_enabled_is(False)
 | |
| 
 | |
|         vm = self.when_I_deploy_a_vm(network1,
 | |
|                                      keypair=self.keypair.name)
 | |
|         public_ip_1 = \
 | |
|             self.when_I_create_a_static_nat_ip_to(vm, network1)
 | |
|         self.update_and_validate_userdata(vm, "This is sample data",
 | |
|                                           public_ip_1,
 | |
|                                           metadata=True)
 | |
| 
 | |
|         self.delete(vm, expunge=True)
 | |
|         self.delete(network1)
 | |
| 
 | |
|     @attr(tags=["advanced", "shared"], required_hardware="true")
 | |
|     def test_configdrive_shared_network(self):
 | |
|         """Test Configdrive as provider for shared Networks
 | |
|            to provide userdata and password reset functionality
 | |
|         """
 | |
| 
 | |
|         # 1. When ConfigDrive is disabled as provider in zone
 | |
|         #    Verify Shared Network creation with a network offering
 | |
|         #    which has userdata provided by ConfigDrive fails
 | |
|         # 2. When ConfigDrive is enabled as provider in zone
 | |
|         #    Create a shared Network with Isolated Network
 | |
|         #    offering specifying ConfigDrive as serviceProvider
 | |
|         #    for userdata.
 | |
|         #    check if it is successfully created and
 | |
|         #    is in the "Setup" state.
 | |
|         # 3. Deploy a VM in the created Shared network with user data,
 | |
|         #    check if the Shared network state is changed to
 | |
|         #    "Implemented", and the VM is successfully deployed and
 | |
|         #    is in the "Running" state.
 | |
|         # 4. Verify that the guest VM's password in the iso.
 | |
|         # 5. Reset VM password, and start the VM.
 | |
|         # 6. Verify that the new guest VM template is password enabled by
 | |
|         #     checking the VM's password (password != "password").
 | |
|         # 7. Verify various scenarios and check the data in configdriveIso
 | |
|         # 8. Delete all the created objects (cleanup).
 | |
| 
 | |
|         self.debug("+++Testing configdrive in an shared network fails..."
 | |
|                    "as provider configdrive is still disabled...")
 | |
|         self.given_config_drive_provider_is("Disabled")
 | |
|         shared_test_data = self.test_data["acl"]["network_all_1"]
 | |
|         shared_network = self.verify_network_creation(
 | |
|             offering_name="shared_network_config_drive_offering",
 | |
|             testdata=shared_test_data)
 | |
|         self.assertFalse(shared_network.success,
 | |
|                          'Network found success = %s, expected success =%s'
 | |
|                          % (str(shared_network.success), 'False'))
 | |
| 
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
|         shared_network = self.verify_network_creation(
 | |
|             offering=shared_network.offering, testdata=shared_test_data)
 | |
|         self.assertTrue(shared_network.success,
 | |
|                         'Network found success = %s, expected success = %s'
 | |
|                         % (str(shared_network.success), 'True'))
 | |
| 
 | |
|         self.validate_Network(shared_network.network, state="Setup")
 | |
| 
 | |
|         shared_test_data2 = self.test_data["acl"]["network_all_2"]
 | |
|         shared_network2 = self.verify_network_creation(
 | |
|             offering=shared_network.offering,
 | |
|             testdata=shared_test_data2)
 | |
|         self.assertTrue(shared_network2.success,
 | |
|                         'Network found success = %s, expected success = %s'
 | |
|                         % (str(shared_network2.success), 'True'))
 | |
| 
 | |
|         self.validate_Network(shared_network2.network, state="Setup")
 | |
| 
 | |
|         self.debug("+++Test user data & password reset functionality "
 | |
|                    "using configdrive in an Isolated network")
 | |
| 
 | |
|         self.given_template_password_enabled_is(True)
 | |
| 
 | |
|         self.generate_ssh_keys()
 | |
|         self.debug("keypair name %s " % self.keypair.name)
 | |
| 
 | |
|         self.debug("+++Deploy of a VM on a shared network with multiple "
 | |
|                    "ip ranges, all should have the same value for the "
 | |
|                    "underlay flag.")
 | |
|         # Add subnet of different gateway
 | |
|         self.debug("+++ Adding subnet of different gateway")
 | |
| 
 | |
|         self.add_subnet_to_shared_network_and_verify(shared_network.network,
 | |
|                                                      self.test_data[
 | |
|                                                          "publiciprange2"])
 | |
|         self.test_data["virtual_machine"]["ipaddress"] = \
 | |
|             self.test_data["acl"]["network_all_1"]["endip"]
 | |
| 
 | |
|         # with self.assertRaises(Exception):
 | |
|         #         self.create_VM(
 | |
|         #             [shared_network.network],
 | |
|         #             testdata=self.test_data["virtual_machine_userdata"])
 | |
| 
 | |
|         self.debug("+++ In a shared network with multiple ip ranges, "
 | |
|                    "userdata with config drive must be allowed.")
 | |
| 
 | |
|         vm1 = self.create_VM(
 | |
|             [shared_network.network],
 | |
|             testdata=self.test_data["virtual_machine_userdata"],
 | |
|             keypair=self.keypair.name)
 | |
|         # Check VM
 | |
|         self.check_VM_state(vm1, state="Running")
 | |
|         self.check_Router_state(network=shared_network.network,
 | |
|                                 state="Running")
 | |
| 
 | |
|         self.when_I_update_userdata(vm1, "helloworld vm1")
 | |
| 
 | |
|         self.debug("Adding a non-default nic to the VM "
 | |
|                    "making it a multi-nic VM...")
 | |
|         self.plug_nic(vm1, shared_network2.network)
 | |
|         self.when_I_reset_the_password(vm1)
 | |
| 
 | |
|         self.debug("updating non-default nic as the default nic "
 | |
|                    "of the multi-nic VM...")
 | |
|         self.update_default_nic(vm1, shared_network2.network)
 | |
|         self.when_I_reset_the_password(vm1)
 | |
| 
 | |
|         self.when_I_update_userdata(vm1, "hellomultinicvm1")
 | |
| 
 | |
|         self.debug("Updating the default nic of the multi-nic VM, "
 | |
|                    "deleting the non-default nic...")
 | |
|         self.update_default_nic(vm1, shared_network.network)
 | |
|         vm1.stop(self.api_client)
 | |
|         vm1.start(self.api_client)
 | |
| 
 | |
|         self.unplug_nic(vm1,
 | |
|                         shared_network2.network)
 | |
|         self.delete(shared_network2.network)
 | |
| 
 | |
|         self.debug("+++ When template is not password enabled, "
 | |
|                    "verify configdrive of VM - %s" % vm1.name)
 | |
|         self.delete(vm1, expunge=True)
 | |
| 
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
|         self.given_template_password_enabled_is(False)
 | |
| 
 | |
|         vm1 = self.create_VM(
 | |
|             [shared_network.network],
 | |
|             testdata=self.test_data["virtual_machine_userdata"],
 | |
|             keypair=self.keypair.name)
 | |
|         self.delete(vm1, expunge=True)
 | |
|         self.delete(shared_network.network)
 | |
| 
 | |
|     @attr(tags=["advanced", "isonw"], required_hardware="true")
 | |
|     def test_configdrive_isolated_network_hypervisor_hostname_exposed(self):
 | |
|         """Test Configdrive as provider for isolated Networks
 | |
|            to provide userdata and password reset functionality
 | |
|         """
 | |
| 
 | |
|         # 1. Given a ConfigDrive provider and a network offering
 | |
|         #    which has userdata provided ConfigDrive, create
 | |
|         #    an Isolated network using that network offering.
 | |
|         #    Verify network is successfully created and in the Allocated state
 | |
|         #    Set the "global.allow.expose.host.hostname" and "account.allow.expose.host.hostname" flags to true
 | |
|         #    to enable viewing hypevisor host name in the metadata file
 | |
|         #    Deploy VM in the network created, verify metadata in the configdrive
 | |
|         #    my mounting the configdrive ISO and verify the respective files
 | |
|         #
 | |
|         # 2. Create another Isolated network and plug NIC of the VM to this network
 | |
|         #    make it the default NIC, verify the metadata file contents.
 | |
|         #
 | |
|         # 3. Remove the default NIC, reboot the VM and verify the metadata file contents
 | |
|         #
 | |
|         # 4. Restart network without cleanup and verify the metadata file contents
 | |
|         #
 | |
|         # 5. Restart the network with cleanup and verify the metadata file contents
 | |
|         # 6. Migrate the VM to another host and verify the metadata file contents
 | |
|         # 10. Delete all the created objects (cleanup).
 | |
| 
 | |
|         self.debug("+++ Preparation Scenario: "
 | |
|                    "creating an Isolated networks with "
 | |
|                    "config drive when config drive provider is "
 | |
|                    "enabled.")
 | |
| 
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
|         self.given_a_network_offering_with_configdrive()
 | |
| 
 | |
|         create_network1 = self.when_I_create_a_network_with_that_offering()
 | |
|         self.then_the_network_is_successfully_created(create_network1)
 | |
|         self.then_the_network_has(create_network1, state="Allocated")
 | |
| 
 | |
|         network1 = create_network1.network
 | |
| 
 | |
|         # Update global setting for "allow.expose.host.hostname"
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="global.allow.expose.host.hostname",
 | |
|                               value="true"
 | |
|                               )
 | |
| 
 | |
|         # Update Account level setting
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="account.allow.expose.host.hostname",
 | |
|                               value="true"
 | |
|                               )
 | |
| 
 | |
|         # Verify that the above mentioned settings are set to true before proceeding
 | |
|         if not is_config_suitable(
 | |
|                 apiclient=self.api_client,
 | |
|                 name='global.allow.expose.host.hostname',
 | |
|                 value='true'):
 | |
|             self.skipTest('global.allow.expose.host.hostname should be true. skipping')
 | |
| 
 | |
|         if not is_config_suitable(
 | |
|                 apiclient=self.api_client,
 | |
|                 name='account.allow.expose.host.hostname',
 | |
|                 value='true'):
 | |
|             self.skipTest('Account level setting account.allow.expose.host.hostname should be true. skipping')
 | |
| 
 | |
|         self.debug("+++Deploy VM in the created Isolated network "
 | |
|                    "with user data provider as configdrive")
 | |
| 
 | |
|         self.given_template_password_enabled_is(True)
 | |
|         vm1 = self.when_I_deploy_a_vm(network1)
 | |
| 
 | |
|         public_ip_1 = self.when_I_create_a_static_nat_ip_to(vm1, network1)
 | |
| 
 | |
|         self.then_vr_is_as_expected(network1)
 | |
|         self.then_config_drive_is_as_expected(
 | |
|             vm1, public_ip_1,
 | |
|             metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         # Network restart tests
 | |
|         # =====================================================================
 | |
| 
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after restart Isolated network without"
 | |
|                    " cleanup...")
 | |
|         self.when_I_restart_the_network_with(network1, cleanup=False)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after restart Isolated network with"
 | |
|                    " cleanup...")
 | |
|         self.when_I_restart_the_network_with(network1, cleanup=True)
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify vm metadata after migrate")
 | |
|         host = self.migrate_VM(vm1)
 | |
|         vm1.hostname = host.name
 | |
|         self.then_config_drive_is_as_expected(vm1, public_ip_1, metadata=True)
 | |
| 
 | |
|         # Reset configuration values to default values i.e., false
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="global.allow.expose.host.hostname",
 | |
|                               value="false"
 | |
|                               )
 | |
| 
 | |
|         # Update Account level setting
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="account.allow.expose.host.hostname",
 | |
|                               value="false"
 | |
|                               )
 | |
| 
 | |
|         self.delete(vm1, expunge=True)
 | |
|         self.delete(network1)
 | |
| 
 | |
|     @attr(tags=["advanced", "vpc"], required_hardware="true")
 | |
|     def test_configdrive_vpc_network_verify_metadata(self):
 | |
|         """Test Configdrive for VPC Networks
 | |
|            choose user data with configDrive as service provider
 | |
|            and test vmdata functionality using ConfigDrive
 | |
|         """
 | |
| 
 | |
|         # 1. Given ConfigDrive provider is enabled in zone
 | |
|         #    And a network offering for VPC which has
 | |
|         #      user data provided by ConfigDrive
 | |
|         #    And a VPC
 | |
|         #    When I create an VPC Tier in the VPC  using that network offering
 | |
|         #    Then the network is successfully created,
 | |
|         #    And is in the "Allocated" state.
 | |
| 
 | |
|         # 2. When I deploy a VM in the created VPC tier with user data,
 | |
|         #    Then the network state is changed to "Implemented"
 | |
|         #    And the VM is successfully deployed and is in the "Running" state
 | |
| 
 | |
|         # 3. And the user data in the ConfigDrive device is as expected
 | |
|         # 4. Verify various scenarios and check the data in configdriveIso
 | |
|         # 5. Delete all the created objects (cleanup).
 | |
| 
 | |
|         self.debug("+++ Preparation Scenario: "
 | |
|                    "Create a tier with config drive "
 | |
|                    "when config drive provider is enabled.")
 | |
| 
 | |
|         self.given_a_vpc()
 | |
|         self.given_config_drive_provider_is("Enabled")
 | |
|         self.given_a_network_offering_for_vpc_with_configdrive()
 | |
|         create_network1 = self.when_I_create_a_vpc_tier_with_that_offering(
 | |
|             gateway='10.1.1.1')
 | |
|         self.then_the_network_is_successfully_created(create_network1)
 | |
|         self.then_the_network_has(create_network1, state="Implemented")
 | |
| 
 | |
|         network1 = create_network1.network
 | |
| 
 | |
|         # Update global setting for "allow.expose.host.hostname"
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="global.allow.expose.host.hostname",
 | |
|                               value="true"
 | |
|                               )
 | |
| 
 | |
|         # Update Account level setting
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="account.allow.expose.host.hostname",
 | |
|                               value="true"
 | |
|                               )
 | |
| 
 | |
|         # Enable domain in metadata
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="metadata.allow.expose.domain",
 | |
|                               value="true"
 | |
|                               )
 | |
| 
 | |
|         # Verify that the above mentioned settings are set to true before proceeding
 | |
|         if not is_config_suitable(
 | |
|                 apiclient=self.api_client,
 | |
|                 name='global.allow.expose.host.hostname',
 | |
|                 value='true'):
 | |
|             self.skipTest('global.allow.expose.host.hostname should be true. skipping')
 | |
| 
 | |
|         if not is_config_suitable(
 | |
|                 apiclient=self.api_client,
 | |
|                 name='account.allow.expose.host.hostname',
 | |
|                 value='true'):
 | |
|             self.skipTest('Account level setting account.allow.expose.host.hostname should be true. skipping')
 | |
| 
 | |
|         if not is_config_suitable(
 | |
|                 apiclient=self.api_client,
 | |
|                 name='metadata.allow.expose.domain',
 | |
|                 value='true'):
 | |
|             self.skipTest('metadata.allow.expose.domain should be true. skipping')
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "Deploy VM in the Tier 1 with user data")
 | |
|         self.given_template_password_enabled_is(True)
 | |
|         vm = self.when_I_deploy_a_vm(network1)
 | |
|         public_ip_1 = self.when_I_create_a_static_nat_ip_to(vm, network1)
 | |
| 
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         # Network restart tests
 | |
|         # =====================================================================
 | |
| 
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after Restart VPC with cleanup...")
 | |
|         self.when_I_restart_the_vpc_with(cleanup=True)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after Restart VPC without cleanup...")
 | |
|         self.when_I_restart_the_network_with(network1, cleanup=False)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "verify config drive after restart tier with cleanup...")
 | |
|         self.when_I_restart_the_network_with(network1, cleanup=True)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1,
 | |
|                                               metadata=True, reconnect=False)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "validate vm metadata after reboot")
 | |
|         vm.reboot(self.api_client)
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
| 
 | |
|         # =====================================================================
 | |
|         self.debug("+++ Scenario: "
 | |
|                    "validate updated userdata after migrate")
 | |
|         time.sleep(30)
 | |
|         host = self.migrate_VM(vm)
 | |
|         vm.hostname = host.name
 | |
|         self.then_config_drive_is_as_expected(vm, public_ip_1, metadata=True)
 | |
| 
 | |
|         # Reset configuration values to default values i.e., false
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="global.allow.expose.host.hostname",
 | |
|                               value="false"
 | |
|                               )
 | |
| 
 | |
|         # Update Account level setting
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="account.allow.expose.host.hostname",
 | |
|                               value="false"
 | |
|                               )
 | |
| 
 | |
|         # Disable domain in metadata
 | |
|         Configurations.update(self.api_client,
 | |
|                               name="metadata.allow.expose.domain",
 | |
|                               value="false"
 | |
|                               )
 | |
| 
 | |
|         self.delete(vm, expunge=True)
 | |
|         self.delete(network1)
 |