Package asmach :: Module state

Source Code for Module asmach.state

  1   
  2  import threading 
  3  import traceback 
  4   
  5  import asmach as smach 
  6   
  7  __all__ = ['State','CBState'] 
  8               
  9  import sys 
 10       
class State(object):
    """Base class for SMACH states.

    A SMACH state interacts with SMACH containers in two ways. The first is
    its outcome identifier, and the second is the set of userdata variables
    which it reads from and writes to at runtime. Both of these interactions
    are declared before the state goes active (when its C{execute()} method
    is called) and are checked during construction.
    """

    # NOTE: the list defaults below are only ever read (copied into sets),
    # never mutated, so sharing them between calls is harmless.
    def __init__(self, outcomes=[], input_keys=[], output_keys=[], io_keys=[]):
        """State constructor

        @type outcomes: array of strings
        @param outcomes: Custom outcomes for this state.

        @type input_keys: array of strings
        @param input_keys: The userdata keys from which this state might read
        at runtime.

        @type output_keys: array of strings
        @param output_keys: The userdata keys to which this state might write
        at runtime.

        @type io_keys: array of strings
        @param io_keys: The userdata keys to which this state might write or
        from which it might read at runtime.
        """
        # Store outcomes
        self._outcomes = set(outcomes)

        # Store userdata interface description. A set union is used instead
        # of sequence concatenation so that any mix of iterables (list,
        # tuple, set, ...) is accepted; io_keys count as both input and
        # output keys.
        self._input_keys = set(input_keys).union(io_keys)
        self._output_keys = set(output_keys).union(io_keys)

        # Declare preempt flag
        self._preempt_requested = False

    ### Asynchronous

    def execute_async(self, continuation, resume, ud):
        """Asynchronous execution hook; the base class does not implement it.

        The unreachable C{yield} turns this method into a generator
        function, so calling it returns a generator and the
        C{NotImplementedError} is raised when that generator is first
        advanced, not at call time.

        @raise NotImplementedError: On the first iteration of the returned
        generator.
        """
        raise NotImplementedError()
        yield

    ### SMACH Interface API

    def register_outcomes(self, new_outcomes):
        """Add outcomes to the outcome set."""
        self._outcomes = self._outcomes.union(new_outcomes)

    def get_registered_outcomes(self):
        """Get a list of registered outcomes.

        @rtype: tuple of string
        @return: Tuple of registered outcome strings.
        """
        return tuple(self._outcomes)

    ### Userdata API

    def register_io_keys(self, keys):
        """Add keys to the set of keys from which this state may read and write.

        @type keys: list of strings
        @param keys: List of keys which may be read from and written to when
        this state is active.
        """
        self._input_keys = self._input_keys.union(keys)
        # Must update _output_keys itself (a misspelled attribute name here
        # would silently leave the output-key set unchanged).
        self._output_keys = self._output_keys.union(keys)

    def register_input_keys(self, keys):
        """Add keys to the set of keys from which this state may read.

        @type keys: list of strings
        @param keys: List of keys which may be read from when this state is
        active.
        """
        self._input_keys = self._input_keys.union(keys)

    def get_registered_input_keys(self):
        """Get a tuple of registered input keys."""
        return tuple(self._input_keys)

    def register_output_keys(self, keys):
        """Add keys to the set of keys to which this state may write.

        @type keys: list of strings
        @param keys: List of keys which may be written to when this state is
        active.
        """
        self._output_keys = self._output_keys.union(keys)

    def get_registered_output_keys(self):
        """Get a tuple of registered output keys."""
        return tuple(self._output_keys)

    ### Preemption interface

    def request_preempt(self):
        """Sets preempt_requested to True"""
        self._preempt_requested = True

    def service_preempt(self):
        """Sets preempt_requested to False"""
        self._preempt_requested = False

    def recall_preempt(self):
        """Sets preempt_requested to False"""
        self._preempt_requested = False

    def preempt_requested(self):
        """True if a preempt has been requested."""
        return self._preempt_requested
115
class CBState(State):
    """A state whose C{execute()} simply delegates to a stored callback."""

    def __init__(self, cb, cb_args=[], cb_kwargs={}, outcomes=[], input_keys=[], output_keys=[], io_keys=[]):
        """Create a state from a single function.

        @param cb: Callable invoked by C{execute()} as
        C{cb(ud, *cb_args, **cb_kwargs)}.

        @param cb_args: Extra positional arguments passed to C{cb} after the
        userdata.

        @param cb_kwargs: Extra keyword arguments passed to C{cb}.

        @type outcomes: array of strings
        @param outcomes: Custom outcomes for this state.

        @type input_keys: array of strings
        @param input_keys: The userdata keys from which this state might read
        at runtime.

        @type output_keys: array of strings
        @param output_keys: The userdata keys to which this state might write
        at runtime.

        @type io_keys: array of strings
        @param io_keys: The userdata keys to which this state might write or
        from which it might read at runtime.
        """
        State.__init__(
            self,
            outcomes=outcomes,
            input_keys=input_keys,
            output_keys=output_keys,
            io_keys=io_keys)

        # Keep the callback and the extra arguments it is invoked with.
        self._cb = cb
        self._cb_args = cb_args
        self._cb_kwargs = cb_kwargs

        # Nothing more to do unless the callback itself declares a SMACH
        # interface (as reported by smach.util.has_smach_interface).
        if not smach.util.has_smach_interface(cb):
            return

        # Remember what the callback declares ...
        self._cb_input_keys = cb.get_registered_input_keys()
        self._cb_output_keys = cb.get_registered_output_keys()
        self._cb_outcomes = cb.get_registered_outcomes()

        # ... and merge those declarations into this state's own interface.
        self.register_input_keys(self._cb_input_keys)
        self.register_output_keys(self._cb_output_keys)
        self.register_outcomes(self._cb_outcomes)

    def execute(self, ud):
        """Run the callback on the userdata and return whatever it returns."""
        callback = self._cb
        return callback(ud, *self._cb_args, **self._cb_kwargs)
151