TestClient: use json instead of painful XML

This commit is contained in:
Edison Su 2011-08-25 18:56:05 -07:00
parent 8d3d3272ab
commit f8aaf1a126
7 changed files with 133 additions and 250 deletions

View File

@ -4,6 +4,7 @@ import time
import Queue
import copy
import sys
import jsonHelper
class job(object):
def __init__(self):
@ -17,51 +18,70 @@ class jobStatus(object):
self.endTime = None
self.duration = None
self.jobId = None
self.responsecls = None
class workThread(threading.Thread):
def __init__(self, in_queue, out_dict, apiClient, db=None):
def __init__(self, in_queue, outqueue, apiClient, db=None):
threading.Thread.__init__(self)
self.inqueue = in_queue
self.output = out_dict
self.output = outqueue
self.connection = copy.copy(apiClient.connection)
self.db = None
def queryAsynJob(self, job):
if job.jobId is None:
return job
try:
result = self.connection.pollAsyncJob(job.jobId, job.responsecls).jobresult
except cloudstackException.cloudstackAPIException, e:
result = str(e)
job.result = result
return job
def executeCmd(self, job):
cmd = job.cmd
jobstatus = jobStatus()
jobId = None
try:
if not cmd.isAsync:
jobstatus.startTime = time.time()
result = self.connection.make_request(cmd)
jobstatus.result = result
jobstatus.endTime = time.time()
else:
result = self.connection.make_request(cmd, None, True)
if result is None:
jobstatus.status = False
else:
jobId = result.jobid
jobstatus.jobId = jobId
try:
responseName = cmd.__class__.__name__.replace("Cmd", "Response")
jobstatus.responsecls = jsonHelper.getclassFromName(cmd, responseName)
except:
pass
jobstatus.status = True
except cloudstackException.cloudstackAPIException, e:
jobstatus.result = str(e)
jobstatus.status = False
except:
jobstatus.status = False
jobstatus.result = sys.exc_info()
return jobstatus
def run(self):
while self.inqueue.qsize() > 0:
job = self.inqueue.get()
cmd = job.cmd
cmdName = cmd.__class__.__name__
responseName = cmdName.replace("Cmd", "Response")
responseInstance = self.connection.getclassFromName(cmd, responseName)
jobstatus = jobStatus()
jobId = None
try:
if not cmd.isAsync:
jobstatus.startTime = time.time()
result = self.connection.make_request(cmd, responseInstance)
jobstatus.result = result
jobstatus.endTime = time.time()
else:
result = self.connection.make_request(cmd, responseInstance, True)
jobId = self.connection.getAsyncJobId(responseInstance, result)
result = self.connection.pollAsyncJob(cmd, responseInstance, jobId)
jobstatus.result = result
jobstatus.jobId = jobId
if isinstance(job, jobStatus):
jobstatus = self.queryAsynJob(job)
else:
jobstatus = self.executeCmd(job)
jobstatus.status = True
except cloudstackException.cloudstackAPIException, e:
jobstatus.result = str(e)
jobstatus.status = False
except:
jobstatus.status = False
jobstatus.result = sys.exc_info()
#print job.id
self.output.lock.acquire()
self.output.dict[job.id] = jobstatus
self.output.lock.release()
self.output.put(jobstatus)
self.inqueue.task_done()
'''release the resource'''
self.connection.close()
@ -92,6 +112,7 @@ class asyncJobMgr(object):
def __init__(self, apiClient, db):
self.inqueue = Queue.Queue()
self.output = outputDict()
self.outqueue = Queue.Queue()
self.apiClient = apiClient
self.db = db
@ -109,34 +130,48 @@ class asyncJobMgr(object):
ids.append(id)
return ids
def waitForComplete(self):
self.inqueue.join()
for k,jobstatus in self.output.dict.iteritems():
jobId = jobstatus.jobId
if jobId is not None and self.db is not None:
result = self.db.execute("select job_status, created, last_updated from async_job where id=%s"%jobId)
if result is not None and len(result) > 0:
if result[0][0] == 1:
jobstatus.status = True
else:
jobstatus.status = False
def updateTimeStamp(self, jobstatus):
jobId = jobstatus.jobId
if jobId is not None and self.db is not None:
result = self.db.execute("select job_status, created, last_updated from async_job where id=%s"%jobId)
if result is not None and len(result) > 0:
if result[0][0] == 1:
jobstatus.status = True
else:
jobstatus.status = False
jobstatus.startTime = result[0][1]
jobstatus.endTime = result[0][2]
delta = jobstatus.endTime - jobstatus.startTime
jobstatus.duration = delta.total_seconds()
def waitForComplete(self, workers=10):
self.inqueue.join()
return self.output.dict
resultQueue = Queue.Queue()
'''intermediate result is stored in self.outqueue'''
for i in range(workers):
worker = workThread(self.outqueue, resultQueue, self.apiClient, self.db)
worker.start()
self.outqueue.join()
asyncJobResult = []
while resultQueue.qsize() > 0:
jobstatus = resultQueue.get()
self.updateTimeStamp(jobstatus)
asyncJobResult.append(jobstatus)
return asyncJobResult
'''put commands into a queue at first, then start workers numbers threads to execute this commands'''
def submitCmdsAndWait(self, cmds, workers=10):
self.submitCmds(cmds)
for i in range(workers):
worker = workThread(self.inqueue, self.output, self.apiClient, self.db)
worker = workThread(self.inqueue, self.outqueue, self.apiClient, self.db)
worker.start()
return self.waitForComplete()
return self.waitForComplete(workers)
'''submit one job and execute the same job ntimes, with nums_threads of threads'''
def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1, interval=1):

View File

@ -10,6 +10,7 @@ import time
import inspect
import cloudstackException
from cloudstackAPI import *
import jsonHelper
class cloudConnection(object):
def __init__(self, mgtSvr, port=8096, apiKey = None, securityKey = None, asyncTimeout=3600, logging=None):
@ -38,7 +39,7 @@ class cloudConnection(object):
def make_request_with_auth(self, command, requests={}):
requests["command"] = command
requests["apiKey"] = self.apiKey
requests["response"] = "xml"
requests["response"] = "json"
requests = zip(requests.keys(), requests.values())
requests.sort(key=lambda x: str.lower(x[0]))
@ -54,148 +55,30 @@ class cloudConnection(object):
def make_request_without_auth(self, command, requests={}):
requests["command"] = command
requests["response"] = "xml"
requests["response"] = "json"
requests = zip(requests.keys(), requests.values())
requestUrl = "&".join(["=".join([request[0], urllib.quote_plus(str(request[1]))]) for request in requests])
self.connection.request("GET", "/&%s"%requestUrl)
return self.connection.getresponse().read()
def getText(self, elements):
if len(elements) < 1:
return None
if not elements[0].hasChildNodes():
return None
if elements[0].childNodes[0].nodeValue is None:
return None
return elements[0].childNodes[0].nodeValue.strip()
def getclassFromName(self, cmd, name):
module = inspect.getmodule(cmd)
return getattr(module, name)()
def parseOneInstance(self, element, instance):
ItemsNeedToCheck = {}
for attribute in dir(instance):
if attribute != "__doc__" and attribute != "__init__" and attribute != "__module__":
ItemsNeedToCheck[attribute] = getattr(instance, attribute)
for attribute, value in ItemsNeedToCheck.items():
if type(value) == types.ListType:
subItem = []
for subElement in element.getElementsByTagName(attribute):
newInstance = self.getclassFromName(instance, attribute)
self.parseOneInstance(subElement, newInstance)
subItem.append(newInstance)
setattr(instance, attribute, subItem)
continue
else:
item = element.getElementsByTagName(attribute)
if len(item) == 0:
continue
returnValue = self.getText(item)
setattr(instance, attribute, returnValue)
def hasErrorCode(self, elements, responseName):
errorCode = elements[0].getElementsByTagName("errorcode")
if len(errorCode) > 0:
erroCodeText = self.getText(errorCode)
errorText = elements[0].getElementsByTagName("errortext")
if len(errorText) > 0:
errorText = self.getText(errorText)
errMsg = "errorCode: %s, errorText:%s"%(erroCodeText, errorText)
raise cloudstackException.cloudstackAPIException(responseName, errMsg)
def paraseReturnXML(self, result, response):
responseName = response.__class__.__name__.lower()
cls = response.__class__
responseLists = []
morethanOne = False
dom = xml.dom.minidom.parseString(result)
elements = dom.getElementsByTagName(responseName)
if len(elements) == 0:
return responseLists
self.hasErrorCode(elements, responseName)
count = elements[0].getElementsByTagName("count")
if len(count) > 0:
morethanOne = True
for node in elements[0].childNodes:
if node.nodeName == "count":
continue
newInstance = cls()
self.parseOneInstance(node, newInstance)
responseLists.append(newInstance)
else:
if elements[0].hasChildNodes():
newInstance = cls()
self.parseOneInstance(elements[0], newInstance)
responseLists.append(newInstance)
return responseLists, morethanOne
def paraseResultFromElement(self, elements, response):
responseName = response.__class__.__name__.lower()
cls = response.__class__
responseLists = []
morethanOne = False
newInstance = cls()
self.parseOneInstance(elements[0], newInstance)
responseLists.append(newInstance)
return responseLists, morethanOne
def getAsyncJobId(self, response, resultXml):
responseName = response.__class__.__name__.lower()
dom = xml.dom.minidom.parseString(resultXml)
elements = dom.getElementsByTagName(responseName)
if len(elements) == 0:
raise cloudstackException.cloudstackAPIException("can't find %s"%responseName)
self.hasErrorCode(elements, responseName)
jobIdEle = elements[0].getElementsByTagName("jobid")
if len(jobIdEle) == 0:
errMsg = 'can not find jobId in the result:%s'%resultXml
raise cloudstackException.cloudstackAPIException(errMsg)
jobId = self.getText(jobIdEle)
return jobId
def pollAsyncJob(self, cmd, response, jobId):
commandName = cmd.__class__.__name__.replace("Cmd", "")
def pollAsyncJob(self, jobId, response):
cmd = queryAsyncJobResult.queryAsyncJobResultCmd()
cmd.jobid = jobId
while self.asyncTimeout > 0:
asyncResponse = queryAsyncJobResult.queryAsyncJobResultResponse()
responseName = asyncResponse.__class__.__name__.lower()
asyncResponseXml = self.make_request(cmd, asyncResponse, True)
dom = xml.dom.minidom.parseString(asyncResponseXml)
elements = dom.getElementsByTagName(responseName)
if len(elements) == 0:
raise cloudstackException.cloudstackAPIException("can't find %s"%responseName)
self.hasErrorCode(elements, responseName)
asyncResonse = self.make_request(cmd, response, True)
jobstatus = self.getText(elements[0].getElementsByTagName("jobstatus"))
if asyncResonse.jobstatus == 2:
raise cloudstackException.cloudstackAPIException("asyncquery", asyncResonse.jobresult)
elif asyncResonse.jobstatus == 1:
return asyncResonse
if jobstatus == "2":
jobResult = self.getText(elements[0].getElementsByTagName("jobresult"))
raise cloudstackException.cloudstackAPIException(commandName, jobResult)
elif jobstatus == "1":
jobResultEle = elements[0].getElementsByTagName("jobresult")
return self.paraseResultFromElement(jobResultEle, response)
time.sleep(5)
self.asyncTimeout = self.asyncTimeout - 5
raise cloudstackException.cloudstackAPIException(commandName, "Async job timeout")
def make_request(self, cmd, response, raw=False):
raise cloudstackException.cloudstackAPIException("asyncquery", "Async job timeout")
def make_request(self, cmd, response = None, raw=False):
commandName = cmd.__class__.__name__.replace("Cmd", "")
isAsync = "false"
requests = {}
@ -243,19 +126,13 @@ class cloudConnection(object):
if result is None:
return None
if raw:
return result
if isAsync == "false":
result,num = self.paraseReturnXML(result, response)
else:
jobId = self.getAsyncJobId(response, result)
result,num = self.pollAsyncJob(cmd, response, jobId)
if num:
result = jsonHelper.getResultObj(result, response)
if raw or isAsync == "false":
return result
else:
if len(result) != 0:
return result[0]
return None
asynJobId = result.jobid
result = self.pollAsyncJob(asynJobId, response)
return result.jobresult
if __name__ == '__main__':
xml = '<?xml version="1.0" encoding="ISO-8859-1"?><deployVirtualMachineResponse><virtualmachine><id>407</id><name>i-1-407-RS3</name><displayname>i-1-407-RS3</displayname><account>system</account><domainid>1</domainid><domain>ROOT</domain><created>2011-07-30T14:45:19-0700</created><state>Running</state><haenable>false</haenable><zoneid>1</zoneid><zonename>CA1</zonename><hostid>3</hostid><hostname>kvm-50-205</hostname><templateid>4</templateid><templatename>CentOS 5.5(64-bit) no GUI (KVM)</templatename><templatedisplaytext>CentOS 5.5(64-bit) no GUI (KVM)</templatedisplaytext><passwordenabled>false</passwordenabled><serviceofferingid>1</serviceofferingid><serviceofferingname>Small Instance</serviceofferingname><cpunumber>1</cpunumber><cpuspeed>500</cpuspeed><memory>512</memory><guestosid>112</guestosid><rootdeviceid>0</rootdeviceid><rootdevicetype>NetworkFilesystem</rootdevicetype><nic><id>380</id><networkid>203</networkid><netmask>255.255.255.0</netmask><gateway>65.19.181.1</gateway><ipaddress>65.19.181.110</ipaddress><isolationuri>vlan://65</isolationuri><broadcasturi>vlan://65</broadcasturi><traffictype>Guest</traffictype><type>Direct</type><isdefault>true</isdefault><macaddress>06:52:da:00:00:08</macaddress></nic><hypervisor>KVM</hypervisor></virtualmachine></deployVirtualMachineResponse>'

View File

@ -39,7 +39,8 @@ class cloudstackTestClient(object):
def getApiClient(self):
return self.apiClient
def submitCmdsAndWait(self, cmds, workers=10):
'''FixME, httplib has issue if more than one thread submitted'''
def submitCmdsAndWait(self, cmds, workers=1):
if self.asyncJobMgr is None:
self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection)
return self.asyncJobMgr.submitCmdsAndWait(cmds, workers)

View File

@ -1,57 +1,7 @@
import json
import os
from optparse import OptionParser
class Struct:
'''The recursive class for building and representing objects with.'''
def __init__(self, obj):
for k in obj:
v = obj[k]
if isinstance(v, dict):
setattr(self, k, Struct(v))
elif isinstance(v, (list, tuple)):
setattr(self, k, [Struct(elem) for elem in v])
else:
setattr(self,k,v)
def __getattr__(self, val):
if val in self.__dict__:
return self.__dict__[val]
else:
return None
def __repr__(self):
return '{%s}' % str(', '.join('%s : %s' % (k, repr(v)) for (k, v) in self.__dict__.iteritems()))
def json_repr(obj):
"""Represent instance of a class as JSON.
Arguments:
obj -- any object
Return:
String that reprent JSON-encoded object.
"""
def serialize(obj):
"""Recursively walk object's hierarchy."""
if isinstance(obj, (bool, int, long, float, basestring)):
return obj
elif isinstance(obj, dict):
obj = obj.copy()
newobj = {}
for key in obj:
if obj[key] is not None:
if (isinstance(obj[key], list) and len(obj[key]) == 0):
continue
newobj[key] = serialize(obj[key])
return newobj
elif isinstance(obj, list):
return [serialize(item) for item in obj]
elif isinstance(obj, tuple):
return tuple(serialize([item for item in obj]))
elif hasattr(obj, '__dict__'):
return serialize(obj.__dict__)
else:
return repr(obj) # Don't know how to handle, convert to string
return serialize(obj)
import jsonHelper
class managementServer():
def __init__(self):
self.mgtSvrIp = None
@ -199,7 +149,7 @@ def describe_setup_in_basic_mode():
p.name = "test" +str(l) + str(i)
p.gateway = "192.168.%d.1"%i
p.netmask = "255.255.255.0"
p.startip = "192.168.%d.200"%i
p.startip = "192.168.%d.150"%i
p.endip = "192.168.%d.220"%i
'''add two pod guest ip ranges'''
@ -399,10 +349,10 @@ def describe_setup_in_advanced_mode():
def generate_setup_config(config, file=None):
describe = config
if file is None:
return json.dumps(json_repr(describe))
return json.dumps(jsonHelper.jsonDump.dump(describe))
else:
fp = open(file, 'w')
json.dump(json_repr(describe), fp, indent=4)
json.dump(jsonHelper.jsonDump.dump(describe), fp, indent=4)
fp.close()
@ -412,7 +362,7 @@ def get_setup_config(file):
config = cloudstackConfiguration()
fp = open(file, 'r')
config = json.load(fp)
return Struct(config)
return jsonHelper.jsonLoader(config)
if __name__ == "__main__":
parser = OptionParser()

View File

@ -10,7 +10,7 @@ class TestCase1(cloudstackTestCase):
listtmcmd.templatefilter = "featured"
listtmresponse = apiClient.listTemplates(listtmcmd)
if listtmresponse is not None and len(listtmresponse) > 0:
self.debug(listtmresponse[0].isready)
self.debug(listtmresponse)
self.debug("we are here")
else:
self.debug("we are there")

View File

@ -21,7 +21,7 @@ class TestCase1(cloudstackTestCase):
tmpls = self.testClient.getApiClient().listTemplates(listtmplcmd)
if tmpls is not None:
for tmpl in tmpls:
if tmpl.isready == "true":
if tmpl.isready:
self.templateId = tmpl.id
self.zoneId = tmpl.zoneid
break
@ -52,14 +52,34 @@ class TestCase1(cloudstackTestCase):
'''
cidrlist = ["192.168.1.1/24", "10.1.1.1/24"]
securitygroup.cidrlist = cidrlist
apiClient.authorizeSecurityGroupIngress(securitygroup)
try:
apiClient.authorizeSecurityGroupIngress(securitygroup)
except:
pass
'''
createvm = deployVirtualMachine.deployVirtualMachineCmd()
createvm.serviceofferingid = self.svid
createvm.templateid = self.templateId
createvm.zoneid = self.zoneId
vm = apiClient.deployVirtualMachine(createvm)
vmId = vm.id
'''
vmId = 1
vmcmds = []
for i in range(10):
createvm = deployVirtualMachine.deployVirtualMachineCmd()
createvm.serviceofferingid = self.svid
createvm.templateid = self.templateId
createvm.zoneid = self.zoneId
vmcmds.append(createvm)
result = self.testClient.submitCmdsAndWait(vmcmds)
for jobstatus in result:
if jobstatus.status == 1:
self.debug(jobstatus.result.id)
self.debug(jobstatus.result.displayname)
else:
self.debug(jobstatus.result)
creatvolume = createVolume.createVolumeCmd()
creatvolume.name = "tetst" + str(uuid.uuid4())

View File

@ -49,7 +49,7 @@ if __name__ == "__main__":
asyncJobResult = testclient.submitCmdsAndWait(cmds, 6)
for handle, jobStatus in asyncJobResult.iteritems():
for jobStatus in asyncJobResult:
if jobStatus.status:
print jobStatus.result[0].id, jobStatus.result[0].templatename, jobStatus.startTime, jobStatus.endTime
else: