Package rosmaster :: Module threadpool

Source Code for Module rosmaster.threadpool

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """ 
 36  Internal threadpool library for zenmaster. 
 37   
 38  Adapted from U{http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871} 
 39   
 40  Added a 'marker' to tasks so that multiple tasks with the same 
 41  marker are not executed. As we are using the thread pool for i/o 
 42  tasks, the marker is set to the i/o name. This prevents a slow i/o 
 43  for gobbling up all of our threads 
 44  """ 
 45   
 46  import threading, logging, traceback 
 47  from time import sleep 
 48   
49 -class MarkedThreadPool:
50 51 """Flexible thread pool class. Creates a pool of threads, then 52 accepts tasks that will be dispatched to the next available 53 thread.""" 54
55 - def __init__(self, numThreads):
56 57 """Initialize the thread pool with numThreads workers.""" 58 59 self.__threads = [] 60 self.__resizeLock = threading.Condition(threading.Lock()) 61 self.__taskLock = threading.Condition(threading.Lock()) 62 self.__tasks = [] 63 self.__markers = set() 64 self.__isJoining = False 65 self.set_thread_count(numThreads)
66
67 - def set_thread_count(self, newNumThreads):
68 69 """ External method to set the current pool size. Acquires 70 the resizing lock, then calls the internal version to do real 71 work.""" 72 73 # Can't change the thread count if we're shutting down the pool! 74 if self.__isJoining: 75 return False 76 77 self.__resizeLock.acquire() 78 try: 79 self.__set_thread_count_nolock(newNumThreads) 80 finally: 81 self.__resizeLock.release() 82 return True
83
84 - def __set_thread_count_nolock(self, newNumThreads):
85 86 """Set the current pool size, spawning or terminating threads 87 if necessary. Internal use only; assumes the resizing lock is 88 held.""" 89 90 # If we need to grow the pool, do so 91 while newNumThreads > len(self.__threads): 92 newThread = ThreadPoolThread(self) 93 self.__threads.append(newThread) 94 newThread.start() 95 # If we need to shrink the pool, do so 96 while newNumThreads < len(self.__threads): 97 self.__threads[0].go_away() 98 del self.__threads[0]
99
100 - def get_thread_count(self):
101 """@return: number of threads in the pool.""" 102 self.__resizeLock.acquire() 103 try: 104 return len(self.__threads) 105 finally: 106 self.__resizeLock.release()
107
108 - def queue_task(self, marker, task, args=None, taskCallback=None):
109 110 """Insert a task into the queue. task must be callable; 111 args and taskCallback can be None.""" 112 113 if self.__isJoining == True: 114 return False 115 if not callable(task): 116 return False 117 118 self.__taskLock.acquire() 119 try: 120 self.__tasks.append((marker, task, args, taskCallback)) 121 return True 122 finally: 123 self.__taskLock.release()
124
125 - def remove_marker(self, marker):
126 """Remove the marker from the currently executing tasks. Only one 127 task with the given marker can be executed at a given time""" 128 if marker is None: 129 return 130 self.__taskLock.acquire() 131 try: 132 self.__markers.remove(marker) 133 finally: 134 self.__taskLock.release()
135
136 - def get_next_task(self):
137 138 """ Retrieve the next task from the task queue. For use 139 only by ThreadPoolThread objects contained in the pool.""" 140 141 self.__taskLock.acquire() 142 try: 143 retval = None 144 for marker, task, args, callback in self.__tasks: 145 # unmarked or not currently executing 146 if marker is None or marker not in self.__markers: 147 retval = (marker, task, args, callback) 148 break 149 if retval: 150 # add the marker so we don't do any similar tasks 151 self.__tasks.remove(retval) 152 if marker is not None: 153 self.__markers.add(marker) 154 return retval 155 else: 156 return (None, None, None, None) 157 finally: 158 self.__taskLock.release()
159
160 - def join_all(self, wait_for_tasks = True, wait_for_threads = True):
161 """ Clear the task queue and terminate all pooled threads, 162 optionally allowing the tasks and threads to finish.""" 163 164 # Mark the pool as joining to prevent any more task queueing 165 self.__isJoining = True 166 167 # Wait for tasks to finish 168 if wait_for_tasks: 169 while self.__tasks != []: 170 sleep(.1) 171 172 # Tell all the threads to quit 173 self.__resizeLock.acquire() 174 try: 175 self.__set_thread_count_nolock(0) 176 self.__isJoining = True 177 178 # Wait until all threads have exited 179 if wait_for_threads: 180 for t in self.__threads: 181 t.join() 182 del t 183 184 # Reset the pool for potential reuse 185 self.__isJoining = False 186 finally: 187 self.__resizeLock.release()
188 189 190
191 -class ThreadPoolThread(threading.Thread):
192 """ 193 Pooled thread class. 194 """ 195 196 threadSleepTime = 0.1 197
198 - def __init__(self, pool):
199 """Initialize the thread and remember the pool.""" 200 threading.Thread.__init__(self) 201 self.setDaemon(True) #don't block program exit 202 self.__pool = pool 203 self.__isDying = False
204
205 - def run(self):
206 """ 207 Until told to quit, retrieve the next task and execute 208 it, calling the callback if any. 209 """ 210 while self.__isDying == False: 211 marker, cmd, args, callback = self.__pool.get_next_task() 212 # If there's nothing to do, just sleep a bit 213 if cmd is None: 214 sleep(ThreadPoolThread.threadSleepTime) 215 else: 216 try: 217 try: 218 result = cmd(*args) 219 finally: 220 self.__pool.remove_marker(marker) 221 if callback is not None: 222 callback(result) 223 except Exception, e: 224 logging.getLogger('rosmaster.threadpool').error(traceback.format_exc())
225
226 - def go_away(self):
227 """ Exit the run loop next time through.""" 228 self.__isDying = True
229