ssm_scxml_interpreter.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 # Copyright 2015 Airbus
00004 # Copyright 2017 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA)
00005 #
00006 # Licensed under the Apache License, Version 2.0 (the "License");
00007 # you may not use this file except in compliance with the License.
00008 # You may obtain a copy of the License at
00009 #
00010 #   http://www.apache.org/licenses/LICENSE-2.0
00011 #
00012 # Unless required by applicable law or agreed to in writing, software
00013 # distributed under the License is distributed on an "AS IS" BASIS,
00014 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015 # See the License for the specific language governing permissions and
00016 # limitations under the License.
00017 
00018 
00019 
00020 import ssm_state
00021 import ssm_state_machine
00022 import ssm_concurrence
00023 import ssm_on
00024 import ssm_skill_provider
00025 
00026 import rospy
00027 import smach_ros
00028 import re
00029 
00030 
00031 from roslib.packages import get_pkg_dir
00032 import xml.etree.ElementTree as etree
00033 
00034 # RE
00035 RE_PREFIXED_TAG = "$"
00036 RE_PREFIXED_BEGIN_TAG = "${"
00037 RE_PREFIXED_END_TAG = "}"
00038 
00039 def get_pkg_dir_from_prefix(path, ros_package_path=None):
00040     
00041     if RE_PREFIXED_TAG not in path:
00042         return path
00043     
00044     splitpath = path.split(RE_PREFIXED_END_TAG)
00045     after_prefix_path = splitpath[-1]
00046     prefix = splitpath[0].split(RE_PREFIXED_BEGIN_TAG)[-1]
00047     rpath = get_pkg_dir(prefix)+after_prefix_path
00048     
00049     return rpath
00050         
00051 
00052 class ssmInterpreter:
00053     
00054     def __init__(self, file):
00055         self.StateDict =  None
00056         file_ = open(file).read()
00057         file_ = re.sub(' xmlns="[^"]+"', '', file_, count=1)
00058         self.root = etree.fromstring(file_)
00059         self.skillProvider = None
00060         self.CheckBool = True
00061         
00062         self._next_parent_list = []
00063         self._current_parent_list = []
00064         self._next_SM_list = []
00065         self._current_SM_list = []
00066         
00067         if(self.root is None):
00068             rospy.logerr("[SCXML Interpreter] : Error in root !!!!")
00069             self.CheckBool = False
00070         else:
00071             pass
00072 
00073     def convertSCXML(self):
00074         '''
00075             First we import the skills
00076             Then we construct the state machine
00077             We return the constructed state machine
00078         '''
00079         
00080         skill = self.root.find("datamodel/data[@id='skill_file']")
00081         if(skill is None):
00082             rospy.logerr("[SCXML Interpreter] No Skill Register found !!")
00083             self.CheckBool = False
00084             return
00085         else:
00086             try:
00087                 self.skillProvider = ssm_skill_provider.SkillProvider(get_pkg_dir_from_prefix(str(skill.attrib['expr'])))
00088             except Exception as ex:
00089                 rospy.logerr(ex)
00090                 rospy.logerr('[SCXML Interpreter] Skill register XML not found.')
00091                 self.CheckBool = False
00092             
00093         return self.constructSSM()
00094     
00095     def constructSSM(self):       
00096         '''
00097             Construct the mainStateMachine then all the state by recursive level construction (0, then all level 1, then all level 2)
00098         '''
00099         #Find the outcomes of the main state machine
00100         outcomes_=[]
00101         final_states = self.get_final_states_id(self.root)
00102         if(final_states == None):
00103             return
00104         else:
00105             ##Get all possible outcomes but still checking if the transition is consistent
00106             for final_id in final_states:
00107                 for transition in self.root.findall("./*/transition[@target='"+str(final_id)+"']"):
00108                     event = transition.attrib.get('event')
00109                     if(event is None):
00110                         rospy.logerr("[SCXML Interpreter] : No Transition Event for %s !!!!" % transition)
00111                         self.CheckBool = False
00112                         return
00113                     target = transition.attrib.get('target')
00114                     if(target is None):
00115                         rospy.logerr("[SCXML Interpreter] : No Transition Target for %s !!!!" % transition)
00116                         self.CheckBool = False
00117                         return
00118                     if(target == final_id):
00119                         target = event
00120                     outcomes_.append(event)
00121         ##Init the new smach state machine
00122         SSM = ssm_state_machine.ssmMainStateMachine(outcomes = outcomes_)
00123         SSM.userdata.logfile = ""
00124         '''
00125             Setup the data from the scxml
00126         '''
00127         datamodel = self.root.find('datamodel')
00128         if(datamodel is None):
00129             rospy.logwarn("[SCXML Interpreter] : No Datamodel detected !!!!")
00130         else:
00131             for data in datamodel:
00132                 data_ID = data.attrib['id']
00133                 data_expr = data.attrib['expr']
00134                 SSM.userdata[data_ID] = data_expr
00135         
00136         '''
00137             Read the states in the scxml
00138             If there is a problem in a state the interpreter will return and failed
00139         '''
00140         ##Setupt the initial state for the intial state machine
00141         if(self.root.attrib.get('initial') == None):
00142             rospy.logerr("[SCXML Interpreter] : No Initial State in the SCXML !!!!" )
00143             self.CheckBool = False
00144             return
00145         SSM._initial_state_label = self.root.attrib.get('initial')
00146         ##Init variables for interations
00147         finish_ = False
00148         self._current_parent_list = ['.']
00149         self._current_SM_list = [SSM]
00150         
00151         ##Main loop
00152         while(finish_ == False):
00153             ##Construct the actual level
00154             
00155             self._next_parent_list=[]
00156             self._next_SM_list = [] 
00157             for i_parent in range(len(self._current_parent_list)):
00158                 parent = self._current_parent_list[i_parent]
00159                 current_SM = self._current_SM_list[i_parent]
00160                 #open the current SM for construction
00161                 current_SM.open()
00162                 ##Check for final states if not a parallel state
00163                 if(type(current_SM) is not ssm_concurrence.ssmConcurrence):
00164                    final_states = self.get_final_states_id(self.root.find(parent))
00165                    if(final_states is None):
00166                         self.CheckBool = False
00167                         try:
00168                             current_SM.check_consistency()
00169                         except Exception as ex:
00170                             raise ex
00171                         current_SM.close()
00172                         return None
00173                 #List all states and all parallel states
00174                 list_state = self.root.findall(parent+'/state')
00175                 list_parallel = self.root.findall(parent+'/parallel')
00176                 
00177                 ##Parallel
00178                 for parallel in list_parallel:
00179                     ##Construct parallel state
00180                     self.constructParallel(parallel, parent, current_SM, SSM, final_states)
00181                     if(self.CheckBool == False):
00182                         try:
00183                             current_SM.check_consistency()
00184                         except Exception as ex:
00185                             raise ex
00186                         current_SM.close()
00187                         return None
00188 
00189                 for state in list_state:
00190                     if(state.find('state') is not None):
00191                         self.constructCompoundState(state, parent, current_SM, SSM, final_states)  
00192                     else:
00193                         self.constructSimpleState(state, current_SM, SSM, final_states)
00194                     if(self.CheckBool == False):
00195                         try:
00196                             current_SM.check_consistency()
00197                         except Exception as ex:
00198                             raise ex
00199                         current_SM.close()
00200                         return None
00201                     
00202                 
00203                 ##Current level is finished
00204                 #close the current SM for construction
00205                 try:
00206                     current_SM.check_consistency()
00207                 except Exception as ex:
00208                     raise ex
00209                 current_SM.close()
00210             
00211             if(len(self._next_parent_list)==0):##There is no lower level
00212                 finish_ = True
00213             else:
00214                 self._current_parent_list = self._next_parent_list ##copy the new list of parent
00215                 self._current_SM_list = self._next_SM_list ##copy the list of State Machine
00216                 
00217         if(self.CheckBool == False):
00218             return None
00219         else:   
00220             return SSM
00221     
00222     
00223     def constructParallel(self, current_level, parent, current_openSM, mainSM, final_states):
00224         
00225         
00226         ID = current_level.attrib.get('id')
00227 
00228         if(ID is None):
00229             rospy.logerr("[SCXML Interpreter] No ID for a State !!!!" )
00230             self.CheckBool = False
00231             return
00232         
00233         ##get onEntry and onExit
00234         onEntry = self.get_on_entry(current_level)
00235         onExit = self.get_on_exit(current_level)
00236         
00237         ##add to the next parent list
00238         self._next_parent_list.append(parent+"/parallel[@id='"+str(ID)+"']")
00239          ##Find the transitions
00240         transitions_ = {}
00241         outcomes_ = []
00242         outcomes_map = {}
00243         
00244         for transition in current_level.findall('transition'):                       
00245             event = transition.attrib.get('event')
00246             if(event is None):
00247                 rospy.logerr("[SCXML Interpreter] No Transition Event for %s !!!!" % ID)
00248                 self.CheckBool = False
00249                 return
00250             target = transition.attrib.get('target')
00251             if(target is None):
00252                 rospy.logerr("[SCXML Interpreter] No Transition Target for %s !!!!" % ID)
00253                 self.CheckBool = False
00254                 return
00255             cond = transition.attrib.get('cond')
00256             if(target is None):
00257                 rospy.logerr("[SCXML Interpreter] No Transition Condition  for %s !!!!" % ID)
00258                 self.CheckBool = False
00259                 return
00260             ##if target a final state send we set on the outcome
00261             for final_id in final_states:
00262                 if(target == final_id):
00263                     target = event 
00264 
00265             map_list = self.convertToConcurenceMap(cond)
00266 
00267             if(len(map_list) == 0):
00268                 rospy.logerr("[SCXML Interpreter] Error during the convertion event to map for %s !!!!" % ID)
00269                 self.CheckBool = False
00270                 return 
00271             
00272             for map_ in map_list:
00273                 
00274                 outcome_ = event
00275                 outcomes_.append(outcome_)
00276                 outcomes_map[outcome_] = map_
00277                 transitions_[outcome_] = target
00278                                                     
00279         ##Add the datamodel
00280         datamodel_ = self.get_datamodel(current_level,mainSM, ID)
00281 
00282         ##Find the io_keys
00283         keys_ = self.findIOkeys(current_level, ID)
00284         
00285         newSM = ssm_concurrence.ssmConcurrence(outcomes_,outcome_map=outcomes_map,input_keys = keys_, output_keys = keys_) ##Define the empty concurence
00286         newSM._datamodel = datamodel_   
00287         newSM._onEntry = onEntry
00288         newSM._onExit = onExit
00289         #Add to the currently openned SM
00290         self.addToSM(current_openSM,ID,newSM,transitions_,keys_)
00291         self._next_SM_list.append(newSM)
00292         
00293     def constructCompoundState(self, current_level, parent, current_openSM, mainSM, final_states):
00294         ##State is parent ==> it's a state machine
00295         ID = current_level.attrib.get('id')
00296 
00297         if(ID is None):
00298             rospy.logerr("[SCXML Interpreter] No ID for a State !!!!" )
00299             self.CheckBool = False
00300             return         
00301         ##get onEntry and onExit
00302         onEntry = self.get_on_entry(current_level)
00303         onExit = self.get_on_exit(current_level)
00304         ##add to the parent list
00305         self._next_parent_list.append(parent+"/state[@id='"+str(ID)+"']")
00306         ##Find the transitions
00307         transitions_ = {}
00308         outcomes_ = []
00309         ##If the current_openSM is not a concurence then we can map the outcome, otherwise we have to map on this level
00310         ##Because there is no final state for a concurence and he can not transition to another state
00311         if(type(current_openSM) is not ssm_concurrence.ssmConcurrence):
00312             for transition in current_level.findall('transition'):
00313                 event = transition.attrib.get('event')
00314                 if(event is None):
00315                     rospy.logerr("[SCXML Interpreter] No Transition Event for %s !!!!" % ID)
00316                     self.CheckBool = False
00317                     return
00318                 target = transition.attrib.get('target')
00319                 if(target is None):
00320                     rospy.logerr("[SCXML Interpreter] No Transition Target for %s !!!!" % ID)
00321                     self.CheckBool = False
00322                     return
00323                 ##if target a final state send we set on the outcome
00324                 for final_id in final_states:
00325                     if(target == final_id):
00326                         target = event
00327                     
00328                 outcomes_.append(event)
00329                 transitions_[event] = target
00330         else:
00331             ##remplace the final states of the parent to this level
00332             final_states = self.get_final_states_id(current_level)
00333             if(final_states == None):
00334                 rospy.logerr("[SCXML Interpreter] No final State found in "+ID+" !!!!")
00335                 self.CheckBool = False
00336                 return
00337             ##if target a final state send we set on the outcome
00338             for final_id in final_states:
00339                 for transition in current_level.findall("./state/transition[@target='"+str(final_id)+"']"):
00340                     event = transition.attrib.get('event')
00341                     if(event is None):
00342                         rospy.logerr("[SCXML Interpreter] No Transition Event for %s !!!!" % ID)
00343                         self.CheckBool = False
00344                         return
00345                     target = transition.attrib.get('target')
00346                     if(target is None):
00347                         rospy.logerr("[SCXML Interpreter] No Transition Target for %s !!!!" % ID)
00348                         self.CheckBool = False
00349                         return
00350                 
00351                     outcomes_.append(event)
00352                     transitions_[event] = target
00353 
00354         ##Add the datamodel
00355         datamodel_ = self.get_datamodel(current_level,mainSM, ID)
00356         if(datamodel_ is None):
00357             return    
00358         ##Find the io_keys
00359         keys_ = self.findIOkeys(current_level, ID)
00360         if(keys_ is None):
00361             return
00362         ##Add the intial state
00363         initial = current_level.find("initial")
00364         if(initial is None):
00365             rospy.logerr("[SCXML Interpreter] No Initial state for a Parent State !!!!" )
00366             self.CheckBool = False
00367             return
00368         
00369         newSM = ssm_state_machine.ssmStateMachine(outcomes_, input_keys = keys_, output_keys = keys_) ##Define the empty state machine
00370         newSM._datamodel = datamodel_
00371         newSM._onEntry = onEntry
00372         newSM._onExit = onExit
00373         
00374         newSM._initial_state_label = str(initial.find('transition').attrib.get('target'))
00375         self.addToSM(current_openSM,ID,newSM,transitions_, keys_)
00376         self._next_SM_list.append(newSM)
00377                         
00378     def constructSimpleState(self, current_level, current_openSM, mainSM, final_states):
00379         
00380         ID = current_level.attrib.get('id')
00381         
00382         if(ID is None):
00383             rospy.logerr("[SCXML Interpreter] No ID for a State !!!!" )
00384             self.CheckBool = False
00385             return         
00386         ##get onEntry and onExit
00387         onEntry = self.get_on_entry(current_level)
00388         onExit = self.get_on_exit(current_level)
00389         ##Find the transitions
00390         transitions_ = {}
00391         for transition in current_level.findall('transition'):
00392             event = transition.attrib.get('event')
00393             if(event is None):
00394                 rospy.logerr("[SCXML Interpreter] No Transition Event for %s !!!!" % ID)
00395                 self.CheckBool = False
00396                 return
00397             target = transition.attrib.get('target')
00398             if(target is None):
00399                 rospy.logerr("[SCXML Interpreter] No Transition Target for %s !!!!" % ID)
00400                 self.CheckBool = False
00401                 return
00402             ##if target a final state send we set on the outcome
00403             for final_id in final_states:
00404                 if(target == final_id):
00405                     target = event
00406                     
00407             transitions_[event] = target
00408         datamodel_ = self.get_datamodel(current_level,mainSM, ID)
00409 
00410         ##Find the skill
00411         skill_name = current_level.find("./datamodel/data[@id='skill']")
00412         if(skill_name is not None):
00413             if('expr' in skill_name.attrib):
00414                 try:
00415                     State_ref = self.skillProvider.load(skill_name.attrib.get('expr'))
00416                     State = State_ref()
00417                 except Exception as ex:
00418                     self.CheckBool = False
00419                     return
00420             else:
00421                 rospy.logerr('[SCXML Interpreter] Skill "expr" not defined in state "%s" !'%ID)
00422                 self.CheckBool = False
00423                 return
00424         else:
00425             rospy.logwarn("[SCXML Interpreter] No skill found for the state : %s. We use the EmptyState." %ID)
00426             State = ssm_state.EmptyState()
00427             
00428         keys_ = self.findIOkeys(current_level, ID)
00429         if(keys_ is None):
00430             return
00431 
00432         State.register_io_keys(keys_) 
00433         State._datamodel = datamodel_   
00434         State._onEntry = onEntry
00435         State._onExit = onExit
00436         self.addToSM(current_openSM,ID,State,transitions_,keys_)
00437         #Add the io_keys to the mainSM
00438         for keys in State.get_registered_input_keys():
00439             if(not(mainSM.userdata.__contains__(keys))):
00440                 mainSM.userdata.__setattr__(keys, None)
00441               
00442     def get_final_states_id(self, current_level):
00443         final_states = current_level.findall('./final')
00444         final_states_id = []
00445         if(len(final_states) == 0):
00446             rospy.logerr("[SCXML Interpreter] No final State found for the state : %s"%str(current_level))
00447             self.CheckBool = False
00448             return None
00449         for state in final_states:
00450             final_id = state.attrib.get('id')
00451             if(final_id == None):
00452                 rospy.logerr("[SCXML Interpreter] No id for the final state : %s"%str(state))
00453                 self.CheckBool = False
00454                 return None
00455             else:
00456                 final_states_id.append(final_id)
00457                 
00458         return final_states_id 
00459     
00460     def addToSM(self,current_SM,added_ID, added_State, transitions_,keys_):
00461         '''
00462         Add a state, a state machine or a concurence to the current state machine / concurence
00463         '''
00464         remapping_ = {'logfile':'logfile'}
00465         for keys in keys_:  
00466             remapping_[keys] = keys
00467         if(type(current_SM) is ssm_state_machine.ssmStateMachine or type(current_SM) is ssm_state_machine.ssmMainStateMachine):
00468             current_SM.add(added_ID,added_State,transitions_,remapping_)
00469         elif(type(current_SM) is ssm_concurrence.ssmConcurrence):
00470             current_SM.add(added_ID,added_State,remapping_)
00471                     
00472     def convertToConcurenceMap(self, event):
00473         '''
00474         Split the event in the transition into a map for the concurence purpose
00475         For example :
00476         A.Succeeded AND B.Failed ==> [{'A':'Succeeded', 'B':'Failed'}]
00477         A.Succeeded OR B.Failed ==> [{'A':'Succeeded'}, {'B':'Failed'}]
00478         A.Failed ==> [{'A':'Failed'}]
00479         '''
00480         map_list = []
00481         if(event.find(' AND ') == -1): ##There is no "and" condition
00482             if(event.find(' OR ') == -1): ## There is no "or" condition
00483                 list_ = event.split('.')
00484                 map_list.append({list_[0]:list_[1]})
00485             else:
00486                 list_or = event.split(' OR ')
00487                 for event_or in list_or:
00488                     list_ = event_or.split('.')
00489                     map_list.append({list_[0]:list_[1]})
00490         else:
00491             list_and = event.split(' AND ')
00492             map_ = {}
00493             for event_and in list_and:
00494                 list_and = event_and.split('.')
00495                 map_[list_and[0]] = list_and[1]
00496             map_list.append(map_)
00497         
00498         return map_list
00499     
00500     def get_datamodel(self, state, mainSM, ID):
00501         datamodel_ = {}
00502         for data in state.findall('./datamodel/data'):  
00503             data_ID = data.attrib['id']
00504             if('expr' in data.attrib):
00505                 data_expr = data.attrib['expr']
00506             else:
00507                 rospy.logwarn('[SCXML Interpreter] State : "%s". "expr" not defined for data "%s". It will be set to \"\"!'%(ID,data_ID))
00508                 data_expr = ""
00509             datamodel_[data_ID] = data_expr
00510             if(not(mainSM.userdata.__contains__(data_ID))):
00511                 mainSM.userdata.__setattr__(data_ID, data_expr)
00512         return datamodel_
00513                 
00514     def get_on_entry(self, state):
00515         
00516         if(state.find('onentry') == None):
00517             return None
00518         else:
00519             logs = {}
00520             script = None
00521             if(state.find('./onentry/script') is not None):
00522                 script = state.find('./onentry/script').text
00523             
00524             for log in state.findall('./onentry/log'):
00525                 logs[log.attrib['label']] = log.attrib['expr']
00526             
00527             return ssm_on.onEntry(logs, script)
00528                     
00529     def get_on_exit(self, state):
00530         
00531         if(state.find('onexit') == None):
00532             return None
00533         else:
00534             logs = {}
00535             script = None
00536             if(state.find('./onexit/script') is not None):
00537                 script = state.find('./onexit/script').text
00538             
00539             for log in state.findall('./onexit/log'):
00540                 logs[log.attrib['label']] = log.attrib['expr']
00541             
00542             return ssm_on.onExit(logs, script)
00543         
00544     def findIOkeys(self, state, ID):
00545         io_keys = set()
00546         for data in state.findall('.//datamodel/data'):
00547             ##Grab all data ID
00548             io_keys.add(data.attrib['id'])
00549             ##Grad all skills
00550             if(data.attrib['id'] == 'skill'):
00551                 if('expr' in data.attrib):
00552                     try:
00553                         State = self.skillProvider.load(data.attrib['expr'])()
00554                     except Exception as ex:
00555                         rospy.logerr('[SCXML Interpreter] Import fail from Skill "%s" in state : "%s"!'%(data.attrib['expr'],ID))
00556                         rospy.logerr(ex)
00557                         return None
00558                 else:
00559                     rospy.logerr('[SCXML Interpreter] Skill "expr" not defined in state "%s" !'%ID)
00560                     return None
00561             else:
00562                 State = ssm_state.EmptyState()
00563             for key in State.get_registered_input_keys():
00564                 io_keys.add(str(key))
00565                 
00566         for log in state.findall('.//onentry/log'):
00567             ##Grab all data ID
00568             if(not(log.attrib['label'] == "") and not(log.attrib['label'] == 'outcome')):
00569                 io_keys.add(log.attrib['label'])
00570         for log in state.findall('.//onexit/log'):
00571             ##Grab all data ID
00572             if(not(log.attrib['label'] == "") and not(log.attrib['label'] == 'outcome')):
00573                 io_keys.add(log.attrib['label']) 
00574                 
00575         return list(io_keys)
00576     
00577     
00578         
00579     
00580 
00581         


airbus_ssm_core
Author(s): Ludovic Delval
autogenerated on Thu Jun 6 2019 17:59:28