test_onboard.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import os
4 import unittest
5 import zlib
6 import rospy
7 
8 from flexbe_onboard.flexbe_onboard import FlexbeOnboard
9 from flexbe_core.proxy import ProxySubscriberCached
10 
11 from flexbe_msgs.msg import BehaviorSelection, BEStatus, BehaviorLog, BehaviorModification
12 
13 
14 class TestOnboard(unittest.TestCase):
15 
16  def __init__(self, name):
17  super(TestOnboard, self).__init__(name)
18  self.sub = ProxySubscriberCached({
19  'flexbe/status': BEStatus,
20  'flexbe/log': BehaviorLog
21  })
22  self.rate = rospy.Rate(100)
23  # make sure that behaviors can be imported
24  data_folder = os.path.dirname(os.path.realpath(__file__))
25  sys.path.insert(0, data_folder)
26  # run onboard and add custom test behaviors to onboard lib
28  self.lib = self.onboard._behavior_lib
29  self.lib._add_behavior_manifests(data_folder)
30 
31  def assertStatus(self, expected, timeout):
32  """ Assert that the expected onboard status is received before the timeout. """
33  for i in range(int(timeout*100)):
34  self.rate.sleep()
35  if self.sub.has_msg('flexbe/status'):
36  break
37  else:
38  raise AssertionError('Did not receive a status as required.')
39  msg = self.sub.get_last_msg('flexbe/status')
40  self.sub.remove_last_msg('flexbe/status')
41  self.assertEqual(msg.code, expected)
42  return msg
43 
45  behavior_pub = rospy.Publisher('flexbe/start_behavior', BehaviorSelection, queue_size=1)
46  rospy.sleep(0.5) # wait for publisher
47 
48  # wait for the initial status message
49  self.assertStatus(BEStatus.READY, 1)
50 
51  # send simple behavior request without checksum
52  be_id, _ = self.lib.find_behavior("Test Behavior Log")
53  request = BehaviorSelection()
54  request.behavior_id = be_id
55  request.autonomy_level = 255
56  behavior_pub.publish(request)
57  self.assertStatus(BEStatus.ERROR, 2)
58 
59  # send valid simple behavior request
60  with open(self.lib.get_sourcecode_filepath(be_id)) as f:
61  request.behavior_checksum = zlib.adler32(f.read().encode()) & 0x7fffffff
62  self.sub.enable_buffer('flexbe/log')
63  behavior_pub.publish(request)
64  self.assertStatus(BEStatus.STARTED, 1)
65  self.assertStatus(BEStatus.FINISHED, 3)
66  behavior_logs = []
67  while self.sub.has_buffered('flexbe/log'):
68  behavior_logs.append(self.sub.get_from_buffer('flexbe/log').text)
69  self.assertIn('Test data', behavior_logs)
70 
71  # send valid complex behavior request
72  be_id, _ = self.lib.find_behavior("Test Behavior Complex")
73  request = BehaviorSelection()
74  request.behavior_id = be_id
75  request.autonomy_level = 255
76  request.arg_keys = ['param']
77  request.arg_values = ['value_2']
78  request.input_keys = ['data']
79  request.input_values = ['2']
80  with open(self.lib.get_sourcecode_filepath(be_id)) as f:
81  content = f.read()
82  modifications = [('flexbe_INVALID', 'flexbe_core'), ('raise ValueError("TODO: Remove!")', '')]
83  for replace, by in modifications:
84  index = content.index(replace)
85  request.modifications.append(BehaviorModification(index, index + len(replace), by))
86  for replace, by in modifications:
87  content = content.replace(replace, by)
88  request.behavior_checksum = zlib.adler32(content.encode()) & 0x7fffffff
89  behavior_pub.publish(request)
90  self.assertStatus(BEStatus.STARTED, 1)
91  result = self.assertStatus(BEStatus.FINISHED, 3)
92  self.assertEqual(result.args[0], 'finished')
93  behavior_logs = []
94  while self.sub.has_buffered('flexbe/log'):
95  behavior_logs.append(self.sub.get_from_buffer('flexbe/log').text)
96  self.assertIn('value_2', behavior_logs)
97 
98  # send the same behavior with different parameters
99  request.arg_keys = ['param', 'invalid']
100  request.arg_values = ['value_1', 'should be ignored']
101  request.input_keys = []
102  request.input_values = []
103  behavior_pub.publish(request)
104  self.assertStatus(BEStatus.STARTED, 1)
105  result = self.assertStatus(BEStatus.FINISHED, 3)
106  self.assertEqual(result.args[0], 'failed')
107  behavior_logs = []
108  while self.sub.has_buffered('flexbe/log'):
109  behavior_logs.append(self.sub.get_from_buffer('flexbe/log').text)
110  self.assertIn('value_1', behavior_logs)
111 
112 
113 if __name__ == '__main__':
114  rospy.init_node('test_flexbe_onboard')
115  import rostest
116  rostest.rosrun('flexbe_onboard', 'test_flexbe_onboard', TestOnboard)
def assertStatus(self, expected, timeout)
Definition: test_onboard.py:31
def __init__(self, name)
Definition: test_onboard.py:16
def test_onboard_behaviors(self)
Definition: test_onboard.py:44


flexbe_onboard
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:43