From e9e8c65b4d63c7bd49c68d0ecd97b1971138d9eb Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 24 Apr 2014 16:56:32 +0530 Subject: [PATCH] CLOUDSTACK-6282: Added Automated tests to test_escalations.py for Security Groups, VPN Customer Gateways, Templates, ISO's CS API calls Signed-off-by: SrikanteswaraRao Talluri --- .../integration/component/test_escalations.py | 2486 +++++++++++++++++ tools/marvin/marvin/config/test_data.py | 17 +- tools/marvin/marvin/lib/base.py | 135 +- 3 files changed, 2636 insertions(+), 2 deletions(-) diff --git a/test/integration/component/test_escalations.py b/test/integration/component/test_escalations.py index c3e306604d6..e9945790b8b 100644 --- a/test/integration/component/test_escalations.py +++ b/test/integration/component/test_escalations.py @@ -26,6 +26,7 @@ from marvin.lib.common import * from marvin.lib.utils import checkVolumeSize from marvin.codes import SUCCESS from nose.plugins.attrib import attr +from time import sleep class TestVolumes(cloudstackTestCase): @@ -5741,4 +5742,2489 @@ class TestSnapshots(cloudstackTestCase): vm_snapshot_status, "Listed VM Snapshot details are not as expected" ) + return + +class TestSecurityGroups(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + try: + cls._cleanup = [] + cls.testClient = super(TestSecurityGroups, cls).getClsTestClient() + cls.api_client = cls.testClient.getApiClient() + cls.services = cls.testClient.getParsedTestDataConfig() + # Get Domain, Zone, Template + cls.domain = get_domain(cls.api_client) + cls.zone = get_zone(cls.api_client) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostype"] + ) + cls.services['mode'] = cls.zone.networktype + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + # Getting authentication for user in newly created Account + cls.user = cls.account.user[0] + cls.userapiclient = cls.testClient.getUserApiClient(cls.user.username, cls.domain.name) + cls._cleanup.append(cls.account) + except Exception as e: + cls.tearDownClass() + raise Exception("Warning: Exception in setup : %s" % e) + return + + def setUp(self): + + self.apiClient = self.testClient.getApiClient() + self.cleanup = [] + + def tearDown(self): + #Clean up, terminate the created resources + cleanup_resources(self.apiClient, self.cleanup) + return + + @classmethod + def tearDownClass(cls): + try: + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def __verify_values(self, expected_vals, actual_vals): + """ + @Desc: Function to verify expected and actual values + @Steps: + Step1: Initializing return flag to True + Step1: Verifying length of expected and actual dictionaries is matching. + If not matching returning false + Step2: Listing all the keys from expected dictionary + Step3: Looping through each key from step2 and verifying expected and actual dictionaries have same value + If not making return flag to False + Step4: returning the return flag after all the values are verified + """ + return_flag = True + + if len(expected_vals) != len(actual_vals): + return False + + keys = expected_vals.keys() + for i in range(0, len(expected_vals)): + exp_val = expected_vals[keys[i]] + act_val = actual_vals[keys[i]] + if exp_val == act_val: + return_flag = return_flag and True + else: + return_flag = return_flag and False + self.debug("expected Value: %s, is not matching with actual value: %s" % ( + exp_val, + act_val + )) + return return_flag + + @attr(tags=["basic", "provisioning"]) + def test_01_list_securitygroups_pagination(self): + """ + @Desc: Test to List Security Groups pagination + @steps: + Step1: Listing all the Security Groups for a user + Step2: Verifying that list size is 1 + Step3: Creating (page size) number of Security Groups + Step4: Listing all the Security Groups again for a user + Step5: Verifying that list size is (page size + 1) + Step6: Listing all the Security Groups in page1 + Step7: Verifying that list size is (page size) + Step8: Listing all the Security Groups in page2 + Step9: Verifying that list size is 1 + Step10: Deleting the Security Group present in page 2 + Step11: Listing all the Security Groups in page2 + Step12: Verifying that no security groups are listed + """ + # Listing all the Security Groups for a User + list_securitygroups_before = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"] + ) + # Verifying that default security group is created + status = validateList(list_securitygroups_before) + self.assertEquals( + PASS, + status[0], + "Default Security Groups creation failed" + ) + # Verifying the size of the list is 1 + self.assertEquals( + 1, + len(list_securitygroups_before), + "Count of Security Groups list is not matching" + ) + # Creating pagesize number of security groups + for i in range(0, (self.services["pagesize"])): + securitygroup_created = SecurityGroup.create( + self.userapiclient, + self.services["security_group"], + account=self.account.name, + domainid=self.domain.id, + description=self.services["security_group"]["name"] + ) + self.assertIsNotNone( + securitygroup_created, + "Security Group creation failed" + ) + if (i < self.services["pagesize"]): + self.cleanup.append(securitygroup_created) + + # Listing all the security groups for user again + list_securitygroups_after = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"] + ) + status = validateList(list_securitygroups_after) + self.assertEquals( + PASS, + status[0], + "Security Groups creation failed" + ) + # Verifying that list size is pagesize + 1 + self.assertEquals( + self.services["pagesize"] + 1, + len(list_securitygroups_after), + "Failed to create pagesize + 1 number of Security Groups" + ) + # Listing all the security groups in page 1 + list_securitygroups_page1 = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"], + page=1, + pagesize=self.services["pagesize"] + ) + status = validateList(list_securitygroups_page1) + self.assertEquals( + PASS, + status[0], + "Failed to list security groups in page 1" + ) + # Verifying the list size to be equal to pagesize + self.assertEquals( + self.services["pagesize"], + len(list_securitygroups_page1), + "Size of security groups in page 1 is not matching" + ) + # Listing all the security groups in page 2 + list_securitygroups_page2 = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"], + page=2, + pagesize=self.services["pagesize"] + ) + status = validateList(list_securitygroups_page2) + self.assertEquals( + PASS, + status[0], + "Failed to list security groups in page 2" + ) + # Verifying the list size to be equal to pagesize + self.assertEquals( + 1, + len(list_securitygroups_page2), + "Size of security groups in page 2 is not matching" + ) + # Deleting the security group present in page 2 + SecurityGroup.delete( + securitygroup_created, + self.userapiclient) + # Listing all the security groups in page 2 again + list_securitygroups_page2 = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"], + page=2, + pagesize=self.services["pagesize"] + ) + # Verifying that there are no security groups listed + self.assertIsNone( + list_securitygroups_page2, + "Security Groups not deleted from page 2" + ) + return + + @attr(tags=["basic", "provisioning"]) + def test_02_securitygroups_authorize_revoke_ingress(self): + """ + @Desc: Test to Authorize and Revoke Ingress for Security Group + @steps: + Step1: Listing all the Security Groups for a user + Step2: Verifying that list size is 1 + Step3: Creating a Security Groups + Step4: Listing all the Security Groups again for a user + Step5: Verifying that list size is 2 + Step6: Authorizing Ingress for the security group created in step3 + Step7: Listing the security groups by passing id of security group created in step3 + Step8: Verifying that list size is 1 + Step9: Verifying that Ingress is authorized to the security group + Step10: Verifying the details of the Ingress rule are as expected + Step11: Revoking Ingress for the security group created in step3 + Step12: Listing the security groups by passing id of security group created in step3 + Step13: Verifying that list size is 1 + Step14: Verifying that Ingress is revoked from the security group + """ + # Listing all the Security Groups for a User + list_securitygroups_before = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"] + ) + # Verifying that default security group is created + status = validateList(list_securitygroups_before) + self.assertEquals( + PASS, + status[0], + "Default Security Groups creation failed" + ) + # Verifying the size of the list is 1 + self.assertEquals( + 1, + len(list_securitygroups_before), + "Count of Security Groups list is not matching" + ) + # Creating a security group + securitygroup_created = SecurityGroup.create( + self.userapiclient, + self.services["security_group"], + account=self.account.name, + domainid=self.domain.id, + description=self.services["security_group"]["name"] + ) + self.assertIsNotNone( + securitygroup_created, + "Security Group creation failed" + ) + self.cleanup.append(securitygroup_created) + + # Listing all the security groups for user again + list_securitygroups_after = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"] + ) + status = validateList(list_securitygroups_after) + self.assertEquals( + PASS, + status[0], + "Security Groups creation failed" + ) + # Verifying that list size is 2 + self.assertEquals( + 2, + len(list_securitygroups_after), + "Failed to create Security Group" + ) + # Authorizing Ingress for the security group created in step3 + securitygroup_created.authorize( + self.userapiclient, + self.services["ingress_rule"], + self.account.name, + self.domain.id, + ) + # Listing the security group by Id + list_securitygroups_byid = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"], + id=securitygroup_created.id, + domainid=self.domain.id + ) + # Verifying that security group is listed + status = validateList(list_securitygroups_byid) + self.assertEquals( + PASS, + status[0], + "Listing of Security Groups by id failed" + ) + # Verifying size of the list is 1 + self.assertEquals( + 1, + len(list_securitygroups_byid), + "Count of the listing security group by id is not matching" + ) + securitygroup_ingress = list_securitygroups_byid[0].ingressrule + # Validating the Ingress rule + status = validateList(securitygroup_ingress) + self.assertEquals( + PASS, + status[0], + "Security Groups Ingress rule authorization failed" + ) + self.assertEquals( + 1, + len(securitygroup_ingress), + "Security Group Ingress rules count is not matching" + ) + # Verifying the details of the Ingress rule are as expected + #Creating expected and actual values dictionaries + expected_dict = { + "cidr":self.services["ingress_rule"]["cidrlist"], + "protocol":self.services["ingress_rule"]["protocol"], + "startport":self.services["ingress_rule"]["startport"], + "endport":self.services["ingress_rule"]["endport"], + } + actual_dict = { + "cidr":str(securitygroup_ingress[0].cidr), + "protocol":str(securitygroup_ingress[0].protocol.upper()), + "startport":str(securitygroup_ingress[0].startport), + "endport":str(securitygroup_ingress[0].endport), + } + ingress_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + ingress_status, + "Listed Security group Ingress rule details are not as expected" + ) + # Revoking the Ingress rule from Security Group + securitygroup_created.revoke(self.userapiclient, securitygroup_ingress[0].ruleid) + # Listing the security group by Id + list_securitygroups_byid = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"], + id=securitygroup_created.id, + domainid=self.domain.id + ) + # Verifying that security group is listed + status = validateList(list_securitygroups_byid) + self.assertEquals( + PASS, + status[0], + "Listing of Security Groups by id failed" + ) + # Verifying size of the list is 1 + self.assertEquals( + 1, + len(list_securitygroups_byid), + "Count of the listing security group by id is not matching" + ) + securitygroup_ingress = list_securitygroups_byid[0].ingressrule + # Verifying that Ingress rule is empty(revoked) + status = validateList(securitygroup_ingress) + self.assertEquals( + EMPTY_LIST, + status[2], + "Security Groups Ingress rule is not revoked" + ) + return + + @attr(tags=["basic", "provisioning"]) + def test_03_securitygroups_authorize_revoke_egress(self): + """ + @Desc: Test to Authorize and Revoke Egress for Security Group + @steps: + Step1: Listing all the Security Groups for a user + Step2: Verifying that list size is 1 + Step3: Creating a Security Groups + Step4: Listing all the Security Groups again for a user + Step5: Verifying that list size is 2 + Step6: Authorizing Egress for the security group created in step3 + Step7: Listing the security groups by passing id of security group created in step3 + Step8: Verifying that list size is 1 + Step9: Verifying that Egress is authorized to the security group + Step10: Verifying the details of the Egress rule are as expected + Step11: Revoking Egress for the security group created in step3 + Step12: Listing the security groups by passing id of security group created in step3 + Step13: Verifying that list size is 1 + Step14: Verifying that Egress is revoked from the security group + """ + # Listing all the Security Groups for a User + list_securitygroups_before = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"] + ) + # Verifying that default security group is created + status = validateList(list_securitygroups_before) + self.assertEquals( + PASS, + status[0], + "Default Security Groups creation failed" + ) + # Verifying the size of the list is 1 + self.assertEquals( + 1, + len(list_securitygroups_before), + "Count of Security Groups list is not matching" + ) + # Creating a security group + securitygroup_created = SecurityGroup.create( + self.userapiclient, + self.services["security_group"], + account=self.account.name, + domainid=self.domain.id, + description=self.services["security_group"]["name"] + ) + self.assertIsNotNone( + securitygroup_created, + "Security Group creation failed" + ) + self.cleanup.append(securitygroup_created) + + # Listing all the security groups for user again + list_securitygroups_after = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"] + ) + status = validateList(list_securitygroups_after) + self.assertEquals( + PASS, + status[0], + "Security Groups creation failed" + ) + # Verifying that list size is 2 + self.assertEquals( + 2, + len(list_securitygroups_after), + "Failed to create Security Group" + ) + # Authorizing Egress for the security group created in step3 + securitygroup_created.authorizeEgress( + self.userapiclient, + self.services["ingress_rule"], + self.account.name, + self.domain.id, + ) + # Listing the security group by Id + list_securitygroups_byid = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"], + id=securitygroup_created.id, + domainid=self.domain.id + ) + # Verifying that security group is listed + status = validateList(list_securitygroups_byid) + self.assertEquals( + PASS, + status[0], + "Listing of Security Groups by id failed" + ) + # Verifying size of the list is 1 + self.assertEquals( + 1, + len(list_securitygroups_byid), + "Count of the listing security group by id is not matching" + ) + securitygroup_egress = list_securitygroups_byid[0].egressrule + # Validating the Ingress rule + status = validateList(securitygroup_egress) + self.assertEquals( + PASS, + status[0], + "Security Groups Egress rule authorization failed" + ) + self.assertEquals( + 1, + len(securitygroup_egress), + "Security Group Egress rules count is not matching" + ) + # Verifying the details of the Egress rule are as expected + #Creating expected and actual values dictionaries + expected_dict = { + "cidr":self.services["ingress_rule"]["cidrlist"], + "protocol":self.services["ingress_rule"]["protocol"], + "startport":self.services["ingress_rule"]["startport"], + "endport":self.services["ingress_rule"]["endport"], + } + actual_dict = { + "cidr":str(securitygroup_egress[0].cidr), + "protocol":str(securitygroup_egress[0].protocol.upper()), + "startport":str(securitygroup_egress[0].startport), + "endport":str(securitygroup_egress[0].endport), + } + ingress_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + ingress_status, + "Listed Security group Egress rule details are not as expected" + ) + # Revoking the Egress rule from Security Group + securitygroup_created.revokeEgress(self.userapiclient, securitygroup_egress[0].ruleid) + # Listing the security group by Id + list_securitygroups_byid = SecurityGroup.list( + self.userapiclient, + listall=self.services["listall"], + id=securitygroup_created.id, + domainid=self.domain.id + ) + # Verifying that security group is listed + status = validateList(list_securitygroups_byid) + self.assertEquals( + PASS, + status[0], + "Listing of Security Groups by id failed" + ) + # Verifying size of the list is 1 + self.assertEquals( + 1, + len(list_securitygroups_byid), + "Count of the listing security group by id is not matching" + ) + securitygroup_egress = list_securitygroups_byid[0].egressrule + # Verifying that Ingress rule is empty(revoked) + status = validateList(securitygroup_egress) + self.assertEquals( + EMPTY_LIST, + status[2], + "Security Groups Egress rule is not revoked" + ) + return + +class TestVpnCustomerGateways(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + try: + cls._cleanup = [] + cls.testClient = super(TestVpnCustomerGateways, cls).getClsTestClient() + cls.api_client = cls.testClient.getApiClient() + cls.services = cls.testClient.getParsedTestDataConfig() + # Get Domain, Zone, Template + cls.domain = get_domain(cls.api_client) + cls.zone = get_zone(cls.api_client) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostype"] + ) + cls.services['mode'] = cls.zone.networktype + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + # Getting authentication for user in newly created Account + cls.user = cls.account.user[0] + cls.userapiclient = cls.testClient.getUserApiClient(cls.user.username, cls.domain.name) + cls._cleanup.append(cls.account) + except Exception as e: + cls.tearDownClass() + raise Exception("Warning: Exception in setup : %s" % e) + return + + def setUp(self): + + self.apiClient = self.testClient.getApiClient() + self.cleanup = [] + + def tearDown(self): + #Clean up, terminate the created resources + cleanup_resources(self.apiClient, self.cleanup) + return + + @classmethod + def tearDownClass(cls): + try: + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def __verify_values(self, expected_vals, actual_vals): + """ + @Desc: Function to verify expected and actual values + @Steps: + Step1: Initializing return flag to True + Step1: Verifying length of expected and actual dictionaries is matching. + If not matching returning false + Step2: Listing all the keys from expected dictionary + Step3: Looping through each key from step2 and verifying expected and actual dictionaries have same value + If not making return flag to False + Step4: returning the return flag after all the values are verified + """ + return_flag = True + + if len(expected_vals) != len(actual_vals): + return False + + keys = expected_vals.keys() + for i in range(0, len(expected_vals)): + exp_val = expected_vals[keys[i]] + act_val = actual_vals[keys[i]] + if exp_val == act_val: + return_flag = return_flag and True + else: + return_flag = return_flag and False + self.debug("expected Value: %s, is not matching with actual value: %s" % ( + exp_val, + act_val + )) + return return_flag + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_01_list_vpncustomergateways_pagination(self): + """ + @Desc: Test to List VPN Customer Gateways pagination + @steps: + Step1: Listing all the VPN Customer Gateways for a user + Step2: Verifying that no VPN Customer Gateways are listed + Step3: Creating (page size + 1) number of VPN Customer Gateways + Step4: Listing all the VPN Customer Gateways again for a user + Step5: Verifying that list size is (page size + 1) + Step6: Listing all the VPN Customer Gateways in page1 + Step7: Verifying that list size is (page size) + Step8: Listing all the VPN Customer Gateways in page2 + Step9: Verifying that list size is 1 + Step10: Deleting the VPN Customer Gateways present in page 2 + Step11: Listing all the VPN Customer Gateways in page2 + Step12: Verifying that no VPN Customer Gateways are listed + """ + # Listing all the VPN Customer Gateways for a User + list_vpncustomergateways_before = VpnCustomerGateway.list( + self.userapiclient, + listall=self.services["listall"] + ) + # Verifying that no VPN Customer Gateways are listed + self.assertIsNone( + list_vpncustomergateways_before, + "VPN Customer Gateways listed for newly created User" + ) + # Creating pagesize + 1 number of VPN Customer Gateways + for i in range(0, (self.services["pagesize"] + 1)): + vpncustomergateway_created = VpnCustomerGateway.create( + self.userapiclient, + self.services["vpncustomergateway"], + name="VPNCustGateway"+str(i+1), + gateway="10.102.153." + str(i+1), + cidrlist="10.0.0.0/24", + account=self.account.name, + domainid=self.domain.id + ) + self.assertIsNotNone( + vpncustomergateway_created, + "VPN Customer Gateway creation failed" + ) + if (i < self.services["pagesize"] + 1): + self.cleanup.append(vpncustomergateway_created) + + # Listing all the VPN Customer Gateways for a User + list_vpncustomergateways_after = VpnCustomerGateway.list( + self.userapiclient, + listall=self.services["listall"] + ) + status = validateList(list_vpncustomergateways_after) + self.assertEquals( + PASS, + status[0], + "VPN Customer Gateway creation failed" + ) + # Verifying that list size is pagesize + 1 + self.assertEquals( + self.services["pagesize"] + 1, + len(list_vpncustomergateways_after), + "Failed to create pagesize + 1 number of VPN Customer Gateways" + ) + # Listing all the VPN Customer Gateways in page 1 + list_vpncustomergateways_page1 = VpnCustomerGateway.list( + self.userapiclient, + listall=self.services["listall"], + page=1, + pagesize=self.services["pagesize"] + ) + status = validateList(list_vpncustomergateways_page1) + self.assertEquals( + PASS, + status[0], + "Failed to list VPN Customer Gateways in page 1" + ) + # Verifying the list size to be equal to pagesize + self.assertEquals( + self.services["pagesize"], + len(list_vpncustomergateways_page1), + "Size of VPN Customer Gateways in page 1 is not matching" + ) + # Listing all the VPN Customer Gateways in page 2 + list_vpncustomergateways_page2 = VpnCustomerGateway.list( + self.userapiclient, + listall=self.services["listall"], + page=2, + pagesize=self.services["pagesize"] + ) + status = validateList(list_vpncustomergateways_page2) + self.assertEquals( + PASS, + status[0], + "Failed to list VPN Customer Gateways in page 2" + ) + # Verifying the list size to be equal to 1 + self.assertEquals( + 1, + len(list_vpncustomergateways_page2), + "Size of VPN Customer Gateways in page 2 is not matching" + ) + # Deleting the VPM Customer Gateway present in page 2 + VpnCustomerGateway.delete( + vpncustomergateway_created, + self.userapiclient + ) + # Listing all the VPN Customer Gateways in page 2 again + list_vpncustomergateways_page2 = VpnCustomerGateway.list( + self.userapiclient, + listall=self.services["listall"], + page=2, + pagesize=self.services["pagesize"] + ) + # Verifying that there are no VPN Customer Gateways listed + self.assertIsNone( + list_vpncustomergateways_page2, + "VPN Customer Gateways not deleted from page 2" + ) + return + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_02_update_vpncustomergateways(self): + """ + @Desc: Test to update VPN Customer Gateways pagination + @steps: + Step1: Listing all the VPN Customer Gateways for a user + Step2: Verifying that no VPN Customer Gateways are listed + Step3: Creating a VPN Customer Gateways + Step4: Listing all the VPN Customer Gateways again for a user + Step5: Verifying that list size is 1 + Step6: Updating the VPN Customer Gateways created in step3 + Step7: Listing the VPN customer gateway by id + Step8: Verifying that list size is 1 + Step9: Verifying the details of the listed VPN customer gateway are same as updated in step6 + """ + # Listing all the VPN Customer Gateways for a User + list_vpncustomergateways_before = VpnCustomerGateway.list( + self.userapiclient, + listall=self.services["listall"] + ) + # Verifying that no VPN Customer Gateways are listed + self.assertIsNone( + list_vpncustomergateways_before, + "VPN Customer Gateways listed for newly created User" + ) + # Creating A VPN Customer Gateways + vpncustomergateway_created = VpnCustomerGateway.create( + self.userapiclient, + self.services["vpncustomergateway"], + name="VPNCustGateway", + gateway="10.102.153.90", + cidrlist="10.0.0.0/24", + account=self.account.name, + domainid=self.domain.id + ) + self.assertIsNotNone( + vpncustomergateway_created, + "VPN Customer Gateway creation failed" + ) + self.cleanup.append(vpncustomergateway_created) + # Listing all the VPN Customer Gateways for a User + list_vpncustomergateways_after = VpnCustomerGateway.list( + self.userapiclient, + listall=self.services["listall"] + ) + status = validateList(list_vpncustomergateways_after) + self.assertEquals( + PASS, + status[0], + "VPN Customer Gateway creation failed" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_vpncustomergateways_after), + "Failed to create VPN Customer Gateways" + ) + # Updating the VPN Customer gateway + vpncustomergateway_updated = VpnCustomerGateway.update( + vpncustomergateway_created, + self.userapiclient, + self.services["vpncustomergateway"], + name="NewVPNCustGateway", + gateway="10.102.153.90", + cidrlist="10.0.0.0/24", + ) + self.assertIsNotNone( + vpncustomergateway_updated, + "Updation of VPN Customer Gateway failed" + ) + # Listing the VPN Customer Gateways by Id + list_vpncustomergateway = VpnCustomerGateway.list( + self.userapiclient, + listall=self.services["listall"], + id=vpncustomergateway_created.id + ) + status = validateList(list_vpncustomergateway) + self.assertEquals( + PASS, + status[0], + "Failed to list VPN Customer Gateways by Id" + ) + # Verifying the list size to be equal to 1 + self.assertEquals( + 1, + len(list_vpncustomergateway), + "Size of VPN Customer Gateways by id is not matching" + ) + # Verifying the details of the listed VPN Customer Gateway are same as updated + #Creating expected and actual values dictionaries + expected_dict = { + "name":vpncustomergateway_updated.name, + "id":vpncustomergateway_updated.id, + "account":vpncustomergateway_updated.account, + "domainid":vpncustomergateway_updated.domainid, + "gateway":vpncustomergateway_updated.gateway, + "cidrlist":vpncustomergateway_updated.cidrlist, + "seckey":vpncustomergateway_updated.ipsecpsk, + "ikepolicy":vpncustomergateway_updated.ikepolicy, + "ikelifetime":vpncustomergateway_updated.ikelifetime, + "esppolicy":vpncustomergateway_updated.esppolicy, + "esplifetime":vpncustomergateway_updated.esplifetime, + } + actual_dict = { + "name":list_vpncustomergateway[0].name, + "id":list_vpncustomergateway[0].id, + "account":list_vpncustomergateway[0].account, + "domainid":list_vpncustomergateway[0].domainid, + "gateway":list_vpncustomergateway[0].gateway, + "cidrlist":list_vpncustomergateway[0].cidrlist, + "seckey":list_vpncustomergateway[0].ipsecpsk, + "ikepolicy":list_vpncustomergateway[0].ikepolicy, + "ikelifetime":list_vpncustomergateway[0].ikelifetime, + "esppolicy":list_vpncustomergateway[0].esppolicy, + "esplifetime":list_vpncustomergateway[0].esplifetime, + } + vpncustomergateway_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + vpncustomergateway_status, + "Listed VPN Customer Gateway details are not as Updated" + ) + return + +class TestTemplates(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + try: + cls._cleanup = [] + cls.testClient = super(TestTemplates, cls).getClsTestClient() + cls.api_client = cls.testClient.getApiClient() + cls.services = cls.testClient.getParsedTestDataConfig() + # Get Domain, Zone, Template + cls.domain = get_domain(cls.api_client) + cls.zone = get_zone(cls.api_client) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostype"] + ) + cls.hypervisor = cls.testClient.getHypervisorInfo() + cls.services['mode'] = cls.zone.networktype + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + # Getting authentication for user in newly created Account + cls.user = cls.account.user[0] + cls.userapiclient = cls.testClient.getUserApiClient(cls.user.username, cls.domain.name) + cls._cleanup.append(cls.account) + except Exception as e: + cls.tearDownClass() + raise Exception("Warning: Exception in setup : %s" % e) + return + + def setUp(self): + + self.apiClient = self.testClient.getApiClient() + self.cleanup = [] + + def tearDown(self): + #Clean up, terminate the created resources + cleanup_resources(self.apiClient, self.cleanup) + return + + @classmethod + def tearDownClass(cls): + try: + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def __verify_values(self, expected_vals, actual_vals): + """ + @Desc: Function to verify expected and actual values + @Steps: + Step1: Initializing return flag to True + Step1: Verifying length of expected and actual dictionaries is matching. + If not matching returning false + Step2: Listing all the keys from expected dictionary + Step3: Looping through each key from step2 and verifying expected and actual dictionaries have same value + If not making return flag to False + Step4: returning the return flag after all the values are verified + """ + return_flag = True + + if len(expected_vals) != len(actual_vals): + return False + + keys = expected_vals.keys() + for i in range(0, len(expected_vals)): + exp_val = expected_vals[keys[i]] + act_val = actual_vals[keys[i]] + if exp_val == act_val: + return_flag = return_flag and True + else: + return_flag = return_flag and False + self.debug("expected Value: %s, is not matching with actual value: %s" % ( + exp_val, + act_val + )) + return return_flag + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_01_list_templates_pagination(self): + """ + @Desc: Test to List Templates pagination + @steps: + Step1: Listing all the Templates for a user + Step2: Verifying that no Templates are listed + Step3: Creating (page size + 1) number of Templates + Step4: Listing all the Templates again for a user + Step5: Verifying that list size is (page size + 1) + Step6: Listing all the Templates in page1 + Step7: Verifying that list size is (page size) + Step8: Listing all the Templates in page2 + Step9: Verifying that list size is 1 + Step10: Listing the template by Id + Step11: Verifying if the template is downloaded and ready. + If yes the continuing + If not waiting and checking for template to be ready till timeout + Step12: Deleting the Template present in page 2 + Step13: Listing all the Templates in page2 + Step14: Verifying that no Templates are listed + """ + # Listing all the Templates for a User + list_templates_before = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"] + ) + # Verifying that no Templates are listed + self.assertIsNone( + list_templates_before, + "Templates listed for newly created User" + ) + self.services["template"]["url"] = "http://10.147.28.7/templates/ttylinux_pv.vhd" + self.services["template"]["format"] = "VHD" + self.services["template"]["ostype"] = self.services["ostype"] + # Creating pagesize + 1 number of Templates + for i in range(0, (self.services["pagesize"] + 1)): + template_created = Template.register( + self.userapiclient, + self.services["template"], + self.zone.id, + hypervisor=self.hypervisor + ) + self.assertIsNotNone( + template_created, + "Template creation failed" + ) + if(i < self.services["pagesize"]): + self.cleanup.append(template_created) + + # Listing all the Templates for a User + list_templates_after = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"] + ) + status = validateList(list_templates_after) + self.assertEquals( + PASS, + status[0], + "Templates creation failed" + ) + # Verifying that list size is pagesize + 1 + self.assertEquals( + self.services["pagesize"] + 1, + len(list_templates_after), + "Failed to create pagesize + 1 number of Templates" + ) + # Listing all the Templates in page 1 + list_templates_page1 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + page=1, + pagesize=self.services["pagesize"] + ) + status = validateList(list_templates_page1) + self.assertEquals( + PASS, + status[0], + "Failed to list Templates in page 1" + ) + # Verifying the list size to be equal to pagesize + self.assertEquals( + self.services["pagesize"], + len(list_templates_page1), + "Size of Templates in page 1 is not matching" + ) + # Listing all the Templates in page 2 + list_templates_page2 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + page=2, + pagesize=self.services["pagesize"] + ) + status = validateList(list_templates_page2) + self.assertEquals( + PASS, + status[0], + "Failed to list Templates in page 2" + ) + # Verifying the list size to be equal to 1 + self.assertEquals( + 1, + len(list_templates_page2), + "Size of Templates in page 2 is not matching" + ) + # Verifying the state of the template to be ready. If not waiting for state to become ready + template_ready = False + count = 0 + while template_ready is False: + list_template = Template.list( + self.userapiclient, + id=template_created.id, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + ) + status = validateList(list_template) + self.assertEquals( + PASS, + status[0], + "Failed to list Templates by Id" + ) + if list_template[0].isready is True: + template_ready = True + elif (str(list_template[0].status) == "Error"): + self.fail("Created Template is in Errored state") + break + elif count > 10: + self.fail("Timed out before Template came into ready state") + break + else: + time.sleep(self.services["sleep"]) + count = count + 1 + + # Deleting the Template present in page 2 + Template.delete( + template_created, + self.userapiclient + ) + # Listing all the Templates in page 2 again + list_templates_page2 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + page=2, + pagesize=self.services["pagesize"] + ) + # Verifying that there are no Templates listed + self.assertIsNone( + list_templates_page2, + "Templates not deleted from page 2" + ) + del self.services["template"]["url"] + del self.services["template"]["format"] + del self.services["template"]["ostype"] + return + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_02_download_template(self): + """ + @Desc: Test to Download Template + @steps: + Step1: Listing all the Templates for a user + Step2: Verifying that no Templates are listed + Step3: Creating a Templates + Step4: Listing all the Templates again for a user + Step5: Verifying that list size is 1 + Step6: Verifying if the template is in ready state. + If yes the continuing + If not waiting and checking for template to be ready till timeout + Step7: Downloading the template (Extract) + Step8: Verifying that Template is downloaded + """ + # Listing all the Templates for a User + list_templates_before = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"] + ) + # Verifying that no Templates are listed + self.assertIsNone( + list_templates_before, + "Templates listed for newly created User" + ) + self.services["template"]["url"] = "http://10.147.28.7/templates/ttylinux_pv.vhd" + self.services["template"]["format"] = "VHD" + self.services["template"]["ostype"] = self.services["ostype"] + self.services["template"]["isextractable"] = True + # Creating aTemplate + template_created = Template.register( + self.userapiclient, + self.services["template"], + self.zone.id, + hypervisor=self.hypervisor + ) + self.assertIsNotNone( + template_created, + "Template creation failed" + ) + self.cleanup.append(template_created) + # Listing all the Templates for a User + list_templates_after = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"] + ) + status = validateList(list_templates_after) + self.assertEquals( + PASS, + status[0], + "Templates creation failed" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_templates_after), + "Failed to create a Template" + ) + # Verifying the state of the template to be ready. If not waiting for state to become ready till time out + template_ready = False + count = 0 + while template_ready is False: + list_template = Template.list( + self.userapiclient, + id=template_created.id, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + ) + status = validateList(list_template) + self.assertEquals( + PASS, + status[0], + "Failed to list Templates by Id" + ) + if list_template[0].isready is True: + template_ready = True + elif (str(list_template[0].status) == "Error"): + self.fail("Created Template is in Errored state") + break + elif count > 10: + self.fail("Timed out before Template came into ready state") + break + else: + time.sleep(self.services["sleep"]) + count = count + 1 + + # Downloading the Template name + download_template = Template.extract( + self.userapiclient, + template_created.id, + mode="HTTP_DOWNLOAD", + zoneid=self.zone.id + ) + self.assertIsNotNone( + download_template, + "Download Template failed" + ) + # Verifying the details of downloaded template + self.assertEquals( + "DOWNLOAD_URL_CREATED", + download_template.state, + "Download URL not created for Template" + ) + self.assertIsNotNone( + download_template.url, + "Download URL not created for Template" + ) + self.assertEquals( + template_created.id, + download_template.id, + "Download Template details are not same as Template created" + ) + del self.services["template"]["url"] + del self.services["template"]["format"] + del self.services["template"]["ostype"] + del self.services["template"]["isextractable"] + return + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_03_edit_template_details(self): + """ + @Desc: Test to Edit Template name, displaytext, OSType + @steps: + Step1: Listing all the Templates for a user + Step2: Verifying that no Templates are listed + Step3: Creating a Templates + Step4: Listing all the Templates again for a user + Step5: Verifying that list size is 1 + Step6: Verifying if the template is in ready state. + If yes the continuing + If not waiting and checking for template to be ready till timeout + Step7: Editing the template name + Step8: Verifying that Template name is edited + Step9: Editing the template displaytext + Step10: Verifying that Template displaytext is edited + Step11: Editing the template ostypeid + Step12: Verifying that Template ostypeid is edited + Step13: Editing the template name, displaytext + Step14: Verifying that Template name, displaytext are edited + Step15: Editing the template name, displaytext, ostypeid + Step16: Verifying that Template name, displaytext and ostypeid are edited + """ + # Listing all the Templates for a User + list_templates_before = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"] + ) + # Verifying that no Templates are listed + self.assertIsNone( + list_templates_before, + "Templates listed for newly created User" + ) + self.services["template"]["url"] = "http://10.147.28.7/templates/ttylinux_pv.vhd" + self.services["template"]["format"] = "VHD" + self.services["template"]["ostype"] = self.services["ostype"] + # Creating aTemplate + template_created = Template.register( + self.userapiclient, + self.services["template"], + self.zone.id, + hypervisor=self.hypervisor + ) + self.assertIsNotNone( + template_created, + "Template creation failed" + ) + self.cleanup.append(template_created) + # Listing all the Templates for a User + list_templates_after = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"] + ) + status = validateList(list_templates_after) + self.assertEquals( + PASS, + status[0], + "Templates creation failed" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_templates_after), + "Failed to create a Template" + ) + # Verifying the state of the template to be ready. If not waiting for state to become ready till time out + template_ready = False + count = 0 + while template_ready is False: + list_template = Template.list( + self.userapiclient, + id=template_created.id, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + ) + status = validateList(list_template) + self.assertEquals( + PASS, + status[0], + "Failed to list Templates by Id" + ) + if list_template[0].isready is True: + template_ready = True + elif (str(list_template[0].status) == "Error"): + self.fail("Created Template is in Errored state") + break + elif count > 10: + self.fail("Timed out before Template came into ready state") + break + else: + time.sleep(self.services["sleep"]) + count = count + 1 + + # Editing the Template name + edited_template = Template.update( + template_created, + self.userapiclient, + name="NewTemplateName" + ) + self.assertIsNotNone( + edited_template, + "Editing Template failed" + ) + # Verifying the details of edited template + expected_dict = { + "id":template_created.id, + "name":"NewTemplateName", + "displaytest":template_created.displaytext, + "account":template_created.account, + "domainid":template_created.domainid, + "format":template_created.format, + "ostypeid":template_created.ostypeid, + "templatetype":template_created.templatetype, + } + actual_dict = { + "id":edited_template.id, + "name":edited_template.name, + "displaytest":edited_template.displaytext, + "account":edited_template.account, + "domainid":edited_template.domainid, + "format":edited_template.format, + "ostypeid":edited_template.ostypeid, + "templatetype":edited_template.templatetype, + } + edit_template_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + edit_template_status, + "Edited Template details are not as expected" + ) + # Editing the Template displaytext + edited_template = Template.update( + template_created, + self.userapiclient, + displaytext="TemplateDisplaytext" + ) + self.assertIsNotNone( + edited_template, + "Editing Template failed" + ) + # Verifying the details of edited template + expected_dict = { + "id":template_created.id, + "name":"NewTemplateName", + "displaytest":"TemplateDisplaytext", + "account":template_created.account, + "domainid":template_created.domainid, + "format":template_created.format, + "ostypeid":template_created.ostypeid, + "templatetype":template_created.templatetype, + } + actual_dict = { + "id":edited_template.id, + "name":edited_template.name, + "displaytest":edited_template.displaytext, + "account":edited_template.account, + "domainid":edited_template.domainid, + "format":edited_template.format, + "ostypeid":edited_template.ostypeid, + "templatetype":edited_template.templatetype, + } + edit_template_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + edit_template_status, + "Edited Template details are not as expected" + ) + # Editing the Template ostypeid + ostype_list = list_os_types(self.userapiclient) + status = validateList(ostype_list) + self.assertEquals( + PASS, + status[0], + "Failed to list OS Types" + ) + for i in range(0, len(ostype_list)): + if ostype_list[i].id != template_created.ostypeid: + newostypeid = ostype_list[i].id + break + + edited_template = Template.update( + template_created, + self.userapiclient, + ostypeid=newostypeid + ) + self.assertIsNotNone( + edited_template, + "Editing Template failed" + ) + # Verifying the details of edited template + expected_dict = { + "id":template_created.id, + "name":"NewTemplateName", + "displaytest":"TemplateDisplaytext", + "account":template_created.account, + "domainid":template_created.domainid, + "format":template_created.format, + "ostypeid":newostypeid, + "templatetype":template_created.templatetype, + } + actual_dict = { + "id":edited_template.id, + "name":edited_template.name, + "displaytest":edited_template.displaytext, + "account":edited_template.account, + "domainid":edited_template.domainid, + "format":edited_template.format, + "ostypeid":edited_template.ostypeid, + "templatetype":edited_template.templatetype, + } + edit_template_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + edit_template_status, + "Edited Template details are not as expected" + ) + # Editing the Template name, displaytext + edited_template = Template.update( + template_created, + self.userapiclient, + name=template_created.name, + displaytext=template_created.displaytext + ) + self.assertIsNotNone( + edited_template, + "Editing Template failed" + ) + # Verifying the details of edited template + expected_dict = { + "id":template_created.id, + "name":template_created.name, + "displaytest":template_created.displaytext, + "account":template_created.account, + "domainid":template_created.domainid, + "format":template_created.format, + "ostypeid":newostypeid, + "templatetype":template_created.templatetype, + } + actual_dict = { + "id":edited_template.id, + "name":edited_template.name, + "displaytest":edited_template.displaytext, + "account":edited_template.account, + "domainid":edited_template.domainid, + "format":edited_template.format, + "ostypeid":edited_template.ostypeid, + "templatetype":edited_template.templatetype, + } + edit_template_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + edit_template_status, + "Edited Template details are not as expected" + ) + # Editing the Template name, displaytext, ostypeid + edited_template = Template.update( + template_created, + self.userapiclient, + name="NewTemplateName", + displaytext="TemplateDisplaytext", + ostypeid=template_created.ostypeid + ) + self.assertIsNotNone( + edited_template, + "Editing Template failed" + ) + # Verifying the details of edited template + expected_dict = { + "id":template_created.id, + "name":"NewTemplateName", + "displaytest":"TemplateDisplaytext", + "account":template_created.account, + "domainid":template_created.domainid, + "format":template_created.format, + "ostypeid":template_created.ostypeid, + "templatetype":template_created.templatetype, + } + actual_dict = { + "id":edited_template.id, + "name":edited_template.name, + "displaytest":edited_template.displaytext, + "account":edited_template.account, + "domainid":edited_template.domainid, + "format":edited_template.format, + "ostypeid":edited_template.ostypeid, + "templatetype":edited_template.templatetype, + } + edit_template_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + edit_template_status, + "Edited Template details are not as expected" + ) + del self.services["template"]["url"] + del self.services["template"]["format"] + del self.services["template"]["ostype"] + return + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_04_copy_template(self): + """ + @Desc: Test to copy Template from one zone to another + @steps: + Step1: Listing Zones available for a user + Step2: Verifying if the zones listed are greater than 1. + If Yes continuing. + If not halting the test. + Step3: Listing all the templates for a user in zone1 + Step4: Verifying that no templates are listed + Step5: Listing all the templates for a user in zone2 + Step6: Verifying that no templates are listed + Step7: Creating a Template in zone 1 + Step8: Listing all the Templates again for a user in zone1 + Step9: Verifying that list size is 1 + Step10: Listing all the templates for a user in zone2 + Step11: Verifying that no templates are listed + Step12: Copying the template created in step7 from zone1 to zone2 + Step13: Listing all the templates for a user in zone2 + Step14: Verifying that list size is 1 + Step15: Listing all the Templates for a user in zone1 + Step16: Verifying that list size is 1 + """ + # Listing Zones available for a user + zones_list = Zone.list( + self.userapiclient, + available=True + ) + status = validateList(zones_list) + self.assertEquals( + PASS, + status[0], + "Failed to list Zones" + ) + if not len(zones_list) > 1: + self.fail("Enough zones doesnot exists to copy template") + else: + # Listing all the Templates for a User in Zone 1 + list_templates_zone1 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + zoneid=zones_list[0].id + ) + # Verifying that no Templates are listed + self.assertIsNone( + list_templates_zone1, + "Templates listed for newly created User in Zone1" + ) + # Listing all the Templates for a User in Zone 2 + list_templates_zone2 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + zoneid=zones_list[1].id + ) + # Verifying that no Templates are listed + self.assertIsNone( + list_templates_zone2, + "Templates listed for newly created User in Zone2" + ) + self.services["template"]["url"] = "http://10.147.28.7/templates/ttylinux_pv.vhd" + self.services["template"]["format"] = "VHD" + self.services["template"]["ostype"] = self.services["ostype"] + #Listing Hypervisors in Zone 1 + hypervisor_list = Hypervisor.list( + self.apiClient, + zoneid=zones_list[0].id + ) + status = validateList(zones_list) + self.assertEquals( + PASS, + status[0], + "Failed to list Hypervisors in Zone 1" + ) + # Creating aTemplate in Zone 1 + template_created = Template.register( + self.userapiclient, + self.services["template"], + zones_list[0].id, + hypervisor=hypervisor_list[0].name + ) + self.assertIsNotNone( + template_created, + "Template creation failed" + ) + self.cleanup.append(template_created) + # Listing all the Templates for a User in Zone 1 + list_templates_zone1 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + zoneid=zones_list[0].id + ) + status = validateList(list_templates_zone1) + self.assertEquals( + PASS, + status[0], + "Templates creation failed in Zone1" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_templates_zone1), + "Failed to create a Template" + ) + # Listing all the Templates for a User in Zone 2 + list_templates_zone2 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + zoneid=zones_list[1].id + ) + # Verifying that no Templates are listed + self.assertIsNone( + list_templates_zone2, + "Templates listed for newly created User in Zone2" + ) + # Verifying the state of the template to be ready. If not waiting for state to become ready till time out + template_ready = False + count = 0 + while template_ready is False: + list_template = Template.list( + self.userapiclient, + id=template_created.id, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + ) + status = validateList(list_template) + self.assertEquals( + PASS, + status[0], + "Failed to list Templates by Id" + ) + if list_template[0].isready is True: + template_ready = True + elif (str(list_template[0].status) == "Error"): + self.fail("Created Template is in Errored state") + break + elif count > 10: + self.fail("Timed out before Template came into ready state") + break + else: + time.sleep(self.services["sleep"]) + count = count + 1 + + # Copying the Template from Zone1 to Zone2 + copied_template = Template.copy( + self.userapiclient, + template_created.id, + sourcezoneid=template_created.zoneid, + destzoneid=zones_list[1].id + ) + self.assertIsNotNone( + copied_template, + "Copying Template from Zone1 to Zone2 failed" + ) + # Listing all the Templates for a User in Zone 1 + list_templates_zone1 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + zoneid=zones_list[0].id + ) + status = validateList(list_templates_zone1) + self.assertEquals( + PASS, + status[0], + "Templates creation failed in Zone1" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_templates_zone1), + "Failed to create a Template" + ) + # Listing all the Templates for a User in Zone 2 + list_templates_zone2 = Template.list( + self.userapiclient, + listall=self.services["listall"], + templatefilter=self.services["templatefilter"], + zoneid=zones_list[1].id + ) + status = validateList(list_templates_zone2) + self.assertEquals( + PASS, + status[0], + "Template failed to copy into Zone2" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_templates_zone2), + "Template failed to copy into Zone2" + ) + self.assertNotEquals( + "Connection refused", + list_templates_zone2[0].status, + "Failed to copy Template" + ) + self.assertEquals( + True, + list_templates_zone2[0].isready, + "Failed to copy Template" + ) + del self.services["template"]["url"] + del self.services["template"]["format"] + del self.services["template"]["ostype"] + return + +class TestIsos(cloudstackTestCase): + + @classmethod + def setUpClass(cls): + try: + cls._cleanup = [] + cls.testClient = super(TestIsos, cls).getClsTestClient() + cls.api_client = cls.testClient.getApiClient() + cls.services = cls.testClient.getParsedTestDataConfig() + # Get Domain, Zone, Template + cls.domain = get_domain(cls.api_client) + cls.zone = get_zone(cls.api_client) + cls.template = get_template( + cls.api_client, + cls.zone.id, + cls.services["ostype"] + ) + cls.hypervisor = cls.testClient.getHypervisorInfo() + cls.services['mode'] = cls.zone.networktype + cls.account = Account.create( + cls.api_client, + cls.services["account"], + domainid=cls.domain.id + ) + # Getting authentication for user in newly created Account + cls.user = cls.account.user[0] + cls.userapiclient = cls.testClient.getUserApiClient(cls.user.username, cls.domain.name) + cls._cleanup.append(cls.account) + except Exception as e: + cls.tearDownClass() + raise Exception("Warning: Exception in setup : %s" % e) + return + + def setUp(self): + + self.apiClient = self.testClient.getApiClient() + self.cleanup = [] + + def tearDown(self): + #Clean up, terminate the created resources + cleanup_resources(self.apiClient, self.cleanup) + return + + @classmethod + def tearDownClass(cls): + try: + cleanup_resources(cls.api_client, cls._cleanup) + except Exception as e: + raise Exception("Warning: Exception during cleanup : %s" % e) + + return + + def __verify_values(self, expected_vals, actual_vals): + """ + @Desc: Function to verify expected and actual values + @Steps: + Step1: Initializing return flag to True + Step1: Verifying length of expected and actual dictionaries is matching. + If not matching returning false + Step2: Listing all the keys from expected dictionary + Step3: Looping through each key from step2 and verifying expected and actual dictionaries have same value + If not making return flag to False + Step4: returning the return flag after all the values are verified + """ + return_flag = True + + if len(expected_vals) != len(actual_vals): + return False + + keys = expected_vals.keys() + for i in range(0, len(expected_vals)): + exp_val = expected_vals[keys[i]] + act_val = actual_vals[keys[i]] + if exp_val == act_val: + return_flag = return_flag and True + else: + return_flag = return_flag and False + self.debug("expected Value: %s, is not matching with actual value: %s" % ( + exp_val, + act_val + )) + return return_flag + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_01_list_isos_pagination(self): + """ + @Desc: Test to List ISO's pagination + @steps: + Step1: Listing all the ISO's for a user + Step2: Verifying that no ISO's are listed + Step3: Creating (page size + 1) number of ISO's + Step4: Listing all the ISO's again for a user + Step5: Verifying that list size is (page size + 1) + Step6: Listing all the ISO's in page1 + Step7: Verifying that list size is (page size) + Step8: Listing all the ISO's in page2 + Step9: Verifying that list size is 1 + Step10: Listing the ISO's by Id + Step11: Verifying if the ISO is downloaded and ready. + If yes the continuing + If not waiting and checking for iso to be ready till timeout + Step12: Deleting the ISO present in page 2 + Step13: Listing all the ISO's in page2 + Step14: Verifying that no ISO's are listed + """ + # Listing all the ISO's for a User + list_iso_before = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"] + ) + # Verifying that no ISOs are listed + self.assertIsNone( + list_iso_before, + "ISOs listed for newly created User" + ) + self.services["iso"]["zoneid"] = self.zone.id + # Creating pagesize + 1 number of ISO's + for i in range(0, (self.services["pagesize"] + 1)): + iso_created = Iso.create( + self.userapiclient, + self.services["iso"] + ) + self.assertIsNotNone( + iso_created, + "ISO creation failed" + ) + if(i < self.services["pagesize"]): + self.cleanup.append(iso_created) + + # Listing all the ISO's for a User + list_iso_after = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"] + ) + status = validateList(list_iso_after) + self.assertEquals( + PASS, + status[0], + "ISO's creation failed" + ) + # Verifying that list size is pagesize + 1 + self.assertEquals( + self.services["pagesize"] + 1, + len(list_iso_after), + "Failed to create pagesize + 1 number of ISO's" + ) + # Listing all the ISO's in page 1 + list_iso_page1 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + page=1, + pagesize=self.services["pagesize"] + ) + status = validateList(list_iso_page1) + self.assertEquals( + PASS, + status[0], + "Failed to list ISO's in page 1" + ) + # Verifying the list size to be equal to pagesize + self.assertEquals( + self.services["pagesize"], + len(list_iso_page1), + "Size of ISO's in page 1 is not matching" + ) + # Listing all the Templates in page 2 + list_iso_page2 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + page=2, + pagesize=self.services["pagesize"] + ) + status = validateList(list_iso_page2) + self.assertEquals( + PASS, + status[0], + "Failed to list ISo's in page 2" + ) + # Verifying the list size to be equal to 1 + self.assertEquals( + 1, + len(list_iso_page2), + "Size of ISO's in page 2 is not matching" + ) + # Verifying the state of the ISO to be ready. If not waiting for state to become ready + iso_ready = False + count = 0 + while iso_ready is False: + list_iso = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + id=iso_created.id + ) + status = validateList(list_iso) + self.assertEquals( + PASS, + status[0], + "Failed to list ISO by Id" + ) + if list_iso[0].isready is True: + iso_ready = True + elif (str(list_iso[0].status) == "Error"): + self.fail("Created ISO is in Errored state") + break + elif count > 10: + self.fail("Timed out before ISO came into ready state") + break + else: + time.sleep(self.services["sleep"]) + count = count + 1 + + # Deleting the ISO present in page 2 + Iso.delete( + iso_created, + self.userapiclient + ) + # Listing all the ISO's in page 2 again + list_iso_page2 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + page=2, + pagesize=self.services["pagesize"] + ) + # Verifying that there are no ISO's listed + self.assertIsNone( + list_iso_page2, + "ISO's not deleted from page 2" + ) + del self.services["iso"]["zoneid"] + return + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_02_download_iso(self): + """ + @Desc: Test to Download ISO + @steps: + Step1: Listing all the ISO's for a user + Step2: Verifying that no ISO's are listed + Step3: Creating an ISO + Step4: Listing all the ISO's again for a user + Step5: Verifying that list size is 1 + Step6: Verifying if the ISO is in ready state. + If yes the continuing + If not waiting and checking for template to be ready till timeout + Step7: Downloading the ISO (Extract) + Step8: Verifying the details of downloaded ISO + """ + # Listing all the ISO's for a User + list_iso_before = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"] + ) + # Verifying that no ISOs are listed + self.assertIsNone( + list_iso_before, + "ISOs listed for newly created User" + ) + self.services["iso"]["zoneid"] = self.zone.id + self.services["iso"]["isextractable"] = True + # Creating an ISO's + iso_created = Iso.create( + self.userapiclient, + self.services["iso"] + ) + self.assertIsNotNone( + iso_created, + "ISO creation failed" + ) + self.cleanup.append(iso_created) + # Listing all the ISO's for a User + list_iso_after = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"] + ) + status = validateList(list_iso_after) + self.assertEquals( + PASS, + status[0], + "ISO's creation failed" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_iso_after), + "Failed to create an ISO's" + ) + # Verifying the state of the ISO to be ready. If not waiting for state to become ready + iso_ready = False + count = 0 + while iso_ready is False: + list_iso = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + id=iso_created.id + ) + status = validateList(list_iso) + self.assertEquals( + PASS, + status[0], + "Failed to list ISO by Id" + ) + if list_iso[0].isready is True: + iso_ready = True + elif (str(list_iso[0].status) == "Error"): + self.fail("Created ISO is in Errored state") + break + elif count > 10: + self.fail("Timed out before ISO came into ready state") + break + else: + time.sleep(self.services["sleep"]) + count = count + 1 + + # Downloading the ISO + download_iso = Iso.extract( + self.userapiclient, + iso_created.id, + mode="HTTP_DOWNLOAD", + zoneid=self.zone.id + ) + self.assertIsNotNone( + download_iso, + "Download ISO failed" + ) + # Verifying the details of downloaded ISO + self.assertEquals( + "DOWNLOAD_URL_CREATED", + download_iso.state, + "Download URL not created for ISO" + ) + self.assertIsNotNone( + download_iso.url, + "Download URL not created for ISO" + ) + self.assertEquals( + iso_created.id, + download_iso.id, + "Download ISO details are not same as ISO created" + ) + del self.services["iso"]["zoneid"] + del self.services["iso"]["isextractable"] + return + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_03_edit_iso_details(self): + """ + @Desc: Test to Edit ISO name, displaytext, OSType + @steps: + Step1: Listing all the ISO's for a user + Step2: Verifying that no ISO's are listed + Step3: Creating an ISO + Step4: Listing all the ISO's again for a user + Step5: Verifying that list size is 1 + Step6: Verifying if the ISO is in ready state. + If yes the continuing + If not waiting and checking for template to be ready till timeout + Step7: Editing the ISO's name, displaytext + Step8: Verifying that ISO name and displaytext are edited + Step9: Editing the ISO name, displaytext, ostypeid + Step10: Verifying that ISO name, displaytext and ostypeid are edited + """ + # Listing all the ISO's for a User + list_iso_before = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"] + ) + # Verifying that no ISOs are listed + self.assertIsNone( + list_iso_before, + "ISOs listed for newly created User" + ) + self.services["iso"]["zoneid"] = self.zone.id + # Creating an ISO's + iso_created = Iso.create( + self.userapiclient, + self.services["iso"] + ) + self.assertIsNotNone( + iso_created, + "ISO creation failed" + ) + self.cleanup.append(iso_created) + # Listing all the ISO's for a User + list_iso_after = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"] + ) + status = validateList(list_iso_after) + self.assertEquals( + PASS, + status[0], + "ISO's creation failed" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_iso_after), + "Failed to create an ISO's" + ) + # Verifying the state of the ISO to be ready. If not waiting for state to become ready + iso_ready = False + count = 0 + while iso_ready is False: + list_iso = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + id=iso_created.id + ) + status = validateList(list_iso) + self.assertEquals( + PASS, + status[0], + "Failed to list ISO by Id" + ) + if list_iso[0].isready is True: + iso_ready = True + elif (str(list_iso[0].status) == "Error"): + self.fail("Created ISO is in Errored state") + break + elif count > 10: + self.fail("Timed out before ISO came into ready state") + break + else: + time.sleep(self.services["sleep"]) + count = count + 1 + + # Editing the ISO name, displaytext + edited_iso = Iso.update( + iso_created, + self.userapiclient, + name="NewISOName", + displaytext="NewISODisplayText" + ) + self.assertIsNotNone( + edited_iso, + "Editing ISO failed" + ) + # Verifying the details of edited template + expected_dict = { + "id":iso_created.id, + "name":"NewISOName", + "displaytest":"NewISODisplayText", + "account":iso_created.account, + "domainid":iso_created.domainid, + "isfeatured":iso_created.isfeatured, + "ostypeid":iso_created.ostypeid, + "ispublic":iso_created.ispublic, + } + actual_dict = { + "id":edited_iso.id, + "name":edited_iso.name, + "displaytest":edited_iso.displaytext, + "account":edited_iso.account, + "domainid":edited_iso.domainid, + "isfeatured":edited_iso.isfeatured, + "ostypeid":edited_iso.ostypeid, + "ispublic":edited_iso.ispublic, + } + edit_iso_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + edit_iso_status, + "Edited ISO details are not as expected" + ) + # Editing the ISO name, displaytext, ostypeid + ostype_list = list_os_types(self.userapiclient) + status = validateList(ostype_list) + self.assertEquals( + PASS, + status[0], + "Failed to list OS Types" + ) + for i in range(0, len(ostype_list)): + if ostype_list[i].id != iso_created.ostypeid: + newostypeid = ostype_list[i].id + break + + edited_iso = Iso.update( + iso_created, + self.userapiclient, + name=iso_created.name, + displaytext=iso_created.displaytext, + ostypeid=newostypeid + ) + self.assertIsNotNone( + edited_iso, + "Editing ISO failed" + ) + # Verifying the details of edited template + expected_dict = { + "id":iso_created.id, + "name":iso_created.name, + "displaytest":iso_created.displaytext, + "account":iso_created.account, + "domainid":iso_created.domainid, + "isfeatured":iso_created.isfeatured, + "ostypeid":newostypeid, + "ispublic":iso_created.ispublic, + } + actual_dict = { + "id":edited_iso.id, + "name":edited_iso.name, + "displaytest":edited_iso.displaytext, + "account":edited_iso.account, + "domainid":edited_iso.domainid, + "isfeatured":edited_iso.isfeatured, + "ostypeid":edited_iso.ostypeid, + "ispublic":edited_iso.ispublic, + } + edit_iso_status = self.__verify_values( + expected_dict, + actual_dict + ) + self.assertEqual( + True, + edit_iso_status, + "Edited ISO details are not as expected" + ) + del self.services["iso"]["zoneid"] + return + + @attr(tags=["advanced", "basic", "provisioning"]) + def test_04_copy_iso(self): + """ + @Desc: Test to copy ISO from one zone to another + @steps: + Step1: Listing Zones available for a user + Step2: Verifying if the zones listed are greater than 1. + If Yes continuing. + If not halting the test. + Step3: Listing all the ISO's for a user in zone1 + Step4: Verifying that no ISO's are listed + Step5: Listing all the ISO's for a user in zone2 + Step6: Verifying that no ISO's are listed + Step7: Creating an ISO in zone 1 + Step8: Listing all the ISO's again for a user in zone1 + Step9: Verifying that list size is 1 + Step10: Listing all the ISO's for a user in zone2 + Step11: Verifying that no ISO's are listed + Step12: Copying the ISO created in step7 from zone1 to zone2 + Step13: Listing all the ISO's for a user in zone2 + Step14: Verifying that list size is 1 + Step15: Listing all the ISO's for a user in zone1 + Step16: Verifying that list size is 1 + """ + # Listing Zones available for a user + zones_list = Zone.list( + self.userapiclient, + available=True + ) + status = validateList(zones_list) + self.assertEquals( + PASS, + status[0], + "Failed to list Zones" + ) + if not len(zones_list) > 1: + self.fail("Enough zones doesnot exists to copy iso") + else: + # Listing all the ISO's for a User in Zone 1 + list_isos_zone1 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + zoneid=zones_list[0].id + ) + # Verifying that no ISO's are listed + self.assertIsNone( + list_isos_zone1, + "ISO's listed for newly created User in Zone1" + ) + # Listing all the ISO's for a User in Zone 2 + list_isos_zone2 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + zoneid=zones_list[1].id + ) + # Verifying that no ISO's are listed + self.assertIsNone( + list_isos_zone2, + "ISO's listed for newly created User in Zone2" + ) + self.services["iso"]["zoneid"] = zones_list[0].id + # Creating an ISO in Zone 1 + iso_created = Iso.create( + self.userapiclient, + self.services["iso"] + ) + self.assertIsNotNone( + iso_created, + "ISO creation failed" + ) + self.cleanup.append(iso_created) + # Listing all the ISO's for a User in Zone 1 + list_isos_zone1 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + zoneid=zones_list[0].id + ) + status = validateList(list_isos_zone1) + self.assertEquals( + PASS, + status[0], + "ISO creation failed in Zone1" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_isos_zone1), + "Failed to create a Template" + ) + # Listing all the ISO's for a User in Zone 2 + list_isos_zone2 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + zoneid=zones_list[1].id + ) + # Verifying that no ISO's are listed + self.assertIsNone( + list_isos_zone2, + "ISO's listed for newly created User in Zone2" + ) + # Verifying the state of the ISO to be ready. If not waiting for state to become ready + iso_ready = False + count = 0 + while iso_ready is False: + list_iso = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + id=iso_created.id + ) + status = validateList(list_iso) + self.assertEquals( + PASS, + status[0], + "Failed to list ISO by Id" + ) + if list_iso[0].isready is True: + iso_ready = True + elif (str(list_iso[0].status) == "Error"): + self.fail("Created ISO is in Errored state") + break + elif count > 10: + self.fail("Timed out before ISO came into ready state") + break + else: + time.sleep(self.services["sleep"]) + count = count + 1 + + # Copying the ISO from Zone1 to Zone2 + copied_iso = Iso.copy( + self.userapiclient, + iso_created.id, + sourcezoneid=iso_created.zoneid, + destzoneid=zones_list[1].id + ) + self.assertIsNotNone( + copied_iso, + "Copying ISO from Zone1 to Zone2 failed" + ) + # Listing all the ISO's for a User in Zone 1 + list_isos_zone1 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + zoneid=zones_list[0].id + ) + status = validateList(list_isos_zone1) + self.assertEquals( + PASS, + status[0], + "ISO creation failed in Zone1" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_isos_zone1), + "Failed to create a Template" + ) + # Listing all the ISO's for a User in Zone 2 + list_isos_zone2 = Iso.list( + self.userapiclient, + listall=self.services["listall"], + isofilter=self.services["templatefilter"], + zoneid=zones_list[1].id + ) + status = validateList(list_isos_zone2) + self.assertEquals( + PASS, + status[0], + "ISO failed to copy into Zone2" + ) + # Verifying that list size is 1 + self.assertEquals( + 1, + len(list_isos_zone2), + "ISO failed to copy into Zone2" + ) + self.assertNotEquals( + "Connection refused", + list_isos_zone2[0].status, + "Failed to copy ISO" + ) + self.assertEquals( + True, + list_isos_zone2[0].isready, + "Failed to copy ISO" + ) + del self.services["iso"]["zoneid"] return \ No newline at end of file diff --git a/tools/marvin/marvin/config/test_data.py b/tools/marvin/marvin/config/test_data.py index eb5ce26a6ec..1a29ed93a27 100644 --- a/tools/marvin/marvin/config/test_data.py +++ b/tools/marvin/marvin/config/test_data.py @@ -434,7 +434,14 @@ test_data = { "publicport": 2222, "protocol": 'TCP' }, - + "iso": { + "displaytext": "Test ISO", + "name": "ISO", + "url": "http://people.apache.org/~tsp/dummy.iso", + "bootable": False, + "ispublic": False, + "ostype": "CentOS 5.6 (64-bit)", + }, "iso1": { "displaytext": "Test ISO 1", "name": "ISO 1", @@ -489,6 +496,14 @@ test_data = { "endport": "22", "cidrlist": "0.0.0.0/0" }, + "vpncustomergateway": { + "ipsecpsk": "secreatKey", + "ikepolicy": "aes128-sha1", + "ikelifetime": "86400", + "esppolicy": "aes128-sha1", + "epslifetime": "3600", + "dpd": "false" + }, "ostype": "CentOS 5.6 (64-bit)", "sleep": 90, "timeout": 10, diff --git a/tools/marvin/marvin/lib/base.py b/tools/marvin/marvin/lib/base.py index 1ac57377949..d75309877f4 100755 --- a/tools/marvin/marvin/lib/base.py +++ b/tools/marvin/marvin/lib/base.py @@ -1086,6 +1086,25 @@ class Template: [setattr(cmd, k, v) for k, v in kwargs.items()] return(apiclient.updateTemplatePermissions(cmd)) + def update(self, apiclient, **kwargs): + """Updates the template details""" + + cmd = updateTemplate.updateTemplateCmd() + cmd.id = self.id + [setattr(cmd, k, v) for k, v in kwargs.items()] + return(apiclient.updateTemplate(cmd)) + + @classmethod + def copy(cls, apiclient, id, sourcezoneid, destzoneid): + "Copy Template from source Zone to Destination Zone" + + cmd = copyTemplate.copyTemplateCmd() + cmd.id = id + cmd.sourcezoneid = sourcezoneid + cmd.destzoneid = destzoneid + + return apiclient.copyTemplate(cmd) + @classmethod def list(cls, apiclient, **kwargs): """List all templates matching criteria""" @@ -1185,6 +1204,36 @@ class Iso: timeout = timeout - 1 return + @classmethod + def extract(cls, apiclient, id, mode, zoneid=None): + "Extract ISO " + + cmd = extractIso.extractIsoCmd() + cmd.id = id + cmd.mode = mode + cmd.zoneid = zoneid + + return apiclient.extractIso(cmd) + + def update(self, apiclient, **kwargs): + """Updates the ISO details""" + + cmd = updateIso.updateIsoCmd() + cmd.id = self.id + [setattr(cmd, k, v) for k, v in kwargs.items()] + return(apiclient.updateIso(cmd)) + + @classmethod + def copy(cls, apiclient, id, sourcezoneid, destzoneid): + "Copy ISO from source Zone to Destination Zone" + + cmd = copyIso.copyIsoCmd() + cmd.id = id + cmd.sourcezoneid = sourcezoneid + cmd.destzoneid = destzoneid + + return apiclient.copyIso(cmd) + @classmethod def list(cls, apiclient, **kwargs): """Lists all available ISO files.""" @@ -1680,6 +1729,20 @@ class SnapshotPolicy: [setattr(cmd, k, v) for k, v in kwargs.items()] return(apiclient.listSnapshotPolicies(cmd)) +class Hypervisor: + """Manage Hypervisor""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def list(cls, apiclient, **kwargs): + """Lists hypervisors""" + + cmd = listHypervisors.listHypervisorsCmd() + [setattr(cmd, k, v) for k, v in kwargs.items()] + return(apiclient.listHypervisors(cmd)) + class LoadBalancerRule: """Manage Load Balancer rule""" @@ -2678,7 +2741,7 @@ class SecurityGroup: """Create security group""" cmd = createSecurityGroup.createSecurityGroupCmd() - cmd.name = services["name"] + cmd.name = "-".join([services["name"], random_gen()]) if account: cmd.account = account if domainid: @@ -2780,6 +2843,76 @@ class SecurityGroup: return(apiclient.listSecurityGroups(cmd)) +class VpnCustomerGateway: + """Manage VPN Customer Gateway""" + + def __init__(self, items): + self.__dict__.update(items) + + @classmethod + def create(cls, apiclient, services, name, gateway, cidrlist, + account=None, domainid=None): + """Create VPN Customer Gateway""" + cmd = createVpnCustomerGateway.createVpnCustomerGatewayCmd() + cmd.name = name + cmd.gateway = gateway + cmd.cidrlist = cidrlist + if "ipsecpsk" in services: + cmd.ipsecpsk = services["ipsecpsk"] + if "ikepolicy" in services: + cmd.ikepolicy = services["ikepolicy"] + if "ikelifetime" in services: + cmd.ikelifetime = services["ikelifetime"] + if "esppolicy" in services: + cmd.esppolicy = services["esppolicy"] + if "esplifetime" in services: + cmd.esplifetime = services["esplifetime"] + if "dpd" in services: + cmd.dpd = services["dpd"] + if account: + cmd.account = account + if domainid: + cmd.domainid = domainid + return VpnCustomerGateway(apiclient.createVpnCustomerGateway(cmd).__dict__) + + def update(self, apiclient, services, name, gateway, cidrlist): + """Updates VPN Customer Gateway""" + + cmd = updateVpnCustomerGateway.updateVpnCustomerGatewayCmd() + cmd.id = self.id + cmd.name = name + cmd.gateway = gateway + cmd.cidrlist = cidrlist + if "ipsecpsk" in services: + cmd.ipsecpsk = services["ipsecpsk"] + if "ikepolicy" in services: + cmd.ikepolicy = services["ikepolicy"] + if "ikelifetime" in services: + cmd.ikelifetime = services["ikelifetime"] + if "esppolicy" in services: + cmd.esppolicy = services["esppolicy"] + if "esplifetime" in services: + cmd.esplifetime = services["esplifetime"] + if "dpd" in services: + cmd.dpd = services["dpd"] + return(apiclient.updateVpnCustomerGateway(cmd)) + + def delete(self, apiclient): + """Delete VPN Customer Gateway""" + + cmd = deleteVpnCustomerGateway.deleteVpnCustomerGatewayCmd() + cmd.id = self.id + apiclient.deleteVpnCustomerGateway(cmd) + + @classmethod + def list(cls, apiclient, **kwargs): + """List all VPN customer Gateway""" + + cmd = listVpnCustomerGateways.listVpnCustomerGatewaysCmd() + [setattr(cmd, k, v) for k, v in kwargs.items()] + return(apiclient.listVpnCustomerGateways(cmd)) + + class Project: """Manage Project life cycle"""