Queue 可以作为线程间共享的消息队列。
Thread可以异步的处理Queue中的消息。
import threading ,Queue
class BPMPatternMultiDeployer():
#put the pattern names in this queue
Qin = Queue.Queue()
#put the vs needed waiting for in this queue
Qwait = Queue.Queue()
#provison results put into Qout
Qout = Queue.Queue()
#any error msg put into Qerr
Qerr = Queue.Queue()
#save the deamons in this list
Pool = []
#BPMPatternDeployer(pattern_list,profile,cloud,ip_group,vs_name_prefix)
def __init__(self, profile, cloud, ip_group, vs_password="passw0rd"):
self.profile = profile
self.ipgroup = ip_group
self.cloud = cloud
self.password = vs_password
"""
do the deploying work in a multi thread way for multi patterns
"""
""" put the error to queue """
def report_error(self):
self.Qerr.put(sys.exc_info()[:2])
def get_all_from_queue(self, Q):
try:
while True:
yield Q.get_nowait()
except Queue.Empty:
raise StopIteration
"""process the request for deploying in queue"""
def wait_for_virtual_system_in_queue(self):
while True:
command, virtual_system = self.Qwait.get()
print "job start", command
# kill thread if the cmd is stop
if command == 'stop':
break
# wait for the vSys status
if command == 'waitfor':
print ">>%s Start to wait for the virtual system %s ." % (timestamp(), virtual_system.name)
waitting = True
while waitting:
vSys_status = virtual_system.currentstatus
vSys_status_text = virtual_system.currentstatus_text
if vSys_status == u"RM01006":
waitting = False
print ">>%s Cheers ! %s is deployed successfully." % (timestamp(), virtual_system.name)
elif vSys_status == u"RM01013":
waitting = False
print ">>%s Oops ! %s is failed to deploy." % (timestamp(), virtual_system.name)
else:
print ">>%s %s status :%s" % (timestamp(), virtual_system.name, vSys_status_text)
time.sleep(30)
else:
raise ValueError, 'Unknown command %r' % command
# except:
# unconditional except is right, since we report _all_ errors
# self.report_error()
# else:
# self.Qout.put("unknown")
def make_and_start_wait_for_thread_pool(self, number_of_threads_in_pool=5, daemons=False):
for i in range(number_of_threads_in_pool):
new_thread = threading.Thread(target=self.wait_for_virtual_system_in_queue)
new_thread.setDaemon(daemons)
self.Pool.append(new_thread)
new_thread.start()
def request_deploy_pattern(self, pattern_name, prefix):
bpm_deployer = BPMPatternDeployer(self.profile, self.cloud, self.ipgroup, prefix)
self.request_wait_for_job(virtual_system=bpm_deployer.deploy_pattern(pattern_name, self.password), command='waitfor')
def request_wait_for_job(self, virtual_system, command='waitfor'):
print ">>%s Put %s into the Qwait , cmd:%s" % (timestamp(), virtual_system.name, command)
self.Qwait.put((command, virtual_system))
def get_result(self):
return self.Qout.get() # implicitly stops and waits
def show_all_results(self):
for result in self.get_all_from_queue(self.Qout):
print 'Result:', result
def show_all_errors(self):
for etyp, err in self.get_all_from_queue(self.Qerr):
print 'Error:', etyp, err
def stop_and_free_thread_pool(self):
for i in range(len(self.Pool)):
self.request_work(None, 'stop')
for existing_thread in self.Pool:
existing_thread.join()
# clean up the pool from now-unused thread objects
del self.Pool[:]
# test bpm multi deployer
bpm_deployer = BPMPatternMultiDeployer(profile, cloud, ip_group, vs_password)
for pattern in pattern_list: bpm_deployer.request_deploy_pattern(pattern["name"],pattern["prefix"])
bpm_deployer.make_and_start_wait_for_thread_pool()