test_profiler.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Copyright 2014 Open Source Robotics Foundation, Inc.
00003 #
00004 # Licensed under the Apache License, Version 2.0 (the "License");
00005 # you may not use this file except in compliance with the License.
00006 # You may obtain a copy of the License at
00007 #
00008 #     http://www.apache.org/licenses/LICENSE-2.0
00009 #
00010 # Unless required by applicable law or agreed to in writing, software
00011 # distributed under the License is distributed on an "AS IS" BASIS,
00012 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013 # See the License for the specific language governing permissions and
00014 # limitations under the License.
00015 import sys
00016 import unittest
00017 import time
00018 import rospy
00019 import rostest
00020 
00021 from ros_statistics_msgs.msg import NodeStatistics
00022 from std_msgs.msg import String
00023 
00024 # TODO:
00025 # Test adding a node - see number of unique nodesstat msgs increase, check names
00026 # Test node going away - see number of nuique nodestat msgs decrease, check names
00027 
00028 PKG = 'rosprofiler'
00029 NAME = 'test_profiler'
00030 
00031 # These are the nodes we expect to receive information about
00032 EXPECTED_NODES = ['/talker', '/listener', '/rosprofiler_test', '/rosout', '/test_profiler']
00033 
00034 class TestProfiler(unittest.TestCase):
00035     def __init__(self, *args):
00036         super(TestProfiler, self).__init__(*args)
00037         # Start time - for calculating timeout
00038         self.start_time = None
00039         # Node statistics
00040         self.nodes = dict()
00041         # Message Counters
00042         self.chatter_msgs = 0
00043         self.node_statistics_msgs = 0
00044 
00045     def setUp(self):
00046         rospy.init_node(NAME, anonymous=True)
00047         rospy.Subscriber('/chatter', String, self.chatter_callback)   
00048         rospy.Subscriber('/node_statistics', NodeStatistics, self.node_statistics_callback)   
00049         rospy.sleep(1.0)
00050         self.start_time = rospy.get_rostime()
00051 
00052     def node_statistics_callback(self, data):
00053         self.node_statistics_msgs += 1
00054         self.nodes[data.node] = data
00055 
00056     def chatter_callback(self, data):
00057         self.chatter_msgs += 1
00058 
00059     def timeout(self, duration):
00060         """ Returns True if the time since setup is greater then the duration, else False.
00061         duration - time in seconds
00062         """
00063         return rospy.get_rostime() > (self.start_time + rospy.Duration(duration))
00064 
00065     def wait_for_data(self):
00066         """ Waits to receive statistics data """
00067         while not rospy.is_shutdown() and not self.timeout(10.0):
00068             if len(self.nodes.keys()) >= len(EXPECTED_NODES):
00069                 break
00070             rospy.sleep(1.0)
00071         return
00072 
00073     def test_nodes_reported(self):
00074         """ Tests to make sure that we received statistics about the correct processes """
00075         self.wait_for_data()
00076         rospy.loginfo("Received %d messages on /node_statistics"%self.node_statistics_msgs)
00077         assert set(EXPECTED_NODES) == set(self.nodes.keys()), "Expected:%s Received:%s"%(EXPECTED_NODES,self.nodes.keys())
00078 
00079     def test_statistics_pids(self):
00080         """ Make sure each node has a unique PID """
00081         self.wait_for_data()
00082         pids = [node.pid for node in self.nodes.values()]
00083         assert len(pids) > 0, "No PIDS to evaluate"
00084         assert len(pids) == len(set(pids)), "Non-unique PID(s) in list %s"%pids
00085 
00086 
00087     def test_chatter(self):
00088         """ Test that chatter is talking """
00089         # Reset the msg counter and then wait for 1 second. You should have received something.
00090         received_msgs = self.chatter_msgs
00091         rospy.loginfo("Received %d messages on /chatter"%received_msgs)
00092         assert(received_msgs > 0)
00093 
00094 if __name__ == '__main__':
00095     rostest.rosrun(PKG, NAME, TestProfiler, sys.argv)
00096 


rosprofiler
Author(s): Dan Brooks
autogenerated on Thu Jun 6 2019 18:15:05