test_multi_unregistering.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 
7 from time import sleep, time
8 
11 from rosbridge_library.internal import ros_loader
13 from std_msgs.msg import String, Int32
14 
15 
16 class TestMultiUnregistering(unittest.TestCase):
17 
18  def setUp(self):
19  rospy.init_node("test_multi_unregistering")
20 
21  def test_publish_once(self):
22  """ Make sure that publishing works """
23  topic = "/test_publish_once"
24  msg_type = "std_msgs/String"
25  msg = {"data": "why halo thar"}
26 
27  received = {"msg": None}
28  def cb(msg):
29  received["msg"] = msg
30 
31  rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
32  p = MultiPublisher(topic, msg_type)
33  p.publish(msg)
34 
35  sleep(1)
36 
37  self.assertEqual(received["msg"].data, msg["data"])
38 
39  def test_publish_twice(self):
40  """ Make sure that publishing works """
41  topic = "/test_publish_twice"
42  msg_type = "std_msgs/String"
43  msg = {"data": "why halo thar"}
44 
45  received = {"msg": None}
46  def cb(msg):
47  received["msg"] = msg
48 
49  rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
50  p = MultiPublisher(topic, msg_type)
51  p.publish(msg)
52 
53  sleep(1)
54 
55  self.assertEqual(received["msg"].data, msg["data"])
56 
57  p.unregister()
58  # The publisher went away at time T. Here's the timeline of the events:
59  # T+1 seconds - the subscriber will retry to reconnect - fail
60  # T+3 seconds - the subscriber will retry to reconnect - fail
61  # T+5 seconds - publish msg -> it's gone
62  # T+7 seconds - the subscriber will retry to reconnect - success
63  # T+8 seconds - publish msg -> OK
64  # T+11 seconds - we receive the message. Looks like a bug in reconnection...
65  # https://github.com/ros/ros_comm/blob/indigo-devel/clients/rospy/src/rospy/impl/tcpros_base.py#L733
66  # That line should probably be indented.
67  sleep(5)
68 
69  received["msg"] = None
70  self.assertIsNone(received["msg"])
71  p = MultiPublisher(topic, msg_type)
72  p.publish(msg)
73 
74  self.assertIsNone(received["msg"])
75 
76  sleep(3)
77  p.publish(msg)
78  sleep(2)
79  # Next two lines should be removed when this is fixed:
80  # https://github.com/ros/ros_comm/blob/indigo-devel/clients/rospy/src/rospy/impl/tcpros_base.py#L733
81  self.assertIsNone(received["msg"])
82  sleep(3)
83  self.assertEqual(received["msg"].data, msg["data"])
84 
85 
86 PKG = 'rosbridge_library'
87 NAME = 'test_multi_unregistering'
88 if __name__ == '__main__':
89  rostest.unitrun(PKG, NAME, TestMultiUnregistering)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14