Package smach :: Module concurrence

Source Code for Module smach.concurrence

  1  import threading 
  2  import traceback 
  3  import copy 
  4  from contextlib import contextmanager 
  5   
  6  import smach 
  7   
  8  __all__ = ['Concurrence'] 
9 10 -class Concurrence(smach.container.Container):
11 """Concurrence Container 12 13 This state allows for simple split-join concurrency. The user adds a set of 14 states which are all executed simultaneously. The concurrent split state 15 can only transition once all conatained states are ready to transition. 16 17 This container can be configured to return a given outcome as a function of 18 the outcomes of the contained states. This is specified in the constructor 19 of the class, or after construction with L{Concurrence.add_outcome_map}. 20 21 While a concurrence will not terminate until all if its children terminate, 22 it is possible for it to preempt a subset of states 23 - All child states terminate 24 - At least one child state terminates 25 - A user-defined callback signals termination 26 27 Given these causes of termination, the outcome can be determined in four ways: 28 - A user-defined callback returns an outcome 29 - A child-outcome map which requires ALL states to terminate is satisfied 30 - A child-outcome map which requires ONE state to terminate is satisfied 31 - No maps are satisfied, so the default outcome is returned 32 33 The specification of the outcome maps and the outcome callback are 34 described in the constructor documentation below. More than one policy can 35 be supplied, and each policy has the potential to not be satisfied. In the 36 situation in which multiple policies are provided, and a given policy is 37 not satisfied, the outcome choice precedence is as follows: 38 - Outcome callback 39 - First-triggered outcome map 40 - last-triggered outcome map 41 - Default outcome 42 43 In practive it is best to try to accomplish your task with just ONE outcome 44 policy. 45 46 """
47 - def __init__(self, 48 outcomes, 49 default_outcome, 50 input_keys = [], 51 output_keys = [], 52 outcome_map = {}, 53 outcome_cb = None, 54 child_termination_cb = None 55 ):
56 """Constructor for smach Concurrent Split. 57 58 @type outcomes: list of strings 59 @param outcomes: The potential outcomes of this state machine. 60 61 @type default_outcome: string 62 @param default_outcome: The outcome of this state if no elements in the 63 outcome map are satisfied by the outcomes of the contained states. 64 65 66 @type outcome_map: list 67 @param outcome_map: This is an outcome map for determining the 68 outcome of this container. Each outcome of the container is mapped 69 to a dictionary mapping child labels onto outcomes. If none of the 70 child-outcome maps is satisfied, the concurrence will terminate 71 with thhe default outcome. 72 73 For example, if the and_outcome_map is: 74 {'succeeded' : {'FOO':'succeeded', 'BAR':'done'}, 75 'aborted' : {'FOO':'aborted'}} 76 Then the concurrence will terimate with outcome 'succeeded' only if 77 BOTH states 'FOO' and 'BAR' have terminated 78 with outcomes 'succeeded' and 'done', respectively. The outcome 79 'aborted' will be returned by the concurrence if the state 'FOO' 80 returns the outcome 'aborted'. 81 82 If the outcome of a state is not specified, it will be treated as 83 irrelevant to the outcome of the concurrence 84 85 If the criteria for one outcome is the subset of another outcome, 86 the container will choose the outcome which has more child outcome 87 criteria satisfied. If both container outcomes have the same 88 number of satisfied criteria, the behavior is undefined. 89 90 If a more complex outcome policy is required, see the user can 91 provide an outcome callback. See outcome_cb, below. 92 93 @type child_termination_cb: callale 94 @param child_termination_cb: This callback gives the user the ability 95 to force the concurrence to preempt running states given the 96 termination of some other set of states. This is useful when using 97 a concurrence as a monitor container. 98 99 This callback is called each time a child state terminates. It is 100 passed a single argument, a dictionary mapping child state labels 101 onto their outcomes. If a state has not yet terminated, it's dict 102 value will be None. 103 104 This function can return three things: 105 - False: continue blocking on the termination of all other states 106 - True: Preempt all other states 107 - list of state labels: Preempt only the specified states 108 109 I{If you just want the first termination to cause the other children 110 to terminate, the callback (lamda so: True) will always return True.} 111 112 @type outcome_cb: callable 113 @param outcome_cb: If the outcome policy needs to be more complicated 114 than just a conjunction of state outcomes, the user can supply 115 a callback for specifying the outcome of the container. 116 117 This callback is called only once all child states have terminated, 118 and it is passed the dictionary mapping state labels onto their 119 respective outcomes. 120 121 If the callback returns a string, it will treated as the outcome of 122 the container. 123 124 If the callback returns None, the concurrence will first check the 125 outcome_map, and if no outcome in the outcome_map is satisfied, it 126 will return the default outcome. 127 128 B{NOTE: This callback should be a function ONLY of the outcomes of 129 the child states. It should not access any other resources.} 130 131 """ 132 smach.container.Container.__init__(self, outcomes, input_keys, output_keys) 133 134 # List of concurrent states 135 self._states = {} 136 self._threads = {} 137 self._remappings = {} 138 139 if not (default_outcome or outcome_map or outcome_cb): 140 raise smach.InvalidStateError("Concurrence requires an outcome policy") 141 142 # Initialize error string 143 errors = "" 144 145 # Check if default outcome is necessary 146 if default_outcome != str(default_outcome): 147 errors += "\n\tDefault outcome '%s' does not appear to be a string." % str(default_outcome) 148 if default_outcome not in outcomes: 149 errors += "\n\tDefault outcome '%s' is unregistered." % str(default_outcome) 150 151 # Check if outcome maps only contain outcomes that are registered 152 for o in outcome_map: 153 if o not in outcomes: 154 errors += "\n\tUnregistered outcome '%s' in and_outcome_map." % str(o) 155 156 # Check if outcome cb is callable 157 if outcome_cb and not hasattr(outcome_cb,'__call__'): 158 errors += "\n\tOutcome callback '%s' is not callable." % str(outcome_cb) 159 160 # Check if child termination cb is callable 161 if child_termination_cb and not hasattr(child_termination_cb,'__call__'): 162 errors += "\n\tChild termination callback '%s' is not callable." % str(child_termination_cb) 163 164 # Report errors 165 if len(errors) > 0: 166 raise smach.InvalidStateError("Errors specifying outcome policy of concurrence: %s" % errors) 167 168 # Store outcome policies 169 self._default_outcome = default_outcome 170 self._outcome_map = outcome_map 171 self._outcome_cb = outcome_cb 172 self._child_termination_cb = child_termination_cb 173 self._child_outcomes = {} 174 175 # Condition variables for threading synchronization 176 self._user_code_exception = False 177 self._done_cond = threading.Condition() 178 self._ready_event = threading.Event()
179 180 ### Construction methods 181 @staticmethod
182 - def add(label, state, remapping={}):
183 """Add state to the opened concurrence. 184 This state will need to terminate before the concurrence terminates. 185 """ 186 # Get currently opened container 187 self = Concurrence._currently_opened_container() 188 189 # Store state 190 self._states[label] = state 191 self._remappings[label] = remapping 192 193 return state
194 195 ### State interface
196 - def execute(self, parent_ud = smach.UserData()):
197 """Overridden execute method. 198 This starts all the threads. 199 """ 200 # Check if any states added 201 if len(self._states) == 0: 202 raise smach.InvalidStateError("No states was added to concurrence") 203 204 # Clear the ready event 205 self._ready_event.clear() 206 207 # Reset child outcomes 208 self._child_outcomes = {} 209 210 # Copy input keys 211 self._copy_input_keys(parent_ud, self.userdata) 212 213 # Spew some info 214 smach.loginfo("Concurrence starting with userdata: \n\t%s" % 215 (str(list(self.userdata.keys())))) 216 217 # Call start callbacks 218 self.call_start_cbs() 219 220 # Create all the threads 221 for (label, state) in ((k,self._states[k]) for k in self._states): 222 # Initialize child outcomes 223 self._child_outcomes[label] = None 224 self._threads[label] = threading.Thread( 225 name='concurrent_split:'+label, 226 target=self._state_runner, 227 args=(label,)) 228 229 # Launch threads 230 for thread in self._threads.values(): 231 thread.start() 232 233 # Wait for done notification 234 self._done_cond.acquire() 235 236 # Notify all threads ready to go 237 self._ready_event.set() 238 239 # Wait for a done notification from a thread 240 self._done_cond.wait() 241 self._done_cond.release() 242 243 # Preempt any running states 244 smach.logdebug("SMACH Concurrence preempting running states.") 245 for label in self._states: 246 if self._child_outcomes[label] == None: 247 self._states[label].request_preempt() 248 249 # Wait for all states to terminate 250 while not smach.is_shutdown(): 251 if all([not t.isAlive() for t in self._threads.values()]): 252 break 253 self._done_cond.acquire() 254 self._done_cond.wait(0.1) 255 self._done_cond.release() 256 257 # Check for user code exception 258 if self._user_code_exception: 259 self._user_code_exception = False 260 raise smach.InvalidStateError("A concurrent state raised an exception during execution.") 261 262 # Check for preempt 263 if self.preempt_requested(): 264 # initialized serviced flag 265 children_preempts_serviced = True 266 267 # Service this preempt if 268 for (label,state) in ((k,self._states[k]) for k in self._states): 269 if state.preempt_requested(): 270 # Reset the flag 271 children_preempts_serviced = False 272 # Complain 273 smach.logwarn("State '%s' in concurrence did not service preempt." % label) 274 # Recall the preempt if it hasn't been serviced 275 state.recall_preempt() 276 if children_preempts_serviced: 277 smach.loginfo("Concurrence serviced preempt.") 278 self.service_preempt() 279 280 # Spew some debyg info 281 smach.loginfo("Concurrent Outcomes: "+str(self._child_outcomes)) 282 283 # Initialize the outcome 284 outcome = self._default_outcome 285 286 # Determine the outcome from the outcome map 287 smach.logdebug("SMACH Concurrence determining contained state outcomes.") 288 for (container_outcome, outcomes) in ((k,self._outcome_map[k]) for k in self._outcome_map): 289 if all([self._child_outcomes[label] == outcomes[label] for label in outcomes]): 290 smach.logdebug("Terminating concurrent split with mapped outcome.") 291 outcome = container_outcome 292 293 # Check outcome callback 294 if self._outcome_cb: 295 try: 296 cb_outcome = self._outcome_cb(copy.copy(self._child_outcomes)) 297 if cb_outcome: 298 if cb_outcome == str(cb_outcome): 299 outcome = cb_outcome 300 else: 301 smach.logerr("Outcome callback returned a non-string '%s', using default outcome '%s'" % (str(cb_outcome), self._default_outcome)) 302 else: 303 smach.logwarn("Outcome callback returned None, using outcome '%s'" % outcome) 304 except: 305 raise smach.InvalidUserCodeError(("Could not execute outcome callback '%s': " % self._outcome_cb)+traceback.format_exc()) 306 307 # Cleanup 308 self._threads = {} 309 self._child_outcomes = {} 310 311 # Call termination callbacks 312 self.call_termination_cbs(list(self._states.keys()), outcome) 313 314 # Copy output keys 315 self._copy_output_keys(self.userdata, parent_ud) 316 317 return outcome
318
319 - def request_preempt(self):
320 """Preempt all contained states.""" 321 # Set preempt flag 322 smach.State.request_preempt(self) 323 324 # Notify concurrence that it should preempt running states and terminate 325 with self._done_cond: 326 self._done_cond.notify_all()
327 328
329 - def _state_runner(self,label):
330 """Runs the states in parallel threads.""" 331 332 # Wait until all threads are ready to start before beginnging 333 self._ready_event.wait() 334 335 self.call_transition_cbs() 336 337 # Execute child state 338 try: 339 self._child_outcomes[label] = self._states[label].execute(smach.Remapper( 340 self.userdata, 341 self._states[label].get_registered_input_keys(), 342 self._states[label].get_registered_output_keys(), 343 self._remappings[label])) 344 except: 345 self._user_code_exception = True 346 with self._done_cond: 347 self._done_cond.notify_all() 348 raise smach.InvalidStateError(("Could not execute child state '%s': " % label)+traceback.format_exc()) 349 350 # Make sure the child returned an outcome 351 if self._child_outcomes[label] is None: 352 raise smach.InvalidStateError("Concurrent state '%s' returned no outcome on termination." % label) 353 else: 354 smach.loginfo("Concurrent state '%s' returned outcome '%s' on termination." % (label, self._child_outcomes[label])) 355 356 # Check if all of the states have completed 357 with self._done_cond: 358 # initialize preemption flag 359 preempt_others = False 360 # Call transition cb's 361 self.call_transition_cbs() 362 # Call child termination cb if it's defined 363 if self._child_termination_cb: 364 try: 365 preempt_others = self._child_termination_cb(self._child_outcomes) 366 except: 367 raise smach.InvalidUserCodeError("Could not execute child termination callback: "+traceback.format_exc()) 368 369 # Notify the container to terminate (and preempt other states if neceesary) 370 if preempt_others or all([o is not None for o in self._child_outcomes.values()]): 371 self._done_cond.notify_all()
372 373 ### Container interface
374 - def get_children(self):
375 return self._states
376
377 - def __getitem__(self,key):
378 return self._states[key]
379
380 - def get_initial_states(self):
381 return list(self._states.keys())
382
383 - def set_initial_state(self, initial_states, userdata):
384 if initial_states > 0: 385 if initial_states < len(self._states): 386 smach.logwarn("Attempting to set initial states in Concurrence" 387 " container, but Concurrence children are always" 388 " all executed initially, ignoring call.") 389 390 # Set local userdata 391 self.userdata.update(userdata)
392
393 - def get_active_states(self):
394 return [label for (label,outcome) in ((k,self._child_outcomes[k]) for k in self._child_outcomes) if outcome is None]
395
396 - def get_internal_edges(self):
397 int_edges = [] 398 for (container_outcome, outcomes) in ((k,self._outcome_map[k]) for k in self._outcome_map): 399 for state_key in outcomes: 400 int_edges.append((outcomes[state_key], state_key, container_outcome)) 401 return int_edges
402
403 - def check_consistency(self):
404 for (co,cso) in ((k,self._outcome_map[k]) for k in self._outcome_map): 405 for state_label,outcome in ((k,cso[k]) for k in cso): 406 if outcome not in self._states[state_label].get_registered_outcomes(): 407 raise smach.InvalidTransitionError( 408 'Outcome map in SMACH Concurrence references a state outcome that does not exist. Requested state outcome: \'%s\', but state \'%s\' only has outcomes %s' % 409 (outcome, state_label, str(self._states[state_label].get_registered_outcomes())))
410