flexbe_onboard.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import os
4 import sys
5 import inspect
6 import tempfile
7 import threading
8 import time
9 import zlib
10 import contextlib
11 from ast import literal_eval as cast
12 
13 from flexbe_core import Logger, BehaviorLibrary
14 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
15 
16 from flexbe_msgs.msg import BehaviorSelection, BEStatus, CommandFeedback
17 from std_msgs.msg import Empty
18 
19 
20 class FlexbeOnboard(object):
21  """
22  Controls the execution of robot behaviors.
23  """
24 
25  def __init__(self):
26  self.be = None
27  Logger.initialize()
28  self._tracked_imports = list()
29  # prepare temp folder
30  self._tmp_folder = tempfile.mkdtemp()
31  sys.path.append(self._tmp_folder)
32  rospy.on_shutdown(self._cleanup_tempdir)
33 
34  # prepare manifest folder access
35  self._behavior_lib = BehaviorLibrary()
36 
37  # prepare communication
38  self.status_topic = 'flexbe/status'
39  self.feedback_topic = 'flexbe/command_feedback'
40  self._pub = ProxyPublisher({
41  self.feedback_topic: CommandFeedback,
42  'flexbe/heartbeat': Empty
43  })
44  self._pub.createPublisher(self.status_topic, BEStatus, _latch=True)
45  self._execute_heartbeat()
46 
47  # listen for new behavior to start
48  self._enable_clear_imports = rospy.get_param('~enable_clear_imports', False)
49  self._running = False
50  self._run_lock = threading.Lock()
51  self._switching = False
52  self._switch_lock = threading.Lock()
53  self._sub = ProxySubscriberCached()
54  self._sub.subscribe('flexbe/start_behavior', BehaviorSelection, self._behavior_callback)
55 
56  rospy.sleep(0.5) # wait for publishers etc to really be set up
57  self._pub.publish(self.status_topic, BEStatus(code=BEStatus.READY))
58  rospy.loginfo('\033[92m--- Behavior Engine ready! ---\033[0m')
59 
60  def _behavior_callback(self, msg):
61  thread = threading.Thread(target=self._behavior_execution, args=[msg])
62  thread.daemon = True
63  thread.start()
64 
65  # =================== #
66  # Main execution loop #
67  # ------------------- #
68 
69  def _behavior_execution(self, msg):
70  # sending a behavior while one is already running is considered as switching
71  if self._running:
72  Logger.loginfo('--> Initiating behavior switch...')
73  self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['received']))
74  else:
75  Logger.loginfo('--> Starting new behavior...')
76 
77  # construct the behavior that should be executed
78  be = self._prepare_behavior(msg)
79  if be is None:
80  Logger.logerr('Dropped behavior start request because preparation failed.')
81  if self._running:
82  self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['failed']))
83  else:
84  rospy.loginfo('\033[92m--- Behavior Engine ready! ---\033[0m')
85  return
86 
87  # perform the behavior switch if required
88  with self._switch_lock:
89  self._switching = True
90  if self._running:
91  self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['start']))
92  # ensure that switching is possible
93  if not self._is_switchable(be):
94  Logger.logerr('Dropped behavior start request because switching is not possible.')
95  self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['not_switchable']))
96  return
97  # wait if running behavior is currently starting or stopping
98  rate = rospy.Rate(100)
99  while not rospy.is_shutdown():
100  active_state = self.be.get_current_state()
101  if active_state is not None or not self._running:
102  break
103  rate.sleep()
104  # extract the active state if any
105  if active_state is not None:
106  rospy.loginfo("Current state %s is kept active.", active_state.name)
107  try:
108  be.prepare_for_switch(active_state)
109  self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['prepared']))
110  except Exception as e:
111  Logger.logerr('Failed to prepare behavior switch:\n%s' % str(e))
112  self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['failed']))
113  return
114  # stop the rest
115  rospy.loginfo('Preempting current behavior version...')
116  self.be.preempt()
117 
118  # execute the behavior
119  with self._run_lock:
120  self._switching = False
121  self.be = be
122  self._running = True
123 
124  result = None
125  try:
126  rospy.loginfo('Behavior ready, execution starts now.')
127  rospy.loginfo('[%s : %s]', be.name, msg.behavior_checksum)
128  self.be.confirm()
129  args = [self.be.requested_state_path] if self.be.requested_state_path is not None else []
130  self._pub.publish(self.status_topic,
131  BEStatus(behavior_id=self.be.id, code=BEStatus.STARTED, args=args))
132  result = self.be.execute()
133  if self._switching:
134  self._pub.publish(self.status_topic,
135  BEStatus(behavior_id=self.be.id, code=BEStatus.SWITCHING))
136  else:
137  self._pub.publish(self.status_topic,
138  BEStatus(behavior_id=self.be.id, code=BEStatus.FINISHED, args=[str(result)]))
139  except Exception as e:
140  self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.FAILED))
141  Logger.logerr('Behavior execution failed!\n%s' % str(e))
142  import traceback
143  Logger.loginfo(traceback.format_exc())
144  result = result or "exception" # only set result if not executed
145 
146  # done, remove left-overs like the temporary behavior file
147  try:
148  # do not clear imports for now, not working correctly (e.g., flexbe/flexbe_app#66)
149  # only if specifically enabled
150  if not self._switching and self._enable_clear_imports:
151  self._clear_imports()
152  self._cleanup_behavior(msg.behavior_checksum)
153  except Exception as e:
154  rospy.logerr('Failed to clean up behavior:\n%s' % str(e))
155 
156  if not self._switching:
157  Logger.loginfo('Behavior execution finished with result %s.', str(result))
158  rospy.loginfo('\033[92m--- Behavior Engine ready! ---\033[0m')
159  self._running = False
160  self.be = None
161 
162  # ==================================== #
163  # Preparation of new behavior requests #
164  # ------------------------------------ #
165 
166  def _prepare_behavior(self, msg):
167  # get sourcecode from ros package
168  try:
169  behavior = self._behavior_lib.get_behavior(msg.behavior_id)
170  if behavior is None:
171  raise ValueError(msg.behavior_id)
172  be_filepath = self._behavior_lib.get_sourcecode_filepath(msg.behavior_id, add_tmp=True)
173  if os.path.isfile(be_filepath):
174  be_file = open(be_filepath, "r")
175  rospy.logwarn("Found a tmp version of the referred behavior! Assuming local test run.")
176  else:
177  be_filepath = self._behavior_lib.get_sourcecode_filepath(msg.behavior_id)
178  be_file = open(be_filepath, "r")
179  try:
180  be_content = be_file.read()
181  finally:
182  be_file.close()
183  except Exception as e:
184  Logger.logerr('Failed to retrieve behavior from library:\n%s' % str(e))
185  self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
186  return
187 
188  # apply modifications if any
189  try:
190  file_content = ""
191  last_index = 0
192  for mod in msg.modifications:
193  file_content += be_content[last_index:mod.index_begin] + mod.new_content
194  last_index = mod.index_end
195  file_content += be_content[last_index:]
196  if zlib.adler32(file_content.encode()) & 0x7fffffff != msg.behavior_checksum:
197  mismatch_msg = ("Checksum mismatch of behavior versions! \n"
198  "Attempted to load behavior: %s\n"
199  "Make sure that all computers are on the same version a.\n"
200  "Also try: rosrun flexbe_widget clear_cache" % str(be_filepath))
201  raise Exception(mismatch_msg)
202  else:
203  rospy.loginfo("Successfully applied %d modifications." % len(msg.modifications))
204  except Exception as e:
205  Logger.logerr('Failed to apply behavior modifications:\n%s' % str(e))
206  self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
207  return
208 
209  # create temp file for behavior class
210  try:
211  file_path = os.path.join(self._tmp_folder, 'tmp_%d.py' % msg.behavior_checksum)
212  with open(file_path, "w") as sc_file:
213  sc_file.write(file_content)
214  except Exception as e:
215  Logger.logerr('Failed to create temporary file for behavior class:\n%s' % str(e))
216  self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
217  return
218 
219  # import temp class file and initialize behavior
220  try:
221  with self._track_imports():
222  package = __import__("tmp_%d" % msg.behavior_checksum, fromlist=["tmp_%d" % msg.behavior_checksum])
223  clsmembers = inspect.getmembers(package, lambda member: (inspect.isclass(member) and
224  member.__module__ == package.__name__))
225  beclass = clsmembers[0][1]
226  be = beclass()
227  rospy.loginfo('Behavior ' + be.name + ' created.')
228  except Exception as e:
229  Logger.logerr('Exception caught in behavior definition:\n%s\n'
230  'See onboard terminal for more information.' % str(e))
231  import traceback
232  traceback.print_exc()
233  self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
234  if self._enable_clear_imports:
235  self._clear_imports()
236  return
237 
238  # initialize behavior parameters
239  if len(msg.arg_keys) > 0:
240  rospy.loginfo('The following parameters will be used:')
241  try:
242  for i in range(len(msg.arg_keys)):
243  # action call has empty string as default, not a valid param key
244  if msg.arg_keys[i] == '':
245  continue
246  found = be.set_parameter(msg.arg_keys[i], msg.arg_values[i])
247  if found:
248  name_split = msg.arg_keys[i].rsplit('/', 1)
249  behavior = name_split[0] if len(name_split) == 2 else ''
250  key = name_split[-1]
251  suffix = ' (' + behavior + ')' if behavior != '' else ''
252  rospy.loginfo(key + ' = ' + msg.arg_values[i] + suffix)
253  else:
254  rospy.logwarn('Parameter ' + msg.arg_keys[i] + ' (set to ' + msg.arg_values[i] + ') not defined')
255  except Exception as e:
256  Logger.logerr('Failed to initialize parameters:\n%s' % str(e))
257  self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
258  return
259 
260  # build state machine
261  try:
262  be.set_up(id=msg.behavior_checksum, autonomy_level=msg.autonomy_level, debug=False)
263  be.prepare_for_execution(self._convert_input_data(msg.input_keys, msg.input_values))
264  rospy.loginfo('State machine built.')
265  except Exception as e:
266  Logger.logerr('Behavior construction failed!\n%s\n'
267  'See onboard terminal for more information.' % str(e))
268  import traceback
269  traceback.print_exc()
270  self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
271  if self._enable_clear_imports:
272  self._clear_imports()
273  return
274 
275  return be
276 
277  # ================ #
278  # Helper functions #
279  # ---------------- #
280 
281  def _is_switchable(self, be):
282  if self.be.name != be.name:
283  Logger.logerr('Unable to switch behavior, names do not match:\ncurrent: %s <--> new: %s' %
284  (self.be.name, be.name))
285  return False
286  # locked inside
287  # locked state exists in new behavior
288  # ok, can switch
289  return True
290 
291  def _cleanup_behavior(self, behavior_checksum):
292  file_path = os.path.join(self._tmp_folder, 'tmp_%d.pyc' % behavior_checksum)
293  try:
294  os.remove(file_path)
295  except OSError:
296  pass
297  try:
298  os.remove(file_path + 'c')
299  except OSError:
300  pass
301 
302  def _clear_imports(self):
303  for module in self._tracked_imports:
304  if module in sys.modules:
305  del sys.modules[module]
306  self._tracked_imports = list()
307 
308  def _cleanup_tempdir(self):
309  try:
310  os.remove(self._tmp_folder)
311  except OSError:
312  pass
313 
314  def _convert_input_data(self, keys, values):
315  result = dict()
316  for k, v in zip(keys, values):
317  # action call has empty string as default, not a valid input key
318  if k == '':
319  continue
320  try:
321  result[k] = self._convert_dict(cast(v))
322  except ValueError:
323  # unquoted strings will raise a ValueError, so leave it as string in this case
324  result[k] = str(v)
325  except SyntaxError as se:
326  Logger.loginfo('Unable to parse input value for key "%s", assuming string:\n%s\n%s' %
327  (k, str(v), str(se)))
328  result[k] = str(v)
329  return result
330 
332  thread = threading.Thread(target=self._heartbeat_worker)
333  thread.daemon = True
334  thread.start()
335 
336  def _heartbeat_worker(self):
337  while True:
338  self._pub.publish('flexbe/heartbeat', Empty())
339  time.sleep(1)
340 
341  def _convert_dict(self, o):
342  if isinstance(o, list):
343  return [self._convert_dict(e) for e in o]
344  elif isinstance(o, dict):
345  return self._attr_dict((k, self._convert_dict(v)) for k, v in list(o.items()))
346  else:
347  return o
348 
349  class _attr_dict(dict):
350  __getattr__ = dict.__getitem__
351 
352  @contextlib.contextmanager
353  def _track_imports(self):
354  previous_modules = set(sys.modules.keys())
355  try:
356  yield
357  finally:
358  self._tracked_imports.extend(set(sys.modules.keys()) - previous_modules)
def _cleanup_behavior(self, behavior_checksum)


flexbe_onboard
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:43