Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 from __future__ import print_function
00038
00039 PKG = 'test_rospy'
00040 NAME = 'test_sub_to_multiple_pubs'
00041
00042 import socket
00043 import sys
00044 import time
00045 import unittest
00046 import xmlrpclib
00047
00048 import rosgraph
00049 import rospy
00050 import rostest
00051
00052 TOPIC = '/chatter'
00053 TALKER_NODE = 'talker%d'
00054 NUMBER_OF_TALKERS = 99
00055 LISTENER_NODE = 'listener'
00056
00057
00058 class TestPubSubToMultiplePubs(unittest.TestCase):
00059
00060 def test_subscribe_to_multiple_publishers(self):
00061
00062 time.sleep(1.0)
00063
00064
00065 for i in range(1, NUMBER_OF_TALKERS + 1):
00066 self.assert_(rostest.is_publisher(
00067 rospy.resolve_name(TOPIC),
00068 rospy.resolve_name(TALKER_NODE % i)), 'talker node %d is not up' % i)
00069
00070
00071 self.assert_(rostest.is_subscriber(
00072 rospy.resolve_name(TOPIC),
00073 rospy.resolve_name(LISTENER_NODE)), 'listener node is not up')
00074
00075
00076 connections = 0
00077
00078 master = rosgraph.Master(NAME)
00079 node_api = master.lookupNode(LISTENER_NODE)
00080 if not node_api:
00081 self.assert_(False, 'cannot contact [%s]: unknown node' % LISTENER_NODE)
00082
00083 socket.setdefaulttimeout(5.0)
00084 node = xmlrpclib.ServerProxy(node_api)
00085 code, _, businfo = node.getBusInfo(NAME)
00086 if code != 1:
00087 self.assert_(False, 'cannot get node information')
00088 if businfo:
00089 for info in businfo:
00090 topic = info[4]
00091 if len(info) > 5:
00092 connected = info[5]
00093 else:
00094 connected = True
00095
00096 if connected:
00097 if topic == TOPIC:
00098 connections += 1
00099
00100 self.assert_(connections == NUMBER_OF_TALKERS, 'Found only %d connections instead of %d' % (connections, NUMBER_OF_TALKERS))
00101
00102 if __name__ == '__main__':
00103 rospy.init_node(NAME)
00104 rostest.run(PKG, NAME, TestPubSubToMultiplePubs, sys.argv)