thread_pool.py
Go to the documentation of this file.
00001 # -*- coding: utf-8 -*-
00002 
00003 # python libraries
00004 import threading
00005 import abc
00006 from Queue import Queue
00007 from Queue import Empty
00008 
00009 # local libraries
00010 from rospeex_core import exceptions as ext
00011 from rospeex_core import logging_util
00012 
00013 
00014 logger = logging_util.get_logger(__name__)
00015 
00016 __all__ = ['ThreadPool']
00017 
00018 
class IRequest(object):
    """Abstract interface for a request object.

    Every abstract method below must be overridden by a concrete request
    class.  ``request`` accepts arbitrary positional and keyword arguments.
    """
    # Python 2 style metaclass declaration.
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def request(self, *args, **kwargs):
        """Execute the request.

        @param args: positional arguments for the request
        @param kwargs: keyword arguments for the request
        """

    @abc.abstractmethod
    def has_error(self):
        """Error query.

        NOTE(review): presumably reports whether the request failed;
        confirm against the concrete implementations.
        """

    @abc.abstractmethod
    def response(self):
        """Response accessor.

        NOTE(review): presumably returns the raw response of the request;
        confirm against the concrete implementations.
        """

    @abc.abstractmethod
    def result(self):
        """Result accessor.

        NOTE(review): presumably returns the processed result of the
        request; confirm against the concrete implementations.
        """
00039 
00040 
class Worker(threading.Thread):
    """ Thread worker class

    Repeatedly takes ``(request, args, kwargs)`` entries from the task
    queue, runs ``request.request(*args, **kwargs)`` and then puts the
    request object on the finished-task queue.
    """

    # Seconds to block on the task queue before re-checking the stop flag.
    _POLL_INTERVAL = 0.01

    def __init__(self, tasks, finish_tasks):
        """ init worker thread (the thread is started immediately)
        @param tasks: queue holding pending (request, args, kwargs) entries
        @param finish_tasks: queue receiving each request after it has run
        """
        threading.Thread.__init__(self)
        self._tasks = tasks
        self._finish_tasks = finish_tasks
        self._stop_request = threading.Event()
        # The thread is not a daemon, so it keeps running until join()
        # sets the stop flag.
        self.start()

    def run(self):
        """ run thread
        Processes queued tasks until the stop flag is set.
        @return: None
        """
        while not self._stop_request.is_set():
            # Only the queue read is guarded by ``except Empty`` so that an
            # Empty raised inside a request is not mistaken for an idle
            # queue.
            try:
                request, args, kwargs = self._tasks.get(
                    timeout=self._POLL_INTERVAL
                )
            except Empty:
                continue

            logger.debug('start task')
            try:
                request.request(*args, **kwargs)
            except Exception as err:
                # A failing request must not kill the worker: a dead worker
                # never calls task_done(), which would block every later
                # Queue.join() on the task queue forever.
                logger.error(
                    'task raised an exception: {!r}'.format(err)
                )
            finally:
                # Always hand the request over and balance the get() above,
                # so waiting consumers and Queue.join() callers can proceed.
                self._finish_tasks.put_nowait(request)
                self._tasks.task_done()
            logger.debug('finish task')

    def join(self, timeout=None):
        """ end worker thread
        Waits until every queued task has been processed, then asks the
        thread to stop and joins it.
        @param timeout: timeout in seconds passed to Thread.join(); the
                        wait for the task queue itself has no timeout
        @return: None
        """
        # Drain the queue before raising the stop flag; stopping first can
        # leave entries queued with nobody left to process them, in which
        # case the queue join would never return.
        self._tasks.join()
        self._stop_request.set()
        super(Worker, self).join(timeout)
00080 
00081 
class ThreadPool(object):
    """ ThreadPool class

    Owns a task queue, a finished-task queue and a fixed number of Worker
    threads that move requests from the former to the latter.
    """
    def __init__(self, num_thread):
        """ init class
        @param num_thread: number of worker thread
        """
        self._tasks = Queue()
        self._finish_tasks = Queue()
        self._workers = [
            Worker(self._tasks, self._finish_tasks)
            for _ in range(num_thread)
        ]

    def add_request(self, request, *args, **kwargs):
        """ add request
        @param request: IRequest instance to execute
        @param args: positional arguments passed to request.request()
        @param kwargs: keyword arguments passed to request.request()
        @return: None
        @raise InvalidRequestTypeException: request is not an IRequest
        """
        if not isinstance(request, IRequest):
            msg = '{} is not IRequest instance.'.format(type(request))
            raise ext.InvalidRequestTypeException(msg)
        self._tasks.put_nowait([request, args, kwargs])

    def wait_completion(self):
        """ wait completion
        Blocks until every queued task has been marked as done.
        @return: None
        """
        self._tasks.join()

    def check_completion(self):
        """ check completion
        @return: True for complete tasks / False for NOT complete tasks
        """
        # qsize() drops to zero as soon as the last entry is dequeued,
        # while that task may still be running.  The unfinished-task
        # counter is only decremented by task_done(), so it matches what
        # wait_completion() waits for.
        return self._tasks.unfinished_tasks == 0

    def get_finish_task(self, wait=True):
        """ get finish task
        @param wait: True to block until a finished task is available,
                     False to return immediately
        @return: finish task, or None when wait is False and no finished
                 task is available
        """
        try:
            task = self._finish_tasks.get(wait)
        except Empty:
            logger.info('finish task que is empty')
            return None
        self._finish_tasks.task_done()
        return task

    def check_finish_task(self):
        """ check whether the finished-task queue is empty
        @return: True when no finished task is waiting / False otherwise
        """
        return self._finish_tasks.qsize() == 0

    def end(self):
        """ end worker thread
        Waits for all queued tasks, then stops and joins every worker.
        @return: None
        """
        logger.debug('end thread pool')
        self.wait_completion()

        for worker in self._workers:
            worker.join()


rospeex_core
Author(s): Komei Sugiura
autogenerated on Thu Apr 20 2017 03:08:53