00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 import threading
00020 import traceback
00021 import copy
00022 from contextlib import contextmanager
00023
00024 import smach
00025 import rospy
00026
00027 __all__ = ['Concurrence']
00028
00029 class ssmConcurrence(smach.Concurrence):
00030 """Concurrence Container
00031
00032 This state allows for simple split-join concurrency. The user adds a set of
00033 states which are all executed simultaneously. The concurrent split state
00034 can only transition once all conatained states are ready to transition.
00035
00036 This container can be configured to return a given outcome as a function of
00037 the outcomes of the contained states. This is specified in the constructor
00038 of the class, or after construction with L{Concurrence.add_outcome_map}.
00039
00040 While a concurrence will not terminate until all if its children terminate,
00041 it is possible for it to preempt a subset of states
00042 - All child states terminate
00043 - At least one child state terminates
00044 - A user-defined callback signals termination
00045
00046 Given these causes of termination, the outcome can be determined in four ways:
00047 - A user-defined callback returns an outcome
00048 - A child-outcome map which requires ALL states to terminate is satisfied
00049 - A child-outcome map which requires ONE state to terminate is satisfied
00050 - No maps are satisfied, so the default outcome is returned
00051
00052 The specification of the outcome maps and the outcome callback are
00053 described in the constructor documentation below. More than one policy can
00054 be supplied, and each policy has the potential to not be satisfied. In the
00055 situation in which multiple policies are provided, and a given policy is
00056 not satisfied, the outcome choice precedence is as follows:
00057 - Outcome callback
00058 - First-triggered outcome map
00059 - last-triggered outcome map
00060 - Default outcome
00061
00062 In practive it is best to try to accomplish your task with just ONE outcome
00063 policy.
00064
00065 """
00066 def __init__(self,
00067 outcomes,
00068 input_keys = [],
00069 output_keys = [],
00070 outcome_map = {},
00071 outcome_cb = None,
00072 child_termination_cb = None
00073 ):
00074
00075 input_keys.append("logfile")
00076 output_keys.append("logfile")
00077 outcomes.append('preempt')
00078 default_outcome = 'preempt'
00079 smach.Concurrence.__init__(self, outcomes, default_outcome, input_keys, output_keys, outcome_map, outcome_cb, child_termination_cb)
00080 self._datamodel = {}
00081 self._onEntry = None
00082 self._onExit = None
00083 self._tree_view = None
00084 self._tree_view_Lock = threading.Lock()
00085 self._tree_view_cb = None
00086
00087
00088 def execute(self, parent_ud = smach.UserData()):
00089 """Overridden execute method.
00090 This starts all the threads.
00091 """
00092
00093 self._ready_event.clear()
00094
00095 self._child_outcomes = {}
00096
00097
00098 self._copy_input_keys(parent_ud, self.userdata)
00099
00100
00101 for data in self._datamodel:
00102 if(self._datamodel[data] != ""):
00103 self.userdata[data] = self._datamodel[data]
00104
00105
00106 if(self._onEntry is not None):
00107 try:
00108 self._onEntry.execute(self.userdata)
00109 except Exception as ex:
00110 rospy.logerr('%s::onEntry::execute() raised | %s'
00111 %(self.__class__.__name__,str(ex)))
00112 return "preempt"
00113
00114
00115 smach.loginfo("Concurrence starting with userdata: \n\t%s" %
00116 (str(list(self.userdata.keys()))))
00117
00118
00119 self.call_start_cbs()
00120
00121
00122 for (label, state) in ((k,self._states[k]) for k in self._states):
00123
00124
00125 self._child_outcomes[label] = None
00126 self._threads[label] = threading.Thread(
00127 name='concurrent_split:'+label,
00128 target=self._state_runner,
00129 args=(label,))
00130
00131
00132 for thread in self._threads.values():
00133 thread.start()
00134
00135
00136 self._done_cond.acquire()
00137
00138
00139 self._ready_event.set()
00140
00141
00142 self._done_cond.wait()
00143 self._done_cond.release()
00144
00145
00146 smach.logdebug("SMACH Concurrence preempting running states.")
00147 for label in self._states:
00148 if self._child_outcomes[label] == None:
00149 self._states[label].request_preempt()
00150
00151
00152 while not smach.is_shutdown():
00153 if all([not t.isAlive() for t in self._threads.values()]):
00154 break
00155 self._done_cond.acquire()
00156 self._done_cond.wait(0.1)
00157 self._done_cond.release()
00158
00159
00160 if self._user_code_exception:
00161 self._user_code_exception = False
00162 raise smach.InvalidStateError("A concurrent state raised an exception during execution.")
00163
00164
00165 if self.preempt_requested():
00166
00167 children_preempts_serviced = True
00168
00169
00170 for (label,state) in ((k,self._states[k]) for k in self._states):
00171 if state.preempt_requested():
00172
00173 children_preempts_serviced = False
00174
00175 smach.logwarn("State '%s' in concurrence did not service preempt." % label)
00176
00177 state.recall_preempt()
00178 if children_preempts_serviced:
00179 smach.loginfo("Concurrence serviced preempt.")
00180 self.service_preempt()
00181
00182
00183 smach.loginfo("Concurrent Outcomes: "+str(self._child_outcomes))
00184
00185
00186 outcome = self._default_outcome
00187
00188
00189 smach.logdebug("SMACH Concurrence determining contained state outcomes.")
00190 for (container_outcome, outcomes) in ((k,self._outcome_map[k]) for k in self._outcome_map):
00191 if all([self._child_outcomes[label] == outcomes[label] for label in outcomes]):
00192 smach.logdebug("Terminating concurrent split with mapped outcome.")
00193 outcome = container_outcome
00194
00195
00196 if self._outcome_cb:
00197 try:
00198 cb_outcome = self._outcome_cb(copy.copy(self._child_outcomes))
00199 if cb_outcome:
00200 if cb_outcome == str(cb_outcome):
00201 outcome = cb_outcome
00202 else:
00203 smach.logerr("Outcome callback returned a non-string '%s', using default outcome '%s'" % (str(cb_outcome), self._default_outcome))
00204 else:
00205 smach.logwarn("Outcome callback returned None, using outcome '%s'" % outcome)
00206 except:
00207 raise smach.InvalidUserCodeError(("Could not execute outcome callback '%s': " % self._outcome_cb)+traceback.format_exc())
00208
00209
00210 self._threads = {}
00211 self._child_outcomes = {}
00212
00213
00214 self.call_termination_cbs(list(self._states.keys()), outcome)
00215
00216
00217 if(self._onExit is not None):
00218 try:
00219 outcome = self._onExit.execute(self.userdata,outcome)
00220 except Exception as ex:
00221 rospy.logerr('%s::onExit::execute() raised | %s'
00222 %(self.__class__.__name__,str(ex)))
00223 return "preempt"
00224
00225
00226 self._copy_output_keys(self.userdata, parent_ud)
00227
00228 return outcome
00229
00230
00231 def _state_runner(self,label):
00232 """Runs the states in parallel threads."""
00233
00234
00235 self._ready_event.wait()
00236
00237 self.call_transition_cbs()
00238
00239
00240
00241 self._tree_view_enable_state(label)
00242
00243 try:
00244 self._child_outcomes[label] = self._states[label].execute(smach.Remapper(
00245 self.userdata,
00246 self._states[label].get_registered_input_keys(),
00247 self._states[label].get_registered_output_keys(),
00248 self._remappings[label]))
00249 except:
00250 self._user_code_exception = True
00251 with self._done_cond:
00252 self._done_cond.notify_all()
00253 raise smach.InvalidStateError(("Could not execute child state '%s': " % label)+traceback.format_exc())
00254
00255 self._tree_view_disable_state(label)
00256
00257 if self._child_outcomes[label] is None:
00258 raise smach.InvalidStateError("Concurrent state '%s' returned no outcome on termination." % label)
00259 else:
00260 smach.loginfo("Concurrent state '%s' returned outcome '%s' on termination." % (label, self._child_outcomes[label]))
00261
00262
00263 with self._done_cond:
00264
00265 preempt_others = False
00266
00267 self.call_transition_cbs()
00268
00269 if self._child_termination_cb:
00270 try:
00271 preempt_others = self._child_termination_cb(self._child_outcomes)
00272 except:
00273 raise smach.InvalidUserCodeError("Could not execute child termination callback: "+traceback.format_exc())
00274
00275 for (container_outcome, outcomes) in ((k,self._outcome_map[k]) for k in self._outcome_map):
00276 if all([self._child_outcomes[label] == outcomes[label] for label in outcomes]):
00277 preempt_others = True
00278
00279
00280 if preempt_others or all([o is not None for o in self._child_outcomes.values()]):
00281 self._done_cond.notify_all()
00282
00283
00284 def _create_tree_view(self):
00285 self._tree_view = {}
00286 for child in self.get_children():
00287 self._tree_view[child] = 0
00288
00289 def _tree_view_enable_state(self, label):
00290 if(self._tree_view is not None):
00291 self._tree_view_Lock.acquire()
00292 self._tree_view[label] = 1
00293 self.call_update_tree_view_cb()
00294 self._tree_view_Lock.release()
00295
00296
00297 def _tree_view_disable_state(self, label):
00298 if(self._tree_view is not None):
00299 self._tree_view_Lock.acquire()
00300 self._tree_view[label] = 0
00301 self.call_update_tree_view_cb()
00302 self._tree_view_Lock.release()
00303
00304
00305 def get_tree_view(self):
00306 return self._tree_view
00307
00308 def register_tree_view_cb(self, callback):
00309 self._tree_view_cb = callback
00310
00311 def call_update_tree_view_cb(self):
00312 if(self._tree_view_cb is not None):
00313 try:
00314 self._tree_view_cb()
00315 except:
00316 smach.logerr("Could not execute treeview callback: "+traceback.format_exc())