1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from math import sqrt
36 import logging
37 import sys
38
39 from rosgraph_msgs.msg import TopicStatistics
40 import rospy
41
42 _logger = logging.getLogger('rospy.impl.statistics')
46 """
47 Class that monitors each subscriber.
48
49 this class basically just keeps a collection of ConnectionStatisticsLogger.
50 """
51
52 @classmethod
54
55
56 try:
57 return rospy.get_param("/enable_statistics", False)
58 except Exception:
59 return False
60
62 self.subscriber_name = subscriber.name
63 self.connections = dict()
64 self.read_parameters()
65
67 """
68 Fetch window parameters from parameter server
69 """
70
71
72
73
74 self.min_elements = rospy.get_param("/statistics_window_min_elements", 10)
75 self.max_elements = rospy.get_param("/statistics_window_max_elements", 100)
76
77
78 self.min_window = rospy.get_param("/statistics_window_min_size", 4)
79 self.max_window = rospy.get_param("/statistics_window_max_size", 64)
80
81 - def callback(self, msg, publisher, stat_bytes):
82 """
83 This method is called for every message that has been received.
84
85 @param msg: The message received.
86 @param publisher: The name of the publisher node that sent the msg
87 @param stat_bytes: A counter, how many bytes have been moved across
88 this connection since it exists.
89
90 This method just looks up the ConnectionStatisticsLogger for the specific connection
91 between publisher and subscriber and delegates to statistics logging to that
92 instance.
93 """
94
95
96
97 if self.subscriber_name == "/clock" or self.subscriber_name == "/statistics":
98 return
99
100 try:
101
102 logger = self.connections.get(publisher)
103 if logger is None:
104 logger = ConnectionStatisticsLogger(self.subscriber_name, rospy.get_name(), publisher)
105 self.connections[publisher] = logger
106
107
108 logger.callback(self, msg, stat_bytes)
109 except Exception as e:
110 rospy.logerr("Unexpected error during statistics measurement: %s", str(e))
111
116
119 """
120 Class that monitors lots of stuff for each connection.
121
122 is created whenever a subscriber is created.
123 is destroyed whenever its parent subscriber is destroyed.
124 its lifecycle is therefore bound to its parent subscriber.
125 """
126
127 - def __init__(self, topic, subscriber, publisher):
128 """
129 Constructor.
130
131 @param topic: Name of the topic
132 @param subscriber: Name of the subscriber
133 @param publisher: Name of the publisher
134
135 These three should uniquely identify the connection.
136 """
137
138 self.topic = topic
139 self.subscriber = subscriber
140 self.publisher = publisher
141
142 self.pub = rospy.Publisher("/statistics", TopicStatistics, queue_size=10)
143
144
145 self.last_pub_time = rospy.Time(0)
146 self.pub_frequency = rospy.Duration(1.0)
147
148
149 self.age_list_ = []
150
151
152 self.arrival_time_list_ = []
153
154 self.last_seq_ = 0
155 self.dropped_msgs_ = 0
156 self.window_start = rospy.Time.now()
157
158
159 self.stat_bytes_last_ = 0
160 self.stat_bytes_window_ = 0
161
163 """
164 Send out statistics. Aggregate collected stats information.
165
166 Currently done blocking. Might be moved to own thread later. But at the moment
167 any computation done here should be rather quick.
168 """
169 curtime = rospy.Time.now()
170
171 msg = TopicStatistics()
172 msg.topic = self.topic
173 msg.node_sub = self.subscriber
174 msg.node_pub = self.publisher
175
176 msg.window_start = self.window_start
177 msg.window_stop = curtime
178
179
180 msg.traffic = self.stat_bytes_window_ - self.stat_bytes_last_
181
182 msg.delivered_msgs = len(self.arrival_time_list_)
183 msg.dropped_msgs = self.dropped_msgs_
184
185
186 if len(self.age_list_) > 0:
187 msg.stamp_age_mean = rospy.Duration(sum(self.age_list_, rospy.Duration(0)).to_sec() / len(self.age_list_))
188 variance = sum((rospy.Duration((msg.stamp_age_mean - value).to_sec() ** 2) for value in self.age_list_), rospy.Duration(0)) / len(self.age_list_)
189 msg.stamp_age_stddev = rospy.Duration(sqrt(variance.to_sec()))
190 msg.stamp_age_max = max(self.age_list_)
191 else:
192 msg.stamp_age_mean = rospy.Duration(0)
193 msg.stamp_age_stddev = rospy.Duration(0)
194 msg.stamp_age_max = rospy.Duration(0)
195
196
197 if len(self.arrival_time_list_) > 1:
198 periods = [j - i for i, j in zip(self.arrival_time_list_[:-1], self.arrival_time_list_[1:])]
199 msg.period_mean = rospy.Duration(sum(periods, rospy.Duration(0)).to_sec() / len(periods))
200 variance = sum((rospy.Duration((msg.period_mean - value).to_sec() ** 2) for value in periods), rospy.Duration(0)) / len(periods)
201 msg.period_stddev = rospy.Duration(sqrt(variance.to_sec()))
202 msg.period_max = max(periods)
203 else:
204 msg.period_mean = rospy.Duration(0)
205 msg.period_stddev = rospy.Duration(0)
206 msg.period_max = rospy.Duration(0)
207
208 self.pub.publish(msg)
209
210
211 pub_period = 1.0 / self.pub_frequency.to_sec()
212 if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and pub_period / 2 >= subscriber_statistics_logger.min_window:
213 self.pub_frequency *= 2
214 if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and pub_period * 2 <= subscriber_statistics_logger.max_window:
215 self.pub_frequency /= 2
216
217
218 self.age_list_ = []
219 self.arrival_time_list_ = []
220 self.dropped_msgs_ = 0
221
222 self.window_start = curtime
223
224 self.stat_bytes_last_ = self.stat_bytes_window_
225
226 - def callback(self, subscriber_statistics_logger, msg, stat_bytes):
227 """
228 This method is called for every message, that is received on this
229 subscriber.
230
231 this callback will keep some statistics and publish the results
232 periodically on a topic. the publishing should probably be done
233 asynchronically in another thread.
234
235 @param msg: The message, that has been received. The message has usually
236 been already deserialized. However this is not always the case. (AnyMsg)
237 @param stat_bytes: A counter, how many bytes have been moved across
238 this connection since it exists.
239
240 Any computing-heavy stuff should be done somewhere else, as this
241 callback has to return before the message is delivered to the user.
242 """
243
244 arrival_time = rospy.Time.now()
245
246 self.arrival_time_list_.append(arrival_time)
247
248 self.stat_bytes_window_ = stat_bytes
249
250
251
252
253 if msg._has_header:
254 self.age_list_.append(arrival_time - msg.header.stamp)
255
256 if self.last_seq_ + 1 != msg.header.seq:
257 self.dropped_msgs_ = self.dropped_msgs_ + 1
258 self.last_seq_ = msg.header.seq
259
260
261 if self.last_pub_time + rospy.Duration(1.0 / self.pub_frequency.to_sec()) < arrival_time:
262 self.last_pub_time = arrival_time
263 self.sendStatistics(subscriber_statistics_logger)
264
267