test_lazy_transport.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 import sys
5 
6 import unittest
7 
8 import rosgraph
9 import rospy
10 import rosmsg
11 import roslib
12 
13 
14 PKG = 'topic_tools'
15 NAME = 'test_lazy_transport'
16 
17 
18 class TestLazyTransport(unittest.TestCase):
19 
20  def __init__(self, *args):
21  super(self.__class__, self).__init__(*args)
22  rospy.init_node(NAME)
23 
25  check_connected_topics = rospy.get_param('~check_connected_topics')
26  master = rosgraph.Master('/test_connection')
27  _, sub, _ = master.getSystemState()
28  # Check assumed topics are not there
29  master = rosgraph.Master('test_connection')
30  _, subscriptions, _ = master.getSystemState()
31  for check_topic in check_connected_topics:
32  for topic, sub_node in subscriptions:
33  if topic == rospy.get_namespace() + check_topic:
34  raise ValueError('Found topic: {}'.format(check_topic))
35 
37  topic_type = rospy.get_param('~input_topic_type')
38  check_connected_topics = rospy.get_param('~check_connected_topics')
39  wait_time = rospy.get_param('~wait_for_connection', 0)
40  msg_class = roslib.message.get_message_class(topic_type)
41  # Subscribe topic and bond connection
42  sub = rospy.Subscriber('~input', msg_class,
44  print('Waiting for connection for {} sec.'.format(wait_time))
45  rospy.sleep(wait_time)
46  # Check assumed topics are there
47  master = rosgraph.Master('test_connection')
48  _, subscriptions, _ = master.getSystemState()
49  for check_topic in check_connected_topics:
50  for topic, sub_node in subscriptions:
51  if topic == rospy.get_namespace() + check_topic:
52  break
53  else:
54  raise ValueError('Topic Not Found: {}'
55  .format(rospy.get_namespace() + check_topic))
56  sub.unregister()
57 
59  pass
60 
61 
62 if __name__ == "__main__":
63  import rostest
64  rostest.rosrun(PKG, NAME, TestLazyTransport)


topic_tools
Author(s): Morgan Quigley, Brian Gerkey, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:58