diff --git a/tools/testClient/asyncJobMgr.py b/tools/testClient/asyncJobMgr.py index 3afc2c1e76e..8800b17a124 100644 --- a/tools/testClient/asyncJobMgr.py +++ b/tools/testClient/asyncJobMgr.py @@ -4,6 +4,7 @@ import time import Queue import copy import sys +import jsonHelper class job(object): def __init__(self): @@ -17,51 +18,70 @@ class jobStatus(object): self.endTime = None self.duration = None self.jobId = None + self.responsecls = None class workThread(threading.Thread): - def __init__(self, in_queue, out_dict, apiClient, db=None): + def __init__(self, in_queue, outqueue, apiClient, db=None): threading.Thread.__init__(self) self.inqueue = in_queue - self.output = out_dict + self.output = outqueue self.connection = copy.copy(apiClient.connection) self.db = None + def queryAsynJob(self, job): + if job.jobId is None: + return job + + try: + result = self.connection.pollAsyncJob(job.jobId, job.responsecls).jobresult + except cloudstackException.cloudstackAPIException, e: + result = str(e) + + job.result = result + return job + + def executeCmd(self, job): + cmd = job.cmd + + jobstatus = jobStatus() + jobId = None + try: + if not cmd.isAsync: + jobstatus.startTime = time.time() + result = self.connection.make_request(cmd) + jobstatus.result = result + jobstatus.endTime = time.time() + else: + result = self.connection.make_request(cmd, None, True) + if result is None: + jobstatus.status = False + else: + jobId = result.jobid + jobstatus.jobId = jobId + try: + responseName = cmd.__class__.__name__.replace("Cmd", "Response") + jobstatus.responsecls = jsonHelper.getclassFromName(cmd, responseName) + except: + pass + jobstatus.status = True + except cloudstackException.cloudstackAPIException, e: + jobstatus.result = str(e) + jobstatus.status = False + except: + jobstatus.status = False + jobstatus.result = sys.exc_info() + return jobstatus + def run(self): while self.inqueue.qsize() > 0: job = self.inqueue.get() - cmd = job.cmd - cmdName = cmd.__class__.__name__ - responseName = cmdName.replace("Cmd", "Response") - responseInstance = self.connection.getclassFromName(cmd, responseName) - jobstatus = jobStatus() - jobId = None - try: - if not cmd.isAsync: - jobstatus.startTime = time.time() - result = self.connection.make_request(cmd, responseInstance) - jobstatus.result = result - jobstatus.endTime = time.time() - else: - result = self.connection.make_request(cmd, responseInstance, True) - jobId = self.connection.getAsyncJobId(responseInstance, result) - result = self.connection.pollAsyncJob(cmd, responseInstance, jobId) - jobstatus.result = result - jobstatus.jobId = jobId + if isinstance(job, jobStatus): + jobstatus = self.queryAsynJob(job) + else: + jobstatus = self.executeCmd(job) - jobstatus.status = True - except cloudstackException.cloudstackAPIException, e: - jobstatus.result = str(e) - jobstatus.status = False - except: - jobstatus.status = False - jobstatus.result = sys.exc_info() - - #print job.id - self.output.lock.acquire() - self.output.dict[job.id] = jobstatus - self.output.lock.release() + self.output.put(jobstatus) self.inqueue.task_done() - '''release the resource''' self.connection.close() @@ -92,6 +112,7 @@ class asyncJobMgr(object): def __init__(self, apiClient, db): self.inqueue = Queue.Queue() self.output = outputDict() + self.outqueue = Queue.Queue() self.apiClient = apiClient self.db = db @@ -109,34 +130,48 @@ class asyncJobMgr(object): ids.append(id) return ids - def waitForComplete(self): - self.inqueue.join() - - for k,jobstatus in self.output.dict.iteritems(): - jobId = jobstatus.jobId - if jobId is not None and self.db is not None: - result = self.db.execute("select job_status, created, last_updated from async_job where id=%s"%jobId) - if result is not None and len(result) > 0: - if result[0][0] == 1: - jobstatus.status = True - else: - jobstatus.status = False + def updateTimeStamp(self, jobstatus): + jobId = jobstatus.jobId + if jobId is not None and self.db is not None: + result = self.db.execute("select job_status, created, last_updated from async_job where id=%s"%jobId) + if result is not None and len(result) > 0: + if result[0][0] == 1: + jobstatus.status = True + else: + jobstatus.status = False jobstatus.startTime = result[0][1] jobstatus.endTime = result[0][2] delta = jobstatus.endTime - jobstatus.startTime jobstatus.duration = delta.total_seconds() + + def waitForComplete(self, workers=10): + self.inqueue.join() - return self.output.dict + resultQueue = Queue.Queue() + '''intermediate result is stored in self.outqueue''' + for i in range(workers): + worker = workThread(self.outqueue, resultQueue, self.apiClient, self.db) + worker.start() + + self.outqueue.join() + + asyncJobResult = [] + while resultQueue.qsize() > 0: + jobstatus = resultQueue.get() + self.updateTimeStamp(jobstatus) + asyncJobResult.append(jobstatus) + + return asyncJobResult '''put commands into a queue at first, then start workers numbers threads to execute this commands''' def submitCmdsAndWait(self, cmds, workers=10): self.submitCmds(cmds) for i in range(workers): - worker = workThread(self.inqueue, self.output, self.apiClient, self.db) + worker = workThread(self.inqueue, self.outqueue, self.apiClient, self.db) worker.start() - return self.waitForComplete() + return self.waitForComplete(workers) '''submit one job and execute the same job ntimes, with nums_threads of threads''' def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1, interval=1): diff --git a/tools/testClient/cloudstackConnection.py b/tools/testClient/cloudstackConnection.py index e0a6a22519d..fa42c5bc7ba 100644 --- a/tools/testClient/cloudstackConnection.py +++ b/tools/testClient/cloudstackConnection.py @@ -10,6 +10,7 @@ import time import inspect import cloudstackException from cloudstackAPI import * +import jsonHelper class cloudConnection(object): def __init__(self, mgtSvr, port=8096, apiKey = None, securityKey = None, asyncTimeout=3600, logging=None): @@ -38,7 +39,7 @@ class cloudConnection(object): def make_request_with_auth(self, command, requests={}): requests["command"] = command requests["apiKey"] = self.apiKey - requests["response"] = "xml" + requests["response"] = "json" requests = zip(requests.keys(), requests.values()) requests.sort(key=lambda x: str.lower(x[0])) @@ -54,148 +55,30 @@ class cloudConnection(object): def make_request_without_auth(self, command, requests={}): requests["command"] = command - requests["response"] = "xml" + requests["response"] = "json" requests = zip(requests.keys(), requests.values()) requestUrl = "&".join(["=".join([request[0], urllib.quote_plus(str(request[1]))]) for request in requests]) self.connection.request("GET", "/&%s"%requestUrl) return self.connection.getresponse().read() - def getText(self, elements): - if len(elements) < 1: - return None - if not elements[0].hasChildNodes(): - return None - if elements[0].childNodes[0].nodeValue is None: - return None - return elements[0].childNodes[0].nodeValue.strip() - - def getclassFromName(self, cmd, name): - module = inspect.getmodule(cmd) - return getattr(module, name)() - def parseOneInstance(self, element, instance): - ItemsNeedToCheck = {} - for attribute in dir(instance): - if attribute != "__doc__" and attribute != "__init__" and attribute != "__module__": - ItemsNeedToCheck[attribute] = getattr(instance, attribute) - for attribute, value in ItemsNeedToCheck.items(): - if type(value) == types.ListType: - subItem = [] - for subElement in element.getElementsByTagName(attribute): - newInstance = self.getclassFromName(instance, attribute) - self.parseOneInstance(subElement, newInstance) - subItem.append(newInstance) - setattr(instance, attribute, subItem) - continue - else: - item = element.getElementsByTagName(attribute) - if len(item) == 0: - continue - - returnValue = self.getText(item) - setattr(instance, attribute, returnValue) - - def hasErrorCode(self, elements, responseName): - errorCode = elements[0].getElementsByTagName("errorcode") - if len(errorCode) > 0: - erroCodeText = self.getText(errorCode) - errorText = elements[0].getElementsByTagName("errortext") - if len(errorText) > 0: - errorText = self.getText(errorText) - errMsg = "errorCode: %s, errorText:%s"%(erroCodeText, errorText) - raise cloudstackException.cloudstackAPIException(responseName, errMsg) - - def paraseReturnXML(self, result, response): - responseName = response.__class__.__name__.lower() - cls = response.__class__ - - responseLists = [] - morethanOne = False - - dom = xml.dom.minidom.parseString(result) - elements = dom.getElementsByTagName(responseName) - if len(elements) == 0: - return responseLists - - self.hasErrorCode(elements, responseName) - - count = elements[0].getElementsByTagName("count") - if len(count) > 0: - morethanOne = True - for node in elements[0].childNodes: - if node.nodeName == "count": - continue - newInstance = cls() - self.parseOneInstance(node, newInstance) - responseLists.append(newInstance) - - else: - if elements[0].hasChildNodes(): - newInstance = cls() - self.parseOneInstance(elements[0], newInstance) - responseLists.append(newInstance) - - return responseLists, morethanOne - - def paraseResultFromElement(self, elements, response): - responseName = response.__class__.__name__.lower() - cls = response.__class__ - - responseLists = [] - morethanOne = False - - newInstance = cls() - self.parseOneInstance(elements[0], newInstance) - responseLists.append(newInstance) - - return responseLists, morethanOne - def getAsyncJobId(self, response, resultXml): - responseName = response.__class__.__name__.lower() - dom = xml.dom.minidom.parseString(resultXml) - elements = dom.getElementsByTagName(responseName) - if len(elements) == 0: - raise cloudstackException.cloudstackAPIException("can't find %s"%responseName) - - self.hasErrorCode(elements, responseName) - - jobIdEle = elements[0].getElementsByTagName("jobid") - if len(jobIdEle) == 0: - errMsg = 'can not find jobId in the result:%s'%resultXml - - raise cloudstackException.cloudstackAPIException(errMsg) - - jobId = self.getText(jobIdEle) - return jobId - - def pollAsyncJob(self, cmd, response, jobId): - commandName = cmd.__class__.__name__.replace("Cmd", "") + def pollAsyncJob(self, jobId, response): cmd = queryAsyncJobResult.queryAsyncJobResultCmd() cmd.jobid = jobId while self.asyncTimeout > 0: - asyncResponse = queryAsyncJobResult.queryAsyncJobResultResponse() - responseName = asyncResponse.__class__.__name__.lower() - asyncResponseXml = self.make_request(cmd, asyncResponse, True) - dom = xml.dom.minidom.parseString(asyncResponseXml) - elements = dom.getElementsByTagName(responseName) - if len(elements) == 0: - raise cloudstackException.cloudstackAPIException("can't find %s"%responseName) - - self.hasErrorCode(elements, responseName) + asyncResonse = self.make_request(cmd, response, True) - jobstatus = self.getText(elements[0].getElementsByTagName("jobstatus")) + if asyncResonse.jobstatus == 2: + raise cloudstackException.cloudstackAPIException("asyncquery", asyncResonse.jobresult) + elif asyncResonse.jobstatus == 1: + return asyncResonse - if jobstatus == "2": - jobResult = self.getText(elements[0].getElementsByTagName("jobresult")) - raise cloudstackException.cloudstackAPIException(commandName, jobResult) - elif jobstatus == "1": - jobResultEle = elements[0].getElementsByTagName("jobresult") - - return self.paraseResultFromElement(jobResultEle, response) - time.sleep(5) + self.asyncTimeout = self.asyncTimeout - 5 - raise cloudstackException.cloudstackAPIException(commandName, "Async job timeout") - def make_request(self, cmd, response, raw=False): + raise cloudstackException.cloudstackAPIException("asyncquery", "Async job timeout") + + def make_request(self, cmd, response = None, raw=False): commandName = cmd.__class__.__name__.replace("Cmd", "") isAsync = "false" requests = {} @@ -243,19 +126,13 @@ class cloudConnection(object): if result is None: return None - if raw: - return result - if isAsync == "false": - result,num = self.paraseReturnXML(result, response) - else: - jobId = self.getAsyncJobId(response, result) - result,num = self.pollAsyncJob(cmd, response, jobId) - if num: + result = jsonHelper.getResultObj(result, response) + if raw or isAsync == "false": return result else: - if len(result) != 0: - return result[0] - return None + asynJobId = result.jobid + result = self.pollAsyncJob(asynJobId, response) + return result.jobresult if __name__ == '__main__': xml = '407i-1-407-RS3i-1-407-RS3system1ROOT2011-07-30T14:45:19-0700Runningfalse1CA13kvm-50-2054CentOS 5.5(64-bit) no GUI (KVM)CentOS 5.5(64-bit) no GUI (KVM)false1Small Instance15005121120NetworkFilesystem380203255.255.255.065.19.181.165.19.181.110vlan://65vlan://65GuestDirecttrue06:52:da:00:00:08KVM' diff --git a/tools/testClient/cloudstackTestClient.py b/tools/testClient/cloudstackTestClient.py index 150e02b9046..0d8f8108ea9 100644 --- a/tools/testClient/cloudstackTestClient.py +++ b/tools/testClient/cloudstackTestClient.py @@ -39,7 +39,8 @@ class cloudstackTestClient(object): def getApiClient(self): return self.apiClient - def submitCmdsAndWait(self, cmds, workers=10): + '''FixME, httplib has issue if more than one thread submitted''' + def submitCmdsAndWait(self, cmds, workers=1): if self.asyncJobMgr is None: self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection) return self.asyncJobMgr.submitCmdsAndWait(cmds, workers) diff --git a/tools/testClient/configGenerator.py b/tools/testClient/configGenerator.py index 51b58861ab7..10d82aa5671 100644 --- a/tools/testClient/configGenerator.py +++ b/tools/testClient/configGenerator.py @@ -1,57 +1,7 @@ import json import os from optparse import OptionParser -class Struct: - '''The recursive class for building and representing objects with.''' - def __init__(self, obj): - for k in obj: - v = obj[k] - if isinstance(v, dict): - setattr(self, k, Struct(v)) - elif isinstance(v, (list, tuple)): - setattr(self, k, [Struct(elem) for elem in v]) - else: - setattr(self,k,v) - def __getattr__(self, val): - if val in self.__dict__: - return self.__dict__[val] - else: - return None - def __repr__(self): - return '{%s}' % str(', '.join('%s : %s' % (k, repr(v)) for (k, v) in self.__dict__.iteritems())) - - -def json_repr(obj): - """Represent instance of a class as JSON. - Arguments: - obj -- any object - Return: - String that reprent JSON-encoded object. - """ - def serialize(obj): - """Recursively walk object's hierarchy.""" - if isinstance(obj, (bool, int, long, float, basestring)): - return obj - elif isinstance(obj, dict): - obj = obj.copy() - newobj = {} - for key in obj: - if obj[key] is not None: - if (isinstance(obj[key], list) and len(obj[key]) == 0): - continue - newobj[key] = serialize(obj[key]) - - return newobj - elif isinstance(obj, list): - return [serialize(item) for item in obj] - elif isinstance(obj, tuple): - return tuple(serialize([item for item in obj])) - elif hasattr(obj, '__dict__'): - return serialize(obj.__dict__) - else: - return repr(obj) # Don't know how to handle, convert to string - return serialize(obj) - +import jsonHelper class managementServer(): def __init__(self): self.mgtSvrIp = None @@ -199,7 +149,7 @@ def describe_setup_in_basic_mode(): p.name = "test" +str(l) + str(i) p.gateway = "192.168.%d.1"%i p.netmask = "255.255.255.0" - p.startip = "192.168.%d.200"%i + p.startip = "192.168.%d.150"%i p.endip = "192.168.%d.220"%i '''add two pod guest ip ranges''' @@ -399,10 +349,10 @@ def describe_setup_in_advanced_mode(): def generate_setup_config(config, file=None): describe = config if file is None: - return json.dumps(json_repr(describe)) + return json.dumps(jsonHelper.jsonDump.dump(describe)) else: fp = open(file, 'w') - json.dump(json_repr(describe), fp, indent=4) + json.dump(jsonHelper.jsonDump.dump(describe), fp, indent=4) fp.close() @@ -412,7 +362,7 @@ def get_setup_config(file): config = cloudstackConfiguration() fp = open(file, 'r') config = json.load(fp) - return Struct(config) + return jsonHelper.jsonLoader(config) if __name__ == "__main__": parser = OptionParser() diff --git a/tools/testClient/testcase/test_1.py b/tools/testClient/testcase/test_1.py index c1593c1c97b..bba55febc2e 100644 --- a/tools/testClient/testcase/test_1.py +++ b/tools/testClient/testcase/test_1.py @@ -10,7 +10,7 @@ class TestCase1(cloudstackTestCase): listtmcmd.templatefilter = "featured" listtmresponse = apiClient.listTemplates(listtmcmd) if listtmresponse is not None and len(listtmresponse) > 0: - self.debug(listtmresponse[0].isready) + self.debug(listtmresponse) self.debug("we are here") else: self.debug("we are there") diff --git a/tools/testClient/testcase/test_3.py b/tools/testClient/testcase/test_3.py index a41d9ecfad3..6b4b9379033 100644 --- a/tools/testClient/testcase/test_3.py +++ b/tools/testClient/testcase/test_3.py @@ -21,7 +21,7 @@ class TestCase1(cloudstackTestCase): tmpls = self.testClient.getApiClient().listTemplates(listtmplcmd) if tmpls is not None: for tmpl in tmpls: - if tmpl.isready == "true": + if tmpl.isready: self.templateId = tmpl.id self.zoneId = tmpl.zoneid break @@ -52,14 +52,34 @@ class TestCase1(cloudstackTestCase): ''' cidrlist = ["192.168.1.1/24", "10.1.1.1/24"] securitygroup.cidrlist = cidrlist - apiClient.authorizeSecurityGroupIngress(securitygroup) - + try: + apiClient.authorizeSecurityGroupIngress(securitygroup) + except: + pass + ''' createvm = deployVirtualMachine.deployVirtualMachineCmd() createvm.serviceofferingid = self.svid createvm.templateid = self.templateId createvm.zoneid = self.zoneId vm = apiClient.deployVirtualMachine(createvm) vmId = vm.id + ''' + vmId = 1 + vmcmds = [] + for i in range(10): + createvm = deployVirtualMachine.deployVirtualMachineCmd() + createvm.serviceofferingid = self.svid + createvm.templateid = self.templateId + createvm.zoneid = self.zoneId + vmcmds.append(createvm) + + result = self.testClient.submitCmdsAndWait(vmcmds) + for jobstatus in result: + if jobstatus.status == 1: + self.debug(jobstatus.result.id) + self.debug(jobstatus.result.displayname) + else: + self.debug(jobstatus.result) creatvolume = createVolume.createVolumeCmd() creatvolume.name = "tetst" + str(uuid.uuid4()) diff --git a/tools/testClient/unitTest/test_async.py b/tools/testClient/unitTest/test_async.py index acd5b6d75d7..19473ba74e9 100644 --- a/tools/testClient/unitTest/test_async.py +++ b/tools/testClient/unitTest/test_async.py @@ -49,7 +49,7 @@ if __name__ == "__main__": asyncJobResult = testclient.submitCmdsAndWait(cmds, 6) - for handle, jobStatus in asyncJobResult.iteritems(): + for jobStatus in asyncJobResult: if jobStatus.status: print jobStatus.result[0].id, jobStatus.result[0].templatename, jobStatus.startTime, jobStatus.endTime else: