flexbe_mirror.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import threading
4 import zlib
5 
6 from flexbe_core.core import PreemptableState, PreemptableStateMachine, LockableStateMachine
7 from .mirror_state import MirrorState
8 
9 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
10 from flexbe_msgs.msg import ContainerStructure, BehaviorSync, BEStatus
11 from std_msgs.msg import Empty, String, Int32, UInt8
12 
13 
14 class FlexbeMirror(object):
15 
16  def __init__(self):
17  self._sm = None
18 
19  # set up proxys for sm <--> GUI communication
20  # publish topics
21  self._pub = ProxyPublisher({'flexbe/behavior_update': String,
22  'flexbe/request_mirror_structure': Int32})
23 
24  self._running = False
25  self._stopping = False
26  self._active_id = 0
27  self._starting_path = None
28  self._current_struct = None
29  self._struct_buffer = list()
30  self._sync_lock = threading.Lock()
31 
32  self._outcome_topic = 'flexbe/mirror/outcome'
33 
34  # listen for mirror message
35  self._sub = ProxySubscriberCached()
36  self._sub.subscribe(self._outcome_topic, UInt8)
37  self._sub.enable_buffer(self._outcome_topic)
38 
39  self._sub.subscribe('flexbe/mirror/structure', ContainerStructure, self._mirror_callback)
40  self._sub.subscribe('flexbe/status', BEStatus, self._status_callback)
41  self._sub.subscribe('flexbe/mirror/sync', BehaviorSync, self._sync_callback)
42  self._sub.subscribe('flexbe/mirror/preempt', Empty, self._preempt_callback)
43 
44  def _mirror_callback(self, msg):
45  rate = rospy.Rate(10)
46  while self._stopping:
47  rate.sleep()
48 
49  if self._running:
50  rospy.logwarn('Received a new mirror structure while mirror is already running, '
51  'adding to buffer (checksum: %s).' % str(msg.behavior_id))
52  elif self._active_id != 0 and msg.behavior_id != self._active_id:
53  rospy.logwarn('Checksum of received mirror structure (%s) does not match expected (%s), '
54  'will ignore.' % (str(msg.behavior_id), str(self._active_id)))
55  return
56  else:
57  rospy.loginfo('Received a new mirror structure for checksum %s' % str(msg.behavior_id))
58 
59  self._struct_buffer.append(msg)
60 
61  if self._active_id == msg.behavior_id:
62  self._struct_buffer = list()
63  self._mirror_state_machine(msg)
64  rospy.loginfo('Mirror built.')
65 
66  self._execute_mirror()
67 
68  def _status_callback(self, msg):
69  if msg.code == BEStatus.STARTED:
70  thread = threading.Thread(target=self._start_mirror, args=[msg])
71  thread.daemon = True
72  thread.start()
73  else:
74  thread = threading.Thread(target=self._stop_mirror, args=[msg])
75  thread.daemon = True
76  thread.start()
77 
78  def _start_mirror(self, msg):
79  with self._sync_lock:
80  rate = rospy.Rate(10)
81  while self._stopping:
82  rate.sleep()
83 
84  if self._running:
85  rospy.logwarn('Tried to start mirror while it is already running, will ignore.')
86  return
87 
88  if len(msg.args) > 0:
89  self._starting_path = "/" + msg.args[0][1:].replace("/", "_mirror/") + "_mirror"
90 
91  self._active_id = msg.behavior_id
92 
93  while self._sm is None and len(self._struct_buffer) > 0:
94  struct = self._struct_buffer[0]
95  self._struct_buffer = self._struct_buffer[1:]
96  if struct.behavior_id == self._active_id:
97  self._mirror_state_machine(struct)
98  rospy.loginfo('Mirror built for checksum %s.' % self._active_id)
99  else:
100  rospy.logwarn('Discarded mismatching buffered structure for checksum %s' % str(struct.behavior_id))
101 
102  if self._sm is None:
103  rospy.logwarn('Missing correct mirror structure, requesting...')
104  rospy.sleep(0.2) # no clean wayacquire to wait for publisher to be ready...
105  self._pub.publish('flexbe/request_mirror_structure', Int32(msg.behavior_id))
106  self._active_id = msg.behavior_id
107  return
108 
109  self._execute_mirror()
110 
111  def _stop_mirror(self, msg):
112  with self._sync_lock:
113  self._stopping = True
114  if self._sm is not None and self._running:
115  if msg is not None and msg.code == BEStatus.FINISHED:
116  rospy.loginfo('Onboard behavior finished successfully.')
117  self._pub.publish('flexbe/behavior_update', String())
118  elif msg is not None and msg.code == BEStatus.SWITCHING:
119  self._starting_path = None
120  rospy.loginfo('Onboard performing behavior switch.')
121  elif msg is not None and msg.code == BEStatus.READY:
122  rospy.loginfo('Onboard engine just started, stopping currently running mirror.')
123  self._pub.publish('flexbe/behavior_update', String())
124  elif msg is not None:
125  rospy.logwarn('Onboard behavior failed!')
126  self._pub.publish('flexbe/behavior_update', String())
127 
128  PreemptableState.preempt = True
129  rate = rospy.Rate(10)
130  while self._running:
131  rate.sleep()
132  else:
133  rospy.loginfo('No onboard behavior is active.')
134 
135  self._active_id = 0
136  self._sm = None
137  self._current_struct = None
138  self._sub.remove_last_msg(self._outcome_topic, clear_buffer=True)
139 
140  if msg is not None and msg.code != BEStatus.SWITCHING:
141  rospy.loginfo('\033[92m--- Behavior Mirror ready! ---\033[0m')
142 
143  self._stopping = False
144 
145  def _sync_callback(self, msg):
146  if msg.behavior_id == self._active_id:
147  thread = threading.Thread(target=self._restart_mirror, args=[msg])
148  thread.daemon = True
149  thread.start()
150  else:
151  rospy.logerr('Cannot synchronize! Different behavior is running onboard, please stop execution!')
152  thread = threading.Thread(target=self._stop_mirror, args=[None])
153  thread.daemon = True
154  thread.start()
155 
156  def _restart_mirror(self, msg):
157  with self._sync_lock:
158  rospy.loginfo('Restarting mirror for synchronization...')
159  self._sub.remove_last_msg(self._outcome_topic, clear_buffer=True)
160  if self._sm is not None and self._running:
161  PreemptableState.preempt = True
162  rate = rospy.Rate(10)
163  while self._running:
164  rate.sleep()
165  self._sm = None
166  if msg.current_state_checksum in self._state_checksums:
167  current_state_path = self._state_checksums[msg.current_state_checksum]
168  self._starting_path = "/" + current_state_path[1:].replace("/", "_mirror/") + "_mirror"
169  rospy.loginfo('Current state: %s' % current_state_path)
170  try:
172  rospy.loginfo('Mirror built.')
173  except (AttributeError, RuntimeError):
174  rospy.loginfo('Stopping synchronization because behavior has stopped.')
175 
176  self._execute_mirror()
177 
178  def _execute_mirror(self):
179  self._running = True
180 
181  rospy.loginfo("Executing mirror...")
182  if self._starting_path is not None:
183  LockableStateMachine.path_for_switch = self._starting_path
184  rospy.loginfo("Starting mirror in state " + self._starting_path)
185  self._starting_path = None
186 
187  result = 'preempted'
188  try:
189  result = self._sm.spin()
190  except Exception as e:
191  rospy.loginfo('Caught exception on preempt:\n%s' % str(e))
192  result = 'preempted'
193 
194  self._running = False
195 
196  rospy.loginfo('Mirror finished with result ' + result)
197 
198  def _mirror_state_machine(self, msg):
199  self._current_struct = msg
200  self._state_checksums = dict()
201  root = None
202  for con_msg in msg.containers:
203  if con_msg.path.find('/') == -1:
204  root = con_msg.path
205  break
206  self._add_node(msg, root)
207  # calculate checksums of all states
208  for con_msg in msg.containers:
209  if con_msg.path.find('/') != -1:
210  self._state_checksums[zlib.adler32(con_msg.path.encode()) & 0x7fffffff] = con_msg.path
211 
212  def _add_node(self, msg, path):
213  container = None
214  for con_msg in msg.containers:
215  if con_msg.path == path:
216  container = con_msg
217  break
218 
219  transitions = None
220  if container.transitions is not None:
221  transitions = {}
222  for i in range(len(container.transitions)):
223  transitions[container.outcomes[i]] = container.transitions[i] + '_mirror'
224 
225  path_frags = path.split('/')
226  container_name = path_frags[len(path_frags)-1]
227  if len(container.children) > 0:
228  sm_outcomes = []
229  for outcome in container.outcomes:
230  sm_outcomes.append(outcome + '_mirror')
231  sm = PreemptableStateMachine(outcomes=sm_outcomes)
232  with sm:
233  for child in container.children:
234  self._add_node(msg, path+'/'+child)
235  if len(transitions) > 0:
236  container_transitions = {}
237  for i in range(len(container.transitions)):
238  container_transitions[sm_outcomes[i]] = transitions[container.outcomes[i]]
239  PreemptableStateMachine.add(container_name + '_mirror', sm, transitions=container_transitions)
240  else:
241  self._sm = sm
242  else:
243  PreemptableStateMachine.add(container_name + '_mirror',
244  MirrorState(container_name, path, container.outcomes, container.autonomy),
245  transitions=transitions)
246 
247  def _preempt_callback(self, msg):
248  if self._sm is not None:
249  rospy.logwarn('Explicit preempting is currently ignored, mirror should be preempted by onboard behavior.')
250  else:
251  rospy.logwarn('Could not preempt mirror because it seems not to be running!')


flexbe_mirror
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:42