00001
00002 import rospy
00003 import zlib
00004 import smach
00005
00006 from rospy.exceptions import ROSInterruptException
00007
00008 from flexbe_core.core.preemptable_state_machine import PreemptableStateMachine
00009 from flexbe_core.core.lockable_state_machine import LockableStateMachine
00010
00011 from smach.state_machine import StateMachine
00012
00013 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
00014 from flexbe_msgs.msg import Container, ContainerStructure, OutcomeRequest, BehaviorSync, CommandFeedback, BehaviorLog
00015 from std_msgs.msg import Empty, UInt8, Int32
00016
00017 from flexbe_core.core.loopback_state import LoopbackState
00018 from flexbe_core.state_logger import StateLogger
00019
00020
00021 class OperatableStateMachine(PreemptableStateMachine):
00022 """
00023 A state machine that can be operated.
00024 It synchronizes its current state with the mirror and supports some control mechanisms.
00025 """
00026
00027 autonomy_level = 3
00028 silent_mode = False
00029
00030 def __init__(self, *args, **kwargs):
00031 super(OperatableStateMachine, self).__init__(*args, **kwargs)
00032 self._message = None
00033 self._rate = rospy.Rate(10)
00034
00035 self.id = None
00036 self.autonomy = None
00037
00038 self._autonomy = {}
00039 self._ordered_states = []
00040 self._inner_sync_request = False
00041
00042 self._pub = ProxyPublisher()
00043
00044 self._sub = ProxySubscriberCached()
00045
00046
00047 def execute(self, parent_ud = smach.UserData()):
00048
00049 if self._current_state is None:
00050
00051 smach.loginfo("State machine starting in initial state '%s' with userdata: \n\t%s" %
00052 (self._current_label, list(self.userdata.keys())))
00053
00054 self._set_current_state(self._initial_state_label)
00055
00056
00057 self._preempted_label = None
00058 self._preempted_state = None
00059
00060
00061 self.call_start_cbs()
00062
00063
00064 self._copy_input_keys(parent_ud, self.userdata)
00065
00066 container_outcome = self._loopback_name
00067 if self.id is not None:
00068
00069 while container_outcome == self._loopback_name and not smach.is_shutdown():
00070
00071 container_outcome = self._async_execute(parent_ud)
00072 else:
00073 container_outcome = self._async_execute(parent_ud)
00074
00075 if smach.is_shutdown():
00076 container_outcome = 'preempted'
00077
00078 return container_outcome
00079
00080
00081 def _async_execute(self, parent_ud = smach.UserData()):
00082 """Run the state machine on entry to this state.
00083 This will set the "closed" flag and spin up the execute thread. Once
00084 this flag has been set, it will prevent more states from being added to
00085 the state machine.
00086 """
00087
00088
00089 with self._state_transitioning_lock:
00090
00091 try:
00092 self.check_consistency()
00093 except (smach.InvalidStateError, smach.InvalidTransitionError):
00094 smach.logerr("Container consistency check failed.")
00095 return None
00096
00097
00098 self._is_running = True
00099
00100
00101 container_outcome = None
00102
00103
00104 if self._is_running and not smach.is_shutdown():
00105
00106 container_outcome = self._update_once()
00107 if self._current_state is not None:
00108 try:
00109 self._current_state._rate.sleep()
00110 except ROSInterruptException:
00111 rospy.logwarn('Interrupted rate sleep.')
00112
00113 if container_outcome is not None and container_outcome != self._loopback_name:
00114
00115 self._copy_output_keys(self.userdata, parent_ud)
00116 else:
00117 container_outcome = self._loopback_name
00118
00119
00120
00121
00122 if self._inner_sync_request and self._get_deep_state() is not None:
00123 self._inner_sync_request = False
00124 if self.id is None:
00125 self._parent._inner_sync_request = True
00126 else:
00127 msg = BehaviorSync()
00128 msg.behavior_id = self.id
00129 msg.current_state_checksum = zlib.adler32(self._get_deep_state()._get_path())
00130 self._pub.publish('flexbe/mirror/sync', msg)
00131
00132
00133 self._is_running = False
00134
00135 return container_outcome
00136
00137
00138 @staticmethod
00139 def add(label, state, transitions = None, autonomy = None, remapping = None):
00140 """
00141 Add a state to the opened state machine.
00142
00143 @type label: string
00144 @param label: The label of the state being added.
00145
00146 @param state: An instance of a class implementing the L{State} interface.
00147
00148 @param transitions: A dictionary mapping state outcomes to other state
00149 labels or container outcomes.
00150
00151 @param autonomy: A dictionary mapping state outcomes to their required
00152 autonomy level
00153
00154 @param remapping: A dictionary mapping local userdata keys to userdata
00155 keys in the container.
00156 """
00157 self = StateMachine._currently_opened_container()
00158
00159
00160 if isinstance(state, LoopbackState):
00161 transitions[LoopbackState._loopback_name] = label
00162 autonomy[LoopbackState._loopback_name] = -1
00163 if isinstance(state, OperatableStateMachine):
00164 transitions[OperatableStateMachine._loopback_name] = label
00165 autonomy[OperatableStateMachine._loopback_name] = -1
00166
00167 self._ordered_states.append(state)
00168 state.name = label
00169 state.transitions = transitions
00170 state.autonomy = autonomy
00171 state._parent = self
00172
00173 StateMachine.add(label, state, transitions, remapping)
00174 self._autonomy[label] = autonomy
00175
00176 def replace(self, new_state):
00177 old_state = self._states[new_state.name]
00178 new_state.transitions = old_state.transitions
00179 new_state.autonomy = old_state.autonomy
00180 new_state._parent = old_state._parent
00181
00182 self._ordered_states[self._ordered_states.index(old_state)] = new_state
00183 self._states[new_state.name] = new_state
00184
00185
00186 def destroy(self):
00187 self._notify_stop()
00188 self._disable_ros_control()
00189 self._sub.unsubscribe_topic('flexbe/command/autonomy')
00190 self._sub.unsubscribe_topic('flexbe/command/sync')
00191 self._sub.unsubscribe_topic('flexbe/command/attach')
00192 self._sub.unsubscribe_topic('flexbe/request_mirror_structure')
00193 StateLogger.shutdown()
00194
00195
00196 def confirm(self, name, id):
00197 """
00198 Confirms the state machine and triggers the creation of the structural message.
00199 It is mandatory to call this function at the top-level state machine
00200 between building it and starting its execution.
00201
00202 @type name: string
00203 @param name: The name of this state machine to identify it.
00204 """
00205 self.name = name
00206 self.id = id
00207
00208 self._pub.createPublisher('flexbe/mirror/sync', BehaviorSync, _latch = True)
00209 self._pub.createPublisher('flexbe/mirror/preempt', Empty, _latch = True)
00210 self._pub.createPublisher('flexbe/mirror/structure', ContainerStructure)
00211 self._pub.createPublisher('flexbe/log', BehaviorLog)
00212 self._pub.createPublisher('flexbe/command_feedback', CommandFeedback)
00213
00214 self._sub.subscribe('flexbe/command/autonomy', UInt8, self._set_autonomy_level)
00215 self._sub.subscribe('flexbe/command/sync', Empty, self._sync_callback)
00216 self._sub.subscribe('flexbe/command/attach', UInt8, self._attach_callback)
00217 self._sub.subscribe('flexbe/request_mirror_structure', Int32, self._mirror_structure_callback)
00218
00219 StateLogger.initialize(name)
00220 if OperatableStateMachine.autonomy_level != 255:
00221 self._enable_ros_control()
00222
00223 rospy.sleep(0.5)
00224 self._notify_start()
00225
00226
00227 def _set_autonomy_level(self, msg):
00228 """ Sets the current autonomy level. """
00229 if OperatableStateMachine.autonomy_level != msg.data:
00230 rospy.loginfo('--> Autonomy changed to %d', msg.data)
00231
00232 if msg.data < 0:
00233 self.preempt()
00234 else:
00235 OperatableStateMachine.autonomy_level = msg.data
00236
00237 self._pub.publish('flexbe/command_feedback', CommandFeedback(command="autonomy", args=[]))
00238
00239
00240 def _sync_callback(self, msg):
00241 rospy.loginfo("--> Synchronization requested...")
00242 msg = BehaviorSync()
00243 msg.behavior_id = self.id
00244 while self._get_deep_state() is None:
00245 rospy.sleep(0.1)
00246 msg.current_state_checksum = zlib.adler32(self._get_deep_state()._get_path())
00247 self._pub.publish('flexbe/mirror/sync', msg)
00248 self._pub.publish('flexbe/command_feedback', CommandFeedback(command="sync", args=[]))
00249 rospy.loginfo("<-- Sent synchronization message for mirror.")
00250
00251
00252 def _attach_callback(self, msg):
00253 rospy.loginfo("--> Enabling control...")
00254
00255 OperatableStateMachine.autonomy_level = msg.data
00256
00257 self._enable_ros_control()
00258 self._inner_sync_request = True
00259
00260 cfb = CommandFeedback(command="attach")
00261 cfb.args.append(self.name)
00262 self._pub.publish('flexbe/command_feedback', cfb)
00263 rospy.loginfo("<-- Sent attach confirm.")
00264
00265
00266 def _mirror_structure_callback(self, msg):
00267 rospy.loginfo("--> Creating behavior structure for mirror...")
00268 msg = self._build_msg('')
00269 msg.behavior_id = self.id
00270 self._pub.publish('flexbe/mirror/structure', msg)
00271 rospy.loginfo("<-- Sent behavior structure for mirror.")
00272
00273
00274 def _transition_allowed(self, label, outcome):
00275 return self._autonomy[label][outcome] < OperatableStateMachine.autonomy_level
00276
00277
00278 def _build_msg(self, prefix, msg = None):
00279 """
00280 Adds this state machine to the initial structure message.
00281
00282 @type prefix: string
00283 @param prefix: A path consisting of the container hierarchy containing this state.
00284
00285 @type msg: ContainerStructure
00286 @param msg: The message that will finally contain the structure message.
00287 """
00288
00289 children = []
00290 for state in self._ordered_states:
00291 children.append(str(state.name))
00292
00293
00294 name = prefix + (self.name if self.id is None else '')
00295
00296 if msg is None:
00297
00298 self._message = ContainerStructure()
00299 outcomes = list(self._outcomes)
00300 transitions = None
00301 autonomy = None
00302 else:
00303
00304 self._message = msg
00305 outcomes = list(self.transitions)
00306
00307 transitions = []
00308 autonomy = []
00309 for i in range(len(self.transitions)):
00310 outcome = outcomes[i]
00311 if outcome == 'preempted':
00312 transitions.append('preempted')
00313 autonomy.append(-1)
00314 else:
00315 transitions.append(str(self.transitions[outcome]))
00316 autonomy.append(self.autonomy[outcome])
00317
00318
00319 self._message.containers.append(Container(name, children, outcomes, transitions, autonomy))
00320
00321
00322 for state in self._ordered_states:
00323 state._build_msg(name+'/', self._message)
00324
00325
00326 if msg is None:
00327 return self._message
00328
00329
00330 def _notify_start(self):
00331 for state in self._ordered_states:
00332 if isinstance(state, LoopbackState):
00333 state.on_start()
00334 if isinstance(state, OperatableStateMachine):
00335 state._notify_start()
00336
00337 def _enable_ros_control(self):
00338 self._is_controlled = True
00339 for state in self._ordered_states:
00340 if isinstance(state, LoopbackState):
00341 state._enable_ros_control()
00342 if isinstance(state, OperatableStateMachine):
00343 state._enable_ros_control()
00344
00345 def _notify_stop(self):
00346 for state in self._ordered_states:
00347 if isinstance(state, LoopbackState):
00348 state.on_stop()
00349 state._disable_ros_control()
00350 if isinstance(state, OperatableStateMachine):
00351 state._notify_stop()
00352
00353 def _disable_ros_control(self):
00354 self._is_controlled = False
00355 for state in self._ordered_states:
00356 if isinstance(state, LoopbackState):
00357 state._disable_ros_control()
00358 if isinstance(state, OperatableStateMachine):
00359 state._disable_ros_control()
00360
00361 def on_exit(self, userdata):
00362 if self._current_state is not None:
00363 ud = smach.Remapper(
00364 self.userdata,
00365 self._current_state.get_registered_input_keys(),
00366 self._current_state.get_registered_output_keys(),
00367 self._remappings[self._current_state.name]
00368 )
00369 self._current_state._entering = True
00370 self._current_state.on_exit(ud)
00371 self._current_state = None