oem7_decoder_test.py
Go to the documentation of this file.
1 
23 
24 
25 import rosbag
26 import rospy
27 
28 import os
29 import sys
30 
31 import traceback
32 from docutils.nodes import topic
33 
34 
35 def get_topic_list(bag_name):
36  """ Return a list of topics stored in this bag """
37  bag = rosbag.Bag(bag_name, 'r')
38  return bag.get_type_and_topic_info()[1].keys()
39 
40 def make_msg_gen(bag_name, topic):
41  """ Generates a sequence of messages in the topic from the bag """
42  bag = rosbag.Bag(bag_name, 'r')
43  for top, msg, t in bag.read_messages():
44  if top == topic:
45  yield msg
46 
47 """ The functions below check for type of class objects in nested ROS messages """
48 primitive_types = (int, float, bool, str)
49 collection_types = (list, tuple)
50 float_type = (float)
51 def is_primitive(item): return isinstance(item, primitive_types)
52 def is_collection(item): return isinstance(item, collection_types)
53 def is_float_type(item): return isinstance(item, float_type)
54 
55 def is_close(a, b, rel_tol=1e-9, abs_tol=0.0):
56  """ Checks equivalency between two float numbers - Following rules of Python PEP485 - https://peps.python.org/pep-0485/#proposed-implementation"""
57  return abs(a-b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
58 
59 def item_gen(item):
60  """ Generates values of various types (int, bool, str, float) from a nested bag object """
61  for member in dir(item):
62  if member.startswith("_"): # exclude private stuff, it's irrelevant and blows up getattr.
63  continue
64  value = getattr(item, member)
65  if (is_primitive(value)):
66  yield value
67  elif (repr(type(value)).startswith("<class")):
68  for c in item_gen(value):
69  yield c
70  elif (is_collection(value)):
71  for elt in value:
72  if is_primitive(elt):
73  continue
74  else:
75  for c in item_gen(value):
76  yield c
77  pass # methods, classmethods, etc.
78 
79 def compare(ref_msg, uut_msg):
80  """
81  Compares contents of two bags; fails if the contents are not identical (except for ROS seqno, timestamp).
82  """
83  # Supress seqno, timestamp
84  ref_msg.header.seq = 0
85  uut_msg.header.seq = 0
86  ref_msg.header.stamp = None
87  uut_msg.header.stamp = None
88 
89  if (ref_msg != uut_msg):
90  ref_value_gen = item_gen(ref_msg)
91  uut_value_gen = item_gen(uut_msg)
92  for ref_value in ref_value_gen:
93  uut_value = next(uut_value_gen)
94  if is_float_type(ref_value):
95  if not is_close(ref_value, uut_value):
96  rospy.logerr("Messages do not match:")
97  rospy.logerr("Ref:\r\n" + str(ref_msg))
98  rospy.logerr("UUT:\r\n" + str(uut_msg))
99  return False;
100  else:
101  if not(ref_value == uut_value):
102  rospy.logerr("Messages do not match:")
103  rospy.logerr("Ref:\r\n" + str(ref_msg))
104  rospy.logerr("UUT:\r\n" + str(uut_msg))
105  return False;
106  return True
107 
108 def verify_bag_equivalency(ref_bag, uut_bag):
109  """
110  Verifies that two bags contain semantically identical sequence of messages.
111  """
112 
113  ref_topics = get_topic_list(ref_bag)
114  print(ref_topics)
115  for topic in ref_topics:
116  msgno = 0
117  uut_gen = make_msg_gen(uut_bag, topic)
118  ref_gen = make_msg_gen(ref_bag, topic)
119  for ref_msg in ref_gen:
120  uut_msg = next(uut_gen)
121  if not compare(ref_msg, uut_msg):
122  rospy.logerr("Topic: {} Msg No: {}".format(topic, msgno))
123  assert False
124 
125  msgno += 1
126 
127  rospy.loginfo("Verified {} '{}' messages".format(msgno, topic))
128  # Check for presence of unexpected messages
129  unexpected_messages = 0
130  try:
131  while True:
132  uut_top, uut_msg, uut_t = next(uut_gen)
133  rospy.logerr("Unexpected message")
134  rospy.logerr(uut_msg)
135  unexpected_messages += 1
136 
137  except StopIteration:
138  pass # Normal
139 
140  except:
141  traceback.print_exc()
142  assert(False)
143 
144  assert(unexpected_messages == 0)
145 
146 if __name__ == '__main__':
147  mgen = make_msg_gen(sys.argv[1], sys.argv[2])
148  for m in mgen:
149  print(m)
150  inspect_item(m)
151 
oem7_decoder_test.is_collection
def is_collection(item)
Definition: oem7_decoder_test.py:52
oem7_decoder_test.get_topic_list
def get_topic_list(bag_name)
Definition: oem7_decoder_test.py:35
oem7_decoder_test.compare
def compare(ref_msg, uut_msg)
Definition: oem7_decoder_test.py:79
oem7_decoder_test.is_primitive
def is_primitive(item)
Definition: oem7_decoder_test.py:51
oem7_decoder_test.is_close
def is_close(a, b, rel_tol=1e-9, abs_tol=0.0)
Definition: oem7_decoder_test.py:55
oem7_decoder_test.item_gen
def item_gen(item)
Definition: oem7_decoder_test.py:59
oem7_decoder_test.verify_bag_equivalency
def verify_bag_equivalency(ref_bag, uut_bag)
Definition: oem7_decoder_test.py:108
oem7_decoder_test.make_msg_gen
def make_msg_gen(bag_name, topic)
Definition: oem7_decoder_test.py:40
oem7_decoder_test.is_float_type
def is_float_type(item)
Definition: oem7_decoder_test.py:53


novatel_oem7_driver
Author(s):
autogenerated on Sat Feb 3 2024 03:51:34