test_pal_statistics.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 from builtins import range
4 
5 import unittest
6 import rospy
7 import gc
8 from pal_statistics import StatisticsRegistry
9 from pal_statistics_msgs.msg import Statistics, StatisticsNames, StatisticsValues
10 
# Base topic name handed to every StatisticsRegistry in this file; the
# publish test subscribes to its "/full", "/names" and "/values" sub-topics.
DEFAULT_TOPIC = "pal_statistics"
class TestPalStatistics(unittest.TestCase):
    """Test case exercising ``StatisticsRegistry`` registration and publishing.

    The instance also acts as the subscriber side of ``test_publish``: the
    ``*_msg_cb`` callbacks store the most recent message of each kind in
    ``last_full_msg``, ``last_names_msg`` and ``last_values_msg``.
    """

    def __init__(self, *args):
        super(TestPalStatistics, self).__init__(*args)
        # Latest message received per topic; None until a callback stores one.
        self.last_full_msg = None
        self.last_names_msg = None
        self.last_values_msg = None

    def full_msg_cb(self, msg):
        """Store the latest message received on the "/full" topic."""
        self.last_full_msg = msg

    def names_msg_cb(self, msg):
        """Store the latest message received on the "/names" topic."""
        self.last_names_msg = msg

    def values_msg_cb(self, msg):
        """Store the latest message received on the "/values" topic."""
        self.last_values_msg = msg

    def compare_full_msg(self, expected, full_msg):
        """Assert that ``full_msg`` carries exactly the ``expected`` statistics.

        :param expected: dict mapping statistic name to its expected value.
        :param full_msg: message whose ``statistics`` entries each expose
            ``name`` and ``value``.
        """
        # Fail with a readable message instead of an AttributeError on None
        # when no message was stored before the comparison.
        self.assertIsNotNone(full_msg, "No full statistics message available")
        actual = {stat.name: stat.value for stat in full_msg.statistics}
        self.assertDictEqual(expected, actual)

    def compare_optimized_msgs(self, expected, names_msg, values_msg):
        """Assert that the names/values message pair matches ``expected``.

        :param expected: dict mapping statistic name to its expected value;
            its iteration order is compared against the message order.
        :param names_msg: message exposing ``names``; the names check is
            skipped when this is falsy.
        :param values_msg: message exposing ``values``.
        """
        # NOTE(review): presumably the registry omits the names message when
        # the set of names is unchanged -- confirm against the registry.
        if names_msg:
            self.assertListEqual(list(expected.keys()), names_msg.names)
        self.assertListEqual(list(expected.values()), values_msg.values)

    def evaluate_msgs(self, expected, registry):
        """Check both message flavours built by ``registry`` against ``expected``."""
        self.compare_full_msg(expected, registry.createFullMsg())
        names_msg, values_msg = registry.createOptimizedMsgs()
        self.compare_optimized_msgs(expected, names_msg, values_msg)

    def test_basic(self):
        """Register, update and unregister plain variables."""
        var1 = 0.0
        registry = StatisticsRegistry(DEFAULT_TOPIC)
        # The lambda closes over the local, so later rebinding is observed.
        registry.registerFunction("var1", (lambda: var1))

        self.evaluate_msgs({"var1": 0.0}, registry)
        var1 = 1.0

        # Result intentionally discarded; the call itself is kept.
        registry.createFullMsg()
        self.evaluate_msgs({"var1": 1.0}, registry)
        var2 = 2

        registry.registerFunction("var2", (lambda: var2))
        registry.createFullMsg()
        self.evaluate_msgs({"var1": 1.0, "var2": 2.0}, registry)

        registry.unregister("var1")
        self.evaluate_msgs({"var2": 2.0}, registry)

    def test_list_data(self):
        """Register one statistic per list element and mutate the list."""
        list_data = [0, 10, 20, 30]
        registry = StatisticsRegistry(DEFAULT_TOPIC)
        for i in range(len(list_data)):
            # Bind the index as a default argument to avoid late binding.
            registry.registerFunction("var_{}".format(i),
                                      (lambda index=i: list_data[index]))

        self.evaluate_msgs(
            {"var_0": 0.0, "var_1": 10.0, "var_2": 20.0, "var_3": 30.0},
            registry)

        # Testing by modifying the data in the list
        list_data[2] = 152.2
        list_data[3] = 1.23
        self.evaluate_msgs(
            {"var_0": 0.0, "var_1": 10.0, "var_2": 152.2, "var_3": 1.23},
            registry)

    def test_map_data(self):
        """Register one statistic per dictionary entry and mutate the dict."""
        map_data = {"x": 20.0, "y": 124.2, "z": 20}
        registry = StatisticsRegistry(DEFAULT_TOPIC)
        for key in map_data:
            # Bind the key as a default argument to avoid late binding.
            registry.registerFunction("var_{}".format(key),
                                      (lambda map_key=key: map_data[map_key]))

        self.evaluate_msgs({"var_x": 20.0, "var_y": 124.2, "var_z": 20}, registry)

        # Testing by modifying the data in the dictionary
        map_data["x"] = 25.7
        map_data["z"] += 7
        self.evaluate_msgs({"var_x": 25.7, "var_y": 124.2, "var_z": 27}, registry)

    def test_registration_list(self):
        """Statistics tied to a registration list vanish once it is deleted."""
        var1 = 0.0
        registry = StatisticsRegistry(DEFAULT_TOPIC)
        registration_list = []
        registry.registerFunction("var1", (lambda: var1), registration_list)

        self.evaluate_msgs({"var1": 0.0}, registry)

        # The test expects the registry to be empty right after the list
        # goes away.
        del registration_list
        self.evaluate_msgs({}, registry)

    def test_publish(self):
        """Publish through ROS topics and check contents and names_version."""
        # The subscribers are bound to locals for the duration of the test.
        sub = rospy.Subscriber(DEFAULT_TOPIC + "/full", Statistics,
                               self.full_msg_cb)
        names_sub = rospy.Subscriber(DEFAULT_TOPIC + "/names", StatisticsNames,
                                     self.names_msg_cb)
        values_sub = rospy.Subscriber(DEFAULT_TOPIC + "/values", StatisticsValues,
                                      self.values_msg_cb)
        registry = StatisticsRegistry(DEFAULT_TOPIC)
        rospy.sleep(0.5)  # Time for pub-sub connection
        self.clear()
        registry.publish()
        self.wait_for_msg()
        self.compare_full_msg({}, self.last_full_msg)
        self.assertEqual(self.last_names_msg.names_version,
                         self.last_values_msg.names_version)
        old_names_ver = self.last_names_msg.names_version

        var = 1.0
        registry.registerFunction("var", (lambda: var))
        self.clear()
        registry.publish()
        self.wait_for_msg()
        self.compare_full_msg({"var": 1.0}, self.last_full_msg)
        self.assertEqual(self.last_names_msg.names_version,
                         self.last_values_msg.names_version)
        self.assertNotEqual(old_names_ver, self.last_names_msg.names_version)
        old_names_ver = self.last_names_msg.names_version

        # If we publish the same statistics, names_version shouldn't change.
        # The previous names message is kept so wait_for_msg() does not
        # require a new one to arrive.
        self.clear(False)
        registry.publish()
        self.wait_for_msg()
        self.assertEqual(old_names_ver, self.last_names_msg.names_version)

        self.clear()
        registry.publishCustomStatistic("foo", 23)
        rospy.sleep(0.2)  # hard coded sleep only for full msg
        self.compare_full_msg({"foo": 23}, self.last_full_msg)

    def clear(self, expect_new_names=True):
        """Forget the stored messages.

        :param expect_new_names: when False, ``last_names_msg`` is left
            untouched; the full and values messages are always reset.
        """
        self.last_full_msg = None
        self.last_values_msg = None
        if expect_new_names:
            self.last_names_msg = None

    def wait_for_msg(self, timeout=2.0):
        """Block until a full, a values and a names message are all stored.

        :param timeout: maximum time to wait, passed to ``rospy.Duration``.
        :raises rospy.ROSException: if the deadline passes first.
        """
        end = rospy.Time.now() + rospy.Duration(timeout)
        while rospy.Time.now() < end:
            if (self.last_full_msg is not None
                    and self.last_values_msg is not None
                    and self.last_names_msg is not None):
                return
            rospy.sleep(0.1)
        raise rospy.ROSException("Timeout waiting for msg")
157 
158 
159 
if __name__ == '__main__':
    # rostest is only needed when the file is executed as a script.
    import rostest
    # The node is initialised before the test case is handed to rostest.
    rospy.init_node("pal_statistics_test", log_level=rospy.INFO)
    rostest.rosrun('pal_statistics', 'test_pal_statistics', TestPalStatistics)
test_pal_statistics.TestPalStatistics.test_basic
def test_basic(self)
Definition: test_pal_statistics.py:47
test_pal_statistics.TestPalStatistics.test_registration_list
def test_registration_list(self)
Definition: test_pal_statistics.py:92
test_pal_statistics.TestPalStatistics.clear
def clear(self, expect_new_names=True)
Definition: test_pal_statistics.py:144
test_pal_statistics.TestPalStatistics.test_map_data
def test_map_data(self)
Definition: test_pal_statistics.py:79
test_pal_statistics.TestPalStatistics.test_publish
def test_publish(self)
Definition: test_pal_statistics.py:103
test_pal_statistics.TestPalStatistics.values_msg_cb
def values_msg_cb(self, msg)
Definition: test_pal_statistics.py:25
pal_statistics::StatisticsRegistry
The StatisticsRegistry class reads the value of registered variables and publishes them on the specif...
Definition: pal_statistics.h:53
test_pal_statistics.TestPalStatistics.wait_for_msg
def wait_for_msg(self, timeout=2.0)
Definition: test_pal_statistics.py:150
test_pal_statistics.TestPalStatistics.last_names_msg
last_names_msg
Definition: test_pal_statistics.py:17
test_pal_statistics.TestPalStatistics.full_msg_cb
def full_msg_cb(self, msg)
Definition: test_pal_statistics.py:19
test_pal_statistics.TestPalStatistics.names_msg_cb
def names_msg_cb(self, msg)
Definition: test_pal_statistics.py:22
test_pal_statistics.TestPalStatistics.last_full_msg
last_full_msg
Definition: test_pal_statistics.py:20
test_pal_statistics.TestPalStatistics.test_list_data
def test_list_data(self)
Definition: test_pal_statistics.py:66
test_pal_statistics.TestPalStatistics
Definition: test_pal_statistics.py:12
test_pal_statistics.TestPalStatistics.compare_optimized_msgs
def compare_optimized_msgs(self, expected, names_msg, values_msg)
Definition: test_pal_statistics.py:35
test_pal_statistics.TestPalStatistics.compare_full_msg
def compare_full_msg(self, expected, full_msg)
Definition: test_pal_statistics.py:28
test_pal_statistics.TestPalStatistics.evaluate_msgs
def evaluate_msgs(self, expected, registry)
Definition: test_pal_statistics.py:41
test_pal_statistics.TestPalStatistics.__init__
def __init__(self, *args)
Definition: test_pal_statistics.py:14
test_pal_statistics.TestPalStatistics.last_values_msg
last_values_msg
Definition: test_pal_statistics.py:26


pal_statistics
Author(s):
autogenerated on Fri Aug 2 2024 08:29:35