00001
00002 import sys
00003 import rospy
00004 import rostest
00005 import unittest
00006 from json import loads, dumps
00007
00008 from StringIO import StringIO
00009
00010 from rosbridge_library.internal import message_conversion as c
00011 from rosbridge_library.internal import ros_loader
00012 from base64 import standard_b64encode, standard_b64decode
00013
00014
00015 class TestMessageConversion(unittest.TestCase):
00016
00017 def setUp(self):
00018 rospy.init_node("test_message_conversion")
00019
00020 def validate_instance(self, inst1):
00021 """ Serializes and deserializes the inst to typecheck and ensure that
00022 instances are correct """
00023 inst1._check_types()
00024 buff = StringIO()
00025 inst1.serialize(buff)
00026 inst2 = type(inst1)()
00027 inst2.deserialize(buff.getvalue())
00028 self.assertEqual(inst1, inst2)
00029 inst2._check_types()
00030
00031 def msgs_equal(self, msg1, msg2):
00032 if type(msg1) in [str, unicode] and type(msg2) in [str, unicode]:
00033 pass
00034 else:
00035 self.assertEqual(type(msg1), type(msg2))
00036 if type(msg1) in c.list_types:
00037 for x, y in zip(msg1, msg2):
00038 self.msgs_equal(x, y)
00039 elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00040 self.assertEqual(msg1, msg2)
00041 else:
00042 for x in msg1:
00043 self.assertTrue(x in msg2)
00044 for x in msg2:
00045 self.assertTrue(x in msg1)
00046 for x in msg1:
00047 self.msgs_equal(msg1[x], msg2[x])
00048
00049 def do_primitive_test(self, data_value, msgtype):
00050 for msg in [{"data": data_value}, loads(dumps({"data": data_value}))]:
00051 inst = ros_loader.get_message_instance(msgtype)
00052 c.populate_instance(msg, inst)
00053 self.assertEqual(inst.data, data_value)
00054 self.validate_instance(inst)
00055 extracted = c.extract_values(inst)
00056 for msg2 in [extracted, loads(dumps(extracted))]:
00057 self.msgs_equal(msg, msg2)
00058 self.assertEqual(msg["data"], msg2["data"])
00059 self.assertEqual(msg2["data"], inst.data)
00060
00061 def do_test(self, orig_msg, msgtype):
00062 for msg in [orig_msg, loads(dumps(orig_msg))]:
00063 inst = ros_loader.get_message_instance(msgtype)
00064 c.populate_instance(msg, inst)
00065 self.validate_instance(inst)
00066 extracted = c.extract_values(inst)
00067 for msg2 in [extracted, loads(dumps(extracted))]:
00068 self.msgs_equal(msg, msg2)
00069
00070 def test_int_primitives(self):
00071
00072 for msg in range(-100, 100):
00073 for rostype in ["int8", "int16", "int32", "int64"]:
00074 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00075 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00076
00077 for msg in range(0, 200):
00078 for rostype in ["uint8", "uint16", "uint32", "uint64"]:
00079 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00080 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00081
00082 def test_bool_primitives(self):
00083 self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
00084 self.assertTrue(c._to_inst(True, "bool", "bool"))
00085 self.assertFalse(c._to_primitive_inst(False, "bool", "bool", []))
00086 self.assertFalse(c._to_inst(False, "bool", "bool"))
00087
00088 def test_float_primitives(self):
00089 for msg in [0.12341234 + i for i in range(-100, 100)]:
00090 for rostype in ["float32", "float64"]:
00091 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00092 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00093 c._to_inst(msg, rostype, rostype)
00094
00095 def test_float_special_cases(self):
00096 for msg in [1e9999999, -1e9999999, float('nan')]:
00097 for rostype in ["float32", "float64"]:
00098 self.assertEqual(c._from_inst(msg, rostype), None)
00099 self.assertEqual(dumps({"data":c._from_inst(msg, rostype)}), "{\"data\": null}")
00100
00101 def test_signed_int_base_msgs(self):
00102 int8s = range(-127, 128)
00103 for int8 in int8s:
00104 self.do_primitive_test(int8, "std_msgs/Byte")
00105 self.do_primitive_test(int8, "std_msgs/Int8")
00106 self.do_primitive_test(int8, "std_msgs/Int16")
00107 self.do_primitive_test(int8, "std_msgs/Int32")
00108 self.do_primitive_test(int8, "std_msgs/Int64")
00109
00110 int16s = [-32767, 32767]
00111 for int16 in int16s:
00112 self.do_primitive_test(int16, "std_msgs/Int16")
00113 self.do_primitive_test(int16, "std_msgs/Int32")
00114 self.do_primitive_test(int16, "std_msgs/Int64")
00115 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Byte")
00116 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Int8")
00117
00118 int32s = [-2147483647, 2147483647]
00119 for int32 in int32s:
00120 self.do_primitive_test(int32, "std_msgs/Int32")
00121 self.do_primitive_test(int32, "std_msgs/Int64")
00122 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Byte")
00123 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int8")
00124 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int16")
00125
00126 int64s = [-9223372036854775807, 9223372036854775807]
00127 for int64 in int64s:
00128 self.do_primitive_test(int64, "std_msgs/Int64")
00129 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Byte")
00130 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int8")
00131 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int16")
00132 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int32")
00133
00134 def test_unsigned_int_base_msgs(self):
00135 int8s = range(0, 256)
00136 for int8 in int8s:
00137 self.do_primitive_test(int8, "std_msgs/Char")
00138 self.do_primitive_test(int8, "std_msgs/UInt8")
00139 self.do_primitive_test(int8, "std_msgs/UInt16")
00140 self.do_primitive_test(int8, "std_msgs/UInt32")
00141 self.do_primitive_test(int8, "std_msgs/UInt64")
00142
00143 int16s = [32767, 32768, 65535]
00144 for int16 in int16s:
00145 self.do_primitive_test(int16, "std_msgs/UInt16")
00146 self.do_primitive_test(int16, "std_msgs/UInt32")
00147 self.do_primitive_test(int16, "std_msgs/UInt64")
00148 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Char")
00149 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/UInt8")
00150
00151 int32s = [2147483647, 2147483648, 4294967295]
00152 for int32 in int32s:
00153 self.do_primitive_test(int32, "std_msgs/UInt32")
00154 self.do_primitive_test(int32, "std_msgs/UInt64")
00155 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Char")
00156 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt8")
00157 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt16")
00158
00159 int64s = [4294967296, 9223372036854775807, 9223372036854775808,
00160 18446744073709551615]
00161 for int64 in int64s:
00162 self.do_primitive_test(int64, "std_msgs/UInt64")
00163 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Char")
00164 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt8")
00165 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt16")
00166 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt32")
00167
00168 def test_bool_base_msg(self):
00169 self.do_primitive_test(True, "std_msgs/Bool")
00170 self.do_primitive_test(False, "std_msgs/Bool")
00171
00172 def test_string_base_msg(self):
00173 for x in c.ros_primitive_types:
00174 self.do_primitive_test(x, "std_msgs/String")
00175
00176 def test_time_msg(self):
00177 msg = {"data": {"secs": 3, "nsecs": 5}}
00178 self.do_test(msg, "std_msgs/Time")
00179
00180 msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00181 self.do_test(msg, "rosbridge_library/TestTimeArray")
00182
00183 def test_time_msg_now(self):
00184 msg = {"data": "now"}
00185 msgtype = "std_msgs/Time"
00186
00187 inst = ros_loader.get_message_instance(msgtype)
00188 c.populate_instance(msg, inst)
00189 currenttime = rospy.get_rostime()
00190 self.validate_instance(inst)
00191 extracted = c.extract_values(inst)
00192 print extracted
00193 self.assertIn("data", extracted)
00194 self.assertIn("secs", extracted["data"])
00195 self.assertIn("nsecs", extracted["data"])
00196 self.assertNotEqual(extracted["data"]["secs"], 0)
00197 self.assertLessEqual(extracted["data"]["secs"], currenttime.secs)
00198 self.assertGreaterEqual(currenttime.secs, extracted["data"]["secs"])
00199
00200 def test_duration_msg(self):
00201 msg = {"data": {"secs": 3, "nsecs": 5}}
00202 self.do_test(msg, "std_msgs/Duration")
00203
00204 msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00205 self.do_test(msg, "rosbridge_library/TestDurationArray")
00206
00207 def test_header_msg(self):
00208 msg = {"seq": 5, "stamp": {"secs": 12347, "nsecs": 322304}, "frame_id": "2394dnfnlcx;v[p234j]"}
00209 self.do_test(msg, "std_msgs/Header")
00210
00211 msg = {"header": msg}
00212 self.do_test(msg, "rosbridge_library/TestHeader")
00213 self.do_test(msg, "rosbridge_library/TestHeaderTwo")
00214
00215 msg = {"header": [msg["header"], msg["header"], msg["header"]]}
00216 msg["header"][1]["seq"] = 6
00217 msg["header"][2]["seq"] = 7
00218 self.do_test(msg, "rosbridge_library/TestHeaderArray")
00219
00220 def test_assorted_msgs(self):
00221 assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00222 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00223 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00224 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00225 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00226 "sensor_msgs/PointCloud2"]
00227 for rostype in assortedmsgs:
00228 inst = ros_loader.get_message_instance(rostype)
00229 msg = c.extract_values(inst)
00230 self.do_test(msg, rostype)
00231 l = loads(dumps(msg))
00232 inst2 = ros_loader.get_message_instance(rostype)
00233 c.populate_instance(msg, inst2)
00234 self.assertEqual(inst, inst2)
00235
00236 def test_int8array(self):
00237 def test_int8_msg(rostype, data):
00238 msg = {"data": data}
00239 inst = ros_loader.get_message_instance(rostype)
00240 c.populate_instance(msg, inst)
00241 self.validate_instance(inst)
00242 return inst.data
00243
00244 for msgtype in ["TestChar", "TestUInt8"]:
00245 rostype = "rosbridge_library/" + msgtype
00246
00247 int8s = range(0, 256)
00248 ret = test_int8_msg(rostype, int8s)
00249 self.assertEqual(ret, str(bytearray(int8s)))
00250
00251 str_int8s = str(bytearray(int8s))
00252
00253 b64str_int8s = standard_b64encode(str_int8s)
00254 ret = test_int8_msg(rostype, b64str_int8s)
00255 self.assertEqual(ret, str_int8s)
00256
00257 for msgtype in ["TestUInt8FixedSizeArray16"]:
00258 rostype = "rosbridge_library/" + msgtype
00259
00260 int8s = range(0, 16)
00261 ret = test_int8_msg(rostype, int8s)
00262 self.assertEqual(ret, str(bytearray(int8s)))
00263
00264 str_int8s = str(bytearray(int8s))
00265
00266 b64str_int8s = standard_b64encode(str_int8s)
00267 ret = test_int8_msg(rostype, b64str_int8s)
00268 self.assertEqual(ret, str_int8s)
00269
00270
00271 PKG = 'rosbridge_library'
00272 NAME = 'test_message_conversion'
00273 if __name__ == '__main__':
00274 rostest.unitrun(PKG, NAME, TestMessageConversion)