00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 from collections import deque
00033 import time
00034 import heapq
00035 import copy
00036 import inspect
00037 
00038 
00039 
00040 
00041 class Task(object):
00042         """ The object representing a task/co-routine in the scheduler """
00043         WAIT_ANY = 1
00044         WAIT_ALL = 2
00045         taskid = 0
00046         def __init__(self,target):
00047                 """ Initialize """
00048                 Task.taskid += 1
00049                 self.tid     = Task.taskid   
00050                 self.target  = target        
00051                 self.sendval = None          
00052                 self.waitmode = Task.WAIT_ANY
00053         def __repr__(self):
00054                 """ Debug information on a task """
00055                 return 'Task ' + str(self.tid) + ' (' + self.target.__name__ + ') @ ' + str(id(self))
00056         def run(self):
00057                 """ Run a task until it hits the next yield statement"""
00058                 return self.target.send(self.sendval)
00059 
00060 
00061 
00062 
00063 class ConditionVariable(object):
00064         """ The basic conditional variable """
00065         def __init__(self, initval=None):
00066                 """ Initialize """
00067                 self.val = initval
00068                 self.myname = None
00069         def __get__(self, obj, objtype):
00070                 """ Return the value """
00071                 return self.val
00072         def __set__(self, obj, val):
00073                 """ Set a value, evaluate conditions for tasks waiting on this variable """
00074                 self.val = val
00075                 self._set_name(type(obj))
00076                 obj._test_conditions(self.myname)
00077         def _set_name(self, cls, top_level=True):
00078                 """ if unknown, retrieve my own name using introspection """
00079                 if self.myname is None:
00080                         members = cls.__dict__
00081                         
00082                         for name, value in members.iteritems():
00083                                 if value is self:
00084                                         self.myname = name
00085                                         break
00086                         
00087                         for parent_cls in cls.__bases__:
00088                                 self._set_name(parent_cls,False)
00089                         
00090                         if top_level:
00091                                 assert self.myname is not None
00092 
00093 
00094 
00095 
00096 class Scheduler(object):
00097         """ The scheduler base object, do not instanciate directly """
00098         def __init__(self):
00099                 """ Initialize """
00100                 
00101                 self.taskmap = {}
00102                 
00103                 self.ready   = deque()   
00104                 
00105                 self.exit_waiting = {}
00106                 
00107                 self.cond_waiting = {}
00108                 
00109                 self.paused_in_syscall = set()
00110                 self.paused_in_ready = set()
00111                 
00112                 self.current_task = None
00113         
00114         
00115         
00116         def list_all_tids(self):
00117                 """ Return all task identifiers """
00118                 return self.taskmap.keys()
00119         
00120         def get_current_tid(self):
00121                 """ Return the identifier of current task, None if not called from a task """
00122                 if self.current_task is not None:
00123                         return self.current_task.tid
00124                 else:
00125                         return None
00126         
00127         def new_task(self, target):
00128                 """ Create a new task from function target, return the task identifier """
00129                 newtask = Task(target)
00130                 self.taskmap[newtask.tid] = newtask
00131                 self._schedule(newtask)
00132                 self._log_task_created(newtask)
00133                 return newtask.tid
00134         
00135         def kill_task(self, tid):
00136                 """ Kill a task, return whether the task was killed """
00137                 task = self.taskmap.get(tid,None)
00138                 if task:
00139                         task.target.close() 
00140                         return True
00141                 else:
00142                         return False
00143         
00144         def kill_tasks(self, tids):
00145                 """ Kill multiple tasks, return the list of killed tasks """
00146                 return filter(self.kill_task, tids)
00147         
00148         def kill_all_tasks_except(self, tids):
00149                 """ Kill all tasks except a subset, return the list of killed tasks """
00150                 to_kill = filter(lambda tid: tid not in tids, self.list_all_tids())
00151                 return self.kill_tasks(to_kill)
00152 
00153         def pause_task(self, tid):
00154                 """ Pause a task, return whether the task was paused """
00155                 task = self.taskmap.get(tid,None)
00156                 if task is None \
00157                         or task is self.current_task\
00158                         or task in self.paused_in_ready\
00159                         or task in self.paused_in_syscall:
00160                         return False
00161                 if task in self.ready:
00162                         self.ready.remove(task)
00163                         self.paused_in_ready.add(task)
00164                 else:
00165                         self.paused_in_syscall.add(task)
00166                 return True
00167         
00168         def pause_tasks(self, tids):
00169                 """ Pause multiple tasks, return the lits of paused tasks """
00170                 return filter(self.pause_task, tids)
00171         
00172         def pause_all_tasks_except(self, tids):
00173                 """ Pause all tasks except a subset, return the list of paused tasks """
00174                 to_pause = filter(lambda tid: tid not in tids, self.list_all_tids())
00175                 return self.pause_tasks(to_pause)
00176         
00177         def resume_task(self, tid):
00178                 """ Resume a task, return whether the task was resumed successfully """
00179                 task = self.taskmap.get(tid,None)
00180                 if task is None or task is self.current_task:
00181                         return False
00182                 if task in self.paused_in_ready:
00183                         self.paused_in_ready.remove(task)
00184                         self.ready.append(task)
00185                         return True
00186                 elif task in self.paused_in_syscall:
00187                         self.paused_in_syscall.remove(task)
00188                         return True
00189                 return False
00190         
00191         def resume_tasks(self, tids):
00192                 """ Resume the execution of multiple tasks, return the list of resumed tasks """
00193                 return filter(self.resume_task, tids)
00194         
00195         def resume_all_tasks_except(self, tids):
00196                 """ Resume all tasks except a subset, return the list of resumed tasks """
00197                 to_resume = filter(lambda tid: tid not in tids, self.list_all_tids())
00198                 return self.resume_tasks(to_resume)
00199         
00200         def create_rate(self, rate):
00201                 """ Create a rate object, to have a loop at a certain frequency """
00202                 duration = 1./rate
00203                 initial_time = self.current_time()
00204                 return Rate(duration, initial_time)
00205         
00206         def printd(self, msg):
00207                 """ Print something including the current task identifier """
00208                 print "[teer tid: " + str(self.get_current_tid()) + "] " + msg
00209         
00210         
00211         
00212         def step(self):
00213                 """ Run all tasks until none is ready """
00214                 if self.current_task is not None:
00215                         raise RuntimeError('Scheduler.step() called within a task.')
00216                 while self.ready:
00217                         task = self.ready.popleft()
00218                         try:
00219                                 
00220                                 self.current_task = task
00221                                 result = task.run()
00222                                 self.current_task = None
00223                                 if isinstance(result,SystemCall):
00224                                         result.task  = task
00225                                         result.sched = self
00226                                         result.handle()
00227                                         
00228                                         continue
00229                         except StopIteration:
00230                                 self.current_task = None
00231                                 self._exit(task)
00232                                 continue
00233                         self._schedule(task)
00234         
00235         
00236         
00237         
00238         def current_time(self):
00239                 """ Return the current time """
00240                 return time.time()
00241         
00242         
00243         
00244         
00245         def _sleep(self, duration):
00246                 """ Sleep a certain amount of time """
00247                 time.sleep(duration)
00248         
00249         def _set_timer_callback(self, t, f):
00250                 """ Execute function f at time t """
00251                 raise NotImplementedError('timer callback mechanism must be provided by derived class')
00252         
00253         def _log_task_created(self, task):
00254                 """ Log for task created """
00255                 print time.ctime() + " - Task %s (tid %d) created" % (task.target.__name__, task.tid)
00256         
00257         def _log_task_terminated(self, task):
00258                 """ Log for task terminated """
00259                 print time.ctime() + " - Task %s (tid %d) terminated" % (task.target.__name__, task.tid)
00260         
00261         
00262 
00263         def _exit(self,exiting_task):
00264                 """ Handle the termination of a task """
00265                 self._log_task_terminated(exiting_task)
00266                 del self.taskmap[exiting_task.tid]
00267                 
00268                 to_remove_keys = []
00269                 for task in self.exit_waiting.pop(exiting_task.tid,[]):
00270                         if task.waitmode == Task.WAIT_ANY:
00271                                 
00272                                 for waited_tid, waiting_tasks_list in self.exit_waiting.iteritems():
00273                                         
00274                                         for waiting_task in waiting_tasks_list:
00275                                                 if waiting_task.tid == task.tid:
00276                                                         waiting_tasks_list.remove(waiting_task)
00277                                 
00278                                 task.sendval = exiting_task.tid
00279                                 self._schedule(task)
00280                         else:
00281                                 are_still_waiting = False
00282                                 for waited_tid, waiting_tasks_list in self.exit_waiting.iteritems():
00283                                         for waiting_task in waiting_tasks_list:
00284                                                 if waiting_task.tid == task.tid:
00285                                                         are_still_waiting = True
00286                                 if not are_still_waiting:
00287                                         
00288                                         task.sendval = exiting_task.tid
00289                                         self._schedule(task)
00290                 self.exit_waiting = dict((k,v) for (k,v) in self.exit_waiting.iteritems() if v)
00291 
00292         def _wait_for_exit(self,task,waittid):
00293                 """ Set task waiting of the exit of task waittid """
00294                 if waittid in self.taskmap:
00295                         self.exit_waiting.setdefault(waittid,[]).append(task)
00296                         return True
00297                 else:
00298                         return False
00299 
00300         def _schedule(self,task):
00301                 if task in self.paused_in_syscall:
00302                         self.paused_in_syscall.remove(task)
00303                         self.paused_in_ready.add(task)
00304                 else:
00305                         self.ready.append(task)
00306         
00307         def _schedule_now(self,task):
00308                 if task in self.paused_in_syscall:
00309                         self.paused_in_syscall.remove(task)
00310                         self.paused_in_ready.add(task)
00311                 else:
00312                         self.ready.appendleft(task)
00313                 
00314         def _wait_duration(self,task,duration):
00315                 def resume(task):
00316                         self._schedule_now(task)
00317                 self._set_timer_callback(self.current_time()+duration, lambda: resume(task))
00318         
00319         def _wait_duration_rate(self,task,duration,rate):
00320                 def resume(task,rate):
00321                         
00322                         rate.last_time = self.current_time()
00323                         
00324                         self._schedule_now(task)
00325                 self._set_timer_callback(self.current_time()+duration, lambda: resume(task, rate))
00326         
00327         def _add_condition(self,entry):
00328                 condition = entry[0]
00329                 vars_in_cond = dict(inspect.getmembers(dict(inspect.getmembers(condition))['func_code']))['co_names']
00330                 for var in vars_in_cond:
00331                         if var not in self.cond_waiting:
00332                                 self.cond_waiting[var] = []
00333                         self.cond_waiting[var].append(entry)
00334         
00335         def _del_condition(self,candidate):
00336                 (condition, task) = candidate
00337                 vars_in_cond = dict(inspect.getmembers(dict(inspect.getmembers(condition))['func_code']))['co_names']
00338                 for var in vars_in_cond:
00339                         if var in self.cond_waiting:
00340                                 self.cond_waiting[var].remove(candidate)
00341                                 if not self.cond_waiting[var]:
00342                                         del self.cond_waiting[var]
00343         
00344         def _wait_condition(self,task,condition):
00345                 
00346                 entry = (condition,task)
00347                 if not condition():
00348                         self._add_condition(entry)
00349                 else:
00350                         self._schedule_now(task)
00351                 
00352         def _test_conditions(self, name):
00353                 
00354                 if name not in self.cond_waiting:
00355                         return
00356                 
00357                 candidates = copy.copy(self.cond_waiting[name])
00358                 for candidate in candidates:
00359                         (condition, task) = candidate
00360                         if task not in self.paused_in_syscall and condition():
00361                                 self._schedule(task)
00362                                 self._del_condition(candidate)
00363 
00364 
00365 class TimerScheduler(Scheduler):
00366         """ A scheduler that sleeps when there is nothing to do. """
00367         
00368         def __init__(self):
00369                 """ Initialize """
00370                 super(TimerScheduler, self).__init__()
00371                 self.timer_cb = []
00372                 self.timer_counter = 0
00373         
00374         
00375         
00376         def run(self):
00377                 """ Run until there is no task to schedule """
00378                 if self.current_task is not None:
00379                         raise RuntimeError('TimerScheduler.run() called within a task.')
00380                 while self.timer_cb or self.ready or self.cond_waiting:
00381                         self.step()
00382                         t, counter, f = heapq.heappop(self.timer_cb)
00383                         duration = t - self.current_time()
00384                         if duration >= 0:
00385                                 self._sleep(duration)
00386                         f()
00387                         self.step()
00388         
00389         def timer_step(self):
00390                 """ Schedule all tasks with past deadlines and step """
00391                 if self.current_task is not None:
00392                         raise RuntimeError('TimerScheduler.timer_step() called within a task.')
00393                 while self.timer_cb:
00394                         t, counter, f = heapq.heappop(self.timer_cb)
00395                         duration = t - self.current_time()
00396                         if duration <= 0:
00397                                 f()
00398                         else:
00399                                 heapq.heappush(self.timer_cb, [t, counter, f])
00400                                 break
00401                 self.step()
00402         
00403         
00404         
00405         def _set_timer_callback(self, t, f):
00406                 """ Implement the timer callback """
00407                 heapq.heappush(self.timer_cb, [t, self.timer_counter, f])
00408                 self.timer_counter += 1
00409         
00410 
00411 
00412 
00413 
00414 class Rate(object):
00415         """ Helper class to execute a loop at a certain rate """
00416         def __init__(self,duration,initial_time):
00417                 """ Initialize """
00418                 self.duration = duration
00419                 self.last_time = initial_time
00420         def sleep(self,sched,task):
00421                 """ Sleep for the rest of this period """
00422                 cur_time = sched.current_time()
00423                 delta_time = self.duration - (cur_time - self.last_time)
00424                 if delta_time > 0:
00425                         sched._wait_duration_rate(task, delta_time, self)
00426                 else:
00427                         sched._schedule(task)
00428                 return delta_time
00429 
00430 
00431 
00432 
00433 
00434 class SystemCall(object):
00435         """ Parent of all system calls """
00436         def handle(self):
00437                 """ Called in the scheduler context """
00438                 raise NotImplementedError('system call superclass should not be used directly')
00439 
00440 class Pass(SystemCall):
00441         """ Pass the execution to other tasks """
00442         def handle(self):
00443                 self.task.sendval = True
00444                 self.sched._schedule(self.task)
00445         
00446 class GetScheduler(SystemCall):
00447         """ Return the scheduler, useful to access condition variables """
00448         def handle(self):
00449                 self.task.sendval = self.sched
00450                 self.sched._schedule(self.task)
00451 
00452 class WaitTask(SystemCall):
00453         """ Wait for a task to exit, return whether the wait was a success """
00454         def __init__(self,tid):
00455                 self.tid = tid
00456         def handle(self):
00457                 result = self.sched._wait_for_exit(self.task,self.tid)
00458                 self.task.sendval = result
00459                 self.task.waitmode = Task.WAIT_ANY
00460                 
00461                 
00462                 if not result:
00463                         self.sched._schedule(self.task)
00464 
00465 class WaitAnyTasks(SystemCall):
00466         """ Wait for any tasks to exit, return whether the wait was a success """
00467         def __init__(self,tids):
00468                 self.tids = tids
00469         def handle(self):
00470                 self.task.waitmode = Task.WAIT_ANY
00471                 
00472                 all_exist = True
00473                 non_existing_tid = None
00474                 for tid in self.tids:
00475                         if tid not in self.sched.taskmap:
00476                                 all_exist = False
00477                                 non_existing_tid = tid
00478                                 break
00479                 
00480                 if all_exist:
00481                         for tid in self.tids:
00482                                 self.sched._wait_for_exit(self.task,tid)
00483                         
00484                         
00485                 else:
00486                         
00487                         
00488                         self.task.sendval = non_existing_tid
00489                         self.sched._schedule(self.task)
00490 
00491 class WaitAllTasks(SystemCall):
00492         """ Wait for all tasks to exit, return whether the wait was a success """
00493         def __init__(self,tids):
00494                 self.tids = tids
00495         def handle(self):
00496                 self.task.waitmode = Task.WAIT_ALL
00497                 any_exist = False
00498                 for tid in self.tids:
00499                         result = self.sched._wait_for_exit(self.task,tid)
00500                         any_exist = any_exist or result
00501                 
00502                 
00503                 if any_exist:
00504                         self.task.sendval = True                        
00505                 else:
00506                         self.task.sendval = False
00507                         self.sched._schedule(self.task)
00508 
00509 class WaitDuration(SystemCall):
00510         """ Pause current task for a certain duration """
00511         def __init__(self,duration):
00512                 self.duration = duration
00513         def handle(self):
00514                 self.sched._wait_duration(self.task, self.duration)
00515                 self.task.sendval = None
00516 
00517 class WaitCondition(SystemCall):
00518         """ Pause current task until the condition is true """
00519         def __init__(self,condition):
00520                 self.condition = condition
00521         def handle(self):
00522                 self.sched._wait_condition(self.task,self.condition)
00523                 self.task.sendval = None
00524 
00525 class Sleep(SystemCall):
00526         """ Sleep using a rate object """
00527         def __init__(self,rate):
00528                 self.rate = rate
00529         def handle(self):
00530                 self.task.sendval = self.rate.sleep(self.sched, self.task)