test_publish.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import sys
3 import rospy
4 import rostest
5 import unittest
6 from time import sleep
7 
8 from rosbridge_library.protocol import Protocol
9 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
11 from rosbridge_library.internal.publishers import manager
12 from rosbridge_library.internal import ros_loader
13 
14 from std_msgs.msg import String
15 
16 from json import dumps, loads
17 
18 
19 class TestAdvertise(unittest.TestCase):
20 
21  def setUp(self):
22  rospy.init_node("test_advertise")
23 
25  proto = Protocol("hello")
26  pub = Publish(proto)
27  msg = {"op": "publish"}
28  self.assertRaises(MissingArgumentException, pub.publish, msg)
29 
30  msg = {"op": "publish", "msg": {}}
31  self.assertRaises(MissingArgumentException, pub.publish, msg)
32 
34  proto = Protocol("hello")
35  pub = Publish(proto)
36 
37  msg = {"op": "publish", "topic": 3}
38  self.assertRaises(InvalidArgumentException, pub.publish, msg)
39 
40  def test_publish_works(self):
41  proto = Protocol("hello")
42  pub = Publish(proto)
43  topic = "/test_publish_works"
44  msg = {"data": "test publish works"}
45 
46  received = {"msg": None}
47 
48  def cb(msg):
49  received["msg"] = msg
50 
51  rospy.Subscriber(topic, String, cb)
52 
53  pub_msg = loads(dumps({"op": "publish", "topic": topic, "msg": msg}))
54  pub.publish(pub_msg)
55 
56  sleep(0.5)
57  self.assertEqual(received["msg"].data, msg["data"])
58 
59 
60 PKG = 'rosbridge_library'
61 NAME = 'test_publish'
62 if __name__ == '__main__':
63  rostest.unitrun(PKG, NAME, TestAdvertise)
64 
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14