1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from math import sqrt
36 import logging
37 import sys
38
39 from rosgraph_msgs.msg import TopicStatistics
40 import rospy
41
42 _logger = logging.getLogger('rospy.impl.statistics')
46 """
47 Class that monitors each subscriber.
48
49 this class basically just keeps a collection of ConnectionStatisticsLogger.
50 """
51
52 @classmethod
54
55
56 try:
57 return rospy.get_param("/enable_statistics", False)
58 except Exception:
59 return False
60
62 self.subscriber_name = subscriber.name
63 self.connections = dict()
64 self.read_parameters()
65
67 """
68 Fetch window parameters from parameter server
69 """
70
71
72 self.min_elements = rospy.get_param("/statistics_window_min_elements", 10)
73 self.max_elements = rospy.get_param("/statistics_window_max_elements", 100)
74
75
76
77
78 self.max_window = rospy.get_param("/statistics_window_max_size", 64)
79 self.min_window = rospy.get_param("/statistics_window_min_size", 4)
80
81 - def callback(self, msg, publisher, stat_bytes):
82 """
83 This method is called for every message that has been received.
84
85 @param msg: The message received.
86 @param publisher: The name of the publisher node that sent the msg
87 @param stat_bytes: A counter, how many bytes have been moved across
88 this connection since it exists.
89
90 This method just looks up the ConnectionStatisticsLogger for the specific connection
91 between publisher and subscriber and delegates to statistics logging to that
92 instance.
93 """
94
95
96
97 if self.subscriber_name == "/clock" or self.subscriber_name == "/statistics":
98 return
99
100 try:
101
102 logger = self.connections.get(publisher)
103 if logger is None:
104 logger = ConnectionStatisticsLogger(self.subscriber_name, rospy.get_name(), publisher)
105 self.connections[publisher] = logger
106
107
108 logger.callback(self, msg, stat_bytes)
109 except Exception as e:
110 rospy.logerr("Unexpected error during statistics measurement: %s", str(e))
111
116
119 """
120 Class that monitors lots of stuff for each connection.
121
122 is created whenever a subscriber is created.
123 is destroyed whenever its parent subscriber is destroyed.
124 its lifecycle is therefore bound to its parent subscriber.
125 """
126
127 - def __init__(self, topic, subscriber, publisher):
128 """
129 Constructor.
130
131 @param topic: Name of the topic
132 @param subscriber: Name of the subscriber
133 @param publisher: Name of the publisher
134
135 These three should uniquely identify the connection.
136 """
137
138 self.topic = topic
139 self.subscriber = subscriber
140 self.publisher = publisher
141
142 self.pub = rospy.Publisher("/statistics", TopicStatistics, queue_size=10)
143
144
145 self.last_pub_time = rospy.Time(0)
146 self.pub_frequency = rospy.Duration(1.0)
147
148
149 self.age_list_ = []
150
151
152 self.arrival_time_list_ = []
153
154 self.last_seq_ = 0
155 self.dropped_msgs_ = 0
156 self.window_start = rospy.Time.now()
157
158
159 self.stat_bytes_last_ = 0
160 self.stat_bytes_window_ = 0
161
163 """
164 Send out statistics. Aggregate collected stats information.
165
166 Currently done blocking. Might be moved to own thread later. But at the moment
167 any computation done here should be rather quick.
168 """
169 curtime = rospy.Time.now()
170
171 msg = TopicStatistics()
172 msg.topic = self.topic
173 msg.node_sub = self.subscriber
174 msg.node_pub = self.publisher
175
176 msg.window_start = self.window_start
177 msg.window_stop = curtime
178
179
180 msg.traffic = self.stat_bytes_window_ - self.stat_bytes_last_
181
182 msg.delivered_msgs = len(self.arrival_time_list_)
183 msg.dropped_msgs = self.dropped_msgs_
184
185
186 if len(self.age_list_) > 0:
187 msg.stamp_age_mean = rospy.Duration(sum(self.age_list_, rospy.Duration(0)).to_sec() / len(self.age_list_))
188 variance = sum((rospy.Duration((msg.stamp_age_mean - value).to_sec() ** 2) for value in self.age_list_), rospy.Duration(0)) / len(self.age_list_)
189 msg.stamp_age_stddev = rospy.Duration(sqrt(variance.to_sec()))
190 msg.stamp_age_max = max(self.age_list_)
191 else:
192 msg.stamp_age_mean = rospy.Duration(0)
193 msg.stamp_age_stddev = rospy.Duration(0)
194 msg.stamp_age_max = rospy.Duration(0)
195
196
197 if len(self.arrival_time_list_) > 1:
198 periods = [j - i for i, j in zip(self.arrival_time_list_[:-1], self.arrival_time_list_[1:])]
199 msg.period_mean = rospy.Duration(sum(periods, rospy.Duration(0)).to_sec() / len(periods))
200 variance = sum((rospy.Duration((msg.period_mean - value).to_sec() ** 2) for value in periods), rospy.Duration(0)) / len(periods)
201 msg.period_stddev = rospy.Duration(sqrt(variance.to_sec()))
202 msg.period_max = max(periods)
203 else:
204 msg.period_mean = rospy.Duration(0)
205 msg.period_stddev = rospy.Duration(0)
206 msg.period_max = rospy.Duration(0)
207
208 self.pub.publish(msg)
209
210
211 if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and self.pub_frequency.to_sec() * 2 <= subscriber_statistics_logger.max_window:
212 self.pub_frequency *= 2
213 if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and self.pub_frequency.to_sec() / 2 >= subscriber_statistics_logger.min_window:
214 self.pub_frequency /= 2
215
216
217 self.age_list_ = []
218 self.arrival_time_list_ = []
219 self.dropped_msgs_ = 0
220
221 self.window_start = curtime
222
223 self.stat_bytes_last_ = self.stat_bytes_window_
224
225 - def callback(self, subscriber_statistics_logger, msg, stat_bytes):
226 """
227 This method is called for every message, that is received on this
228 subscriber.
229
230 this callback will keep some statistics and publish the results
231 periodically on a topic. the publishing should probably be done
232 asynchronically in another thread.
233
234 @param msg: The message, that has been received. The message has usually
235 been already deserialized. However this is not always the case. (AnyMsg)
236 @param stat_bytes: A counter, how many bytes have been moved across
237 this connection since it exists.
238
239 Any computing-heavy stuff should be done somewhere else, as this
240 callback has to return before the message is delivered to the user.
241 """
242
243 arrival_time = rospy.Time.now()
244
245 self.arrival_time_list_.append(arrival_time)
246
247 self.stat_bytes_window_ = stat_bytes
248
249
250
251
252 if msg._has_header:
253 self.age_list_.append(arrival_time - msg.header.stamp)
254
255 if self.last_seq_ + 1 != msg.header.seq:
256 self.dropped_msgs_ = self.dropped_msgs_ + 1
257 self.last_seq_ = msg.header.seq
258
259
260 if self.last_pub_time + self.pub_frequency < arrival_time:
261 self.last_pub_time = arrival_time
262 self.sendStatistics(subscriber_statistics_logger)
263
266