Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 import sys
00016 import unittest
00017 import time
00018 import rospy
00019 import rostest
00020
00021 from ros_statistics_msgs.msg import NodeStatistics
00022 from std_msgs.msg import String
00023
00024
00025
00026
00027
00028 PKG = 'rosprofiler'
00029 NAME = 'test_profiler'
00030
00031
00032 EXPECTED_NODES = ['/talker', '/listener', '/rosprofiler_test', '/rosout', '/test_profiler']
00033
00034 class TestProfiler(unittest.TestCase):
00035 def __init__(self, *args):
00036 super(TestProfiler, self).__init__(*args)
00037
00038 self.start_time = None
00039
00040 self.nodes = dict()
00041
00042 self.chatter_msgs = 0
00043 self.node_statistics_msgs = 0
00044
00045 def setUp(self):
00046 rospy.init_node(NAME, anonymous=True)
00047 rospy.Subscriber('/chatter', String, self.chatter_callback)
00048 rospy.Subscriber('/node_statistics', NodeStatistics, self.node_statistics_callback)
00049 rospy.sleep(1.0)
00050 self.start_time = rospy.get_rostime()
00051
00052 def node_statistics_callback(self, data):
00053 self.node_statistics_msgs += 1
00054 self.nodes[data.node] = data
00055
00056 def chatter_callback(self, data):
00057 self.chatter_msgs += 1
00058
00059 def timeout(self, duration):
00060 """ Returns True if the time since setup is greater then the duration, else False.
00061 duration - time in seconds
00062 """
00063 return rospy.get_rostime() > (self.start_time + rospy.Duration(duration))
00064
00065 def wait_for_data(self):
00066 """ Waits to receive statistics data """
00067 while not rospy.is_shutdown() and not self.timeout(10.0):
00068 if len(self.nodes.keys()) >= len(EXPECTED_NODES):
00069 break
00070 rospy.sleep(1.0)
00071 return
00072
00073 def test_nodes_reported(self):
00074 """ Tests to make sure that we received statistics about the correct processes """
00075 self.wait_for_data()
00076 rospy.loginfo("Received %d messages on /node_statistics"%self.node_statistics_msgs)
00077 assert set(EXPECTED_NODES) == set(self.nodes.keys()), "Expected:%s Received:%s"%(EXPECTED_NODES,self.nodes.keys())
00078
00079 def test_statistics_pids(self):
00080 """ Make sure each node has a unique PID """
00081 self.wait_for_data()
00082 pids = [node.pid for node in self.nodes.values()]
00083 assert len(pids) > 0, "No PIDS to evaluate"
00084 assert len(pids) == len(set(pids)), "Non-unique PID(s) in list %s"%pids
00085
00086
00087 def test_chatter(self):
00088 """ Test that chatter is talking """
00089
00090 received_msgs = self.chatter_msgs
00091 rospy.loginfo("Received %d messages on /chatter"%received_msgs)
00092 assert(received_msgs > 0)
00093
00094 if __name__ == '__main__':
00095 rostest.rosrun(PKG, NAME, TestProfiler, sys.argv)
00096