test_connection.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 import sys
5 
6 import unittest
7 
8 import rosgraph
9 import rospy
10 import rosmsg
11 import roslib
12 
13 
14 PKG = 'jsk_topic_tools'
15 NAME = 'test_connection'
16 
17 
18 class TestConnection(unittest.TestCase):
19 
20  def __init__(self, *args):
21  super(TestConnection, self).__init__(*args)
22  rospy.init_node(NAME)
23 
25  check_connected_topics = rospy.get_param('~check_connected_topics')
26  master = rosgraph.Master('/test_connection')
27  _, sub, _ = master.getSystemState()
28  # Check assumed topics are not there
29  master = rosgraph.Master('test_connection')
30  _, subscriptions, _ = master.getSystemState()
31  for check_topic in check_connected_topics:
32  for topic, sub_node in subscriptions:
33  if topic == rospy.get_namespace() + check_topic:
34  raise ValueError('Found topic: {}'.format(check_topic))
35 
37  topic_type = rospy.get_param('~input_topic_type')
38  check_connected_topics = rospy.get_param('~check_connected_topics')
39  wait_time = rospy.get_param('~wait_for_connection', 0)
40  msg_class = roslib.message.get_message_class(topic_type)
41  # Subscribe topic and bond connection
42  sub = rospy.Subscriber('~input', msg_class,
44  print('Waiting for connection for {} sec.'.format(wait_time))
45  rospy.sleep(wait_time)
46  # Check assumed topics are there
47  master = rosgraph.Master('test_connection')
48  _, subscriptions, _ = master.getSystemState()
49  for check_topic in check_connected_topics:
50  for topic, sub_node in subscriptions:
51  if topic == rospy.get_namespace() + check_topic:
52  break
53  else:
54  raise ValueError('Not found topic: {}'.format(check_topic))
55  sub.unregister()
56  # FIXME: below test won't pass on hydro
57  ROS_DISTRO = os.environ.get('ROS_DISTRO', 'indigo')
58  if ord(ROS_DISTRO[0]) < ord('i'):
59  sys.stderr.write('WARNING: running on rosdistro %s, and skipping '
60  'test for disconnection.\n' % ROS_DISTRO)
61  return
62  rospy.sleep(1) # wait for disconnection
63  # Check specified topics do not exist
64  _, subscriptions, _ = master.getSystemState()
65  for check_topic in check_connected_topics:
66  for topic, sub_node in subscriptions:
67  if topic == rospy.get_namespace() + check_topic:
68  raise ValueError('Found topic: {}'.format(check_topic))
69 
71  pass
72 
73 
74 if __name__ == "__main__":
75  import rostest
76  rostest.rosrun(PKG, NAME, TestConnection)
def _cb_test_subscriber_appears_disappears(self, msg)


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:19