teer.py
Go to the documentation of this file.
00001 # -*- coding: utf-8 -*-
00002 # kate: replace-tabs off; indent-width 4; indent-mode normal
00003 # vim: ts=4:sw=4:noexpandtab
00004 
00005 # Copyright (c) 2012 Stéphane Magnenat, ETHZ Zürich and other contributors
00006 # See file authors.txt for details.
00007 # All rights reserved.
00008 #
00009 # Redistribution and use in source and binary forms, with or without
00010 # modification, are permitted provided that the following conditions are met:
00011 #
00012 #   * Redistributions of source code must retain the above copyright notice,
00013 #     this list of conditions and the following disclaimer.
00014 #   * Redistributions in binary form must reproduce the above copyright notice,
00015 #     this list of conditions and the following disclaimer in the documentation
00016 #     and/or other materials provided with the distribution.
00017 #   * Neither the name of Stéphane Magnenat, ETHZ Zürich, nor the names
00018 #     of the contributors may be used to endorse or promote products derived
00019 #     from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00022 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00023 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00024 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
00025 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00026 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
00027 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00028 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
00029 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00030 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00031 
00032 from collections import deque
00033 import time
00034 import heapq
00035 import copy
00036 import inspect
00037 
00038 # ------------------------------------------------------------
00039 #                       === Tasks ===
00040 # ------------------------------------------------------------
00041 class Task(object):
00042         """ The object representing a task/co-routine in the scheduler """
00043         WAIT_ANY = 1
00044         WAIT_ALL = 2
00045         taskid = 0
00046         def __init__(self,target):
00047                 """ Initialize """
00048                 Task.taskid += 1
00049                 self.tid     = Task.taskid   # Task ID
00050                 self.target  = target        # Target coroutine
00051                 self.sendval = None          # Value to send
00052                 self.waitmode = Task.WAIT_ANY
00053         def __repr__(self):
00054                 """ Debug information on a task """
00055                 return 'Task ' + str(self.tid) + ' (' + self.target.__name__ + ') @ ' + str(id(self))
00056         def run(self):
00057                 """ Run a task until it hits the next yield statement"""
00058                 return self.target.send(self.sendval)
00059 
00060 # ------------------------------------------------------------
00061 #                === Conditional Variables ===
00062 # ------------------------------------------------------------
00063 class ConditionVariable(object):
00064         """ The basic conditional variable """
00065         def __init__(self, initval=None):
00066                 """ Initialize """
00067                 self.val = initval
00068                 self.myname = None
00069         def __get__(self, obj, objtype):
00070                 """ Return the value """
00071                 return self.val
00072         def __set__(self, obj, val):
00073                 """ Set a value, evaluate conditions for tasks waiting on this variable """
00074                 self.val = val
00075                 self._set_name(type(obj))
00076                 obj._test_conditions(self.myname)
00077         def _set_name(self, cls, top_level=True):
00078                 """ if unknown, retrieve my own name using introspection """
00079                 if self.myname is None:
00080                         members = cls.__dict__
00081                         # first look into members
00082                         for name, value in members.iteritems():
00083                                 if value is self:
00084                                         self.myname = name
00085                                         break
00086                         # look into parents
00087                         for parent_cls in cls.__bases__:
00088                                 self._set_name(parent_cls,False)
00089                         # if not found and top-level, assert
00090                         if top_level:
00091                                 assert self.myname is not None
00092 
00093 # ------------------------------------------------------------
00094 #                      === Scheduler ===
00095 # ------------------------------------------------------------
00096 class Scheduler(object):
00097         """ The scheduler base object, do not instanciate directly """
00098         def __init__(self):
00099                 """ Initialize """
00100                 # Map of all task identifiers to tasks
00101                 self.taskmap = {}
00102                 # Deque of ready tasks
00103                 self.ready   = deque()   
00104                 # Tasks waiting for other tasks to exit, map of: tid => list of tasks
00105                 self.exit_waiting = {}
00106                 # Task waiting on conditions, map of: "name of condition variable" => (condition, task)
00107                 self.cond_waiting = {}
00108                 # Task being paused by another task
00109                 self.paused_in_syscall = set()
00110                 self.paused_in_ready = set()
00111                 # Not running a task initially
00112                 self.current_task = None
00113         
00114         # Public API, these functions are safe to be called from within a task or from outside
00115         
00116         def list_all_tids(self):
00117                 """ Return all task identifiers """
00118                 return self.taskmap.keys()
00119         
00120         def get_current_tid(self):
00121                 """ Return the identifier of current task, None if not called from a task """
00122                 if self.current_task is not None:
00123                         return self.current_task.tid
00124                 else:
00125                         return None
00126         
00127         def new_task(self, target):
00128                 """ Create a new task from function target, return the task identifier """
00129                 newtask = Task(target)
00130                 self.taskmap[newtask.tid] = newtask
00131                 self._schedule(newtask)
00132                 self._log_task_created(newtask)
00133                 return newtask.tid
00134         
00135         def kill_task(self, tid):
00136                 """ Kill a task, return whether the task was killed """
00137                 task = self.taskmap.get(tid,None)
00138                 if task:
00139                         task.target.close() 
00140                         return True
00141                 else:
00142                         return False
00143         
00144         def kill_tasks(self, tids):
00145                 """ Kill multiple tasks, return the list of killed tasks """
00146                 return filter(self.kill_task, tids)
00147         
00148         def kill_all_tasks_except(self, tids):
00149                 """ Kill all tasks except a subset, return the list of killed tasks """
00150                 to_kill = filter(lambda tid: tid not in tids, self.list_all_tids())
00151                 return self.kill_tasks(to_kill)
00152 
00153         def pause_task(self, tid):
00154                 """ Pause a task, return whether the task was paused """
00155                 task = self.taskmap.get(tid,None)
00156                 if task is None \
00157                         or task is self.current_task\
00158                         or task in self.paused_in_ready\
00159                         or task in self.paused_in_syscall:
00160                         return False
00161                 if task in self.ready:
00162                         self.ready.remove(task)
00163                         self.paused_in_ready.add(task)
00164                 else:
00165                         self.paused_in_syscall.add(task)
00166                 return True
00167         
00168         def pause_tasks(self, tids):
00169                 """ Pause multiple tasks, return the lits of paused tasks """
00170                 return filter(self.pause_task, tids)
00171         
00172         def pause_all_tasks_except(self, tids):
00173                 """ Pause all tasks except a subset, return the list of paused tasks """
00174                 to_pause = filter(lambda tid: tid not in tids, self.list_all_tids())
00175                 return self.pause_tasks(to_pause)
00176         
00177         def resume_task(self, tid):
00178                 """ Resume a task, return whether the task was resumed successfully """
00179                 task = self.taskmap.get(tid,None)
00180                 if task is None or task is self.current_task:
00181                         return False
00182                 if task in self.paused_in_ready:
00183                         self.paused_in_ready.remove(task)
00184                         self.ready.append(task)
00185                         return True
00186                 elif task in self.paused_in_syscall:
00187                         self.paused_in_syscall.remove(task)
00188                         return True
00189                 return False
00190         
00191         def resume_tasks(self, tids):
00192                 """ Resume the execution of multiple tasks, return the list of resumed tasks """
00193                 return filter(self.resume_task, tids)
00194         
00195         def resume_all_tasks_except(self, tids):
00196                 """ Resume all tasks except a subset, return the list of resumed tasks """
00197                 to_resume = filter(lambda tid: tid not in tids, self.list_all_tids())
00198                 return self.resume_tasks(to_resume)
00199         
00200         def create_rate(self, rate):
00201                 """ Create a rate object, to have a loop at a certain frequency """
00202                 duration = 1./rate
00203                 initial_time = self.current_time()
00204                 return Rate(duration, initial_time)
00205         
00206         def printd(self, msg):
00207                 """ Print something including the current task identifier """
00208                 print "[teer tid: " + str(self.get_current_tid()) + "] " + msg
00209         
00210         # Public API, these funtions must be called outside a task
00211         
00212         def step(self):
00213                 """ Run all tasks until none is ready """
00214                 if self.current_task is not None:
00215                         raise RuntimeError('Scheduler.step() called within a task.')
00216                 while self.ready:
00217                         task = self.ready.popleft()
00218                         try:
00219                                 #print 'Running ' + str(task)
00220                                 self.current_task = task
00221                                 result = task.run()
00222                                 self.current_task = None
00223                                 if isinstance(result,SystemCall):
00224                                         result.task  = task
00225                                         result.sched = self
00226                                         result.handle()
00227                                         #print 'ready queue B: ' + str(self.ready)
00228                                         continue
00229                         except StopIteration:
00230                                 self.current_task = None
00231                                 self._exit(task)
00232                                 continue
00233                         self._schedule(task)
00234         
00235         # Public API, these functions are safe to be called from within a task or from outside
00236         # these functions can be overriden by children
00237         
00238         def current_time(self):
00239                 """ Return the current time """
00240                 return time.time()
00241         
00242         # Protected implementations, these functions can only be called by functions from this object
00243         # these functions can be overriden by children
00244         
00245         def _sleep(self, duration):
00246                 """ Sleep a certain amount of time """
00247                 time.sleep(duration)
00248         
00249         def _set_timer_callback(self, t, f):
00250                 """ Execute function f at time t """
00251                 raise NotImplementedError('timer callback mechanism must be provided by derived class')
00252         
00253         def _log_task_created(self, task):
00254                 """ Log for task created """
00255                 print time.ctime() + " - Task %s (tid %d) created" % (task.target.__name__, task.tid)
00256         
00257         def _log_task_terminated(self, task):
00258                 """ Log for task terminated """
00259                 print time.ctime() + " - Task %s (tid %d) terminated" % (task.target.__name__, task.tid)
00260         
00261         # Protected implementations, these functions can only be called by functions from this object
00262 
00263         def _exit(self,exiting_task):
00264                 """ Handle the termination of a task """
00265                 self._log_task_terminated(exiting_task)
00266                 del self.taskmap[exiting_task.tid]
00267                 # Notify other tasks waiting for exit
00268                 to_remove_keys = []
00269                 for task in self.exit_waiting.pop(exiting_task.tid,[]):
00270                         if task.waitmode == Task.WAIT_ANY:
00271                                 # remove associations to other tasks waited on
00272                                 for waited_tid, waiting_tasks_list in self.exit_waiting.iteritems():
00273                                         # remove task form list of waiting tasks if in there
00274                                         for waiting_task in waiting_tasks_list:
00275                                                 if waiting_task.tid == task.tid:
00276                                                         waiting_tasks_list.remove(waiting_task)
00277                                 # return the tid of the exiting_task 
00278                                 task.sendval = exiting_task.tid
00279                                 self._schedule(task)
00280                         else:
00281                                 are_still_waiting = False
00282                                 for waited_tid, waiting_tasks_list in self.exit_waiting.iteritems():
00283                                         for waiting_task in waiting_tasks_list:
00284                                                 if waiting_task.tid == task.tid:
00285                                                         are_still_waiting = True
00286                                 if not are_still_waiting:
00287                                         # return the tid of the exiting_task 
00288                                         task.sendval = exiting_task.tid
00289                                         self._schedule(task)
00290                 self.exit_waiting = dict((k,v) for (k,v) in self.exit_waiting.iteritems() if v)
00291 
00292         def _wait_for_exit(self,task,waittid):
00293                 """ Set task waiting of the exit of task waittid """
00294                 if waittid in self.taskmap:
00295                         self.exit_waiting.setdefault(waittid,[]).append(task)
00296                         return True
00297                 else:
00298                         return False
00299 
00300         def _schedule(self,task):
00301                 if task in self.paused_in_syscall:
00302                         self.paused_in_syscall.remove(task)
00303                         self.paused_in_ready.add(task)
00304                 else:
00305                         self.ready.append(task)
00306         
00307         def _schedule_now(self,task):
00308                 if task in self.paused_in_syscall:
00309                         self.paused_in_syscall.remove(task)
00310                         self.paused_in_ready.add(task)
00311                 else:
00312                         self.ready.appendleft(task)
00313                 
00314         def _wait_duration(self,task,duration):
00315                 def resume(task):
00316                         self._schedule_now(task)
00317                 self._set_timer_callback(self.current_time()+duration, lambda: resume(task))
00318         
00319         def _wait_duration_rate(self,task,duration,rate):
00320                 def resume(task,rate):
00321                         # get current time
00322                         rate.last_time = self.current_time()
00323                         # if not paused, execute the resumed task directly once we exit the syscall
00324                         self._schedule_now(task)
00325                 self._set_timer_callback(self.current_time()+duration, lambda: resume(task, rate))
00326         
00327         def _add_condition(self,entry):
00328                 condition = entry[0]
00329                 vars_in_cond = dict(inspect.getmembers(dict(inspect.getmembers(condition))['func_code']))['co_names']
00330                 for var in vars_in_cond:
00331                         if var not in self.cond_waiting:
00332                                 self.cond_waiting[var] = []
00333                         self.cond_waiting[var].append(entry)
00334         
00335         def _del_condition(self,candidate):
00336                 (condition, task) = candidate
00337                 vars_in_cond = dict(inspect.getmembers(dict(inspect.getmembers(condition))['func_code']))['co_names']
00338                 for var in vars_in_cond:
00339                         if var in self.cond_waiting:
00340                                 self.cond_waiting[var].remove(candidate)
00341                                 if not self.cond_waiting[var]:
00342                                         del self.cond_waiting[var]
00343         
00344         def _wait_condition(self,task,condition):
00345                 # add a new condition and directly evalutate it once
00346                 entry = (condition,task)
00347                 if not condition():
00348                         self._add_condition(entry)
00349                 else:
00350                         self._schedule_now(task)
00351                 
00352         def _test_conditions(self, name):
00353                 # is there any task waiting on this name?
00354                 if name not in self.cond_waiting:
00355                         return
00356                 # check which conditions are true
00357                 candidates = copy.copy(self.cond_waiting[name])
00358                 for candidate in candidates:
00359                         (condition, task) = candidate
00360                         if task not in self.paused_in_syscall and condition():
00361                                 self._schedule(task)
00362                                 self._del_condition(candidate)
00363 
00364 
00365 class TimerScheduler(Scheduler):
00366         """ A scheduler that sleeps when there is nothing to do. """
00367         
00368         def __init__(self):
00369                 """ Initialize """
00370                 super(TimerScheduler, self).__init__()
00371                 self.timer_cb = []
00372                 self.timer_counter = 0
00373         
00374         # Public API, these funtions must be called outside a task
00375         
00376         def run(self):
00377                 """ Run until there is no task to schedule """
00378                 if self.current_task is not None:
00379                         raise RuntimeError('TimerScheduler.run() called within a task.')
00380                 while self.timer_cb or self.ready or self.cond_waiting:
00381                         self.step()
00382                         t, counter, f = heapq.heappop(self.timer_cb)
00383                         duration = t - self.current_time()
00384                         if duration >= 0:
00385                                 self._sleep(duration)
00386                         f()
00387                         self.step()
00388         
00389         def timer_step(self):
00390                 """ Schedule all tasks with past deadlines and step """
00391                 if self.current_task is not None:
00392                         raise RuntimeError('TimerScheduler.timer_step() called within a task.')
00393                 while self.timer_cb:
00394                         t, counter, f = heapq.heappop(self.timer_cb)
00395                         duration = t - self.current_time()
00396                         if duration <= 0:
00397                                 f()
00398                         else:
00399                                 heapq.heappush(self.timer_cb, [t, counter, f])
00400                                 break
00401                 self.step()
00402         
00403         # Protected implementations, these functions can only be called by functions from this object
00404         
00405         def _set_timer_callback(self, t, f):
00406                 """ Implement the timer callback """
00407                 heapq.heappush(self.timer_cb, [t, self.timer_counter, f])
00408                 self.timer_counter += 1
00409         
00410 # ------------------------------------------------------------
00411 #                   === Helper objects ===
00412 # ------------------------------------------------------------
00413 
00414 class Rate(object):
00415         """ Helper class to execute a loop at a certain rate """
00416         def __init__(self,duration,initial_time):
00417                 """ Initialize """
00418                 self.duration = duration
00419                 self.last_time = initial_time
00420         def sleep(self,sched,task):
00421                 """ Sleep for the rest of this period """
00422                 cur_time = sched.current_time()
00423                 delta_time = self.duration - (cur_time - self.last_time)
00424                 if delta_time > 0:
00425                         sched._wait_duration_rate(task, delta_time, self)
00426                 else:
00427                         sched._schedule(task)
00428                 return delta_time
00429 
00430 # ------------------------------------------------------------
00431 #                   === System Calls ===
00432 # ------------------------------------------------------------
00433 
00434 class SystemCall(object):
00435         """ Parent of all system calls """
00436         def handle(self):
00437                 """ Called in the scheduler context """
00438                 raise NotImplementedError('system call superclass should not be used directly')
00439 
00440 class Pass(SystemCall):
00441         """ Pass the execution to other tasks """
00442         def handle(self):
00443                 self.task.sendval = True
00444                 self.sched._schedule(self.task)
00445         
00446 class GetScheduler(SystemCall):
00447         """ Return the scheduler, useful to access condition variables """
00448         def handle(self):
00449                 self.task.sendval = self.sched
00450                 self.sched._schedule(self.task)
00451 
00452 class WaitTask(SystemCall):
00453         """ Wait for a task to exit, return whether the wait was a success """
00454         def __init__(self,tid):
00455                 self.tid = tid
00456         def handle(self):
00457                 result = self.sched._wait_for_exit(self.task,self.tid)
00458                 self.task.sendval = result
00459                 self.task.waitmode = Task.WAIT_ANY
00460                 # If waiting for a non-existent task,
00461                 # return immediately without waiting
00462                 if not result:
00463                         self.sched._schedule(self.task)
00464 
00465 class WaitAnyTasks(SystemCall):
00466         """ Wait for any tasks to exit, return whether the wait was a success """
00467         def __init__(self,tids):
00468                 self.tids = tids
00469         def handle(self):
00470                 self.task.waitmode = Task.WAIT_ANY
00471                 # Check if all tasks exist
00472                 all_exist = True
00473                 non_existing_tid = None
00474                 for tid in self.tids:
00475                         if tid not in self.sched.taskmap:
00476                                 all_exist = False
00477                                 non_existing_tid = tid
00478                                 break
00479                 # If all exist
00480                 if all_exist:
00481                         for tid in self.tids:
00482                                 self.sched._wait_for_exit(self.task,tid)
00483                         #dont set sendval, we want exit() to assign the exiting tasks tid
00484                         #self.task.sendval = True
00485                 else:
00486                         # If waiting for a non-existent task,
00487                         # return immediately without waiting
00488                         self.task.sendval = non_existing_tid
00489                         self.sched._schedule(self.task)
00490 
00491 class WaitAllTasks(SystemCall):
00492         """ Wait for all tasks to exit, return whether the wait was a success """
00493         def __init__(self,tids):
00494                 self.tids = tids
00495         def handle(self):
00496                 self.task.waitmode = Task.WAIT_ALL
00497                 any_exist = False
00498                 for tid in self.tids:
00499                         result = self.sched._wait_for_exit(self.task,tid)
00500                         any_exist = any_exist or result
00501                 # If waiting for non-existent tasks,
00502                 # return immediately without waiting
00503                 if any_exist:
00504                         self.task.sendval = True                        
00505                 else:
00506                         self.task.sendval = False
00507                         self.sched._schedule(self.task)
00508 
00509 class WaitDuration(SystemCall):
00510         """ Pause current task for a certain duration """
00511         def __init__(self,duration):
00512                 self.duration = duration
00513         def handle(self):
00514                 self.sched._wait_duration(self.task, self.duration)
00515                 self.task.sendval = None
00516 
00517 class WaitCondition(SystemCall):
00518         """ Pause current task until the condition is true """
00519         def __init__(self,condition):
00520                 self.condition = condition
00521         def handle(self):
00522                 self.sched._wait_condition(self.task,self.condition)
00523                 self.task.sendval = None
00524 
00525 class Sleep(SystemCall):
00526         """ Sleep using a rate object """
00527         def __init__(self,rate):
00528                 self.rate = rate
00529         def handle(self):
00530                 self.task.sendval = self.rate.sleep(self.sched, self.task)


teer_ros
Author(s): Stéphane Magnenat
autogenerated on Sun Oct 5 2014 23:56:00