1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33   
 34   
 35  """ 
 36  Internal threadpool library for zenmaster. 
 37   
 38  Adapted from U{http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871} 
 39   
 40  Added a 'marker' to tasks so that multiple tasks with the same
 41  marker are not executed at the same time. As we are using the thread
 42  pool for i/o tasks, the marker is set to the i/o name. This prevents a
 43  slow i/o from gobbling up all of our threads.
 44  """ 
 45   
 46  import threading, logging, traceback 
 47  from time import sleep 
 48   
 50   
 51      """Flexible thread pool class.  Creates a pool of threads, then 
 52      accepts tasks that will be dispatched to the next available 
 53      thread.""" 
 54       
 56   
 57          """Initialize the thread pool with numThreads workers.""" 
 58           
 59          self.__threads = [] 
 60          self.__resizeLock = threading.Condition(threading.Lock()) 
 61          self.__taskLock = threading.Condition(threading.Lock()) 
 62          self.__tasks = [] 
 63          self.__markers = set() 
 64          self.__isJoining = False 
 65          self.set_thread_count(numThreads) 
  66   
 68   
 69          """ External method to set the current pool size.  Acquires 
 70          the resizing lock, then calls the internal version to do real 
 71          work.""" 
 72           
 73           
 74          if self.__isJoining: 
 75              return False 
 76           
 77          self.__resizeLock.acquire() 
 78          try: 
 79              self.__set_thread_count_nolock(newNumThreads) 
 80          finally: 
 81              self.__resizeLock.release() 
 82          return True 
  83   
 85           
 86          """Set the current pool size, spawning or terminating threads 
 87          if necessary.  Internal use only; assumes the resizing lock is 
 88          held.""" 
 89           
 90           
 91          while newNumThreads > len(self.__threads): 
 92              newThread = ThreadPoolThread(self) 
 93              self.__threads.append(newThread) 
 94              newThread.start() 
 95           
 96          while newNumThreads < len(self.__threads): 
 97              self.__threads[0].go_away() 
 98              del self.__threads[0] 
  99   
101          """@return: number of threads in the pool.""" 
102          self.__resizeLock.acquire() 
103          try: 
104              return len(self.__threads) 
105          finally: 
106              self.__resizeLock.release() 
 107   
108 -    def queue_task(self, marker, task, args=None, taskCallback=None): 
 109   
110          """Insert a task into the queue.  task must be callable; 
111          args and taskCallback can be None.""" 
112           
113          if self.__isJoining == True: 
114              return False 
115          if not callable(task): 
116              return False 
117           
118          self.__taskLock.acquire() 
119          try: 
120              self.__tasks.append((marker, task, args, taskCallback)) 
121              return True 
122          finally: 
123              self.__taskLock.release() 
 124   
126          """Remove the marker from the currently executing tasks. Only one 
127          task with the given marker can be executed at a given time""" 
128          if marker is None: 
129              return 
130          self.__taskLock.acquire()         
131          try: 
132              self.__markers.remove(marker) 
133          finally: 
134              self.__taskLock.release()             
 135       
137   
138          """ Retrieve the next task from the task queue.  For use 
139          only by ThreadPoolThread objects contained in the pool.""" 
140           
141          self.__taskLock.acquire() 
142          try: 
143              retval = None 
144              for marker, task, args, callback in self.__tasks: 
145                   
146                  if marker is None or marker not in self.__markers: 
147                      retval = (marker, task, args, callback) 
148                      break 
149              if retval: 
150                   
151                  self.__tasks.remove(retval) 
152                  if marker is not None: 
153                      self.__markers.add(marker) 
154                  return retval 
155              else: 
156                  return (None, None, None, None) 
157          finally: 
158              self.__taskLock.release() 
 159       
160 -    def join_all(self, wait_for_tasks = True, wait_for_threads = True): 
 161          """ Clear the task queue and terminate all pooled threads, 
162          optionally allowing the tasks and threads to finish.""" 
163           
164           
165          self.__isJoining = True 
166   
167           
168          if wait_for_tasks: 
169              while self.__tasks != []: 
170                  sleep(.1) 
171   
172           
173          self.__resizeLock.acquire() 
174          try: 
175              self.__set_thread_count_nolock(0) 
176              self.__isJoining = True 
177   
178               
179              if wait_for_threads: 
180                  for t in self.__threads: 
181                      t.join() 
182                      del t 
183   
184               
185              self.__isJoining = False 
186          finally: 
187              self.__resizeLock.release() 
  188   
189   
190           
192      """ 
193      Pooled thread class. 
194      """ 
195       
196      threadSleepTime = 0.1 
197   
199          """Initialize the thread and remember the pool.""" 
200          threading.Thread.__init__(self) 
201          self.setDaemon(True)  
202          self.__pool = pool 
203          self.__isDying = False 
 204           
206          """ 
207          Until told to quit, retrieve the next task and execute 
208          it, calling the callback if any.   
209          """ 
210          while self.__isDying == False: 
211              marker, cmd, args, callback = self.__pool.get_next_task() 
212               
213              if cmd is None: 
214                  sleep(ThreadPoolThread.threadSleepTime) 
215              else: 
216                  try: 
217                      try: 
218                          result = cmd(*args) 
219                      finally: 
220                          self.__pool.remove_marker(marker) 
221                      if callback is not None: 
222                          callback(result) 
223                  except Exception as e: 
224                      logging.getLogger('rosmaster.threadpool').error(traceback.format_exc()) 
 225       
227          """ Exit the run loop next time through.""" 
228          self.__isDying = True 
  229