test_sub_to_multiple_pubs.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2013, Open Source Robotics Foundation, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Open Source Robotics Foundation, Inc. nor
00018 #    the names of its contributors may be used to endorse or promote
00019 #    products derived from this software without specific prior
00020 #    written permission.
00021 #
00022 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00023 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00024 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00025 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00026 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00027 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00028 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00029 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00030 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00031 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00032 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00033 # POSSIBILITY OF SUCH DAMAGE.
00034 
00035 ## Integration test for subscribing to a topic with multiple publishers
00036 
00037 from __future__ import print_function
00038 
00039 PKG = 'test_rospy'
00040 NAME = 'test_sub_to_multiple_pubs'
00041 
00042 import socket
00043 import sys
00044 import time
00045 import unittest
00046 import xmlrpclib
00047 
00048 import rosgraph
00049 import rospy
00050 import rostest
00051 
00052 TOPIC = '/chatter'
00053 TALKER_NODE = 'talker%d'
00054 NUMBER_OF_TALKERS = 99
00055 LISTENER_NODE = 'listener'
00056 
00057 
00058 class TestPubSubToMultiplePubs(unittest.TestCase):
00059 
00060     def test_subscribe_to_multiple_publishers(self):
00061         # wait so that all connections are established
00062         time.sleep(1.0)
00063 
00064         # ensure that publishers are publishing
00065         for i in range(1, NUMBER_OF_TALKERS + 1):
00066             self.assert_(rostest.is_publisher(
00067                 rospy.resolve_name(TOPIC),
00068                 rospy.resolve_name(TALKER_NODE % i)), 'talker node %d is not up' % i)
00069 
00070         # ensure that subscriber is subscribed
00071         self.assert_(rostest.is_subscriber(
00072             rospy.resolve_name(TOPIC),
00073             rospy.resolve_name(LISTENER_NODE)), 'listener node is not up')
00074 
00075         # check number of connections from subscriber to the topic
00076         connections = 0
00077 
00078         master = rosgraph.Master(NAME)
00079         node_api = master.lookupNode(LISTENER_NODE)
00080         if not node_api:
00081             self.assert_(False, 'cannot contact [%s]: unknown node' % LISTENER_NODE)
00082 
00083         socket.setdefaulttimeout(5.0)
00084         node = xmlrpclib.ServerProxy(node_api)
00085         code, _, businfo = node.getBusInfo(NAME)
00086         if code != 1:
00087             self.assert_(False, 'cannot get node information')
00088         if businfo:
00089             for info in businfo:
00090                 topic = info[4]
00091                 if len(info) > 5:
00092                     connected = info[5]
00093                 else:
00094                     connected = True  # backwards compatibility
00095 
00096                 if connected:
00097                     if topic == TOPIC:
00098                         connections += 1
00099 
00100         self.assert_(connections == NUMBER_OF_TALKERS, 'Found only %d connections instead of %d' % (connections, NUMBER_OF_TALKERS))
00101 
00102 if __name__ == '__main__':
00103     rospy.init_node(NAME)
00104     rostest.run(PKG, NAME, TestPubSubToMultiplePubs, sys.argv)


test_rospy
Author(s): Ken Conley
autogenerated on Fri Aug 28 2015 12:33:56