ssm_concurrence.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2015 Airbus
00004 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License");
00007 # you may not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 #   http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS,
00014 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 
00018 
00019 import threading
00020 import traceback
00021 import copy
00022 from contextlib import contextmanager
00023 
00024 import smach
00025 import rospy
00026 
00027 __all__ = ['Concurrence']
00028 
00029 class ssmConcurrence(smach.Concurrence):
00030     """Concurrence Container
00031 
00032     This state allows for simple split-join concurrency. The user adds a set of
00033     states which are all executed simultaneously. The concurrent split state
00034     can only transition once all conatained states are ready to transition.
00035     
00036     This container can be configured to return a given outcome as a function of
00037     the outcomes of the contained states. This is specified in the constructor
00038     of the class, or after construction with L{Concurrence.add_outcome_map}.
00039 
00040     While a concurrence will not terminate until all if its children terminate,
00041     it is possible for it to preempt a subset of states 
00042      - All child states terminate
00043      - At least one child state terminates
00044      - A user-defined callback signals termination
00045 
00046     Given these causes of termination, the outcome can be determined in four ways:
00047      - A user-defined callback returns an outcome
00048      - A child-outcome map which requires ALL states to terminate is satisfied
00049      - A child-outcome map which requires ONE state to terminate is satisfied
00050      - No  maps are satisfied, so the default outcome is returned
00051 
00052     The specification of the outcome maps and the outcome callback are
00053     described in the constructor documentation below. More than one policy can
00054     be supplied, and each policy has the potential to not be satisfied. In the
00055     situation in which multiple policies are provided, and a given policy is
00056     not satisfied, the outcome choice precedence is as follows:
00057      - Outcome callback
00058      - First-triggered outcome map
00059      - last-triggered outcome map
00060      - Default outcome
00061 
00062     In practive it is best to try to accomplish your task with just ONE outcome
00063     policy.
00064 
00065     """
00066     def __init__(self,
00067             outcomes,
00068             input_keys = [],
00069             output_keys = [],
00070             outcome_map = {},
00071             outcome_cb = None,
00072             child_termination_cb = None
00073             ):
00074 
00075         input_keys.append("logfile")
00076         output_keys.append("logfile")
00077         outcomes.append('preempt')
00078         default_outcome = 'preempt'
00079         smach.Concurrence.__init__(self, outcomes, default_outcome, input_keys, output_keys, outcome_map, outcome_cb, child_termination_cb)
00080         self._datamodel = {}
00081         self._onEntry = None
00082         self._onExit  = None
00083         self._tree_view = None
00084         self._tree_view_Lock = threading.Lock()
00085         self._tree_view_cb = None
00086         
00087     ### State interface
00088     def execute(self, parent_ud = smach.UserData()):
00089         """Overridden execute method.
00090         This starts all the threads.
00091         """
00092         # Clear the ready event
00093         self._ready_event.clear()
00094         # Reset child outcomes
00095         self._child_outcomes = {}
00096         
00097         # Copy input keys
00098         self._copy_input_keys(parent_ud, self.userdata)
00099         
00100         ## Copy the datamodel's value into the userData
00101         for data in self._datamodel:
00102             if(self._datamodel[data] != ""):
00103                 self.userdata[data] = self._datamodel[data]
00104         
00105         ## Do the <onentry>
00106         if(self._onEntry is not None):
00107             try:
00108                 self._onEntry.execute(self.userdata)
00109             except Exception as ex:
00110                 rospy.logerr('%s::onEntry::execute() raised | %s'
00111                              %(self.__class__.__name__,str(ex)))
00112                 return "preempt"
00113 
00114         # Spew some info
00115         smach.loginfo("Concurrence starting with userdata: \n\t%s" %
00116                 (str(list(self.userdata.keys()))))
00117 
00118         # Call start callbacks
00119         self.call_start_cbs()
00120 
00121         # Create all the threads
00122         for (label, state) in ((k,self._states[k]) for k in self._states):
00123             # Initialize child outcomes
00124             
00125             self._child_outcomes[label] = None
00126             self._threads[label] = threading.Thread(
00127                     name='concurrent_split:'+label,
00128                     target=self._state_runner,
00129                     args=(label,))
00130 
00131         # Launch threads
00132         for thread in self._threads.values():
00133             thread.start()
00134         
00135         # Wait for done notification
00136         self._done_cond.acquire()
00137         
00138         # Notify all threads ready to go
00139         self._ready_event.set()
00140         
00141         # Wait for a done notification from a thread
00142         self._done_cond.wait()
00143         self._done_cond.release()
00144 
00145         # Preempt any running states
00146         smach.logdebug("SMACH Concurrence preempting running states.")
00147         for label in self._states:
00148             if self._child_outcomes[label] == None:
00149                 self._states[label].request_preempt()
00150 
00151         # Wait for all states to terminate
00152         while not smach.is_shutdown():
00153             if all([not t.isAlive() for t in self._threads.values()]):
00154                 break
00155             self._done_cond.acquire()
00156             self._done_cond.wait(0.1)
00157             self._done_cond.release()
00158 
00159         # Check for user code exception
00160         if self._user_code_exception:
00161             self._user_code_exception = False
00162             raise smach.InvalidStateError("A concurrent state raised an exception during execution.")
00163 
00164         # Check for preempt
00165         if self.preempt_requested():
00166             # initialized serviced flag
00167             children_preempts_serviced = True
00168 
00169             # Service this preempt if 
00170             for (label,state) in ((k,self._states[k]) for k in self._states):
00171                 if state.preempt_requested():
00172                     # Reset the flag
00173                     children_preempts_serviced = False
00174                     # Complain
00175                     smach.logwarn("State '%s' in concurrence did not service preempt." % label) 
00176                     # Recall the preempt if it hasn't been serviced
00177                     state.recall_preempt()
00178             if children_preempts_serviced:
00179                 smach.loginfo("Concurrence serviced preempt.")
00180                 self.service_preempt()
00181 
00182         # Spew some debyg info
00183         smach.loginfo("Concurrent Outcomes: "+str(self._child_outcomes))
00184 
00185         # Initialize the outcome
00186         outcome = self._default_outcome
00187 
00188         # Determine the outcome from the outcome map
00189         smach.logdebug("SMACH Concurrence determining contained state outcomes.")
00190         for (container_outcome, outcomes) in ((k,self._outcome_map[k]) for k in self._outcome_map):
00191             if all([self._child_outcomes[label] == outcomes[label] for label in outcomes]):
00192                 smach.logdebug("Terminating concurrent split with mapped outcome.")
00193                 outcome = container_outcome
00194 
00195         # Check outcome callback
00196         if self._outcome_cb:
00197             try:
00198                 cb_outcome = self._outcome_cb(copy.copy(self._child_outcomes))
00199                 if cb_outcome:
00200                     if cb_outcome == str(cb_outcome):
00201                         outcome = cb_outcome
00202                     else:
00203                         smach.logerr("Outcome callback returned a non-string '%s', using default outcome '%s'" % (str(cb_outcome), self._default_outcome))
00204                 else:
00205                     smach.logwarn("Outcome callback returned None, using outcome '%s'" % outcome)
00206             except:
00207                 raise smach.InvalidUserCodeError(("Could not execute outcome callback '%s': " % self._outcome_cb)+traceback.format_exc())
00208 
00209         # Cleanup
00210         self._threads = {}
00211         self._child_outcomes = {}
00212 
00213         # Call termination callbacks
00214         self.call_termination_cbs(list(self._states.keys()), outcome)
00215         
00216         ## Do the <onexit>    
00217         if(self._onExit is not None):
00218             try:
00219                 outcome = self._onExit.execute(self.userdata,outcome)
00220             except Exception as ex:
00221                 rospy.logerr('%s::onExit::execute() raised | %s'
00222                              %(self.__class__.__name__,str(ex)))
00223                 return "preempt"
00224 
00225         # Copy output keys
00226         self._copy_output_keys(self.userdata, parent_ud)
00227 
00228         return outcome
00229 
00230 
00231     def _state_runner(self,label):
00232         """Runs the states in parallel threads."""
00233 
00234         # Wait until all threads are ready to start before beginnging
00235         self._ready_event.wait()
00236         
00237         self.call_transition_cbs()
00238 
00239         # Execute child state
00240         
00241         self._tree_view_enable_state(label)
00242         
00243         try:
00244             self._child_outcomes[label] = self._states[label].execute(smach.Remapper(
00245                 self.userdata,
00246                 self._states[label].get_registered_input_keys(),
00247                 self._states[label].get_registered_output_keys(),
00248                 self._remappings[label]))
00249         except:
00250             self._user_code_exception = True
00251             with self._done_cond:
00252                 self._done_cond.notify_all()
00253             raise smach.InvalidStateError(("Could not execute child state '%s': " % label)+traceback.format_exc())
00254         
00255         self._tree_view_disable_state(label)
00256         # Make sure the child returned an outcome
00257         if self._child_outcomes[label] is None:
00258             raise smach.InvalidStateError("Concurrent state '%s' returned no outcome on termination." % label)
00259         else:
00260             smach.loginfo("Concurrent state '%s' returned outcome '%s' on termination." % (label, self._child_outcomes[label]))
00261 
00262         # Check if all of the states have completed
00263         with self._done_cond:
00264             # initialize preemption flag
00265             preempt_others = False
00266             # Call transition cb's
00267             self.call_transition_cbs()
00268             # Call child termination cb if it's defined
00269             if self._child_termination_cb:
00270                 try:
00271                     preempt_others = self._child_termination_cb(self._child_outcomes)
00272                 except:
00273                     raise smach.InvalidUserCodeError("Could not execute child termination callback: "+traceback.format_exc())
00274             ## Check if we have finished one outcome
00275             for (container_outcome, outcomes) in ((k,self._outcome_map[k]) for k in self._outcome_map):
00276                 if all([self._child_outcomes[label] == outcomes[label] for label in outcomes]):
00277                     preempt_others = True
00278 
00279             # Notify the container to terminate (and preempt other states if neceesary)
00280             if preempt_others or all([o is not None for o in self._child_outcomes.values()]):
00281                 self._done_cond.notify_all()
00282         
00283         
00284     def _create_tree_view(self):
00285         self._tree_view = {}
00286         for child in self.get_children():
00287             self._tree_view[child]  = 0
00288             
00289     def _tree_view_enable_state(self, label):
00290         if(self._tree_view is not None):
00291             self._tree_view_Lock.acquire()
00292             self._tree_view[label] = 1
00293             self.call_update_tree_view_cb()
00294             self._tree_view_Lock.release()
00295             
00296     
00297     def _tree_view_disable_state(self, label):
00298         if(self._tree_view is not None):
00299             self._tree_view_Lock.acquire()
00300             self._tree_view[label] = 0
00301             self.call_update_tree_view_cb()
00302             self._tree_view_Lock.release()
00303             
00304     
00305     def get_tree_view(self):
00306         return self._tree_view
00307 
00308     def register_tree_view_cb(self, callback):
00309         self._tree_view_cb = callback
00310     
00311     def call_update_tree_view_cb(self):
00312         if(self._tree_view_cb is not None):
00313             try:
00314                 self._tree_view_cb()
00315             except:
00316                 smach.logerr("Could not execute treeview callback: "+traceback.format_exc())


airbus_ssm_core
Author(s): Ludovic Delval
autogenerated on Thu Jun 6 2019 17:59:28