Source Code for Module asmach.concurrence


import threading
import traceback
import copy
from contextlib import contextmanager

import asmach as smach

__all__ = ['Concurrence']

class Concurrence(smach.container.Container):
    """Concurrence Container

    This state allows for simple split-join concurrency. The user adds a set of
    states which are all executed simultaneously. The concurrent split state
    can only transition once all contained states are ready to transition.

    This container can be configured to return a given outcome as a function of
    the outcomes of the contained states. This is specified in the constructor
    of the class, or after construction with L{Concurrence.add_outcome_map}.

    While a concurrence will not terminate until all of its children terminate,
    it is possible for it to preempt a subset of states when one of the
    following occurs:
     - All child states terminate
     - At least one child state terminates
     - A user-defined callback signals termination

    Given these causes of termination, the outcome can be determined in four ways:
     - A user-defined callback returns an outcome
     - A child-outcome map which requires ALL states to terminate is satisfied
     - A child-outcome map which requires ONE state to terminate is satisfied
     - No maps are satisfied, so the default outcome is returned

    The specification of the outcome maps and the outcome callback are
    described in the constructor documentation below. More than one policy can
    be supplied, and each policy has the potential to not be satisfied. In the
    situation in which multiple policies are provided, and a given policy is
    not satisfied, the outcome choice precedence is as follows:
     - Outcome callback
     - First-triggered outcome map
     - Last-triggered outcome map
     - Default outcome

    In practice it is best to try to accomplish your task with just ONE outcome
    policy.

    """
    def __init__(self,
            outcomes,
            default_outcome,
            input_keys = [],
            output_keys = [],
            outcome_map = {},
            outcome_cb = None,
            child_termination_cb = None
            ):
        """Constructor for smach Concurrent Split.

        @type outcomes: list of strings
        @param outcomes: The potential outcomes of this state machine.

        @type default_outcome: string
        @param default_outcome: The outcome of this state if no elements in the
        outcome map are satisfied by the outcomes of the contained states.

        @type outcome_map: dict
        @param outcome_map: This is an outcome map for determining the
        outcome of this container. Each outcome of the container is mapped
        to a dictionary mapping child labels onto outcomes. If none of the
        child-outcome maps is satisfied, the concurrence will terminate
        with the default outcome.

        For example, if the outcome_map is:
            {'succeeded' : {'FOO':'succeeded', 'BAR':'done'},
             'aborted' : {'FOO':'aborted'}}
        Then the concurrence will terminate with outcome 'succeeded' only if
        BOTH states 'FOO' and 'BAR' have terminated
        with outcomes 'succeeded' and 'done', respectively. The outcome
        'aborted' will be returned by the concurrence if the state 'FOO'
        returns the outcome 'aborted'.

        If the outcome of a state is not specified, it will be treated as
        irrelevant to the outcome of the concurrence.

        If the criteria for one outcome are a subset of those for another
        outcome, the container will choose the outcome which has more child
        outcome criteria satisfied. If both container outcomes have the same
        number of satisfied criteria, the behavior is undefined.

        If a more complex outcome policy is required, the user can
        provide an outcome callback. See outcome_cb, below.

        @type child_termination_cb: callable
        @param child_termination_cb: This callback gives the user the ability
        to force the concurrence to preempt running states given the
        termination of some other set of states. This is useful when using
        a concurrence as a monitor container.

        This callback is called each time a child state terminates. It is
        passed a single argument, a dictionary mapping child state labels
        onto their outcomes. If a state has not yet terminated, its dict
        value will be None.

        This function can return three things:
         - False: continue blocking on the termination of all other states
         - True: Preempt all other states
         - list of state labels: Preempt only the specified states

        I{If you just want the first termination to cause the other children
        to terminate, the callback (lambda so: True) will always return True.}

        @type outcome_cb: callable
        @param outcome_cb: If the outcome policy needs to be more complicated
        than just a conjunction of state outcomes, the user can supply
        a callback for specifying the outcome of the container.

        This callback is called only once all child states have terminated,
        and it is passed the dictionary mapping state labels onto their
        respective outcomes.

        If the callback returns a string, it will be treated as the outcome of
        the container.

        If the callback returns None, the concurrence will first check the
        outcome_map, and if no outcome in the outcome_map is satisfied, it
        will return the default outcome.

        B{NOTE: This callback should be a function ONLY of the outcomes of
        the child states. It should not access any other resources.}

        """
        smach.container.Container.__init__(self, outcomes, input_keys, output_keys)

        # List of concurrent states
        self._states = {}
        self._threads = {}
        self._remappings = {}

        if not (default_outcome or outcome_map or outcome_cb):
            raise smach.InvalidStateError("Concurrence requires an outcome policy")

        # Initialize error string
        errors = ""

        # Check that the default outcome is a registered string
        if default_outcome != str(default_outcome):
            errors += "\n\tDefault outcome '%s' does not appear to be a string." % str(default_outcome)
        if default_outcome not in outcomes:
            errors += "\n\tDefault outcome '%s' is unregistered." % str(default_outcome)

        # Check if outcome maps only contain outcomes that are registered
        for o in outcome_map.keys():
            if o not in outcomes:
                errors += "\n\tUnregistered outcome '%s' in outcome_map." % str(o)

        # Check if outcome cb is callable
        if outcome_cb and not hasattr(outcome_cb,'__call__'):
            errors += "\n\tOutcome callback '%s' is not callable." % str(outcome_cb)

        # Check if child termination cb is callable
        if child_termination_cb and not hasattr(child_termination_cb,'__call__'):
            errors += "\n\tChild termination callback '%s' is not callable." % str(child_termination_cb)

        # Report errors
        if len(errors) > 0:
            raise smach.InvalidStateError("Errors specifying outcome policy of concurrence: %s" % errors)

        # Store outcome policies
        self._default_outcome = default_outcome
        self._outcome_map = outcome_map
        self._outcome_cb = outcome_cb
        self._child_termination_cb = child_termination_cb
        self._child_outcomes = {}

        # Condition variables for threading synchronization
        self._user_code_exception = False
        self._done_cond = threading.Condition()

    ### Construction methods
    @staticmethod
    def add(label, state, remapping={}):
        """Add state to the opened concurrence.
        This state will need to terminate before the concurrence terminates.
        """
        # Get currently opened container
        self = Concurrence._currently_opened_container()

        # Store state
        self._states[label] = state
        self._remappings[label] = remapping

        return state

    ### State interface
    def execute(self, parent_ud = smach.UserData()):
        """Overloaded execute method.
        This starts all the threads.
        """
        # Reset child outcomes
        self._child_outcomes = {}

        # Copy input keys
        self._copy_input_keys(parent_ud, self.userdata)

        # Spew some info
        smach.loginfo("Concurrence starting with userdata: \n\t%s" %
                (str(self.userdata.keys())))

        # Call start callbacks
        self.call_start_cbs()

        # Create all the threads
        for (label, state) in self._states.items():
            # Initialize child outcomes
            self._child_outcomes[label] = None
            self._threads[label] = threading.Thread(
                    name='concurrent_split:'+label,
                    target=self._state_runner,
                    args=(label,))

        # Launch threads
        for thread in self._threads.values():
            thread.start()

        # Wait for done notification
        self._done_cond.acquire()
        self._done_cond.wait()
        self._done_cond.release()

        # Preempt any running states
        smach.logdebug("SMACH Concurrence preempting running states.")
        for label in self._states:
            if self._child_outcomes[label] is None:
                self._states[label].request_preempt()

        # Wait for all states to terminate
        while not smach.is_shutdown():
            if all([o is not None for o in self._child_outcomes.values()]):
                break
            self._done_cond.acquire()
            self._done_cond.wait()
            self._done_cond.release()

        # Check for user code exception
        if self._user_code_exception:
            self._user_code_exception = False
            raise smach.InvalidStateError("A concurrent state raised an exception during execution.")

        # Check for preempt
        if self.preempt_requested():
            # Initialize serviced flag
            children_preempts_serviced = True

            # Service this preempt only if every child serviced its own
            for (label,state) in self._states.items():
                if state.preempt_requested():
                    # Reset the flag
                    children_preempts_serviced = False
                    # Complain
                    smach.logwarn("State '%s' in concurrence did not service preempt." % label)
                    # Recall the preempt if it hasn't been serviced
                    state.recall_preempt()
            if children_preempts_serviced:
                smach.loginfo("Concurrence serviced preempt.")
                self.service_preempt()

        # Spew some debug info
        smach.loginfo("Concurrent Outcomes: "+str(self._child_outcomes))

        # Initialize the outcome
        outcome = self._default_outcome

        # Determine the outcome from the outcome map
        smach.logdebug("SMACH Concurrence determining contained state outcomes.")
        for (container_outcome, outcomes) in self._outcome_map.items():
            if all([self._child_outcomes[label] == outcomes[label] for label in outcomes.keys()]):
                smach.logdebug("Terminating concurrent split with mapped outcome.")
                outcome = container_outcome

        # Check outcome callback
        if self._outcome_cb:
            try:
                cb_outcome = self._outcome_cb(copy.copy(self._child_outcomes))
                if cb_outcome:
                    if cb_outcome == str(cb_outcome):
                        outcome = cb_outcome
                    else:
                        smach.logerr("Outcome callback returned a non-string '%s', using default outcome '%s'" % (str(cb_outcome), self._default_outcome))
                else:
                    smach.logwarn("Outcome callback returned None, using outcome '%s'" % outcome)
            except:
                raise smach.InvalidUserCodeError(("Could not execute outcome callback '%s': " % self._outcome_cb)+traceback.format_exc())

        # Cleanup
        self._threads = {}
        self._child_outcomes = {}

        # Call termination callbacks
        self.call_termination_cbs(self._states.keys(), outcome)

        # Copy output keys
        self._copy_output_keys(self.userdata, parent_ud)

        return outcome

    def request_preempt(self):
        """Preempt all contained states."""
        # Set preempt flag
        smach.State.request_preempt(self)

        # Notify concurrence that it should preempt running states and terminate
        with self._done_cond:
            self._done_cond.notify_all()

    def _state_runner(self,label):
        """Runs the states in parallel threads."""
        self.call_transition_cbs()

        # Execute child state
        try:
            self._child_outcomes[label] = self._states[label].execute(smach.Remapper(
                self.userdata,
                self._states[label].get_registered_input_keys(),
                self._states[label].get_registered_output_keys(),
                self._remappings[label]))
        except:
            self._user_code_exception = True
            with self._done_cond:
                self._done_cond.notify_all()
            raise smach.InvalidStateError(("Could not execute child state '%s': " % label)+traceback.format_exc())

        # Make sure the child returned an outcome
        if self._child_outcomes[label] is None:
            raise smach.InvalidStateError("Concurrent state '%s' returned no outcome on termination." % label)
        else:
            smach.loginfo("Concurrent state '%s' returned outcome '%s' on termination." % (label, self._child_outcomes[label]))

        # Check if all of the states have completed
        with self._done_cond:
            # Initialize preemption flag
            preempt_others = False
            # Call transition cb's
            self.call_transition_cbs()
            # Call child termination cb if it's defined
            if self._child_termination_cb:
                try:
                    preempt_others = self._child_termination_cb(self._child_outcomes)
                except:
                    raise smach.InvalidUserCodeError("Could not execute child termination callback: "+traceback.format_exc())

            # Notify the container to terminate (and preempt other states if necessary)
            if preempt_others or all([o is not None for o in self._child_outcomes.values()]):
                self._done_cond.notify_all()

    ### Container interface
    def get_children(self):
        return self._states

    def __getitem__(self,key):
        return self._states[key]

    def get_initial_states(self):
        return self._states.keys()

    def set_initial_state(self, initial_states, userdata):
        # Compare list lengths, not the list itself, against the child count
        if len(initial_states) > 0:
            if len(initial_states) < len(self._states):
                smach.logwarn("Attempting to set initial states in Concurrence container, but Concurrence children are always all executed initially")

        # Set local userdata
        self.userdata.update(userdata)

    def get_active_states(self):
        return [label for (label,outcome) in self._child_outcomes.items() if outcome is None]

    def get_internal_edges(self):
        int_edges = []
        for (container_outcome, outcomes) in self._outcome_map.items():
            for (s,o) in outcomes.items():
                int_edges.append([o,s,container_outcome])
        return int_edges

    def check_consistency(self):
        for (co,cso) in self._outcome_map.items():
            for state_label,outcome in cso.items():
                if outcome not in self._states[state_label].get_registered_outcomes():
                    raise smach.InvalidTransitionError(
                            'Outcome map in SMACH Concurrence references a state outcome that does not exist. Requested state outcome: \'%s\', but state \'%s\' only has outcomes %s' %
                            (outcome, state_label, str(self._states[state_label].get_registered_outcomes())))
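
The outcome selection at the end of execute() can be tried in isolation. What follows is a minimal sketch and not part of asmach: the helper name resolve_outcome and the default outcome 'failed' are hypothetical, and the code assumes Python 3. It follows the order used in the listing: start from the default outcome, let each satisfied outcome-map entry overwrite it in iteration order, then let a non-empty string returned by the outcome callback overwrite that.

```python
def resolve_outcome(child_outcomes, outcome_map, default_outcome, outcome_cb=None):
    """Hypothetical helper mirroring the outcome selection in execute()."""
    # Start from the default outcome
    outcome = default_outcome

    # A map entry is satisfied when every child it lists has the listed
    # outcome; a later satisfied entry overwrites an earlier one
    for container_outcome, required in outcome_map.items():
        if all(child_outcomes[label] == o for label, o in required.items()):
            outcome = container_outcome

    # A non-empty string returned by the callback takes precedence
    if outcome_cb is not None:
        cb_outcome = outcome_cb(dict(child_outcomes))
        if cb_outcome and isinstance(cb_outcome, str):
            outcome = cb_outcome

    return outcome


# The example map from the constructor documentation
example_map = {
    'succeeded': {'FOO': 'succeeded', 'BAR': 'done'},
    'aborted': {'FOO': 'aborted'},
}

print(resolve_outcome({'FOO': 'succeeded', 'BAR': 'done'}, example_map, 'failed'))
print(resolve_outcome({'FOO': 'aborted', 'BAR': 'done'}, example_map, 'failed'))
print(resolve_outcome({'FOO': 'succeeded', 'BAR': 'other'}, example_map, 'failed'))
```

With the example map, the three calls select the 'succeeded' entry, the 'aborted' entry, and the default outcome, in that order. Keeping outcome-map entries mutually exclusive avoids depending on dictionary iteration order.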
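
The split-join pattern this module builds on, with one thread per child and a shared threading.Condition that the waiting thread blocks on, can be sketched without any smach types. This is an illustrative sketch only, assuming Python 3: run_split_join is a hypothetical name, plain callables stand in for states, and it waits with Condition.wait_for and a predicate instead of the bare wait() calls used in execute().

```python
import threading


def run_split_join(tasks):
    """Hypothetical helper: run each callable in `tasks` (label -> callable)
    on its own thread and block until all of them have returned."""
    # None marks a child that has not terminated yet, as in _child_outcomes
    outcomes = {label: None for label in tasks}
    done_cond = threading.Condition()

    def all_done():
        return all(o is not None for o in outcomes.values())

    def runner(label):
        result = tasks[label]()
        with done_cond:
            outcomes[label] = result
            # Wake the waiting thread once every child has an outcome
            if all_done():
                done_cond.notify_all()

    threads = [threading.Thread(target=runner, args=(label,)) for label in tasks]
    for thread in threads:
        thread.start()

    # Block until the predicate holds; wait_for re-checks it on every wakeup
    with done_cond:
        done_cond.wait_for(all_done)

    for thread in threads:
        thread.join()
    return outcomes


result = run_split_join({'FOO': lambda: 'succeeded', 'BAR': lambda: 'done'})
print(result)
```

Because a child that returns None would never satisfy the predicate, this sketch requires every callable to return a non-None outcome.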