Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 from __future__ import print_function
00038
00039 PKG = 'test_rospy'
00040 NAME = 'test_sub_to_multiple_pubs'
00041
00042 import socket
00043 import sys
00044 import time
00045 import unittest
00046 try:
00047 from xmlrpc.client import ServerProxy
00048 except ImportError:
00049 from xmlrpclib import ServerProxy
00050
00051 import rosgraph
00052 import rospy
00053 import rostest
00054
00055 TOPIC = '/chatter'
00056 TALKER_NODE = 'talker%d'
00057 NUMBER_OF_TALKERS = 99
00058 LISTENER_NODE = 'listener'
00059
00060
00061 class TestPubSubToMultiplePubs(unittest.TestCase):
00062
00063 def test_subscribe_to_multiple_publishers(self):
00064
00065 time.sleep(1.0)
00066
00067
00068 for i in range(1, NUMBER_OF_TALKERS + 1):
00069 self.assert_(rostest.is_publisher(
00070 rospy.resolve_name(TOPIC),
00071 rospy.resolve_name(TALKER_NODE % i)), 'talker node %d is not up' % i)
00072
00073
00074 self.assert_(rostest.is_subscriber(
00075 rospy.resolve_name(TOPIC),
00076 rospy.resolve_name(LISTENER_NODE)), 'listener node is not up')
00077
00078
00079 connections = 0
00080
00081 master = rosgraph.Master(NAME)
00082 node_api = master.lookupNode(LISTENER_NODE)
00083 if not node_api:
00084 self.assert_(False, 'cannot contact [%s]: unknown node' % LISTENER_NODE)
00085
00086 socket.setdefaulttimeout(5.0)
00087 node = ServerProxy(node_api)
00088 code, _, businfo = node.getBusInfo(NAME)
00089 if code != 1:
00090 self.assert_(False, 'cannot get node information')
00091 if businfo:
00092 for info in businfo:
00093 topic = info[4]
00094 if len(info) > 5:
00095 connected = info[5]
00096 else:
00097 connected = True
00098
00099 if connected:
00100 if topic == TOPIC:
00101 connections += 1
00102
00103 self.assert_(connections == NUMBER_OF_TALKERS, 'Found only %d connections instead of %d' % (connections, NUMBER_OF_TALKERS))
00104
00105 if __name__ == '__main__':
00106 rospy.init_node(NAME)
00107 rostest.run(PKG, NAME, TestPubSubToMultiplePubs, sys.argv)