11 from flexbe_msgs.msg
import BehaviorSelection, BEStatus, BehaviorLog, BehaviorModification
17 super(TestOnboard, self).
__init__(name)
18 self.
sub = ProxySubscriberCached({
19 'flexbe/status': BEStatus,
20 'flexbe/log': BehaviorLog
22 self.
rate = rospy.Rate(100)
24 data_folder = os.path.dirname(os.path.realpath(__file__))
25 sys.path.insert(0, data_folder)
28 self.
lib = self.onboard._behavior_lib
29 self.lib._add_behavior_manifests(data_folder)
32 """ Assert that the expected onboard status is received before the timeout. """ 33 for i
in range(int(timeout*100)):
35 if self.sub.has_msg(
'flexbe/status'):
38 raise AssertionError(
'Did not receive a status as required.')
39 msg = self.sub.get_last_msg(
'flexbe/status')
40 self.sub.remove_last_msg(
'flexbe/status')
41 self.assertEqual(msg.code, expected)
45 behavior_pub = rospy.Publisher(
'flexbe/start_behavior', BehaviorSelection, queue_size=1)
52 be_id, _ = self.lib.find_behavior(
"Test Behavior Log")
53 request = BehaviorSelection()
54 request.behavior_id = be_id
55 request.autonomy_level = 255
56 behavior_pub.publish(request)
60 with open(self.lib.get_sourcecode_filepath(be_id))
as f:
61 request.behavior_checksum = zlib.adler32(f.read().encode()) & 0x7fffffff
62 self.sub.enable_buffer(
'flexbe/log')
63 behavior_pub.publish(request)
67 while self.sub.has_buffered(
'flexbe/log'):
68 behavior_logs.append(self.sub.get_from_buffer(
'flexbe/log').text)
69 self.assertIn(
'Test data', behavior_logs)
72 be_id, _ = self.lib.find_behavior(
"Test Behavior Complex")
73 request = BehaviorSelection()
74 request.behavior_id = be_id
75 request.autonomy_level = 255
76 request.arg_keys = [
'param']
77 request.arg_values = [
'value_2']
78 request.input_keys = [
'data']
79 request.input_values = [
'2']
80 with open(self.lib.get_sourcecode_filepath(be_id))
as f:
82 modifications = [(
'flexbe_INVALID',
'flexbe_core'), (
'raise ValueError("TODO: Remove!")',
'')]
83 for replace, by
in modifications:
84 index = content.index(replace)
85 request.modifications.append(BehaviorModification(index, index + len(replace), by))
86 for replace, by
in modifications:
87 content = content.replace(replace, by)
88 request.behavior_checksum = zlib.adler32(content.encode()) & 0x7fffffff
89 behavior_pub.publish(request)
92 self.assertEqual(result.args[0],
'finished')
94 while self.sub.has_buffered(
'flexbe/log'):
95 behavior_logs.append(self.sub.get_from_buffer(
'flexbe/log').text)
96 self.assertIn(
'value_2', behavior_logs)
99 request.arg_keys = [
'param',
'invalid']
100 request.arg_values = [
'value_1',
'should be ignored']
101 request.input_keys = []
102 request.input_values = []
103 behavior_pub.publish(request)
106 self.assertEqual(result.args[0],
'failed')
108 while self.sub.has_buffered(
'flexbe/log'):
109 behavior_logs.append(self.sub.get_from_buffer(
'flexbe/log').text)
110 self.assertIn(
'value_1', behavior_logs)
113 if __name__ ==
'__main__':
114 rospy.init_node(
'test_flexbe_onboard')
116 rostest.rosrun(
'flexbe_onboard',
'test_flexbe_onboard', TestOnboard)
def assertStatus(self, expected, timeout)
def test_onboard_behaviors(self)