cloudstack/test/integration/component/test_security_groups.py
Prasanna Santhanam a92abaa916 multiple fixes to regression tests in test/component
- invalid variable references
- syntax/indentation errors
- self/cls reference errors
- some logic fixes in redundant router suite

Signed-off-by: Prasanna Santhanam <tsp@apache.org>
2013-05-14 11:42:34 +05:30

1656 lines
66 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" P1 for Security groups
"""
#Import Local Modules
import marvin
from nose.plugins.attrib import attr
from marvin.cloudstackTestCase import *
from marvin.cloudstackAPI import *
from marvin.integration.lib.utils import *
from marvin.integration.lib.base import *
from marvin.integration.lib.common import *
from marvin.remoteSSHClient import remoteSSHClient
#Import System modules
import time
import subprocess
class Services:
    """Test data for the security group suites.

    Instantiating the class builds a fresh nested dict on ``self.services``
    holding the settings the tests pass to the Marvin helpers (offerings,
    account, VM, host credentials and two security group rule definitions).
    """

    def __init__(self):
        disk_offering = {
            "displaytext": "Small",
            "name": "Small",
            "disksize": 1,
        }

        # Random characters are appended in create account to
        # ensure unique username generated each time
        account = {
            "email": "test@test.com",
            "firstname": "Test",
            "lastname": "User",
            "username": "test",
            "password": "password",
        }

        # Small virtual machine instance; username/password are the
        # credentials used for SSH into the VM
        virtual_machine = {
            "displayname": "Test VM",
            "username": "root",
            "password": "password",
            "ssh_port": 22,
            "hypervisor": 'XenServer',
            "privateport": 22,
            "publicport": 22,
            "protocol": 'TCP',
            "userdata": 'This is sample data',
        }

        # Credentials used for SSH into the hypervisor host
        host = {
            "publicport": 22,
            "username": "root",
            "password": "password",
        }

        service_offering = {
            "name": "Tiny Instance",
            "displaytext": "Tiny Instance",
            "cpunumber": 1,
            "cpuspeed": 100,    # in MHz
            "memory": 128,      # in MBs
        }

        # Ingress rule definition: TCP port 22 from anywhere
        ssh_group = {
            "name": 'SSH',
            "protocol": 'TCP',
            "startport": 22,
            "endport": 22,
            "cidrlist": '0.0.0.0/0',
        }

        # Ingress rule definition: ICMP from anywhere
        icmp_group = {
            "name": 'ICMP',
            "protocol": 'ICMP',
            "startport": -1,
            "endport": -1,
            "cidrlist": '0.0.0.0/0',
        }

        self.services = {
            "disk_offering": disk_offering,
            "account": account,
            "virtual_machine": virtual_machine,
            "host": host,
            "service_offering": service_offering,
            "security_group": ssh_group,
            "security_group_2": icmp_group,
            "ostype": 'CentOS 5.3 (64-bit)',
            "sleep": 60,
            "timeout": 10,
        }
class TestDefaultSecurityGroup(cloudstackTestCase):
    """Tests against the default security group of a new admin account.

    The service offering and the account are created once per class; each
    test registers the VMs it deploys in ``self.cleanup``.
    """

    def setUp(self):
        """Fetch the per-test API/DB handles and reset the cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.cleanup = []
        return

    def tearDown(self):
        """Release the resources registered in ``self.cleanup``."""
        try:
            # Clean up the resources created by the test
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @classmethod
    def setUpClass(cls):
        """Create the service offering and admin account shared by the tests."""
        cls.services = Services().services
        cls.api_client = super(TestDefaultSecurityGroup, cls).getClsTestClient().getApiClient()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.api_client, cls.services)
        cls.zone = get_zone(cls.api_client, cls.services)
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )

        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            admin=True,
            domainid=cls.domain.id
        )
        # From here on services["account"] holds the created account's name
        # instead of the account settings dict
        cls.services["account"] = cls.account.name

        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level account and service offering."""
        try:
            cls.api_client = super(TestDefaultSecurityGroup, cls).getClsTestClient().getApiClient()
            # Cleanup resources used
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    def _deploy_and_verify_vm(self):
        """Deploy a VM for the test account and check listVirtualMachines.

        The VM is stored on ``self.virtual_machine`` and appended to
        ``self.cleanup``.
        """
        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id
        )
        self.debug("Deployed VM with ID: %s" % self.virtual_machine.id)
        self.cleanup.append(self.virtual_machine)

        list_vm_response = list_virtual_machines(
            self.apiclient,
            id=self.virtual_machine.id
        )
        self.debug(
            "Verify listVirtualMachines response for virtual machine: %s"
            % self.virtual_machine.id
        )
        self.assertEqual(
            isinstance(list_vm_response, list),
            True,
            "Check for list VM response"
        )
        # Assert non-empty BEFORE indexing so an empty listing is reported as
        # a test failure rather than an IndexError
        self.assertNotEqual(
            len(list_vm_response),
            0,
            "Check VM available in List Virtual Machines"
        )
        vm_response = list_vm_response[0]

        self.assertEqual(
            vm_response.id,
            self.virtual_machine.id,
            "Check virtual machine id in listVirtualMachines"
        )
        self.assertEqual(
            vm_response.displayname,
            self.virtual_machine.displayname,
            "Check virtual machine displayname in listVirtualMachines"
        )

    def _verify_no_ingress_rules(self):
        """List the account's security groups and check none has ingress rules."""
        sercurity_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(sercurity_groups, list),
            True,
            "Check for list security groups response"
        )
        self.assertNotEqual(
            len(sercurity_groups),
            0,
            "Check List Security groups response"
        )
        self.debug("List Security groups response: %s" %
                   str(sercurity_groups))

        # Inspect every group: hasattr() on the list itself is always False.
        # A missing attribute and an empty rule list both count as "no rules".
        for group in sercurity_groups:
            self.assertFalse(
                getattr(group, 'ingressrule', None),
                "Check ingress rule attribute for default security group"
            )

    @attr(tags = ["sg", "eip"])
    def test_01_deployVM_InDefaultSecurityGroup(self):
        """Test deploy VM in default security group
        """

        # Validate the following:
        # 1. deploy Virtual machine using admin user
        # 2. listVM should show the deployed VM
        # 3. listRouters should show one router

        self._deploy_and_verify_vm()

        # Verify List Routers response for account
        self.debug(
            "Verify list routers response for account: %s"
            % self.account.name
        )
        routers = list_routers(
            self.apiclient,
            zoneid=self.zone.id,
            listall=True
        )
        self.assertEqual(
            isinstance(routers, list),
            True,
            "Check for list Routers response"
        )
        self.debug("Router Response: %s" % routers)
        self.assertEqual(
            len(routers),
            1,
            "Check virtual router is created for account or not"
        )
        return

    @attr(tags = ["sg", "eip"])
    def test_02_listSecurityGroups(self):
        """Test list security groups for admin account
        """

        # Validate the following:
        # 1. listSecurityGroups in admin account
        # 2. There should be one security group (default) listed for the
        #    admin account
        # 3. No Ingress Rules should be part of the default security group

        self._verify_no_ingress_rules()
        return

    @attr(tags = ["sg", "eip"])
    def test_03_accessInDefaultSecurityGroup(self):
        """Test access in default security group
        """

        # Validate the following:
        # 1. deploy Virtual machine using admin user
        # 2. listVM should show the deployed VM
        # 3. The default security group has no ingress rule
        # 4. SSH to the VM must fail

        self._deploy_and_verify_vm()

        # Default Security group should not have any ingress rule
        self._verify_no_ingress_rules()

        # SSH Attempt to VM should fail.
        # The debug call stays outside the assertRaises block, and the
        # imported remoteSSHClient class is called directly. Otherwise an
        # AttributeError raised before any connection attempt would satisfy
        # assertRaises(Exception) and the check would pass vacuously.
        self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip)
        with self.assertRaises(Exception):
            remoteSSHClient(
                self.virtual_machine.ssh_ip,
                self.virtual_machine.ssh_port,
                self.virtual_machine.username,
                self.virtual_machine.password
            )
        return
class TestAuthorizeIngressRule(cloudstackTestCase):
    """Authorize an SSH ingress rule and confirm a VM in the group is reachable."""

    def setUp(self):
        """Grab the API/DB handles for the test and start an empty cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.cleanup = []

    def tearDown(self):
        """Dispose of everything the test put into ``self.cleanup``."""
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as err:
            raise Exception("Warning: Exception during cleanup : %s" % err)

    @classmethod
    def setUpClass(cls):
        """Provision the service offering and account used by the test."""
        settings = Services().services
        cls.services = settings
        cls.api_client = super(TestAuthorizeIngressRule, cls).getClsTestClient().getApiClient()

        # Zone, domain and template lookups
        cls.domain = get_domain(cls.api_client, settings)
        cls.zone = get_zone(cls.api_client, settings)
        settings['mode'] = cls.zone.networktype
        template = get_template(cls.api_client, cls.zone.id, settings["ostype"])

        settings["domainid"] = cls.domain.id
        vm_settings = settings["virtual_machine"]
        vm_settings["zoneid"] = cls.zone.id
        vm_settings["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            settings["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            settings["account"],
            domainid=cls.domain.id
        )
        # The account settings entry is replaced by the created account name
        settings["account"] = cls.account.name

        cls._cleanup = [cls.account, cls.service_offering]

    @classmethod
    def tearDownClass(cls):
        """Remove the class-level account and service offering."""
        try:
            cls.api_client = super(TestAuthorizeIngressRule, cls).getClsTestClient().getApiClient()
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as err:
            raise Exception("Warning: Exception during cleanup : %s" % err)

    @attr(tags = ["sg", "eip"])
    def test_01_authorizeIngressRule(self):
        """Test authorize ingress rule
        """
        # Steps:
        #   1. Create a security group for the account
        #   2. Check the account now lists two security groups
        #   3. authorizeSecurityGroupIngress using the SSH rule settings
        #   4. deployVirtualMachine into this security group
        #   5. SSH to the VM must succeed
        owner = dict(
            account=self.account.name,
            domainid=self.account.domainid
        )
        group_settings = self.services["security_group"]

        security_group = SecurityGroup.create(
            self.apiclient, group_settings, **owner
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account should now own the default group plus the new one
        listed_groups = SecurityGroup.list(self.apiclient, **owner)
        got_list = isinstance(listed_groups, list)
        self.assertEqual(
            got_list, True, "Check for list security groups response"
        )
        self.assertEqual(
            len(listed_groups), 2, "Check List Security groups response"
        )

        # Open the group using the SSH rule settings
        rule = security_group.authorize(
            self.apiclient, group_settings, **owner
        )
        rule_is_dict = isinstance(rule, dict)
        self.assertEqual(
            rule_is_dict, True, "Check ingress rule created properly"
        )
        self.debug(
            "Authorizing ingress rule for sec group ID: %s for ssh access"
            % security_group.id
        )

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id]
        )
        self.debug("Deploying VM in account: %s" % self.account.name)

        # With the rule in place the VM has to accept SSH
        try:
            self.debug("SSH into VM: %s" % self.virtual_machine.id)
            self.virtual_machine.get_ssh_client()
        except Exception as err:
            self.fail(
                "SSH Access failed for %s: %s"
                % (self.virtual_machine.ipaddress, err)
            )
class TestRevokeIngressRule(cloudstackTestCase):
    """Revoke an SSH ingress rule and confirm the VM stops accepting SSH."""

    def setUp(self):
        """Fetch the per-test API/DB handles and reset the cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.cleanup = []
        return

    def tearDown(self):
        """Release the resources registered in ``self.cleanup``."""
        try:
            # Clean up the resources created by the test
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @classmethod
    def setUpClass(cls):
        """Create the service offering and account shared by the tests."""
        cls.services = Services().services
        cls.api_client = super(TestRevokeIngressRule, cls).getClsTestClient().getApiClient()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.api_client, cls.services)
        cls.zone = get_zone(cls.api_client, cls.services)
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )

        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # From here on services["account"] holds the created account's name
        cls.services["account"] = cls.account.name

        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level account and service offering."""
        try:
            cls.api_client = super(TestRevokeIngressRule, cls).getClsTestClient().getApiClient()
            # Cleanup resources used
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @attr(tags = ["sg", "eip"])
    def test_01_revokeIngressRule(self):
        """Test revoke ingress rule
        """

        # Validate the following:
        # 1. Create Security group for the account.
        # 2. The account lists two security groups
        # 3. authorizeSecurityGroupIngress to allow ssh access to the VM
        # 4. deployVirtualMachine into this security group
        # 5. Revoke the ingress rule, SSH access should fail

        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account should list exactly two groups after the create
        sercurity_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(sercurity_groups, list),
            True,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(sercurity_groups),
            2,
            "Check List Security groups response"
        )

        # Authorize Security group to SSH to VM
        self.debug("Authorizing ingress rule for sec group ID: %s for ssh access"
                   % security_group.id)
        ingress_rule = security_group.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(ingress_rule, dict),
            True,
            "Check ingress rule created properly"
        )

        # Keep the first authorized rule; its "ruleid" is needed to revoke it
        ssh_rule = (ingress_rule["ingressrule"][0]).__dict__

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id]
        )
        self.debug("Deploying VM in account: %s" % self.account.name)

        # Should be able to SSH VM
        try:
            self.debug("SSH into VM: %s" % self.virtual_machine.id)
            self.virtual_machine.get_ssh_client()
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e))

        self.debug("Revoking ingress rule for sec group ID: %s for ssh access"
                   % security_group.id)
        # Revoke Security group to SSH to VM
        security_group.revoke(
            self.apiclient,
            id=ssh_rule["ruleid"]
        )

        # SSH Attempt to VM should fail.
        # The imported remoteSSHClient class is called directly and only the
        # connection attempt sits inside assertRaises. Otherwise an
        # AttributeError raised before any connection attempt would satisfy
        # assertRaises(Exception) and the check would pass vacuously.
        self.debug("SSH into VM: %s" % self.virtual_machine.id)
        with self.assertRaises(Exception):
            remoteSSHClient(
                self.virtual_machine.ssh_ip,
                self.virtual_machine.ssh_port,
                self.virtual_machine.username,
                self.virtual_machine.password
            )
        return
class TestDhcpOnlyRouter(cloudstackTestCase):
    """Checks that dnsmasq is running on the router serving the zone."""

    def setUp(self):
        """Fetch the per-test API/DB handles and reset the cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.cleanup = []
        return

    def tearDown(self):
        """Release the resources registered in ``self.cleanup``."""
        try:
            # Clean up the resources created by the test
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @classmethod
    def setUpClass(cls):
        """Create the service offering, account and one VM for the class."""
        cls.services = Services().services
        cls.api_client = super(TestDhcpOnlyRouter, cls).getClsTestClient().getApiClient()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.api_client, cls.services)
        cls.zone = get_zone(cls.api_client, cls.services)
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )

        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # From here on services["account"] holds the created account's name
        cls.services["account"] = cls.account.name

        cls.virtual_machine = VirtualMachine.create(
            cls.api_client,
            cls.services["virtual_machine"],
            accountid=cls.account.name,
            domainid=cls.account.domainid,
            serviceofferingid=cls.service_offering.id
        )
        # NOTE(review): the VM is not listed here; presumably it is removed
        # together with its account - confirm
        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level account and service offering."""
        try:
            cls.api_client = super(TestDhcpOnlyRouter, cls).getClsTestClient().getApiClient()
            # Cleanup resources used
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @attr(tags = ["sg", "eip", "basic"])
    def test_01_dhcpOnlyRouter(self):
        """Test router services for user account
        """

        # Validate the following
        # 1. List routers in the zone
        # 2. The router is Running and its dnsmasq service reports running

        # Find a router in the zone
        list_router_response = list_routers(
            self.apiclient,
            zoneid=self.zone.id,
            listall=True
        )
        self.assertEqual(
            isinstance(list_router_response, list),
            True,
            "Check list response returns a valid list"
        )
        # Fail with a clear message rather than an IndexError when no router
        # is listed
        self.assertNotEqual(
            len(list_router_response),
            0,
            "Check list routers response is not empty"
        )
        router = list_router_response[0]

        # Look up the host the router runs on
        hosts = list_hosts(
            self.apiclient,
            zoneid=router.zoneid,
            type='Routing',
            state='Up',
            id=router.hostid
        )
        self.assertEqual(
            isinstance(hosts, list),
            True,
            "Check list host returns a valid list"
        )
        self.assertNotEqual(
            len(hosts),
            0,
            "Check list hosts response is not empty"
        )
        host = hosts[0]

        self.debug("Router ID: %s, state: %s" % (router.id, router.state))
        self.assertEqual(
            router.state,
            'Running',
            "Check list router response for router state"
        )

        # Query the dnsmasq status using the host credentials and the
        # router's link-local address
        result = get_process_status(
            host.ipaddress,
            self.services['host']["publicport"],
            self.services['host']["username"],
            self.services['host']["password"],
            router.linklocalip,
            "service dnsmasq status"
        )
        res = str(result)
        self.debug("Dnsmasq process status: %s" % res)

        self.assertEqual(
            res.count("running"),
            1,
            "Check dnsmasq service is running or not"
        )
        return
class TestdeployVMWithUserData(cloudstackTestCase):
    """Deploys a VM with user data and reads it back from the router."""

    def setUp(self):
        """Fetch the per-test API/DB handles and reset the cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        self.cleanup = []
        return

    def tearDown(self):
        """Release the resources registered in ``self.cleanup``."""
        try:
            # Clean up the resources created by the test
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @classmethod
    def setUpClass(cls):
        """Create the service offering and account shared by the tests."""
        cls.services = Services().services
        cls.api_client = super(TestdeployVMWithUserData, cls).getClsTestClient().getApiClient()

        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.api_client, cls.services)
        cls.zone = get_zone(cls.api_client, cls.services)
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )

        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # From here on services["account"] holds the created account's name
        cls.services["account"] = cls.account.name

        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level account and service offering."""
        try:
            cls.api_client = super(TestdeployVMWithUserData, cls).getClsTestClient().getApiClient()
            # Cleanup resources used
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @attr(tags = ["sg", "eip"])
    def test_01_deployVMWithUserData(self):
        """Test Deploy VM with User data"""

        # Validate the following
        # 1. CreateAccount of type user
        # 2. CreateSecurityGroup ssh-incoming
        # 3. authorizeIngressRule to allow ssh-access
        # 4. deployVirtualMachine into this group with the configured
        #    user data
        # 5. wget http://<router guest ip>/latest/user-data from inside the
        #    VM to get the user data served by the router

        # Find a router in the zone
        list_router_response = list_routers(
            self.apiclient,
            zoneid=self.zone.id,
            listall=True
        )
        self.assertEqual(
            isinstance(list_router_response, list),
            True,
            "Check list response returns a valid list"
        )
        # Fail with a clear message rather than an IndexError when no router
        # is listed
        self.assertNotEqual(
            len(list_router_response),
            0,
            "Check list routers response is not empty"
        )
        router = list_router_response[0]

        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account should list exactly two groups after the create
        sercurity_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(sercurity_groups, list),
            True,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(sercurity_groups),
            2,
            "Check List Security groups response"
        )

        self.debug(
            "Authorize Ingress Rule for Security Group %s for account: %s"
            % (
                security_group.id,
                self.account.name
            ))

        # Authorize Security group to SSH to VM
        ingress_rule = security_group.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(ingress_rule, dict),
            True,
            "Check ingress rule created properly"
        )

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id]
        )
        self.debug("Deploying VM in account: %s" % self.account.name)

        # Should be able to SSH VM
        try:
            self.debug(
                "SSH to VM with IP Address: %s"
                % self.virtual_machine.ssh_ip
            )
            ssh = self.virtual_machine.get_ssh_client()
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e))

        cmds = [
            "wget http://%s/latest/user-data" % router.guestipaddress,
            "cat user-data",
        ]
        for c in cmds:
            result = ssh.execute(c)
            self.debug("%s: %s" % (c, result))

        # Only the output of the last command ("cat user-data") is checked
        res = str(result)
        self.assertEqual(
            res.count(self.services["virtual_machine"]["userdata"]),
            1,
            "Verify user data"
        )
        return
class TestDeleteSecurityGroup(cloudstackTestCase):
    """Deleting a security group with and without a VM still using it.

    Unlike the other suites in this file, the account and service offering
    are created in ``setUp``, so every test starts from a fresh account.
    """

    def setUp(self):
        """Create a fresh service offering and account for the test."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()

        self.services = Services().services

        # Get Zone, Domain and templates
        self.domain = get_domain(self.apiclient, self.services)
        self.zone = get_zone(self.apiclient, self.services)
        self.services['mode'] = self.zone.networktype

        template = get_template(
            self.apiclient,
            self.zone.id,
            self.services["ostype"]
        )

        self.services["domainid"] = self.domain.id
        self.services["virtual_machine"]["zoneid"] = self.zone.id
        self.services["virtual_machine"]["template"] = template.id

        self.service_offering = ServiceOffering.create(
            self.apiclient,
            self.services["service_offering"]
        )
        self.account = Account.create(
            self.apiclient,
            self.services["account"],
            domainid=self.domain.id
        )
        # From here on services["account"] holds the created account's name
        self.services["account"] = self.account.name

        self.cleanup = [
            self.account,
            self.service_offering
        ]
        return

    def tearDown(self):
        """Delete the per-test account and service offering."""
        try:
            # Clean up the resources created in setUp
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @classmethod
    def setUpClass(cls):
        """Prepare the class-level settings, API client and empty cleanup list."""
        cls.services = Services().services
        cls.api_client = super(TestDeleteSecurityGroup, cls).getClsTestClient().getApiClient()
        cls._cleanup = []
        return

    @classmethod
    def tearDownClass(cls):
        """Release anything registered in the class-level cleanup list."""
        try:
            cls.api_client = super(TestDeleteSecurityGroup, cls).getClsTestClient().getApiClient()
            # Cleanup resources used
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    def _deploy_vm_in_new_ssh_group(self):
        """Create the SSH group, authorize its rule and deploy a VM into it.

        The VM is stored on ``self.virtual_machine``; the created security
        group is returned.
        """
        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account should list exactly two groups after the create
        sercurity_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(sercurity_groups, list),
            True,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(sercurity_groups),
            2,
            "Check List Security groups response"
        )

        self.debug(
            "Authorize Ingress Rule for Security Group %s for account: %s"
            % (
                security_group.id,
                self.account.name
            ))

        # Authorize Security group to SSH to VM
        ingress_rule = security_group.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(ingress_rule, dict),
            True,
            "Check ingress rule created properly"
        )

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id]
        )
        self.debug("Deploying VM in account: %s" % self.account.name)
        return security_group

    @attr(tags = ["sg", "eip"])
    def test_01_delete_security_grp_running_vm(self):
        """Test delete security group with running VM"""

        # Validate the following
        # 1. createsecuritygroup (ssh-incoming) for this account
        # 2. authorizeSecurityGroupIngress to allow ssh access to the VM
        # 3. deployVirtualMachine into this security group (ssh-incoming)
        # 4. deleteSecurityGroup created in step 1. Deletion should fail
        #    complaining there are running VMs in this group

        security_group = self._deploy_vm_in_new_ssh_group()

        # Deleting Security group should raise exception. The expected
        # failure is asserted; left bare it would abort the test with an
        # error and the check below would never run.
        with self.assertRaises(Exception):
            security_group.delete(self.apiclient)

        # Wait before re-listing, as the original test did
        time.sleep(self.services["sleep"])

        # The security group must still be listed after the rejected delete
        sercurity_groups = SecurityGroup.list(
            self.apiclient,
            id=security_group.id
        )
        self.assertNotEqual(
            sercurity_groups,
            None,
            "Check List Security groups response"
        )
        return

    @attr(tags = ["sg", "eip"])
    def test_02_delete_security_grp_withoout_running_vm(self):
        """Test delete security group without running VM"""

        # Validate the following
        # 1. createsecuritygroup (ssh-incoming) for this account
        # 2. authorizeSecurityGroupIngress to allow ssh access to the VM
        # 3. deployVirtualMachine into this security group (ssh-incoming)
        # 4. Destroy the VM and wait longer than expunge.delay
        # 5. deleteSecurityGroup created in step 1. Deletion should succeed

        security_group = self._deploy_vm_in_new_ssh_group()

        # Destroy the VM
        self.virtual_machine.delete(self.apiclient)

        config = list_configurations(
            self.apiclient,
            name='expunge.delay'
        )
        self.assertEqual(
            isinstance(config, list),
            True,
            "Check list configurations response"
        )
        # Fail with a clear message rather than an IndexError when the
        # setting is not returned
        self.assertNotEqual(
            len(config),
            0,
            "Check list configurations response is not empty"
        )
        response = config[0]
        self.debug("expunge.delay: %s" % response.value)

        # Wait for some time more than expunge.delay
        time.sleep(int(response.value) * 2)

        # With no VM left in the group, deletion must not raise
        try:
            self.debug("Deleting Security Group: %s" % security_group.id)
            security_group.delete(self.apiclient)
        except Exception as e:
            # Report the underlying error instead of dropping it
            self.fail("Failed to delete security group - ID: %s: %s"
                      % (security_group.id, e))
        return
class TestIngressRule(cloudstackTestCase):
def setUp(self):
    """Create a fresh service offering and account before each test."""
    client = self.testClient.getApiClient()
    self.apiclient = client
    self.dbclient = self.testClient.getDbConnection()
    self.cleanup = []

    settings = Services().services
    self.services = settings

    # Zone, domain and template lookups
    self.domain = get_domain(client, settings)
    self.zone = get_zone(client, settings)
    settings['mode'] = self.zone.networktype
    template = get_template(client, self.zone.id, settings["ostype"])

    settings["domainid"] = self.domain.id
    vm_settings = settings["virtual_machine"]
    vm_settings["zoneid"] = self.zone.id
    vm_settings["template"] = template.id

    self.service_offering = ServiceOffering.create(
        client,
        settings["service_offering"]
    )
    self.account = Account.create(
        client,
        settings["account"],
        domainid=self.domain.id
    )
    # The account settings entry is replaced by the created account name
    settings["account"] = self.account.name

    # Both resources are handed to cleanup_resources in tearDown
    self.cleanup = [self.account, self.service_offering]
def tearDown(self):
    """Pass ``self.cleanup`` to cleanup_resources; re-raise failures as Exception."""
    try:
        cleanup_resources(self.apiclient, self.cleanup)
    except Exception as err:
        raise Exception("Warning: Exception during cleanup : %s" % err)
@classmethod
def setUpClass(cls):
    """Set the class-level settings, API client and an empty cleanup list."""
    cls.services = Services().services
    test_client = super(TestIngressRule, cls).getClsTestClient()
    cls.api_client = test_client.getApiClient()
    cls._cleanup = list()
@classmethod
def tearDownClass(cls):
    """Pass ``cls._cleanup`` to cleanup_resources; re-raise failures as Exception."""
    try:
        test_client = super(TestIngressRule, cls).getClsTestClient()
        cls.api_client = test_client.getApiClient()
        cleanup_resources(cls.api_client, cls._cleanup)
    except Exception as err:
        raise Exception("Warning: Exception during cleanup : %s" % err)
@attr(tags = ["sg", "eip"])
def test_01_authorizeIngressRule_AfterDeployVM(self):
    """Test authorize ingress rule after deploy VM"""

    # Validate the following
    # 1. create a security group for this account
    # 2. authorize the services["security_group"] ingress rule on it
    # 3. deploy a VM into this security group
    # 4. authorize the services["security_group_2"] ingress rule while the
    #    VM is running
    # 5. the VM must accept an SSH connection and answer a ping
    # NOTE(review): both rule definitions live outside this method; the
    # checks below suggest SSH for the first and ICMP for the second -
    # confirm against the Services definitions.

    account_name = self.account.name
    domain_id = self.account.domainid
    group_spec = self.services["security_group"]

    security_group = SecurityGroup.create(
        self.apiclient,
        group_spec,
        account=account_name,
        domainid=domain_id)
    self.debug("Created security group with ID: %s" % security_group.id)

    # The account must list exactly two security groups at this point
    # (presumably the default one plus the group created above - confirm)
    listed_groups = SecurityGroup.list(
        self.apiclient,
        account=account_name,
        domainid=domain_id)
    self.assertEqual(isinstance(listed_groups, list), True,
                     "Check for list security groups response")
    self.assertEqual(len(listed_groups), 2,
                     "Check List Security groups response")

    # First ingress rule, authorized before any VM exists
    self.debug(
        "Authorize Ingress Rule for Security Group %s for account: %s"
        % (security_group.id, account_name))
    ingress_rule_1 = security_group.authorize(
        self.apiclient,
        group_spec,
        account=account_name,
        domainid=domain_id)
    self.assertEqual(isinstance(ingress_rule_1, dict), True,
                     "Check ingress rule created properly")

    self.virtual_machine = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        accountid=account_name,
        domainid=domain_id,
        serviceofferingid=self.service_offering.id,
        securitygroupids=[security_group.id])
    vm = self.virtual_machine
    self.debug("Deploying VM in account: %s" % account_name)

    # Second ingress rule, authorized while the VM is already deployed
    self.debug(
        "Authorize Ingress Rule for Security Group %s for account: %s"
        % (security_group.id, account_name))
    ingress_rule_2 = security_group.authorize(
        self.apiclient,
        self.services["security_group_2"],
        account=account_name,
        domainid=domain_id)
    self.assertEqual(isinstance(ingress_rule_2, dict), True,
                     "Check ingress rule created properly")

    # The VM must accept an SSH connection
    try:
        self.debug("Trying to SSH into VM %s on port %s"
                   % (vm.ssh_ip, group_spec["endport"]))
        vm.get_ssh_client()
    except Exception as e:
        self.fail("SSH access failed for ingress rule ID: %s, %s"
                  % (ingress_rule_1["id"], e))

    # The VM must answer a ping; ping exits with 0 on success
    try:
        self.debug("Trying to ping VM %s" % vm.ssh_ip)
        ping_status = subprocess.call(['ping', '-c 1', vm.ssh_ip])
        self.debug("Ping result: %s" % ping_status)
        self.assertEqual(ping_status, 0,
                         "Check if ping is successful or not")
    except Exception as e:
        self.fail("Ping failed for ingress rule ID: %s, %s"
                  % (ingress_rule_2["id"], e))
@attr(tags = ["sg", "eip"])
def test_02_revokeIngressRule_AfterDeployVM(self):
    """Test Revoke ingress rule after deploy VM"""

    # Validate the following
    # 1. create a security group for this account
    # 2. authorize the services["security_group"] ingress rule on it
    # 3. deploy a VM into this security group
    # 4. authorize the services["security_group_2"] ingress rule while the
    #    VM is running
    # 5. check that the VM accepts an SSH connection and answers a ping
    # 6. revoke the rule added in step 4 and verify that the VM no longer
    #    answers a ping
    # NOTE(review): both rule definitions live outside this method; the
    # second one is handled as the ICMP rule below - confirm against the
    # Services definitions.

    security_group = SecurityGroup.create(
        self.apiclient,
        self.services["security_group"],
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.debug("Created security group with ID: %s" % security_group.id)

    # The account must list exactly two security groups at this point
    # (presumably the default one plus the group created above - confirm)
    security_groups = SecurityGroup.list(
        self.apiclient,
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.assertEqual(
        isinstance(security_groups, list),
        True,
        "Check for list security groups response"
    )
    self.assertEqual(
        len(security_groups),
        2,
        "Check List Security groups response"
    )

    self.debug(
        "Authorize Ingress Rule for Security Group %s for account: %s"
        % (security_group.id, self.account.name))
    # First ingress rule, authorized before any VM exists
    ingress_rule = security_group.authorize(
        self.apiclient,
        self.services["security_group"],
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.assertEqual(
        isinstance(ingress_rule, dict),
        True,
        "Check ingress rule created properly"
    )

    self.virtual_machine = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id,
        securitygroupids=[security_group.id]
    )
    self.debug("Deploying VM in account: %s" % self.account.name)

    self.debug(
        "Authorize Ingress Rule for Security Group %s for account: %s"
        % (security_group.id, self.account.name))
    # Second ingress rule, authorized while the VM is already deployed
    ingress_rule_2 = security_group.authorize(
        self.apiclient,
        self.services["security_group_2"],
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.assertEqual(
        isinstance(ingress_rule_2, dict),
        True,
        "Check ingress rule created properly"
    )

    # The first entry of each response's "ingressrule" list carries the
    # "ruleid" used for reporting and for the revoke call
    ssh_rule = (ingress_rule["ingressrule"][0]).__dict__
    icmp_rule = (ingress_rule_2["ingressrule"][0]).__dict__

    # The VM must accept an SSH connection
    try:
        self.debug("Trying to SSH into VM %s on port %s" % (
            self.virtual_machine.ssh_ip,
            self.services["security_group"]["endport"]
        ))
        self.virtual_machine.get_ssh_client()
    except Exception as e:
        self.fail("SSH access failed for ingress rule ID: %s, %s"
                  % (ssh_rule["ruleid"], e))

    # The VM must answer a ping; ping exits with 0 on success
    try:
        self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip)
        ping_status = subprocess.call(
            ['ping', '-c 1', self.virtual_machine.ssh_ip])
        self.debug("Ping result: %s" % ping_status)
        self.assertEqual(
            ping_status,
            0,
            "Check if ping is successful or not"
        )
    except Exception as e:
        self.fail("Ping failed for ingress rule ID: %s, %s"
                  % (icmp_rule["ruleid"], e))

    self.debug(
        "Revoke Ingress Rule for Security Group %s for account: %s"
        % (security_group.id, self.account.name))
    revoke_result = security_group.revoke(
        self.apiclient,
        id=icmp_rule["ruleid"]
    )
    self.debug("Revoke ingress rule result: %s" % revoke_result)

    # Wait for the configured interval before re-checking connectivity
    time.sleep(self.services["sleep"])

    # The VM must no longer answer a ping. Only the ping invocation is
    # guarded here: the assertion stays outside the try block so that an
    # unexpectedly successful ping is not reported as "Ping failed".
    try:
        self.debug("Trying to ping VM %s" % self.virtual_machine.ssh_ip)
        ping_status = subprocess.call(
            ['ping', '-c 1', self.virtual_machine.ssh_ip])
    except Exception as e:
        self.fail("Unable to run ping after revoking ingress rule ID: %s, %s"
                  % (icmp_rule["ruleid"], e))
    self.debug("Ping result: %s" % ping_status)
    # A zero exit status means the VM still answered
    self.assertNotEqual(
        ping_status,
        0,
        "Ping still succeeds after revoking ingress rule ID: %s"
        % icmp_rule["ruleid"]
    )
@attr(tags = ["sg", "eip"])
def test_03_stopStartVM_verifyIngressAccess(self):
    """Test Start/Stop VM and Verify ingress rule"""

    # Validate the following
    # 1. create a security group for this account
    # 2. authorize the services["security_group"] ingress rule on it
    # 3. deploy a VM into this security group
    # 4. once the VM is deployed and SSH access works, stop the VM
    # 5. start the VM again and verify that SSH access is still allowed

    security_group = SecurityGroup.create(
        self.apiclient,
        self.services["security_group"],
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.debug("Created security group with ID: %s" % security_group.id)

    # The account must list exactly two security groups at this point
    # (presumably the default one plus the group created above - confirm)
    security_groups = SecurityGroup.list(
        self.apiclient,
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.assertEqual(
        isinstance(security_groups, list),
        True,
        "Check for list security groups response"
    )
    self.assertEqual(
        len(security_groups),
        2,
        "Check List Security groups response"
    )

    self.debug(
        "Authorize Ingress Rule for Security Group %s for account: %s"
        % (security_group.id, self.account.name))
    # Ingress rule authorized before the VM is deployed
    ingress_rule = security_group.authorize(
        self.apiclient,
        self.services["security_group"],
        account=self.account.name,
        domainid=self.account.domainid
    )
    self.assertEqual(
        isinstance(ingress_rule, dict),
        True,
        "Check ingress rule created properly"
    )

    self.virtual_machine = VirtualMachine.create(
        self.apiclient,
        self.services["virtual_machine"],
        accountid=self.account.name,
        domainid=self.account.domainid,
        serviceofferingid=self.service_offering.id,
        securitygroupids=[security_group.id]
    )
    self.debug("Deploying VM in account: %s" % self.account.name)

    # SSH must work before the stop/start cycle. The caught exception is
    # included in the failure message so the real cause is reported.
    try:
        self.debug("Trying to SSH into VM %s" % self.virtual_machine.ssh_ip)
        self.virtual_machine.get_ssh_client()
    except Exception as e:
        self.fail("SSH access failed for ingress rule ID: %s, %s"
                  % (ingress_rule["id"], e))

    self.virtual_machine.stop(self.apiclient)
    # Sleep to ensure that VM is in stopped state
    time.sleep(self.services["sleep"])

    self.virtual_machine.start(self.apiclient)
    # Sleep to ensure that VM is in running state
    time.sleep(self.services["sleep"])

    # SSH must still work after the restart.
    # NOTE(review): if get_ssh_client() reuses a connection cached by the
    # call above, this check may not open a new session - confirm.
    try:
        self.debug("Trying to SSH into VM %s" % self.virtual_machine.ssh_ip)
        self.virtual_machine.get_ssh_client()
    except Exception as e:
        self.fail("SSH access failed for ingress rule ID: %s, %s"
                  % (ingress_rule["id"], e))