37 from __future__
import print_function
40 NAME =
'test_sub_to_multiple_pubs' 47 from xmlrpc.client
import ServerProxy
49 from xmlrpclib
import ServerProxy
56 TALKER_NODE =
'talker%d' 57 NUMBER_OF_TALKERS = 99
58 LISTENER_NODE =
'listener' 68 for i
in range(1, NUMBER_OF_TALKERS + 1):
69 self.assert_(rostest.is_publisher(
70 rospy.resolve_name(TOPIC),
71 rospy.resolve_name(TALKER_NODE % i)),
'talker node %d is not up' % i)
74 self.assert_(rostest.is_subscriber(
75 rospy.resolve_name(TOPIC),
76 rospy.resolve_name(LISTENER_NODE)),
'listener node is not up')
81 master = rosgraph.Master(NAME)
82 node_api = master.lookupNode(LISTENER_NODE)
84 self.assert_(
False,
'cannot contact [%s]: unknown node' % LISTENER_NODE)
86 socket.setdefaulttimeout(5.0)
87 node = ServerProxy(node_api)
88 code, _, businfo = node.getBusInfo(NAME)
90 self.assert_(
False,
'cannot get node information')
103 self.assert_(connections == NUMBER_OF_TALKERS,
'Found only %d connections instead of %d' % (connections, NUMBER_OF_TALKERS))
105 if __name__ ==
'__main__':
106 rospy.init_node(NAME)
107 rostest.run(PKG, NAME, TestPubSubToMultiplePubs, sys.argv)
def test_subscribe_to_multiple_publishers(self)