TESTClient: async job threads can exit after job pool is empty

This commit is contained in:
Edison Su 2011-08-23 11:44:22 -07:00
parent 3ed2385582
commit ac826a721e
5 changed files with 66 additions and 19 deletions

View File

@ -27,7 +27,7 @@ class workThread(threading.Thread):
self.db = copy.copy(db)
def run(self):
while True:
while self.inqueue.qsize() > 0:
job = self.inqueue.get()
cmd = job.cmd
cmdName = cmd.__class__.__name__
@ -68,6 +68,11 @@ class workThread(threading.Thread):
self.output.dict[job.id] = jobstatus
self.output.lock.release()
self.inqueue.task_done()
'''release the resource'''
self.connection.close()
if self.db is not None:
self.db.close()
class jobThread(threading.Thread):
def __init__(self, inqueue, interval):
@ -75,33 +80,29 @@ class jobThread(threading.Thread):
self.inqueue = inqueue
self.interval = interval
def run(self):
while True:
while self.inqueue.qsize() > 0:
job = self.inqueue.get()
try:
job.run()
'''release the api connection'''
job.apiClient.connection.close()
except:
pass
self.inqueue.task_done()
time.sleep(self.interval)
class outputDict(object):
def __init__(self):
self.lock = threading.Condition()
self.dict = {}
class asyncJobMgr(object):
def __init__(self, apiClient, db, workers=10):
def __init__(self, apiClient, db):
self.inqueue = Queue.Queue()
self.output = outputDict()
self.apiClient = apiClient
self.db = db
self.workers = workers
for i in range(self.workers):
worker = workThread(self.inqueue, self.output, self.apiClient, self.db)
worker.setDaemon(True)
worker.start()
def submitCmds(self, cmds):
if not self.inqueue.empty():
@ -121,11 +122,18 @@ class asyncJobMgr(object):
self.inqueue.join()
return self.output.dict
def submitCmdsAndWait(self, cmds):
'''put commands into a queue at first, then start workers numbers threads to execute this commands'''
def submitCmdsAndWait(self, cmds, workers=10):
self.submitCmds(cmds)
for i in range(workers):
worker = workThread(self.inqueue, self.output, self.apiClient, self.db)
worker.start()
return self.waitForComplete()
def submitJobs(self, job, ntimes=1, nums_threads=1, interval=1):
'''submit one job and execute the same job ntimes, with nums_threads of threads'''
def submitJobExecuteNtimes(self, job, ntimes=1, nums_threads=1, interval=1):
inqueue1 = Queue.Queue()
lock = threading.Condition()
for i in range(ntimes):
@ -136,6 +144,20 @@ class asyncJobMgr(object):
for i in range(nums_threads):
work = jobThread(inqueue1, interval)
work.setDaemon(True)
work.start()
inqueue1.join()
'''submit n jobs, execute them with nums_threads of threads'''
def submitJobs(self, jobs, nums_threads=1, interval=1):
inqueue1 = Queue.Queue()
lock = threading.Condition()
for job in jobs:
setattr(job, "apiClient", copy.copy(self.apiClient))
setattr(job, "lock", lock)
inqueue1.put(job)
for i in range(nums_threads):
work = jobThread(inqueue1, interval)
work.start()
inqueue1.join()

View File

@ -25,7 +25,13 @@ class cloudConnection(object):
self.auth = True
self.asyncTimeout = asyncTimeout
def close(self):
try:
self.connection.close()
except:
pass
def __copy__(self):
return cloudConnection(self.mgtSvr, self.port, self.apiKey, self.securityKey, self.asyncTimeout, self.logging)

View File

@ -39,7 +39,14 @@ class cloudstackTestClient(object):
self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection)
return self.asyncJobMgr.submitCmdsAndWait(cmds)
def submitJobs(self, job, ntimes=1, nums_threads=10, interval=1):
'''submit one job and execute the same job ntimes, with nums_threads of threads'''
def submitJob(self, job, ntimes=1, nums_threads=10, interval=1):
if self.asyncJobMgr is None:
self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection)
self.asyncJobMgr.submitJobs(job, ntimes, nums_threads, interval)
self.asyncJobMgr.submitJobExecuteNtimes(job, ntimes, nums_threads, interval)
'''submit n jobs, execute them with nums_threads of threads'''
def submitJobs(self, jobs, nums_threads=10, interval=1):
if self.asyncJobMgr is None:
self.asyncJobMgr = asyncJobMgr.asyncJobMgr(self.apiClient, self.dbConnection)
self.asyncJobMgr.submitJobs(jobs, nums_threads, interval)

View File

@ -18,6 +18,12 @@ class dbConnection(object):
def __copy__(self):
return dbConnection(self.host, self.port, self.user, self.passwd, self.database)
def close(self):
try:
self.db.close()
except:
pass
def execute(self, sql=None):
if sql is None:
return None

View File

@ -30,7 +30,13 @@ if __name__ == "__main__":
testclient.dbConfigure()
api = testclient.getApiClient()
#testclient.submitJobs(jobs(1), 10, 10, 1)
testclient.submitJob(jobs(1), 10, 10, 1)
js = []
for i in range(10):
js.append(jobs(1))
testclient.submitJobs(js, 10, 1)
cmds = []
for i in range(20):
@ -46,4 +52,4 @@ if __name__ == "__main__":
else:
print jobStatus.result, jobStatus.startTime, jobStatus.endTime
print jobStatus.duration
print jobStatus.duration