test_onboard.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import os
4 import unittest
5 import zlib
6 import rospy
7 
8 from flexbe_onboard.flexbe_onboard import FlexbeOnboard
9 from flexbe_core.proxy import ProxySubscriberCached
10 
11 from flexbe_msgs.msg import BehaviorSelection, BEStatus, BehaviorLog, BehaviorModification
12 
13 
14 class TestOnboard(unittest.TestCase):
15 
16  def __init__(self, name):
17  print("Initializing TestOnboard ...")
18  super(TestOnboard, self).__init__(name)
19  self.sub = ProxySubscriberCached({
20  'flexbe/status': BEStatus,
21  'flexbe/log': BehaviorLog
22  })
23  self.rate = rospy.Rate(100)
24  # make sure that behaviors can be imported
25  data_folder = os.path.dirname(os.path.realpath(__file__))
26  sys.path.insert(0, data_folder)
27  print(f"Using {data_folder} ...")
28  # run onboard and add custom test behaviors to onboard lib
29  print(f"Create onboard instance ...")
31  self.lib = self.onboard._behavior_lib
32  self.lib._add_behavior_manifests(data_folder)
33  print(f"Done adding behavior manifests from {data_folder}")
34 
35  def assertStatus(self, expected, timeout):
36  """ Assert that the expected onboard status is received before the timeout. """
37  for i in range(int(timeout*100)):
38  self.rate.sleep()
39  if self.sub.has_msg('flexbe/status'):
40  break
41  else:
42  raise AssertionError('Did not receive a status as required.')
43  msg = self.sub.get_last_msg('flexbe/status')
44  self.sub.remove_last_msg('flexbe/status')
45  self.assertEqual(msg.code, expected)
46  print(f"Successfully received status {msg}")
47  return msg
48 
50  print("Creating start_behavior publisher ...")
51  behavior_pub = rospy.Publisher('flexbe/start_behavior', BehaviorSelection, queue_size=1)
52  rospy.sleep(0.5) # wait for publisher
53 
54  # wait for the initial status message
55  self.assertStatus(BEStatus.READY, 1)
56  print("Behavior engine is ready!")
57 
58  # send simple behavior request without checksum
59  be_id, _ = self.lib.find_behavior("Test Behavior Log")
60  request = BehaviorSelection()
61  request.behavior_id = be_id
62  request.autonomy_level = 255
63  behavior_pub.publish(request)
64  self.assertStatus(BEStatus.ERROR, 2)
65  print(f"Correctly detected checksum issue!")
66 
67  # send valid simple behavior request
68  print(f"Correctly set checksum and send request ...")
69  with open(self.lib.get_sourcecode_filepath(be_id)) as f:
70  request.behavior_checksum = zlib.adler32(f.read().encode()) & 0x7fffffff
71  self.sub.enable_buffer('flexbe/log')
72  behavior_pub.publish(request)
73  self.assertStatus(BEStatus.STARTED, 1)
74  self.assertStatus(BEStatus.FINISHED, 3)
75  behavior_logs = []
76  while self.sub.has_buffered('flexbe/log'):
77  behavior_logs.append(self.sub.get_from_buffer('flexbe/log').text)
78  self.assertIn('Test data', behavior_logs)
79  print(f"Correctly recived update!")
80 
81  # send valid complex behavior request
82  be_id, _ = self.lib.find_behavior("Test Behavior Complex")
83  print(f"Send complex request ...")
84  request = BehaviorSelection()
85  request.behavior_id = be_id
86  request.autonomy_level = 255
87  request.arg_keys = ['param']
88  request.arg_values = ['value_2']
89  request.input_keys = ['data']
90  request.input_values = ['2']
91  with open(self.lib.get_sourcecode_filepath(be_id)) as f:
92  content = f.read()
93  modifications = [('flexbe_INVALID', 'flexbe_core'), ('raise ValueError("TODO: Remove!")', '')]
94  for replace, by in modifications:
95  index = content.index(replace)
96  request.modifications.append(BehaviorModification(index, index + len(replace), by))
97  for replace, by in modifications:
98  content = content.replace(replace, by)
99  request.behavior_checksum = zlib.adler32(content.encode()) & 0x7fffffff
100  behavior_pub.publish(request)
101  print("Wait for status updates ...")
102  self.assertStatus(BEStatus.STARTED, 1)
103  result = self.assertStatus(BEStatus.FINISHED, 3)
104  self.assertEqual(result.args[0], 'finished')
105  print("Successful update, now check logs ...")
106 
107  behavior_logs = []
108  while self.sub.has_buffered('flexbe/log'):
109  behavior_logs.append(self.sub.get_from_buffer('flexbe/log').text)
110  self.assertIn('value_2', behavior_logs)
111 
112  # send the same behavior with different parameters
113  print(f"Send complex request with different parameters ...")
114  request.arg_keys = ['param', 'invalid']
115  request.arg_values = ['value_1', 'should be ignored']
116  request.input_keys = []
117  request.input_values = []
118  behavior_pub.publish(request)
119  self.assertStatus(BEStatus.STARTED, 1)
120  result = self.assertStatus(BEStatus.FINISHED, 3)
121  self.assertEqual(result.args[0], 'failed')
122  behavior_logs = []
123  while self.sub.has_buffered('flexbe/log'):
124  behavior_logs.append(self.sub.get_from_buffer('flexbe/log').text)
125  self.assertIn('value_1', behavior_logs)
126  print("Success!")
127 
128 if __name__ == '__main__':
129  rospy.init_node('test_flexbe_onboard')
130  import rostest
131  rostest.rosrun('flexbe_onboard', 'test_flexbe_onboard', TestOnboard)
test_onboard.TestOnboard.onboard
onboard
Definition: test_onboard.py:30
test_onboard.TestOnboard.__init__
def __init__(self, name)
Definition: test_onboard.py:16
test_onboard.TestOnboard.lib
lib
Definition: test_onboard.py:31
flexbe_onboard.flexbe_onboard
Definition: flexbe_onboard.py:1
test_onboard.TestOnboard.test_onboard_behaviors
def test_onboard_behaviors(self)
Definition: test_onboard.py:49
test_onboard.TestOnboard.rate
rate
Definition: test_onboard.py:23
test_onboard.TestOnboard.assertStatus
def assertStatus(self, expected, timeout)
Definition: test_onboard.py:35
flexbe_core::proxy
flexbe_onboard.flexbe_onboard.FlexbeOnboard
Definition: flexbe_onboard.py:22
test_onboard.TestOnboard.sub
sub
Definition: test_onboard.py:19
test_onboard.TestOnboard
Definition: test_onboard.py:14


flexbe_onboard
Author(s): Philipp Schillinger
autogenerated on Fri Jul 21 2023 02:26:20