test_message_conversion.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 from __future__ import print_function
00003 import sys
00004 import rospy
00005 import rostest
00006 import unittest
00007 from json import loads, dumps
00008 
00009 try:
00010     from cStringIO import StringIO  # Python 2.x
00011 except ImportError:
00012     from io import BytesIO as StringIO  # Python 3.x
00013 
00014 from rosbridge_library.internal import message_conversion as c
00015 from rosbridge_library.internal import ros_loader
00016 from base64 import standard_b64encode, standard_b64decode
00017 
# Types treated as "text": Python 2 has both ``str`` and ``unicode``, while
# Python 3 only has ``str``. The conditional expression is lazy, so the name
# ``unicode`` is never looked up on Python 3.
string_types = (str,) if sys.version_info[0] >= 3 else (str, unicode)
00022 
00023 
class TestMessageConversion(unittest.TestCase):
    """Round-trip tests for ``rosbridge_library.internal.message_conversion``.

    Most tests populate a ROS message instance from a plain dict
    (``populate_instance``), serialise and deserialise that instance, extract
    it back into a dict (``extract_values``) and compare the result with the
    input, both directly and after a JSON round trip.
    """

    def setUp(self):
        """Initialise the ROS node before each test."""
        rospy.init_node("test_message_conversion")

    def validate_instance(self, inst1):
        """Type-check ``inst1`` and verify that it survives serialisation.

        The instance is serialised into an in-memory buffer, deserialised into
        a fresh instance of the same class, and the two are asserted equal.
        """
        inst1._check_types()
        buff = StringIO()
        inst1.serialize(buff)
        inst2 = type(inst1)()
        inst2.deserialize(buff.getvalue())
        self.assertEqual(inst1, inst2)
        inst2._check_types()

    def msgs_equal(self, msg1, msg2):
        """Recursively assert that two plain-Python message values are equal.

        Two values that are both string types are considered type-compatible;
        any other pair must have identical types. Lists are compared
        element-wise (including their length), primitives and strings by
        value, and everything else is treated as a mapping whose key sets and
        values must match.
        """
        if not (type(msg1) in string_types and type(msg2) in string_types):
            self.assertEqual(type(msg1), type(msg2))
        if type(msg1) in c.list_types:
            # zip() stops at the shorter input, so without this check a
            # truncated list would silently compare as equal.
            self.assertEqual(len(msg1), len(msg2))
            for x, y in zip(msg1, msg2):
                self.msgs_equal(x, y)
        elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
            self.assertEqual(msg1, msg2)
        else:
            for key in msg1:
                self.assertIn(key, msg2)
            for key in msg2:
                self.assertIn(key, msg1)
            for key in msg1:
                self.msgs_equal(msg1[key], msg2[key])

    def do_primitive_test(self, data_value, msgtype):
        """Round-trip ``data_value`` through the single-field message ``msgtype``.

        The value is wrapped as ``{"data": data_value}`` and tested both as-is
        and after a JSON round trip. The populated instance, the extracted
        dict and the JSON round trip of that dict must all agree with the
        input.
        """
        for msg in [{"data": data_value}, loads(dumps({"data": data_value}))]:
            inst = ros_loader.get_message_instance(msgtype)
            c.populate_instance(msg, inst)
            self.assertEqual(inst.data, data_value)
            self.validate_instance(inst)
            extracted = c.extract_values(inst)
            for msg2 in [extracted, loads(dumps(extracted))]:
                self.msgs_equal(msg, msg2)
                self.assertEqual(msg["data"], msg2["data"])
                self.assertEqual(msg2["data"], inst.data)

    def do_test(self, orig_msg, msgtype):
        """Round-trip the dict ``orig_msg`` through a ``msgtype`` instance.

        Both ``orig_msg`` and its JSON round trip are populated into a fresh
        instance; the extracted result (and its JSON round trip) must equal
        the message that was populated.
        """
        for msg in [orig_msg, loads(dumps(orig_msg))]:
            inst = ros_loader.get_message_instance(msgtype)
            c.populate_instance(msg, inst)
            self.validate_instance(inst)
            extracted = c.extract_values(inst)
            for msg2 in [extracted, loads(dumps(extracted))]:
                self.msgs_equal(msg, msg2)

    def _check_int_range(self, values, accepted, rejected=()):
        """Check each value against message types that must accept or reject it.

        For every value, ``do_primitive_test`` must succeed for each type in
        ``accepted`` and must raise for each type in ``rejected``.
        """
        for value in values:
            for msgtype in accepted:
                self.do_primitive_test(value, msgtype)
            for msgtype in rejected:
                self.assertRaises(Exception, self.do_primitive_test, value, msgtype)

    def test_int_primitives(self):
        """Raw integers pass through the primitive converters unchanged."""
        # Signed types, exercised with negative and positive values.
        for msg in range(-100, 100):
            for rostype in ["int8", "int16", "int32", "int64"]:
                self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
                self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
        # Unsigned types, exercised with non-negative values only.
        for msg in range(0, 200):
            for rostype in ["uint8", "uint16", "uint32", "uint64"]:
                self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
                self.assertEqual(c._to_inst(msg, rostype, rostype), msg)

    def test_bool_primitives(self):
        """Raw booleans keep their truth value through the converters."""
        self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
        self.assertTrue(c._to_inst(True, "bool", "bool"))
        self.assertFalse(c._to_primitive_inst(False, "bool", "bool", []))
        self.assertFalse(c._to_inst(False, "bool", "bool"))

    def test_float_primitives(self):
        """Raw floats pass through the primitive converters unchanged."""
        for msg in [0.12341234 + i for i in range(-100, 100)]:
            for rostype in ["float32", "float64"]:
                self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
                self.assertEqual(c._to_inst(msg, rostype, rostype), msg)

    def test_float_special_cases(self):
        """Infinities and NaN are extracted as ``None`` (JSON ``null``)."""
        for msg in [float("inf"), float("-inf"), float("nan")]:
            for rostype in ["float32", "float64"]:
                self.assertEqual(c._from_inst(msg, rostype), None)
                self.assertEqual(dumps({"data": c._from_inst(msg, rostype)}), "{\"data\": null}")

    def test_signed_int_base_msgs(self):
        """Signed std_msgs integers accept in-range and reject out-of-range values."""
        self._check_int_range(
            range(-127, 128),
            ["std_msgs/Byte", "std_msgs/Int8", "std_msgs/Int16",
             "std_msgs/Int32", "std_msgs/Int64"])
        self._check_int_range(
            [-32767, 32767],
            ["std_msgs/Int16", "std_msgs/Int32", "std_msgs/Int64"],
            ["std_msgs/Byte", "std_msgs/Int8"])
        self._check_int_range(
            [-2147483647, 2147483647],
            ["std_msgs/Int32", "std_msgs/Int64"],
            ["std_msgs/Byte", "std_msgs/Int8", "std_msgs/Int16"])
        self._check_int_range(
            [-9223372036854775807, 9223372036854775807],
            ["std_msgs/Int64"],
            ["std_msgs/Byte", "std_msgs/Int8", "std_msgs/Int16", "std_msgs/Int32"])

    def test_unsigned_int_base_msgs(self):
        """Unsigned std_msgs integers accept in-range and reject out-of-range values."""
        self._check_int_range(
            range(0, 256),
            ["std_msgs/Char", "std_msgs/UInt8", "std_msgs/UInt16",
             "std_msgs/UInt32", "std_msgs/UInt64"])
        self._check_int_range(
            [32767, 32768, 65535],
            ["std_msgs/UInt16", "std_msgs/UInt32", "std_msgs/UInt64"],
            ["std_msgs/Char", "std_msgs/UInt8"])
        self._check_int_range(
            [2147483647, 2147483648, 4294967295],
            ["std_msgs/UInt32", "std_msgs/UInt64"],
            ["std_msgs/Char", "std_msgs/UInt8", "std_msgs/UInt16"])
        self._check_int_range(
            [4294967296, 9223372036854775807, 9223372036854775808,
             18446744073709551615],
            ["std_msgs/UInt64"],
            ["std_msgs/Char", "std_msgs/UInt8", "std_msgs/UInt16", "std_msgs/UInt32"])

    def test_bool_base_msg(self):
        """Both boolean values round-trip through std_msgs/Bool."""
        self.do_primitive_test(True, "std_msgs/Bool")
        self.do_primitive_test(False, "std_msgs/Bool")

    def test_string_base_msg(self):
        """Strings round-trip through std_msgs/String.

        The entries of ``c.ros_primitive_types`` are used as the sample
        string values.
        """
        for x in c.ros_primitive_types:
            self.do_primitive_test(x, "std_msgs/String")

    def test_time_msg(self):
        """Explicit times round-trip, both alone and inside an array."""
        msg = {"data": {"secs": 3, "nsecs": 5}}
        self.do_test(msg, "std_msgs/Time")

        msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
        self.do_test(msg, "rosbridge_library/TestTimeArray")

    def test_time_msg_now(self):
        """The special value ``"now"`` is populated with the current time."""
        msg = {"data": "now"}
        msgtype = "std_msgs/Time"

        inst = ros_loader.get_message_instance(msgtype)
        # Sample the clock on both sides of the conversion so the populated
        # time can be bracketed. NOTE(review): this assumes "now" is resolved
        # from the ROS clock during populate_instance; confirm against
        # message_conversion.
        before = rospy.get_rostime()
        c.populate_instance(msg, inst)
        after = rospy.get_rostime()
        self.validate_instance(inst)
        extracted = c.extract_values(inst)
        self.assertIn("data", extracted)
        self.assertIn("secs", extracted["data"])
        self.assertIn("nsecs", extracted["data"])
        secs = extracted["data"]["secs"]
        self.assertNotEqual(secs, 0)
        self.assertGreaterEqual(secs, before.secs)
        self.assertLessEqual(secs, after.secs)

    def test_duration_msg(self):
        """Durations round-trip, both alone and inside an array."""
        msg = {"data": {"secs": 3, "nsecs": 5}}
        self.do_test(msg, "std_msgs/Duration")

        msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
        self.do_test(msg, "rosbridge_library/TestDurationArray")

    def test_header_msg(self):
        """Headers round-trip alone, nested in a message, and in an array."""
        header = {"seq": 5, "stamp": {"secs": 12347, "nsecs": 322304}, "frame_id": "2394dnfnlcx;v[p234j]"}
        self.do_test(header, "std_msgs/Header")

        wrapped = {"header": header}
        self.do_test(wrapped, "rosbridge_library/TestHeader")
        self.do_test(wrapped, "rosbridge_library/TestHeaderTwo")

        # Each array element must be its own dict: reusing one dict three
        # times would make every element share the last ``seq`` written.
        headers = [dict(header, seq=seq) for seq in (5, 6, 7)]
        self.do_test({"header": headers}, "rosbridge_library/TestHeaderArray")

    def test_assorted_msgs(self):
        """Default-constructed instances of assorted types round-trip unchanged."""
        assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
        "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
        "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
        "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
        "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
        "sensor_msgs/PointCloud2"]
        for rostype in assortedmsgs:
            inst = ros_loader.get_message_instance(rostype)
            msg = c.extract_values(inst)
            # do_test also covers the JSON round trip of ``msg``.
            self.do_test(msg, rostype)
            inst2 = ros_loader.get_message_instance(rostype)
            c.populate_instance(msg, inst2)
            self.assertEqual(inst, inst2)

    def test_int8array(self):
        """Byte arrays can be populated from an int list or a base64 string."""
        def populated_data(rostype, data):
            """Populate ``rostype`` with ``data`` and return the stored field."""
            msg = {"data": data}
            inst = ros_loader.get_message_instance(rostype)
            c.populate_instance(msg, inst)
            self.validate_instance(inst)
            return inst.data

        # (message name, number of bytes to store); the last one is a
        # fixed-size array of 16 elements.
        cases = [("TestChar", 256), ("TestUInt8", 256),
                 ("TestUInt8FixedSizeArray16", 16)]
        for msgtype, count in cases:
            rostype = "rosbridge_library/" + msgtype

            int8s = list(range(0, count))
            str_int8s = bytes(bytearray(int8s))
            ret = populated_data(rostype, int8s)
            self.assertEqual(ret, str_int8s)

            b64str_int8s = standard_b64encode(str_int8s).decode('ascii')
            ret = populated_data(rostype, b64str_int8s)
            self.assertEqual(ret, str_int8s)
00278 
00279 
# Package name and test name handed to the rostest runner below.
PKG = 'rosbridge_library'
NAME = 'test_message_conversion'

if __name__ == '__main__':
    # Only run the suite when this file is executed as a script.
    rostest.unitrun(PKG, NAME, TestMessageConversion)


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:43