00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 from collections import deque
00033 import time
00034 import heapq
00035 import copy
00036 import inspect
00037
00038
00039
00040
00041 class Task(object):
00042 """ The object representing a task/co-routine in the scheduler """
00043 WAIT_ANY = 1
00044 WAIT_ALL = 2
00045 taskid = 0
00046 def __init__(self,target):
00047 """ Initialize """
00048 Task.taskid += 1
00049 self.tid = Task.taskid
00050 self.target = target
00051 self.sendval = None
00052 self.waitmode = Task.WAIT_ANY
00053 def __repr__(self):
00054 """ Debug information on a task """
00055 return 'Task ' + str(self.tid) + ' (' + self.target.__name__ + ') @ ' + str(id(self))
00056 def run(self):
00057 """ Run a task until it hits the next yield statement"""
00058 return self.target.send(self.sendval)
00059
00060
00061
00062
00063 class ConditionVariable(object):
00064 """ The basic conditional variable """
00065 def __init__(self, initval=None):
00066 """ Initialize """
00067 self.val = initval
00068 self.myname = None
00069 def __get__(self, obj, objtype):
00070 """ Return the value """
00071 return self.val
00072 def __set__(self, obj, val):
00073 """ Set a value, evaluate conditions for tasks waiting on this variable """
00074 self.val = val
00075 self._set_name(type(obj))
00076 obj._test_conditions(self.myname)
00077 def _set_name(self, cls, top_level=True):
00078 """ if unknown, retrieve my own name using introspection """
00079 if self.myname is None:
00080 members = cls.__dict__
00081
00082 for name, value in members.iteritems():
00083 if value is self:
00084 self.myname = name
00085 break
00086
00087 for parent_cls in cls.__bases__:
00088 self._set_name(parent_cls,False)
00089
00090 if top_level:
00091 assert self.myname is not None
00092
00093
00094
00095
00096 class Scheduler(object):
00097 """ The scheduler base object, do not instanciate directly """
00098 def __init__(self):
00099 """ Initialize """
00100
00101 self.taskmap = {}
00102
00103 self.ready = deque()
00104
00105 self.exit_waiting = {}
00106
00107 self.cond_waiting = {}
00108
00109 self.paused_in_syscall = set()
00110 self.paused_in_ready = set()
00111
00112 self.current_task = None
00113
00114
00115
00116 def list_all_tids(self):
00117 """ Return all task identifiers """
00118 return self.taskmap.keys()
00119
00120 def get_current_tid(self):
00121 """ Return the identifier of current task, None if not called from a task """
00122 if self.current_task is not None:
00123 return self.current_task.tid
00124 else:
00125 return None
00126
00127 def new_task(self, target):
00128 """ Create a new task from function target, return the task identifier """
00129 newtask = Task(target)
00130 self.taskmap[newtask.tid] = newtask
00131 self._schedule(newtask)
00132 self._log_task_created(newtask)
00133 return newtask.tid
00134
00135 def kill_task(self, tid):
00136 """ Kill a task, return whether the task was killed """
00137 task = self.taskmap.get(tid,None)
00138 if task:
00139 task.target.close()
00140 return True
00141 else:
00142 return False
00143
00144 def kill_tasks(self, tids):
00145 """ Kill multiple tasks, return the list of killed tasks """
00146 return filter(self.kill_task, tids)
00147
00148 def kill_all_tasks_except(self, tids):
00149 """ Kill all tasks except a subset, return the list of killed tasks """
00150 to_kill = filter(lambda tid: tid not in tids, self.list_all_tids())
00151 return self.kill_tasks(to_kill)
00152
00153 def pause_task(self, tid):
00154 """ Pause a task, return whether the task was paused """
00155 task = self.taskmap.get(tid,None)
00156 if task is None \
00157 or task is self.current_task\
00158 or task in self.paused_in_ready\
00159 or task in self.paused_in_syscall:
00160 return False
00161 if task in self.ready:
00162 self.ready.remove(task)
00163 self.paused_in_ready.add(task)
00164 else:
00165 self.paused_in_syscall.add(task)
00166 return True
00167
00168 def pause_tasks(self, tids):
00169 """ Pause multiple tasks, return the lits of paused tasks """
00170 return filter(self.pause_task, tids)
00171
00172 def pause_all_tasks_except(self, tids):
00173 """ Pause all tasks except a subset, return the list of paused tasks """
00174 to_pause = filter(lambda tid: tid not in tids, self.list_all_tids())
00175 return self.pause_tasks(to_pause)
00176
00177 def resume_task(self, tid):
00178 """ Resume a task, return whether the task was resumed successfully """
00179 task = self.taskmap.get(tid,None)
00180 if task is None or task is self.current_task:
00181 return False
00182 if task in self.paused_in_ready:
00183 self.paused_in_ready.remove(task)
00184 self.ready.append(task)
00185 return True
00186 elif task in self.paused_in_syscall:
00187 self.paused_in_syscall.remove(task)
00188 return True
00189 return False
00190
00191 def resume_tasks(self, tids):
00192 """ Resume the execution of multiple tasks, return the list of resumed tasks """
00193 return filter(self.resume_task, tids)
00194
00195 def resume_all_tasks_except(self, tids):
00196 """ Resume all tasks except a subset, return the list of resumed tasks """
00197 to_resume = filter(lambda tid: tid not in tids, self.list_all_tids())
00198 return self.resume_tasks(to_resume)
00199
00200 def create_rate(self, rate):
00201 """ Create a rate object, to have a loop at a certain frequency """
00202 duration = 1./rate
00203 initial_time = self.current_time()
00204 return Rate(duration, initial_time)
00205
00206 def printd(self, msg):
00207 """ Print something including the current task identifier """
00208 print "[teer tid: " + str(self.get_current_tid()) + "] " + msg
00209
00210
00211
00212 def step(self):
00213 """ Run all tasks until none is ready """
00214 if self.current_task is not None:
00215 raise RuntimeError('Scheduler.step() called within a task.')
00216 while self.ready:
00217 task = self.ready.popleft()
00218 try:
00219
00220 self.current_task = task
00221 result = task.run()
00222 self.current_task = None
00223 if isinstance(result,SystemCall):
00224 result.task = task
00225 result.sched = self
00226 result.handle()
00227
00228 continue
00229 except StopIteration:
00230 self.current_task = None
00231 self._exit(task)
00232 continue
00233 self._schedule(task)
00234
00235
00236
00237
00238 def current_time(self):
00239 """ Return the current time """
00240 return time.time()
00241
00242
00243
00244
00245 def _sleep(self, duration):
00246 """ Sleep a certain amount of time """
00247 time.sleep(duration)
00248
00249 def _set_timer_callback(self, t, f):
00250 """ Execute function f at time t """
00251 raise NotImplementedError('timer callback mechanism must be provided by derived class')
00252
00253 def _log_task_created(self, task):
00254 """ Log for task created """
00255 print time.ctime() + " - Task %s (tid %d) created" % (task.target.__name__, task.tid)
00256
00257 def _log_task_terminated(self, task):
00258 """ Log for task terminated """
00259 print time.ctime() + " - Task %s (tid %d) terminated" % (task.target.__name__, task.tid)
00260
00261
00262
00263 def _exit(self,exiting_task):
00264 """ Handle the termination of a task """
00265 self._log_task_terminated(exiting_task)
00266 del self.taskmap[exiting_task.tid]
00267
00268 to_remove_keys = []
00269 for task in self.exit_waiting.pop(exiting_task.tid,[]):
00270 if task.waitmode == Task.WAIT_ANY:
00271
00272 for waited_tid, waiting_tasks_list in self.exit_waiting.iteritems():
00273
00274 for waiting_task in waiting_tasks_list:
00275 if waiting_task.tid == task.tid:
00276 waiting_tasks_list.remove(waiting_task)
00277
00278 task.sendval = exiting_task.tid
00279 self._schedule(task)
00280 else:
00281 are_still_waiting = False
00282 for waited_tid, waiting_tasks_list in self.exit_waiting.iteritems():
00283 for waiting_task in waiting_tasks_list:
00284 if waiting_task.tid == task.tid:
00285 are_still_waiting = True
00286 if not are_still_waiting:
00287
00288 task.sendval = exiting_task.tid
00289 self._schedule(task)
00290 self.exit_waiting = dict((k,v) for (k,v) in self.exit_waiting.iteritems() if v)
00291
00292 def _wait_for_exit(self,task,waittid):
00293 """ Set task waiting of the exit of task waittid """
00294 if waittid in self.taskmap:
00295 self.exit_waiting.setdefault(waittid,[]).append(task)
00296 return True
00297 else:
00298 return False
00299
00300 def _schedule(self,task):
00301 if task in self.paused_in_syscall:
00302 self.paused_in_syscall.remove(task)
00303 self.paused_in_ready.add(task)
00304 else:
00305 self.ready.append(task)
00306
00307 def _schedule_now(self,task):
00308 if task in self.paused_in_syscall:
00309 self.paused_in_syscall.remove(task)
00310 self.paused_in_ready.add(task)
00311 else:
00312 self.ready.appendleft(task)
00313
00314 def _wait_duration(self,task,duration):
00315 def resume(task):
00316 self._schedule_now(task)
00317 self._set_timer_callback(self.current_time()+duration, lambda: resume(task))
00318
00319 def _wait_duration_rate(self,task,duration,rate):
00320 def resume(task,rate):
00321
00322 rate.last_time = self.current_time()
00323
00324 self._schedule_now(task)
00325 self._set_timer_callback(self.current_time()+duration, lambda: resume(task, rate))
00326
00327 def _add_condition(self,entry):
00328 condition = entry[0]
00329 vars_in_cond = dict(inspect.getmembers(dict(inspect.getmembers(condition))['func_code']))['co_names']
00330 for var in vars_in_cond:
00331 if var not in self.cond_waiting:
00332 self.cond_waiting[var] = []
00333 self.cond_waiting[var].append(entry)
00334
00335 def _del_condition(self,candidate):
00336 (condition, task) = candidate
00337 vars_in_cond = dict(inspect.getmembers(dict(inspect.getmembers(condition))['func_code']))['co_names']
00338 for var in vars_in_cond:
00339 if var in self.cond_waiting:
00340 self.cond_waiting[var].remove(candidate)
00341 if not self.cond_waiting[var]:
00342 del self.cond_waiting[var]
00343
00344 def _wait_condition(self,task,condition):
00345
00346 entry = (condition,task)
00347 if not condition():
00348 self._add_condition(entry)
00349 else:
00350 self._schedule_now(task)
00351
00352 def _test_conditions(self, name):
00353
00354 if name not in self.cond_waiting:
00355 return
00356
00357 candidates = copy.copy(self.cond_waiting[name])
00358 for candidate in candidates:
00359 (condition, task) = candidate
00360 if task not in self.paused_in_syscall and condition():
00361 self._schedule(task)
00362 self._del_condition(candidate)
00363
00364
00365 class TimerScheduler(Scheduler):
00366 """ A scheduler that sleeps when there is nothing to do. """
00367
00368 def __init__(self):
00369 """ Initialize """
00370 super(TimerScheduler, self).__init__()
00371 self.timer_cb = []
00372 self.timer_counter = 0
00373
00374
00375
00376 def run(self):
00377 """ Run until there is no task to schedule """
00378 if self.current_task is not None:
00379 raise RuntimeError('TimerScheduler.run() called within a task.')
00380 while self.timer_cb or self.ready or self.cond_waiting:
00381 self.step()
00382 t, counter, f = heapq.heappop(self.timer_cb)
00383 duration = t - self.current_time()
00384 if duration >= 0:
00385 self._sleep(duration)
00386 f()
00387 self.step()
00388
00389 def timer_step(self):
00390 """ Schedule all tasks with past deadlines and step """
00391 if self.current_task is not None:
00392 raise RuntimeError('TimerScheduler.timer_step() called within a task.')
00393 while self.timer_cb:
00394 t, counter, f = heapq.heappop(self.timer_cb)
00395 duration = t - self.current_time()
00396 if duration <= 0:
00397 f()
00398 else:
00399 heapq.heappush(self.timer_cb, [t, counter, f])
00400 break
00401 self.step()
00402
00403
00404
00405 def _set_timer_callback(self, t, f):
00406 """ Implement the timer callback """
00407 heapq.heappush(self.timer_cb, [t, self.timer_counter, f])
00408 self.timer_counter += 1
00409
00410
00411
00412
00413
00414 class Rate(object):
00415 """ Helper class to execute a loop at a certain rate """
00416 def __init__(self,duration,initial_time):
00417 """ Initialize """
00418 self.duration = duration
00419 self.last_time = initial_time
00420 def sleep(self,sched,task):
00421 """ Sleep for the rest of this period """
00422 cur_time = sched.current_time()
00423 delta_time = self.duration - (cur_time - self.last_time)
00424 if delta_time > 0:
00425 sched._wait_duration_rate(task, delta_time, self)
00426 else:
00427 sched._schedule(task)
00428 return delta_time
00429
00430
00431
00432
00433
00434 class SystemCall(object):
00435 """ Parent of all system calls """
00436 def handle(self):
00437 """ Called in the scheduler context """
00438 raise NotImplementedError('system call superclass should not be used directly')
00439
00440 class Pass(SystemCall):
00441 """ Pass the execution to other tasks """
00442 def handle(self):
00443 self.task.sendval = True
00444 self.sched._schedule(self.task)
00445
00446 class GetScheduler(SystemCall):
00447 """ Return the scheduler, useful to access condition variables """
00448 def handle(self):
00449 self.task.sendval = self.sched
00450 self.sched._schedule(self.task)
00451
00452 class WaitTask(SystemCall):
00453 """ Wait for a task to exit, return whether the wait was a success """
00454 def __init__(self,tid):
00455 self.tid = tid
00456 def handle(self):
00457 result = self.sched._wait_for_exit(self.task,self.tid)
00458 self.task.sendval = result
00459 self.task.waitmode = Task.WAIT_ANY
00460
00461
00462 if not result:
00463 self.sched._schedule(self.task)
00464
00465 class WaitAnyTasks(SystemCall):
00466 """ Wait for any tasks to exit, return whether the wait was a success """
00467 def __init__(self,tids):
00468 self.tids = tids
00469 def handle(self):
00470 self.task.waitmode = Task.WAIT_ANY
00471
00472 all_exist = True
00473 non_existing_tid = None
00474 for tid in self.tids:
00475 if tid not in self.sched.taskmap:
00476 all_exist = False
00477 non_existing_tid = tid
00478 break
00479
00480 if all_exist:
00481 for tid in self.tids:
00482 self.sched._wait_for_exit(self.task,tid)
00483
00484
00485 else:
00486
00487
00488 self.task.sendval = non_existing_tid
00489 self.sched._schedule(self.task)
00490
00491 class WaitAllTasks(SystemCall):
00492 """ Wait for all tasks to exit, return whether the wait was a success """
00493 def __init__(self,tids):
00494 self.tids = tids
00495 def handle(self):
00496 self.task.waitmode = Task.WAIT_ALL
00497 any_exist = False
00498 for tid in self.tids:
00499 result = self.sched._wait_for_exit(self.task,tid)
00500 any_exist = any_exist or result
00501
00502
00503 if any_exist:
00504 self.task.sendval = True
00505 else:
00506 self.task.sendval = False
00507 self.sched._schedule(self.task)
00508
00509 class WaitDuration(SystemCall):
00510 """ Pause current task for a certain duration """
00511 def __init__(self,duration):
00512 self.duration = duration
00513 def handle(self):
00514 self.sched._wait_duration(self.task, self.duration)
00515 self.task.sendval = None
00516
00517 class WaitCondition(SystemCall):
00518 """ Pause current task until the condition is true """
00519 def __init__(self,condition):
00520 self.condition = condition
00521 def handle(self):
00522 self.sched._wait_condition(self.task,self.condition)
00523 self.task.sendval = None
00524
00525 class Sleep(SystemCall):
00526 """ Sleep using a rate object """
00527 def __init__(self,rate):
00528 self.rate = rate
00529 def handle(self):
00530 self.task.sendval = self.rate.sleep(self.sched, self.task)