flexbe_mirror.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import roslib; roslib.load_manifest('flexbe_mirror')
4 import rospy
5 import smach
6 import threading
7 import zlib
8 
9 from flexbe_core import JumpableStateMachine, LockableStateMachine
10 from flexbe_core.core import PreemptableState
11 from mirror_state import MirrorState
12 
13 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
14 from flexbe_msgs.msg import ContainerStructure, BehaviorSync, BEStatus
15 from std_msgs.msg import Empty, String, Int32, UInt8
16 
17 '''
18 Created on 08.05.2013
19 
20 @author: Philipp Schillinger
21 '''
22 class VigirBehaviorMirror(object):
23  '''
24  classdocs
25  '''
26 
27 
28  def __init__(self):
29  '''
30  Constructor
31  '''
32  self._sm = None
33 
34  smach.set_loggers (
35  rospy.logdebug, # hide SMACH transition log spamming
36  rospy.logwarn,
37  rospy.logdebug,
38  rospy.logerr
39  )
40 
41  # set up proxys for sm <--> GUI communication
42  # publish topics
43  self._pub = ProxyPublisher({'flexbe/behavior_update': String,
44  'flexbe/request_mirror_structure': Int32})
45 
46  self._running = False
47  self._stopping = False
48  self._active_id = 0
49  self._starting_path = None
50  self._current_struct = None
51  self._sync_lock = threading.Lock()
52 
53  self._outcome_topic = 'flexbe/mirror/outcome'
54 
55  self._struct_buffer = list()
56 
57  # listen for mirror message
58  self._sub = ProxySubscriberCached()
59  self._sub.subscribe(self._outcome_topic, UInt8)
60  self._sub.enable_buffer(self._outcome_topic)
61 
62  self._sub.subscribe('flexbe/mirror/structure', ContainerStructure, self._mirror_callback)
63  self._sub.subscribe('flexbe/status', BEStatus, self._status_callback)
64  self._sub.subscribe('flexbe/mirror/sync', BehaviorSync, self._sync_callback)
65  self._sub.subscribe('flexbe/mirror/preempt', Empty, self._preempt_callback)
66 
67 
68  def _mirror_callback(self, msg):
69  rate = rospy.Rate(10)
70  while self._stopping:
71  rate.sleep()
72 
73  if self._running:
74  rospy.logwarn('Received a new mirror structure while mirror is already running, adding to buffer (checksum: %s).' % str(msg.behavior_id))
75  elif self._active_id != 0 and msg.behavior_id != self._active_id:
76  rospy.logwarn('checksum of received mirror structure (%s) does not match expected checksum (%s), will ignore.' % (str(msg.behavior_id), str(self._active_id)))
77  return
78  else:
79  rospy.loginfo('Received a new mirror structure for checksum %s' % str(msg.behavior_id))
80 
81  self._struct_buffer.append(msg)
82 
83  if self._active_id == msg.behavior_id:
84  self._struct_buffer = list()
85  self._mirror_state_machine(msg)
86  rospy.loginfo('Mirror built.')
87 
88  self._execute_mirror()
89 
90 
91  def _status_callback(self, msg):
92  if msg.code == BEStatus.STARTED:
93  thread = threading.Thread(target=self._start_mirror, args=[msg])
94  thread.daemon = True
95  thread.start()
96  else:
97  thread = threading.Thread(target=self._stop_mirror, args=[msg])
98  thread.daemon = True
99  thread.start()
100 
101 
102  def _start_mirror(self, msg):
103  self._sync_lock.acquire()
104  rate = rospy.Rate(10)
105  while self._stopping:
106  rate.sleep()
107 
108  if self._running:
109  rospy.logwarn('Tried to start mirror while it is already running, will ignore.')
110  return
111 
112  if len(msg.args) > 0:
113  self._starting_path = "/" + msg.args[0][1:].replace("/", "_mirror/") + "_mirror"
114 
115  self._active_id = msg.behavior_id
116 
117  while self._sm is None and len(self._struct_buffer) > 0:
118  struct = self._struct_buffer[0]
119  self._struct_buffer = self._struct_buffer[1:]
120  if struct.behavior_id == self._active_id:
121  self._mirror_state_machine(struct)
122  rospy.loginfo('Mirror built.')
123  else:
124  rospy.logwarn('Discarded mismatching buffered structure for checksum %s' % str(struct.behavior_id))
125 
126  if self._sm is None:
127  rospy.logwarn('Missing correct mirror structure, requesting...')
128  rospy.sleep(0.2) # no clean wayacquire to wait for publisher to be ready...
129  self._pub.publish('flexbe/request_mirror_structure', Int32(msg.behavior_id))
130  self._active_id = msg.behavior_id
131  return
132  self._sync_lock.release()
133 
134  self._execute_mirror()
135 
136 
137  def _stop_mirror(self, msg):
138  self._sync_lock.acquire()
139  self._stopping = True
140  if self._sm is not None and self._running:
141  if msg is not None and msg.code == BEStatus.FINISHED:
142  rospy.loginfo('Onboard behavior finished successfully.')
143  self._pub.publish('flexbe/behavior_update', String())
144  elif msg is not None and msg.code == BEStatus.SWITCHING:
145  self._starting_path = None
146  rospy.loginfo('Onboard performing behavior switch.')
147  elif msg is not None and msg.code == BEStatus.READY:
148  rospy.loginfo('Onboard engine just started, stopping currently running mirror.')
149  self._pub.publish('flexbe/behavior_update', String())
150  elif msg is not None:
151  rospy.logwarn('Onboard behavior failed!')
152  self._pub.publish('flexbe/behavior_update', String())
153 
154  PreemptableState.preempt = True
155  rate = rospy.Rate(10)
156  while self._running:
157  rate.sleep()
158  else:
159  rospy.loginfo('No onboard behavior is active.')
160 
161  self._active_id = 0
162  self._sm = None
163  self._current_struct = None
164  self._sub.remove_last_msg(self._outcome_topic, clear_buffer=True)
165 
166  if msg is not None and msg.code != BEStatus.SWITCHING:
167  rospy.loginfo('\033[92m--- Behavior Mirror ready! ---\033[0m')
168 
169  self._stopping = False
170  self._sync_lock.release()
171 
172 
173  def _sync_callback(self, msg):
174  if msg.behavior_id == self._active_id:
175  thread = threading.Thread(target=self._restart_mirror, args=[msg])
176  thread.daemon = True
177  thread.start()
178  else:
179  rospy.logerr('Cannot synchronize! Different behavior is running onboard, please stop execution!')
180  thread = threading.Thread(target=self._stop_mirror, args=[None])
181  thread.daemon = True
182  thread.start()
183 
184 
185  def _restart_mirror(self, msg):
186  self._sync_lock.acquire()
187  rospy.loginfo('Restarting mirror for synchronization...')
188  self._sub.remove_last_msg(self._outcome_topic, clear_buffer=True)
189  if self._sm is not None and self._running:
190  PreemptableState.preempt = True
191  rate = rospy.Rate(10)
192  while self._running:
193  rate.sleep()
194  self._sm = None
195  if msg.current_state_checksum in self._state_checksums:
196  current_state_path = self._state_checksums[msg.current_state_checksum]
197  self._starting_path = "/" + current_state_path[1:].replace("/", "_mirror/") + "_mirror"
198  rospy.loginfo('Current state: %s' % current_state_path)
199  try:
201  rospy.loginfo('Mirror built.')
202  except (AttributeError, smach.InvalidStateError):
203  rospy.loginfo('Stopping synchronization because behavior has stopped.')
204  self._sync_lock.release()
205 
206  self._execute_mirror()
207 
208 
209  def _execute_mirror(self):
210  self._running = True
211 
212  rospy.loginfo("Executing mirror...")
213  if self._starting_path is not None:
214  LockableStateMachine.path_for_switch = self._starting_path
215  rospy.loginfo("Starting mirror in state " + self._starting_path)
216  self._starting_path = None
217 
218  result = 'preempted'
219  try:
220  result = self._sm.execute()
221  except Exception as e:
222  rospy.loginfo('Caught exception on preempt:\n%s' % str(e))
223  result = 'preempted'
224 
225  if self._sm is not None:
226  self._sm.destroy()
227  self._running = False
228 
229  rospy.loginfo('Mirror finished with result ' + result)
230 
231 
232  def _mirror_state_machine(self, msg):
233  self._current_struct = msg
234  self._state_checksums = dict()
235  root = None
236  for con_msg in msg.containers:
237  if con_msg.path.find('/') == -1:
238  root = con_msg.path
239  break
240  self._add_node(msg, root)
241  # calculate checksums of all states
242  for con_msg in msg.containers:
243  if con_msg.path.find('/') != -1:
244  self._state_checksums[zlib.adler32(con_msg.path)] = con_msg.path
245 
246 
247  def _add_node(self, msg, path):
248  #rospy.loginfo('Add node: '+path)
249  container = None
250  for con_msg in msg.containers:
251  if con_msg.path == path:
252  container = con_msg
253  break
254 
255  transitions = None
256  if container.transitions is not None:
257  transitions = {}
258  for i in range(len(container.transitions)):
259  transitions[container.outcomes[i]] = container.transitions[i] + '_mirror'
260 
261  path_frags = path.split('/')
262  container_name = path_frags[len(path_frags)-1]
263  if len(container.children) > 0:
264  sm_outcomes = []
265  for outcome in container.outcomes: sm_outcomes.append(outcome + '_mirror')
266  sm = JumpableStateMachine(outcomes=sm_outcomes)
267  with sm:
268  for child in container.children:
269  self._add_node(msg, path+'/'+child)
270  if len(transitions) > 0:
271  container_transitions = {}
272  for i in range(len(container.transitions)): container_transitions[sm_outcomes[i]] = transitions[container.outcomes[i]]
273  JumpableStateMachine.add(container_name + '_mirror', sm, transitions=container_transitions)
274  else:
275  self._sm = sm
276  else:
277  JumpableStateMachine.add(container_name + '_mirror', MirrorState(container_name, path, container.outcomes, container.autonomy), transitions=transitions)
278 
279 
280  def _jump_callback(self, msg):
281  start_time = rospy.Time.now()
282  current_elapsed = 0
283  jumpable = True
284  while self._sm is None or not self._sm.is_running():
285  current_elapsed = rospy.Time.now() - start_time
286  if current_elapsed > rospy.Duration.from_sec(10):
287  jumpable = False
288  break
289  rospy.sleep(0.05)
290  if jumpable:
291  self._sm.jump_to(msg.stateName)
292  else:
293  rospy.logwarn('Could not jump in mirror: Mirror does not exist or is not running!')
294 
295 
296  def _preempt_callback(self, msg):
297  if self._sm is not None and self._sm.is_running():
298  rospy.logwarn('Explicit preempting is currently ignored, mirror should be preempted by onboard behavior.')
299  else:
300  rospy.logwarn('Could not preempt mirror because it seems not to be running!')
301 
302 
303 
304 


flexbe_mirror
Author(s): Philipp Schillinger
autogenerated on Wed Jun 5 2019 21:52:12