00001
00002 PKG = 'rosbridge_library'
00003 import roslib; roslib.load_manifest(PKG)
00004 import rospy
00005 from json import loads, dumps
00006
00007 from StringIO import StringIO
00008
00009 from rosbridge_library.internal import message_conversion as c
00010 from rosbridge_library.internal import ros_loader
00011 from base64 import standard_b64encode, standard_b64decode
00012
00013 import unittest
00014
00015
00016 class TestMessageConversion(unittest.TestCase):
00017
00018 def setUp(self):
00019 rospy.init_node("test_message_conversion")
00020
00021 def validate_instance(self, inst1):
00022 """ Serializes and deserializes the inst to typecheck and ensure that
00023 instances are correct """
00024 inst1._check_types()
00025 buff = StringIO()
00026 inst1.serialize(buff)
00027 inst2 = type(inst1)()
00028 inst2.deserialize(buff.getvalue())
00029 self.assertEqual(inst1, inst2)
00030 inst2._check_types()
00031
00032 def msgs_equal(self, msg1, msg2):
00033 if type(msg1) in [str, unicode] and type(msg2) in [str, unicode]:
00034 pass
00035 else:
00036 self.assertEqual(type(msg1), type(msg2))
00037 if type(msg1) in c.list_types:
00038 for x, y in zip(msg1, msg2):
00039 self.msgs_equal(x, y)
00040 elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
00041 self.assertEqual(msg1, msg2)
00042 else:
00043 for x in msg1:
00044 self.assertTrue(x in msg2)
00045 for x in msg2:
00046 self.assertTrue(x in msg1)
00047 for x in msg1:
00048 self.msgs_equal(msg1[x], msg2[x])
00049
00050 def do_primitive_test(self, data_value, msgtype):
00051 for msg in [{"data": data_value}, loads(dumps({"data": data_value}))]:
00052 inst = ros_loader.get_message_instance(msgtype)
00053 c.populate_instance(msg, inst)
00054 self.assertEqual(inst.data, data_value)
00055 self.validate_instance(inst)
00056 extracted = c.extract_values(inst)
00057 for msg2 in [extracted, loads(dumps(extracted))]:
00058 self.msgs_equal(msg, msg2)
00059 self.assertEqual(msg["data"], msg2["data"])
00060 self.assertEqual(msg2["data"], inst.data)
00061
00062 def do_test(self, orig_msg, msgtype):
00063 for msg in [orig_msg, loads(dumps(orig_msg))]:
00064 inst = ros_loader.get_message_instance(msgtype)
00065 c.populate_instance(msg, inst)
00066 self.validate_instance(inst)
00067 extracted = c.extract_values(inst)
00068 for msg2 in [extracted, loads(dumps(extracted))]:
00069 self.msgs_equal(msg, msg2)
00070
00071 def test_int_primitives(self):
00072
00073 for msg in range(-100, 100):
00074 for rostype in ["int8", "int16", "int32", "int64"]:
00075 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00076 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00077
00078 for msg in range(0, 200):
00079 for rostype in ["uint8", "uint16", "uint32", "uint64"]:
00080 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00081 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00082
00083 def test_bool_primitives(self):
00084 self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
00085 self.assertTrue(c._to_inst(True, "bool", "bool"))
00086 self.assertFalse(c._to_primitive_inst(False, "bool", "bool", []))
00087 self.assertFalse(c._to_inst(False, "bool", "bool"))
00088
00089 def test_float_primitives(self):
00090 for msg in [0.12341234 + i for i in range(-100, 100)]:
00091 for rostype in ["float32", "float64"]:
00092 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
00093 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
00094
00095 def test_signed_int_base_msgs(self):
00096 int8s = range(-127, 128)
00097 for int8 in int8s:
00098 self.do_primitive_test(int8, "std_msgs/Byte")
00099 self.do_primitive_test(int8, "std_msgs/Int8")
00100 self.do_primitive_test(int8, "std_msgs/Int16")
00101 self.do_primitive_test(int8, "std_msgs/Int32")
00102 self.do_primitive_test(int8, "std_msgs/Int64")
00103
00104 int16s = [-32767, 32767]
00105 for int16 in int16s:
00106 self.do_primitive_test(int16, "std_msgs/Int16")
00107 self.do_primitive_test(int16, "std_msgs/Int32")
00108 self.do_primitive_test(int16, "std_msgs/Int64")
00109 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Byte")
00110 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Int8")
00111
00112 int32s = [-2147483647, 2147483647]
00113 for int32 in int32s:
00114 self.do_primitive_test(int32, "std_msgs/Int32")
00115 self.do_primitive_test(int32, "std_msgs/Int64")
00116 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Byte")
00117 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int8")
00118 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int16")
00119
00120 int64s = [-9223372036854775807, 9223372036854775807]
00121 for int64 in int64s:
00122 self.do_primitive_test(int64, "std_msgs/Int64")
00123 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Byte")
00124 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int8")
00125 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int16")
00126 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int32")
00127
00128 def test_unsigned_int_base_msgs(self):
00129 int8s = range(0, 256)
00130 for int8 in int8s:
00131 self.do_primitive_test(int8, "std_msgs/Char")
00132 self.do_primitive_test(int8, "std_msgs/UInt8")
00133 self.do_primitive_test(int8, "std_msgs/UInt16")
00134 self.do_primitive_test(int8, "std_msgs/UInt32")
00135 self.do_primitive_test(int8, "std_msgs/UInt64")
00136
00137 int16s = [32767, 32768, 65535]
00138 for int16 in int16s:
00139 self.do_primitive_test(int16, "std_msgs/UInt16")
00140 self.do_primitive_test(int16, "std_msgs/UInt32")
00141 self.do_primitive_test(int16, "std_msgs/UInt64")
00142 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Char")
00143 self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/UInt8")
00144
00145 int32s = [2147483647, 2147483648, 4294967295]
00146 for int32 in int32s:
00147 self.do_primitive_test(int32, "std_msgs/UInt32")
00148 self.do_primitive_test(int32, "std_msgs/UInt64")
00149 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Char")
00150 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt8")
00151 self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt16")
00152
00153 int64s = [4294967296, 9223372036854775807, 9223372036854775808,
00154 18446744073709551615]
00155 for int64 in int64s:
00156 self.do_primitive_test(int64, "std_msgs/UInt64")
00157 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Char")
00158 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt8")
00159 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt16")
00160 self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt32")
00161
00162 def test_bool_base_msg(self):
00163 self.do_primitive_test(True, "std_msgs/Bool")
00164 self.do_primitive_test(False, "std_msgs/Bool")
00165
00166 def test_string_base_msg(self):
00167 for x in c.ros_primitive_types:
00168 self.do_primitive_test(x, "std_msgs/String")
00169
00170 def test_time_msg(self):
00171 msg = {"data": {"secs": 3, "nsecs": 5}}
00172 self.do_test(msg, "std_msgs/Time")
00173
00174 msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00175 self.do_test(msg, "rosbridge_test/TimeArrayTest")
00176
00177 def test_time_msg_now(self):
00178 msg = {"data": "now"}
00179 msgtype = "std_msgs/Time"
00180
00181 inst = ros_loader.get_message_instance(msgtype)
00182 c.populate_instance(msg, inst)
00183 currenttime = rospy.get_rostime()
00184 self.validate_instance(inst)
00185 extracted = c.extract_values(inst)
00186 print extracted
00187 self.assertIn("data", extracted)
00188 self.assertIn("secs", extracted["data"])
00189 self.assertIn("nsecs", extracted["data"])
00190 self.assertNotEqual(extracted["data"]["secs"], 0)
00191 self.assertLessEqual(extracted["data"]["secs"], currenttime.secs)
00192 self.assertGreaterEqual(currenttime.secs, extracted["data"]["secs"])
00193
00194 def test_duration_msg(self):
00195 msg = {"data": {"secs": 3, "nsecs": 5}}
00196 self.do_test(msg, "std_msgs/Duration")
00197
00198 msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
00199 self.do_test(msg, "rosbridge_test/DurationArrayTest")
00200
00201 def test_header_msg(self):
00202 msg = {"seq": 5, "stamp": {"secs": 12347, "nsecs": 322304}, "frame_id": "2394dnfnlcx;v[p234j]"}
00203 self.do_test(msg, "std_msgs/Header")
00204
00205 msg = {"header": msg}
00206 self.do_test(msg, "rosbridge_test/HeaderTest")
00207 self.do_test(msg, "rosbridge_test/HeaderTestTwo")
00208
00209 msg = {"header": [msg["header"], msg["header"], msg["header"]]}
00210 msg["header"][1]["seq"] = 6
00211 msg["header"][2]["seq"] = 7
00212 self.do_test(msg, "rosbridge_test/HeaderArrayTest")
00213
00214 def test_assorted_msgs(self):
00215 assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
00216 "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
00217 "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
00218 "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
00219 "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
00220 "sensor_msgs/PointCloud2"]
00221 for rostype in assortedmsgs:
00222 inst = ros_loader.get_message_instance(rostype)
00223 msg = c.extract_values(inst)
00224 self.do_test(msg, rostype)
00225 l = loads(dumps(msg))
00226 inst2 = ros_loader.get_message_instance(rostype)
00227 c.populate_instance(msg, inst2)
00228 self.assertEqual(inst, inst2)
00229
00230 def test_int8array(self):
00231 def test_int8_msg(rostype, data):
00232 msg = {"data": data}
00233 inst = ros_loader.get_message_instance(rostype)
00234 c.populate_instance(msg, inst)
00235 self.validate_instance(inst)
00236 return inst.data
00237
00238 for msgtype in ["CharTest", "UInt8Test"]:
00239 rostype = "rosbridge_test/" + msgtype
00240
00241 int8s = range(0, 256)
00242 ret = test_int8_msg(rostype, int8s)
00243 self.assertEqual(ret, str(bytearray(int8s)))
00244
00245 str_int8s = str(bytearray(int8s))
00246
00247 b64str_int8s = standard_b64encode(str_int8s)
00248 ret = test_int8_msg(rostype, b64str_int8s)
00249 self.assertEqual(ret, str_int8s)
00250
00251
00252 if __name__ == '__main__':
00253 import rostest
00254 rostest.rosrun(PKG, 'test_publish', TestMessageConversion)