test_multi_unregistering.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import os
4 import rospy
5 import rosunit
6 import unittest
7 
8 from time import sleep
9 
10 from rosbridge_library.internal.publishers import MultiPublisher
11 from rosbridge_library.internal import ros_loader
12 
13 
14 PKG = 'rosbridge_library'
15 NAME = 'test_multi_unregistering'
16 
17 
18 class TestMultiUnregistering(unittest.TestCase):
19 
20  def setUp(self):
21  rospy.init_node(NAME, log_level=rospy.DEBUG)
22 
23  def test_publish_once(self):
24  """ Make sure that publishing works """
25  topic = "/test_publish_once"
26  msg_type = "std_msgs/String"
27  msg = {"data": "why halo thar"}
28 
29  received = {"msg": None}
30 
31  def cb(msg):
32  received["msg"] = msg
33 
34  rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
35  p = MultiPublisher(topic, msg_type)
36  p.publish(msg)
37 
38  sleep(1) # Time to publish and receive
39 
40  self.assertEqual(received["msg"].data, msg["data"])
41 
42  def test_publish_twice(self):
43  """ Make sure that publishing works """
44  topic = "/test_publish_twice"
45  msg_type = "std_msgs/String"
46  msg = {"data": "why halo thar"}
47 
48  received = {"msg": None}
49 
50  def cb(msg):
51  received["msg"] = msg
52 
53  rospy.Subscriber(topic, ros_loader.get_message_class(msg_type), cb)
54  p = MultiPublisher(topic, msg_type)
55  p.publish(msg)
56 
57  sleep(1) # Time to publish and receive
58 
59  self.assertEqual(received["msg"].data, msg["data"])
60 
61  p.unregister()
62  sleep(5) # Time to unregister
63 
64  received["msg"] = None # Reset received
65  self.assertIsNone(received["msg"])
66  p = MultiPublisher(topic, msg_type)
67  p.publish(msg)
68 
69  sleep(1) # Time to publish and receive
70 
71  self.assertEqual(received["msg"].data, msg["data"])
72 
73 
74 if __name__ == '__main__':
75  rosunit.unitrun(PKG, NAME, TestMultiUnregistering)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18