test_sub_to_multiple_pubs.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2013, Open Source Robotics Foundation, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Open Source Robotics Foundation, Inc. nor
18 # the names of its contributors may be used to endorse or promote
19 # products derived from this software without specific prior
20 # written permission.
21 #
22 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 # POSSIBILITY OF SUCH DAMAGE.
34 
35 ## Integration test for subscribing to a topic with multiple publishers
36 
37 from __future__ import print_function
38 
39 PKG = 'test_rospy'
40 NAME = 'test_sub_to_multiple_pubs'
41 
42 import socket
43 import sys
44 import time
45 import unittest
46 try:
47  from xmlrpc.client import ServerProxy
48 except ImportError:
49  from xmlrpclib import ServerProxy
50 
51 import rosgraph
52 import rospy
53 import rostest
54 
55 TOPIC = '/chatter'
56 TALKER_NODE = 'talker%d'
57 NUMBER_OF_TALKERS = 99
58 LISTENER_NODE = 'listener'
59 
60 
61 class TestPubSubToMultiplePubs(unittest.TestCase):
62 
64  # wait so that all connections are established
65  time.sleep(1.0)
66 
67  # ensure that publishers are publishing
68  for i in range(1, NUMBER_OF_TALKERS + 1):
69  self.assert_(rostest.is_publisher(
70  rospy.resolve_name(TOPIC),
71  rospy.resolve_name(TALKER_NODE % i)), 'talker node %d is not up' % i)
72 
73  # ensure that subscriber is subscribed
74  self.assert_(rostest.is_subscriber(
75  rospy.resolve_name(TOPIC),
76  rospy.resolve_name(LISTENER_NODE)), 'listener node is not up')
77 
78  # check number of connections from subscriber to the topic
79  connections = 0
80 
81  master = rosgraph.Master(NAME)
82  node_api = master.lookupNode(LISTENER_NODE)
83  if not node_api:
84  self.assert_(False, 'cannot contact [%s]: unknown node' % LISTENER_NODE)
85 
86  socket.setdefaulttimeout(5.0)
87  node = ServerProxy(node_api)
88  code, _, businfo = node.getBusInfo(NAME)
89  if code != 1:
90  self.assert_(False, 'cannot get node information')
91  if businfo:
92  for info in businfo:
93  topic = info[4]
94  if len(info) > 5:
95  connected = info[5]
96  else:
97  connected = True # backwards compatibility
98 
99  if connected:
100  if topic == TOPIC:
101  connections += 1
102 
103  self.assert_(connections == NUMBER_OF_TALKERS, 'Found only %d connections instead of %d' % (connections, NUMBER_OF_TALKERS))
104 
105 if __name__ == '__main__':
106  rospy.init_node(NAME)
107  rostest.run(PKG, NAME, TestPubSubToMultiplePubs, sys.argv)


test_rospy
Author(s): Ken Conley
autogenerated on Sun Feb 3 2019 03:30:22