diff --git a/tools/testClient/asyncJobMgr.py b/tools/testClient/asyncJobMgr.py index 5487ba32775..5991e0f04c1 100644 --- a/tools/testClient/asyncJobMgr.py +++ b/tools/testClient/asyncJobMgr.py @@ -27,7 +27,7 @@ class workThread(threading.Thread): self.db = copy.copy(db) def run(self): - while True: + while self.inqueue.qsize() > 0: job = self.inqueue.get() cmd = job.cmd cmdName = cmd.__class__.__name__ @@ -68,6 +68,11 @@ class workThread(threading.Thread): self.output.dict[job.id] = jobstatus self.output.lock.release() self.inqueue.task_done() + + '''release the resource''' + self.connection.close() + if self.db is not None: + self.db.close() class jobThread(threading.Thread): def __init__(self, inqueue, interval): @@ -75,33 +80,29 @@ class jobThread(threading.Thread): self.inqueue = inqueue self.interval = interval def run(self): - while True: - + while self.inqueue.qsize() > 0: job = self.inqueue.get() try: job.run() + '''release the api connection''' + job.apiClient.connection.close() except: pass + self.inqueue.task_done() time.sleep(self.interval) - + class outputDict(object): def __init__(self): self.lock = threading.Condition() self.dict = {} class asyncJobMgr(object): - def __init__(self, apiClient, db, workers=10): + def __init__(self, apiClient, db): self.inqueue = Queue.Queue() self.output = outputDict() self.apiClient = apiClient self.db = db - self.workers = workers - - for i in range(self.workers): - worker = workThread(self.inqueue, self.output, self.apiClient, self.db) - worker.setDaemon(True) - worker.start() def submitCmds(self, cmds): if not self.inqueue.empty(): @@ -121,11 +122,18 @@ class asyncJobMgr(object): self.inqueue.join() return self.output.dict - def submitCmdsAndWait(self, cmds): + '''put commands into a queue at first, then start workers numbers threads to execute this commands''' + def submitCmdsAndWait(self, cmds, workers=10): self.submitCmds(cmds) + + for i in range(workers): + worker = workThread(self.inqueue, self.output, self.apiClient, self.db) + worker.start() + return self.waitForComplete() - def submitJobs(self, job, ntimes=1, nums_threads=1, interval=1): + '''submit one job and execute the same job ntimes, with nums_threads of threads''' + def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1, interval=1): inqueue1 = Queue.Queue() lock = threading.Condition() for i in range(ntimes): @@ -136,6 +144,20 @@ class asyncJobMgr(object): for i in range(nums_threads): work = jobThread(inqueue1, interval) - work.setDaemon(True) + work.start() + inqueue1.join() + + '''submit n jobs, execute them with nums_threads of threads''' + def submitJobs(self, jobs, nums_threads=1, interval=1): + inqueue1 = Queue.Queue() + lock = threading.Condition() + + for job in jobs: + setattr(job, "apiClient", copy.copy(self.apiClient)) + setattr(job, "lock", lock) + inqueue1.put(job) + + for i in range(nums_threads): + work = jobThread(inqueue1, interval) work.start() inqueue1.join() \ No newline at end of file diff --git a/tools/testClient/cloudstackConnection.py b/tools/testClient/cloudstackConnection.py index 4ab5bd9dcc4..1d9c3074c7c 100644 --- a/tools/testClient/cloudstackConnection.py +++ b/tools/testClient/cloudstackConnection.py @@ -25,7 +25,13 @@ class cloudConnection(object): self.auth = True self.asyncTimeout = asyncTimeout - + + def close(self): + try: + self.connection.close() + except: + pass + def __copy__(self): return cloudConnection(self.mgtSvr, self.port, self.apiKey, self.securityKey, self.asyncTimeout, self.logging) diff --git a/tools/testClient/cloudstackTestClient.py b/tools/testClient/cloudstackTestClient.py index 28aef44b4bd..5a843eb40ad 100644 --- a/tools/testClient/cloudstackTestClient.py +++ b/tools/testClient/cloudstackTestClient.py @@ -39,7 +39,14 @@ class cloudstackTestClient(object): self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection) return self.asyncJobMgr.submitCmdsAndWait(cmds) - def submitJobs(self, job, ntimes=1, nums_threads=10, interval=1): + '''submit one job and execute the same job ntimes, with nums_threads of threads''' + def submitJob(self, job, ntimes=1, nums_threads=10, interval=1): if self.asyncJobMgr is None: self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection) - self.asyncJobMgr.submitJobs(job, ntimes, nums_threads, interval) \ No newline at end of file + self.asyncJobMgr.submitJobExecuteNtimes(job, ntimes, nums_threads, interval) + + '''submit n jobs, execute them with nums_threads of threads''' + def submitJobs(self, jobs, nums_threads=10, interval=1): + if self.asyncJobMgr is None: + self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection) + self.asyncJobMgr.submitJobs(jobs, nums_threads, interval) \ No newline at end of file diff --git a/tools/testClient/dbConnection.py b/tools/testClient/dbConnection.py index 18603ccc8c8..326a03524fa 100644 --- a/tools/testClient/dbConnection.py +++ b/tools/testClient/dbConnection.py @@ -18,6 +18,12 @@ class dbConnection(object): def __copy__(self): return dbConnection(self.host, self.port, self.user, self.passwd, self.database) + def close(self): + try: + self.db.close() + except: + pass + def execute(self, sql=None): if sql is None: return None diff --git a/tools/testClient/unitTest/test_async.py b/tools/testClient/unitTest/test_async.py index 96018f6bc3e..a8d825662d5 100644 --- a/tools/testClient/unitTest/test_async.py +++ b/tools/testClient/unitTest/test_async.py @@ -30,7 +30,13 @@ if __name__ == "__main__": testclient.dbConfigure() api = testclient.getApiClient() - #testclient.submitJobs(jobs(1), 10, 10, 1) + testclient.submitJob(jobs(1), 10, 10, 1) + + js = [] + for i in range(10): + js.append(jobs(1)) + + testclient.submitJobs(js, 10, 1) cmds = [] for i in range(20): @@ -46,4 +52,4 @@ if __name__ == "__main__": else: print jobStatus.result, jobStatus.startTime, jobStatus.endTime - print jobStatus.duration \ No newline at end of file + print jobStatus.duration