$search
00001 # -*- coding: utf-8 -*- 00002 # kate: replace-tabs off; indent-width 4; indent-mode normal 00003 # vim: ts=4:sw=4:noexpandtab 00004 00005 # Copyright (c) 2012 Stéphane Magnenat, ETHZ Zürich and other contributors 00006 # See file authors.txt for details. 00007 # All rights reserved. 00008 # 00009 # Redistribution and use in source and binary forms, with or without 00010 # modification, are permitted provided that the following conditions are met: 00011 # 00012 # * Redistributions of source code must retain the above copyright notice, 00013 # this list of conditions and the following disclaimer. 00014 # * Redistributions in binary form must reproduce the above copyright notice, 00015 # this list of conditions and the following disclaimer in the documentation 00016 # and/or other materials provided with the distribution. 00017 # * Neither the name of Stéphane Magnenat, ETHZ Zürich, nor the names 00018 # of the contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00022 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00023 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00024 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE 00025 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 00026 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 00027 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 00029 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 00030 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00031 00032 from collections import deque 00033 import time 00034 import heapq 00035 import copy 00036 import inspect 00037 00038 # ------------------------------------------------------------ 00039 # === Tasks === 00040 # ------------------------------------------------------------ 00041 class Task(object): 00042 """ The object representing a task/co-routine in the scheduler """ 00043 WAIT_ANY = 1 00044 WAIT_ALL = 2 00045 taskid = 0 00046 def __init__(self,target): 00047 """ Initialize """ 00048 Task.taskid += 1 00049 self.tid = Task.taskid # Task ID 00050 self.target = target # Target coroutine 00051 self.sendval = None # Value to send 00052 self.waitmode = Task.WAIT_ANY 00053 def __repr__(self): 00054 """ Debug information on a task """ 00055 return 'Task ' + str(self.tid) + ' (' + self.target.__name__ + ') @ ' + str(id(self)) 00056 def run(self): 00057 """ Run a task until it hits the next yield statement""" 00058 return self.target.send(self.sendval) 00059 00060 # ------------------------------------------------------------ 00061 # === Conditional Variables === 00062 # ------------------------------------------------------------ 00063 class ConditionVariable(object): 00064 """ The basic conditional variable """ 00065 def __init__(self, initval=None): 00066 """ Initialize """ 00067 self.val = initval 00068 self.myname = None 00069 def __get__(self, obj, objtype): 00070 """ Return the value """ 00071 return self.val 00072 def __set__(self, obj, val): 00073 """ Set a value, evaluate conditions for tasks waiting on this variable """ 00074 self.val = val 00075 self._set_name(type(obj)) 00076 obj._test_conditions(self.myname) 00077 def _set_name(self, cls, top_level=True): 00078 """ if unknown, retrieve my own name using introspection """ 00079 if self.myname is None: 00080 members = cls.__dict__ 00081 # first look into members 00082 for name, value in members.iteritems(): 00083 if value is self: 00084 self.myname = name 00085 break 00086 # look into parents 00087 for parent_cls in cls.__bases__: 00088 self._set_name(parent_cls,False) 00089 # if not found and top-level, assert 00090 if top_level: 00091 assert self.myname is not None 00092 00093 # ------------------------------------------------------------ 00094 # === Scheduler === 00095 # ------------------------------------------------------------ 00096 class Scheduler(object): 00097 """ The scheduler base object, do not instanciate directly """ 00098 def __init__(self): 00099 """ Initialize """ 00100 # Map of all task identifiers to tasks 00101 self.taskmap = {} 00102 # Deque of ready tasks 00103 self.ready = deque() 00104 # Tasks waiting for other tasks to exit, map of: tid => list of tasks 00105 self.exit_waiting = {} 00106 # Task waiting on conditions, map of: "name of condition variable" => (condition, task) 00107 self.cond_waiting = {} 00108 # Task being paused by another task 00109 self.paused_in_syscall = set() 00110 self.paused_in_ready = set() 00111 # Not running a task initially 00112 self.current_task = None 00113 00114 # Public API, these functions are safe to be called from within a task or from outside 00115 00116 def list_all_tids(self): 00117 """ Return all task identifiers """ 00118 return self.taskmap.keys() 00119 00120 def get_current_tid(self): 00121 """ Return the identifier of current task, None if not called from a task """ 00122 if self.current_task is not None: 00123 return self.current_task.tid 00124 else: 00125 return None 00126 00127 def new_task(self, target): 00128 """ Create a new task from function target, return the task identifier """ 00129 newtask = Task(target) 00130 self.taskmap[newtask.tid] = newtask 00131 self._schedule(newtask) 00132 self._log_task_created(newtask) 00133 return newtask.tid 00134 00135 def kill_task(self, tid): 00136 """ Kill a task, return whether the task was killed """ 00137 task = self.taskmap.get(tid,None) 00138 if task: 00139 task.target.close() 00140 return True 00141 else: 00142 return False 00143 00144 def kill_tasks(self, tids): 00145 """ Kill multiple tasks, return the list of killed tasks """ 00146 return filter(self.kill_task, tids) 00147 00148 def kill_all_tasks_except(self, tids): 00149 """ Kill all tasks except a subset, return the list of killed tasks """ 00150 to_kill = filter(lambda tid: tid not in tids, self.list_all_tids()) 00151 return self.kill_tasks(to_kill) 00152 00153 def pause_task(self, tid): 00154 """ Pause a task, return whether the task was paused """ 00155 task = self.taskmap.get(tid,None) 00156 if task is None \ 00157 or task is self.current_task\ 00158 or task in self.paused_in_ready\ 00159 or task in self.paused_in_syscall: 00160 return False 00161 if task in self.ready: 00162 self.ready.remove(task) 00163 self.paused_in_ready.add(task) 00164 else: 00165 self.paused_in_syscall.add(task) 00166 return True 00167 00168 def pause_tasks(self, tids): 00169 """ Pause multiple tasks, return the lits of paused tasks """ 00170 return filter(self.pause_task, tids) 00171 00172 def pause_all_tasks_except(self, tids): 00173 """ Pause all tasks except a subset, return the list of paused tasks """ 00174 to_pause = filter(lambda tid: tid not in tids, self.list_all_tids()) 00175 return self.pause_tasks(to_pause) 00176 00177 def resume_task(self, tid): 00178 """ Resume a task, return whether the task was resumed successfully """ 00179 task = self.taskmap.get(tid,None) 00180 if task is None or task is self.current_task: 00181 return False 00182 if task in self.paused_in_ready: 00183 self.paused_in_ready.remove(task) 00184 self.ready.append(task) 00185 return True 00186 elif task in self.paused_in_syscall: 00187 self.paused_in_syscall.remove(task) 00188 return True 00189 return False 00190 00191 def resume_tasks(self, tids): 00192 """ Resume the execution of multiple tasks, return the list of resumed tasks """ 00193 return filter(self.resume_task, tids) 00194 00195 def resume_all_tasks_except(self, tids): 00196 """ Resume all tasks except a subset, return the list of resumed tasks """ 00197 to_resume = filter(lambda tid: tid not in tids, self.list_all_tids()) 00198 return self.resume_tasks(to_resume) 00199 00200 def create_rate(self, rate): 00201 """ Create a rate object, to have a loop at a certain frequency """ 00202 duration = 1./rate 00203 initial_time = self.current_time() 00204 return Rate(duration, initial_time) 00205 00206 def printd(self, msg): 00207 """ Print something including the current task identifier """ 00208 print "[teer tid: " + str(self.get_current_tid()) + "] " + msg 00209 00210 # Public API, these funtions must be called outside a task 00211 00212 def step(self): 00213 """ Run all tasks until none is ready """ 00214 if self.current_task is not None: 00215 raise RuntimeError('Scheduler.step() called within a task.') 00216 while self.ready: 00217 task = self.ready.popleft() 00218 try: 00219 #print 'Running ' + str(task) 00220 self.current_task = task 00221 result = task.run() 00222 self.current_task = None 00223 if isinstance(result,SystemCall): 00224 result.task = task 00225 result.sched = self 00226 result.handle() 00227 #print 'ready queue B: ' + str(self.ready) 00228 continue 00229 except StopIteration: 00230 self.current_task = None 00231 self._exit(task) 00232 continue 00233 self._schedule(task) 00234 00235 # Public API, these functions are safe to be called from within a task or from outside 00236 # these functions can be overriden by children 00237 00238 def current_time(self): 00239 """ Return the current time """ 00240 return time.time() 00241 00242 # Protected implementations, these functions can only be called by functions from this object 00243 # these functions can be overriden by children 00244 00245 def _sleep(self, duration): 00246 """ Sleep a certain amount of time """ 00247 time.sleep(duration) 00248 00249 def _set_timer_callback(self, t, f): 00250 """ Execute function f at time t """ 00251 raise NotImplementedError('timer callback mechanism must be provided by derived class') 00252 00253 def _log_task_created(self, task): 00254 """ Log for task created """ 00255 print time.ctime() + " - Task %s (tid %d) created" % (task.target.__name__, task.tid) 00256 00257 def _log_task_terminated(self, task): 00258 """ Log for task terminated """ 00259 print time.ctime() + " - Task %s (tid %d) terminated" % (task.target.__name__, task.tid) 00260 00261 # Protected implementations, these functions can only be called by functions from this object 00262 00263 def _exit(self,exiting_task): 00264 """ Handle the termination of a task """ 00265 self._log_task_terminated(exiting_task) 00266 del self.taskmap[exiting_task.tid] 00267 # Notify other tasks waiting for exit 00268 to_remove_keys = [] 00269 for task in self.exit_waiting.pop(exiting_task.tid,[]): 00270 if task.waitmode == Task.WAIT_ANY: 00271 # remove associations to other tasks waited on 00272 for waited_tid, waiting_tasks_list in self.exit_waiting.iteritems(): 00273 # remove task form list of waiting tasks if in there 00274 for waiting_task in waiting_tasks_list: 00275 if waiting_task.tid == task.tid: 00276 waiting_tasks_list.remove(waiting_task) 00277 # return the tid of the exiting_task 00278 task.sendval = exiting_task.tid 00279 self._schedule(task) 00280 else: 00281 are_still_waiting = False 00282 for waited_tid, waiting_tasks_list in self.exit_waiting.iteritems(): 00283 for waiting_task in waiting_tasks_list: 00284 if waiting_task.tid == task.tid: 00285 are_still_waiting = True 00286 if not are_still_waiting: 00287 # return the tid of the exiting_task 00288 task.sendval = exiting_task.tid 00289 self._schedule(task) 00290 self.exit_waiting = dict((k,v) for (k,v) in self.exit_waiting.iteritems() if v) 00291 00292 def _wait_for_exit(self,task,waittid): 00293 """ Set task waiting of the exit of task waittid """ 00294 if waittid in self.taskmap: 00295 self.exit_waiting.setdefault(waittid,[]).append(task) 00296 return True 00297 else: 00298 return False 00299 00300 def _schedule(self,task): 00301 if task in self.paused_in_syscall: 00302 self.paused_in_syscall.remove(task) 00303 self.paused_in_ready.add(task) 00304 else: 00305 self.ready.append(task) 00306 00307 def _schedule_now(self,task): 00308 if task in self.paused_in_syscall: 00309 self.paused_in_syscall.remove(task) 00310 self.paused_in_ready.add(task) 00311 else: 00312 self.ready.appendleft(task) 00313 00314 def _wait_duration(self,task,duration): 00315 def resume(task): 00316 self._schedule_now(task) 00317 self._set_timer_callback(self.current_time()+duration, lambda: resume(task)) 00318 00319 def _wait_duration_rate(self,task,duration,rate): 00320 def resume(task,rate): 00321 # get current time 00322 rate.last_time = self.current_time() 00323 # if not paused, execute the resumed task directly once we exit the syscall 00324 self._schedule_now(task) 00325 self._set_timer_callback(self.current_time()+duration, lambda: resume(task, rate)) 00326 00327 def _add_condition(self,entry): 00328 condition = entry[0] 00329 vars_in_cond = dict(inspect.getmembers(dict(inspect.getmembers(condition))['func_code']))['co_names'] 00330 for var in vars_in_cond: 00331 if var not in self.cond_waiting: 00332 self.cond_waiting[var] = [] 00333 self.cond_waiting[var].append(entry) 00334 00335 def _del_condition(self,candidate): 00336 (condition, task) = candidate 00337 vars_in_cond = dict(inspect.getmembers(dict(inspect.getmembers(condition))['func_code']))['co_names'] 00338 for var in vars_in_cond: 00339 if var in self.cond_waiting: 00340 self.cond_waiting[var].remove(candidate) 00341 if not self.cond_waiting[var]: 00342 del self.cond_waiting[var] 00343 00344 def _wait_condition(self,task,condition): 00345 # add a new condition and directly evalutate it once 00346 entry = (condition,task) 00347 if not condition(): 00348 self._add_condition(entry) 00349 else: 00350 self._schedule_now(task) 00351 00352 def _test_conditions(self, name): 00353 # is there any task waiting on this name? 00354 if name not in self.cond_waiting: 00355 return 00356 # check which conditions are true 00357 candidates = copy.copy(self.cond_waiting[name]) 00358 for candidate in candidates: 00359 (condition, task) = candidate 00360 if task not in self.paused_in_syscall and condition(): 00361 self._schedule(task) 00362 self._del_condition(candidate) 00363 00364 00365 class TimerScheduler(Scheduler): 00366 """ A scheduler that sleeps when there is nothing to do. """ 00367 00368 def __init__(self): 00369 """ Initialize """ 00370 super(TimerScheduler, self).__init__() 00371 self.timer_cb = [] 00372 self.timer_counter = 0 00373 00374 # Public API, these funtions must be called outside a task 00375 00376 def run(self): 00377 """ Run until there is no task to schedule """ 00378 if self.current_task is not None: 00379 raise RuntimeError('TimerScheduler.run() called within a task.') 00380 while self.timer_cb or self.ready or self.cond_waiting: 00381 self.step() 00382 t, counter, f = heapq.heappop(self.timer_cb) 00383 duration = t - self.current_time() 00384 if duration >= 0: 00385 self._sleep(duration) 00386 f() 00387 self.step() 00388 00389 def timer_step(self): 00390 """ Schedule all tasks with past deadlines and step """ 00391 if self.current_task is not None: 00392 raise RuntimeError('TimerScheduler.timer_step() called within a task.') 00393 while self.timer_cb: 00394 t, counter, f = heapq.heappop(self.timer_cb) 00395 duration = t - self.current_time() 00396 if duration <= 0: 00397 f() 00398 else: 00399 heapq.heappush(self.timer_cb, [t, counter, f]) 00400 break 00401 self.step() 00402 00403 # Protected implementations, these functions can only be called by functions from this object 00404 00405 def _set_timer_callback(self, t, f): 00406 """ Implement the timer callback """ 00407 heapq.heappush(self.timer_cb, [t, self.timer_counter, f]) 00408 self.timer_counter += 1 00409 00410 # ------------------------------------------------------------ 00411 # === Helper objects === 00412 # ------------------------------------------------------------ 00413 00414 class Rate(object): 00415 """ Helper class to execute a loop at a certain rate """ 00416 def __init__(self,duration,initial_time): 00417 """ Initialize """ 00418 self.duration = duration 00419 self.last_time = initial_time 00420 def sleep(self,sched,task): 00421 """ Sleep for the rest of this period """ 00422 cur_time = sched.current_time() 00423 delta_time = self.duration - (cur_time - self.last_time) 00424 if delta_time > 0: 00425 sched._wait_duration_rate(task, delta_time, self) 00426 else: 00427 sched._schedule(task) 00428 return delta_time 00429 00430 # ------------------------------------------------------------ 00431 # === System Calls === 00432 # ------------------------------------------------------------ 00433 00434 class SystemCall(object): 00435 """ Parent of all system calls """ 00436 def handle(self): 00437 """ Called in the scheduler context """ 00438 raise NotImplementedError('system call superclass should not be used directly') 00439 00440 class Pass(SystemCall): 00441 """ Pass the execution to other tasks """ 00442 def handle(self): 00443 self.task.sendval = True 00444 self.sched._schedule(self.task) 00445 00446 class GetScheduler(SystemCall): 00447 """ Return the scheduler, useful to access condition variables """ 00448 def handle(self): 00449 self.task.sendval = self.sched 00450 self.sched._schedule(self.task) 00451 00452 class WaitTask(SystemCall): 00453 """ Wait for a task to exit, return whether the wait was a success """ 00454 def __init__(self,tid): 00455 self.tid = tid 00456 def handle(self): 00457 result = self.sched._wait_for_exit(self.task,self.tid) 00458 self.task.sendval = result 00459 self.task.waitmode = Task.WAIT_ANY 00460 # If waiting for a non-existent task, 00461 # return immediately without waiting 00462 if not result: 00463 self.sched._schedule(self.task) 00464 00465 class WaitAnyTasks(SystemCall): 00466 """ Wait for any tasks to exit, return whether the wait was a success """ 00467 def __init__(self,tids): 00468 self.tids = tids 00469 def handle(self): 00470 self.task.waitmode = Task.WAIT_ANY 00471 # Check if all tasks exist 00472 all_exist = True 00473 non_existing_tid = None 00474 for tid in self.tids: 00475 if tid not in self.sched.taskmap: 00476 all_exist = False 00477 non_existing_tid = tid 00478 break 00479 # If all exist 00480 if all_exist: 00481 for tid in self.tids: 00482 self.sched._wait_for_exit(self.task,tid) 00483 #dont set sendval, we want exit() to assign the exiting tasks tid 00484 #self.task.sendval = True 00485 else: 00486 # If waiting for a non-existent task, 00487 # return immediately without waiting 00488 self.task.sendval = non_existing_tid 00489 self.sched._schedule(self.task) 00490 00491 class WaitAllTasks(SystemCall): 00492 """ Wait for all tasks to exit, return whether the wait was a success """ 00493 def __init__(self,tids): 00494 self.tids = tids 00495 def handle(self): 00496 self.task.waitmode = Task.WAIT_ALL 00497 any_exist = False 00498 for tid in self.tids: 00499 result = self.sched._wait_for_exit(self.task,tid) 00500 any_exist = any_exist or result 00501 # If waiting for non-existent tasks, 00502 # return immediately without waiting 00503 if any_exist: 00504 self.task.sendval = True 00505 else: 00506 self.task.sendval = False 00507 self.sched._schedule(self.task) 00508 00509 class WaitDuration(SystemCall): 00510 """ Pause current task for a certain duration """ 00511 def __init__(self,duration): 00512 self.duration = duration 00513 def handle(self): 00514 self.sched._wait_duration(self.task, self.duration) 00515 self.task.sendval = None 00516 00517 class WaitCondition(SystemCall): 00518 """ Pause current task until the condition is true """ 00519 def __init__(self,condition): 00520 self.condition = condition 00521 def handle(self): 00522 self.sched._wait_condition(self.task,self.condition) 00523 self.task.sendval = None 00524 00525 class Sleep(SystemCall): 00526 """ Sleep using a rate object """ 00527 def __init__(self,rate): 00528 self.rate = rate 00529 def handle(self): 00530 self.task.sendval = self.rate.sleep(self.sched, self.task)