plan.py
Go to the documentation of this file.
00001 # -*- Python -*- # -*- coding: utf-8 -*-
00002 
00003 '''rtshell
00004 
00005 Copyright (C) 2009-2014
00006     Geoffrey Biggs
00007     RT-Synthesis Research Group
00008     Intelligent Systems Research Institute,
00009     National Institute of Advanced Industrial Science and Technology (AIST),
00010     Japan
00011     All rights reserved.
00012 Licensed under the Eclipse Public License -v 1.0 (EPL)
00013 http://www.opensource.org/licenses/eclipse-1.0.txt
00014 
00015 Creation and execution of state change plans.
00016 
00017 '''
00018 
00019 
00020 import operator
00021 import os
00022 import rtctree.path
00023 import rtsprofile.message_sending
00024 import sys
00025 import thread
00026 import threading
00027 import time
00028 import traceback
00029 import types
00030 
00031 import rts_exceptions
00032 
00033 
00034 class Counter:
00035     def __init__(self):
00036         self._counter = 1
00037 
00038     @property
00039     def value(self):
00040         self._counter += 1
00041         return self._counter - 1
00042 ider = Counter()
00043 
00044 
00045 ###############################################################################
00046 ## Action executor objects
00047 
00048 class ActionExecutor(threading.Thread):
00049     '''An object for managing execution of an action.
00050 
00051     This object is capable of executing an action immediately when it is
00052     called, or delaying execution until a set of conditions is reached. Which
00053     should occur is chosen automatically. If execution needs to be delayed, a
00054     new thread will be started to manage it. This thread will sleep until all
00055     pre-conditions for the action are met. Any number of condition objects can
00056     be added.
00057 
00058     Callbacks can be added to be executed after executing the action.
00059 
00060     '''
00061     def __init__(self, action=None, owner_flag=None, *args, **kwargs):
00062         super(ActionExecutor, self).__init__(*args, **kwargs)
00063         self._action = action
00064         self._callbacks = []
00065         self._conditions = []
00066         self._cancelled = False
00067         self._cancel_lock = threading.Lock()
00068         self._error = None
00069         self._err_lock = threading.Lock()
00070         self._flag = threading.Event()
00071         self._completed = False
00072         self._completed_lock = threading.Lock()
00073         self._id = ider.value
00074         self._owner_flag = owner_flag
00075 
00076     def __str__(self):
00077         result = self.id_string + ' '
00078         if self._conditions:
00079             result += '[{0}'.format(self._conditions[0])
00080             for c in self._conditions[1:]:
00081                 result += ', {0}'.format(c)
00082             result += '] '
00083         return result + str(self._action)
00084 
00085     def __call__(self, *args, **kwargs):
00086         self._args = args
00087         self._kwargs = kwargs
00088         if self.immediate:
00089             self._execute_action()
00090         else:
00091             self._start_conds()
00092             self.start()
00093 
00094     def add_callback(self, callback):
00095         '''Add a new callback to be executed after the action.'''
00096         self._callbacks.append(callback)
00097 
00098     def add_condition(self, condition):
00099         '''Add a new condition on the action's execution.'''
00100         self._conditions.append(condition)
00101 
00102     def cancel(self):
00103         '''Cancel this action.'''
00104         with self._cancel_lock:
00105             self._cancelled = True
00106         self.set()
00107 
00108     def set(self):
00109         '''Notify this action executor that a condition may have been met.'''
00110         self._flag.set()
00111 
00112     def wait_for_exit(self):
00113         '''Wait for this action executor to exit.'''
00114         if self.immediate:
00115             # If this executor is immediate, it won't have a thread.
00116             return
00117         self.join()
00118 
00119     @property
00120     def action(self):
00121         '''The action object that will be executed.'''
00122         return self._action
00123 
00124     @action.setter
00125     def action(self, action):
00126         self._action = action
00127 
00128     @property
00129     def complete(self):
00130         '''Has this action completed yet?'''
00131         with self._completed_lock:
00132             return self._completed
00133 
00134     @property
00135     def error(self):
00136         '''Any error that has occurred in this executor or one of its
00137         conditions.'''
00138         with self._err_lock:
00139             return self._error
00140 
00141     @property
00142     def id(self):
00143         '''ID of this action.'''
00144         return self._id
00145 
00146     @property
00147     def id_string(self):
00148         '''ID of this action as a string.'''
00149         return '{' + '{0}'.format(self._id) + '}'
00150 
00151     @property
00152     def immediate(self):
00153         '''Tests if this executor can execute immediately.
00154 
00155         If this executor has any conditions that are not immediate, the
00156         executor itself is not immediate.
00157 
00158         '''
00159         for c in self._conditions:
00160             if not c.immediate:
00161                 return False
00162         return True
00163 
00164     @property
00165     def sort_order(self):
00166         '''The integer order for this action in plans.
00167 
00168         Generally, all actions will have a sequence number. For those that
00169         don't, this property will be negative. Actions should be executed in
00170         the order of their sequence number, from smaller to bigger.
00171 
00172         '''
00173         if not self._conditions:
00174             return -1
00175         return min([c.sequence for c in self._conditions])
00176 
00177     def run(self):
00178         cons_satisfied = False
00179         if not self._conditions:
00180             # All conditions have been met
00181             cons_satisfied = True
00182         while not cons_satisfied:
00183             for c in self._conditions:
00184                 if c.error:
00185                     # Propagate the error upwards
00186                     self._error = c.error
00187                     if self._owner_flag:
00188                         self._owner_flag.set()
00189                         return
00190             self._flag.wait()
00191             self._flag.clear()
00192             with self._cancel_lock:
00193                 if self._cancelled:
00194                     self._cancel_conditions()
00195                     return
00196             self._reduce_conds()
00197             if not self._conditions:
00198                 # All conditions have been met
00199                 cons_satisfied = True
00200         self._execute_action()
00201         if self._owner_flag:
00202             self._owner_flag.set()
00203         with self._completed_lock:
00204             self._completed = True
00205 
00206     def _cancel_conditions(self):
00207         for c in self._conditions:
00208             c.cancel()
00209         for c in self._conditions:
00210             c.wait_for_exit()
00211 
00212     def _do_callbacks(self):
00213         for c in self._callbacks:
00214             c(*self._args, **self._kwargs)
00215 
00216     def _execute_action(self):
00217         print >>sys.stderr, 'Executing {0} {1}'.format(self.id_string,
00218                 self._action)
00219         self._action(*self._args, **self._kwargs)
00220         self._do_callbacks()
00221 
00222     def _reduce_conds(self):
00223         result = []
00224         for c in self._conditions:
00225             if c.satisfied:
00226                 c.wait_for_exit()
00227             else:
00228                 result.append(c)
00229                 continue
00230         self._conditions = result
00231 
00232     def _start_conds(self):
00233         for c in self._conditions:
00234             if not c.immediate:
00235                 c.set_args(*self._args, **self._kwargs)
00236                 c.start()
00237 
00238 
00239 ###############################################################################
00240 ## Condition objects
00241 
00242 class BasicCondition(object):
00243     '''A simple condition specifying just a sequence ordering.
00244 
00245     This condition is immediate.
00246 
00247     All other condition types should inherit from this type, and must implement
00248     the @ref start method.
00249 
00250     '''
00251     def __init__(self, executor=None, sequence=-1, desc='', *args, **kwargs):
00252         super(BasicCondition, self).__init__()
00253         self._executor = executor
00254         self._sequence = sequence
00255         self._desc = desc
00256         self._error = None
00257         self._err_lock = threading.Lock()
00258         self._immediate = True
00259         self._satisfied = True
00260         self._sat_lock = threading.Lock()
00261 
00262     def __str__(self):
00263         return 'Order {0}'.format(self._sequence)
00264 
00265     def cancel(self):
00266         return
00267 
00268     def set_args(self, *args, **kwargs):
00269         self._args = args
00270         self._kwargs = kwargs
00271 
00272     def start(self):
00273         return
00274 
00275     def wait_for_exit(self):
00276         '''Wait for this condition's thread to exit.'''
00277         # Nothing to join in an immediate condition
00278         return
00279 
00280     @property
00281     def error(self):
00282         '''Any error that has occurred in this condition.'''
00283         with self._err_lock:
00284             return self._error
00285 
00286     @property
00287     def immediate(self):
00288         '''Will this condition be satisfied immediately?'''
00289         return self._immediate
00290 
00291     @property
00292     def satisfied(self):
00293         '''Is this condition satisfied?'''
00294         with self._sat_lock:
00295             return self._satisfied
00296 
00297     @property
00298     def sequence(self):
00299         '''The sequence order of this condition.'''
00300         return self._sequence
00301 
00302 
00303 class SleepCondition(BasicCondition):
00304     '''A condition that waits for a period of time before being satisfied.
00305 
00306     This condition is delayed.
00307 
00308     This condition is essentially a sleep. It starts a threaded timer, which
00309     sleeps for the given period of time (in ms) before waking up and setting
00310     the condition to satisfied.
00311 
00312     '''
00313     def __init__(self, wait_time=0, *args, **kwargs):
00314         super(SleepCondition, self).__init__(wait_time=wait_time, *args, **kwargs)
00315         self._wait_time_ms = wait_time
00316         self._wait_time = wait_time / 1000.0
00317         self._immediate = False
00318         self._satisfied = False
00319         self._timer = None
00320 
00321     def __str__(self):
00322         return super(SleepCondition, self).__str__() + \
00323                 '/Wait {0}ms'.format(self._wait_time_ms)
00324 
00325     def cancel(self):
00326         self._timer.cancel()
00327         self._timer.join()
00328 
00329     def satisfy(self):
00330         with self._sat_lock:
00331             self._satisfied = True
00332         self._executor.set()
00333 
00334     def start(self):
00335         self._timer = threading.Timer(self._wait_time, self.satisfy)
00336         self._timer.start()
00337 
00338 
00339 class DelayedCondition(BasicCondition, threading.Thread):
00340     '''Base class for delayed conditions.
00341 
00342     Inheriting condition objects should implement the @ref check method,
00343     returning True or False appropriately.
00344 
00345     Delayed conditions start a separate thread, which they use to perform their
00346     condition check at an appropriate time. They use the reference to their
00347     owner to signal it that the condition has been met. If their condition is
00348     not met within an optional timeout (specified in ms), @ref
00349     PrecedingTimeoutError is set. Set the timeout to None for no timeout.
00350 
00351     Once a delayed condition is satisfied, you should ensure its thread has
00352     completed by calling @ref wait_for_exit.
00353 
00354     '''
00355     def __init__(self, timeout=None, *args, **kwargs):
00356         super(DelayedCondition, self).__init__(timeout=timeout, *args, **kwargs)
00357         self._immediate = False
00358         self._satisfied = False
00359         self._cancelled = False
00360         self._cancel_lock = threading.Lock()
00361         self._timeout = timeout / 1000.0
00362 
00363     def __str__(self):
00364         return super(DelayedCondition, self).__str__() + \
00365                 '/' + self._desc
00366 
00367     def cancel(self):
00368         with self._cancel_lock:
00369             self._cancelled = True
00370 
00371     def start(self):
00372         threading.Thread.start(self)
00373 
00374     def wait_for_exit(self):
00375         self.join()
00376 
00377     def run(self):
00378         self._start_time = time.time()
00379         while True:
00380             try:
00381                 satisfied = self._check()
00382             except Exception, e:
00383                 self._set_error(traceback.format_exc())
00384                 break
00385             with self._cancel_lock:
00386                 if self._cancelled:
00387                     return
00388             with self._sat_lock:
00389                 self._satisfied = satisfied
00390             if satisfied:
00391                 # Signal the owner
00392                 self._executor.set()
00393                 break
00394             if type(self._timeout) is not types.NoneType:
00395                 # Check if the remaining time is greater than zero
00396                 if self._check_timeout() <= 0.0:
00397                     self._set_error(
00398                             rts_exceptions.PrecedingTimeoutError(self._desc))
00399                     return
00400 
00401     def _check_timeout(self):
00402         diff = time.time() - self._start_time
00403         return self._timeout - diff
00404 
00405     def _set_error(self, e):
00406         with self._err_lock:
00407             self._error = e
00408             # Signal the owner so it checks the error condition
00409             self._executor.set()
00410 
00411 
00412 class EventCondition(DelayedCondition):
00413     '''A condition that waits for an event.
00414 
00415     This condition is delayed.
00416 
00417     This condition waits on a @ref threading.Event object. It uses a separate
00418     thread to perform the wait; it will sleep until its event is set. When the
00419     event is set, it wakes up and notifies its executor, then exits. If the
00420     event is not set within an optional timeout (specified in ms), @ref
00421     PrecedingTimeoutError is set. Set timeout to None for no timeout.
00422 
00423     '''
00424     def __init__(self, *args, **kwargs):
00425         super(EventCondition, self).__init__(*args, **kwargs)
00426         self._event = threading.Event()
00427 
00428     def cancel(self):
00429         self._event.set()
00430         super(EventCondition, self).cancel()
00431 
00432     def set(self):
00433         self._event.set()
00434 
00435     def _check(self):
00436         self._event.wait(self._timeout)
00437         if self._event.is_set():
00438             return True
00439         return False
00440 
00441 
00442 class MonitorCondition(DelayedCondition):
00443     '''A condition that monitors a state.
00444 
00445     This condition is delayed.
00446 
00447     This condition continuously monitors the state of a callback function. When
00448     the callback's return value matches a provided target value, the condition
00449     is satisfied. If this does not occur within an optional timeout (specified
00450     in ms), @ref PrecedingTimeoutError is set. Set timeout to None for no
00451     timeout. The callback will be called at the frequency specified, in Hertz.
00452 
00453     The callback will be passed all the arguments that get passed to @ref
00454     set_args. It should accept any arguments it needs, as well as *args and
00455     **kwargs.
00456 
00457     '''
00458     def __init__(self, callback=None, target=None, freq=100, *args, **kwargs):
00459         super(MonitorCondition, self).__init__(callback=callback,
00460                 target=target, freq=freq, *args, **kwargs)
00461         self._callback = callback
00462         self._target = target
00463         self._sleep_time = 1.0 / freq
00464 
00465     def _check(self):
00466         if self._callback(*self._args, **self._kwargs) == self._target:
00467             return True
00468         time.sleep(self._sleep_time)
00469         return False
00470 
00471 
00472 ###############################################################################
00473 ## Plan object
00474 
00475 def _make_check_comp_state_cb(rtsprofile, target_comp):
00476     def cb(rtctree=None, *args, **kwargs):
00477         comp = rtsprofile.find_comp_by_target(target_comp)
00478         path = '/' + comp.path_uri
00479         comp = rtctree.get_node(rtctree.path.parse_path(path)[0])
00480         return comp.refresh_state_in_ec(comp.get_ec_index(target_comp.id))
00481     return cb
00482 
00483 def _make_action_cb(target_ec):
00484     def cb(*args, **kwargs):
00485         target_ec.set()
00486     return cb
00487 
00488 class Plan(object):
00489     '''A plan for changing the state of an RT System.
00490 
00491     A plan has two sets of actions to perform. The first is stored in a sorted
00492     list; it is all actions that are to be executed immediately.  The second
00493     set is stored in a separate list, also sorted. This set contains actions
00494     that will be executed at a later point in time, based on some condition.
00495     Many of these will execute on their own threads.
00496 
00497     To execute the plan, call it. A plan can be cancelled during execution with
00498     the @ref cancel method. As immediate actions will all be executed before
00499     @ref execute returns, this is mainly useful for stopping the delayed
00500     actions after an error occurs.
00501 
00502     '''
00503     def __init__(self, *args, **kwargs):
00504         super(Plan, self).__init__(*args, **kwargs)
00505         self._immediates = []
00506         self._laters = []
00507         self._cancelled = False
00508         self._cancel_lock = threading.Lock()
00509         self._complete_flag = threading.Event()
00510 
00511     def __str__(self):
00512         result = ''
00513         for a in self._immediates:
00514             result += '{0}\n'.format(a)
00515         for a in self._laters:
00516             result += '{0}\n'.format(a)
00517         return result[:-1]
00518 
00519     def cancel(self):
00520         '''Cancel execution of this plan.'''
00521         with self._cancel_lock:
00522             self._cancelled = True
00523 
00524     def execute(self, *args, **kwargs):
00525         '''Execute this plan.'''
00526         error = None
00527         for a in self._immediates:
00528             a(*args, **kwargs)
00529         for a in self._laters:
00530             a(*args, **kwargs)
00531         while self._laters and not error:
00532             self._complete_flag.wait()
00533             with self._cancel_lock:
00534                 if self._cancelled:
00535                     for a in self._laters:
00536                         a.cancel()
00537                     break
00538             for a in self._laters:
00539                 if a.error:
00540                     for b in self._laters:
00541                         b.cancel()
00542                     error = a.error
00543                     break
00544             if self._complete_flag.is_set():
00545                 self._laters = [a for a in self._laters if not a.complete]
00546                 self._complete_flag.clear()
00547         for a in self._laters:
00548             a.wait_for_exit()
00549         if error:
00550             raise rts_exceptions.PlanExecutionError(error)
00551 
00552     def make(self, rtsprofile, actions, conds_source, monitor_target):
00553         '''Make a plan from a list of actions and an RTSProfile.'''
00554         all = {}
00555         # First build a dictionary indexed by target for each action
00556         for a in actions:
00557             all[(a.ec_id, a.comp_id, a.instance_name)] = \
00558                     ActionExecutor(action=a, owner_flag=self._complete_flag)
00559         # For each action, find all its conditions and add them to the action's
00560         # executor.
00561         for a in actions:
00562             # First add an executor for each action to a temporary dictionary
00563             conds = self._get_action_conditions(conds_source, a)
00564             if not conds:
00565                 continue
00566             for c in conds:
00567                 target = (c.target_component.id,
00568                           c.target_component.component_id,
00569                           c.target_component.instance_name)
00570                 action = all[target]
00571                 if c.__class__ == rtsprofile.message_sending.Condition:
00572                     # Just a sequencing value
00573                     action.add_condition(BasicCondition(executor=action,
00574                             sequence=c.sequence))
00575                 elif c.__class__ == rtsprofile.message_sending.WaitTime:
00576                     # An action to be executed after a certain amount of time
00577                     action.add_condition(SleepCondition(executor=action,
00578                         wait_time=c.wait_time, sequence=c.sequence))
00579                 elif c.__class__ == rtsprofile.message_sending.Preceding:
00580                     # An action that waits for a previous action to
00581                     # occur/complete.
00582                     if c.sending_timing == 'SYNC':
00583                         # Wait for action to complete
00584                         for p in c.preceding_components:
00585                             desc = 'Sync to {0}'.format(p.instance_name)
00586                             timeout = c.timeout
00587                             if timeout == 0:
00588                                 timeout = None
00589                             mc = MonitorCondition(executor=action,
00590                                     sequence=c.sequence,
00591                                     callback=_make_check_comp_state_cb(rtsprofile, p),
00592                                     target=monitor_target,
00593                                     desc=desc, timeout=timeout)
00594                             action.add_condition(mc)
00595                             target_p = (p.id, p.component_id, p.instance_name)
00596                             if all[target_p].action.optional:
00597                                 print >>sys.stderr, 'Warning: action depends \
00598 on an optional action: "{0}". This may cause a deadlock if the previous \
00599 action\'s component is not present.'.format(desc)
00600                     else:
00601                         # Wait for action to occur
00602                         for p in c.preceding_components:
00603                             desc = "After {0}'s action".format(p.instance_name)
00604                             timeout = c.timeout
00605                             if timeout == 0:
00606                                 timeout = None
00607                             ec = EventCondition(executor=action,
00608                                     sequence=c.sequence, desc=desc,
00609                                     timeout=timeout)
00610                             action.add_condition(ec)
00611                             target_p = (p.id, p.component_id, p.instance_name)
00612                             all[target_p].add_callback(_make_action_cb(ec))
00613                             if all[target_p].action.optional:
00614                                 print >>sys.stderr, 'Warning: action depends \
00615 on an optional action: "{0}". This may cause a deadlock if the previous \
00616 action\'s component is not present.'.format(desc)
00617 
00618         for k, a in all.items():
00619             if a.immediate:
00620                 self._immediates.append(a)
00621             else:
00622                 self._laters.append(a)
00623         self._immediates.sort(key=operator.attrgetter('sort_order'))
00624         self._laters.sort(key=operator.attrgetter('sort_order'))
00625 
00626     def _get_action_conditions(self, conds_source, action):
00627         # Get the corresponding conditions for an action, if any.
00628         result = []
00629         if conds_source and conds_source.targets:
00630             for c in conds_source.targets:
00631                 target = c.target_component
00632                 if target.id == action.ec_id and \
00633                    target.component_id == action.comp_id and \
00634                    target.instance_name == action.instance_name:
00635                     result.append(c)
00636         return result
00637 
00638     def _signal_complete(self):
00639         self._complete_flag.set()
00640 
00641 
00642 # vim: tw=79
00643 


rtshell
Author(s): Geoffrey Biggs
autogenerated on Fri Aug 28 2015 12:55:12