test_message_conversion.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG)
00004 import rospy
00005 from json import loads, dumps
00006 
00007 from StringIO import StringIO
00008 
00009 from rosbridge_library.internal import message_conversion as c
00010 from rosbridge_library.internal import ros_loader
00011 from base64 import standard_b64encode, standard_b64decode
00012 
00013 import unittest
00014 
00015 
00016 class TestMessageConversion(unittest.TestCase):
00017 
00018     def setUp(self):
00019         rospy.init_node("test_message_conversion")
00020 
00021     def validate_instance(self, inst1):
00022         """ Serializes and deserializes the inst to typecheck and ensure that
00023         instances are correct """
00024         inst1._check_types()
00025         buff = StringIO()
00026         inst1.serialize(buff)
00027         inst2 = type(inst1)()
00028         inst2.deserialize(buff.getvalue())
00029         self.assertEqual(inst1, inst2)
00030         inst2._check_types()
00031 
00032     def msgs_equal(self, msg1, msg2):
00033         if type(msg1) in [str, unicode] and type(msg2) in [str, unicode]:
00034             pass
00035         else:
00036             self.assertEqual(type(msg1), type(msg2))
00037         if type(msg1) in c.list_types:
00038             for x, y in zip(msg1, msg2):
00039                 self.msgs_equal(x, y)
00040         elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00041             self.assertEqual(msg1, msg2)
00042         else:
00043             for x in msg1:
00044                 self.assertTrue(x in msg2)
00045             for x in msg2:
00046                 self.assertTrue(x in msg1)
00047             for x in msg1:
00048                 self.msgs_equal(msg1[x], msg2[x])
00049 
00050     def do_primitive_test(self, data_value, msgtype):
00051         for msg in [{"data": data_value}, loads(dumps({"data": data_value}))]:
00052             inst = ros_loader.get_message_instance(msgtype)
00053             c.populate_instance(msg, inst)
00054             self.assertEqual(inst.data, data_value)
00055             self.validate_instance(inst)
00056             extracted = c.extract_values(inst)
00057             for msg2 in [extracted, loads(dumps(extracted))]:
00058                 self.msgs_equal(msg, msg2)
00059                 self.assertEqual(msg["data"], msg2["data"])
00060                 self.assertEqual(msg2["data"], inst.data)
00061 
00062     def do_test(self, orig_msg, msgtype):
00063         for msg in [orig_msg, loads(dumps(orig_msg))]:
00064             inst = ros_loader.get_message_instance(msgtype)
00065             c.populate_instance(msg, inst)
00066             self.validate_instance(inst)
00067             extracted = c.extract_values(inst)
00068             for msg2 in [extracted, loads(dumps(extracted))]:
00069                 self.msgs_equal(msg, msg2)
00070 
00071     def test_int_primitives(self):
00072         # Test raw primitives
00073         for msg in range(-100, 100):
00074             for rostype in ["int8", "int16", "int32", "int64"]:
00075                 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00076                 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00077         # Test raw primitives
00078         for msg in range(0, 200):
00079             for rostype in ["uint8", "uint16", "uint32", "uint64"]:
00080                 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00081                 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00082 
00083     def test_bool_primitives(self):
00084         self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
00085         self.assertTrue(c._to_inst(True, "bool", "bool"))
00086         self.assertFalse(c._to_primitive_inst(False, "bool", "bool", []))
00087         self.assertFalse(c._to_inst(False, "bool", "bool"))
00088 
00089     def test_float_primitives(self):
00090         for msg in [0.12341234 + i for i in range(-100, 100)]:
00091             for rostype in ["float32", "float64"]:
00092                 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00093                 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00094 
00095     def test_signed_int_base_msgs(self):
00096         int8s = range(-127, 128)
00097         for int8 in int8s:
00098             self.do_primitive_test(int8, "std_msgs/Byte")
00099             self.do_primitive_test(int8, "std_msgs/Int8")
00100             self.do_primitive_test(int8, "std_msgs/Int16")
00101             self.do_primitive_test(int8, "std_msgs/Int32")
00102             self.do_primitive_test(int8, "std_msgs/Int64")
00103 
00104         int16s = [-32767, 32767]
00105         for int16 in int16s:
00106             self.do_primitive_test(int16, "std_msgs/Int16")
00107             self.do_primitive_test(int16, "std_msgs/Int32")
00108             self.do_primitive_test(int16, "std_msgs/Int64")
00109             self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Byte")
00110             self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Int8")
00111 
00112         int32s = [-2147483647, 2147483647]
00113         for int32 in int32s:
00114             self.do_primitive_test(int32, "std_msgs/Int32")
00115             self.do_primitive_test(int32, "std_msgs/Int64")
00116             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Byte")
00117             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int8")
00118             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int16")
00119 
00120         int64s = [-9223372036854775807, 9223372036854775807]
00121         for int64 in int64s:
00122             self.do_primitive_test(int64, "std_msgs/Int64")
00123             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Byte")
00124             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int8")
00125             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int16")
00126             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int32")
00127 
00128     def test_unsigned_int_base_msgs(self):
00129         int8s = range(0, 256)
00130         for int8 in int8s:
00131             self.do_primitive_test(int8, "std_msgs/Char")
00132             self.do_primitive_test(int8, "std_msgs/UInt8")
00133             self.do_primitive_test(int8, "std_msgs/UInt16")
00134             self.do_primitive_test(int8, "std_msgs/UInt32")
00135             self.do_primitive_test(int8, "std_msgs/UInt64")
00136 
00137         int16s = [32767, 32768, 65535]
00138         for int16 in int16s:
00139             self.do_primitive_test(int16, "std_msgs/UInt16")
00140             self.do_primitive_test(int16, "std_msgs/UInt32")
00141             self.do_primitive_test(int16, "std_msgs/UInt64")
00142             self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Char")
00143             self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/UInt8")
00144 
00145         int32s = [2147483647, 2147483648, 4294967295]
00146         for int32 in int32s:
00147             self.do_primitive_test(int32, "std_msgs/UInt32")
00148             self.do_primitive_test(int32, "std_msgs/UInt64")
00149             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Char")
00150             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt8")
00151             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt16")
00152 
00153         int64s = [4294967296, 9223372036854775807, 9223372036854775808,
00154                    18446744073709551615]
00155         for int64 in int64s:
00156             self.do_primitive_test(int64, "std_msgs/UInt64")
00157             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Char")
00158             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt8")
00159             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt16")
00160             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt32")
00161 
00162     def test_bool_base_msg(self):
00163         self.do_primitive_test(True, "std_msgs/Bool")
00164         self.do_primitive_test(False, "std_msgs/Bool")
00165 
00166     def test_string_base_msg(self):
00167         for x in c.ros_primitive_types:
00168             self.do_primitive_test(x, "std_msgs/String")
00169 
00170     def test_time_msg(self):
00171         msg = {"data": {"secs": 3, "nsecs": 5}}
00172         self.do_test(msg, "std_msgs/Time")
00173 
00174         msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00175         self.do_test(msg, "rosbridge_test/TimeArrayTest")
00176         
00177     def test_time_msg_now(self):
00178         msg = {"data": "now"}
00179         msgtype = "std_msgs/Time"
00180         
00181         inst = ros_loader.get_message_instance(msgtype)
00182         c.populate_instance(msg, inst)
00183         currenttime = rospy.get_rostime()
00184         self.validate_instance(inst)
00185         extracted = c.extract_values(inst)
00186         print extracted
00187         self.assertIn("data", extracted)
00188         self.assertIn("secs", extracted["data"])
00189         self.assertIn("nsecs", extracted["data"])
00190         self.assertNotEqual(extracted["data"]["secs"], 0)
00191         self.assertLessEqual(extracted["data"]["secs"], currenttime.secs)
00192         self.assertGreaterEqual(currenttime.secs, extracted["data"]["secs"])
00193 
00194     def test_duration_msg(self):
00195         msg = {"data": {"secs": 3, "nsecs": 5}}
00196         self.do_test(msg, "std_msgs/Duration")
00197 
00198         msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00199         self.do_test(msg, "rosbridge_test/DurationArrayTest")
00200 
00201     def test_header_msg(self):
00202         msg = {"seq": 5, "stamp": {"secs": 12347, "nsecs": 322304}, "frame_id": "2394dnfnlcx;v[p234j]"}
00203         self.do_test(msg, "std_msgs/Header")
00204 
00205         msg = {"header": msg}
00206         self.do_test(msg, "rosbridge_test/HeaderTest")
00207         self.do_test(msg, "rosbridge_test/HeaderTestTwo")
00208 
00209         msg = {"header": [msg["header"], msg["header"], msg["header"]]}
00210         msg["header"][1]["seq"] = 6
00211         msg["header"][2]["seq"] = 7
00212         self.do_test(msg, "rosbridge_test/HeaderArrayTest")
00213 
00214     def test_assorted_msgs(self):
00215         assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00216         "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00217         "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00218         "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00219         "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00220         "sensor_msgs/PointCloud2"]
00221         for rostype in assortedmsgs:
00222             inst = ros_loader.get_message_instance(rostype)
00223             msg = c.extract_values(inst)
00224             self.do_test(msg, rostype)
00225             l = loads(dumps(msg))
00226             inst2 = ros_loader.get_message_instance(rostype)
00227             c.populate_instance(msg, inst2)
00228             self.assertEqual(inst, inst2)
00229 
00230     def test_int8array(self):
00231         def test_int8_msg(rostype, data):
00232             msg = {"data": data}
00233             inst = ros_loader.get_message_instance(rostype)
00234             c.populate_instance(msg, inst)
00235             self.validate_instance(inst)
00236             return inst.data
00237 
00238         for msgtype in ["CharTest", "UInt8Test"]:
00239             rostype = "rosbridge_test/" + msgtype
00240 
00241             int8s = range(0, 256)
00242             ret = test_int8_msg(rostype, int8s)
00243             self.assertEqual(ret, str(bytearray(int8s)))
00244 
00245             str_int8s = str(bytearray(int8s))
00246 
00247             b64str_int8s = standard_b64encode(str_int8s)
00248             ret = test_int8_msg(rostype, b64str_int8s)
00249             self.assertEqual(ret, str_int8s)
00250 
00251 
00252 if __name__ == '__main__':
00253     import rostest
00254     rostest.rosrun(PKG, 'test_publish', TestMessageConversion)


rosbridge_test
Author(s): Jonathan Mace
autogenerated on Thu Jan 2 2014 11:54:04