27 Existing ros_comm hz tests anaylyzes a single topic at a time; 28 This script analyzes multiple topics in a Bag. 37 from docutils.nodes
import topic
42 """ Return a list of topics stored in this bag """ 44 return bag.get_type_and_topic_info()[1].keys()
47 """ Generates a sequence of messages in the topic from the bag """ 49 for top, msg, time
in bag.read_messages():
56 yield t.to_sec(), m.header.stamp.to_sec()
63 Analyzes message interval for a particular topic. 66 bag_name = name +
".bag" 67 bag_path = os.path.expanduser(
"~") +
"/.ros/" + bag_name
73 ts_arr_ = [[bag_ts, msg_ts, bag_ts - msg_ts]
for bag_ts, msg_ts
in ts_gen]
76 print(
"Topic: {}: Insufficient data: {} samples".format(topic, len(ts_arr_)))
80 ts_arr = np.array(ts_arr_)
84 d_mean = np.mean(d_arr)
85 d_max = np.max( d_arr)
86 d_std = np.std( d_arr)
90 int_arr = np.diff(ts_arr[:,:-1], axis = ROWS)
93 int_mean = np.mean(int_arr, axis = ROWS)
94 int_max = np.amax(int_arr, axis = ROWS)
95 int_std = np.std( int_arr, axis = ROWS)
99 print(
"topic: '{}', exp interval= {}. samples= {}: ".format(topic, exp_int, len(ts_arr_)))
100 print(
" mean, max, stdev")
101 print(
" Bag Recording interval: {}, {}, {}".format(int_mean[BAG], int_max[BAG], int_std[BAG]))
102 print(
" Message publish interval: {}, {}, {}".format(int_mean[MSG], int_max[MSG], int_std[MSG]))
103 print(
" Bag Rec - Message Pub Delta: {}, {}, {}".format(d_mean, d_max, d_std))
107 topic_filename = topic.replace(
"/",
"_")
108 np.savetxt(name +
"." + topic_filename +
".csv", int_arr, delimiter=
",")
112 Topic configuration; refer to std_init_commands.yaml. 114 NS =
"/novatel/oem7/" 122 NS +
"corrimu": 0.01,
125 NS +
"insstdev": 1.0,
135 Tests message frequency. 136 Analyzes message intervals based on Message timestamp and Bag recording timestamp. 139 for topic
in topic_config:
def analyze_topic_hz(name, topic, exp_int, output_csv)
def analyze_hz(bag_name, output_csv)
def get_topic_list(bag_name)
def make_msg_gen(bag_name, topic)
def make_timestamp_gen(msg_itr)