test_publish.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rosunit
5 import unittest
6 from time import sleep
7 
8 from rosbridge_library.protocol import Protocol
9 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
11 from rosbridge_library.internal.publishers import manager
12 from rosbridge_library.internal import ros_loader
13 
14 from std_msgs.msg import String
15 
16 from json import dumps, loads
17 
18 
19 PKG = 'rosbridge_library'
20 NAME = 'test_publish'
21 
22 
23 class TestPublish(unittest.TestCase):
24 
25  def setUp(self):
26  rospy.init_node(NAME)
27 
29  proto = Protocol("hello")
30  pub = Publish(proto)
31  msg = {"op": "publish"}
32  self.assertRaises(MissingArgumentException, pub.publish, msg)
33 
34  msg = {"op": "publish", "msg": {}}
35  self.assertRaises(MissingArgumentException, pub.publish, msg)
36 
38  proto = Protocol("hello")
39  pub = Publish(proto)
40 
41  msg = {"op": "publish", "topic": 3}
42  self.assertRaises(InvalidArgumentException, pub.publish, msg)
43 
44  def test_publish_works(self):
45  proto = Protocol("hello")
46  pub = Publish(proto)
47  topic = "/test_publish_works"
48  msg = {"data": "test publish works"}
49 
50  received = {"msg": None}
51 
52  def cb(msg):
53  received["msg"] = msg
54 
55  rospy.Subscriber(topic, String, cb)
56 
57  pub_msg = loads(dumps({"op": "publish", "topic": topic, "msg": msg}))
58  pub.publish(pub_msg)
59 
60  sleep(0.5)
61  self.assertEqual(received["msg"].data, msg["data"])
62 
63 
64 if __name__ == '__main__':
65  rosunit.unitrun(PKG, NAME, TestPublish)
66 
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18