1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33   
 34   
 35  from math import sqrt 
 36  import logging 
 37  import sys 
 38   
 39  from rosgraph_msgs.msg import TopicStatistics 
 40  import rospy 
 41   
 42  _logger = logging.getLogger('rospy.impl.statistics') 
 46      """ 
 47      Class that monitors each subscriber. 
 48   
 49      this class basically just keeps a collection of ConnectionStatisticsLogger. 
 50      """ 
 51   
 52      @classmethod 
 54           
 55           
 56          try: 
 57              return rospy.get_param("/enable_statistics", False) 
 58          except Exception: 
 59              return False 
  60   
 62          self.subscriber_name = subscriber.name 
 63          self.connections = dict() 
 64          self.read_parameters() 
  65   
 67          """ 
 68          Fetch window parameters from parameter server 
 69          """ 
 70   
 71           
 72          self.min_elements = rospy.get_param("/statistics_window_min_elements", 10) 
 73          self.max_elements = rospy.get_param("/statistics_window_max_elements", 100) 
 74   
 75           
 76           
 77           
 78          self.max_window = rospy.get_param("/statistics_window_max_size", 64) 
 79          self.min_window = rospy.get_param("/statistics_window_min_size", 4) 
  80   
 81 -    def callback(self, msg, publisher, stat_bytes): 
  82          """ 
 83          This method is called for every message that has been received. 
 84   
 85          @param msg: The message received. 
 86          @param publisher: The name of the publisher node that sent the msg 
 87          @param stat_bytes: A counter, how many bytes have been moved across 
 88          this connection since it exists. 
 89   
 90          This method just looks up the ConnectionStatisticsLogger for the specific connection 
 91          between publisher and subscriber and delegates to statistics logging to that 
 92          instance. 
 93          """ 
 94   
 95           
 96           
 97          if self.subscriber_name == "/clock" or self.subscriber_name == "/statistics": 
 98              return 
 99   
100          try: 
101               
102              logger = self.connections.get(publisher) 
103              if logger is None: 
104                  logger = ConnectionStatisticsLogger(self.subscriber_name, rospy.get_name(), publisher) 
105                  self.connections[publisher] = logger 
106   
107               
108              logger.callback(self, msg, stat_bytes) 
109          except Exception as e: 
110              rospy.logerr("Unexpected error during statistics measurement: %s", str(e)) 
 111   
 116   
119      """ 
120      Class that monitors lots of stuff for each connection. 
121   
122      is created whenever a subscriber is created. 
123      is destroyed whenever its parent subscriber is destroyed. 
124      its lifecycle is therefore bound to its parent subscriber. 
125      """ 
126   
127 -    def __init__(self, topic, subscriber, publisher): 
 128          """ 
129          Constructor. 
130   
131          @param topic: Name of the topic 
132          @param subscriber: Name of the subscriber 
133          @param publisher: Name of the publisher 
134   
135          These three should uniquely identify the connection. 
136          """ 
137   
138          self.topic = topic 
139          self.subscriber = subscriber 
140          self.publisher = publisher 
141   
142          self.pub = rospy.Publisher("/statistics", TopicStatistics, queue_size=10) 
143   
144           
145          self.last_pub_time = rospy.Time(0) 
146          self.pub_frequency = rospy.Duration(1.0) 
147   
148           
149          self.age_list_ = [] 
150   
151           
152          self.arrival_time_list_ = [] 
153   
154          self.last_seq_ = 0 
155          self.dropped_msgs_ = 0 
156          self.window_start = rospy.Time.now() 
157   
158           
159          self.stat_bytes_last_ = 0 
160          self.stat_bytes_window_ = 0 
 161   
163          """ 
164          Send out statistics. Aggregate collected stats information. 
165   
166          Currently done blocking. Might be moved to own thread later. But at the moment 
167          any computation done here should be rather quick. 
168          """ 
169          curtime = rospy.Time.now() 
170   
171          msg = TopicStatistics() 
172          msg.topic = self.topic 
173          msg.node_sub = self.subscriber 
174          msg.node_pub = self.publisher 
175   
176          msg.window_start = self.window_start 
177          msg.window_stop = curtime 
178   
179           
180          msg.traffic = self.stat_bytes_window_ - self.stat_bytes_last_ 
181   
182          msg.delivered_msgs = len(self.arrival_time_list_) 
183          msg.dropped_msgs = self.dropped_msgs_ 
184   
185           
186          if len(self.age_list_) > 0: 
187              msg.stamp_age_mean = rospy.Duration(sum(self.age_list_, rospy.Duration(0)).to_sec() / len(self.age_list_)) 
188              variance = sum((rospy.Duration((msg.stamp_age_mean - value).to_sec() ** 2) for value in self.age_list_), rospy.Duration(0)) / len(self.age_list_) 
189              msg.stamp_age_stddev = rospy.Duration(sqrt(variance.to_sec())) 
190              msg.stamp_age_max = max(self.age_list_) 
191          else: 
192              msg.stamp_age_mean = rospy.Duration(0) 
193              msg.stamp_age_stddev = rospy.Duration(0) 
194              msg.stamp_age_max = rospy.Duration(0) 
195   
196           
197          if len(self.arrival_time_list_) > 1: 
198              periods = [j - i for i, j in zip(self.arrival_time_list_[:-1], self.arrival_time_list_[1:])] 
199              msg.period_mean = rospy.Duration(sum(periods, rospy.Duration(0)).to_sec() / len(periods)) 
200              variance = sum((rospy.Duration((msg.period_mean - value).to_sec() ** 2) for value in periods), rospy.Duration(0)) / len(periods) 
201              msg.period_stddev = rospy.Duration(sqrt(variance.to_sec())) 
202              msg.period_max = max(periods) 
203          else: 
204              msg.period_mean = rospy.Duration(0) 
205              msg.period_stddev = rospy.Duration(0) 
206              msg.period_max = rospy.Duration(0) 
207   
208          self.pub.publish(msg) 
209   
210           
211          if len(self.arrival_time_list_) > subscriber_statistics_logger.max_elements and self.pub_frequency.to_sec() * 2 <= subscriber_statistics_logger.max_window: 
212              self.pub_frequency *= 2 
213          if len(self.arrival_time_list_) < subscriber_statistics_logger.min_elements and self.pub_frequency.to_sec() / 2 >= subscriber_statistics_logger.min_window: 
214              self.pub_frequency /= 2 
215   
216           
217          self.age_list_ = [] 
218          self.arrival_time_list_ = [] 
219          self.dropped_msgs_ = 0 
220   
221          self.window_start = curtime 
222   
223          self.stat_bytes_last_ = self.stat_bytes_window_ 
 224   
225 -    def callback(self, subscriber_statistics_logger, msg, stat_bytes): 
 226          """ 
227          This method is called for every message, that is received on this 
228          subscriber. 
229   
230          this callback will keep some statistics and publish the results 
231          periodically on a topic. the publishing should probably be done 
232          asynchronically in another thread. 
233   
234          @param msg: The message, that has been received. The message has usually 
235          been already deserialized. However this is not always the case. (AnyMsg) 
236          @param stat_bytes: A counter, how many bytes have been moved across 
237          this connection since it exists. 
238   
239          Any computing-heavy stuff should be done somewhere else, as this 
240          callback has to return before the message is delivered to the user. 
241          """ 
242   
243          arrival_time = rospy.Time.now() 
244   
245          self.arrival_time_list_.append(arrival_time) 
246   
247          self.stat_bytes_window_ = stat_bytes 
248   
249           
250           
251           
252          if msg._has_header: 
253              self.age_list_.append(arrival_time - msg.header.stamp) 
254   
255              if self.last_seq_ + 1 != msg.header.seq: 
256                  self.dropped_msgs_ = self.dropped_msgs_ + 1 
257              self.last_seq_ = msg.header.seq 
258   
259           
260          if self.last_pub_time + self.pub_frequency < arrival_time: 
261              self.last_pub_time = arrival_time 
262              self.sendStatistics(subscriber_statistics_logger) 
 263   
 266