jumpable_state_machine.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import smach
4 import time
5 
6 from flexbe_msgs.msg import OutcomeRequest
7 from ..proxy import ProxySubscriberCached
8 
9 from .silent_state_machine import SilentStateMachine
10 
11 
12 class JumpableStateMachine(SilentStateMachine):
13  """
14  A state machine that runs in background and does not report any transition.
15  It can jump to any of its states, executing all states on a random way to the target
16  (so make sure its states have no side effects).
17  """
18  refresh = False
19 
20  def __init__(self, *args, **kwargs):
21  super(JumpableStateMachine, self).__init__(*args, **kwargs)
22 
23  self._sub = ProxySubscriberCached()
24 
25 
26  def jump_to(self, target_state_label):
27  """
28  Jumps to the specified state.
29 
30  @type target_state_label: string
31  @param target_state_label: The name of the state to jump to.
32  Should be unique. If not, the first occurrence will be chosen.
33  """
34  # set current state and current container
35  container = self
36  while isinstance(container._current_state, smach.StateMachine):
37  container = container._current_state
38  current_state = container._current_state
39 
40  # determine path
41  path = self._search_state(current_state, target_state_label, container, [], '')
42  if path is None:
43  rospy.logerr('No path found from '+current_state.name+' to '+target_state_label)
44  return
45 
46  rospy.loginfo('Path: '+path)
47  if path == '':
48  JumpableStateMachine.refresh = True
49  return
50 
51  # traverse path
52  outcomes = path.split('/')
53  for i in range(len(outcomes)-1):
54  current_state = self._get_deep_state()
55  current_state._requested_outcome = outcomes[i]
56  if not self._wait_for_next_state(current_state, 1):
57  rospy.logerr('Unable to jump to '+target_state_label+': Stuck in state '+current_state.name)
58  return
59 
60 
61  def _search_state(self, origin_state, target_state_label, container, seen_states, path):
62  """
63  Searches the given state to create an outcome path to it.
64  This is implemented by a hierarchical depth-first search.
65 
66  @type origin_state: State
67  @param origin_state: The current origin for this search.
68 
69  @type target_state_label: string
70  @param target_state_label: The name of the state to search.
71 
72  @type container: StateMachine
73  @param container: The current container inside which the search is currently done.
74 
75  @type seen_states: list of State
76  @param seen_states: A list of all states that already have been traversed.
77 
78  @type path: string
79  @param path: The outcome path to the current origin_state
80 
81  @return: The whole outcome path to the searched state.
82  """
83  # saw this state
84  seen_states.append(origin_state)
85 
86  # look if we found our target
87  if origin_state.name == target_state_label:
88  return path
89  target_path = None
90 
91  # enter the state machine (next lower hierarchy level)
92  if isinstance(origin_state, smach.StateMachine):
93  # do not need to enter, inital state is implicitly active
94  if origin_state._initial_state_label == target_state_label:
95  return path
96  # search inside this state machine
97  child_state = origin_state._states[origin_state._initial_state_label]
98  if seen_states.count(child_state) == 0:
99  target_path = self._search_state(child_state, target_state_label, origin_state, seen_states, path)
100  # found target inside, search finished
101  if target_path is not None:
102  return target_path
103 
104  # proceed with the neighbors
105  else:
106  for outcome in origin_state._outcomes:
107  next_state_label = origin_state.transitions[outcome]
108  if next_state_label == 'preempted' or next_state_label == 'preempted_mirror': continue # skip preempt outcomes
109  next_container = container
110  reached_exit = False
111 
112  # determine the next state
113  # (consider that it might be necessary to go to the next higher hierarchy level)
114  while not next_container._states.has_key(next_state_label):
115  if next_container._parent is None:
116  reached_exit = True
117  break
118  next_state_label = next_container.transitions[next_state_label]
119  next_container = next_container._parent
120  # reached one outcome of the whole state machine (no higher hierarchy level)
121  if reached_exit: break
122 
123  # search the neighbor if not already seen
124  next_state = next_container._states[next_state_label]
125  if seen_states.count(next_state) == 0:
126  target_path = self._search_state(next_state, target_state_label, next_container, seen_states, path+outcome+'/')
127  # found target in neighbors
128  if target_path is not None:
129  return target_path
130 
131  # unfortunately, no success
132  return None
133 
134 
135  def _wait_for_next_state(self, current_state, timeout, period=0.05):
136  """
137  Sleeps until the state machine proceeded with the next state.
138 
139  @type current_state: State
140  @param current_state: The current state when this method is called.
141 
142  @type timeout: float
143  @param timeout: The time in seconds when this method should stop waiting when no transition is done.
144 
145  @type period: float
146  @param period: The time in seconds how often this method should check the current state.
147 
148  @return: True if waiting was successful, False if current_state is still the same (timeout).
149  """
150  end_time = time.time() + timeout
151  while time.time() < end_time:
152  new_state = self._get_deep_state()
153  if new_state != current_state:
154  return True
155  time.sleep(period)
156  return False
157 
158 
159  def _transition_callback(self, msg):
160  """
161  Forces the state machine to do a transition with a specified outcome.
162 
163  @type msg: OutcomeRequest
164  @param msg: A message containing all relevant information.
165  """
166  state = self._get_deep_state()
167 
168  # wait if state machine is currently not active
169  if state is None:
170  if not self._wait_for_next_state(None, 5):
171  rospy.logwarn('Updating mirror while it is not running!')
172  return # not running
173  state = self._get_deep_state()
174  rospy.logwarn('Had to wait for mirror starting to request transition of state %s!', msg.target)
175 
176  # wait for the target state to not trigger a wrong transition
177  if msg.target != state.name:
178  self._wait_for_next_state(state, 2)
179  state = self._get_deep_state()
180  if msg.target != state.name: # waited for next state, so test current state again (but only once to not block anything)
181  rospy.logwarn('Wrong state name: requested %s but is %s!', msg.target, state.name)
182  return
183  rospy.logwarn('Had to wait for state %s to request transition!', msg.target)
184 
185  # trigger desired outcome or preempt
186  if msg.outcome == 255:
187  state._requested_outcome = 'preempted'
188  else:
189  state._requested_outcome = state._outcome_list[msg.outcome]
190 
191  def destroy(self):
192  pass
def _search_state(self, origin_state, target_state_label, container, seen_states, path)
def _wait_for_next_state(self, current_state, timeout, period=0.05)


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Wed Jun 5 2019 21:51:59