Go to the documentation of this file.00001
00002 PKG = 'rosbridge_library'
00003 import roslib
00004 roslib.load_manifest(PKG)
00005 roslib.load_manifest("std_msgs")
00006 import rospy
00007 from time import sleep
00008
00009 from rosbridge_library.protocol import Protocol
00010 from rosbridge_library.protocol import InvalidArgumentException, MissingArgumentException
00011 from rosbridge_library.capabilities.publish import Publish
00012 from rosbridge_library.internal.publishers import manager
00013 from rosbridge_library.internal import ros_loader
00014
00015 from std_msgs.msg import String
00016
00017 from json import dumps, loads
00018
00019 import unittest
00020
00021
class TestAdvertise(unittest.TestCase):
    """Tests for the rosbridge ``Publish`` capability.

    NOTE(review): the class is called ``TestAdvertise`` although every test
    here exercises ``Publish``. The name is kept because the ``__main__``
    runner at the bottom of this file passes the class to rostest.
    """

    def setUp(self):
        """Initialise the ROS node used by the tests."""
        rospy.init_node("test_advertise")

    def test_missing_arguments(self):
        """A publish request without a ``topic`` must be rejected."""
        proto = Protocol("hello")
        pub = Publish(proto)

        # Neither "topic" nor "msg" present.
        msg = {"op": "publish"}
        self.assertRaises(MissingArgumentException, pub.publish, msg)

        # "msg" present, but "topic" still missing.
        msg = {"op": "publish", "msg": {}}
        self.assertRaises(MissingArgumentException, pub.publish, msg)

    def test_invalid_arguments(self):
        """A publish request whose ``topic`` is an integer must be rejected."""
        proto = Protocol("hello")
        pub = Publish(proto)

        msg = {"op": "publish", "topic": 3}
        self.assertRaises(InvalidArgumentException, pub.publish, msg)

    def test_publish_works(self):
        """A message published through ``Publish`` reaches a subscriber."""
        proto = Protocol("hello")
        pub = Publish(proto)
        topic = "/test_publish_works"
        msg = {"data": "test publish works"}

        # A dict is used as a mutable cell so the callback can hand the
        # received message back to the enclosing test.
        received = {"msg": None}

        def cb(msg):
            received["msg"] = msg

        rospy.Subscriber(topic, String, cb)

        # Round-trip through JSON so the capability sees the same kind of
        # payload it would get from a decoded client request.
        pub_msg = loads(dumps({"op": "publish", "topic": topic, "msg": msg}))
        pub.publish(pub_msg)

        # Poll for delivery instead of sleeping a fixed amount: the test
        # finishes as soon as the message arrives and tolerates a slow
        # machine for up to ``max_polls * poll_interval`` seconds.
        poll_interval = 0.1
        max_polls = 20
        for _ in range(max_polls):
            if received["msg"] is not None:
                break
            sleep(poll_interval)

        # Fail with a readable message rather than an AttributeError on None
        # when nothing was delivered.
        self.assertIsNotNone(
            received["msg"], "no message received on %s" % topic)
        self.assertEqual(received["msg"].data, msg["data"])
00061
00062
if __name__ == '__main__':
    # rostest is imported inside the guard, so it is only required when this
    # file is executed as a script.
    import rostest

    # Hand the test case to the rostest runner under the name 'test_publish'.
    rostest.rosrun(PKG, 'test_publish', TestAdvertise)