00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 from __future__ import print_function
00016
00017 import threading
00018 import copy
00019 import math
00020 import numpy as np
00021 import random
00022
00023 import rospy
00024 from ros_statistics_msgs.msg import HostStatistics
00025 from ros_statistics_msgs.msg import NodeStatistics
00026
00027 from ros_topology_msgs.msg import Graph
00028
00029
00030
00031 from rosgraph_msgs.msg import TopicStatistics
00032
00033
00034 from diarc.base_adapter import BaseAdapter
00035 from diarc.view import BlockItemAttributes
00036 from diarc.view import BandItemAttributes
00037 from diarc.view import SnapItemAttributes
00038 import ros_topology as rsg
00039
00040
00041 class ColorMapper(object):
00042 def __init__(self):
00043 self._choices = list()
00044
00045 self._choices.extend(["IndianRed", "DarkSalmon", "Crimson"])
00046
00047 self._choices.extend(["HotPink", "DeepPink"])
00048
00049 self._choices.extend(["Coral", "OrangeRed", "DarkOrange"])
00050
00051 self._choices.extend(["Gold", "DarkKhaki"])
00052
00053 self._choices.extend(["Thistle", "Orchid", "MediumPurple", "DarkOrchid", "Purple", "Indigo", "DarkSlateBlue"])
00054
00055 self._choices.extend(["LawnGreen", "LimeGreen", "MediumSeaGreen", "ForestGreen", "OliveDrab", "Olive", "DarkOliveGreen", "DarkCyan"])
00056
00057 self._choices.extend(["PaleTurquoise", "Turquoise", "CadetBlue", "SteelBlue", "DodgerBlue"])
00058
00059
00060 self._used_colors = dict()
00061
00062 def get_unique_color(self, name):
00063 if name not in self._used_colors:
00064 if len(self._choices) > 0:
00065 self._used_colors[name] = random.choice(self._choices)
00066 self._choices.remove(self._used_colors[name])
00067 else:
00068 self._used_colors[name] = "Gray"
00069 return self._used_colors[name]
00070
00071 def release_unique_color(self, name):
00072 if name in self._used_colors:
00073 if not self._used_colors[name] == "Gray":
00074 self._choices.append(self._used_colors[name])
00075 self._used_colors.pop(name)
00076 else:
00077 rospy.logwarn("Unknown name mapped to color!")
00078
00079
00080 class ROSProfileAdapter(BaseAdapter):
00081 """ Implementes the Adapter interface for the View and provides hooks for
00082 populating and implementing the ros specific version of the topology.
00083 Subscribes to /statistics, /node_statistics, /host_statistics, and /topology.
00084 Publishes this combined information as /profile
00085 """
00086
00087 def __init__(self, view):
00088 super(ROSProfileAdapter, self).__init__(rsg.RosSystemGraph(), view)
00089 self._topology.hide_disconnected_snaps = True
00090
00091 self._colormapper = ColorMapper()
00092
00093 self._auto_update = True
00094
00095 self._TOPIC_QUIET_LIST = list()
00096 self._NODE_QUIET_LIST = list()
00097
00098
00099
00100
00101 self._last_topology_received = Graph()
00102 self._node_statistics_buffer = dict()
00103 self._host_statistics_buffer = dict()
00104 self._topic_statistics_buffer = dict()
00105 self._previous_node_statistics_buffer = dict()
00106 self._previous_host_statistics_buffer = dict()
00107 self._previous_topic_statistics_buffer = dict()
00108
00109
00110 self.node_statistics_subscriber = rospy.Subscriber('/node_statistics', NodeStatistics, self._node_statistics_callback)
00111 self.topic_statistics_subscriber = rospy.Subscriber('/statistics', TopicStatistics, self._topic_statistics_callback)
00112 self.host_statistics_subscriber = rospy.Subscriber('/host_statistics', HostStatistics, self._host_statistics_callback)
00113 self.topology_subscriber = rospy.Subscriber('/topology', Graph, self._topology_callback)
00114 self._lock = threading.Lock()
00115
00116
00117 self._stats_timer = rospy.Timer(rospy.Duration(2.0), lambda x: self.statistics_update())
00118
00119 def set_topic_quiet_list(self, topic_names):
00120 rospy.loginfo("Updating topic quiet list to %r" % topic_names)
00121 self._TOPIC_QUIET_LIST = copy.copy(topic_names)
00122
00123 def get_topic_quiet_list(self):
00124 return copy.copy(self._TOPIC_QUIET_LIST)
00125
00126 def set_node_quiet_list(self, node_names):
00127 self._NODE_QUIET_LIST = copy.copy(node_names)
00128
00129 def get_node_quiet_list(self):
00130 return copy.copy(self._NODE_QUIET_LIST)
00131
00132 def enable_auto_update(self):
00133 """ Automatically update the visualization when information is received """
00134 self._auto_update = True
00135 self._stats_timer = rospy.Timer(rospy.Duration(2.0), lambda x: self.statistics_update())
00136
00137 def disable_auto_update(self):
00138 """ buffer information received from ROS, but do not automatically update the visualization """
00139 self._auto_update = False
00140 self._stats_timer.shutdown()
00141 self._stats_timer = None
00142
00143 def show_disconnected_topics(self):
00144 self._topology.hide_disconnected_snaps = False
00145 self.topology_update()
00146
00147 def hide_disconnected_topics(self):
00148 self._topology.hide_disconnected_snaps = True
00149 self.topology_update()
00150
00151 def _node_statistics_callback(self, data):
00152 """ Buffers NodeStatistics data """
00153
00154
00155
00156
00157
00158
00159
00160 if data.node not in self._node_statistics_buffer:
00161 self._node_statistics_buffer[data.node] = list()
00162 self._node_statistics_buffer[data.node].append(data)
00163
00164 def _topic_statistics_callback(self, data):
00165 """ Buffers TopicStatistics data """
00166
00167 if data.topic not in self._topic_statistics_buffer:
00168 self._topic_statistics_buffer[data.topic] = list()
00169 self._topic_statistics_buffer[data.topic].append(data)
00170
00171 def _host_statistics_callback(self, data):
00172 """ Buffers HostStatistics data """
00173
00174
00175 if data.hostname not in self._host_statistics_buffer:
00176 self._host_statistics_buffer[data.hostname] = list()
00177 self._host_statistics_buffer[data.hostname].append(data)
00178
00179 def _topology_callback(self, data):
00180 self._last_topology_received = copy.copy(data)
00181 if self._auto_update:
00182 self.topology_update()
00183
00184 def topology_update(self):
00185 """ Updates the model with current topology information """
00186 data = self._last_topology_received
00187
00188
00189 rsgTopics = self._topology.topics
00190 allCurrentTopicNames = [t.name for t in data.topics]
00191 for topic in rsgTopics.values():
00192 if topic.name in self._TOPIC_QUIET_LIST:
00193 print("Removing Topic", topic.name, "found in quiet list")
00194 self._colormapper.release_unique_color(topic.name)
00195 topic.release()
00196 elif topic.name not in allCurrentTopicNames:
00197 print("Removing Topic", topic.name, "not found in ", allCurrentTopicNames)
00198 self._colormapper.release_unique_color(topic.name)
00199 topic.release()
00200
00201
00202 for topic in data.topics:
00203 if topic.name not in rsgTopics and topic.name not in self._TOPIC_QUIET_LIST:
00204 topic = rsg.Topic(self._topology, topic.name, topic.type)
00205
00206
00207 rsgNodes = self._topology.nodes
00208
00209
00210 allCurrentNodeNames = [n.name for n in data.nodes]
00211 for node in rsgNodes.values():
00212 if node.name in self._NODE_QUIET_LIST:
00213 print("Removing node", node.name, "found on quiet list")
00214 node.release()
00215 elif node.name not in allCurrentNodeNames:
00216 print("Removing Node", node.name, "not found in ", allCurrentNodeNames)
00217 node.release()
00218
00219
00220
00221 for node in data.nodes:
00222
00223 if node.name in self._NODE_QUIET_LIST:
00224 continue
00225 rsg_node = None
00226 if node.name not in rsgNodes:
00227 rsg_node = rsg.Node(self._topology, node.name)
00228 rsg_node.location = node.uri
00229 else:
00230 rsg_node = self._topology.nodes[node.name]
00231 if not rsg_node.location == node.uri:
00232 rospy.logerr("rsg_node and data.node uri's do not match for name %s" % node.name)
00233
00234
00235 publishes_list = [p for p in node.publishes if p not in self._TOPIC_QUIET_LIST]
00236 subscribes_list = [s for s in node.subscribes if s not in self._TOPIC_QUIET_LIST]
00237
00238
00239
00240
00241
00242
00243 existing_rsg_node_pub_topics = dict([(publisher.topic.name, publisher) for publisher in rsg_node.publishers])
00244 current_node_prof_pub_topics = dict([(topic_name, self._topology.topics[topic_name]) for topic_name in publishes_list])
00245 for existing_topic_name in existing_rsg_node_pub_topics.keys():
00246
00247 if existing_topic_name not in current_node_prof_pub_topics.keys():
00248 existing_rsg_node_pub_topics[existing_topic_name].release()
00249 for current_topic_name in current_node_prof_pub_topics.keys():
00250
00251 if current_topic_name not in existing_rsg_node_pub_topics.keys():
00252 publisher = rsg.Publisher(self._topology, rsg_node, current_node_prof_pub_topics[current_topic_name])
00253
00254
00255
00256 existing_rsg_node_sub_topics = dict([(subscriber.topic.name, subscriber) for subscriber in rsg_node.subscribers])
00257 current_node_prof_sub_topics = dict([(topic_name, self._topology.topics[topic_name]) for topic_name in subscribes_list])
00258 for existing_topic_name in existing_rsg_node_sub_topics.keys():
00259
00260 if existing_topic_name not in current_node_prof_sub_topics.keys():
00261 existing_rsg_node_sub_topics[existing_topic_name].release()
00262 for current_topic_name in current_node_prof_sub_topics.keys():
00263
00264 if current_topic_name not in existing_rsg_node_sub_topics.keys():
00265 subscriber = rsg.Subscriber(self._topology, rsg_node, current_node_prof_sub_topics[current_topic_name])
00266
00267 self._update_view()
00268
00269 def statistics_update(self):
00270 """ Updates the model with current statistics information """
00271 rospy.logdebug("Updating Statistics")
00272
00273 node_statistics_buffer = dict(self._node_statistics_buffer.items() + self._previous_node_statistics_buffer.items())
00274 host_statistics_buffer = dict(self._host_statistics_buffer.items() + self._previous_host_statistics_buffer.items())
00275 topic_statistics_buffer = dict(self._topic_statistics_buffer.items() + self._previous_topic_statistics_buffer.items())
00276
00277
00278 rsgNodes = self._topology.nodes
00279 for node_name, data_buffer in node_statistics_buffer.items():
00280
00281
00282 if node_name not in rsgNodes:
00283 if node_name not in self._NODE_QUIET_LIST:
00284 rospy.logwarn("Received Statistics Information for untracked node %s" % node_name)
00285 continue
00286
00287 samples = list()
00288 num_threads = list()
00289 cpu_load_mean = list()
00290 cpu_load_std = list()
00291 cpu_load_max = list()
00292 virt_mem_mean = list()
00293 virt_mem_std = list()
00294 virt_mem_max = list()
00295
00296 for data in data_buffer:
00297 samples.append(data.samples)
00298 num_threads.append(data.threads)
00299 cpu_load_mean.append(data.cpu_load_mean)
00300 cpu_load_std.append(data.cpu_load_std)
00301 cpu_load_max.append(data.cpu_load_max)
00302 virt_mem_mean.append(data.virt_mem_mean)
00303 virt_mem_std.append(data.virt_mem_std)
00304 virt_mem_max.append(data.virt_mem_max)
00305 rsgNodes[node_name].num_threads = max(num_threads)
00306 rsgNodes[node_name].cpu_load_mean = np.mean(np.array(cpu_load_mean))
00307 rsgNodes[node_name].cpu_load_std = math.sqrt(sum(
00308 [math.pow(sd, 2)/n for sd, n in zip(cpu_load_std, samples)]))
00309 rsgNodes[node_name].cpu_load_max = max(cpu_load_max)
00310 rsgNodes[node_name].virt_mem_mean = np.mean(np.array(virt_mem_mean))
00311 rsgNodes[node_name].virt_mem_std = math.sqrt(sum(
00312 [math.pow(sd, 2)/n for sd, n in zip(virt_mem_std, samples)]))
00313 rsgNodes[node_name].virt_mem_max = max(virt_mem_max)
00314
00315
00316
00317
00318
00319
00320 rsgTopics = self._topology.topics
00321 for topic_name, data_buffer in topic_statistics_buffer.items():
00322
00323
00324 if topic_name not in rsgTopics:
00325 if topic_name not in self._TOPIC_QUIET_LIST:
00326 rospy.logwarn("Received Statistics Information for untracked topic %s" % topic_name)
00327 continue
00328
00329 delivered_msgs = list()
00330 traffic = list()
00331 period_mean = list()
00332 window_start = list()
00333 window_stop = list()
00334 node_sub = list()
00335 for data in data_buffer:
00336 delivered_msgs.append(data.delivered_msgs)
00337 traffic.append(data.traffic)
00338 period_mean.append(data.period_mean)
00339 window_start.append(data.window_start.to_sec())
00340 window_stop.append(data.window_stop.to_sec())
00341 node_sub.append(data.node_sub)
00342 start_time = min(window_start)
00343 stop_time = max(window_stop)
00344 unique_subs = len(set(node_sub))
00345
00346 if stop_time == start_time or unique_subs == 0:
00347 continue
00348
00349 total_msgs_sent = sum(delivered_msgs)
00350 messages_sent = total_msgs_sent / unique_subs
00351 hz = messages_sent / (stop_time - start_time)
00352 rsgTopics[topic_name].hz = hz
00353
00354 bytes_sent = sum(traffic) / unique_subs
00355 bw = bytes_sent / (stop_time - start_time)
00356 rsgTopics[topic_name].bw = bw
00357
00358
00359 self._previous_node_statistics_buffer = copy.copy(self._node_statistics_buffer)
00360 self._previous_host_statistics_buffer = copy.copy(self._host_statistics_buffer)
00361 self._previous_topic_statistics_buffer = copy.copy(self._topic_statistics_buffer)
00362 self._node_statistics_buffer.clear()
00363 self._host_statistics_buffer.clear()
00364 self._topic_statistics_buffer.clear()
00365
00366 self._update_view()
00367
00368 def get_block_item_attributes(self, block_index):
00369 """ Overloads the BaseAdapters stock implementation of this method """
00370 block = self._topology.blocks[block_index]
00371 attrs = BlockItemAttributes()
00372 attrs.bgcolor = None
00373 attrs.border_color = "black"
00374 attrs.border_width = 5
00375 attrs.label = block.vertex.name
00376 attrs.tooltip_text = "Node:\t%s\nCPU:\t%d\nMEM:\t%s\nThreads:\t%d" % (block.vertex.name, block.vertex.cpu_load_mean, sizeof_fmt(block.vertex.virt_mem_mean), block.vertex.num_threads)
00377 attrs.label_color = "black"
00378
00379 attrs.spacerwidth = 30
00380 return attrs
00381
00382 def get_band_item_attributes(self, band_altitude):
00383 """ Overloads the BaseAdapters stock implementation of this method """
00384 band = self._topology.bands[band_altitude]
00385 attrs = BandItemAttributes()
00386 attrs.bgcolor = self._colormapper.get_unique_color(band.edge.name)
00387 attrs.border_color = "red"
00388 attrs.tooltip_text = "Topic:\t%s\nBw:\t%s/sec\nHz:\t%.1f" % (band.edge.name, sizeof_fmt(band.edge.bw), band.edge.hz)
00389 attrs.label = band.edge.name
00390 attrs.label_color = "white"
00391 attrs.width = 15
00392 return attrs
00393
00394 def get_snap_item_attributes(self, snapkey):
00395 """ Default method for providing some stock settings for snaps """
00396 attrs = SnapItemAttributes()
00397 attrs.bgcolor = "darkCyan" if 'c' in snapkey else "green"
00398 attrs.border_color = "darkBlue" if 'c' in snapkey else "darkGreen"
00399 attrs.border_width = 1
00400 attrs.label = self._topology.snaps[snapkey].connection.edge.name
00401 attrs.label_color = "white"
00402 attrs.width = 20
00403 return attrs
00404
00405
00406 def sizeof_fmt(num):
00407
00408 for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
00409 if num < 1024.0:
00410 return "%3.1f %s" % (num, x)
00411 num /= 1024.0