00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 import sys
00016 import unittest
00017 import time
00018 import rospy
00019 import rostest
00020
00021 from ros_topology_msgs.msg import *
00022
00023 PKG = 'rosprofiler'
00024 NAME = 'test_grapher'
00025
00026
00027
00028 EXPECTED_NODES = dict()
00029 talker1 = Node(name="/talker1")
00030 talker1.publishes.append("/chatter")
00031 talker1.publishes.append("/rosout")
00032 talker1.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00033 talker1.connections.append(Connection(destination='/listener1',topic='/chatter',direction=2,transport="TCPROS"))
00034 talker1.connections.append(Connection(destination='/listener2',topic='/chatter',direction=2,transport="TCPROS"))
00035 talker2 = Node(name="/talker2")
00036 talker2.publishes.append("/chatter")
00037 talker2.publishes.append("/rosout")
00038 talker2.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00039 talker2.connections.append(Connection(destination='/listener1',topic='/chatter',direction=2,transport="TCPROS"))
00040 talker2.connections.append(Connection(destination='/listener2',topic='/chatter',direction=2,transport="TCPROS"))
00041 listener1 = Node(name="/listener1")
00042 listener1.publishes.append("/rosout")
00043 listener1.subscribes.append("/chatter")
00044 listener1.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00045 listener1.connections.append(Connection(destination='/talker1',topic='/chatter',direction=1,transport="TCPROS"))
00046 listener1.connections.append(Connection(destination='/talker2',topic='/chatter',direction=1,transport="TCPROS"))
00047 listener2 = Node(name="/listener2")
00048 listener2.publishes.append("/rosout")
00049 listener2.subscribes.append("/chatter")
00050 listener2.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00051 listener2.connections.append(Connection(destination='/talker1',topic='/chatter',direction=1,transport="TCPROS"))
00052 listener2.connections.append(Connection(destination='/talker2',topic='/chatter',direction=1,transport="TCPROS"))
00053 rosout = Node(name="/rosout")
00054 rosout.publishes.append("/rosout_agg")
00055 rosout.subscribes.append("/rosout")
00056 rosout.connections.append(Connection(destination='/talker1',topic='/rosout',direction=1,transport="TCPROS"))
00057 rosout.connections.append(Connection(destination='/talker2',topic='/rosout',direction=1,transport="TCPROS"))
00058 rosout.connections.append(Connection(destination='/listener1',topic='/rosout',direction=1,transport="TCPROS"))
00059 rosout.connections.append(Connection(destination='/listener2',topic='/rosout',direction=1,transport="TCPROS"))
00060 rosout.connections.append(Connection(destination='/test_grapher',topic='/rosout',direction=1,transport="TCPROS"))
00061 rosout.connections.append(Connection(destination='/rosgrapher',topic='/rosout',direction=1,transport="TCPROS"))
00062 grapher = Node(name="/rosgrapher")
00063 grapher.publishes.append("/rosout")
00064 grapher.publishes.append("/topology")
00065 grapher.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00066 grapher.connections.append(Connection(destination='/'+NAME,topic='/topology',direction=2,transport="TCPROS"))
00067 tester = Node(name="/test_grapher")
00068 tester.publishes.append("/rosout")
00069 tester.subscribes.append("/topology")
00070 tester.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00071 tester.connections.append(Connection(destination='/rosgrapher',topic='/topology',direction=1,transport="TCPROS"))
00072 EXPECTED_NODES['/talker1'] = talker1
00073 EXPECTED_NODES['/talker2'] = talker2
00074 EXPECTED_NODES['/listener1'] = listener1
00075 EXPECTED_NODES['/listener2'] = listener2
00076 EXPECTED_NODES['/rosout'] = rosout
00077 EXPECTED_NODES['/rosgrapher'] = grapher
00078 EXPECTED_NODES['/'+NAME] = tester
00079
00080 t_chatter = Topic(name="/chatter", type="std_msgs/String")
00081 t_rosout = Topic(name="/rosout", type="rosgraph_msgs/Log")
00082 t_rosout_agg = Topic(name="/rosout_agg", type="rosgraph_msgs/Log")
00083 t_topology = Topic(name="/topology", type="ros_topology_msgs/Graph")
00084 EXPECTED_TOPICS = [t_chatter, t_rosout, t_rosout_agg, t_topology]
00085
00086 class TestGrapher(unittest.TestCase):
00087 def __init__(self, *args):
00088 super(TestGrapher, self).__init__(*args)
00089
00090 self.start_time = None
00091 self.graph = Graph()
00092
00093 def setUp(self):
00094 rospy.init_node(NAME)
00095 rospy.Subscriber('/topology', Graph, self.callback)
00096 self.wait_for_data(10.0)
00097
00098 def callback(self, data):
00099 self.graph = data
00100
00101 def wait_for_data(self, duration):
00102 """ Waits to receive statistics data """
00103 start_time = rospy.get_rostime()
00104 while not rospy.is_shutdown() and not (rospy.get_rostime() > (start_time + rospy.Duration(duration))):
00105 if len(self.graph.nodes) >= len(EXPECTED_NODES) and len(self.graph.topics) >= len(EXPECTED_TOPICS):
00106 return
00107 rospy.sleep(1.0)
00108
00109 def test_nodes_publishers(self):
00110 for node in self.graph.nodes:
00111 assert node.name in EXPECTED_NODES, "%s not found!"%node.name
00112 testnode = EXPECTED_NODES[node.name]
00113 assert set(node.publishes) == set(testnode.publishes), "%s.publishes=%s, but should be %s"%(node.name,node.publishes,testnode.publishes)
00114
00115 def test_nodes_subscribers(self):
00116 for node in self.graph.nodes:
00117 assert node.name in EXPECTED_NODES, "%s not found!"%node.name
00118 testnode = EXPECTED_NODES[node.name]
00119 assert set(node.subscribes) == set(testnode.subscribes), "%s.subscribes=%s, but should be %s"%(node.name,node.subscribes,testnode.subscribes)
00120
00121 def test_nodes_connections_present(self):
00122 for node in self.graph.nodes:
00123 assert node.name in EXPECTED_NODES, "%s not found!"%node.name
00124 testnode = EXPECTED_NODES[node.name]
00125 for connection in node.connections:
00126 assert connection in testnode.connections, "Node %s has extra connection %s"%(node.name, connection)
00127
00128 def test_nodes_connections_missing(self):
00129 for node in self.graph.nodes:
00130 assert node.name in EXPECTED_NODES, "%s not found!"%node.name
00131 testnode = EXPECTED_NODES[node.name]
00132 for connection in testnode.connections:
00133 assert connection in node.connections, "Node %s expected to find missing connection %s"%(node.name, connection)
00134
00135 def test_nodes_present(self):
00136 for node in self.graph.nodes:
00137 assert node.name in EXPECTED_NODES.keys(), "Found extra node '%s'"%node.name
00138
00139 def test_nodes_missing(self):
00140 for node_name in EXPECTED_NODES.keys():
00141 assert node_name in [n.name for n in self.graph.nodes], "Expected to find missing node '%s'"%node_name
00142
00143 def test_topics_present(self):
00144 for topic in self.graph.topics:
00145 assert topic in EXPECTED_TOPICS, "Found extra topic '%s'"%topic
00146
00147 def test_topics_missing(self):
00148 for topic in EXPECTED_TOPICS:
00149 assert topic in self.graph.topics, "Expected to find missing topic '%s'"%topic
00150
00151
00152 if __name__ == '__main__':
00153 rostest.rosrun(PKG, NAME, TestGrapher, sys.argv)
00154