3 from builtins
import range
8 from pal_statistics
import StatisticsRegistry
9 from pal_statistics_msgs.msg
import Statistics, StatisticsNames, StatisticsValues
11 DEFAULT_TOPIC =
"pal_statistics"
15 super(TestPalStatistics, self).
__init__(*args)
30 for i
in range(0, len(full_msg.statistics)):
31 actual[full_msg.statistics[i].name] = full_msg.statistics[i].value
33 self.assertDictEqual(expected, actual)
37 self.assertListEqual(list(expected.keys()), names_msg.names)
38 self.assertListEqual(list(expected.values()), values_msg.values)
42 full_msg = registry.createFullMsg()
44 names_msg, values_msg = registry.createOptimizedMsgs()
50 registry.registerFunction(
"var1", (
lambda: var1))
55 msg = registry.createFullMsg()
59 registry.registerFunction(
"var2", (
lambda: var2))
60 msg = registry.createFullMsg()
63 registry.unregister(
"var1")
67 list_data = [0, 10, 20, 30]
69 for i
in range(0, len(list_data)):
70 registry.registerFunction(
"var_{}".format(i), (
lambda index=i: list_data[index]))
72 self.
evaluate_msgs({
"var_0": 0.0,
"var_1": 10.0,
"var_2": 20.0,
"var_3": 30.0}, registry)
77 self.
evaluate_msgs({
"var_0": 0.0,
"var_1": 10.0,
"var_2": 152.2,
"var_3": 1.23}, registry)
80 map_data = {
"x": 20.0,
"y": 124.2,
"z": 20}
83 registry.registerFunction(
"var_{}".format(key), (
lambda map_key=key: map_data[map_key]))
85 self.
evaluate_msgs({
"var_x": 20.0,
"var_y": 124.2,
"var_z": 20}, registry)
90 self.
evaluate_msgs({
"var_x": 25.7,
"var_y": 124.2,
"var_z": 27}, registry)
95 registration_list = []
96 registry.registerFunction(
"var1", (
lambda: var1), registration_list)
100 del registration_list
104 sub = rospy.Subscriber(DEFAULT_TOPIC +
"/full", Statistics,
106 names_sub = rospy.Subscriber(DEFAULT_TOPIC +
"/names", StatisticsNames,
108 values_sub = rospy.Subscriber(DEFAULT_TOPIC +
"/values", StatisticsValues,
121 registry.registerFunction(
"var", (
lambda: var))
128 self.assertNotEqual(old_names_ver, self.
last_names_msg.names_version)
135 self.assertEqual(old_names_ver, self.
last_names_msg.names_version)
139 registry.publishCustomStatistic(
"foo", 23)
144 def clear(self, expect_new_names=True):
151 end = rospy.Time.now() + rospy.Duration(timeout)
152 while rospy.Time.now() < end:
156 raise rospy.ROSException(
"Timeout waiting for msg")
160 if __name__ ==
'__main__':
162 rospy.init_node(
"pal_statistics_test", log_level=rospy.INFO)
163 rostest.rosrun(
'pal_statistics',
'test_pal_statistics', TestPalStatistics)