test_message_conversion.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 from json import loads, dumps
00007 
00008 from StringIO import StringIO
00009 
00010 from rosbridge_library.internal import message_conversion as c
00011 from rosbridge_library.internal import ros_loader
00012 from base64 import standard_b64encode, standard_b64decode
00013 
00014 
00015 class TestMessageConversion(unittest.TestCase):
00016 
00017     def setUp(self):
00018         rospy.init_node("test_message_conversion")
00019 
00020     def validate_instance(self, inst1):
00021         """ Serializes and deserializes the inst to typecheck and ensure that
00022         instances are correct """
00023         inst1._check_types()
00024         buff = StringIO()
00025         inst1.serialize(buff)
00026         inst2 = type(inst1)()
00027         inst2.deserialize(buff.getvalue())
00028         self.assertEqual(inst1, inst2)
00029         inst2._check_types()
00030 
00031     def msgs_equal(self, msg1, msg2):
00032         if type(msg1) in [str, unicode] and type(msg2) in [str, unicode]:
00033             pass
00034         else:
00035             self.assertEqual(type(msg1), type(msg2))
00036         if type(msg1) in c.list_types:
00037             for x, y in zip(msg1, msg2):
00038                 self.msgs_equal(x, y)
00039         elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00040             self.assertEqual(msg1, msg2)
00041         else:
00042             for x in msg1:
00043                 self.assertTrue(x in msg2)
00044             for x in msg2:
00045                 self.assertTrue(x in msg1)
00046             for x in msg1:
00047                 self.msgs_equal(msg1[x], msg2[x])
00048 
00049     def do_primitive_test(self, data_value, msgtype):
00050         for msg in [{"data": data_value}, loads(dumps({"data": data_value}))]:
00051             inst = ros_loader.get_message_instance(msgtype)
00052             c.populate_instance(msg, inst)
00053             self.assertEqual(inst.data, data_value)
00054             self.validate_instance(inst)
00055             extracted = c.extract_values(inst)
00056             for msg2 in [extracted, loads(dumps(extracted))]:
00057                 self.msgs_equal(msg, msg2)
00058                 self.assertEqual(msg["data"], msg2["data"])
00059                 self.assertEqual(msg2["data"], inst.data)
00060 
00061     def do_test(self, orig_msg, msgtype):
00062         for msg in [orig_msg, loads(dumps(orig_msg))]:
00063             inst = ros_loader.get_message_instance(msgtype)
00064             c.populate_instance(msg, inst)
00065             self.validate_instance(inst)
00066             extracted = c.extract_values(inst)
00067             for msg2 in [extracted, loads(dumps(extracted))]:
00068                 self.msgs_equal(msg, msg2)
00069 
00070     def test_int_primitives(self):
00071         # Test raw primitives
00072         for msg in range(-100, 100):
00073             for rostype in ["int8", "int16", "int32", "int64"]:
00074                 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00075                 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00076         # Test raw primitives
00077         for msg in range(0, 200):
00078             for rostype in ["uint8", "uint16", "uint32", "uint64"]:
00079                 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00080                 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00081 
00082     def test_bool_primitives(self):
00083         self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
00084         self.assertTrue(c._to_inst(True, "bool", "bool"))
00085         self.assertFalse(c._to_primitive_inst(False, "bool", "bool", []))
00086         self.assertFalse(c._to_inst(False, "bool", "bool"))
00087 
00088     def test_float_primitives(self):
00089         for msg in [0.12341234 + i for i in range(-100, 100)]:
00090             for rostype in ["float32", "float64"]:
00091                 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00092                 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00093                 c._to_inst(msg, rostype, rostype)
00094 
00095     def test_float_special_cases(self):
00096         for msg in [1e9999999, -1e9999999, float('nan')]:
00097             for rostype in ["float32", "float64"]:
00098                 self.assertEqual(c._from_inst(msg, rostype), None)
00099                 self.assertEqual(dumps({"data":c._from_inst(msg, rostype)}), "{\"data\": null}")
00100 
00101     def test_signed_int_base_msgs(self):
00102         int8s = range(-127, 128)
00103         for int8 in int8s:
00104             self.do_primitive_test(int8, "std_msgs/Byte")
00105             self.do_primitive_test(int8, "std_msgs/Int8")
00106             self.do_primitive_test(int8, "std_msgs/Int16")
00107             self.do_primitive_test(int8, "std_msgs/Int32")
00108             self.do_primitive_test(int8, "std_msgs/Int64")
00109 
00110         int16s = [-32767, 32767]
00111         for int16 in int16s:
00112             self.do_primitive_test(int16, "std_msgs/Int16")
00113             self.do_primitive_test(int16, "std_msgs/Int32")
00114             self.do_primitive_test(int16, "std_msgs/Int64")
00115             self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Byte")
00116             self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Int8")
00117 
00118         int32s = [-2147483647, 2147483647]
00119         for int32 in int32s:
00120             self.do_primitive_test(int32, "std_msgs/Int32")
00121             self.do_primitive_test(int32, "std_msgs/Int64")
00122             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Byte")
00123             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int8")
00124             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int16")
00125 
00126         int64s = [-9223372036854775807, 9223372036854775807]
00127         for int64 in int64s:
00128             self.do_primitive_test(int64, "std_msgs/Int64")
00129             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Byte")
00130             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int8")
00131             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int16")
00132             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int32")
00133 
00134     def test_unsigned_int_base_msgs(self):
00135         int8s = range(0, 256)
00136         for int8 in int8s:
00137             self.do_primitive_test(int8, "std_msgs/Char")
00138             self.do_primitive_test(int8, "std_msgs/UInt8")
00139             self.do_primitive_test(int8, "std_msgs/UInt16")
00140             self.do_primitive_test(int8, "std_msgs/UInt32")
00141             self.do_primitive_test(int8, "std_msgs/UInt64")
00142 
00143         int16s = [32767, 32768, 65535]
00144         for int16 in int16s:
00145             self.do_primitive_test(int16, "std_msgs/UInt16")
00146             self.do_primitive_test(int16, "std_msgs/UInt32")
00147             self.do_primitive_test(int16, "std_msgs/UInt64")
00148             self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Char")
00149             self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/UInt8")
00150 
00151         int32s = [2147483647, 2147483648, 4294967295]
00152         for int32 in int32s:
00153             self.do_primitive_test(int32, "std_msgs/UInt32")
00154             self.do_primitive_test(int32, "std_msgs/UInt64")
00155             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Char")
00156             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt8")
00157             self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt16")
00158 
00159         int64s = [4294967296, 9223372036854775807, 9223372036854775808,
00160                    18446744073709551615]
00161         for int64 in int64s:
00162             self.do_primitive_test(int64, "std_msgs/UInt64")
00163             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Char")
00164             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt8")
00165             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt16")
00166             self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt32")
00167 
00168     def test_bool_base_msg(self):
00169         self.do_primitive_test(True, "std_msgs/Bool")
00170         self.do_primitive_test(False, "std_msgs/Bool")
00171 
00172     def test_string_base_msg(self):
00173         for x in c.ros_primitive_types:
00174             self.do_primitive_test(x, "std_msgs/String")
00175 
00176     def test_time_msg(self):
00177         msg = {"data": {"secs": 3, "nsecs": 5}}
00178         self.do_test(msg, "std_msgs/Time")
00179 
00180         msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00181         self.do_test(msg, "rosbridge_library/TestTimeArray")
00182 
00183     def test_time_msg_now(self):
00184         msg = {"data": "now"}
00185         msgtype = "std_msgs/Time"
00186 
00187         inst = ros_loader.get_message_instance(msgtype)
00188         c.populate_instance(msg, inst)
00189         currenttime = rospy.get_rostime()
00190         self.validate_instance(inst)
00191         extracted = c.extract_values(inst)
00192         print extracted
00193         self.assertIn("data", extracted)
00194         self.assertIn("secs", extracted["data"])
00195         self.assertIn("nsecs", extracted["data"])
00196         self.assertNotEqual(extracted["data"]["secs"], 0)
00197         self.assertLessEqual(extracted["data"]["secs"], currenttime.secs)
00198         self.assertGreaterEqual(currenttime.secs, extracted["data"]["secs"])
00199 
00200     def test_duration_msg(self):
00201         msg = {"data": {"secs": 3, "nsecs": 5}}
00202         self.do_test(msg, "std_msgs/Duration")
00203 
00204         msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00205         self.do_test(msg, "rosbridge_library/TestDurationArray")
00206 
00207     def test_header_msg(self):
00208         msg = {"seq": 5, "stamp": {"secs": 12347, "nsecs": 322304}, "frame_id": "2394dnfnlcx;v[p234j]"}
00209         self.do_test(msg, "std_msgs/Header")
00210 
00211         msg = {"header": msg}
00212         self.do_test(msg, "rosbridge_library/TestHeader")
00213         self.do_test(msg, "rosbridge_library/TestHeaderTwo")
00214 
00215         msg = {"header": [msg["header"], msg["header"], msg["header"]]}
00216         msg["header"][1]["seq"] = 6
00217         msg["header"][2]["seq"] = 7
00218         self.do_test(msg, "rosbridge_library/TestHeaderArray")
00219 
00220     def test_assorted_msgs(self):
00221         assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00222         "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00223         "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00224         "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00225         "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00226         "sensor_msgs/PointCloud2"]
00227         for rostype in assortedmsgs:
00228             inst = ros_loader.get_message_instance(rostype)
00229             msg = c.extract_values(inst)
00230             self.do_test(msg, rostype)
00231             l = loads(dumps(msg))
00232             inst2 = ros_loader.get_message_instance(rostype)
00233             c.populate_instance(msg, inst2)
00234             self.assertEqual(inst, inst2)
00235 
00236     def test_int8array(self):
00237         def test_int8_msg(rostype, data):
00238             msg = {"data": data}
00239             inst = ros_loader.get_message_instance(rostype)
00240             c.populate_instance(msg, inst)
00241             self.validate_instance(inst)
00242             return inst.data
00243 
00244         for msgtype in ["TestChar", "TestUInt8"]:
00245             rostype = "rosbridge_library/" + msgtype
00246 
00247             int8s = range(0, 256)
00248             ret = test_int8_msg(rostype, int8s)
00249             self.assertEqual(ret, str(bytearray(int8s)))
00250 
00251             str_int8s = str(bytearray(int8s))
00252 
00253             b64str_int8s = standard_b64encode(str_int8s)
00254             ret = test_int8_msg(rostype, b64str_int8s)
00255             self.assertEqual(ret, str_int8s)
00256 
00257         for msgtype in ["TestUInt8FixedSizeArray16"]:
00258             rostype = "rosbridge_library/" + msgtype
00259 
00260             int8s = range(0, 16)
00261             ret = test_int8_msg(rostype, int8s)
00262             self.assertEqual(ret, str(bytearray(int8s)))
00263 
00264             str_int8s = str(bytearray(int8s))
00265 
00266             b64str_int8s = standard_b64encode(str_int8s)
00267             ret = test_int8_msg(rostype, b64str_int8s)
00268             self.assertEqual(ret, str_int8s)
00269 
00270 
00271 PKG = 'rosbridge_library'
00272 NAME = 'test_message_conversion'
00273 if __name__ == '__main__':
00274     rostest.unitrun(PKG, NAME, TestMessageConversion)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Mon Oct 6 2014 06:58:09