test_lazy_transport.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 import sys
5 
6 import unittest
7 
8 import rosgraph
9 import rospy
10 import rosmsg
11 import roslib
12 
13 
14 PKG = 'topic_tools'
15 NAME = 'test_lazy_transport'
16 
17 
18 class TestLazyTransport(unittest.TestCase):
19 
20  def __init__(self, *args):
21  super(self.__class__, self).__init__(*args)
22  rospy.init_node(NAME)
23 
25  check_connected_topics = rospy.get_param('~check_connected_topics')
26  master = rosgraph.Master('/test_connection')
27  _, sub, _ = master.getSystemState()
28  # Check assumed topics are not there
29  master = rosgraph.Master('test_connection')
30  _, subscriptions, _ = master.getSystemState()
31  for check_topic in check_connected_topics:
32  for topic, sub_node in subscriptions:
33  if topic == rospy.get_namespace() + check_topic:
34  raise ValueError('Found topic: {}'.format(check_topic))
35 
37  topic_type = rospy.get_param('~input_topic_type')
38  check_connected_topics = rospy.get_param('~check_connected_topics')
39  wait_time = rospy.get_param('~wait_for_connection', 0)
40  msg_class = roslib.message.get_message_class(topic_type)
41  # Subscribe topic and bond connection
42  sub = rospy.Subscriber('~input', msg_class,
44  print('Waiting for connection for {} sec.'.format(wait_time))
45  rospy.sleep(wait_time)
46  # Check assumed topics are there
47  master = rosgraph.Master('test_connection')
48  _, subscriptions, _ = master.getSystemState()
49  for check_topic in check_connected_topics:
50  for topic, sub_node in subscriptions:
51  if topic == rospy.get_namespace() + check_topic:
52  break
53  else:
54  raise ValueError('Topic Not Found: {}'
55  .format(rospy.get_namespace() + check_topic))
56  sub.unregister()
57 
59  pass
60 
61 
62 if __name__ == "__main__":
63  import rostest
64  rostest.rosrun(PKG, NAME, TestLazyTransport)
test_lazy_transport.TestLazyTransport._cb_test_subscriber_appears
def _cb_test_subscriber_appears(self, msg)
Definition: test_lazy_transport.py:58
test_lazy_transport.TestLazyTransport
Definition: test_lazy_transport.py:18
test_lazy_transport.TestLazyTransport.test_no_subscribers
def test_no_subscribers(self)
Definition: test_lazy_transport.py:24
test_lazy_transport.TestLazyTransport.__init__
def __init__(self, *args)
Definition: test_lazy_transport.py:20
test_lazy_transport.TestLazyTransport.test_subscriber_appears
def test_subscriber_appears(self)
Definition: test_lazy_transport.py:36


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas , Jacob Perron
autogenerated on Sat Sep 14 2024 03:00:05