oem7_message_test.py
Go to the documentation of this file.
1 ################################################################################
2 # Copyright (c) 2020 NovAtel Inc.
3 #
4 # Permission is hereby granted, free of charge, to any person obtaining a copy
5 # of this software and associated documentation files (the "Software"), to deal
6 # in the Software without restriction, including without limitation the rights
7 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 # copies of the Software, and to permit persons to whom the Software is
9 # furnished to do so, subject to the following conditions:
10 #
11 # The above copyright notice and this permission notice shall be included in all
12 # copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 # SOFTWARE.
21 #
22 #################################################################################
23 
24 """
25 Message Hz analysis.
26 
27 Existing ros_comm hz tests anaylyzes a single topic at a time;
28 This script analyzes multiple topics in a Bag.
29 """
30 
31 import os
32 import traceback
33 
34 import numpy as np
35 
36 import rosbag
37 from docutils.nodes import topic
38 
39 
40 
41 def get_topic_list(bag_name):
42  """ Return a list of topics stored in this bag """
43  bag = rosbag.Bag(bag_name, 'r')
44  return bag.get_type_and_topic_info()[1].keys()
45 
46 def make_msg_gen(bag_name, topic):
47  """ Generates a sequence of messages in the topic from the bag """
48  bag = rosbag.Bag(bag_name, 'r')
49  for top, msg, time in bag.read_messages():
50  if top == topic:
51  yield time, msg
52 
53 
54 def make_timestamp_gen(msg_itr):
55  for t, m in msg_itr:
56  yield t.to_sec(), m.header.stamp.to_sec()
57 
58 
59 
60 
61 def analyze_topic_hz(name, topic, exp_int, output_csv):
62  """
63  Analyzes message interval for a particular topic.
64  """
65 
66  bag_name = name + ".bag"
67  bag_path = os.path.expanduser("~") + "/.ros/" + bag_name
68 
69  msg_gen = make_msg_gen(bag_path, topic)
70  ts_gen = make_timestamp_gen(msg_gen)
71 
72  # Array of timestamps: Bag ts (recording time), Message ts (generation time), Delta: rec ts - gen ts
73  ts_arr_ = [[bag_ts, msg_ts, bag_ts - msg_ts] for bag_ts, msg_ts in ts_gen] # sigh...
74 
75  if len(ts_arr_) < 3:
76  print("Topic: {}: Insufficient data: {} samples".format(topic, len(ts_arr_)))
77  return False
78 
79  # Convert to numpy array
80  ts_arr = np.array(ts_arr_)
81 
82  # Delta: time between message generated by the driver and bag recording (i.e. publishign overhead)
83  d_arr = ts_arr[:, -1] # Allow rows in the Delta column
84  d_mean = np.mean(d_arr)
85  d_max = np.max( d_arr)
86  d_std = np.std( d_arr)
87 
88  ROWS = 0 # Axis
89  # Interval: time between individual message handling: generation by driver; bag recording.
90  int_arr = np.diff(ts_arr[:,:-1], axis = ROWS) # Intervals between Bag timestampts, publish timestamps,
91  # i.e. message arrival interval: ['bag interval', 'publish interval']
92 
93  int_mean = np.mean(int_arr, axis = ROWS)
94  int_max = np.amax(int_arr, axis = ROWS)
95  int_std = np.std( int_arr, axis = ROWS)
96 
97  BAG = 0
98  MSG = 1
99  print("topic: '{}', exp interval= {}. samples= {}: ".format(topic, exp_int, len(ts_arr_)))
100  print(" mean, max, stdev")
101  print(" Bag Recording interval: {}, {}, {}".format(int_mean[BAG], int_max[BAG], int_std[BAG]))
102  print(" Message publish interval: {}, {}, {}".format(int_mean[MSG], int_max[MSG], int_std[MSG]))
103  print(" Bag Rec - Message Pub Delta: {}, {}, {}".format(d_mean, d_max, d_std))
104 
105  if output_csv:
106  # Dump into .csv for additional post-processing.
107  topic_filename = topic.replace("/", "_")
108  np.savetxt(name + "." + topic_filename + ".csv", int_arr, delimiter=",")
109 
110 
111 """
112 Topic configuration; refer to std_init_commands.yaml.
113 """
114 NS = "/novatel/oem7/"
115 topic_config = \
116  {
117  # Topic : Rate
118  NS + "bestpos": 0.1,
119  NS + "bestvel": 0.1,
120  NS + "bestutm": 1.0,
121  NS + "time": 1.0,
122  NS + "corrimu": 0.01,
123  NS + "inspva": 0.02,
124  NS + "inspvax": 1.0,
125  NS + "insstdev": 1.0,
126  "/gps/gps": 0.02, # Same rate as INSPVA
127  "/gps/fix": 0.02, # Same rate as gps
128  "/gps/imu": 0.01 # Same as corrimu
129  }
130 
131 
132 
133 def analyze_hz(bag_name, output_csv):
134  """
135  Tests message frequency.
136  Analyzes message intervals based on Message timestamp and Bag recording timestamp.
137  """
138 
139  for topic in topic_config:
140  analyze_topic_hz(bag_name, topic, topic_config[topic], output_csv)
141  print ""
142 
143 
144 
def analyze_topic_hz(name, topic, exp_int, output_csv)
def analyze_hz(bag_name, output_csv)
def get_topic_list(bag_name)
def make_msg_gen(bag_name, topic)
def make_timestamp_gen(msg_itr)


novatel_oem7_driver
Author(s):
autogenerated on Tue Mar 9 2021 03:48:00