# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # Import Local Modules from marvin.codes import FAILED, KVM, PASS from nose.plugins.attrib import attr from marvin.cloudstackTestCase import cloudstackTestCase from marvin.lib.utils import random_gen, cleanup_resources, validateList, is_snapshot_on_nfs from marvin.lib.base import (Account, ServiceOffering, VirtualMachine, VmSnapshot, Volume, Snapshot) from marvin.lib.common import (get_zone, get_domain, get_template, list_snapshots) import time class TestVmSnapshot(cloudstackTestCase): @classmethod def setUpClass(cls): testClient = super(TestVmSnapshot, cls).getClsTestClient() cls.apiclient = testClient.getApiClient() cls._cleanup = [] cls.unsupportedHypervisor = False cls.hypervisor = testClient.getHypervisorInfo() if cls.hypervisor.lower() in (KVM.lower(), "hyperv", "lxc"): cls.unsupportedHypervisor = True return cls.services = testClient.getParsedTestDataConfig() # Get Zone, Domain and templates cls.domain = get_domain(cls.apiclient) cls.zone = get_zone(cls.apiclient, testClient.getZoneForTests()) template = get_template( cls.apiclient, cls.zone.id, cls.services["ostype"] ) if template == FAILED: assert False, "get_template() failed to return template\ with description %s" % cls.services["ostype"] cls.services["domainid"] = cls.domain.id cls.services["small"]["zoneid"] = cls.zone.id cls.services["templates"]["ostypeid"] = template.ostypeid cls.services["zoneid"] = cls.zone.id # Create VMs, NAT Rules etc cls.account = Account.create( cls.apiclient, cls.services["account"], domainid=cls.domain.id ) cls._cleanup.append(cls.account) cls.service_offering = ServiceOffering.create( cls.apiclient, cls.services["service_offerings"]["tiny"] ) cls._cleanup.append(cls.service_offering) cls.virtual_machine = VirtualMachine.create( cls.apiclient, cls.services["small"], templateid=template.id, accountid=cls.account.name, domainid=cls.account.domainid, serviceofferingid=cls.service_offering.id, mode=cls.zone.networktype ) cls.random_data_0 = random_gen(size=100) cls.test_dir = "/tmp" cls.random_data = "random.data" return @classmethod def tearDownClass(cls): try: # Cleanup resources used cleanup_resources(cls.apiclient, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.cleanup = [] if self.unsupportedHypervisor: self.skipTest("Skipping test because unsupported hypervisor\ %s" % self.hypervisor) return def tearDown(self): try: # Clean up, terminate the created instance, volumes and snapshots cleanup_resources(self.apiclient, self.cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="true") def test_01_create_vm_snapshots(self): """Test to create VM snapshots """ try: # Login to VM and write data to file system ssh_client = self.virtual_machine.get_ssh_client() cmds = [ "echo %s > %s/%s" % (self.random_data_0, self.test_dir, self.random_data), "cat %s/%s" % (self.test_dir, self.random_data)] for c in cmds: self.debug(c) result = ssh_client.execute(c) self.debug(result) except Exception: self.fail("SSH failed for Virtual machine: %s" % self.virtual_machine.ipaddress) self.assertEqual( self.random_data_0, result[0], "Check the random data has be write into temp file!" ) time.sleep(self.services["sleep"]) vm_snapshot = VmSnapshot.create( self.apiclient, self.virtual_machine.id, "false", "TestSnapshot", "Dsiplay Text" ) self.assertEqual( vm_snapshot.state, "Ready", "Check the snapshot of vm is ready!" ) return @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="true") def test_02_revert_vm_snapshots(self): """Test to revert VM snapshots """ try: ssh_client = self.virtual_machine.get_ssh_client() cmds = [ "rm -rf %s/%s" % (self.test_dir, self.random_data), "ls %s/%s" % (self.test_dir, self.random_data) ] for c in cmds: self.debug(c) result = ssh_client.execute(c) self.debug(result) except Exception: self.fail("SSH failed for Virtual machine: %s" % self.virtual_machine.ipaddress) if str(result[0]).index("No such file or directory") == -1: self.fail("Check the random data has be delete from temp file!") time.sleep(self.services["sleep"]) list_snapshot_response = VmSnapshot.list( self.apiclient, vmid=self.virtual_machine.id, listall=True) self.assertEqual( isinstance(list_snapshot_response, list), True, "Check list response returns a valid list" ) self.assertNotEqual( list_snapshot_response, None, "Check if snapshot exists in ListSnapshot" ) self.assertEqual( list_snapshot_response[0].state, "Ready", "Check the snapshot of vm is ready!" ) self.virtual_machine.stop(self.apiclient) VmSnapshot.revertToSnapshot( self.apiclient, list_snapshot_response[0].id) self.virtual_machine.start(self.apiclient) try: ssh_client = self.virtual_machine.get_ssh_client(reconnect=True) cmds = [ "cat %s/%s" % (self.test_dir, self.random_data) ] for c in cmds: self.debug(c) result = ssh_client.execute(c) self.debug(result) except Exception: self.fail("SSH failed for Virtual machine: %s" % self.virtual_machine.ipaddress) self.assertEqual( self.random_data_0, result[0], "Check the random data is equal with the ramdom file!" ) @attr(tags=["advanced", "advancedns", "smoke"], required_hardware="true") def test_03_delete_vm_snapshots(self): """Test to delete vm snapshots """ list_snapshot_response = VmSnapshot.list( self.apiclient, vmid=self.virtual_machine.id, listall=True) self.assertEqual( isinstance(list_snapshot_response, list), True, "Check list response returns a valid list" ) self.assertNotEqual( list_snapshot_response, None, "Check if snapshot exists in ListSnapshot" ) VmSnapshot.deleteVMSnapshot( self.apiclient, list_snapshot_response[0].id) time.sleep(self.services["sleep"] * 3) list_snapshot_response = VmSnapshot.list( self.apiclient, vmid=self.virtual_machine.id, listall=True) self.assertEqual( list_snapshot_response, None, "Check list vm snapshot has be deleted" ) class TestSnapshots(cloudstackTestCase): @classmethod def setUpClass(cls): try: cls._cleanup = [] cls.testClient = super(TestSnapshots, cls).getClsTestClient() cls.api_client = cls.testClient.getApiClient() cls.services = cls.testClient.getParsedTestDataConfig() cls.unsupportedHypervisor = False cls.hypervisor = cls.testClient.getHypervisorInfo() if cls.hypervisor.lower() in (KVM.lower(), "hyperv", "lxc"): cls.unsupportedHypervisor = True return # Get Domain, Zone, Template cls.domain = get_domain(cls.api_client) cls.zone = get_zone( cls.api_client, cls.testClient.getZoneForTests()) cls.template = get_template( cls.api_client, cls.zone.id, cls.services["ostype"] ) if cls.zone.localstorageenabled: cls.storagetype = 'local' cls.services["service_offerings"][ "tiny"]["storagetype"] = 'local' else: cls.storagetype = 'shared' cls.services["service_offerings"][ "tiny"]["storagetype"] = 'shared' cls.services['mode'] = cls.zone.networktype cls.services["virtual_machine"]["hypervisor"] = cls.hypervisor cls.services["virtual_machine"]["zoneid"] = cls.zone.id cls.services["virtual_machine"]["template"] = cls.template.id cls.services["custom_volume"]["zoneid"] = cls.zone.id # Creating Disk offering, Service Offering and Account cls.service_offering = ServiceOffering.create( cls.api_client, cls.services["service_offerings"]["tiny"] ) cls._cleanup.append(cls.service_offering) cls.account = Account.create( cls.api_client, cls.services["account"], domainid=cls.domain.id ) cls._cleanup.append(cls.account) except Exception as e: cls.tearDownClass() raise Exception("Warning: Exception in setup : %s" % e) return def setUp(self): self.apiclient = self.testClient.getApiClient() self.dbclient = self.testClient.getDbConnection() self.cleanup = [] if self.unsupportedHypervisor: self.skipTest("Skipping test because unsupported\ hypervisor %s" % self.hypervisor) def tearDown(self): # Clean up, terminate the created resources cleanup_resources(self.apiclient, self.cleanup) return @classmethod def tearDownClass(cls): try: cleanup_resources(cls.api_client, cls._cleanup) except Exception as e: raise Exception("Warning: Exception during cleanup : %s" % e) return @attr(tags=["advanced", "basic", "smoke"], required_hardware="true") def test_01_test_vm_volume_snapshot(self): """ @Desc: Test that Volume snapshot for root volume is allowed when VM snapshot is present for the VM @Steps: 1: Deploy a VM and create a VM snapshot for VM 2: Try to create snapshot for the root volume of the VM, It should not fail """ # Creating Virtual Machine virtual_machine = VirtualMachine.create( self.apiclient, self.services["virtual_machine"], accountid=self.account.name, domainid=self.account.domainid, serviceofferingid=self.service_offering.id, ) VmSnapshot.create( self.apiclient, virtual_machine.id, ) volumes = Volume.list(self.apiclient, virtualmachineid=virtual_machine.id, type="ROOT", listall=True) self.assertEqual(validateList(volumes)[0], PASS, "Failed to get root volume of the VM") snapshot = Snapshot.create( self.apiclient, volumes[0].id, account=self.account.name, domainid=self.account.domainid ) self.debug("Snapshot created: ID - %s" % snapshot.id) snapshots = list_snapshots( self.apiclient, id=snapshot.id ) self.assertEqual( validateList(snapshots)[0], PASS, "Invalid snapshot list" ) self.assertEqual( snapshots[0].id, snapshot.id, "Check resource id in list resources call" ) return