1
2 import threading
3 import traceback
4 import copy
5 from contextlib import contextmanager
6
7 import asmach as smach
8
9 __all__ = ['Concurrence']
12 """Concurrence Container
13
This state allows for simple split-join concurrency. The user adds a set of
states which are all executed simultaneously. The concurrent split state
can only transition once all contained states are ready to transition.
17
18 This container can be configured to return a given outcome as a function of
19 the outcomes of the contained states. This is specified in the constructor
20 of the class, or after construction with L{Concurrence.add_outcome_map}.
21
While a concurrence will not terminate until all of its children terminate,
it is possible for it to preempt a subset of states. The causes of
termination are:
- All child states terminate
- At least one child state terminates
- A user-defined callback signals termination
27
28 Given these causes of termination, the outcome can be determined in four ways:
29 - A user-defined callback returns an outcome
30 - A child-outcome map which requires ALL states to terminate is satisfied
31 - A child-outcome map which requires ONE state to terminate is satisfied
32 - No maps are satisfied, so the default outcome is returned
33
34 The specification of the outcome maps and the outcome callback are
35 described in the constructor documentation below. More than one policy can
36 be supplied, and each policy has the potential to not be satisfied. In the
37 situation in which multiple policies are provided, and a given policy is
38 not satisfied, the outcome choice precedence is as follows:
39 - Outcome callback
40 - First-triggered outcome map
41 - last-triggered outcome map
42 - Default outcome
43
In practice it is best to try to accomplish your task with just ONE outcome
policy.
46
47 """
def __init__(self,
        outcomes,
        default_outcome,
        input_keys=None,
        output_keys=None,
        outcome_map=None,
        outcome_cb=None,
        child_termination_cb=None
        ):
    """Constructor for smach Concurrent Split.

    @type outcomes: list of strings
    @param outcomes: The potential outcomes of this state machine.

    @type default_outcome: string
    @param default_outcome: The outcome of this state if no elements in the
    outcome map are satisfied by the outcomes of the contained states.

    @type input_keys: list
    @param input_keys: Passed through to the base container constructor.
    A new empty list is used when omitted.

    @type output_keys: list
    @param output_keys: Passed through to the base container constructor.
    A new empty list is used when omitted.

    @type outcome_map: dict
    @param outcome_map: This is an outcome map for determining the
    outcome of this container. Each outcome of the container is mapped
    to a dictionary mapping child labels onto outcomes. If none of the
    child-outcome maps is satisfied, the concurrence will terminate
    with the default outcome. A new empty dict is used when omitted.

    For example, if the outcome_map is:
        {'succeeded' : {'FOO':'succeeded', 'BAR':'done'},
         'aborted' : {'FOO':'aborted'}}
    Then the concurrence will terminate with outcome 'succeeded' only if
    BOTH states 'FOO' and 'BAR' have terminated
    with outcomes 'succeeded' and 'done', respectively. The outcome
    'aborted' will be returned by the concurrence if the state 'FOO'
    returns the outcome 'aborted'.

    If the outcome of a state is not specified, it will be treated as
    irrelevant to the outcome of the concurrence

    If the criteria for one outcome is the subset of another outcome,
    the container will choose the outcome which has more child outcome
    criteria satisfied. If both container outcomes have the same
    number of satisfied criteria, the behavior is undefined.

    If a more complex outcome policy is required, the user can
    provide an outcome callback. See outcome_cb, below.

    @type child_termination_cb: callable
    @param child_termination_cb: This callback gives the user the ability
    to force the concurrence to preempt running states given the
    termination of some other set of states. This is useful when using
    a concurrence as a monitor container.

    This callback is called each time a child state terminates. It is
    passed a single argument, a dictionary mapping child state labels
    onto their outcomes. If a state has not yet terminated, its dict
    value will be None.

    This function can return three things:
     - False: continue blocking on the termination of all other states
     - True: Preempt all other states
     - list of state labels: Preempt only the specified states

    I{If you just want the first termination to cause the other children
    to terminate, the callback (lambda so: True) will always return True.}

    @type outcome_cb: callable
    @param outcome_cb: If the outcome policy needs to be more complicated
    than just a conjunction of state outcomes, the user can supply
    a callback for specifying the outcome of the container.

    This callback is called only once all child states have terminated,
    and it is passed the dictionary mapping state labels onto their
    respective outcomes.

    If the callback returns a string, it will be treated as the outcome of
    the container.

    If the callback returns None, the concurrence will first check the
    outcome_map, and if no outcome in the outcome_map is satisfied, it
    will return the default outcome.

    B{NOTE: This callback should be a function ONLY of the outcomes of
    the child states. It should not access any other resources.}

    @raise smach.InvalidStateError: If no outcome policy is supplied at
    all, or if the supplied policy fails validation (non-string or
    unregistered default outcome, unregistered outcome in the outcome
    map, non-callable callbacks). All validation problems are reported
    together in one message.
    """
    # Use None sentinels instead of mutable defaults: outcome_map is stored
    # on the instance below, so a shared default dict would be aliased by
    # every Concurrence constructed without an explicit map.
    if input_keys is None:
        input_keys = []
    if output_keys is None:
        output_keys = []
    if outcome_map is None:
        outcome_map = {}

    smach.container.Container.__init__(self, outcomes, input_keys, output_keys)

    # List of concurrent states
    self._states = {}
    self._threads = {}
    self._remappings = {}

    if not (default_outcome or outcome_map or outcome_cb):
        raise smach.InvalidStateError("Concurrence requires an outcome policy")

    # Collect every problem first so the user sees all of them at once
    problems = []

    # The default outcome must be a string and a registered outcome
    if default_outcome != str(default_outcome):
        problems.append("\n\tDefault outcome '%s' does not appear to be a string." % str(default_outcome))
    if default_outcome not in outcomes:
        problems.append("\n\tDefault outcome '%s' is unregistered." % str(default_outcome))

    # Every container outcome named in the map must be registered
    for o in outcome_map.keys():
        if o not in outcomes:
            problems.append("\n\tUnregistered outcome '%s' in and_outcome_map." % str(o))

    # Callbacks are optional, but must be callable when given
    if outcome_cb and not callable(outcome_cb):
        problems.append("\n\tOutcome callback '%s' is not callable." % str(outcome_cb))

    if child_termination_cb and not callable(child_termination_cb):
        problems.append("\n\tChild termination callback '%s' is not callable." % str(child_termination_cb))

    # Report errors
    if problems:
        raise smach.InvalidStateError("Errors specifying outcome policy of concurrence: %s" % "".join(problems))

    # Store outcome policies
    self._default_outcome = default_outcome
    self._outcome_map = outcome_map
    self._outcome_cb = outcome_cb
    self._child_termination_cb = child_termination_cb
    self._child_outcomes = {}

    # Set by a child runner thread when the child's execute raises
    self._user_code_exception = False
    # Condition the child runner threads and request_preempt notify
    self._done_cond = threading.Condition()
180
@staticmethod
def add(label, state, remapping=None):
    """Add state to the opened concurrence.
    This state will need to terminate before the concurrence terminates.

    @param label: Key under which the state and its remapping are stored.

    @param state: The child state to register with the opened concurrence.

    @type remapping: dict
    @param remapping: Remapping stored for this child and later handed to
    smach.Remapper when the child is run. A new empty dict is used when
    omitted.

    @return: The state that was passed in.
    """
    # Get currently opened container
    container = Concurrence._currently_opened_container()

    # Store state and remapping. A fresh dict per call replaces the old
    # mutable default, which made every child added without a remapping
    # share (and potentially mutate) one dict object.
    container._states[label] = state
    container._remappings[label] = {} if remapping is None else remapping

    return state
194
195
197 """Overloaded execute method.
198 This starts all the threads.
199 """
200
201 self._child_outcomes = {}
202
203
204 self._copy_input_keys(parent_ud, self.userdata)
205
206
207 smach.loginfo("Concurrence starting with userdata: \n\t%s" %
208 (str(self.userdata.keys())))
209
210
211 self.call_start_cbs()
212
213
214 for (label, state) in self._states.iteritems():
215
216 self._child_outcomes[label] = None
217 self._threads[label] = threading.Thread(
218 name='concurrent_split:'+label,
219 target=self._state_runner,
220 args=(label,))
221
222
223 for thread in self._threads.values():
224 thread.start()
225
226
227 self._done_cond.acquire()
228 self._done_cond.wait()
229 self._done_cond.release()
230
231
232 smach.logdebug("SMACH Concurrence preempting running states.")
233 for label in self._states:
234 if self._child_outcomes[label] == None:
235 self._states[label].request_preempt()
236
237
238 while not smach.is_shutdown():
239 if all([o is not None for o in self._child_outcomes.values()]):
240 break
241 self._done_cond.acquire()
242 self._done_cond.wait()
243 self._done_cond.release()
244
245
246 if self._user_code_exception:
247 self._user_code_exception = False
248 raise smach.InvalidStateError("A concurrent state raised an exception during execution.")
249
250
251 if self.preempt_requested():
252
253 children_preempts_serviced = True
254
255
256 for (label,state) in self._states.iteritems():
257 if state.preempt_requested():
258
259 children_preempts_serviced = False
260
261 smach.logwarn("State '%s' in concurrence did not service preempt." % label)
262
263 state.recall_preempt()
264 if children_preempts_serviced:
265 smach.loginfo("Concurrence serviced preempt.")
266 self.service_preempt()
267
268
269 smach.loginfo("Concurrent Outcomes: "+str(self._child_outcomes))
270
271
272 outcome = self._default_outcome
273
274
275 smach.logdebug("SMACH Concurrence determining contained state outcomes.")
276 for (container_outcome, outcomes) in self._outcome_map.iteritems():
277 if all([self._child_outcomes[label] == outcomes[label] for label in outcomes.keys()]):
278 smach.logdebug("Terminating concurrent split with mapped outcome.")
279 outcome = container_outcome
280
281
282 if self._outcome_cb:
283 try:
284 cb_outcome = self._outcome_cb(copy.copy(self._child_outcomes))
285 if cb_outcome:
286 if cb_outcome == str(cb_outcome):
287 outcome = cb_outcome
288 else:
289 smach.logerr("Outcome callback returned a non-string '%s', using default outcome '%s'" % (str(cb_outcome), self._default_outcome))
290 else:
291 smach.logwarn("Outcome callback returned None, using outcome '%s'" % outcome)
292 except:
293 raise smach.InvalidUserCodeError(("Could not execute outcome callback '%s': " % self._outcome_cb)+traceback.format_exc())
294
295
296 self._threads = {}
297 self._child_outcomes = {}
298
299
300 self.call_termination_cbs(self._states.keys(), outcome)
301
302
303 self._copy_output_keys(self.userdata, parent_ud)
304
305 return outcome
306
def request_preempt(self):
    """Preempt all contained states."""
    # NOTE(review): the `def` line of this method is absent from the file as
    # received; the signature is restored from the delegating call below
    # (smach.State.request_preempt(self)) -- confirm against the original.

    # Set preempt flag
    smach.State.request_preempt(self)

    # Notify everything waiting on the condition so a blocked waiter
    # wakes up and can act on the preempt request
    with self._done_cond:
        self._done_cond.notify_all()
315
316
def _state_runner(self, label):
    """Runs the states in parallel threads.

    Thread target for a single child. It executes the child stored under
    label, records its outcome in self._child_outcomes, calls the optional
    child termination callback, and notifies self._done_cond when the
    callback returns a truthy value or every child has an outcome.

    @type label: string
    @param label: Key of the child state to run.

    @raise smach.InvalidStateError: If the child's execute raises, or if
    the child returns None as its outcome.
    @raise smach.InvalidUserCodeError: If the child termination callback
    raises.
    """
    # NOTE(review): the `def` line of this method is absent from the file as
    # received; the signature is restored from the thread set-up
    # (target=self._state_runner, args=(label,)) -- confirm.
    self.call_transition_cbs()

    # Execute child state
    try:
        self._child_outcomes[label] = self._states[label].execute(smach.Remapper(
            self.userdata,
            self._states[label].get_registered_input_keys(),
            self._states[label].get_registered_output_keys(),
            self._remappings[label]))
    except:
        # Kept as a bare except so that any failure in the child sets the
        # error flag and wakes the waiter before the error is re-raised.
        self._user_code_exception = True
        with self._done_cond:
            self._done_cond.notify_all()
        raise smach.InvalidStateError(("Could not execute child state '%s': " % label)+traceback.format_exc())

    # Make sure the child returned an outcome
    if self._child_outcomes[label] is None:
        # Was smach.InvalidStateexception, a misspelling of the
        # InvalidStateError used everywhere else in this file.
        raise smach.InvalidStateError("Concurrent state '%s' returned no outcome on termination." % label)
    else:
        smach.loginfo("Concurrent state '%s' returned outcome '%s' on termination." % (label, self._child_outcomes[label]))

    # Check if all of the states have completed
    with self._done_cond:
        # Initialize preemption flag
        preempt_others = False
        # Call transition cb's
        self.call_transition_cbs()
        # Ask the user callback, if any, whether the other children
        # should be preempted
        if self._child_termination_cb:
            try:
                preempt_others = self._child_termination_cb(self._child_outcomes)
            except:
                raise smach.InvalidUserCodeError("Could not execute child termination callback: "+traceback.format_exc())

        # Notify the container to terminate (and preempt other states if necessary)
        if preempt_others or all([o is not None for o in self._child_outcomes.values()]):
            self._done_cond.notify_all()
356
357
360
362 return self._states[key]
363
365 return self._states.keys()
366
368 if initial_states > 0:
369 if initial_states < len(self._states):
370 logwarn("Attempting to set initial states in Concurrence container, but Concurrence children are always all executed initially")
371
372
373 self.userdata.update(userdata)
374
376 return [label for (label,outcome) in self._child_outcomes.iteritems() if outcome is None]
377
379 int_edges = []
380 for (container_outcome, outcomes) in self._outcome_map.iteritems():
381 for (s,o) in outcomes.iteritems():
382 int_edges.append([o,s,container_outcome])
383 return int_edges
384
386 for (co,cso) in self._outcome_map.iteritems():
387 for state_label,outcome in cso.iteritems():
388 if outcome not in self._states[state_label].get_registered_outcomes():
389 raise smach.InvalidTransitionError(
390 'Outcome map in SMACH Concurrence references a state outcome that does not exist. Requested state outcome: \'%s\', but state \'%s\' only has outcomes %s' %
391 (outcome, state_label, str(self._states[state_label].get_registered_outcomes())))
392