00001
00002 from __future__ import print_function
00003 import sys
00004 import rospy
00005 import rostest
00006 import unittest
00007 from json import loads, dumps
00008
00009 try:
00010 from cStringIO import StringIO
00011 except ImportError:
00012 from io import BytesIO as StringIO
00013
00014 from rosbridge_library.internal import message_conversion as c
00015 from rosbridge_library.internal import ros_loader
00016 from base64 import standard_b64encode, standard_b64decode
00017
00018 if sys.version_info >= (3, 0):
00019 string_types = (str,)
00020 else:
00021 string_types = (str, unicode)
00022
00023
00024 class TestMessageConversion(unittest.TestCase):
00025
00026 def setUp(self):
00027 rospy.init_node("test_message_conversion")
00028
00029 def validate_instance(self, inst1):
00030 """ Serializes and deserializes the inst to typecheck and ensure that
00031 instances are correct """
00032 inst1._check_types()
00033 buff = StringIO()
00034 inst1.serialize(buff)
00035 inst2 = type(inst1)()
00036 inst2.deserialize(buff.getvalue())
00037 self.assertEqual(inst1, inst2)
00038 inst2._check_types()
00039
00040 def msgs_equal(self, msg1, msg2):
00041 if type(msg1) in string_types and type(msg2) in string_types:
00042 pass
00043 else:
00044 self.assertEqual(type(msg1), type(msg2))
00045 if type(msg1) in c.list_types:
00046 for x, y in zip(msg1, msg2):
00047 self.msgs_equal(x, y)
00048 elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00049 self.assertEqual(msg1, msg2)
00050 else:
00051 for x in msg1:
00052 self.assertTrue(x in msg2)
00053 for x in msg2:
00054 self.assertTrue(x in msg1)
00055 for x in msg1:
00056 self.msgs_equal(msg1[x], msg2[x])
00057
00058 def do_primitive_test(self, data_value, msgtype):
00059 for msg in [{"data": data_value}, loads(dumps({"data": data_value}))]:
00060 inst = ros_loader.get_message_instance(msgtype)
00061 c.populate_instance(msg, inst)
00062 self.assertEqual(inst.data, data_value)
00063 self.validate_instance(inst)
00064 extracted = c.extract_values(inst)
00065 for msg2 in [extracted, loads(dumps(extracted))]:
00066 self.msgs_equal(msg, msg2)
00067 self.assertEqual(msg["data"], msg2["data"])
00068 self.assertEqual(msg2["data"], inst.data)
00069
00070 def do_test(self, orig_msg, msgtype):
00071 for msg in [orig_msg, loads(dumps(orig_msg))]:
00072 inst = ros_loader.get_message_instance(msgtype)
00073 c.populate_instance(msg, inst)
00074 self.validate_instance(inst)
00075 extracted = c.extract_values(inst)
00076 for msg2 in [extracted, loads(dumps(extracted))]:
00077 self.msgs_equal(msg, msg2)
00078
00079 def test_int_primitives(self):
00080
00081 for msg in range(-100, 100):
00082 for rostype in ["int8", "int16", "int32", "int64"]:
00083 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00084 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00085
00086 for msg in range(0, 200):
00087 for rostype in ["uint8", "uint16", "uint32", "uint64"]:
00088 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00089 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00090
00091 def test_bool_primitives(self):
00092 self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
00093 self.assertTrue(c._to_inst(True, "bool", "bool"))
00094 self.assertFalse(c._to_primitive_inst(False, "bool", "bool", []))
00095 self.assertFalse(c._to_inst(False, "bool", "bool"))
00096
00097 def test_float_primitives(self):
00098 for msg in [0.12341234 + i for i in range(-100, 100)]:
00099 for rostype in ["float32", "float64"]:
00100 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00101 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00102 c._to_inst(msg, rostype, rostype)
00103
00104 def test_float_special_cases(self):
00105 for msg in [1e9999999, -1e9999999, float('nan')]:
00106 for rostype in ["float32", "float64"]:
00107 self.assertEqual(c._from_inst(msg, rostype), None)
00108 self.assertEqual(dumps({"data":c._from_inst(msg, rostype)}), "{\"data\": null}")
00109
00110 def test_signed_int_base_msgs(self):
00111 int8s = range(-127, 128)
00112 for int8 in int8s:
00113 self.do_primitive_test(int8, "std_msgs/Byte")
00114 self.do_primitive_test(int8, "std_msgs/Int8")
00115 self.do_primitive_test(int8, "std_msgs/Int16")
00116 self.do_primitive_test(int8, "std_msgs/Int32")
00117 self.do_primitive_test(int8, "std_msgs/Int64")
00118
00119 int16s = [-32767, 32767]
00120 for int16 in int16s:
00121 self.do_primitive_test(int16, "std_msgs/Int16")
00122 self.do_primitive_test(int16, "std_msgs/Int32")
00123 self.do_primitive_test(int16, "std_msgs/Int64")
00124 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Byte")
00125 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Int8")
00126
00127 int32s = [-2147483647, 2147483647]
00128 for int32 in int32s:
00129 self.do_primitive_test(int32, "std_msgs/Int32")
00130 self.do_primitive_test(int32, "std_msgs/Int64")
00131 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Byte")
00132 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int8")
00133 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int16")
00134
00135 int64s = [-9223372036854775807, 9223372036854775807]
00136 for int64 in int64s:
00137 self.do_primitive_test(int64, "std_msgs/Int64")
00138 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Byte")
00139 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int8")
00140 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int16")
00141 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int32")
00142
00143 def test_unsigned_int_base_msgs(self):
00144 int8s = range(0, 256)
00145 for int8 in int8s:
00146 self.do_primitive_test(int8, "std_msgs/Char")
00147 self.do_primitive_test(int8, "std_msgs/UInt8")
00148 self.do_primitive_test(int8, "std_msgs/UInt16")
00149 self.do_primitive_test(int8, "std_msgs/UInt32")
00150 self.do_primitive_test(int8, "std_msgs/UInt64")
00151
00152 int16s = [32767, 32768, 65535]
00153 for int16 in int16s:
00154 self.do_primitive_test(int16, "std_msgs/UInt16")
00155 self.do_primitive_test(int16, "std_msgs/UInt32")
00156 self.do_primitive_test(int16, "std_msgs/UInt64")
00157 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Char")
00158 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/UInt8")
00159
00160 int32s = [2147483647, 2147483648, 4294967295]
00161 for int32 in int32s:
00162 self.do_primitive_test(int32, "std_msgs/UInt32")
00163 self.do_primitive_test(int32, "std_msgs/UInt64")
00164 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Char")
00165 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt8")
00166 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt16")
00167
00168 int64s = [4294967296, 9223372036854775807, 9223372036854775808,
00169 18446744073709551615]
00170 for int64 in int64s:
00171 self.do_primitive_test(int64, "std_msgs/UInt64")
00172 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Char")
00173 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt8")
00174 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt16")
00175 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt32")
00176
00177 def test_bool_base_msg(self):
00178 self.do_primitive_test(True, "std_msgs/Bool")
00179 self.do_primitive_test(False, "std_msgs/Bool")
00180
00181 def test_string_base_msg(self):
00182 for x in c.ros_primitive_types:
00183 self.do_primitive_test(x, "std_msgs/String")
00184
00185 def test_time_msg(self):
00186 msg = {"data": {"secs": 3, "nsecs": 5}}
00187 self.do_test(msg, "std_msgs/Time")
00188
00189 msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00190 self.do_test(msg, "rosbridge_library/TestTimeArray")
00191
00192 def test_time_msg_now(self):
00193 msg = {"data": "now"}
00194 msgtype = "std_msgs/Time"
00195
00196 inst = ros_loader.get_message_instance(msgtype)
00197 c.populate_instance(msg, inst)
00198 currenttime = rospy.get_rostime()
00199 self.validate_instance(inst)
00200 extracted = c.extract_values(inst)
00201 print(extracted)
00202 self.assertIn("data", extracted)
00203 self.assertIn("secs", extracted["data"])
00204 self.assertIn("nsecs", extracted["data"])
00205 self.assertNotEqual(extracted["data"]["secs"], 0)
00206 self.assertLessEqual(extracted["data"]["secs"], currenttime.secs)
00207 self.assertGreaterEqual(currenttime.secs, extracted["data"]["secs"])
00208
00209 def test_duration_msg(self):
00210 msg = {"data": {"secs": 3, "nsecs": 5}}
00211 self.do_test(msg, "std_msgs/Duration")
00212
00213 msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00214 self.do_test(msg, "rosbridge_library/TestDurationArray")
00215
00216 def test_header_msg(self):
00217 msg = {"seq": 5, "stamp": {"secs": 12347, "nsecs": 322304}, "frame_id": "2394dnfnlcx;v[p234j]"}
00218 self.do_test(msg, "std_msgs/Header")
00219
00220 msg = {"header": msg}
00221 self.do_test(msg, "rosbridge_library/TestHeader")
00222 self.do_test(msg, "rosbridge_library/TestHeaderTwo")
00223
00224 msg = {"header": [msg["header"], msg["header"], msg["header"]]}
00225 msg["header"][1]["seq"] = 6
00226 msg["header"][2]["seq"] = 7
00227 self.do_test(msg, "rosbridge_library/TestHeaderArray")
00228
00229 def test_assorted_msgs(self):
00230 assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00231 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00232 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00233 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00234 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00235 "sensor_msgs/PointCloud2"]
00236 for rostype in assortedmsgs:
00237 inst = ros_loader.get_message_instance(rostype)
00238 msg = c.extract_values(inst)
00239 self.do_test(msg, rostype)
00240 l = loads(dumps(msg))
00241 inst2 = ros_loader.get_message_instance(rostype)
00242 c.populate_instance(msg, inst2)
00243 self.assertEqual(inst, inst2)
00244
00245 def test_int8array(self):
00246 def test_int8_msg(rostype, data):
00247 msg = {"data": data}
00248 inst = ros_loader.get_message_instance(rostype)
00249 c.populate_instance(msg, inst)
00250 self.validate_instance(inst)
00251 return inst.data
00252
00253 for msgtype in ["TestChar", "TestUInt8"]:
00254 rostype = "rosbridge_library/" + msgtype
00255
00256 int8s = list(range(0, 256))
00257 ret = test_int8_msg(rostype, int8s)
00258 self.assertEqual(ret, bytes(bytearray(int8s)))
00259
00260 str_int8s = bytes(bytearray(int8s))
00261
00262 b64str_int8s = standard_b64encode(str_int8s).decode('ascii')
00263 ret = test_int8_msg(rostype, b64str_int8s)
00264 self.assertEqual(ret, str_int8s)
00265
00266 for msgtype in ["TestUInt8FixedSizeArray16"]:
00267 rostype = "rosbridge_library/" + msgtype
00268
00269 int8s = list(range(0, 16))
00270 ret = test_int8_msg(rostype, int8s)
00271 self.assertEqual(ret, bytes(bytearray(int8s)))
00272
00273 str_int8s = bytes(bytearray(int8s))
00274
00275 b64str_int8s = standard_b64encode(str_int8s).decode('ascii')
00276 ret = test_int8_msg(rostype, b64str_int8s)
00277 self.assertEqual(ret, str_int8s)
00278
00279
00280 PKG = 'rosbridge_library'
00281 NAME = 'test_message_conversion'
00282 if __name__ == '__main__':
00283 rostest.unitrun(PKG, NAME, TestMessageConversion)