1 import threading
2 import traceback
3 import copy
4 from contextlib import contextmanager
5
6 import smach
7
8 __all__ = ['Concurrence']
11 """Concurrence Container
12
13 This state allows for simple split-join concurrency. The user adds a set of
14 states which are all executed simultaneously. The concurrent split state
15 can only transition once all conatained states are ready to transition.
16
17 This container can be configured to return a given outcome as a function of
18 the outcomes of the contained states. This is specified in the constructor
19 of the class, or after construction with L{Concurrence.add_outcome_map}.
20
21 While a concurrence will not terminate until all if its children terminate,
22 it is possible for it to preempt a subset of states
23 - All child states terminate
24 - At least one child state terminates
25 - A user-defined callback signals termination
26
27 Given these causes of termination, the outcome can be determined in four ways:
28 - A user-defined callback returns an outcome
29 - A child-outcome map which requires ALL states to terminate is satisfied
30 - A child-outcome map which requires ONE state to terminate is satisfied
31 - No maps are satisfied, so the default outcome is returned
32
33 The specification of the outcome maps and the outcome callback are
34 described in the constructor documentation below. More than one policy can
35 be supplied, and each policy has the potential to not be satisfied. In the
36 situation in which multiple policies are provided, and a given policy is
37 not satisfied, the outcome choice precedence is as follows:
38 - Outcome callback
39 - First-triggered outcome map
40 - last-triggered outcome map
41 - Default outcome
42
43 In practive it is best to try to accomplish your task with just ONE outcome
44 policy.
45
46 """
47 - def __init__(self,
48 outcomes,
49 default_outcome,
50 input_keys = [],
51 output_keys = [],
52 outcome_map = {},
53 outcome_cb = None,
54 child_termination_cb = None
55 ):
56 """Constructor for smach Concurrent Split.
57
58 @type outcomes: list of strings
59 @param outcomes: The potential outcomes of this state machine.
60
61 @type default_outcome: string
62 @param default_outcome: The outcome of this state if no elements in the
63 outcome map are satisfied by the outcomes of the contained states.
64
65
66 @type outcome_map: list
67 @param outcome_map: This is an outcome map for determining the
68 outcome of this container. Each outcome of the container is mapped
69 to a dictionary mapping child labels onto outcomes. If none of the
70 child-outcome maps is satisfied, the concurrence will terminate
71 with thhe default outcome.
72
73 For example, if the and_outcome_map is:
74 {'succeeded' : {'FOO':'succeeded', 'BAR':'done'},
75 'aborted' : {'FOO':'aborted'}}
76 Then the concurrence will terimate with outcome 'succeeded' only if
77 BOTH states 'FOO' and 'BAR' have terminated
78 with outcomes 'succeeded' and 'done', respectively. The outcome
79 'aborted' will be returned by the concurrence if the state 'FOO'
80 returns the outcome 'aborted'.
81
82 If the outcome of a state is not specified, it will be treated as
83 irrelevant to the outcome of the concurrence
84
85 If the criteria for one outcome is the subset of another outcome,
86 the container will choose the outcome which has more child outcome
87 criteria satisfied. If both container outcomes have the same
88 number of satisfied criteria, the behavior is undefined.
89
90 If a more complex outcome policy is required, see the user can
91 provide an outcome callback. See outcome_cb, below.
92
93 @type child_termination_cb: callale
94 @param child_termination_cb: This callback gives the user the ability
95 to force the concurrence to preempt running states given the
96 termination of some other set of states. This is useful when using
97 a concurrence as a monitor container.
98
99 This callback is called each time a child state terminates. It is
100 passed a single argument, a dictionary mapping child state labels
101 onto their outcomes. If a state has not yet terminated, it's dict
102 value will be None.
103
104 This function can return three things:
105 - False: continue blocking on the termination of all other states
106 - True: Preempt all other states
107 - list of state labels: Preempt only the specified states
108
109 I{If you just want the first termination to cause the other children
110 to terminate, the callback (lamda so: True) will always return True.}
111
112 @type outcome_cb: callable
113 @param outcome_cb: If the outcome policy needs to be more complicated
114 than just a conjunction of state outcomes, the user can supply
115 a callback for specifying the outcome of the container.
116
117 This callback is called only once all child states have terminated,
118 and it is passed the dictionary mapping state labels onto their
119 respective outcomes.
120
121 If the callback returns a string, it will treated as the outcome of
122 the container.
123
124 If the callback returns None, the concurrence will first check the
125 outcome_map, and if no outcome in the outcome_map is satisfied, it
126 will return the default outcome.
127
128 B{NOTE: This callback should be a function ONLY of the outcomes of
129 the child states. It should not access any other resources.}
130
131 """
132 smach.container.Container.__init__(self, outcomes, input_keys, output_keys)
133
134
135 self._states = {}
136 self._threads = {}
137 self._remappings = {}
138
139 if not (default_outcome or outcome_map or outcome_cb):
140 raise smach.InvalidStateError("Concurrence requires an outcome policy")
141
142
143 errors = ""
144
145
146 if default_outcome != str(default_outcome):
147 errors += "\n\tDefault outcome '%s' does not appear to be a string." % str(default_outcome)
148 if default_outcome not in outcomes:
149 errors += "\n\tDefault outcome '%s' is unregistered." % str(default_outcome)
150
151
152 for o in outcome_map:
153 if o not in outcomes:
154 errors += "\n\tUnregistered outcome '%s' in and_outcome_map." % str(o)
155
156
157 if outcome_cb and not hasattr(outcome_cb,'__call__'):
158 errors += "\n\tOutcome callback '%s' is not callable." % str(outcome_cb)
159
160
161 if child_termination_cb and not hasattr(child_termination_cb,'__call__'):
162 errors += "\n\tChild termination callback '%s' is not callable." % str(child_termination_cb)
163
164
165 if len(errors) > 0:
166 raise smach.InvalidStateError("Errors specifying outcome policy of concurrence: %s" % errors)
167
168
169 self._default_outcome = default_outcome
170 self._outcome_map = outcome_map
171 self._outcome_cb = outcome_cb
172 self._child_termination_cb = child_termination_cb
173 self._child_outcomes = {}
174
175
176 self._user_code_exception = False
177 self._done_cond = threading.Condition()
178 self._ready_event = threading.Event()
179
180
181 @staticmethod
182 - def add(label, state, remapping={}):
183 """Add state to the opened concurrence.
184 This state will need to terminate before the concurrence terminates.
185 """
186
187 self = Concurrence._currently_opened_container()
188
189
190 self._states[label] = state
191 self._remappings[label] = remapping
192
193 return state
194
195
197 """Overridden execute method.
198 This starts all the threads.
199 """
200
201 if len(self._states) == 0:
202 raise smach.InvalidStateError("No states was added to concurrence")
203
204
205 self._ready_event.clear()
206
207
208 self._child_outcomes = {}
209
210
211 self._copy_input_keys(parent_ud, self.userdata)
212
213
214 smach.loginfo("Concurrence starting with userdata: \n\t%s" %
215 (str(list(self.userdata.keys()))))
216
217
218 self.call_start_cbs()
219
220
221 for (label, state) in ((k,self._states[k]) for k in self._states):
222
223 self._child_outcomes[label] = None
224 self._threads[label] = threading.Thread(
225 name='concurrent_split:'+label,
226 target=self._state_runner,
227 args=(label,))
228
229
230 for thread in self._threads.values():
231 thread.start()
232
233
234 self._done_cond.acquire()
235
236
237 self._ready_event.set()
238
239
240 self._done_cond.wait()
241 self._done_cond.release()
242
243
244 smach.logdebug("SMACH Concurrence preempting running states.")
245 for label in self._states:
246 if self._child_outcomes[label] == None:
247 self._states[label].request_preempt()
248
249
250 while not smach.is_shutdown():
251 if all([not t.isAlive() for t in self._threads.values()]):
252 break
253 self._done_cond.acquire()
254 self._done_cond.wait(0.1)
255 self._done_cond.release()
256
257
258 if self._user_code_exception:
259 self._user_code_exception = False
260 raise smach.InvalidStateError("A concurrent state raised an exception during execution.")
261
262
263 if self.preempt_requested():
264
265 children_preempts_serviced = True
266
267
268 for (label,state) in ((k,self._states[k]) for k in self._states):
269 if state.preempt_requested():
270
271 children_preempts_serviced = False
272
273 smach.logwarn("State '%s' in concurrence did not service preempt." % label)
274
275 state.recall_preempt()
276 if children_preempts_serviced:
277 smach.loginfo("Concurrence serviced preempt.")
278 self.service_preempt()
279
280
281 smach.loginfo("Concurrent Outcomes: "+str(self._child_outcomes))
282
283
284 outcome = self._default_outcome
285
286
287 smach.logdebug("SMACH Concurrence determining contained state outcomes.")
288 for (container_outcome, outcomes) in ((k,self._outcome_map[k]) for k in self._outcome_map):
289 if all([self._child_outcomes[label] == outcomes[label] for label in outcomes]):
290 smach.logdebug("Terminating concurrent split with mapped outcome.")
291 outcome = container_outcome
292
293
294 if self._outcome_cb:
295 try:
296 cb_outcome = self._outcome_cb(copy.copy(self._child_outcomes))
297 if cb_outcome:
298 if cb_outcome == str(cb_outcome):
299 outcome = cb_outcome
300 else:
301 smach.logerr("Outcome callback returned a non-string '%s', using default outcome '%s'" % (str(cb_outcome), self._default_outcome))
302 else:
303 smach.logwarn("Outcome callback returned None, using outcome '%s'" % outcome)
304 except:
305 raise smach.InvalidUserCodeError(("Could not execute outcome callback '%s': " % self._outcome_cb)+traceback.format_exc())
306
307
308 self._threads = {}
309 self._child_outcomes = {}
310
311
312 self.call_termination_cbs(list(self._states.keys()), outcome)
313
314
315 self._copy_output_keys(self.userdata, parent_ud)
316
317 return outcome
318
320 """Preempt all contained states."""
321
322 smach.State.request_preempt(self)
323
324
325 with self._done_cond:
326 self._done_cond.notify_all()
327
328
330 """Runs the states in parallel threads."""
331
332
333 self._ready_event.wait()
334
335 self.call_transition_cbs()
336
337
338 try:
339 self._child_outcomes[label] = self._states[label].execute(smach.Remapper(
340 self.userdata,
341 self._states[label].get_registered_input_keys(),
342 self._states[label].get_registered_output_keys(),
343 self._remappings[label]))
344 except:
345 self._user_code_exception = True
346 with self._done_cond:
347 self._done_cond.notify_all()
348 raise smach.InvalidStateError(("Could not execute child state '%s': " % label)+traceback.format_exc())
349
350
351 if self._child_outcomes[label] is None:
352 raise smach.InvalidStateError("Concurrent state '%s' returned no outcome on termination." % label)
353 else:
354 smach.loginfo("Concurrent state '%s' returned outcome '%s' on termination." % (label, self._child_outcomes[label]))
355
356
357 with self._done_cond:
358
359 preempt_others = False
360
361 self.call_transition_cbs()
362
363 if self._child_termination_cb:
364 try:
365 preempt_others = self._child_termination_cb(self._child_outcomes)
366 except:
367 raise smach.InvalidUserCodeError("Could not execute child termination callback: "+traceback.format_exc())
368
369
370 if preempt_others or all([o is not None for o in self._child_outcomes.values()]):
371 self._done_cond.notify_all()
372
373
376
378 return self._states[key]
379
381 return list(self._states.keys())
382
384 if initial_states > 0:
385 if initial_states < len(self._states):
386 smach.logwarn("Attempting to set initial states in Concurrence"
387 " container, but Concurrence children are always"
388 " all executed initially, ignoring call.")
389
390
391 self.userdata.update(userdata)
392
394 return [label for (label,outcome) in ((k,self._child_outcomes[k]) for k in self._child_outcomes) if outcome is None]
395
397 int_edges = []
398 for (container_outcome, outcomes) in ((k,self._outcome_map[k]) for k in self._outcome_map):
399 for state_key in outcomes:
400 int_edges.append((outcomes[state_key], state_key, container_outcome))
401 return int_edges
402
404 for (co,cso) in ((k,self._outcome_map[k]) for k in self._outcome_map):
405 for state_label,outcome in ((k,cso[k]) for k in cso):
406 if outcome not in self._states[state_label].get_registered_outcomes():
407 raise smach.InvalidTransitionError(
408 'Outcome map in SMACH Concurrence references a state outcome that does not exist. Requested state outcome: \'%s\', but state \'%s\' only has outcomes %s' %
409 (outcome, state_label, str(self._states[state_label].get_registered_outcomes())))
410