test_stealth_relay.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Author: Furushchev <furushchev@jsk.imi.i.u-tokyo.ac.jp>
4 
5 from dynamic_reconfigure.client import Client
6 import rospy
7 import unittest
8 from std_msgs.msg import String
9 
10 
11 class TestStealthRelay(unittest.TestCase):
12  def out_callback(self, msg):
13  self.out_msg_count += 1
14 
15  def monitor_callback(self, msg):
16  self.monitor_msg_count += 1
17 
18  def test_stealth_relay(self):
19  self.out_msg_count = 0
21  sub_out = rospy.Subscriber("/stealth_relay/output", String,
22  self.out_callback, queue_size=1)
23  for i in range(5):
24  if sub_out.get_num_connections() == 0:
25  rospy.sleep(1)
26  self.assertTrue(sub_out.get_num_connections() > 0,
27  "output topic of stealth relay was not advertised")
28 
29  rospy.sleep(5)
30  self.assertEqual(self.out_msg_count, 0,
31  "stealth relay node published message unexpectedly.")
32 
33  sub_monitor = rospy.Subscriber("/original_topic/relay", String,
34  self.monitor_callback, queue_size=1)
35  rospy.loginfo("subscribed monitor topic")
36  rospy.sleep(5) # monitoring topic is subscribed for 5 seconds.
37  cnt = self.out_msg_count
38  sub_monitor.unregister()
39  rospy.loginfo("unsubscribed monitor topic")
40 
41  self.assertGreater(self.monitor_msg_count, 0,
42  "monitoring topic is not published")
43  self.assertGreater(self.out_msg_count, 0,
44  "output topic of stealth relay was not published even monitoring topic is published")
45  rospy.sleep(5)
46  self.assertLess(abs(cnt - self.out_msg_count), 40,
47  "It seems stealth relay node did not stop subscribing even if monitoring topic is not published any more")
48 
49  # FIXME: updating dynamic reconfigure never ends on travis?
50  # client = Client("stealth_relay", timeout=3)
51  # rospy.loginfo("setting 'enable_monitor' to False")
52  # client.update_configuration({'enable_monitor': False, 'monitor_topic': ''})
53  # cnt = self.out_msg_count
54  # rospy.sleep(5)
55  # self.assertGreater(abs(self.out_msg_count - cnt), 40,
56  # "No output topic published even if 'enable_monitor' is set to False")
57 
58 
59 if __name__ == '__main__':
60  import rostest
61  rospy.init_node("test_stealth_relay")
62  rostest.rosrun("jsk_topic_tools", "test_stealth_relay", TestStealthRelay)


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:19