mirror of
https://github.com/apache/cloudstack.git
synced 2025-10-26 08:42:29 +01:00
add new testcase class, make testcase pydev friendly
This commit is contained in:
parent
7a7662b01b
commit
2e48a9188d
7
tools/testClient/cloudstackTestCase.py
Normal file
7
tools/testClient/cloudstackTestCase.py
Normal file
@ -0,0 +1,7 @@
|
||||
from cloudstackAPI import *
|
||||
import unittest
|
||||
import cloudstackTestClient
|
||||
class cloudstackTestCase(unittest.case.TestCase):
|
||||
def __init__(self, args):
|
||||
unittest.case.TestCase.__init__(self, args)
|
||||
self.testClient = cloudstackTestClient.cloudstackTestClient()
|
||||
@ -4,7 +4,7 @@ import dbConnection
|
||||
from cloudstackAPI import *
|
||||
|
||||
class cloudstackTestClient(object):
|
||||
def __init__(self, mgtSvr, port=8096, apiKey = None, securityKey = None, asyncTimeout=3600, defaultWorkerThreads=10, logging=None):
|
||||
def __init__(self, mgtSvr=None, port=8096, apiKey = None, securityKey = None, asyncTimeout=3600, defaultWorkerThreads=10, logging=None):
|
||||
self.connection = cloudstackConnection.cloudConnection(mgtSvr, port, apiKey, securityKey, asyncTimeout, logging)
|
||||
self.apiClient = cloudstackAPIClient.CloudStackAPIClient(self.connection)
|
||||
self.dbConnection = None
|
||||
@ -15,7 +15,7 @@ class cloudstackTestClient(object):
|
||||
|
||||
def dbConfigure(self, host="localhost", port=3306, user='cloud', passwd='cloud', db='cloud'):
|
||||
self.dbConnection = dbConnection.dbConnection(host, port, user, passwd, db)
|
||||
self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection)
|
||||
|
||||
|
||||
def getDbConnection(self):
|
||||
return self.dbConnection
|
||||
@ -36,10 +36,10 @@ class cloudstackTestClient(object):
|
||||
|
||||
def submitCmdsAndWait(self, cmds):
|
||||
if self.asyncJobMgr is None:
|
||||
return None
|
||||
self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection)
|
||||
return self.asyncJobMgr.submitCmdsAndWait(cmds)
|
||||
|
||||
def submitJobs(self, job, ntimes=1, nums_threads=10, interval=1):
|
||||
if self.asyncJobMgr is None:
|
||||
return None
|
||||
self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection)
|
||||
self.asyncJobMgr.submitJobs(job, ntimes, nums_threads, interval)
|
||||
@ -1,8 +1,6 @@
|
||||
import unittest
|
||||
import random
|
||||
from cloudstackAPI import *
|
||||
from cloudstackTestCase import *
|
||||
|
||||
class TestCase1(unittest.case.TestCase):
|
||||
class TestCase1(cloudstackTestCase):
|
||||
|
||||
def test_cloudstackapi(self):
|
||||
apiClient = self.testClient.getApiClient()
|
||||
|
||||
@ -1,7 +1,5 @@
|
||||
import unittest
|
||||
import random
|
||||
from cloudstackAPI import *
|
||||
class TestCase2(unittest.case.TestCase):
|
||||
from cloudstackTestCase import *
|
||||
class TestCase2(cloudstackTestCase):
|
||||
|
||||
def test_cloudstackapi1(self):
|
||||
apiClient = self.testClient.getApiClient()
|
||||
|
||||
49
tools/testClient/testcase/test_3.py
Normal file
49
tools/testClient/testcase/test_3.py
Normal file
@ -0,0 +1,49 @@
|
||||
from cloudstackTestCase import *
|
||||
import uuid
|
||||
class TestCase1(cloudstackTestCase):
|
||||
def setUp(self):
|
||||
'''get a small service offering id'''
|
||||
listsocmd = listServiceOfferings.listServiceOfferingsCmd()
|
||||
listsocmd.name = "Small Instance"
|
||||
listsocmd.issystem = "false"
|
||||
sos = self.testClient.getApiClient().listServiceOfferings(listsocmd)
|
||||
if sos is not None and len(sos) > 0:
|
||||
self.svid = sos[0].id
|
||||
listdiskovcmd = listDiskOfferings.listDiskOfferingsCmd()
|
||||
listdiskovcmd.name = "Small"
|
||||
disoov = self.testClient.getApiClient().listDiskOfferings(listdiskovcmd)
|
||||
if disoov is not None and len(disoov) > 0:
|
||||
self.diskov = disoov[0].id
|
||||
|
||||
'''get user vm template id'''
|
||||
listtmplcmd = listTemplates.listTemplatesCmd()
|
||||
listtmplcmd.templatefilter = "featured"
|
||||
tmpls = self.testClient.getApiClient().listTemplates(listtmplcmd)
|
||||
if tmpls is not None:
|
||||
for tmpl in tmpls:
|
||||
if tmpl.isready == "true":
|
||||
self.templateId = tmpl.id
|
||||
self.zoneId = tmpl.zoneid
|
||||
break
|
||||
|
||||
def test_cloudstackapi(self):
|
||||
apiClient = self.testClient.getApiClient()
|
||||
|
||||
createvm = deployVirtualMachine.deployVirtualMachineCmd()
|
||||
createvm.serviceofferingid = self.svid
|
||||
createvm.templateid = self.templateId
|
||||
createvm.zoneid = self.zoneId
|
||||
vm = apiClient.deployVirtualMachine(createvm)
|
||||
vmId = vm.id
|
||||
|
||||
creatvolume = createVolume.createVolumeCmd()
|
||||
creatvolume.name = "tetst" + str(uuid.uuid4())
|
||||
creatvolume.diskofferingid = self.diskov
|
||||
creatvolume.zoneid = self.zoneId
|
||||
createvolumeresponse = apiClient.createVolume(creatvolume)
|
||||
volumeId = createvolumeresponse.id
|
||||
attach = attachVolume.attachVolumeCmd()
|
||||
|
||||
attach.id = volumeId
|
||||
attach.virtualmachineid = vmId
|
||||
apiClient.attachVolume(attach)
|
||||
Loading…
x
Reference in New Issue
Block a user