Go to the documentation of this file.00001
00002
00003
00004 import threading
00005 import abc
00006 from Queue import Queue
00007 from Queue import Empty
00008
00009
00010 from rospeex_core import exceptions as ext
00011 from rospeex_core import logging_util
00012
00013
00014 logger = logging_util.get_logger(__name__)
00015
00016 __all__ = ['ThreadPool']
00017
00018
class IRequest(object):
    """ Abstract interface for a request handled by the thread pool.

    Every method below is abstract; a concrete request class has to
    override all four of them.
    """
    # Python 2 spelling of "this class is an abstract base class".
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def request(self, *args, **kwargs):
        """ Execute the request.
        @param args: positional arguments forwarded by the caller
        @param kwargs: keyword arguments forwarded by the caller
        """

    @abc.abstractmethod
    def has_error(self):
        """ Error query hook.
        Presumably reports whether the request failed; the exact contract
        is defined by the implementing class.
        """

    @abc.abstractmethod
    def response(self):
        """ Response accessor hook.
        The returned value is defined by the implementing class.
        """

    @abc.abstractmethod
    def result(self):
        """ Result accessor hook.
        The returned value is defined by the implementing class.
        """
00039
00040
class Worker(threading.Thread):
    """ Thread worker class

    Pulls ``[request, args, kwargs]`` items from a task queue, runs
    ``request.request(*args, **kwargs)`` and then puts the request object
    on the finished-task queue.  The thread starts itself on construction.
    """
    def __init__(self, tasks, finish_tasks):
        """ init worker thread
        @param tasks: queue of pending ``[request, args, kwargs]`` items
        @param finish_tasks: queue that receives each processed request
        """
        threading.Thread.__init__(self)
        self._tasks = tasks
        self._finish_tasks = finish_tasks
        self._stop_request = threading.Event()

        self.start()

    def run(self):
        """ run thread

        Loops until a stop is requested, polling the task queue with a
        short timeout so the stop flag is re-checked regularly.
        @return: None
        """
        while not self._stop_request.is_set():
            # Only the queue read is guarded by ``except Empty`` so that an
            # Empty raised from inside a task is not mistaken for "no work".
            try:
                request, args, kwargs = self._tasks.get(timeout=0.01)
            except Empty:
                continue

            logger.debug('start task')
            try:
                try:
                    request.request(*args, **kwargs)
                except Exception:
                    # A failing task must not kill the worker; log it and
                    # still hand the request over so that consumers waiting
                    # on the finished queue are not left blocked.
                    logger.exception('unhandled exception in task')
                self._finish_tasks.put_nowait(request)
            finally:
                # Always balance the get() above; otherwise every
                # Queue.join() on the task queue would block forever.
                self._tasks.task_done()
            logger.debug('finish task')

    def join(self, timeout=None):
        """ end worker thread

        Waits for the shared task queue to drain, then asks this worker to
        stop and joins the thread.  The stop flag is raised only after the
        queue is empty: raising it first could leave queued tasks with no
        worker to run them, so the queue join would never return.
        @param timeout: timeout in seconds for the thread join only; the
                        wait for the task queue has no timeout
        @return: None
        """
        self._tasks.join()
        self._stop_request.set()
        super(Worker, self).join(timeout)
00080
00081
class ThreadPool(object):
    """ ThreadPool class

    Owns a queue of pending tasks, a queue of finished requests and a
    fixed list of Worker threads that move requests from one to the other.
    """
    def __init__(self, num_thread):
        """ init class
        @param num_thread: number of worker threads
        """
        self._tasks = Queue()
        self._finish_tasks = Queue()
        # Each Worker starts its own thread inside its constructor.
        self._workers = [
            Worker(self._tasks, self._finish_tasks) for _ in range(num_thread)
        ]

    def add_request(self, request, *args, **kwargs):
        """ add request
        @param request: IRequest instance to execute
        @param args: positional arguments passed to request.request()
        @param kwargs: keyword arguments passed to request.request()
        @return: None
        @raise InvalidRequestTypeException: request is not an IRequest
        """
        if not isinstance(request, IRequest):
            msg = '{} is not IRequest instance.'.format(type(request))
            raise ext.InvalidRequestTypeException(msg)
        self._tasks.put_nowait([request, args, kwargs])

    def wait_completion(self):
        """ wait completion

        Blocks until every queued task has been marked done.
        @return: None
        """
        self._tasks.join()

    def check_completion(self):
        """ check completion
        @return: True for complete tasks / False for NOT complete tasks
        """
        # qsize() only counts tasks still waiting in the queue, so it reads
        # zero while dequeued tasks are still running.  unfinished_tasks is
        # the counter Queue.join() waits on: it drops only on task_done().
        return self._tasks.unfinished_tasks == 0

    def get_finish_task(self, wait=True):
        """ get finish task
        @param wait: True to block until a finished task is available,
                     False to return immediately
        @return: finish task, or None when wait is False and the finished
                 queue is empty
        """
        task = None
        try:
            task = self._finish_tasks.get(wait)
            self._finish_tasks.task_done()

        except Empty:
            logger.info('finish task que is empty')
        return task

    def check_finish_task(self):
        """ check finish task queue
        @return: True when the finished-task queue is empty /
                 False when finished tasks are waiting to be fetched
        """
        return self._finish_tasks.qsize() == 0

    def end(self):
        """ end worker threads

        Waits for all queued tasks to complete, then joins every worker.
        @return: None
        """
        logger.debug('end thread pool')
        self.wait_completion()

        for worker in self._workers:
            worker.join()