test_interface.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import inspect
3 import rospy
4 
5 from flexbe_core.core import EventState
6 from .logger import Logger
7 
8 
9 class TestInterface(object):
10  """ Interface to states and behaviors that are subject to testing. """
11 
12  def __init__(self, path, classname):
13  package = __import__(path, fromlist=[path])
14  clsmembers = inspect.getmembers(package, lambda member: (
15  inspect.isclass(member) and member.__module__ == package.__name__
16  ))
17  self._class = next(c for name, c in clsmembers if name == classname)
18  self._instance = None
19  Logger.print_positive('%s imported' % self.get_base_name())
20 
21  def is_state(self):
22  return issubclass(self._class, EventState)
23 
24  def get_base_name(self):
25  return "state" if self.is_state() else "behavior"
26 
27  # instantiate
28 
29  def instantiate(self, params=None):
30  if self.is_state():
31  self._instance = self._instantiate_state(params=params)
32  else:
33  self._instance = self._instantiate_behavior(params=params)
34  Logger.print_positive('%s instantiated' % self.get_base_name())
35 
36  def _instantiate_state(self, params=None):
37  if params is None:
38  return self._class()
39  else:
40  return self._class(**params)
41 
42  def _instantiate_behavior(self, params=None):
43  be = self._class()
44  if params is not None:
45  for name, value in params.items():
46  be.set_parameter(name, value)
47  be.set_up(id=0, autonomy_level=255, debug=False)
48  return be
49 
50  # execute
51 
52  def execute(self, userdata, spin_cb=None):
53  spin_cb = spin_cb or (lambda: None)
54  if self.is_state():
55  outcome = self._execute_state(userdata, spin_cb)
56  else:
57  outcome = self._execute_behavior(userdata, spin_cb)
58  Logger.print_positive('finished %s execution' % self.get_base_name())
59  return outcome
60 
61  def _execute_state(self, userdata, spin_cb):
62  self._instance.on_start()
63  outcome = None
64  while outcome is None and not rospy.is_shutdown():
65  outcome = self._instance.execute(userdata)
66  self._instance.sleep()
67  spin_cb()
68  self._instance.on_stop()
69  return outcome
70 
71  def _execute_behavior(self, userdata, spin_cb):
72  self._instance.prepare_for_execution(userdata._data)
73  self._instance.confirm()
74  # do not execute behavior directly, instead explicitly spin its state machine
75  # this is required here for spinning ROS and processing roslaunch context callbacks
76  outcome = None
77  sm = self._instance._state_machine
78  while outcome is None and not rospy.is_shutdown():
79  outcome = sm.execute(userdata)
80  sm.sleep()
81  spin_cb()
82  return outcome
def _execute_state(self, userdata, spin_cb)
def execute(self, userdata, spin_cb=None)
def _execute_behavior(self, userdata, spin_cb)


flexbe_testing
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:44