Package asmach :: Module util

Source Code for Module asmach.util

  1   
  2  import roslib; roslib.load_manifest('smach') 
  3  import asmach as smach 
  4   
  5  import threading 
  6   
  7  import asmach as smach 
  8   
  9  __all__ = ['is_shutdown','set_shutdown_cb',\ 
 10          'cb_interface','has_smach_interface','CBInterface'] 
 11   
12 -def is_shutdown():
13 return False
14
15 -def set_shutdown_check(cb):
16 smach.is_shutdown = cb
17
18 -def has_smach_interface(obj):
19 """Returns True if the object has SMACH interface accessors.""" 20 return hasattr(obj,'get_registered_input_keys')\ 21 and hasattr(obj,'get_registered_output_keys')\ 22 and hasattr(obj,'get_registered_outcomes')
23 24 # Callback decorator for describing userdata
25 -class cb_interface(object):
26 - def __init__(self, outcomes=[], input_keys=[], output_keys=[]):
27 self._outcomes = outcomes 28 self._input_keys = input_keys 29 self._output_keys = output_keys
30
31 - def __call__(self, cb):
32 return CBInterface(cb, self._outcomes, self._input_keys, self._output_keys)
33 -class CBInterface(object):
34 """Decorator to describe the extension of a state's SMACH userdata and outcome interface. 35 36 Some SMACH states can be extended with the use of user callbacks. Since 37 the SMACH interface and SMACH userdata are strictly controlled, the ways in 38 which these callbacks interact with SMACH must be delcared. This decorator 39 allows this information to be attached to a given callback function. 40 41 If a callback adds a potential outcome to a state, suppose 'critical_failure', 42 then one could write this when defining the callback: 43 44 >>> import asmach as smach 45 >>> @smach.cb_interface(outcomes=['critical_failure']) 46 >>> def my_cb(x,y,z): 47 >>> # User code 48 >>> return 'critical_failure' 49 50 Suppose a state retrieves data that it passes into a callback. If the user 51 wants to take that data and put some of all of it into userdata, this 52 interface must be declared. In this case, the user could write: 53 54 >>> import asmach as smach 55 >>> @smach.cb_interface(output_keys=['processed_res']) 56 >>> def my_cb(ud, data): 57 >>> ud.processed_res = data 58 59 """
60 - def __init__(self, cb, outcomes=[], input_keys=[], output_keys=[], io_keys=[]):
61 """Describe callback SMACH interface. 62 63 @type outcomes: array of strings 64 @param outcomes: Custom outcomes for this state. 65 66 @type input_keys: array of strings 67 @param input_keys: The userdata keys from which this state might read 68 at runtime. 69 70 @type output_keys: array of strings 71 @param output_keys: The userdata keys to which this state might write 72 at runtime. 73 74 @type io_keys: array of strings 75 @param io_keys: The userdata keys to which this state might write or 76 from which it might read at runtime. 77 """ 78 79 self._input_keys = set(input_keys) 80 self._input_keys.union(io_keys) 81 82 self._output_keys = set(output_keys) 83 self._output_keys.union(io_keys) 84 85 self._outcomes = outcomes 86 87 self._cb = cb
88
89 - def __call__(self, *args, **kwargs):
90 return self._cb(*args, **kwargs)
91 92 ### SMACH Interface API
94 """Get a tuple of registered input keys.""" 95 return tuple(self._input_keys)
97 """Get a tuple of registered output keys.""" 98 return tuple(self._output_keys)
99 - def get_registered_outcomes(self):
100 """Get a list of registered outcomes. 101 @rtype: tuple of string 102 @return: Tuple of registered outcome strings. 103 """ 104 return tuple(self._outcomes)
105