Package rospy :: Package impl :: Module statistics

Source Code for Module rospy.impl.statistics

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2013-2014 Dariush Forouher 
  4  # All rights reserved. 
  5  # 
  6  # Based on code adapted from diagnostics_updater by Blaise Gassend 
  7  # 
  8  # Redistribution and use in source and binary forms, with or without 
  9  # modification, are permitted provided that the following conditions 
 10  # are met: 
 11  # 
 12  #  * Redistributions of source code must retain the above copyright 
 13  #    notice, this list of conditions and the following disclaimer. 
 14  #  * Redistributions in binary form must reproduce the above 
 15  #    copyright notice, this list of conditions and the following 
 16  #    disclaimer in the documentation and/or other materials provided 
 17  #    with the distribution. 
 18  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 19  #    contributors may be used to endorse or promote products derived 
 20  #    from this software without specific prior written permission. 
 21  # 
 22  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 23  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 24  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 25  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 26  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 27  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 28  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 29  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 30  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 31  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 32  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 33  # POSSIBILITY OF SUCH DAMAGE. 
 34   
 35  from math import sqrt 
 36  import logging 
 37  import sys 
 38   
 39  from rosgraph_msgs.msg import TopicStatistics 
 40  import rospy 
 41   
 42  _logger = logging.getLogger('rospy.impl.statistics') 
43 44 45 -class SubscriberStatisticsLogger():
46 """ 47 Class that monitors each subscriber. 48 49 this class basically just keeps a collection of ConnectionStatisticsLogger. 50 """ 51 52 @classmethod
53 - def is_enabled(cls):
54 # disable statistics if node can't talk to parameter server 55 # which is the case in unit tests 56 try: 57 return rospy.get_param("/enable_statistics", False) 58 except Exception: 59 return False
60
61 - def __init__(self, subscriber):
62 self.subscriber_name = subscriber.name 63 self.connections = dict() 64 self.read_parameters()
65
66 - def read_parameters(self):
67 """ 68 Fetch window parameters from parameter server 69 """ 70 71 # Range of acceptable messages in window. 72 # Window size will be adjusted if number of observed is 73 # outside this range. 74 self.min_elements = rospy.get_param("/statistics_window_min_elements", 10) 75 self.max_elements = rospy.get_param("/statistics_window_max_elements", 100) 76 77 # Range of window length, in seconds 78 self.min_window = rospy.get_param("/statistics_window_min_size", 4) 79 self.max_window = rospy.get_param("/statistics_window_max_size", 64)
80
81 - def callback(self, msg, publisher, stat_bytes):
82 """ 83 This method is called for every message that has been received. 84 85 @param msg: The message received. 86 @param publisher: The name of the publisher node that sent the msg 87 @param stat_bytes: A counter, how many bytes have been moved across 88 this connection since it exists. 89 90 This method just looks up the ConnectionStatisticsLogger for the specific connection 91 between publisher and subscriber and delegates to statistics logging to that 92 instance. 93 """ 94 95 # /clock is special, as it is subscribed very early 96 # also exclude /statistics to reduce noise. 97 if self.subscriber_name == "/clock" or self.subscriber_name == "/statistics": 98 return 99 100 try: 101 # create ConnectionStatisticsLogger for new connections 102 logger = self.connections.get(publisher) 103 if logger is None: 104 logger = ConnectionStatisticsLogger(self.subscriber_name, rospy.get_name(), publisher) 105 self.connections[publisher] = logger 106 107 # delegate stuff to that instance 108 logger.callback(self, msg, stat_bytes) 109 except Exception as e: 110 rospy.logerr("Unexpected error during statistics measurement: %s", str(e))
111
112 - def shutdown(self):
113 for logger in self.connections.values(): 114 logger.shutdown() 115 self.connections.clear()
116
117 118 -class ConnectionStatisticsLogger():
119 """ 120 Class that monitors lots of stuff for each connection. 121 122 is created whenever a subscriber is created. 123 is destroyed whenever its parent subscriber is destroyed. 124 its lifecycle is therefore bound to its parent subscriber. 125 """ 126
127 - def __init__(self, topic, subscriber, publisher):
128 """ 129 Constructor. 130 131 @param topic: Name of the topic 132 @param subscriber: Name of the subscriber 133 @param publisher: Name of the publisher 134 135 These three should uniquely identify the connection. 136 """ 137 138 self.topic = topic 139 self.subscriber = subscriber 140 self.publisher = publisher 141 142 self.pub = rospy.Publisher("/statistics", TopicStatistics, queue_size=10) 143 144 # reset window 145 self.last_pub_time = rospy.Time(0) 146 self.pub_frequency = rospy.Duration(1.0) 147 148 # timestamp age 149 self.age_list_ = [] 150 151 # period calculations 152 self.arrival_time_list_ = [] 153 154 self.last_seq_ = 0 155 self.dropped_msgs_ = 0 156 self.window_start = rospy.Time.now() 157 158 # temporary variables 159 self.stat_bytes_last_ = 0 160 self.stat_bytes_window_ = 0
161
162 - def sendStatistics(self, subscriber_statistics_logger):
163 """ 164 Send out statistics. Aggregate collected stats information. 165 166 Currently done blocking. Might be moved to own thread later. But at the moment 167 any computation done here should be rather quick. 168 """ 169 curtime = rospy.Time.now() 170 171 msg = TopicStatistics() 172 msg.topic = self.topic 173 msg.node_sub = self.subscriber 174 msg.node_pub = self.publisher 175 176 msg.window_start = self.window_start 177 msg.window_stop = curtime 178 179 # Calculate bytes since last message 180 msg.traffic = self.stat_bytes_window_ - self.stat_bytes_last_ 181 182 msg.delivered_msgs = len(self.arrival_time_list_) 183 msg.dropped_msgs = self.dropped_msgs_ 184 185 # we can only calculate message age if the messages did contain Header fields. 186 if len(self.age_list_) > 0: 187 msg.stamp_age_mean = rospy.Duration(sum(self.age_list_, rospy.Duration(0)).to_sec() / len(self.age_list_)) 188 variance = sum((rospy.Duration((msg.stamp_age_mean - value).to_sec() ** 2) for value in self.age_list_), rospy.Duration(0)) / len(self.age_list_) 189 msg.stamp_age_stddev = rospy.Duration(sqrt(variance.to_sec())) 190 msg.stamp_age_max = max(self.age_list_) 191 else: 192 msg.stamp_age_mean = rospy.Duration(0) 193 msg.stamp_age_stddev = rospy.Duration(0) 194 msg.stamp_age_max = rospy.Duration(0) 195 196 # computer period/frequency. we need at least two messages within the window to do this. 197 if len(self.arrival_time_list_) > 1: 198 periods = [j - i for i, j in zip(self.arrival_time_list_[:-1], self.arrival_time_list_[1:])] 199 msg.period_mean = rospy.Duration(sum(periods, rospy.Duration(0)).to_sec() / len(periods)) 200 variance = sum((rospy.Duration((msg.period_mean - value).to_sec() ** 2) for value in periods), rospy.Duration(0)) / len(periods) 201 msg.period_stddev = rospy.Duration(sqrt(variance.to_sec())) 202 msg.period_max = max(periods) 203 else: 204 msg.period_mean = rospy.Duration(0) 205 msg.period_stddev = rospy.Duration(0) 206 msg.period_max = rospy.Duration(0) 207 208 self.pub.publish(msg) 209 210 # adjust window, if message count is not appropriate. 211 pub_period = 1.0 / self.pub_frequency.to_sec() 212 if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and pub_period / 2 >= subscriber_statistics_logger.min_window: 213 self.pub_frequency *= 2 214 if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and pub_period * 2 <= subscriber_statistics_logger.max_window: 215 self.pub_frequency /= 2 216 217 # clear collected stats, start new window. 218 self.age_list_ = [] 219 self.arrival_time_list_ = [] 220 self.dropped_msgs_ = 0 221 222 self.window_start = curtime 223 224 self.stat_bytes_last_ = self.stat_bytes_window_
225
226 - def callback(self, subscriber_statistics_logger, msg, stat_bytes):
227 """ 228 This method is called for every message, that is received on this 229 subscriber. 230 231 this callback will keep some statistics and publish the results 232 periodically on a topic. the publishing should probably be done 233 asynchronically in another thread. 234 235 @param msg: The message, that has been received. The message has usually 236 been already deserialized. However this is not always the case. (AnyMsg) 237 @param stat_bytes: A counter, how many bytes have been moved across 238 this connection since it exists. 239 240 Any computing-heavy stuff should be done somewhere else, as this 241 callback has to return before the message is delivered to the user. 242 """ 243 244 arrival_time = rospy.Time.now() 245 246 self.arrival_time_list_.append(arrival_time) 247 248 self.stat_bytes_window_ = stat_bytes 249 250 # rospy has the feature to subscribe a topic with AnyMsg which aren't deserialized. 251 # Those subscribers won't have a header. But as these subscribers are rather rare 252 # ("rostopic hz" is the only one I know of), I'm gonna ignore them. 253 if msg._has_header: 254 self.age_list_.append(arrival_time - msg.header.stamp) 255 256 if self.last_seq_ + 1 != msg.header.seq: 257 self.dropped_msgs_ = self.dropped_msgs_ + 1 258 self.last_seq_ = msg.header.seq 259 260 # send out statistics with a certain frequency 261 if self.last_pub_time + rospy.Duration(1.0 / self.pub_frequency.to_sec()) < arrival_time: 262 self.last_pub_time = arrival_time 263 self.sendStatistics(subscriber_statistics_logger)
264
265 - def shutdown(self):
266 self.pub.unregister()
267