test_grapher.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Copyright 2014 Open Source Robotics Foundation, Inc.
00003 #
00004 # Licensed under the Apache License, Version 2.0 (the "License");
00005 # you may not use this file except in compliance with the License.
00006 # You may obtain a copy of the License at
00007 #
00008 #     http://www.apache.org/licenses/LICENSE-2.0
00009 #
00010 # Unless required by applicable law or agreed to in writing, software
00011 # distributed under the License is distributed on an "AS IS" BASIS,
00012 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013 # See the License for the specific language governing permissions and
00014 # limitations under the License.
00015 import sys
00016 import unittest
00017 import time
00018 import rospy
00019 import rostest
00020 
00021 from ros_topology_msgs.msg import *
00022 
00023 PKG = 'rosprofiler'
00024 NAME = 'test_grapher'
00025 
00026 # TODO: Check services
00027 
00028 EXPECTED_NODES = dict()
00029 talker1 = Node(name="/talker1")
00030 talker1.publishes.append("/chatter")
00031 talker1.publishes.append("/rosout")
00032 talker1.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00033 talker1.connections.append(Connection(destination='/listener1',topic='/chatter',direction=2,transport="TCPROS"))
00034 talker1.connections.append(Connection(destination='/listener2',topic='/chatter',direction=2,transport="TCPROS"))
00035 talker2 = Node(name="/talker2")
00036 talker2.publishes.append("/chatter")
00037 talker2.publishes.append("/rosout")
00038 talker2.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00039 talker2.connections.append(Connection(destination='/listener1',topic='/chatter',direction=2,transport="TCPROS"))
00040 talker2.connections.append(Connection(destination='/listener2',topic='/chatter',direction=2,transport="TCPROS"))
00041 listener1 = Node(name="/listener1")
00042 listener1.publishes.append("/rosout")
00043 listener1.subscribes.append("/chatter")
00044 listener1.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00045 listener1.connections.append(Connection(destination='/talker1',topic='/chatter',direction=1,transport="TCPROS"))
00046 listener1.connections.append(Connection(destination='/talker2',topic='/chatter',direction=1,transport="TCPROS"))
00047 listener2 = Node(name="/listener2")
00048 listener2.publishes.append("/rosout")
00049 listener2.subscribes.append("/chatter")
00050 listener2.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00051 listener2.connections.append(Connection(destination='/talker1',topic='/chatter',direction=1,transport="TCPROS"))
00052 listener2.connections.append(Connection(destination='/talker2',topic='/chatter',direction=1,transport="TCPROS"))
00053 rosout = Node(name="/rosout")
00054 rosout.publishes.append("/rosout_agg")
00055 rosout.subscribes.append("/rosout")
00056 rosout.connections.append(Connection(destination='/talker1',topic='/rosout',direction=1,transport="TCPROS"))
00057 rosout.connections.append(Connection(destination='/talker2',topic='/rosout',direction=1,transport="TCPROS"))
00058 rosout.connections.append(Connection(destination='/listener1',topic='/rosout',direction=1,transport="TCPROS"))
00059 rosout.connections.append(Connection(destination='/listener2',topic='/rosout',direction=1,transport="TCPROS"))
00060 rosout.connections.append(Connection(destination='/test_grapher',topic='/rosout',direction=1,transport="TCPROS"))
00061 rosout.connections.append(Connection(destination='/rosgrapher',topic='/rosout',direction=1,transport="TCPROS"))
00062 grapher = Node(name="/rosgrapher")
00063 grapher.publishes.append("/rosout")
00064 grapher.publishes.append("/topology")
00065 grapher.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00066 grapher.connections.append(Connection(destination='/'+NAME,topic='/topology',direction=2,transport="TCPROS"))
00067 tester = Node(name="/test_grapher")
00068 tester.publishes.append("/rosout")
00069 tester.subscribes.append("/topology")
00070 tester.connections.append(Connection(destination='/rosout',topic='/rosout',direction=2,transport="TCPROS"))
00071 tester.connections.append(Connection(destination='/rosgrapher',topic='/topology',direction=1,transport="TCPROS"))
00072 EXPECTED_NODES['/talker1'] = talker1
00073 EXPECTED_NODES['/talker2'] = talker2
00074 EXPECTED_NODES['/listener1'] = listener1
00075 EXPECTED_NODES['/listener2'] = listener2
00076 EXPECTED_NODES['/rosout'] = rosout
00077 EXPECTED_NODES['/rosgrapher'] = grapher
00078 EXPECTED_NODES['/'+NAME] = tester
00079 
00080 t_chatter = Topic(name="/chatter", type="std_msgs/String")
00081 t_rosout = Topic(name="/rosout", type="rosgraph_msgs/Log")
00082 t_rosout_agg = Topic(name="/rosout_agg", type="rosgraph_msgs/Log")
00083 t_topology = Topic(name="/topology", type="ros_topology_msgs/Graph")
00084 EXPECTED_TOPICS = [t_chatter, t_rosout, t_rosout_agg, t_topology]
00085 
00086 class TestGrapher(unittest.TestCase):
00087     def __init__(self, *args):
00088         super(TestGrapher, self).__init__(*args)
00089         # Start time - for calculating timeout
00090         self.start_time = None
00091         self.graph = Graph()
00092 
00093     def setUp(self):
00094         rospy.init_node(NAME)
00095         rospy.Subscriber('/topology', Graph, self.callback)   
00096         self.wait_for_data(10.0)
00097 
00098     def callback(self, data):
00099         self.graph = data
00100        
00101     def wait_for_data(self, duration):
00102         """ Waits to receive statistics data """
00103         start_time = rospy.get_rostime()
00104         while not rospy.is_shutdown() and not (rospy.get_rostime() > (start_time + rospy.Duration(duration))):
00105             if len(self.graph.nodes) >= len(EXPECTED_NODES) and len(self.graph.topics) >= len(EXPECTED_TOPICS):
00106                 return
00107             rospy.sleep(1.0)
00108 
00109     def test_nodes_publishers(self):
00110         for node in self.graph.nodes:
00111             assert node.name in EXPECTED_NODES, "%s not found!"%node.name
00112             testnode = EXPECTED_NODES[node.name]
00113             assert set(node.publishes) == set(testnode.publishes), "%s.publishes=%s, but should be %s"%(node.name,node.publishes,testnode.publishes)
00114 
00115     def test_nodes_subscribers(self):
00116         for node in self.graph.nodes:
00117             assert node.name in EXPECTED_NODES, "%s not found!"%node.name
00118             testnode = EXPECTED_NODES[node.name]
00119             assert set(node.subscribes) == set(testnode.subscribes), "%s.subscribes=%s, but should be %s"%(node.name,node.subscribes,testnode.subscribes)
00120 
00121     def test_nodes_connections_present(self):
00122         for node in self.graph.nodes:
00123             assert node.name in EXPECTED_NODES, "%s not found!"%node.name
00124             testnode = EXPECTED_NODES[node.name]
00125             for connection in node.connections:
00126                 assert connection in testnode.connections, "Node %s has extra connection %s"%(node.name, connection)
00127 
00128     def test_nodes_connections_missing(self):
00129         for node in self.graph.nodes:
00130             assert node.name in EXPECTED_NODES, "%s not found!"%node.name
00131             testnode = EXPECTED_NODES[node.name]
00132             for connection in testnode.connections:
00133                 assert connection in node.connections, "Node %s expected to find missing connection %s"%(node.name, connection)
00134 
00135     def test_nodes_present(self):
00136         for node in self.graph.nodes:
00137             assert node.name in EXPECTED_NODES.keys(), "Found extra node '%s'"%node.name
00138 
00139     def test_nodes_missing(self):
00140         for node_name in EXPECTED_NODES.keys():
00141             assert node_name in [n.name for n in self.graph.nodes], "Expected to find missing node '%s'"%node_name
00142 
00143     def test_topics_present(self):
00144         for topic in self.graph.topics:
00145             assert topic in EXPECTED_TOPICS, "Found extra topic '%s'"%topic
00146 
00147     def test_topics_missing(self):
00148         for topic in EXPECTED_TOPICS:
00149             assert topic in self.graph.topics, "Expected to find missing topic '%s'"%topic
00150  
00151 
00152 if __name__ == '__main__':
00153     rostest.rosrun(PKG, NAME, TestGrapher, sys.argv)
00154 


rosprofiler
Author(s): Dan Brooks
autogenerated on Thu Jun 6 2019 18:15:05