oem7_decoder_test.py
Go to the documentation of this file.
1 ################################################################################
2 # Copyright (c) 2020 NovAtel Inc.
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a copy
5 # of this software and associated documentation files (the "Software"), to deal
6 # in the Software without restriction, including without limitation the rights
7 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 # copies of the Software, and to permit persons to whom the Software is
9 # furnished to do so, subject to the following conditions:
10 #
11 # The above copyright notice and this permission notice shall be included in all
12 # copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 # SOFTWARE.
21 #
22 #################################################################################
23 
24 
25 import rosbag
26 import rospy
27 
28 import os
29 import sys
30 
31 import traceback
32 from docutils.nodes import topic
33 
34 
35 
36 def get_topic_list(bag_name):
37  """ Return a list of topics stored in this bag """
38  bag = rosbag.Bag(bag_name, 'r')
39  return bag.get_type_and_topic_info()[1].keys()
40 
41 def make_msg_gen(bag_name, topic):
42  """ Generates a sequence of messages in the topic from the bag """
43  bag = rosbag.Bag(bag_name, 'r')
44  for top, msg, t in bag.read_messages():
45  if top == topic:
46  yield msg
47 
48 
49 
50 def compare(ref_msg, uut_msg):
51  """
52  Compares contents of two bags; fails if the contents are not identical (except for ROS seqno, timestamp).
53  """
54  # Supress seqno, timestamp
55  ref_msg.header.seq = 0
56  uut_msg.header.seq = 0
57 
58  ref_msg.header.stamp = None
59  uut_msg.header.stamp = None
60 
61  if(ref_msg != uut_msg):
62  rospy.logerr("Messages do not match:")
63  rospy.logerr("Ref:\r\n" + str(ref_msg))
64  rospy.logerr("UUT:\r\n" + str(uut_msg))
65  return False;
66 
67  return True
68 
69 
70 def verify_bag_equivalency(ref_bag, uut_bag):
71  """
72  Verifies that two bags contain semantically identical sequence of messages.
73  """
74 
75  ref_topics = get_topic_list(ref_bag)
76  print(ref_topics)
77  for topic in ref_topics:
78  msgno = 0
79  uut_gen = make_msg_gen(uut_bag, topic)
80  ref_gen = make_msg_gen(ref_bag, topic)
81  for ref_msg in ref_gen:
82  uut_msg = next(uut_gen)
83  if not compare(ref_msg, uut_msg):
84  rospy.logerr("Topic: {} Msg No: {}".format(topic, msgno))
85  assert False
86 
87  msgno += 1
88 
89  rospy.loginfo("Verified {} '{}' messages".format(msgno, topic))
90  # Check for presence of unexpected messages
91  unexpected_messages = 0
92  try:
93  while True:
94  uut_top, uut_msg, uut_t = next(uut_gen)
95  rospy.logerr("Unexpected message")
96  rospy.logerr(uut_msg)
97  unexpected_messages += 1
98 
99  except StopIteration:
100  pass # Normal
101 
102  except:
103  traceback.print_exc()
104  assert(False)
105 
106  assert(unexpected_messages == 0)
107 
108 
109 
110 
111 
112 
def get_topic_list(bag_name)
def compare(ref_msg, uut_msg)
def make_msg_gen(bag_name, topic)
def verify_bag_equivalency(ref_bag, uut_bag)


novatel_oem7_driver
Author(s):
autogenerated on Tue Mar 9 2021 03:48:00