test_sub_to_multiple_pubs.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2013, Open Source Robotics Foundation, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Open Source Robotics Foundation, Inc. nor
00018 #    the names of its contributors may be used to endorse or promote
00019 #    products derived from this software without specific prior
00020 #    written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 
00035 ## Integration test for subscribing to a topic with multiple publishers
00036 
00037 from __future__ import print_function
00038 
00039 PKG = 'test_rospy'
00040 NAME = 'test_sub_to_multiple_pubs'
00041 
00042 import socket
00043 import sys
00044 import time
00045 import unittest
00046 try:
00047     from xmlrpc.client import ServerProxy
00048 except ImportError:
00049     from xmlrpclib import ServerProxy
00050 
00051 import rosgraph
00052 import rospy
00053 import rostest
00054 
00055 TOPIC = '/chatter'
00056 TALKER_NODE = 'talker%d'
00057 NUMBER_OF_TALKERS = 99
00058 LISTENER_NODE = 'listener'
00059 
00060 
00061 class TestPubSubToMultiplePubs(unittest.TestCase):
00062 
00063     def test_subscribe_to_multiple_publishers(self):
00064         # wait so that all connections are established
00065         time.sleep(1.0)
00066 
00067         # ensure that publishers are publishing
00068         for i in range(1, NUMBER_OF_TALKERS + 1):
00069             self.assert_(rostest.is_publisher(
00070                 rospy.resolve_name(TOPIC),
00071                 rospy.resolve_name(TALKER_NODE % i)), 'talker node %d is not up' % i)
00072 
00073         # ensure that subscriber is subscribed
00074         self.assert_(rostest.is_subscriber(
00075             rospy.resolve_name(TOPIC),
00076             rospy.resolve_name(LISTENER_NODE)), 'listener node is not up')
00077 
00078         # check number of connections from subscriber to the topic
00079         connections = 0
00080 
00081         master = rosgraph.Master(NAME)
00082         node_api = master.lookupNode(LISTENER_NODE)
00083         if not node_api:
00084             self.assert_(False, 'cannot contact [%s]: unknown node' % LISTENER_NODE)
00085 
00086         socket.setdefaulttimeout(5.0)
00087         node = ServerProxy(node_api)
00088         code, _, businfo = node.getBusInfo(NAME)
00089         if code != 1:
00090             self.assert_(False, 'cannot get node information')
00091         if businfo:
00092             for info in businfo:
00093                 topic = info[4]
00094                 if len(info) > 5:
00095                     connected = info[5]
00096                 else:
00097                     connected = True  # backwards compatibility
00098 
00099                 if connected:
00100                     if topic == TOPIC:
00101                         connections += 1
00102 
00103         self.assert_(connections == NUMBER_OF_TALKERS, 'Found only %d connections instead of %d' % (connections, NUMBER_OF_TALKERS))
00104 
00105 if __name__ == '__main__':
00106     rospy.init_node(NAME)
00107     rostest.run(PKG, NAME, TestPubSubToMultiplePubs, sys.argv)


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57