11 from flexbe_msgs.msg
import BehaviorSelection, BEStatus, BehaviorLog, BehaviorModification
17 print(
"Initializing TestOnboard ...")
18 super(TestOnboard, self).
__init__(name)
19 self.
sub = ProxySubscriberCached({
20 'flexbe/status': BEStatus,
21 'flexbe/log': BehaviorLog
23 self.
rate = rospy.Rate(100)
25 data_folder = os.path.dirname(os.path.realpath(__file__))
26 sys.path.insert(0, data_folder)
27 print(f
"Using {data_folder} ...")
29 print(f
"Create onboard instance ...")
32 self.
lib._add_behavior_manifests(data_folder)
33 print(f
"Done adding behavior manifests from {data_folder}")
36 """ Assert that the expected onboard status is received before the timeout. """
37 for i
in range(int(timeout*100)):
39 if self.
sub.has_msg(
'flexbe/status'):
42 raise AssertionError(
'Did not receive a status as required.')
43 msg = self.
sub.get_last_msg(
'flexbe/status')
44 self.
sub.remove_last_msg(
'flexbe/status')
45 self.assertEqual(msg.code, expected)
46 print(f
"Successfully received status {msg}")
50 print(
"Creating start_behavior publisher ...")
51 behavior_pub = rospy.Publisher(
'flexbe/start_behavior', BehaviorSelection, queue_size=1)
56 print(
"Behavior engine is ready!")
59 be_id, _ = self.
lib.find_behavior(
"Test Behavior Log")
60 request = BehaviorSelection()
61 request.behavior_id = be_id
62 request.autonomy_level = 255
63 behavior_pub.publish(request)
65 print(f
"Correctly detected checksum issue!")
68 print(f
"Correctly set checksum and send request ...")
69 with open(self.
lib.get_sourcecode_filepath(be_id))
as f:
70 request.behavior_checksum = zlib.adler32(f.read().encode()) & 0x7fffffff
71 self.
sub.enable_buffer(
'flexbe/log')
72 behavior_pub.publish(request)
76 while self.
sub.has_buffered(
'flexbe/log'):
77 behavior_logs.append(self.
sub.get_from_buffer(
'flexbe/log').text)
78 self.assertIn(
'Test data', behavior_logs)
79 print(f
"Correctly recived update!")
82 be_id, _ = self.
lib.find_behavior(
"Test Behavior Complex")
83 print(f
"Send complex request ...")
84 request = BehaviorSelection()
85 request.behavior_id = be_id
86 request.autonomy_level = 255
87 request.arg_keys = [
'param']
88 request.arg_values = [
'value_2']
89 request.input_keys = [
'data']
90 request.input_values = [
'2']
91 with open(self.
lib.get_sourcecode_filepath(be_id))
as f:
93 modifications = [(
'flexbe_INVALID',
'flexbe_core'), (
'raise ValueError("TODO: Remove!")',
'')]
94 for replace, by
in modifications:
95 index = content.index(replace)
96 request.modifications.append(BehaviorModification(index, index + len(replace), by))
97 for replace, by
in modifications:
98 content = content.replace(replace, by)
99 request.behavior_checksum = zlib.adler32(content.encode()) & 0x7fffffff
100 behavior_pub.publish(request)
101 print(
"Wait for status updates ...")
104 self.assertEqual(result.args[0],
'finished')
105 print(
"Successful update, now check logs ...")
108 while self.
sub.has_buffered(
'flexbe/log'):
109 behavior_logs.append(self.
sub.get_from_buffer(
'flexbe/log').text)
110 self.assertIn(
'value_2', behavior_logs)
113 print(f
"Send complex request with different parameters ...")
114 request.arg_keys = [
'param',
'invalid']
115 request.arg_values = [
'value_1',
'should be ignored']
116 request.input_keys = []
117 request.input_values = []
118 behavior_pub.publish(request)
121 self.assertEqual(result.args[0],
'failed')
123 while self.
sub.has_buffered(
'flexbe/log'):
124 behavior_logs.append(self.
sub.get_from_buffer(
'flexbe/log').text)
125 self.assertIn(
'value_1', behavior_logs)
128 if __name__ ==
'__main__':
129 rospy.init_node(
'test_flexbe_onboard')
131 rostest.rosrun(
'flexbe_onboard',
'test_flexbe_onboard', TestOnboard)