flexbe_onboard.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 import os
4 import sys
5 import inspect
6 import tempfile
7 import threading
8 import time
9 import zlib
10 import contextlib
11 from ast import literal_eval as cast
12 
13 from flexbe_core import Logger, BehaviorLibrary
14 from flexbe_core.proxy import ProxyPublisher, ProxySubscriberCached
15 from flexbe_core.core.state_machine import StateMachine
16 
17 from flexbe_msgs.msg import BehaviorSelection, BEStatus, CommandFeedback, UserdataInfo
18 from flexbe_msgs.srv import GetUserdata, GetUserdataRequest, GetUserdataResponse
19 from std_msgs.msg import Empty
20 
21 
class FlexbeOnboard(object):
    """
    Controls the execution of robot behaviors.

    Behavior start requests arrive on 'flexbe/start_behavior'. Each request is
    handled in its own daemon thread: the behavior source is loaded from the
    behavior library, patched with the requested modifications, written to a
    temporary module, imported, parametrized and finally executed. A request
    received while a behavior is running is treated as a behavior switch.
    """

    def __init__(self):
        """
        Set up the temporary module folder, publishers, subscribers, the
        userdata service and the heartbeat, then announce READY.
        """
        self.be = None
        Logger.initialize()
        self._tracked_imports = list()
        # prepare temp folder
        # (added to sys.path so that the generated tmp_<checksum> modules can be imported)
        self._tmp_folder = tempfile.mkdtemp()
        sys.path.append(self._tmp_folder)
        rospy.on_shutdown(self._cleanup_tempdir)

        # prepare manifest folder access
        self._behavior_lib = BehaviorLibrary()

        # prepare communication
        self.status_topic = 'flexbe/status'
        self.feedback_topic = 'flexbe/command_feedback'
        self._pub = ProxyPublisher({
            self.feedback_topic: CommandFeedback,
            'flexbe/heartbeat': Empty
        })
        self._pub.createPublisher(self.status_topic, BEStatus, _latch=True)
        self._execute_heartbeat()

        # listen for new behavior to start
        self._enable_clear_imports = rospy.get_param('~enable_clear_imports', False)
        self._running = False
        self._run_lock = threading.Lock()
        self._switching = False
        self._switch_lock = threading.Lock()
        self._sub = ProxySubscriberCached()
        self._sub.subscribe('flexbe/start_behavior', BehaviorSelection, self._behavior_callback)

        self._userdata_service = rospy.Service('flexbe/get_userdata', GetUserdata, self._userdata_callback)

        rospy.sleep(0.5)  # wait for publishers etc to really be set up
        self._pub.publish(self.status_topic, BEStatus(code=BEStatus.READY))
        rospy.loginfo('\033[92m--- Behavior Engine ready! ---\033[0m')

    def _behavior_callback(self, msg):
        """
        Handle a behavior start request by running _behavior_execution
        for the given message in a new daemon thread.
        """
        thread = threading.Thread(target=self._behavior_execution, args=[msg])
        thread.daemon = True
        thread.start()

    # =================== #
    # Main execution loop #
    # ------------------- #

    def _behavior_execution(self, msg):
        """
        Prepare and execute the behavior requested by msg.

        If a behavior is already running, the request is handled as a switch:
        the new behavior takes over the currently active state and the running
        behavior is preempted. Returns early (None) if preparation or the
        switch fails; progress is reported on the status and feedback topics.
        """
        # sending a behavior while one is already running is considered as switching
        if self._running:
            Logger.loginfo('--> Initiating behavior switch...')
            self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['received']))
        else:
            Logger.loginfo('--> Starting new behavior...')

        # construct the behavior that should be executed
        be = self._prepare_behavior(msg)
        if be is None:
            Logger.logerr('Dropped behavior start request because preparation failed.')
            if self._running:
                self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['failed']))
            else:
                rospy.loginfo('\033[92m--- Behavior Engine ready! ---\033[0m')
            return

        # perform the behavior switch if required
        with self._switch_lock:
            # remember the previous flag so that an aborted switch can restore it;
            # otherwise the still running behavior would report SWITCHING when it
            # finishes and would never reset the running state
            was_switching = self._switching
            self._switching = True
            if self._running:
                self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['start']))
                # ensure that switching is possible
                if not self._is_switchable(be):
                    Logger.logerr('Dropped behavior start request because switching is not possible.')
                    self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['not_switchable']))
                    self._switching = was_switching
                    return
                # wait if running behavior is currently starting or stopping
                # (active_state stays None if ROS is already shutting down)
                active_state = None
                rate = rospy.Rate(100)
                while not rospy.is_shutdown():
                    active_state = self.be.get_current_state()
                    if active_state is not None or not self._running:
                        break
                    rate.sleep()
                # extract the active state if any
                if active_state is not None:
                    rospy.loginfo("Current state %s is kept active.", active_state.name)
                    try:
                        be.prepare_for_switch(active_state)
                        self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['prepared']))
                    except Exception as e:
                        Logger.logerr('Failed to prepare behavior switch:\n%s' % str(e))
                        self._pub.publish(self.feedback_topic, CommandFeedback(command="switch", args=['failed']))
                        self._switching = was_switching
                        return
                    # stop the rest
                    rospy.loginfo('Preempting current behavior version...')
                    self.be.preempt()

        # execute the behavior
        # (the run lock is held for the whole execution, so a switching request
        # blocks here until the preempted behavior has returned)
        with self._run_lock:
            self._switching = False
            self.be = be
            self._running = True

            result = None
            try:
                rospy.loginfo('Behavior ready, execution starts now.')
                rospy.loginfo('[%s : %s]', be.name, msg.behavior_checksum)
                self.be.confirm()
                args = [self.be.requested_state_path] if self.be.requested_state_path is not None else []
                self._pub.publish(self.status_topic,
                                  BEStatus(behavior_id=self.be.id, code=BEStatus.STARTED, args=args))
                result = self.be.execute()
                if self._switching:
                    self._pub.publish(self.status_topic,
                                      BEStatus(behavior_id=self.be.id, code=BEStatus.SWITCHING))
                else:
                    self._pub.publish(self.status_topic,
                                      BEStatus(behavior_id=self.be.id, code=BEStatus.FINISHED, args=[str(result)]))
            except Exception as e:
                self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.FAILED))
                Logger.logerr('Behavior execution failed!\n%s' % str(e))
                import traceback
                Logger.loginfo(traceback.format_exc())
                result = result or "exception"  # only set result if not executed

            # done, remove left-overs like the temporary behavior file
            try:
                # do not clear imports for now, not working correctly (e.g., flexbe/flexbe_app#66)
                # only if specifically enabled
                if not self._switching and self._enable_clear_imports:
                    self._clear_imports()
                self._cleanup_behavior(msg.behavior_checksum)
            except Exception as e:
                rospy.logerr('Failed to clean up behavior:\n%s' % str(e))

            if not self._switching:
                Logger.loginfo('Behavior execution finished with result %s.', str(result))
                rospy.loginfo('\033[92m--- Behavior Engine ready! ---\033[0m')
                self._running = False
                self.be = None

    def _userdata_callback(self, request):
        """
        Service callback for 'flexbe/get_userdata'.

        Collects the userdata of the running behavior's state machine and of
        all nested state machines. An empty request.userdata_key selects all
        entries, otherwise only entries with exactly that key are returned.
        """
        response = GetUserdataResponse()
        userdata = []
        # take a single snapshot, the execution thread may reset self.be at any time
        behavior = self.be
        if behavior and behavior._state_machine:
            state_machine = behavior._state_machine
            # get userdata from top-level behavior
            if state_machine._userdata:
                for key, data in state_machine._userdata._data.items():
                    # add userdata if fits to the requested key (get all userdata for empty string)
                    if request.userdata_key == "" or request.userdata_key == key:
                        userdata.append(UserdataInfo(state=state_machine._name,
                                                     key=str(key),
                                                     type=type(data).__name__,
                                                     data=str(data)))
            # get userdata from sub-behaviors
            userdata = self._get_userdata_from_whole_sm(state_machine, userdata, request.userdata_key,
                                                        str(state_machine._name) + "/")

            if len(userdata) > 0:
                # also print in terminal (better readable for complex message types)
                printable_userdata = "".join("\t{}:{}\n".format(ud.key, ud.data) for ud in userdata)
                Logger.loginfo("Current userdata: \n{}".format(printable_userdata))
                response.success = True
            else:
                response.success = False
            response.message = "Found {} occurences of '{}'".format(len(userdata), request.userdata_key)
            response.userdata = userdata
        else:
            response.success = False
            response.message = "no state_machine running"
        return response

    # ==================================== #
    # Preparation of new behavior requests #
    # ------------------------------------ #

    def _prepare_behavior(self, msg):
        """
        Build the behavior instance requested by msg.

        Steps: read the source code from the behavior library, apply the
        modifications of the request, verify the checksum, write the result to
        a temporary module, import it, set parameters and build the state
        machine. On any failure an ERROR status is published and None is
        returned; otherwise the ready-to-run behavior instance is returned.
        """
        # get sourcecode from ros package
        try:
            behavior = self._behavior_lib.get_behavior(msg.behavior_id)
            if behavior is None:
                raise ValueError(msg.behavior_id)
            be_filepath = self._behavior_lib.get_sourcecode_filepath(msg.behavior_id, add_tmp=True)
            if os.path.isfile(be_filepath):
                rospy.logwarn("Found a tmp version of the referred behavior! Assuming local test run.")
            else:
                be_filepath = self._behavior_lib.get_sourcecode_filepath(msg.behavior_id)
            with open(be_filepath, "r") as be_file:
                be_content = be_file.read()
        except Exception as e:
            Logger.logerr('Failed to retrieve behavior from library:\n%s' % str(e))
            self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
            return

        # apply modifications if any
        try:
            # each modification replaces be_content[index_begin:index_end] by new_content;
            # the unmodified text in between is copied as is
            file_content = ""
            last_index = 0
            for mod in msg.modifications:
                file_content += be_content[last_index:mod.index_begin] + mod.new_content
                last_index = mod.index_end
            file_content += be_content[last_index:]
            if zlib.adler32(file_content.encode()) & 0x7fffffff != msg.behavior_checksum:
                mismatch_msg = ("Checksum mismatch of behavior versions! \n"
                                "Attempted to load behavior: %s\n"
                                "Make sure that all computers are on the same version a.\n"
                                "Also try: rosrun flexbe_widget clear_cache" % str(be_filepath))
                raise Exception(mismatch_msg)
            else:
                rospy.loginfo("Successfully applied %d modifications." % len(msg.modifications))
        except Exception as e:
            Logger.logerr('Failed to apply behavior modifications:\n%s' % str(e))
            self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
            return

        # create temp file for behavior class
        try:
            file_path = os.path.join(self._tmp_folder, 'tmp_%d.py' % msg.behavior_checksum)
            with open(file_path, "w") as sc_file:
                sc_file.write(file_content)
        except Exception as e:
            Logger.logerr('Failed to create temporary file for behavior class:\n%s' % str(e))
            self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
            return

        # import temp class file and initialize behavior
        try:
            with self._track_imports():
                package = __import__("tmp_%d" % msg.behavior_checksum, fromlist=["tmp_%d" % msg.behavior_checksum])
                # only consider classes defined in the temp module itself, not imported ones
                clsmembers = inspect.getmembers(package, lambda member: (inspect.isclass(member) and
                                                                         member.__module__ == package.__name__))
                beclass = clsmembers[0][1]
                be = beclass()
            rospy.loginfo('Behavior ' + be.name + ' created.')
        except Exception as e:
            Logger.logerr('Exception caught in behavior definition:\n%s\n'
                          'See onboard terminal for more information.' % str(e))
            import traceback
            traceback.print_exc()
            self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
            if self._enable_clear_imports:
                self._clear_imports()
            return

        # initialize behavior parameters
        if len(msg.arg_keys) > 0:
            rospy.loginfo('The following parameters will be used:')
        try:
            for i, arg_key in enumerate(msg.arg_keys):
                # action call has empty string as default, not a valid param key
                if arg_key == '':
                    continue
                arg_value = msg.arg_values[i]
                found = be.set_parameter(arg_key, arg_value)
                if found:
                    # keys may be prefixed by the path of the behavior they belong to
                    name_split = arg_key.rsplit('/', 1)
                    behavior = name_split[0] if len(name_split) == 2 else ''
                    key = name_split[-1]
                    suffix = ' (' + behavior + ')' if behavior != '' else ''
                    rospy.loginfo(key + ' = ' + arg_value + suffix)
                else:
                    rospy.logwarn('Parameter ' + arg_key + ' (set to ' + arg_value + ') not defined')
        except Exception as e:
            Logger.logerr('Failed to initialize parameters:\n%s' % str(e))
            self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
            return

        # build state machine
        try:
            be.set_up(id=msg.behavior_checksum, autonomy_level=msg.autonomy_level, debug=False)
            be.prepare_for_execution(self._convert_input_data(msg.input_keys, msg.input_values))
            rospy.loginfo('State machine built.')
        except Exception as e:
            Logger.logerr('Behavior construction failed!\n%s\n'
                          'See onboard terminal for more information.' % str(e))
            import traceback
            traceback.print_exc()
            self._pub.publish(self.status_topic, BEStatus(behavior_id=msg.behavior_checksum, code=BEStatus.ERROR))
            if self._enable_clear_imports:
                self._clear_imports()
            return

        return be

    # ================ #
    # Helper functions #
    # ---------------- #

    def _is_switchable(self, be):
        """
        Return True if the running behavior can be switched to be,
        which currently only requires that both have the same name.
        """
        if self.be.name != be.name:
            Logger.logerr('Unable to switch behavior, names do not match:\ncurrent: %s <--> new: %s' %
                          (self.be.name, be.name))
            return False
        # locked inside
        # locked state exists in new behavior
        # ok, can switch
        return True

    def _cleanup_behavior(self, behavior_checksum):
        """
        Remove the temporary module of the given behavior from the temp folder,
        i.e., the source file written by _prepare_behavior and a compiled
        version next to it. Missing files are ignored.
        """
        base_path = os.path.join(self._tmp_folder, 'tmp_%d' % behavior_checksum)
        for file_path in (base_path + '.py', base_path + '.pyc'):
            try:
                os.remove(file_path)
            except OSError:
                # best effort, the file might not exist (anymore)
                pass

    def _clear_imports(self):
        """
        Drop all modules recorded by _track_imports from sys.modules
        and reset the list of tracked imports.
        """
        for module in self._tracked_imports:
            if module in sys.modules:
                del sys.modules[module]
        self._tracked_imports = list()

    def _cleanup_tempdir(self):
        """
        Shutdown hook: delete the temp folder including all its content.
        """
        # os.remove cannot delete directories, so remove the whole tree instead;
        # errors are ignored because this is a best-effort cleanup during shutdown
        import shutil
        shutil.rmtree(self._tmp_folder, ignore_errors=True)

    def _convert_input_data(self, keys, values):
        """
        Convert the input keys and string values of a request to a dict.

        Values are parsed as Python literals where possible (nested dicts
        become attribute-accessible), everything else is kept as string.
        Entries with an empty key are skipped.
        """
        result = dict()
        for k, v in zip(keys, values):
            # action call has empty string as default, not a valid input key
            if k == '':
                continue
            try:
                result[k] = self._convert_dict(cast(v))
            except ValueError:
                # unquoted strings will raise a ValueError, so leave it as string in this case
                result[k] = str(v)
            except SyntaxError as se:
                Logger.loginfo('Unable to parse input value for key "%s", assuming string:\n%s\n%s' %
                               (k, str(v), str(se)))
                result[k] = str(v)
        return result

    def _execute_heartbeat(self):
        """
        Start the daemon thread which publishes the heartbeat.
        """
        thread = threading.Thread(target=self._heartbeat_worker)
        thread.daemon = True
        thread.start()

    def _heartbeat_worker(self):
        """
        Publish an Empty message on 'flexbe/heartbeat' once per second
        until ROS is shut down.
        """
        # wall-clock sleep on purpose; stop once ROS is shut down instead of
        # publishing forever
        while not rospy.is_shutdown():
            self._pub.publish('flexbe/heartbeat', Empty())
            time.sleep(1)

    def _convert_dict(self, o):
        """
        Recursively replace each dict in o (also inside lists and dict values)
        by an _attr_dict; all other values are returned unchanged.
        """
        if isinstance(o, list):
            return [self._convert_dict(e) for e in o]
        elif isinstance(o, dict):
            return self._attr_dict((k, self._convert_dict(v)) for k, v in list(o.items()))
        else:
            return o

    def _get_userdata_from_whole_sm(self, state_machine, userdata, userdata_key, path):
        """
        Append the userdata of all state machines nested in state_machine
        to the list userdata and return it.

        An empty userdata_key selects all entries, otherwise only entries with
        exactly that key. path is the prefix used for the reported state of
        each entry and is extended by the name of each nested state machine.
        """
        # iterate recursively through all subbehaviors
        for subbehavior in state_machine._states:
            # check if userdata available
            if isinstance(subbehavior, StateMachine):
                sub_path = path + subbehavior.name + "/"
                if subbehavior._userdata:
                    for key, data in subbehavior._userdata._data.items():
                        # add userdata if fits to the requested key (get all userdata for empty string)
                        if userdata_key == "" or userdata_key == key:
                            userdata.append(UserdataInfo(state=sub_path,
                                                         key=str(key),
                                                         type=type(data).__name__,
                                                         data=str(data)))
                self._get_userdata_from_whole_sm(subbehavior, userdata, userdata_key, sub_path)
        return userdata

    class _attr_dict(dict):
        # dict whose items can also be read as attributes (d.key == d['key']);
        # note that a missing key raises KeyError, not AttributeError
        __getattr__ = dict.__getitem__

    @contextlib.contextmanager
    def _track_imports(self):
        """
        Context manager which records all modules that are newly added to
        sys.modules inside its body, so that _clear_imports can remove them.
        """
        previous_modules = set(sys.modules.keys())
        try:
            yield
        finally:
            self._tracked_imports.extend(set(sys.modules.keys()) - previous_modules)
flexbe_onboard.flexbe_onboard.FlexbeOnboard._switching
_switching
Definition: flexbe_onboard.py:53
flexbe_onboard.flexbe_onboard.FlexbeOnboard._run_lock
_run_lock
Definition: flexbe_onboard.py:52
flexbe_onboard.flexbe_onboard.FlexbeOnboard.status_topic
status_topic
Definition: flexbe_onboard.py:40
flexbe_onboard.flexbe_onboard.FlexbeOnboard.feedback_topic
feedback_topic
Definition: flexbe_onboard.py:41
flexbe_onboard.flexbe_onboard.FlexbeOnboard._running
_running
Definition: flexbe_onboard.py:51
flexbe_onboard.flexbe_onboard.FlexbeOnboard._userdata_service
_userdata_service
Definition: flexbe_onboard.py:58
flexbe_onboard.flexbe_onboard.FlexbeOnboard._enable_clear_imports
_enable_clear_imports
Definition: flexbe_onboard.py:50
flexbe_onboard.flexbe_onboard.FlexbeOnboard._prepare_behavior
def _prepare_behavior(self, msg)
Definition: flexbe_onboard.py:203
flexbe_onboard.flexbe_onboard.FlexbeOnboard._execute_heartbeat
def _execute_heartbeat(self)
Definition: flexbe_onboard.py:368
flexbe_onboard.flexbe_onboard.FlexbeOnboard._tmp_folder
_tmp_folder
Definition: flexbe_onboard.py:32
flexbe_onboard.flexbe_onboard.FlexbeOnboard._track_imports
def _track_imports(self)
Definition: flexbe_onboard.py:406
flexbe_onboard.flexbe_onboard.FlexbeOnboard._attr_dict
Definition: flexbe_onboard.py:402
flexbe_onboard.flexbe_onboard.FlexbeOnboard._sub
_sub
Definition: flexbe_onboard.py:55
flexbe_core::proxy
flexbe_onboard.flexbe_onboard.FlexbeOnboard._heartbeat_worker
def _heartbeat_worker(self)
Definition: flexbe_onboard.py:373
flexbe_onboard.flexbe_onboard.FlexbeOnboard._behavior_callback
def _behavior_callback(self, msg)
Definition: flexbe_onboard.py:64
flexbe_onboard.flexbe_onboard.FlexbeOnboard
Definition: flexbe_onboard.py:22
flexbe_onboard.flexbe_onboard.FlexbeOnboard._behavior_execution
def _behavior_execution(self, msg)
Definition: flexbe_onboard.py:73
flexbe_onboard.flexbe_onboard.FlexbeOnboard._pub
_pub
Definition: flexbe_onboard.py:42
flexbe_onboard.flexbe_onboard.FlexbeOnboard._cleanup_tempdir
def _cleanup_tempdir(self)
Definition: flexbe_onboard.py:345
flexbe_onboard.flexbe_onboard.FlexbeOnboard._clear_imports
def _clear_imports(self)
Definition: flexbe_onboard.py:339
flexbe_onboard.flexbe_onboard.FlexbeOnboard._tracked_imports
_tracked_imports
Definition: flexbe_onboard.py:30
flexbe_onboard.flexbe_onboard.FlexbeOnboard._get_userdata_from_whole_sm
def _get_userdata_from_whole_sm(self, state_machine, userdata, userdata_key, path)
Definition: flexbe_onboard.py:386
flexbe_onboard.flexbe_onboard.FlexbeOnboard._behavior_lib
_behavior_lib
Definition: flexbe_onboard.py:37
flexbe_onboard.flexbe_onboard.FlexbeOnboard._userdata_callback
def _userdata_callback(self, request)
Definition: flexbe_onboard.py:167
flexbe_core::core::state_machine
flexbe_onboard.flexbe_onboard.FlexbeOnboard._convert_dict
def _convert_dict(self, o)
Definition: flexbe_onboard.py:378
flexbe_onboard.flexbe_onboard.FlexbeOnboard._switch_lock
_switch_lock
Definition: flexbe_onboard.py:54
flexbe_onboard.flexbe_onboard.FlexbeOnboard.__init__
def __init__(self)
Definition: flexbe_onboard.py:27
flexbe_onboard.flexbe_onboard.FlexbeOnboard._convert_input_data
def _convert_input_data(self, keys, values)
Definition: flexbe_onboard.py:351
flexbe_onboard.flexbe_onboard.FlexbeOnboard.be
be
Definition: flexbe_onboard.py:28
flexbe_onboard.flexbe_onboard.FlexbeOnboard._is_switchable
def _is_switchable(self, be)
Definition: flexbe_onboard.py:318
flexbe_onboard.flexbe_onboard.FlexbeOnboard._cleanup_behavior
def _cleanup_behavior(self, behavior_checksum)
Definition: flexbe_onboard.py:328


flexbe_onboard
Author(s): Philipp Schillinger
autogenerated on Fri Jul 21 2023 02:26:20