00001
00002
00003 '''rtshell
00004
00005 Copyright (C) 2009-2014
00006 Geoffrey Biggs
00007 RT-Synthesis Research Group
00008 Intelligent Systems Research Institute,
00009 National Institute of Advanced Industrial Science and Technology (AIST),
00010 Japan
00011 All rights reserved.
00012 Licensed under the Eclipse Public License -v 1.0 (EPL)
00013 http://www.opensource.org/licenses/eclipse-1.0.txt
00014
00015 Creation and execution of state change plans.
00016
00017 '''
00018
00019
00020 import operator
00021 import os
00022 import rtctree.path
00023 import rtsprofile.message_sending
00024 import sys
00025 import thread
00026 import threading
00027 import time
00028 import traceback
00029 import types
00030
00031 import rts_exceptions
00032
00033
00034 class Counter:
00035 def __init__(self):
00036 self._counter = 1
00037
00038 @property
00039 def value(self):
00040 self._counter += 1
00041 return self._counter - 1
00042 ider = Counter()
00043
00044
00045
00046
00047
00048 class ActionExecutor(threading.Thread):
00049 '''An object for managing execution of an action.
00050
00051 This object is capable of executing an action immediately when it is
00052 called, or delaying execution until a set of conditions is reached. Which
00053 should occur is chosen automatically. If execution needs to be delayed, a
00054 new thread will be started to manage it. This thread will sleep until all
00055 pre-conditions for the action are met. Any number of condition objects can
00056 be added.
00057
00058 Callbacks can be added to be executed after executing the action.
00059
00060 '''
00061 def __init__(self, action=None, owner_flag=None, *args, **kwargs):
00062 super(ActionExecutor, self).__init__(*args, **kwargs)
00063 self._action = action
00064 self._callbacks = []
00065 self._conditions = []
00066 self._cancelled = False
00067 self._cancel_lock = threading.Lock()
00068 self._error = None
00069 self._err_lock = threading.Lock()
00070 self._flag = threading.Event()
00071 self._completed = False
00072 self._completed_lock = threading.Lock()
00073 self._id = ider.value
00074 self._owner_flag = owner_flag
00075
00076 def __str__(self):
00077 result = self.id_string + ' '
00078 if self._conditions:
00079 result += '[{0}'.format(self._conditions[0])
00080 for c in self._conditions[1:]:
00081 result += ', {0}'.format(c)
00082 result += '] '
00083 return result + str(self._action)
00084
00085 def __call__(self, *args, **kwargs):
00086 self._args = args
00087 self._kwargs = kwargs
00088 if self.immediate:
00089 self._execute_action()
00090 else:
00091 self._start_conds()
00092 self.start()
00093
00094 def add_callback(self, callback):
00095 '''Add a new callback to be executed after the action.'''
00096 self._callbacks.append(callback)
00097
00098 def add_condition(self, condition):
00099 '''Add a new condition on the action's execution.'''
00100 self._conditions.append(condition)
00101
00102 def cancel(self):
00103 '''Cancel this action.'''
00104 with self._cancel_lock:
00105 self._cancelled = True
00106 self.set()
00107
00108 def set(self):
00109 '''Notify this action executor that a condition may have been met.'''
00110 self._flag.set()
00111
00112 def wait_for_exit(self):
00113 '''Wait for this action executor to exit.'''
00114 if self.immediate:
00115
00116 return
00117 self.join()
00118
00119 @property
00120 def action(self):
00121 '''The action object that will be executed.'''
00122 return self._action
00123
00124 @action.setter
00125 def action(self, action):
00126 self._action = action
00127
00128 @property
00129 def complete(self):
00130 '''Has this action completed yet?'''
00131 with self._completed_lock:
00132 return self._completed
00133
00134 @property
00135 def error(self):
00136 '''Any error that has occurred in this executor or one of its
00137 conditions.'''
00138 with self._err_lock:
00139 return self._error
00140
00141 @property
00142 def id(self):
00143 '''ID of this action.'''
00144 return self._id
00145
00146 @property
00147 def id_string(self):
00148 '''ID of this action as a string.'''
00149 return '{' + '{0}'.format(self._id) + '}'
00150
00151 @property
00152 def immediate(self):
00153 '''Tests if this executor can execute immediately.
00154
00155 If this executor has any conditions that are not immediate, the
00156 executor itself is not immediate.
00157
00158 '''
00159 for c in self._conditions:
00160 if not c.immediate:
00161 return False
00162 return True
00163
00164 @property
00165 def sort_order(self):
00166 '''The integer order for this action in plans.
00167
00168 Generally, all actions will have a sequence number. For those that
00169 don't, this property will be negative. Actions should be executed in
00170 the order of their sequence number, from smaller to bigger.
00171
00172 '''
00173 if not self._conditions:
00174 return -1
00175 return min([c.sequence for c in self._conditions])
00176
00177 def run(self):
00178 cons_satisfied = False
00179 if not self._conditions:
00180
00181 cons_satisfied = True
00182 while not cons_satisfied:
00183 for c in self._conditions:
00184 if c.error:
00185
00186 self._error = c.error
00187 if self._owner_flag:
00188 self._owner_flag.set()
00189 return
00190 self._flag.wait()
00191 self._flag.clear()
00192 with self._cancel_lock:
00193 if self._cancelled:
00194 self._cancel_conditions()
00195 return
00196 self._reduce_conds()
00197 if not self._conditions:
00198
00199 cons_satisfied = True
00200 self._execute_action()
00201 if self._owner_flag:
00202 self._owner_flag.set()
00203 with self._completed_lock:
00204 self._completed = True
00205
00206 def _cancel_conditions(self):
00207 for c in self._conditions:
00208 c.cancel()
00209 for c in self._conditions:
00210 c.wait_for_exit()
00211
00212 def _do_callbacks(self):
00213 for c in self._callbacks:
00214 c(*self._args, **self._kwargs)
00215
00216 def _execute_action(self):
00217 print >>sys.stderr, 'Executing {0} {1}'.format(self.id_string,
00218 self._action)
00219 self._action(*self._args, **self._kwargs)
00220 self._do_callbacks()
00221
00222 def _reduce_conds(self):
00223 result = []
00224 for c in self._conditions:
00225 if c.satisfied:
00226 c.wait_for_exit()
00227 else:
00228 result.append(c)
00229 continue
00230 self._conditions = result
00231
00232 def _start_conds(self):
00233 for c in self._conditions:
00234 if not c.immediate:
00235 c.set_args(*self._args, **self._kwargs)
00236 c.start()
00237
00238
00239
00240
00241
00242 class BasicCondition(object):
00243 '''A simple condition specifying just a sequence ordering.
00244
00245 This condition is immediate.
00246
00247 All other condition types should inherit from this type, and must implement
00248 the @ref start method.
00249
00250 '''
00251 def __init__(self, executor=None, sequence=-1, desc='', *args, **kwargs):
00252 super(BasicCondition, self).__init__()
00253 self._executor = executor
00254 self._sequence = sequence
00255 self._desc = desc
00256 self._error = None
00257 self._err_lock = threading.Lock()
00258 self._immediate = True
00259 self._satisfied = True
00260 self._sat_lock = threading.Lock()
00261
00262 def __str__(self):
00263 return 'Order {0}'.format(self._sequence)
00264
00265 def cancel(self):
00266 return
00267
00268 def set_args(self, *args, **kwargs):
00269 self._args = args
00270 self._kwargs = kwargs
00271
00272 def start(self):
00273 return
00274
00275 def wait_for_exit(self):
00276 '''Wait for this condition's thread to exit.'''
00277
00278 return
00279
00280 @property
00281 def error(self):
00282 '''Any error that has occurred in this condition.'''
00283 with self._err_lock:
00284 return self._error
00285
00286 @property
00287 def immediate(self):
00288 '''Will this condition be satisfied immediately?'''
00289 return self._immediate
00290
00291 @property
00292 def satisfied(self):
00293 '''Is this condition satisfied?'''
00294 with self._sat_lock:
00295 return self._satisfied
00296
00297 @property
00298 def sequence(self):
00299 '''The sequence order of this condition.'''
00300 return self._sequence
00301
00302
00303 class SleepCondition(BasicCondition):
00304 '''A condition that waits for a period of time before being satisfied.
00305
00306 This condition is delayed.
00307
00308 This condition is essentially a sleep. It starts a threaded timer, which
00309 sleeps for the given period of time (in ms) before waking up and setting
00310 the condition to satisfied.
00311
00312 '''
00313 def __init__(self, wait_time=0, *args, **kwargs):
00314 super(SleepCondition, self).__init__(wait_time=wait_time, *args, **kwargs)
00315 self._wait_time_ms = wait_time
00316 self._wait_time = wait_time / 1000.0
00317 self._immediate = False
00318 self._satisfied = False
00319 self._timer = None
00320
00321 def __str__(self):
00322 return super(SleepCondition, self).__str__() + \
00323 '/Wait {0}ms'.format(self._wait_time_ms)
00324
00325 def cancel(self):
00326 self._timer.cancel()
00327 self._timer.join()
00328
00329 def satisfy(self):
00330 with self._sat_lock:
00331 self._satisfied = True
00332 self._executor.set()
00333
00334 def start(self):
00335 self._timer = threading.Timer(self._wait_time, self.satisfy)
00336 self._timer.start()
00337
00338
00339 class DelayedCondition(BasicCondition, threading.Thread):
00340 '''Base class for delayed conditions.
00341
00342 Inheriting condition objects should implement the @ref check method,
00343 returning True or False appropriately.
00344
00345 Delayed conditions start a separate thread, which they use to perform their
00346 condition check at an appropriate time. They use the reference to their
00347 owner to signal it that the condition has been met. If their condition is
00348 not met within an optional timeout (specified in ms), @ref
00349 PrecedingTimeoutError is set. Set the timeout to None for no timeout.
00350
00351 Once a delayed condition is satisfied, you should ensure its thread has
00352 completed by calling @ref wait_for_exit.
00353
00354 '''
00355 def __init__(self, timeout=None, *args, **kwargs):
00356 super(DelayedCondition, self).__init__(timeout=timeout, *args, **kwargs)
00357 self._immediate = False
00358 self._satisfied = False
00359 self._cancelled = False
00360 self._cancel_lock = threading.Lock()
00361 self._timeout = timeout / 1000.0
00362
00363 def __str__(self):
00364 return super(DelayedCondition, self).__str__() + \
00365 '/' + self._desc
00366
00367 def cancel(self):
00368 with self._cancel_lock:
00369 self._cancelled = True
00370
00371 def start(self):
00372 threading.Thread.start(self)
00373
00374 def wait_for_exit(self):
00375 self.join()
00376
00377 def run(self):
00378 self._start_time = time.time()
00379 while True:
00380 try:
00381 satisfied = self._check()
00382 except Exception, e:
00383 self._set_error(traceback.format_exc())
00384 break
00385 with self._cancel_lock:
00386 if self._cancelled:
00387 return
00388 with self._sat_lock:
00389 self._satisfied = satisfied
00390 if satisfied:
00391
00392 self._executor.set()
00393 break
00394 if type(self._timeout) is not types.NoneType:
00395
00396 if self._check_timeout() <= 0.0:
00397 self._set_error(
00398 rts_exceptions.PrecedingTimeoutError(self._desc))
00399 return
00400
00401 def _check_timeout(self):
00402 diff = time.time() - self._start_time
00403 return self._timeout - diff
00404
00405 def _set_error(self, e):
00406 with self._err_lock:
00407 self._error = e
00408
00409 self._executor.set()
00410
00411
00412 class EventCondition(DelayedCondition):
00413 '''A condition that waits for an event.
00414
00415 This condition is delayed.
00416
00417 This condition waits on a @ref threading.Event object. It uses a separate
00418 thread to perform the wait; it will sleep until its event is set. When the
00419 event is set, it wakes up and notifies its executor, then exits. If the
00420 event is not set within an optional timeout (specified in ms), @ref
00421 PrecedingTimeoutError is set. Set timeout to None for no timeout.
00422
00423 '''
00424 def __init__(self, *args, **kwargs):
00425 super(EventCondition, self).__init__(*args, **kwargs)
00426 self._event = threading.Event()
00427
00428 def cancel(self):
00429 self._event.set()
00430 super(EventCondition, self).cancel()
00431
00432 def set(self):
00433 self._event.set()
00434
00435 def _check(self):
00436 self._event.wait(self._timeout)
00437 if self._event.is_set():
00438 return True
00439 return False
00440
00441
00442 class MonitorCondition(DelayedCondition):
00443 '''A condition that monitors a state.
00444
00445 This condition is delayed.
00446
00447 This condition continuously monitors the state of a callback function. When
00448 the callback's return value matches a provided target value, the condition
00449 is satisfied. If this does not occur within an optional timeout (specified
00450 in ms), @ref PrecedingTimeoutError is set. Set timeout to None for no
00451 timeout. The callback will be called at the frequency specified, in Hertz.
00452
00453 The callback will be passed all the arguments that get passed to @ref
00454 set_args. It should accept any arguments it needs, as well as *args and
00455 **kwargs.
00456
00457 '''
00458 def __init__(self, callback=None, target=None, freq=100, *args, **kwargs):
00459 super(MonitorCondition, self).__init__(callback=callback,
00460 target=target, freq=freq, *args, **kwargs)
00461 self._callback = callback
00462 self._target = target
00463 self._sleep_time = 1.0 / freq
00464
00465 def _check(self):
00466 if self._callback(*self._args, **self._kwargs) == self._target:
00467 return True
00468 time.sleep(self._sleep_time)
00469 return False
00470
00471
00472
00473
00474
00475 def _make_check_comp_state_cb(rtsprofile, target_comp):
00476 def cb(rtctree=None, *args, **kwargs):
00477 comp = rtsprofile.find_comp_by_target(target_comp)
00478 path = '/' + comp.path_uri
00479 comp = rtctree.get_node(rtctree.path.parse_path(path)[0])
00480 return comp.refresh_state_in_ec(comp.get_ec_index(target_comp.id))
00481 return cb
00482
00483 def _make_action_cb(target_ec):
00484 def cb(*args, **kwargs):
00485 target_ec.set()
00486 return cb
00487
00488 class Plan(object):
00489 '''A plan for changing the state of an RT System.
00490
00491 A plan has two sets of actions to perform. The first is stored in a sorted
00492 list; it is all actions that are to be executed immediately. The second
00493 set is stored in a separate list, also sorted. This set contains actions
00494 that will be executed at a later point in time, based on some condition.
00495 Many of these will execute on their own threads.
00496
00497 To execute the plan, call it. A plan can be cancelled during execution with
00498 the @ref cancel method. As immediate actions will all be executed before
00499 @ref execute returns, this is mainly useful for stopping the delayed
00500 actions after an error occurs.
00501
00502 '''
00503 def __init__(self, *args, **kwargs):
00504 super(Plan, self).__init__(*args, **kwargs)
00505 self._immediates = []
00506 self._laters = []
00507 self._cancelled = False
00508 self._cancel_lock = threading.Lock()
00509 self._complete_flag = threading.Event()
00510
00511 def __str__(self):
00512 result = ''
00513 for a in self._immediates:
00514 result += '{0}\n'.format(a)
00515 for a in self._laters:
00516 result += '{0}\n'.format(a)
00517 return result[:-1]
00518
00519 def cancel(self):
00520 '''Cancel execution of this plan.'''
00521 with self._cancel_lock:
00522 self._cancelled = True
00523
00524 def execute(self, *args, **kwargs):
00525 '''Execute this plan.'''
00526 error = None
00527 for a in self._immediates:
00528 a(*args, **kwargs)
00529 for a in self._laters:
00530 a(*args, **kwargs)
00531 while self._laters and not error:
00532 self._complete_flag.wait()
00533 with self._cancel_lock:
00534 if self._cancelled:
00535 for a in self._laters:
00536 a.cancel()
00537 break
00538 for a in self._laters:
00539 if a.error:
00540 for b in self._laters:
00541 b.cancel()
00542 error = a.error
00543 break
00544 if self._complete_flag.is_set():
00545 self._laters = [a for a in self._laters if not a.complete]
00546 self._complete_flag.clear()
00547 for a in self._laters:
00548 a.wait_for_exit()
00549 if error:
00550 raise rts_exceptions.PlanExecutionError(error)
00551
00552 def make(self, rtsprofile, actions, conds_source, monitor_target):
00553 '''Make a plan from a list of actions and an RTSProfile.'''
00554 all = {}
00555
00556 for a in actions:
00557 all[(a.ec_id, a.comp_id, a.instance_name)] = \
00558 ActionExecutor(action=a, owner_flag=self._complete_flag)
00559
00560
00561 for a in actions:
00562
00563 conds = self._get_action_conditions(conds_source, a)
00564 if not conds:
00565 continue
00566 for c in conds:
00567 target = (c.target_component.id,
00568 c.target_component.component_id,
00569 c.target_component.instance_name)
00570 action = all[target]
00571 if c.__class__ == rtsprofile.message_sending.Condition:
00572
00573 action.add_condition(BasicCondition(executor=action,
00574 sequence=c.sequence))
00575 elif c.__class__ == rtsprofile.message_sending.WaitTime:
00576
00577 action.add_condition(SleepCondition(executor=action,
00578 wait_time=c.wait_time, sequence=c.sequence))
00579 elif c.__class__ == rtsprofile.message_sending.Preceding:
00580
00581
00582 if c.sending_timing == 'SYNC':
00583
00584 for p in c.preceding_components:
00585 desc = 'Sync to {0}'.format(p.instance_name)
00586 timeout = c.timeout
00587 if timeout == 0:
00588 timeout = None
00589 mc = MonitorCondition(executor=action,
00590 sequence=c.sequence,
00591 callback=_make_check_comp_state_cb(rtsprofile, p),
00592 target=monitor_target,
00593 desc=desc, timeout=timeout)
00594 action.add_condition(mc)
00595 target_p = (p.id, p.component_id, p.instance_name)
00596 if all[target_p].action.optional:
00597 print >>sys.stderr, 'Warning: action depends \
00598 on an optional action: "{0}". This may cause a deadlock if the previous \
00599 action\'s component is not present.'.format(desc)
00600 else:
00601
00602 for p in c.preceding_components:
00603 desc = "After {0}'s action".format(p.instance_name)
00604 timeout = c.timeout
00605 if timeout == 0:
00606 timeout = None
00607 ec = EventCondition(executor=action,
00608 sequence=c.sequence, desc=desc,
00609 timeout=timeout)
00610 action.add_condition(ec)
00611 target_p = (p.id, p.component_id, p.instance_name)
00612 all[target_p].add_callback(_make_action_cb(ec))
00613 if all[target_p].action.optional:
00614 print >>sys.stderr, 'Warning: action depends \
00615 on an optional action: "{0}". This may cause a deadlock if the previous \
00616 action\'s component is not present.'.format(desc)
00617
00618 for k, a in all.items():
00619 if a.immediate:
00620 self._immediates.append(a)
00621 else:
00622 self._laters.append(a)
00623 self._immediates.sort(key=operator.attrgetter('sort_order'))
00624 self._laters.sort(key=operator.attrgetter('sort_order'))
00625
00626 def _get_action_conditions(self, conds_source, action):
00627
00628 result = []
00629 if conds_source and conds_source.targets:
00630 for c in conds_source.targets:
00631 target = c.target_component
00632 if target.id == action.ec_id and \
00633 target.component_id == action.comp_id and \
00634 target.instance_name == action.instance_name:
00635 result.append(c)
00636 return result
00637
00638 def _signal_complete(self):
00639 self._complete_flag.set()
00640
00641
00642
00643