mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
2131 lines
86 KiB
Python
2131 lines
86 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
""" P1 for Egress & Ingress rules
|
|
"""
|
|
#Import Local Modules
|
|
from nose.plugins.attrib import attr
|
|
from marvin.cloudstackTestCase import cloudstackTestCase
|
|
from marvin.lib.utils import (random_gen,
|
|
cleanup_resources)
|
|
from marvin.lib.base import (SecurityGroup,
|
|
VirtualMachine,
|
|
Account,
|
|
ServiceOffering)
|
|
from marvin.lib.common import (get_domain,
|
|
get_zone,
|
|
get_template,
|
|
list_virtual_machines)
|
|
|
|
class Services:
    """Test Security groups Services

    Static settings (offerings, account, VM and security-group rule
    definitions) exposed through the ``services`` dictionary.
    """

    def __init__(self):
        def ssh_tcp_rule(cidr, port=22):
            # Every TCP rule variant below shares the name and protocol and
            # uses a single port for both ends of the range.
            return {
                "name": 'SSH',
                "protocol": 'TCP',
                "startport": port,
                "endport": port,
                "cidrlist": cidr,
            }

        any_cidr = '0.0.0.0/0'
        settings = {}

        settings["disk_offering"] = {
            "displaytext": "Small",
            "name": "Small",
            "disksize": 1,
        }
        settings["account"] = {
            "email": "test@test.com",
            "firstname": "Test",
            "lastname": "User",
            # Random characters are appended in create account to
            # ensure unique username generated each time
            "username": "test",
            "password": "password",
        }
        # Small virtual machine instance; username/password are the
        # credentials used for SSH into the VM.
        settings["virtual_machine"] = {
            "displayname": "Test VM",
            "username": "root",
            "password": "password",
            "ssh_port": 22,
            "hypervisor": 'XenServer',
            "privateport": 22,
            "publicport": 22,
            "protocol": 'TCP',
            "userdata": 'This is sample data',
        }
        settings["service_offering"] = {
            "name": "Tiny Instance",
            "displaytext": "Tiny Instance",
            "cpunumber": 1,
            "cpuspeed": 100,    # in MHz
            "memory": 128,      # in MBs
        }

        settings["security_group"] = ssh_tcp_rule(any_cidr)
        settings["egress_icmp"] = {
            "protocol": 'ICMP',
            "icmptype": '-1',
            "icmpcode": '-1',
            "cidrlist": any_cidr,
        }
        settings["sg_invalid_port"] = ssh_tcp_rule(any_cidr, port=-22)
        settings["sg_invalid_cidr"] = ssh_tcp_rule('0.0.0.10')
        settings["sg_cidr_anywhere"] = ssh_tcp_rule(any_cidr)
        settings["sg_cidr_restricted"] = ssh_tcp_rule('10.0.0.1/24')
        settings["sg_account"] = ssh_tcp_rule(any_cidr)

        settings["mgmt_server"] = {
            "username": "root",
            "password": "password",
            "ipaddress": "192.168.100.21",
        }
        settings["ostype"] = 'CentOS 5.3 (64-bit)'
        settings["sleep"] = 60
        settings["timeout"] = 10

        self.services = settings
|
|
|
|
class TestDefaultSecurityGroupEgress(cloudstackTestCase):
    """Checks an account's security groups after a plain VM deployment.

    setUpClass creates one service offering and one admin account that the
    test method deploys into; both are removed again in tearDownClass.
    """

    def setUp(self):
        """Fetch the API/DB handles and start an empty per-test cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Resources appended here are deleted in tearDown.
        self.cleanup = []
        return

    def tearDown(self):
        """Delete the resources the test registered in self.cleanup.

        Raises:
            Exception: wrapping any error raised by cleanup_resources.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)

        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @classmethod
    def setUpClass(cls):
        """Create the service offering and admin account shared by the class."""
        cls.testClient = super(TestDefaultSecurityGroupEgress, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        cls.services = Services().services
        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )
        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            admin=True,
            domainid=cls.domain.id
        )
        # The "account" entry is replaced by the created account's name.
        cls.services["account"] = cls.account.name

        # Removed in tearDownClass, account first.
        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level resources listed in cls._cleanup.

        Raises:
            Exception: wrapping any error raised by cleanup_resources.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)

        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

        return

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_deployVM_InDefaultSecurityGroup(self):
        """Test deploy VM in default security group with no egress rules
        """

        # Validate the following:
        # 1. Deploy a VM.
        # 2. Deployed VM should be running, verify with listVirtualMachines
        # 3. listSecurityGroups for this account should list only the
        #    default security group
        # 4. listVirtualMachines should show that the VM has a
        #    securitygroup attribute

        self.debug("Deploying VM in account: %s" % self.account.name)
        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id
        )
        self.debug("Deployed VM with ID: %s" % self.virtual_machine.id)
        self.cleanup.append(self.virtual_machine)

        list_vm_response = list_virtual_machines(
            self.apiclient,
            id=self.virtual_machine.id
        )
        self.debug(
            "Verify listVirtualMachines response for virtual machine: %s"
            % self.virtual_machine.id
        )
        self.assertEqual(
            isinstance(list_vm_response, list),
            True,
            "Check for list VM response"
        )
        # Check for a non-empty list BEFORE indexing it, so an empty
        # response is reported as an assertion failure, not an IndexError.
        self.assertNotEqual(
            len(list_vm_response),
            0,
            "Check VM available in List Virtual Machines"
        )
        vm_response = list_vm_response[0]

        self.assertEqual(
            vm_response.id,
            self.virtual_machine.id,
            "Check virtual machine id in listVirtualMachines"
        )

        self.assertEqual(
            vm_response.state,
            'Running',
            "VM state should be running"
        )
        self.assertEqual(
            hasattr(vm_response, "securitygroup"),
            True,
            "List VM response should have atleast one security group"
        )

        # Verify listSecurityGroups response
        security_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(security_groups, list),
            True,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(security_groups),
            1,
            "Check List Security groups response"
        )
        self.debug("List Security groups response: %s" %
                   str(security_groups))
        sec_grp = security_groups[0]
        self.assertEqual(
            sec_grp.name,
            'default',
            "List Sec Group should only list default sec. group"
        )
        return
|
|
|
|
class TestAuthorizeIngressRule(cloudstackTestCase):
    """Ingress scenario: authorize an SSH rule on a new group, deploy a VM
    into it, then SSH in and ping out.
    """

    @classmethod
    def setUpClass(cls):
        """Create the service offering and account shared by the class."""
        cls.testClient = super(TestAuthorizeIngressRule, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()
        cls.services = Services().services

        # Domain, zone and template the VM will be deployed with
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype
        os_template = get_template(
            cls.api_client, cls.zone.id, cls.services["ostype"])

        cls.services["domainid"] = cls.domain.id
        vm_settings = cls.services["virtual_machine"]
        vm_settings["zoneid"] = cls.zone.id
        vm_settings["template"] = os_template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client, cls.services["service_offering"])
        cls.account = Account.create(
            cls.api_client, cls.services["account"], domainid=cls.domain.id)
        # The "account" entry now holds the created account's name
        cls.services["account"] = cls.account.name

        cls._cleanup = [cls.account, cls.service_offering]

    @classmethod
    def tearDownClass(cls):
        """Remove the class-level resources; re-raise failures as Exception."""
        try:
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as err:
            raise Exception("Warning: Exception during cleanup : %s" % err)

    def setUp(self):
        """Prepare API/DB handles and an empty per-test cleanup list."""
        client = self.testClient
        self.apiclient = client.getApiClient()
        self.dbclient = client.getDbConnection()
        self.cleanup = []

    def tearDown(self):
        """Remove whatever the test put in self.cleanup."""
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as err:
            raise Exception("Warning: Exception during cleanup : %s" % err)

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_authorizeIngressRule(self):
        """Test authorize ingress rule
        """
        # Steps:
        # 1. create a security group (ssh) for the class account
        # 2. listSecurityGroups should show two groups
        # 3. authorizeSecurityGroupIngress with the ssh rule
        # 4. deployVirtualMachine into this security group
        # 5. SSH into the VM and ping www.google.com from it

        owner = {
            "account": self.account.name,
            "domainid": self.account.domainid,
        }
        ssh_rule = self.services["security_group"]

        security_group = SecurityGroup.create(self.apiclient, ssh_rule, **owner)
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account should now own two groups
        groups = SecurityGroup.list(self.apiclient, **owner)
        self.assertEqual(
            isinstance(groups, list), True,
            "Check for list security groups response")
        self.assertEqual(
            len(groups), 2,
            "Check List Security groups response")

        # Allow SSH into members of the new group
        ingress_rule = security_group.authorize(
            self.apiclient, ssh_rule, **owner)
        self.assertEqual(
            isinstance(ingress_rule, dict), True,
            "Check ingress rule created properly")
        self.debug(
            "Authorizing ingress rule for sec group ID: %s for ssh access"
            % security_group.id)

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id],
            mode=self.services['mode'],
        )
        self.debug("Deploying VM in account: %s" % self.account.name)

        # SSH into the VM and ping the outside world from it
        try:
            self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip)
            ssh = self.virtual_machine.get_ssh_client()
            # Sample output line of interest:
            # 1 packets transmitted, 1 received, 0% packet loss, time 0ms
            res = ssh.execute("ping -c 1 www.google.com")
        except Exception as err:
            self.fail("SSH Access failed for %s: %s"
                      % (self.virtual_machine.ipaddress, err))

        ping_output = str(res)
        self.assertEqual(
            ping_output.count("1 received"), 1,
            "Ping to outside world from VM should be successful")
|
|
|
|
|
|
class TestDefaultGroupEgress(cloudstackTestCase):
    """Egress rule authorized BEFORE the VM is deployed into the group.

    setUpClass creates one service offering and one account that the
    test method uses; both are removed again in tearDownClass.
    """

    def setUp(self):
        """Fetch the API/DB handles and start an empty per-test cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Resources appended here are deleted in tearDown.
        self.cleanup = []
        return

    def tearDown(self):
        """Delete the resources the test registered in self.cleanup.

        Raises:
            Exception: wrapping any error raised by cleanup_resources.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)

        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @classmethod
    def setUpClass(cls):
        """Create the service offering and account shared by the class."""
        cls.testClient = super(TestDefaultGroupEgress, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        cls.services = Services().services
        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )
        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # The "account" entry is replaced by the created account's name.
        cls.services["account"] = cls.account.name
        # Removed in tearDownClass, account first.
        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level resources listed in cls._cleanup.

        Raises:
            Exception: wrapping any error raised by cleanup_resources.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)

        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

        return

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_01_default_group_with_egress(self):
        """Test default group with egress rule before VM deploy and ping, ssh
        """

        # Validate the following:
        # 1. create a security group (ssh) for the class account
        # 2. listSecurityGroups should show two groups
        # 3. authorizeSecurityGroupIngress with the ssh rule
        # 4. authorizeSecurityGroupEgress with the ICMP rule
        #    (services["egress_icmp"], CIDR 0.0.0.0/0)
        # 5. deployVirtualMachine into this security group (ssh)
        # 6. ssh into the VM should work, ping to www.google.com from the
        #    VM should report "1 received", and ssh from the VM to the
        #    management server must not report "No route to host"

        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account should now own two security groups
        sercurity_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(sercurity_groups, list),
            True,
            "Check for list security groups response"
        )

        self.assertEqual(
            len(sercurity_groups),
            2,
            "Check List Security groups response"
        )
        # Authorize Security group to SSH to VM
        self.debug("Authorizing ingress rule for sec group ID: %s for ssh access"
                   % security_group.id)
        ingress_rule = security_group.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )

        self.assertEqual(
            isinstance(ingress_rule, dict),
            True,
            "Check ingress rule created properly"
        )

        # Authorize the ICMP egress rule on the security group
        self.debug("Authorizing egress rule for sec group ID: %s for ssh access"
                   % security_group.id)
        egress_rule = security_group.authorizeEgress(
            self.apiclient,
            self.services["egress_icmp"],
            account=self.account.name,
            domainid=self.account.domainid
        )

        self.assertEqual(
            isinstance(egress_rule, dict),
            True,
            "Check egress rule created properly"
        )

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id],
            mode=self.services['mode']
        )
        self.debug("Deploying VM in account: %s" % self.account.name)

        # Should be able to SSH VM
        try:
            self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip)

            ssh = self.virtual_machine.get_ssh_client()

            self.debug("Ping to google.com from VM")
            # Ping to outside world
            res = ssh.execute("ping -c 1 www.google.com")
            # res = 64 bytes from maa03s17-in-f20.1e100.net (74.125.236.212):
            # icmp_req=1 ttl=57 time=25.9 ms
            # --- www.l.google.com ping statistics ---
            # 1 packets transmitted, 1 received, 0% packet loss, time 0ms
            # rtt min/avg/max/mdev = 25.970/25.970/25.970/0.000 ms
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e)
                      )

        result = str(res)
        self.assertEqual(
            result.count("1 received"),
            1,
            "Ping to outside world from VM should be successful"
        )

        try:
            self.debug("SSHing into management server from VM")
            res = ssh.execute("ssh %s@%s" % (
                self.apiclient.connection.user,
                self.apiclient.connection.mgtSvr
            ))
            self.debug("SSH result: %s" % str(res))

        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e)
                      )
        result = str(res)
        # Require zero occurrences: the former "!= 1" check also passed
        # when the error text appeared two or more times.
        self.assertEqual(
            result.count("No route to host"),
            0,
            "SSH into management server from VM should be successful"
        )
        return
|
|
|
|
|
|
class TestDefaultGroupEgressAfterDeploy(cloudstackTestCase):
    """Egress scenario where the egress rule is authorized AFTER the VM
    has been deployed into the security group.
    """

    @classmethod
    def setUpClass(cls):
        """Create the service offering and account shared by the class."""
        cls.testClient = super(
            TestDefaultGroupEgressAfterDeploy, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()
        cls.services = Services().services

        # Domain, zone and template the VM will be deployed with
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype
        os_template = get_template(
            cls.api_client, cls.zone.id, cls.services["ostype"])

        cls.services["domainid"] = cls.domain.id
        vm_settings = cls.services["virtual_machine"]
        vm_settings["zoneid"] = cls.zone.id
        vm_settings["template"] = os_template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client, cls.services["service_offering"])
        cls.account = Account.create(
            cls.api_client, cls.services["account"], domainid=cls.domain.id)
        # The "account" entry now holds the created account's name
        cls.services["account"] = cls.account.name

        cls._cleanup = [cls.account, cls.service_offering]

    @classmethod
    def tearDownClass(cls):
        """Remove the class-level resources; re-raise failures as Exception."""
        try:
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as err:
            raise Exception("Warning: Exception during cleanup : %s" % err)

    def setUp(self):
        """Prepare API/DB handles and an empty per-test cleanup list."""
        client = self.testClient
        self.apiclient = client.getApiClient()
        self.dbclient = client.getDbConnection()
        self.cleanup = []

    def tearDown(self):
        """Remove whatever the test put in self.cleanup."""
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as err:
            raise Exception("Warning: Exception during cleanup : %s" % err)

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_01_default_group_with_egress(self):
        """ Test default group with egress rule added after vm deploy and ping,
        ssh test
        """
        # Steps:
        # 1. create a security group (ssh) for the class account
        # 2. listSecurityGroups should show two groups
        # 3. authorizeSecurityGroupIngress with the ssh rule
        # 4. deployVirtualMachine into this security group
        # 5. authorizeSecurityGroupEgress with the ICMP rule
        # 6. SSH into the VM and ping www.google.com from it

        owner = {
            "account": self.account.name,
            "domainid": self.account.domainid,
        }
        ssh_rule = self.services["security_group"]

        security_group = SecurityGroup.create(self.apiclient, ssh_rule, **owner)
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account should now own two groups
        groups = SecurityGroup.list(self.apiclient, **owner)
        self.assertEqual(
            isinstance(groups, list), True,
            "Check for list security groups response")
        self.assertEqual(
            len(groups), 2,
            "Check List Security groups response")

        # Allow SSH into members of the new group
        self.debug(
            "Authorizing ingress rule for sec group ID: %s for ssh access"
            % security_group.id)
        ingress_rule = security_group.authorize(
            self.apiclient, ssh_rule, **owner)
        self.assertEqual(
            isinstance(ingress_rule, dict), True,
            "Check ingress rule created properly")

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id],
            mode=self.services['mode'],
        )
        self.debug("Deploying VM in account: %s" % self.account.name)

        # Add the ICMP egress rule now that the VM exists
        self.debug(
            "Authorizing egress rule for sec group ID: %s for ssh access"
            % security_group.id)
        egress_rule = security_group.authorizeEgress(
            self.apiclient, self.services["egress_icmp"], **owner)
        self.assertEqual(
            isinstance(egress_rule, dict), True,
            "Check egress rule created properly")

        # SSH into the VM and ping the outside world from it
        try:
            self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip)
            ssh = self.virtual_machine.get_ssh_client()

            self.debug("Ping to google.com from VM")
            # Sample output line of interest:
            # 1 packets transmitted, 1 received, 0% packet loss, time 0ms
            res = ssh.execute("ping -c 1 www.google.com")
            self.debug("SSH result: %s" % str(res))
        except Exception as err:
            self.fail("SSH Access failed for %s: %s"
                      % (self.virtual_machine.ipaddress, err))

        ping_output = str(res)
        self.assertEqual(
            ping_output.count("1 received"), 1,
            "Ping to outside world from VM should be successful")
|
|
|
|
class TestRevokeEgressRule(cloudstackTestCase):
    """Authorize ICMP and TCP egress rules, verify connectivity, revoke
    both rules and verify connectivity again.

    setUpClass creates one service offering and one account that the
    test method uses; both are removed again in tearDownClass.
    """

    def setUp(self):
        """Fetch the API/DB handles and start an empty per-test cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Resources appended here are deleted in tearDown.
        self.cleanup = []
        return

    def tearDown(self):
        """Delete the resources the test registered in self.cleanup.

        Raises:
            Exception: wrapping any error raised by cleanup_resources.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)

        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)
        return

    @classmethod
    def setUpClass(cls):
        """Create the service offering and account shared by the class."""
        cls.testClient = super(TestRevokeEgressRule, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        cls.services = Services().services
        # Get Zone, Domain and templates
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )
        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # The "account" entry is replaced by the created account's name.
        cls.services["account"] = cls.account.name
        # Removed in tearDownClass, account first.
        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]
        return

    @classmethod
    def tearDownClass(cls):
        """Delete the class-level resources listed in cls._cleanup.

        Raises:
            Exception: wrapping any error raised by cleanup_resources.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)

        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

        return

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_revoke_egress_rule(self):
        """Test revoke security group egress rule
        """

        # Validate the following:
        # 1. create a security group (ssh) for the class account
        # 2. listSecurityGroups should show two groups
        # 3. authorizeSecurityGroupIngress with the ssh rule
        # 4. authorizeSecurityGroupEgress twice: the ICMP rule and the
        #    TCP/22 rule, both with CIDR 0.0.0.0/0
        # 5. deployVirtualMachine into this security group (ssh)
        # 6. ssh into the VM should work, ping to www.google.com from the
        #    VM should report "1 received", and ssh from the VM to the
        #    management server must not report "No route to host"
        # 7. Revoke both egress rules and repeat the checks of step 6

        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account should now own two security groups
        sercurity_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertEqual(
            isinstance(sercurity_groups, list),
            True,
            "Check for list security groups response"
        )

        self.assertEqual(
            len(sercurity_groups),
            2,
            "Check List Security groups response"
        )
        # Authorize Security group to SSH to VM
        self.debug(
            "Authorizing ingress rule for sec group ID: %s for ssh access"
            % security_group.id)
        ingress_rule = security_group.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )

        self.assertEqual(
            isinstance(ingress_rule, dict),
            True,
            "Check ingress rule created properly"
        )

        # Authorize Security group to ping outside world
        self.debug(
            "Authorizing egress rule with ICMP protocol for sec group ID: %s for ssh access"
            % security_group.id)
        egress_rule_icmp = security_group.authorizeEgress(
            self.apiclient,
            self.services["egress_icmp"],
            account=self.account.name,
            domainid=self.account.domainid
        )

        self.assertEqual(
            isinstance(egress_rule_icmp, dict),
            True,
            "Check egress rule created properly"
        )
        # Keep the rule's attributes; "ruleid" is needed to revoke it later.
        ssh_egress_rule_icmp = (egress_rule_icmp["egressrule"][0]).__dict__

        # Authorize Security group to SSH to other VM
        self.debug(
            "Authorizing egress rule with TCP protocol for sec group ID: %s for ssh access"
            % security_group.id)
        egress_rule_tcp = security_group.authorizeEgress(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )

        self.assertEqual(
            isinstance(egress_rule_tcp, dict),
            True,
            "Check egress rule created properly"
        )
        # Keep the rule's attributes; "ruleid" is needed to revoke it later.
        ssh_egress_rule_tcp = (egress_rule_tcp["egressrule"][0]).__dict__

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id],
            mode=self.services['mode']
        )
        self.debug("Deploying VM in account: %s" % self.account.name)

        # Should be able to SSH VM
        try:
            self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip)
            ssh = self.virtual_machine.get_ssh_client()

            self.debug("Ping to google.com from VM")
            # Ping to outside world
            res = ssh.execute("ping -c 1 www.google.com")
            # res = 64 bytes from maa03s17-in-f20.1e100.net (74.125.236.212):
            # icmp_req=1 ttl=57 time=25.9 ms
            # --- www.l.google.com ping statistics ---
            # 1 packets transmitted, 1 received, 0% packet loss, time 0ms
            # rtt min/avg/max/mdev = 25.970/25.970/25.970/0.000 ms
            self.debug("SSH result: %s" % str(res))
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e)
                      )

        result = str(res)
        self.assertEqual(
            result.count("1 received"),
            1,
            "Ping to outside world from VM should be successful"
        )

        try:
            self.debug("SSHing into management server from VM")
            res = ssh.execute("ssh %s@%s" % (
                self.services["mgmt_server"]["username"],
                self.apiclient.connection.mgtSvr
            ))
            self.debug("SSH result: %s" % str(res))

        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e)
                      )
        result = str(res)
        # Require zero occurrences: the former "!= 1" check also passed
        # when the error text appeared two or more times.
        self.assertEqual(
            result.count("No route to host"),
            0,
            "SSH into management server from VM should be successful"
        )

        self.debug(
            "Revoke Egress Rules for Security Group %s for account: %s"
            % (
                security_group.id,
                self.account.name
            ))

        result = security_group.revokeEgress(
            self.apiclient,
            id=ssh_egress_rule_icmp["ruleid"]
        )
        self.debug("Revoked egress rule result: %s" % result)

        result = security_group.revokeEgress(
            self.apiclient,
            id=ssh_egress_rule_tcp["ruleid"]
        )
        self.debug("Revoked egress rule result: %s" % result)

        # Should still be able to SSH VM; open a fresh connection
        try:
            self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip)
            ssh = self.virtual_machine.get_ssh_client(reconnect=True)

            self.debug("Ping to google.com from VM")
            # Ping to outside world
            res = ssh.execute("ping -c 1 www.google.com")
            # res = 64 bytes from maa03s17-in-f20.1e100.net (74.125.236.212):
            # icmp_req=1 ttl=57 time=25.9 ms
            # --- www.l.google.com ping statistics ---
            # 1 packets transmitted, 1 received, 0% packet loss, time 0ms
            # rtt min/avg/max/mdev = 25.970/25.970/25.970/0.000 ms
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e)
                      )

        result = str(res)
        self.assertEqual(
            result.count("1 received"),
            1,
            "Ping to outside world from VM should be successful"
        )

        try:
            self.debug("SSHing into management server from VM")
            res = ssh.execute("ssh %s@%s" % (
                self.services["mgmt_server"]["username"],
                self.apiclient.connection.mgtSvr
            ))
            self.debug("SSH result: %s" % str(res))

        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e)
                      )
        result = str(res)
        # Same strict check as before the revoke: no occurrence at all.
        self.assertEqual(
            result.count("No route to host"),
            0,
            "SSH into management server from VM should be successful"
        )
        return
|
|
|
|
class TestInvalidAccountAuthroize(cloudstackTestCase):
    """Negative test: egress authorization that names an unknown account.

    ``setUpClass`` creates one service offering and one account for the
    class; ``tearDownClass`` hands both to ``cleanup_resources``.
    """

    def setUp(self):
        """Fetch the API/DB handles and start with an empty cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Anything appended here is released again in tearDown().
        self.cleanup = []

    def tearDown(self):
        """Release every resource registered in ``self.cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @classmethod
    def setUpClass(cls):
        """Create the service offering and the account used by the test."""
        cls.testClient = super(TestInvalidAccountAuthroize, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        cls.services = Services().services
        # Resolve the domain, zone and template the test runs against.
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )
        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # From here on the "account" entry holds the created account's name
        # rather than the parameters used to create it.
        cls.services["account"] = cls.account.name
        # Handed to cleanup_resources() by tearDownClass().
        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]

    @classmethod
    def tearDownClass(cls):
        """Release the class-level resources listed in ``cls._cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_invalid_account_authroize(self):
        """Test that authorizing egress for an unknown account is rejected."""

        # Validate the following:
        # 1. createaccount of type user
        # 2. createsecuritygroup (ssh) for this account
        # 3. authorizeSecurityGroupEgress to allow ssh access only out to
        #    non-existent random account and default security group
        # 4. listSecurityGroups should show ssh and default security groups
        # 5. authorizeSecurityGroupEgress API should fail since there is no
        #    account

        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account is expected to own exactly two groups: the default
        # one and the group created above.
        security_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertIsInstance(
            security_groups,
            list,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(security_groups),
            2,
            "Check List Security groups response"
        )

        self.debug(
            "Authorizing egress rule for sec group ID: %s for ssh access"
            % security_group.id)
        # random_gen() supplies the account name, which is expected not to
        # match any existing account, so the API call must raise.
        with self.assertRaises(Exception):
            security_group.authorizeEgress(
                self.apiclient,
                self.services["security_group"],
                account=random_gen(),
                domainid=self.account.domainid
            )
|
|
|
|
class TestMultipleAccountsEgressRuleNeg(cloudstackTestCase):
    """Negative test: account-based egress rule between two accounts.

    ``setUpClass`` creates one service offering and two accounts (A and B);
    ``tearDownClass`` hands them to ``cleanup_resources``.
    """

    def setUp(self):
        """Fetch the API/DB handles and start with an empty cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Anything appended here is released again in tearDown().
        self.cleanup = []

    def tearDown(self):
        """Release every resource registered in ``self.cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @classmethod
    def setUpClass(cls):
        """Create the service offering and both accounts used by the test."""
        cls.testClient = super(TestMultipleAccountsEgressRuleNeg, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        cls.services = Services().services
        # Resolve the domain, zone and template the test runs against.
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )
        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.accountA = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        cls.accountB = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # From here on the "account" entry holds account A's name rather
        # than the parameters used to create the accounts.
        cls.services["account"] = cls.accountA.name
        # Handed to cleanup_resources() by tearDownClass().
        cls._cleanup = [
            cls.accountA,
            cls.accountB,
            cls.service_offering
        ]

    @classmethod
    def tearDownClass(cls):
        """Release the class-level resources listed in ``cls._cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    def _assert_vm_running(self, virtual_machine):
        """Assert that listing *virtual_machine* by id reports it Running."""
        vms = VirtualMachine.list(
            self.apiclient,
            id=virtual_machine.id,
            listall=True
        )
        self.assertIsInstance(
            vms,
            list,
            "List VM should return a valid list"
        )
        vm = vms[0]
        self.assertEqual(
            vm.state,
            "Running",
            "VM state after deployment should be running"
        )
        self.debug("VM: %s state: %s" % (vm.id, vm.state))

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_multiple_account_egress_rule_negative(self):
        """Test multiple account egress rules negative case."""

        # Validate the following:
        # 1. createaccount of type user A
        # 2. createaccount of type user B
        # 3. createsecuritygroup (SSH-A) for account A
        # 4. authorizeSecurityGroupEgress in account A to allow ssh access
        #    only out to VMs in account B's default security group
        # 5. authorizeSecurityGroupIngress in account A to allow ssh incoming
        #    access from anywhere into Vm's of account A. listSecurityGroups
        #    for account A should show two groups (default and ssh-a) and ssh
        #    ingress rule and account based egress rule
        # 6. deployVM in account A into security group SSH-A. deployed VM
        #    should be Running
        # 7. deployVM in account B. deployed VM should be Running
        # 8. ssh into VM in account A and from there ssh to VM in account B.
        #    ssh should fail

        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.accountA.name,
            domainid=self.accountA.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # Account A is expected to own exactly two groups: the default one
        # and the group created above.
        security_groups = SecurityGroup.list(
            self.apiclient,
            account=self.accountA.name,
            domainid=self.accountA.domainid
        )
        self.assertIsInstance(
            security_groups,
            list,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(security_groups),
            2,
            "Check List Security groups response"
        )

        self.debug(
            "Authorizing egress rule for sec group ID: %s for ssh access"
            % security_group.id)
        # The egress rule targets account B's 'default' group, not a CIDR.
        user_secgrp_list = {self.accountB.name: 'default'}

        egress_rule = security_group.authorizeEgress(
            self.apiclient,
            self.services["sg_account"],
            account=self.accountA.name,
            domainid=self.accountA.domainid,
            user_secgrp_list=user_secgrp_list
        )
        self.assertIsInstance(
            egress_rule,
            dict,
            "Check egress rule created properly"
        )

        # Ingress rule so that the test can SSH into VM A.
        self.debug(
            "Authorizing ingress rule for sec group ID: %s for ssh access"
            % security_group.id)
        ingress_rule = security_group.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.accountA.name,
            domainid=self.accountA.domainid
        )
        self.assertIsInstance(
            ingress_rule,
            dict,
            "Check ingress rule created properly"
        )

        self.virtual_machineA = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.accountA.name,
            domainid=self.accountA.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id],
            mode=self.services['mode']
        )
        self.cleanup.append(self.virtual_machineA)
        self.debug("Deploying VM in account: %s" % self.accountA.name)
        self._assert_vm_running(self.virtual_machineA)

        # VM B is deployed without an explicit security group list.
        self.virtual_machineB = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.accountB.name,
            domainid=self.accountB.domainid,
            serviceofferingid=self.service_offering.id
        )
        self.cleanup.append(self.virtual_machineB)
        self.debug("Deploying VM in account: %s" % self.accountB.name)
        self._assert_vm_running(self.virtual_machineB)

        # SSH into VM A itself must work.
        try:
            self.debug("SSH into VM: %s" % self.virtual_machineA.ssh_ip)
            ssh = self.virtual_machineA.get_ssh_client()
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machineA.ipaddress, e))

        try:
            self.debug("SSHing into VM type B from VM A")
            self.debug("VM IP: %s" % self.virtual_machineB.ssh_ip)
            res = ssh.execute("ssh -o 'BatchMode=yes' %s" % (
                self.virtual_machineB.ssh_ip
            ))
            self.debug("SSH result: %s" % str(res))
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machineA.ipaddress, e))

        # SSH failure may result in one of the following three error messages
        ssh_failure_result_set = [
            "ssh: connect to host %s port 22: No route to host"
            % self.virtual_machineB.ssh_ip,
            "ssh: connect to host %s port 22: Connection timed out"
            % self.virtual_machineB.ssh_ip,
            "Host key verification failed."
        ]

        # At least one element of the command output must equal one of the
        # known failure messages.
        self.assertFalse(
            set(res).isdisjoint(ssh_failure_result_set),
            "SSH into VM of other account should not be successful"
        )
|
|
|
|
class TestMultipleAccountsEgressRule(cloudstackTestCase):
    """Positive test: account-based egress rule between two accounts.

    ``setUpClass`` creates one service offering and two accounts (A and B);
    ``tearDownClass`` hands them to ``cleanup_resources``.
    """

    def setUp(self):
        """Fetch the API/DB handles and start with an empty cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Anything appended here is released again in tearDown().
        self.cleanup = []

    def tearDown(self):
        """Release every resource registered in ``self.cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @classmethod
    def setUpClass(cls):
        """Create the service offering and both accounts used by the test."""
        cls.testClient = super(TestMultipleAccountsEgressRule, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        cls.services = Services().services
        # Resolve the domain, zone and template the test runs against.
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )
        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.accountA = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        cls.accountB = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # From here on the "account" entry holds account A's name rather
        # than the parameters used to create the accounts.
        cls.services["account"] = cls.accountA.name
        # Handed to cleanup_resources() by tearDownClass().
        cls._cleanup = [
            cls.accountA,
            cls.accountB,
            cls.service_offering
        ]

    @classmethod
    def tearDownClass(cls):
        """Release the class-level resources listed in ``cls._cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    def _assert_two_security_groups(self, account):
        """Assert that *account* owns exactly two security groups."""
        security_groups = SecurityGroup.list(
            self.apiclient,
            account=account.name,
            domainid=account.domainid
        )
        self.assertIsInstance(
            security_groups,
            list,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(security_groups),
            2,
            "Check List Security groups response"
        )

    def _assert_vm_running(self, virtual_machine):
        """Assert that listing *virtual_machine* by id reports it Running."""
        vms = VirtualMachine.list(
            self.apiclient,
            id=virtual_machine.id,
            listall=True
        )
        self.assertIsInstance(
            vms,
            list,
            "List VM should return a valid list"
        )
        vm = vms[0]
        self.assertEqual(
            vm.state,
            "Running",
            "VM state after deployment should be running"
        )
        self.debug("VM: %s state: %s" % (vm.id, vm.state))

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_multiple_account_egress_rule_positive(self):
        """Test multiple account egress rules positive case."""

        # Validate the following:
        # 1. createaccount of type user A
        # 2. createaccount of type user B
        # 3. createsecuritygroup (SSH-A) for account A
        # 4. authorizeSecurityGroupEgress in account A to allow ssh access
        #    only out to VMs in account B's security group
        # 5. authorizeSecurityGroupIngress in account A to allow ssh incoming
        #    access from anywhere into Vm's of account A. listSecurityGroups
        #    for account A should show two groups (default and ssh-a) and ssh
        #    ingress rule and account based egress rule
        # 6. deployVM in account A into security group SSH-A. deployed VM
        #    should be Running
        # 7. deployVM in account B. deployed VM should be Running
        # 8. ssh into VM in account A and from there ssh to VM in account B.
        #    ssh should succeed

        security_groupA = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.accountA.name,
            domainid=self.accountA.domainid
        )
        self.debug("Created security group with ID: %s" % security_groupA.id)
        # Expected: the default group plus the one just created.
        self._assert_two_security_groups(self.accountA)

        security_groupB = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.accountB.name,
            domainid=self.accountB.domainid
        )
        self.debug("Created security group with ID: %s" % security_groupB.id)
        # Expected: the default group plus the one just created.
        self._assert_two_security_groups(self.accountB)

        self.debug(
            "Authorizing egress rule for sec group ID: %s for ssh access"
            % security_groupA.id)
        # The egress rule targets account B's new group, not a CIDR.
        user_secgrp_list = {self.accountB.name: security_groupB.name}

        egress_rule = security_groupA.authorizeEgress(
            self.apiclient,
            self.services["sg_account"],
            account=self.accountA.name,
            domainid=self.accountA.domainid,
            user_secgrp_list=user_secgrp_list
        )
        self.assertIsInstance(
            egress_rule,
            dict,
            "Check egress rule created properly"
        )

        # Ingress rule so that the test can SSH into VM A.
        self.debug(
            "Authorizing ingress rule for sec group ID: %s for ssh access"
            % security_groupA.id)
        ingress_ruleA = security_groupA.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.accountA.name,
            domainid=self.accountA.domainid
        )
        self.assertIsInstance(
            ingress_ruleA,
            dict,
            "Check ingress rule created properly"
        )

        self.virtual_machineA = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.accountA.name,
            domainid=self.accountA.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_groupA.id],
            mode=self.services['mode']
        )
        self.cleanup.append(self.virtual_machineA)
        self.debug("Deploying VM in account: %s" % self.accountA.name)
        self._assert_vm_running(self.virtual_machineA)

        # Ingress rule on group B so that VM B accepts the SSH connection.
        self.debug(
            "Authorizing ingress rule for sec group ID: %s for ssh access"
            % security_groupB.id)
        ingress_ruleB = security_groupB.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.accountB.name,
            domainid=self.accountB.domainid
        )
        self.assertIsInstance(
            ingress_ruleB,
            dict,
            "Check ingress rule created properly"
        )

        self.virtual_machineB = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.accountB.name,
            domainid=self.accountB.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_groupB.id]
        )
        self.cleanup.append(self.virtual_machineB)
        self.debug("Deploying VM in account: %s" % self.accountB.name)
        self._assert_vm_running(self.virtual_machineB)

        # SSH into VM A itself must work.
        try:
            self.debug("SSH into VM: %s" % self.virtual_machineA.ssh_ip)
            ssh = self.virtual_machineA.get_ssh_client()
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machineA.ipaddress, e))

        try:
            self.debug("SSHing into VB type B from VM A")
            self.debug("VM IP: %s" % self.virtual_machineB.ssh_ip)

            res = ssh.execute("ssh %s@%s" % (
                self.services["virtual_machine"]["username"],
                self.virtual_machineB.ssh_ip
            ))
            self.debug("SSH result: %s" % str(res))
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machineA.ipaddress, e))

        result = str(res)
        # Any occurrence of the timeout text means the connection failed;
        # comparing the count against 1 would let two or more occurrences
        # slip through.
        self.assertNotIn(
            "Connection timed out",
            result,
            "SSH into VM of account B from VM of account A should be successful"
        )
|
|
|
|
class TestStartStopVMWithEgressRule(cloudstackTestCase):
    """Test stopping and starting a VM whose security group has egress rules.

    ``setUpClass`` creates one service offering and one account for the
    class; ``tearDownClass`` hands both to ``cleanup_resources``.
    """

    def setUp(self):
        """Fetch the API/DB handles and start with an empty cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Anything appended here is released again in tearDown().
        self.cleanup = []

    def tearDown(self):
        """Release every resource registered in ``self.cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @classmethod
    def setUpClass(cls):
        """Create the service offering and the account used by the test."""
        cls.testClient = super(TestStartStopVMWithEgressRule, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        cls.services = Services().services
        # Resolve the domain, zone and template the test runs against.
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )
        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # From here on the "account" entry holds the created account's name
        # rather than the parameters used to create it.
        cls.services["account"] = cls.account.name
        # Handed to cleanup_resources() by tearDownClass().
        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]

    @classmethod
    def tearDownClass(cls):
        """Release the class-level resources listed in ``cls._cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_start_stop_vm_egress(self):
        """Test stop start Vm with egress rules."""

        # Validate the following:
        # 1. createaccount of type user
        # 2. createsecuritygroup (ssh) for this account
        # 3. authorizeSecurityGroupIngress to allow ssh access to the VM
        # 4. authorizeSecurityGroupEgress to allow ssh access only out to
        #    CIDR: 0.0.0.0/0
        # 5. deployVirtualMachine into this security group (ssh)
        # 6. stopVirtualMachine
        # 7. startVirtualMachine
        # 8. ssh in to VM

        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account is expected to own exactly two groups: the default
        # one and the group created above.
        security_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertIsInstance(
            security_groups,
            list,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(security_groups),
            2,
            "Check List Security groups response"
        )

        # Ingress rule so that the test can SSH into the VM.
        self.debug(
            "Authorizing ingress rule for sec group ID: %s for ssh access"
            % security_group.id)
        ingress_rule = security_group.authorize(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertIsInstance(
            ingress_rule,
            dict,
            "Check ingress rule created properly"
        )

        self.virtual_machine = VirtualMachine.create(
            self.apiclient,
            self.services["virtual_machine"],
            accountid=self.account.name,
            domainid=self.account.domainid,
            serviceofferingid=self.service_offering.id,
            securitygroupids=[security_group.id],
            mode=self.services['mode']
        )
        # Register the VM for tearDown(), as the sibling tests do for the
        # VMs they deploy.
        self.cleanup.append(self.virtual_machine)
        self.debug("Deploying VM in account: %s" % self.account.name)

        self.debug(
            "Authorizing egress rule for sec group ID: %s for ssh access"
            % security_group.id)
        egress_rule = security_group.authorizeEgress(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertIsInstance(
            egress_rule,
            dict,
            "Check egress rule created properly"
        )

        try:
            # Stop virtual machine
            self.virtual_machine.stop(self.apiclient)
        except Exception as e:
            self.fail("Failed to stop instance: %s" % e)

        # Start virtual machine
        self.debug("Starting virtual machine: %s" % self.virtual_machine.id)
        self.virtual_machine.start(self.apiclient)

        vms = VirtualMachine.list(
            self.apiclient,
            id=self.virtual_machine.id,
            listall=True
        )
        self.assertIsInstance(
            vms,
            list,
            "List VM should return a valid list"
        )
        vm = vms[0]
        # The VM was started again above, so it must be Running here.
        self.assertEqual(
            vm.state,
            "Running",
            "VM state should be running after start"
        )
        self.debug("VM: %s state: %s" % (vm.id, vm.state))

        # SSH into the restarted VM must work.
        try:
            self.debug("SSH into VM: %s" % self.virtual_machine.ssh_ip)
            self.virtual_machine.get_ssh_client()
        except Exception as e:
            self.fail("SSH Access failed for %s: %s" %
                      (self.virtual_machine.ipaddress, e))
|
|
|
|
class TestInvalidParametersForEgress(cloudstackTestCase):
    """Test egress rule authorization with invalid and duplicate parameters.

    ``setUpClass`` creates one service offering and one account for the
    class; ``tearDownClass`` hands both to ``cleanup_resources``.
    """

    def setUp(self):
        """Fetch the API/DB handles and start with an empty cleanup list."""
        self.apiclient = self.testClient.getApiClient()
        self.dbclient = self.testClient.getDbConnection()
        # Anything appended here is released again in tearDown().
        self.cleanup = []

    def tearDown(self):
        """Release every resource registered in ``self.cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(self.apiclient, self.cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @classmethod
    def setUpClass(cls):
        """Create the service offering and the account used by the test."""
        cls.testClient = super(TestInvalidParametersForEgress, cls).getClsTestClient()
        cls.api_client = cls.testClient.getApiClient()

        cls.services = Services().services
        # Resolve the domain, zone and template the test runs against.
        cls.domain = get_domain(cls.api_client)
        cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
        cls.services['mode'] = cls.zone.networktype

        template = get_template(
            cls.api_client,
            cls.zone.id,
            cls.services["ostype"]
        )
        cls.services["domainid"] = cls.domain.id
        cls.services["virtual_machine"]["zoneid"] = cls.zone.id
        cls.services["virtual_machine"]["template"] = template.id

        cls.service_offering = ServiceOffering.create(
            cls.api_client,
            cls.services["service_offering"]
        )
        cls.account = Account.create(
            cls.api_client,
            cls.services["account"],
            domainid=cls.domain.id
        )
        # From here on the "account" entry holds the created account's name
        # rather than the parameters used to create it.
        cls.services["account"] = cls.account.name
        # Handed to cleanup_resources() by tearDownClass().
        cls._cleanup = [
            cls.account,
            cls.service_offering
        ]

    @classmethod
    def tearDownClass(cls):
        """Release the class-level resources listed in ``cls._cleanup``.

        Raises:
            Exception: if ``cleanup_resources`` fails; the text of the
                original error is embedded in the message.
        """
        try:
            cleanup_resources(cls.api_client, cls._cleanup)
        except Exception as e:
            raise Exception("Warning: Exception during cleanup : %s" % e)

    @attr(tags=["sg", "eip", "advancedsg"])
    def test_invalid_parameters(self):
        """Test invalid parameters for egress rules."""

        # Validate the following:
        # 1. createUserAccount
        # 2. createSecurityGroup (test)
        # 3. authorizeEgressRule (negative port) - Should fail
        # 4. authorizeEgressRule (invalid CIDR) - Should fail
        # 5. authorizeEgressRule (invalid account) - Should fail
        # 6. authorizeEgressRule (22, cidr: anywhere) and
        #    authorizeEgressRule (22, cidr: restricted) - Should pass
        # 7. authorizeEgressRule (21, cidr : 10.1.1.0/24) and
        #    authorizeEgressRule (21, cidr: 10.1.1.0/24) - Should fail

        security_group = SecurityGroup.create(
            self.apiclient,
            self.services["security_group"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.debug("Created security group with ID: %s" % security_group.id)

        # The account is expected to own exactly two groups: the default
        # one and the group created above.
        security_groups = SecurityGroup.list(
            self.apiclient,
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertIsInstance(
            security_groups,
            list,
            "Check for list security groups response"
        )
        self.assertEqual(
            len(security_groups),
            2,
            "Check List Security groups response"
        )

        # Each of the next three calls is expected to be rejected.
        self.debug(
            "Authorizing egress rule for sec group ID: %s with invalid port"
            % security_group.id)
        with self.assertRaises(Exception):
            security_group.authorizeEgress(
                self.apiclient,
                self.services["sg_invalid_port"],
                account=self.account.name,
                domainid=self.account.domainid
            )

        self.debug(
            "Authorizing egress rule for sec group ID: %s with invalid cidr"
            % security_group.id)
        with self.assertRaises(Exception):
            security_group.authorizeEgress(
                self.apiclient,
                self.services["sg_invalid_cidr"],
                account=self.account.name,
                domainid=self.account.domainid
            )

        self.debug(
            "Authorizing egress rule for sec group ID: %s with invalid account"
            % security_group.id)
        # random_gen() supplies the account name, which is expected not to
        # match any existing account.
        with self.assertRaises(Exception):
            security_group.authorizeEgress(
                self.apiclient,
                self.services["security_group"],
                account=random_gen(),
                domainid=self.account.domainid
            )

        # The next two calls are expected to succeed.
        self.debug(
            "Authorizing egress rule for sec group ID: %s with cidr: anywhere and port: 22"
            % security_group.id)
        egress_rule_A = security_group.authorizeEgress(
            self.apiclient,
            self.services["sg_cidr_anywhere"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertIsInstance(
            egress_rule_A,
            dict,
            "Check egress rule created properly"
        )

        egress_rule_R = security_group.authorizeEgress(
            self.apiclient,
            self.services["sg_cidr_restricted"],
            account=self.account.name,
            domainid=self.account.domainid
        )
        self.assertIsInstance(
            egress_rule_R,
            dict,
            "Check egress rule created properly"
        )

        # Repeating the rule that was just authorized must be rejected.
        self.debug(
            "Authorizing egress rule for sec group ID: %s with duplicate port"
            % security_group.id)
        with self.assertRaises(Exception):
            security_group.authorizeEgress(
                self.apiclient,
                self.services["sg_cidr_restricted"],
                account=self.account.name,
                domainid=self.account.domainid
            )
|
|
|
|
|
|
|