operatable_state_machine.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import zlib
4 import smach
5 
6 from rospy.exceptions import ROSInterruptException
7 
8 from flexbe_core.core.preemptable_state_machine import PreemptableStateMachine
9 from flexbe_core.core.lockable_state_machine import LockableStateMachine
10 
11 from smach.state_machine import StateMachine
12 
13 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
14 from flexbe_msgs.msg import Container, ContainerStructure, OutcomeRequest, BehaviorSync, CommandFeedback, BehaviorLog
15 from std_msgs.msg import Empty, UInt8, Int32
16 
17 from flexbe_core.core.loopback_state import LoopbackState
18 from flexbe_core.state_logger import StateLogger
19 
20 
22  """
23  A state machine that can be operated.
24  It synchronizes its current state with the mirror and supports some control mechanisms.
25  """
26 
27  autonomy_level = 3
28  silent_mode = False
29 
30  def __init__(self, *args, **kwargs):
31  super(OperatableStateMachine, self).__init__(*args, **kwargs)
32  self._message = None
33  self._rate = rospy.Rate(10)
34 
35  self.id = None
36  self.autonomy = None
37 
38  self._autonomy = {}
39  self._ordered_states = []
40  self._inner_sync_request = False
41 
42  self._pub = ProxyPublisher()
43 
44  self._sub = ProxySubscriberCached()
45 
46 
47  def execute(self, parent_ud = smach.UserData()):
48  # First time in this state machine
49  if self._current_state is None:
50  # Spew some info
51  smach.loginfo("State machine starting in initial state '%s' with userdata: \n\t%s" %
52  (self._current_label, list(self.userdata.keys())))
53  # Set initial state
54  self._set_current_state(self._initial_state_label)
55 
56  # Initialize preempt state
57  self._preempted_label = None
58  self._preempted_state = None
59 
60  # Call start callbacks
61  self.call_start_cbs()
62 
63  # Copy input keys
64  self._copy_input_keys(parent_ud, self.userdata)
65 
66  container_outcome = self._loopback_name
67  if self.id is not None:
68  # only top-level statemachine should loop for outcome
69  while container_outcome == self._loopback_name and not smach.is_shutdown():
70  # Update the state machine
71  container_outcome = self._async_execute(parent_ud)
72  else:
73  container_outcome = self._async_execute(parent_ud)
74 
75  if smach.is_shutdown():
76  container_outcome = 'preempted'
77 
78  return container_outcome
79 
80 
81  def _async_execute(self, parent_ud = smach.UserData()):
82  """Run the state machine on entry to this state.
83  This will set the "closed" flag and spin up the execute thread. Once
84  this flag has been set, it will prevent more states from being added to
85  the state machine.
86  """
87 
88  # This will prevent preempts from getting propagated to non-existent children
89  with self._state_transitioning_lock:
90  # Check state consistency
91  try:
92  self.check_consistency()
93  except (smach.InvalidStateError, smach.InvalidTransitionError):
94  smach.logerr("Container consistency check failed.")
95  return None
96 
97  # Set running flag
98  self._is_running = True
99 
100  # Initialize container outcome
101  container_outcome = None
102 
103  # Step through state machine
104  if self._is_running and not smach.is_shutdown():
105  # Update the state machine
106  container_outcome = self._update_once()
107  if self._current_state is not None:
108  try:
109  self._current_state._rate.sleep()
110  except ROSInterruptException:
111  rospy.logwarn('Interrupted rate sleep.')
112 
113  if container_outcome is not None and container_outcome != self._loopback_name:
114  # Copy output keys
115  self._copy_output_keys(self.userdata, parent_ud)
116  else:
117  container_outcome = self._loopback_name
118 
119  # provide explicit sync as back-up functionality
120  # should be used only if there is no other choice
121  # since it requires additional 8 byte + header update bandwith and time to restart mirror
122  if self._inner_sync_request and self._get_deep_state() is not None:
123  self._inner_sync_request = False
124  if self.id is None:
125  self._parent._inner_sync_request = True
126  else:
127  msg = BehaviorSync()
128  msg.behavior_id = self.id
129  msg.current_state_checksum = zlib.adler32(self._get_deep_state()._get_path())
130  self._pub.publish('flexbe/mirror/sync', msg)
131 
132  # We're no longer running
133  self._is_running = False
134 
135  return container_outcome
136 
137 
138  @staticmethod
139  def add(label, state, transitions = None, autonomy = None, remapping = None):
140  """
141  Add a state to the opened state machine.
142 
143  @type label: string
144  @param label: The label of the state being added.
145 
146  @param state: An instance of a class implementing the L{State} interface.
147 
148  @param transitions: A dictionary mapping state outcomes to other state
149  labels or container outcomes.
150 
151  @param autonomy: A dictionary mapping state outcomes to their required
152  autonomy level
153 
154  @param remapping: A dictionary mapping local userdata keys to userdata
155  keys in the container.
156  """
157  self = StateMachine._currently_opened_container()
158 
159  # add loopback transition to loopback states
160  if isinstance(state, LoopbackState):
161  transitions[LoopbackState._loopback_name] = label
162  autonomy[LoopbackState._loopback_name] = -1
163  if isinstance(state, OperatableStateMachine):
164  transitions[OperatableStateMachine._loopback_name] = label
165  autonomy[OperatableStateMachine._loopback_name] = -1
166 
167  self._ordered_states.append(state)
168  state.name = label
169  state.transitions = transitions
170  state.autonomy = autonomy
171  state._parent = self
172 
173  StateMachine.add(label, state, transitions, remapping)
174  self._autonomy[label] = autonomy
175 
176  def replace(self, new_state):
177  old_state = self._states[new_state.name]
178  new_state.transitions = old_state.transitions
179  new_state.autonomy = old_state.autonomy
180  new_state._parent = old_state._parent
181 
182  self._ordered_states[self._ordered_states.index(old_state)] = new_state
183  self._states[new_state.name] = new_state
184 
185 
186  def destroy(self):
187  self._notify_stop()
188  self._disable_ros_control()
189  self._sub.unsubscribe_topic('flexbe/command/autonomy')
190  self._sub.unsubscribe_topic('flexbe/command/sync')
191  self._sub.unsubscribe_topic('flexbe/command/attach')
192  self._sub.unsubscribe_topic('flexbe/request_mirror_structure')
193  StateLogger.shutdown()
194 
195 
196  def confirm(self, name, id):
197  """
198  Confirms the state machine and triggers the creation of the structural message.
199  It is mandatory to call this function at the top-level state machine
200  between building it and starting its execution.
201 
202  @type name: string
203  @param name: The name of this state machine to identify it.
204  """
205  self.name = name
206  self.id = id
207 
208  self._pub.createPublisher('flexbe/mirror/sync', BehaviorSync, _latch = True) # Update mirror with currently active state (high bandwidth mode)
209  self._pub.createPublisher('flexbe/mirror/preempt', Empty, _latch = True) # Preempts the mirror
210  self._pub.createPublisher('flexbe/mirror/structure', ContainerStructure) # Sends the current structure to the mirror
211  self._pub.createPublisher('flexbe/log', BehaviorLog) # Topic for logs to the GUI
212  self._pub.createPublisher('flexbe/command_feedback', CommandFeedback) # Gives feedback about executed commands to the GUI
213 
214  self._sub.subscribe('flexbe/command/autonomy', UInt8, self._set_autonomy_level)
215  self._sub.subscribe('flexbe/command/sync', Empty, self._sync_callback)
216  self._sub.subscribe('flexbe/command/attach', UInt8, self._attach_callback)
217  self._sub.subscribe('flexbe/request_mirror_structure', Int32, self._mirror_structure_callback)
218 
219  StateLogger.initialize(name)
220  if OperatableStateMachine.autonomy_level != 255:
221  self._enable_ros_control()
222 
223  rospy.sleep(0.5) # no clean way to wait for publisher to be ready...
224  self._notify_start()
225 
226 
227  def _set_autonomy_level(self, msg):
228  """ Sets the current autonomy level. """
229  if OperatableStateMachine.autonomy_level != msg.data:
230  rospy.loginfo('--> Autonomy changed to %d', msg.data)
231 
232  if msg.data < 0:
233  self.preempt()
234  else:
235  OperatableStateMachine.autonomy_level = msg.data
236 
237  self._pub.publish('flexbe/command_feedback', CommandFeedback(command="autonomy", args=[]))
238 
239 
240  def _sync_callback(self, msg):
241  rospy.loginfo("--> Synchronization requested...")
242  msg = BehaviorSync()
243  msg.behavior_id = self.id
244  while self._get_deep_state() is None:
245  rospy.sleep(0.1)
246  msg.current_state_checksum = zlib.adler32(self._get_deep_state()._get_path())
247  self._pub.publish('flexbe/mirror/sync', msg)
248  self._pub.publish('flexbe/command_feedback', CommandFeedback(command="sync", args=[]))
249  rospy.loginfo("<-- Sent synchronization message for mirror.")
250 
251 
252  def _attach_callback(self, msg):
253  rospy.loginfo("--> Enabling control...")
254  # set autonomy level
255  OperatableStateMachine.autonomy_level = msg.data
256  # enable control of states
257  self._enable_ros_control()
258  self._inner_sync_request = True
259  # send command feedback
260  cfb = CommandFeedback(command="attach")
261  cfb.args.append(self.name)
262  self._pub.publish('flexbe/command_feedback', cfb)
263  rospy.loginfo("<-- Sent attach confirm.")
264 
265 
267  rospy.loginfo("--> Creating behavior structure for mirror...")
268  msg = self._build_msg('')
269  msg.behavior_id = self.id
270  self._pub.publish('flexbe/mirror/structure', msg)
271  rospy.loginfo("<-- Sent behavior structure for mirror.")
272 
273 
274  def _transition_allowed(self, label, outcome):
275  return self._autonomy[label][outcome] < OperatableStateMachine.autonomy_level
276 
277 
278  def _build_msg(self, prefix, msg = None):
279  """
280  Adds this state machine to the initial structure message.
281 
282  @type prefix: string
283  @param prefix: A path consisting of the container hierarchy containing this state.
284 
285  @type msg: ContainerStructure
286  @param msg: The message that will finally contain the structure message.
287  """
288  # set children
289  children = []
290  for state in self._ordered_states:
291  children.append(str(state.name))
292 
293  # set name
294  name = prefix + (self.name if self.id is None else '')
295 
296  if msg is None:
297  # top-level state machine (has no transitions)
298  self._message = ContainerStructure()
299  outcomes = list(self._outcomes)
300  transitions = None
301  autonomy = None
302  else:
303  # lower-level state machine
304  self._message = msg
305  outcomes = list(self.transitions)
306  # set transitions and autonomy
307  transitions = []
308  autonomy = []
309  for i in range(len(self.transitions)):
310  outcome = outcomes[i]
311  if outcome == 'preempted': # set preempt transition
312  transitions.append('preempted')
313  autonomy.append(-1)
314  else:
315  transitions.append(str(self.transitions[outcome]))
316  autonomy.append(self.autonomy[outcome])
317 
318  # add to message
319  self._message.containers.append(Container(name, children, outcomes, transitions, autonomy))
320 
321  # build message for children
322  for state in self._ordered_states:
323  state._build_msg(name+'/', self._message)
324 
325  # top-level state machine returns the message
326  if msg is None:
327  return self._message
328 
329 
330  def _notify_start(self):
331  for state in self._ordered_states:
332  if isinstance(state, LoopbackState):
333  state.on_start()
334  if isinstance(state, OperatableStateMachine):
335  state._notify_start()
336 
338  self._is_controlled = True
339  for state in self._ordered_states:
340  if isinstance(state, LoopbackState):
341  state._enable_ros_control()
342  if isinstance(state, OperatableStateMachine):
343  state._enable_ros_control()
344 
345  def _notify_stop(self):
346  for state in self._ordered_states:
347  if isinstance(state, LoopbackState):
348  state.on_stop()
349  state._disable_ros_control()
350  if isinstance(state, OperatableStateMachine):
351  state._notify_stop()
352 
354  self._is_controlled = False
355  for state in self._ordered_states:
356  if isinstance(state, LoopbackState):
357  state._disable_ros_control()
358  if isinstance(state, OperatableStateMachine):
359  state._disable_ros_control()
360 
361  def on_exit(self, userdata):
362  if self._current_state is not None:
363  ud = smach.Remapper(
364  self.userdata,
365  self._current_state.get_registered_input_keys(),
366  self._current_state.get_registered_output_keys(),
367  self._remappings[self._current_state.name]
368  )
369  self._current_state._entering = True
370  self._current_state.on_exit(ud)
371  self._current_state = None
def add(label, state, transitions=None, autonomy=None, remapping=None)


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Wed Jun 5 2019 21:51:59