test_publish.py
Go to the documentation of this file.
1 #!/usr/bin/env python
import sys
import unittest
from json import dumps, loads
from time import sleep

import rospy
import rosunit
from std_msgs.msg import String

from rosbridge_library.capabilities.publish import Publish
from rosbridge_library.internal import ros_loader
from rosbridge_library.internal.publishers import manager
from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
from rosbridge_library.protocol import Protocol
17 
18 
# Package and test names handed to rosunit.unitrun() at the bottom of the file;
# NAME is also used as the ROS node name in TestPublish.setUp().
PKG = 'rosbridge_library'
NAME = 'test_publish'
21 
22 
class TestPublish(unittest.TestCase):
    """Unit tests for the rosbridge ``Publish`` capability."""

    def setUp(self):
        """Initialise the ROS node used by every test in this case."""
        rospy.init_node(NAME)

    def test_missing_arguments(self):
        """Publish requests that omit ``topic`` raise MissingArgumentException."""
        proto = Protocol("hello")
        pub = Publish(proto)

        # Neither "topic" nor "msg" present.
        msg = {"op": "publish"}
        self.assertRaises(MissingArgumentException, pub.publish, msg)

        # "msg" present, "topic" still absent.
        msg = {"op": "publish", "msg": {}}
        self.assertRaises(MissingArgumentException, pub.publish, msg)

    def test_invalid_arguments(self):
        """A non-string ``topic`` raises InvalidArgumentException."""
        proto = Protocol("hello")
        pub = Publish(proto)

        msg = {"op": "publish", "topic": 3}
        self.assertRaises(InvalidArgumentException, pub.publish, msg)

    def test_publish_works(self):
        """A published String message is delivered to a rospy subscriber."""
        proto = Protocol("hello")
        pub = Publish(proto)
        topic = "/test_publish_works"
        msg = {"data": "test publish works"}

        # Mutable holder so the subscriber callback can hand the message back.
        received = {"msg": None}

        def cb(ros_msg):
            received["msg"] = ros_msg

        rospy.Subscriber(topic, String, cb)

        # Round-trip through JSON so the capability sees the same structure
        # it would get from a decoded wire message.
        pub_msg = loads(dumps({"op": "publish", "topic": topic, "msg": msg}))
        pub.publish(pub_msg)

        # Give ROS time to deliver the message to the subscriber callback.
        sleep(0.5)
        # Fail with a clear message instead of an AttributeError on None.
        self.assertIsNotNone(
            received["msg"], "no message received on %s" % topic)
        self.assertEqual(received["msg"].data, msg["data"])
63 
# Run the test case through rosunit when executed as a script.
if __name__ == '__main__':
    rosunit.unitrun(PKG, NAME, TestPublish)
66 
msg
test.capabilities.test_publish.TestPublish.test_publish_works
def test_publish_works(self)
Definition: test_publish.py:44
rosbridge_library.internal.publishers
Definition: publishers.py:1
rosbridge_library.capabilities.publish.Publish
Definition: publish.py:40
rosbridge_library.util.cbor.loads
def loads(data)
Definition: cbor.py:270
rosbridge_library.util.cbor.dumps
def dumps(ob, sort_keys=False)
Definition: cbor.py:223
test.capabilities.test_publish.TestPublish.setUp
def setUp(self)
Definition: test_publish.py:25
rosbridge_library.protocol.Protocol
Definition: protocol.py:67
rosbridge_library.protocol
Definition: protocol.py:1
test.capabilities.test_publish.TestPublish
Definition: test_publish.py:23
test.capabilities.test_publish.TestPublish.test_invalid_arguments
def test_invalid_arguments(self)
Definition: test_publish.py:37
rosbridge_library.capabilities.publish
Definition: publish.py:1
test.capabilities.test_publish.TestPublish.test_missing_arguments
def test_missing_arguments(self)
Definition: test_publish.py:28
rosbridge_library.internal
Definition: src/rosbridge_library/internal/__init__.py:1


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Tue Oct 3 2023 02:12:45