1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Internal threadpool library for zenmaster.
37
38 Adapted from U{http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/203871}
39
40 Added a 'marker' to tasks so that multiple tasks with the same
41 marker are not executed at the same time. As we are using the thread
42 pool for i/o tasks, the marker is set to the i/o name. This prevents
43 a slow i/o from gobbling up all of our threads.
44 """
45
46 import threading, logging, traceback
47 from time import sleep
48
50
51 """Flexible thread pool class. Creates a pool of threads, then
52 accepts tasks that will be dispatched to the next available
53 thread."""
54
56
57 """Initialize the thread pool with numThreads workers."""
58
59 self.__threads = []
60 self.__resizeLock = threading.Condition(threading.Lock())
61 self.__taskLock = threading.Condition(threading.Lock())
62 self.__tasks = []
63 self.__markers = set()
64 self.__isJoining = False
65 self.set_thread_count(numThreads)
66
68
69 """ External method to set the current pool size. Acquires
70 the resizing lock, then calls the internal version to do real
71 work."""
72
73
74 if self.__isJoining:
75 return False
76
77 self.__resizeLock.acquire()
78 try:
79 self.__set_thread_count_nolock(newNumThreads)
80 finally:
81 self.__resizeLock.release()
82 return True
83
85
86 """Set the current pool size, spawning or terminating threads
87 if necessary. Internal use only; assumes the resizing lock is
88 held."""
89
90
91 while newNumThreads > len(self.__threads):
92 newThread = ThreadPoolThread(self)
93 self.__threads.append(newThread)
94 newThread.start()
95
96 while newNumThreads < len(self.__threads):
97 self.__threads[0].go_away()
98 del self.__threads[0]
99
101 """@return: number of threads in the pool."""
102 self.__resizeLock.acquire()
103 try:
104 return len(self.__threads)
105 finally:
106 self.__resizeLock.release()
107
def queue_task(self, marker, task, args=None, taskCallback=None):
    """Insert a task into the queue.

    @param marker: key used to serialise tasks; a queued task is only
        handed to a worker while no other task with the same marker is
        executing. Use None for tasks that need no such exclusion.
    @param task: callable to execute on a pool thread.
    @param args: sequence of positional arguments for task, or None
        to call it without arguments.
    @param taskCallback: callable invoked with the value returned by
        task, or None for no callback.
    @return: True if the task was queued; False if the pool is being
        joined or task is not callable.
    """
    if self.__isJoining:
        return False
    if not callable(task):
        return False

    # Workers run the task as task(*args); unpacking None raises a
    # TypeError, so store an empty tuple when no arguments were given.
    if args is None:
        args = ()

    self.__taskLock.acquire()
    try:
        self.__tasks.append((marker, task, args, taskCallback))
        return True
    finally:
        self.__taskLock.release()
124
126 """Remove the marker from the currently executing tasks. Only one
127 task with the given marker can be executed at a given time"""
128 if marker is None:
129 return
130 self.__taskLock.acquire()
131 try:
132 self.__markers.remove(marker)
133 finally:
134 self.__taskLock.release()
135
137
138 """ Retrieve the next task from the task queue. For use
139 only by ThreadPoolThread objects contained in the pool."""
140
141 self.__taskLock.acquire()
142 try:
143 retval = None
144 for marker, task, args, callback in self.__tasks:
145
146 if marker is None or marker not in self.__markers:
147 retval = (marker, task, args, callback)
148 break
149 if retval:
150
151 self.__tasks.remove(retval)
152 if marker is not None:
153 self.__markers.add(marker)
154 return retval
155 else:
156 return (None, None, None, None)
157 finally:
158 self.__taskLock.release()
159
def join_all(self, wait_for_tasks=True, wait_for_threads=True):
    """Stop accepting tasks and terminate all pooled threads.

    @param wait_for_tasks: if True, block (polling every 0.1 seconds)
        until the task queue is empty before stopping the threads.
        If False, tasks still in the queue are left there.
    @param wait_for_threads: if True, block until every pooled thread
        has exited its run loop.
    """
    # Mark the pool as joining so that queue_task() and
    # set_thread_count() reject new work while we shut down.
    self.__isJoining = True

    # Let the workers drain the queue.
    if wait_for_tasks:
        while self.__tasks:
            sleep(.1)

    self.__resizeLock.acquire()
    try:
        # Shrinking the pool to zero removes every thread from
        # self.__threads, so remember them first; otherwise there
        # would be nothing left to join below.
        exiting = list(self.__threads)
        self.__set_thread_count_nolock(0)

        if wait_for_threads:
            me = threading.current_thread()
            for t in exiting:
                # A thread cannot join itself (e.g. when called from
                # a task callback running on a pooled thread).
                if t is not me:
                    t.join()

        # Shutdown finished: the pool accepts tasks again.
        self.__isJoining = False
    finally:
        self.__resizeLock.release()
188
189
190
192 """
193 Pooled thread class.
194 """
195
196 threadSleepTime = 0.1
197
199 """Initialize the thread and remember the pool."""
200 threading.Thread.__init__(self)
201 self.setDaemon(True)
202 self.__pool = pool
203 self.__isDying = False
204
206 """
207 Until told to quit, retrieve the next task and execute
208 it, calling the callback if any.
209 """
210 while self.__isDying == False:
211 marker, cmd, args, callback = self.__pool.get_next_task()
212
213 if cmd is None:
214 sleep(ThreadPoolThread.threadSleepTime)
215 else:
216 try:
217 try:
218 result = cmd(*args)
219 finally:
220 self.__pool.remove_marker(marker)
221 if callback is not None:
222 callback(result)
223 except Exception, e:
224 logging.getLogger('rosmaster.threadpool').error(traceback.format_exc())
225
227 """ Exit the run loop next time through."""
228 self.__isDying = True
229