32 from docutils.nodes
import topic
36 """ Return a list of topics stored in this bag """
37 bag = rosbag.Bag(bag_name,
'r')
38 return bag.get_type_and_topic_info()[1].keys()
41 """ Generates a sequence of messages in the topic from the bag """
42 bag = rosbag.Bag(bag_name,
'r')
43 for top, msg, t
in bag.read_messages():
47 """ The functions below check for type of class objects in nested ROS messages """
48 primitive_types = (int, float, bool, str)
49 collection_types = (list, tuple)
56 """ Checks whether two float numbers are approximately equal, following the reference implementation in Python PEP 485 - https://peps.python.org/pep-0485/#proposed-implementation"""
57 return abs(a-b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
60 """ Generates values of various types (int, bool, str, float) from a nested bag object """
61 for member
in dir(item):
62 if member.startswith(
"_"):
64 value = getattr(item, member)
67 elif (repr(type(value)).startswith(
"<class")):
81 Compares the contents of two messages; fails if the contents are not identical (except for the ROS header sequence number and timestamp).
84 ref_msg.header.seq = 0
85 uut_msg.header.seq = 0
86 ref_msg.header.stamp =
None
87 uut_msg.header.stamp =
None
89 if (ref_msg != uut_msg):
92 for ref_value
in ref_value_gen:
93 uut_value = next(uut_value_gen)
95 if not is_close(ref_value, uut_value):
96 rospy.logerr(
"Messages do not match:")
97 rospy.logerr(
"Ref:\r\n" + str(ref_msg))
98 rospy.logerr(
"UUT:\r\n" + str(uut_msg))
101 if not(ref_value == uut_value):
102 rospy.logerr(
"Messages do not match:")
103 rospy.logerr(
"Ref:\r\n" + str(ref_msg))
104 rospy.logerr(
"UUT:\r\n" + str(uut_msg))
110 Verifies that two bags contain a semantically identical sequence of messages.
115 for topic
in ref_topics:
119 for ref_msg
in ref_gen:
120 uut_msg = next(uut_gen)
121 if not compare(ref_msg, uut_msg):
122 rospy.logerr(
"Topic: {} Msg No: {}".format(topic, msgno))
127 rospy.loginfo(
"Verified {} '{}' messages".format(msgno, topic))
129 unexpected_messages = 0
132 uut_top, uut_msg, uut_t = next(uut_gen)
133 rospy.logerr(
"Unexpected message")
134 rospy.logerr(uut_msg)
135 unexpected_messages += 1
137 except StopIteration:
141 traceback.print_exc()
144 assert(unexpected_messages == 0)
146 if __name__ ==
'__main__':