Go to the documentation of this file.
00001 # Copyright 2014 Open Source Robotics Foundation, Inc.
00002 #
00003 # Licensed under the Apache License, Version 2.0 (the "License");
00004 # you may not use this file except in compliance with the License.
00005 # You may obtain a copy of the License at
00006 #
00007 #
00008 #
00009 # Unless required by applicable law or agreed to in writing, software
00010 # distributed under the License is distributed on an "AS IS" BASIS,
00011 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00012 # See the License for the specific language governing permissions and
00013 # limitations under the License.
00015 from __future__ import print_function
00017 import threading
00018 import copy
00019 import math
00020 import numpy as np
00021 import random
00023 import rospy
00024 from ros_statistics_msgs.msg import HostStatistics
00025 from ros_statistics_msgs.msg import NodeStatistics
00026 # from ros_topology_msgs.msg import Connection
00027 from ros_topology_msgs.msg import Graph
00028 # from ros_topology_msgs.msg import Node
00029 # from ros_topology_msgs.msg import Service
00030 # from ros_topology_msgs.msg import Topic
00031 from rosgraph_msgs.msg import TopicStatistics
00033 # from diarc import topology
00034 from diarc.base_adapter import BaseAdapter
00035 from diarc.view import BlockItemAttributes
00036 from diarc.view import BandItemAttributes
00037 from diarc.view import SnapItemAttributes
00038 import ros_topology as rsg
00041 class ColorMapper(object):
00042     def __init__(self):
00043         self._choices = list()
00044         # Reds
00045         self._choices.extend(["IndianRed", "DarkSalmon", "Crimson"])
00046         # Pinks
00047         self._choices.extend(["HotPink", "DeepPink"])
00048         # Oranges
00049         self._choices.extend(["Coral", "OrangeRed", "DarkOrange"])
00050         # Yellows
00051         self._choices.extend(["Gold", "DarkKhaki"])
00052         # Purples
00053         self._choices.extend(["Thistle", "Orchid", "MediumPurple", "DarkOrchid", "Purple", "Indigo", "DarkSlateBlue"])
00054         # Greens
00055         self._choices.extend(["LawnGreen", "LimeGreen", "MediumSeaGreen", "ForestGreen", "OliveDrab", "Olive", "DarkOliveGreen", "DarkCyan"])
00056         # Blues
00057         self._choices.extend(["PaleTurquoise", "Turquoise", "CadetBlue", "SteelBlue", "DodgerBlue"])
00058         # Browns
00059 #         self._choices.extend(["Cornsilk","Tan","RosyBrown","SandyBrown","Goldenrod","DarkGoldenrod","SaddleBrown"])
00060         self._used_colors = dict()
00062     def get_unique_color(self, name):
00063         if name not in self._used_colors:
00064             if len(self._choices) > 0:
00065                 self._used_colors[name] = random.choice(self._choices)
00066                 self._choices.remove(self._used_colors[name])
00067             else:
00068                 self._used_colors[name] = "Gray"
00069         return self._used_colors[name]
00071     def release_unique_color(self, name):
00072         if name in self._used_colors:
00073             if not self._used_colors[name] == "Gray":
00074                 self._choices.append(self._used_colors[name])
00075             self._used_colors.pop(name)
00076         else:
00077             rospy.logwarn("Unknown name mapped to color!")
00080 class ROSProfileAdapter(BaseAdapter):
00081     """ Implementes the Adapter interface for the View and provides hooks for
00082     populating and implementing the ros specific version of the topology.
00083     Subscribes to /statistics, /node_statistics, /host_statistics, and /topology.
00084     Publishes this combined information as /profile
00085     """
00087     def __init__(self, view):
00088         super(ROSProfileAdapter, self).__init__(rsg.RosSystemGraph(), view)
00089         self._topology.hide_disconnected_snaps = True
00091         self._colormapper = ColorMapper()
00092         # Determines whether or not to update the visualization when new data is received
00093         self._auto_update = True
00095         self._TOPIC_QUIET_LIST = list()
00096         self._NODE_QUIET_LIST = list()
00098         # Data Buffers
00099         # To improve accuracy, we hold onto data in the buffer for two evaluation
00100         # periods (length of statistics timer). So we buffer the buffer...
00101         self._last_topology_received = Graph()
00102         self._node_statistics_buffer = dict()  # name: list(NodeStatistics())
00103         self._host_statistics_buffer = dict()  # hostname: list(HostStatistics())
00104         self._topic_statistics_buffer = dict()  # hostname: list(TopicStatistics()
00105         self._previous_node_statistics_buffer = dict()
00106         self._previous_host_statistics_buffer = dict()
00107         self._previous_topic_statistics_buffer = dict()
00109         # Callbacks
00110         self.node_statistics_subscriber = rospy.Subscriber('/node_statistics', NodeStatistics, self._node_statistics_callback)
00111         self.topic_statistics_subscriber = rospy.Subscriber('/statistics', TopicStatistics, self._topic_statistics_callback)
00112         self.host_statistics_subscriber = rospy.Subscriber('/host_statistics', HostStatistics, self._host_statistics_callback)
00113         self.topology_subscriber = rospy.Subscriber('/topology', Graph, self._topology_callback)
00114         self._lock = threading.Lock()
00116         # Timers
00117         self._stats_timer = rospy.Timer(rospy.Duration(2.0), lambda x: self.statistics_update())
00119     def set_topic_quiet_list(self, topic_names):
00120         rospy.loginfo("Updating topic quiet list to %r" % topic_names)
00121         self._TOPIC_QUIET_LIST = copy.copy(topic_names)
00123     def get_topic_quiet_list(self):
00124         return copy.copy(self._TOPIC_QUIET_LIST)
00126     def set_node_quiet_list(self, node_names):
00127         self._NODE_QUIET_LIST = copy.copy(node_names)
00129     def get_node_quiet_list(self):
00130         return copy.copy(self._NODE_QUIET_LIST)
00132     def enable_auto_update(self):
00133         """ Automatically update the visualization when information is received """
00134         self._auto_update = True
00135         self._stats_timer = rospy.Timer(rospy.Duration(2.0), lambda x: self.statistics_update())
00137     def disable_auto_update(self):
00138         """ buffer information received from ROS, but do not automatically update the visualization """
00139         self._auto_update = False
00140         self._stats_timer.shutdown()
00141         self._stats_timer = None
00143     def show_disconnected_topics(self):
00144         self._topology.hide_disconnected_snaps = False
00145         self.topology_update()
00147     def hide_disconnected_topics(self):
00148         self._topology.hide_disconnected_snaps = True
00149         self.topology_update()
00151     def _node_statistics_callback(self, data):
00152         """ Buffers NodeStatistics data """
00153 #         latency = rospy.get_rostime() - data.window_stop
00154 #         window = data.window_stop - data.window_start
00155 #         margin = window*2 - latency
00156 #         if margin.to_sec() > 0:
00157 #             rospy.logerr("Data from '%s' too old by %f secs"%(data.node,-margin.to_sec()))
00158 #             return
00159         # If we have not collected any data from this node yet, initialize the node's buffer
00160         if data.node not in self._node_statistics_buffer:
00161             self._node_statistics_buffer[data.node] = list()
00162         self._node_statistics_buffer[data.node].append(data)
00164     def _topic_statistics_callback(self, data):
00165         """ Buffers TopicStatistics data """
00166         # Buffer Topic Statistics Data.
00167         if data.topic not in self._topic_statistics_buffer:
00168             self._topic_statistics_buffer[data.topic] = list()
00169         self._topic_statistics_buffer[data.topic].append(data)
00171     def _host_statistics_callback(self, data):
00172         """ Buffers HostStatistics data """
00173         # This information is useful for drawing NodeStatistics information
00174         # in context to nodes running on other machines
00175         if data.hostname not in self._host_statistics_buffer:
00176             self._host_statistics_buffer[data.hostname] = list()
00177         self._host_statistics_buffer[data.hostname].append(data)
00179     def _topology_callback(self, data):
00180         self._last_topology_received = copy.copy(data)
00181         if self._auto_update:
00182             self.topology_update()
00184     def topology_update(self):
00185         """ Updates the model with current topology information """
00186         data = self._last_topology_received
00188         # Remove any topics from Ros System Graph not currently known by the profiling system
00189         rsgTopics = self._topology.topics
00190         allCurrentTopicNames = [ for t in data.topics]
00191         for topic in rsgTopics.values():
00192             if in self._TOPIC_QUIET_LIST:
00193                 print("Removing Topic",, "found in quiet list")
00194                 self._colormapper.release_unique_color(
00195                 topic.release()
00196             elif not in allCurrentTopicNames:
00197                 print("Removing Topic",, "not found in ", allCurrentTopicNames)
00198                 self._colormapper.release_unique_color(
00199                 topic.release()
00201         # Add any topics not currently in the Ros System Graph
00202         for topic in data.topics:
00203             if not in rsgTopics and not in self._TOPIC_QUIET_LIST:
00204                 topic = rsg.Topic(self._topology,, topic.type)
00206         # Get all the nodes we currently know about
00207         rsgNodes = self._topology.nodes
00209         # Remove any nodes from RosSystemGraph not currently known to master
00210         allCurrentNodeNames = [ for n in data.nodes]
00211         for node in rsgNodes.values():
00212             if in self._NODE_QUIET_LIST:
00213                 print("Removing node",, "found on quiet list")
00214                 node.release()
00215             elif not in allCurrentNodeNames:
00216                 print("Removing Node",, "not found in ", allCurrentNodeNames)
00217                 node.release()
00218                 # TODO: Remove any of the nodes publishers or subscribers now
00220         # Add any nodes not currently in the Ros System Graph
00221         for node in data.nodes:
00222             # Skip any nodes that are on the quiet list
00223             if in self._NODE_QUIET_LIST:
00224                 continue
00225             rsg_node = None
00226             if not in rsgNodes:  # and name not in QUIET_NAMES:
00227                 rsg_node = rsg.Node(self._topology,
00228                 rsg_node.location = node.uri
00229             else:
00230                 rsg_node = self._topology.nodes[]
00231                 if not rsg_node.location == node.uri:
00232                     rospy.logerr("rsg_node and data.node uri's do not match for name %s" %
00234             # Filter published and subscribed topics we are ignoring
00235             publishes_list = [p for p in node.publishes if p not in self._TOPIC_QUIET_LIST]
00236             subscribes_list = [s for s in node.subscribes if s not in self._TOPIC_QUIET_LIST]
00238             # Add and remove publishers for this node only
00239             # Compile two dictionaries, one of existing topics and one of the most recently
00240             # reported topics. Remove existing publishers that are not mentioned in the
00241             # current list, add publishers that not in the existing list but in the current list,
00242             # and update publishers that occur in both lists.
00243             existing_rsg_node_pub_topics = dict([(, publisher) for publisher in rsg_node.publishers])
00244             current_node_prof_pub_topics = dict([(topic_name, self._topology.topics[topic_name]) for topic_name in publishes_list])
00245             for existing_topic_name in existing_rsg_node_pub_topics.keys():
00246                 # Remove Publisher
00247                 if existing_topic_name not in current_node_prof_pub_topics.keys():
00248                     existing_rsg_node_pub_topics[existing_topic_name].release()
00249             for current_topic_name in current_node_prof_pub_topics.keys():
00250                 # Add Publisher
00251                 if current_topic_name not in existing_rsg_node_pub_topics.keys():
00252                     publisher = rsg.Publisher(self._topology, rsg_node, current_node_prof_pub_topics[current_topic_name])
00254             # Add and remove subscribers for this node only.
00255             # This follows the same patteren as the publishers above.
00256             existing_rsg_node_sub_topics = dict([(, subscriber) for subscriber in rsg_node.subscribers])
00257             current_node_prof_sub_topics = dict([(topic_name, self._topology.topics[topic_name]) for topic_name in subscribes_list])
00258             for existing_topic_name in existing_rsg_node_sub_topics.keys():
00259                 # Remove Subscriber
00260                 if existing_topic_name not in current_node_prof_sub_topics.keys():
00261                     existing_rsg_node_sub_topics[existing_topic_name].release()
00262             for current_topic_name in current_node_prof_sub_topics.keys():
00263                 # Add Subscriber
00264                 if current_topic_name not in existing_rsg_node_sub_topics.keys():
00265                     subscriber = rsg.Subscriber(self._topology, rsg_node, current_node_prof_sub_topics[current_topic_name])
00267         self._update_view()
00269     def statistics_update(self):
00270         """ Updates the model with current statistics information """
00271         rospy.logdebug("Updating Statistics")
00272         # Combine current buffers with previous buffers for evaluation
00273         node_statistics_buffer = dict(self._node_statistics_buffer.items() + self._previous_node_statistics_buffer.items())
00274         host_statistics_buffer = dict(self._host_statistics_buffer.items() + self._previous_host_statistics_buffer.items())
00275         topic_statistics_buffer = dict(self._topic_statistics_buffer.items() + self._previous_topic_statistics_buffer.items())
00277         # TODO: Requires a lock with the callback and other threads
00278         rsgNodes = self._topology.nodes
00279         for node_name, data_buffer in node_statistics_buffer.items():
00280             # Don't process node statistics that we do not have in our internal topology
00281             # (we don't have a place to store the information).
00282             if node_name not in rsgNodes:
00283                 if node_name not in self._NODE_QUIET_LIST:
00284                     rospy.logwarn("Received Statistics Information for untracked node %s" % node_name)
00285                 continue
00286             # Populate datasets for this node
00287             samples = list()
00288             num_threads = list()
00289             cpu_load_mean = list()
00290             cpu_load_std = list()
00291             cpu_load_max = list()
00292             virt_mem_mean = list()
00293             virt_mem_std = list()
00294             virt_mem_max = list()
00295             # TODO: Real memory
00296             for data in data_buffer:
00297                 samples.append(data.samples)
00298                 num_threads.append(data.threads)
00299                 cpu_load_mean.append(data.cpu_load_mean)
00300                 cpu_load_std.append(data.cpu_load_std)
00301                 cpu_load_max.append(data.cpu_load_max)
00302                 virt_mem_mean.append(data.virt_mem_mean)
00303                 virt_mem_std.append(data.virt_mem_std)
00304                 virt_mem_max.append(data.virt_mem_max)
00305             rsgNodes[node_name].num_threads = max(num_threads)
00306             rsgNodes[node_name].cpu_load_mean = np.mean(np.array(cpu_load_mean))
00307             rsgNodes[node_name].cpu_load_std = math.sqrt(sum(
00308                     [math.pow(sd, 2)/n for sd, n in zip(cpu_load_std, samples)]))
00309             rsgNodes[node_name].cpu_load_max = max(cpu_load_max)
00310             rsgNodes[node_name].virt_mem_mean = np.mean(np.array(virt_mem_mean))
00311             rsgNodes[node_name].virt_mem_std = math.sqrt(sum(
00312                     [math.pow(sd, 2)/n for sd, n in zip(virt_mem_std, samples)]))
00313             rsgNodes[node_name].virt_mem_max = max(virt_mem_max)
00315         # Process Topic Statistics Data
00316         # TODO: we are not currently processing all the topic data found in TopicStatistics() message
00317         # TODO: These are actually piecewise between individual publishers and subscribers.
00318         #       Eventually we want to be able to draw each connections individual contribution to the
00319         #       whole topic, but for now just lump it all together
00320         rsgTopics = self._topology.topics
00321         for topic_name, data_buffer in topic_statistics_buffer.items():
00322             # Don't process topic statistics that we do not have in our internal topology
00323             # (We don't have a place to store the information)
00324             if topic_name not in rsgTopics:
00325                 if topic_name not in self._TOPIC_QUIET_LIST:
00326                     rospy.logwarn("Received Statistics Information for untracked topic %s" % topic_name)
00327                 continue
00328             # populate datasets for this topic
00329             delivered_msgs = list()
00330             traffic = list()
00331             period_mean = list()
00332             window_start = list()
00333             window_stop = list()
00334             node_sub = list()  # we need to know the number of subscribers
00335             for data in data_buffer:
00336                 delivered_msgs.append(data.delivered_msgs)
00337                 traffic.append(data.traffic)
00338                 period_mean.append(data.period_mean)
00339                 window_start.append(data.window_start.to_sec())
00340                 window_stop.append(data.window_stop.to_sec())
00341                 node_sub.append(data.node_sub)
00342             start_time = min(window_start)
00343             stop_time = max(window_stop)
00344             unique_subs = len(set(node_sub))
00345             # avoid divide by 0 errors
00346             if stop_time == start_time or unique_subs == 0:
00347                 continue
00348             # Approximate the hz (per subscriber)
00349             total_msgs_sent = sum(delivered_msgs)
00350             messages_sent = total_msgs_sent / unique_subs
00351             hz = messages_sent / (stop_time - start_time)
00352             rsgTopics[topic_name].hz = hz
00353             # Approximate the bw in bytes per seconds (per subscriber)
00354             bytes_sent = sum(traffic) / unique_subs
00355             bw = bytes_sent / (stop_time - start_time)
00356             rsgTopics[topic_name].bw = bw
00358         # Reset data buffers
00359         self._previous_node_statistics_buffer = copy.copy(self._node_statistics_buffer)
00360         self._previous_host_statistics_buffer = copy.copy(self._host_statistics_buffer)
00361         self._previous_topic_statistics_buffer = copy.copy(self._topic_statistics_buffer)
00362         self._node_statistics_buffer.clear()
00363         self._host_statistics_buffer.clear()
00364         self._topic_statistics_buffer.clear()
00366         self._update_view()
00368     def get_block_item_attributes(self, block_index):
00369         """ Overloads the BaseAdapters stock implementation of this method """
00370         block = self._topology.blocks[block_index]
00371         attrs = BlockItemAttributes()
00372         attrs.bgcolor = None
00373         attrs.border_color = "black"
00374         attrs.border_width = 5
00375         attrs.label =
00376         attrs.tooltip_text = "Node:\t%s\nCPU:\t%d\nMEM:\t%s\nThreads:\t%d" % (, block.vertex.cpu_load_mean, sizeof_fmt(block.vertex.virt_mem_mean), block.vertex.num_threads)
00377         attrs.label_color = "black"
00378 #         attrs.spacerwidth = block.vertex.
00379         attrs.spacerwidth = 30
00380         return attrs
00382     def get_band_item_attributes(self, band_altitude):
00383         """ Overloads the BaseAdapters stock implementation of this method """
00384         band = self._topology.bands[band_altitude]
00385         attrs = BandItemAttributes()
00386         attrs.bgcolor = self._colormapper.get_unique_color(
00387         attrs.border_color = "red"
00388         attrs.tooltip_text = "Topic:\t%s\nBw:\t%s/sec\nHz:\t%.1f" % (, sizeof_fmt(, band.edge.hz)
00389         attrs.label =
00390         attrs.label_color = "white"
00391         attrs.width = 15
00392         return attrs
00394     def get_snap_item_attributes(self, snapkey):
00395         """ Default method for providing some stock settings for snaps """
00396         attrs = SnapItemAttributes()
00397         attrs.bgcolor = "darkCyan" if 'c' in snapkey else "green"
00398         attrs.border_color = "darkBlue" if 'c' in snapkey else "darkGreen"
00399         attrs.border_width = 1
00400         attrs.label = self._topology.snaps[snapkey]
00401         attrs.label_color = "white"
00402         attrs.width = 20
00403         return attrs
00406 def sizeof_fmt(num):
00407     # Taken from
00408     for x in ['bytes', 'KB', 'MB', 'GB', 'TB']:
00409         if num < 1024.0:
00410             return "%3.1f %s" % (num, x)
00411         num /= 1024.0

Author(s): Dan Brooks
autogenerated on Thu Jun 6 2019 20:29:31