Go to the documentation of this file.00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 from time import sleep
00007
00008 from rosbridge_library.protocol import Protocol
00009 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00010 from rosbridge_library.capabilities.publish import Publish
00011 from rosbridge_library.internal.publishers import manager
00012 from rosbridge_library.internal import ros_loader
00013
00014 from std_msgs.msg import String
00015
00016 from json import dumps, loads
00017
00018
00019 class TestAdvertise(unittest.TestCase):
00020
00021 def setUp(self):
00022 rospy.init_node("test_advertise")
00023
00024 def test_missing_arguments(self):
00025 proto = Protocol("hello")
00026 pub = Publish(proto)
00027 msg = {"op": "publish"}
00028 self.assertRaises(MissingArgumentException, pub.publish, msg)
00029
00030 msg = {"op": "publish", "msg": {}}
00031 self.assertRaises(MissingArgumentException, pub.publish, msg)
00032
00033 def test_invalid_arguments(self):
00034 proto = Protocol("hello")
00035 pub = Publish(proto)
00036
00037 msg = {"op": "publish", "topic": 3}
00038 self.assertRaises(InvalidArgumentException, pub.publish, msg)
00039
00040 def test_publish_works(self):
00041 proto = Protocol("hello")
00042 pub = Publish(proto)
00043 topic = "/test_publish_works"
00044 msg = {"data": "test publish works"}
00045
00046 received = {"msg": None}
00047
00048 def cb(msg):
00049 received["msg"] = msg
00050
00051 rospy.Subscriber(topic, String, cb)
00052
00053 pub_msg = loads(dumps({"op": "publish", "topic": topic, "msg": msg}))
00054 pub.publish(pub_msg)
00055
00056 sleep(0.5)
00057 self.assertEqual(received["msg"].data, msg["data"])
00058
00059
00060 PKG = 'rosbridge_library'
00061 NAME = 'test_publish'
00062 if __name__ == '__main__':
00063 rostest.unitrun(PKG, NAME, TestAdvertise)
00064