2 from __future__
import print_function
7 from json
import loads, dumps
10 from cStringIO
import StringIO
12 from io
import BytesIO
as StringIO
16 from base64
import standard_b64encode, standard_b64decode
18 if sys.version_info >= (3, 0):
21 string_types = (str, unicode)
24 PKG =
'rosbridge_library'
25 NAME =
'test_message_conversion'
34 """ Serializes and deserializes the inst to typecheck and ensure that
35 instances are correct """
41 inst2.deserialize(buff.getvalue())
42 self.assertEqual(inst1, inst2)
46 if type(msg1)
in string_types
and type(msg2)
in string_types:
49 self.assertEqual(type(msg1), type(msg2))
50 if type(msg1)
in c.list_types:
51 for x, y
in zip(msg1, msg2):
53 elif type(msg1)
in c.primitive_types
or type(msg1)
in c.string_types:
54 self.assertEqual(msg1, msg2)
57 self.assertTrue(x
in msg2)
59 self.assertTrue(x
in msg1)
64 for msg
in [{
"data": data_value},
loads(
dumps({
"data": data_value}))]:
65 inst = ros_loader.get_message_instance(msgtype)
66 c.populate_instance(msg, inst)
67 self.assertEqual(inst.data, data_value)
69 extracted = c.extract_values(inst)
70 for msg2
in [extracted,
loads(
dumps(extracted))]:
72 self.assertEqual(msg[
"data"], msg2[
"data"])
73 self.assertEqual(msg2[
"data"], inst.data)
77 inst = ros_loader.get_message_instance(msgtype)
78 c.populate_instance(msg, inst)
80 extracted = c.extract_values(inst)
81 for msg2
in [extracted,
loads(
dumps(extracted))]:
86 for msg
in range(-100, 100):
87 for rostype
in [
"int8",
"int16",
"int32",
"int64"]:
88 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
89 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
91 for msg
in range(0, 200):
92 for rostype
in [
"uint8",
"uint16",
"uint32",
"uint64"]:
93 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
94 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
97 self.assertTrue(c._to_primitive_inst(
True,
"bool",
"bool", []))
98 self.assertTrue(c._to_inst(
True,
"bool",
"bool"))
99 self.assertFalse(c._to_primitive_inst(
False,
"bool",
"bool", []))
100 self.assertFalse(c._to_inst(
False,
"bool",
"bool"))
103 for msg
in [0.12341234 + i
for i
in range(-100, 100)]:
104 for rostype
in [
"float32",
"float64"]:
105 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
106 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
107 c._to_inst(msg, rostype, rostype)
110 for msg
in [1e9999999, -1e9999999, float(
'nan')]:
111 for rostype
in [
"float32",
"float64"]:
112 self.assertEqual(c._from_inst(msg, rostype),
None)
113 self.assertEqual(
dumps({
"data":c._from_inst(msg, rostype)}),
"{\"data\": null}")
116 int8s = range(-127, 128)
124 int16s = [-32767, 32767]
132 int32s = [-2147483647, 2147483647]
140 int64s = [-9223372036854775807, 9223372036854775807]
149 int8s = range(0, 256)
157 int16s = [32767, 32768, 65535]
165 int32s = [2147483647, 2147483648, 4294967295]
173 int64s = [4294967296, 9223372036854775807, 9223372036854775808,
174 18446744073709551615]
187 for x
in c.ros_primitive_types:
191 msg = {
"data": {
"secs": 3,
"nsecs": 5}}
192 self.
do_test(msg,
"std_msgs/Time")
194 msg = {
"times": [{
"secs": 3,
"nsecs": 5}, {
"secs": 2,
"nsecs": 7}]}
195 self.
do_test(msg,
"rosbridge_library/TestTimeArray")
198 msg = {
"data":
"now"}
199 msgtype =
"std_msgs/Time"
201 inst = ros_loader.get_message_instance(msgtype)
202 c.populate_instance(msg, inst)
203 currenttime = rospy.get_rostime()
205 extracted = c.extract_values(inst)
207 self.assertIn(
"data", extracted)
208 self.assertIn(
"secs", extracted[
"data"])
209 self.assertIn(
"nsecs", extracted[
"data"])
210 self.assertNotEqual(extracted[
"data"][
"secs"], 0)
211 self.assertLessEqual(extracted[
"data"][
"secs"], currenttime.secs)
212 self.assertGreaterEqual(currenttime.secs, extracted[
"data"][
"secs"])
215 msg = {
"data": {
"secs": 3,
"nsecs": 5}}
216 self.
do_test(msg,
"std_msgs/Duration")
218 msg = {
"durations": [{
"secs": 3,
"nsecs": 5}, {
"secs": 2,
"nsecs": 7}]}
219 self.
do_test(msg,
"rosbridge_library/TestDurationArray")
222 msg = {
"seq": 5,
"stamp": {
"secs": 12347,
"nsecs": 322304},
"frame_id":
"2394dnfnlcx;v[p234j]"}
223 self.
do_test(msg,
"std_msgs/Header")
225 msg = {
"header": msg}
226 self.
do_test(msg,
"rosbridge_library/TestHeader")
227 self.
do_test(msg,
"rosbridge_library/TestHeaderTwo")
229 msg = {
"header": [msg[
"header"], msg[
"header"], msg[
"header"]]}
230 msg[
"header"][1][
"seq"] = 6
231 msg[
"header"][2][
"seq"] = 7
232 self.
do_test(msg,
"rosbridge_library/TestHeaderArray")
235 assortedmsgs = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
236 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
237 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
"std_msgs/String",
238 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
239 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
240 "sensor_msgs/PointCloud2"]
241 for rostype
in assortedmsgs:
242 inst = ros_loader.get_message_instance(rostype)
243 msg = c.extract_values(inst)
246 inst2 = ros_loader.get_message_instance(rostype)
247 c.populate_instance(msg, inst2)
248 self.assertEqual(inst, inst2)
251 def test_int8_msg(rostype, data):
253 inst = ros_loader.get_message_instance(rostype)
254 c.populate_instance(msg, inst)
258 for msgtype
in [
"TestChar",
"TestUInt8"]:
259 rostype =
"rosbridge_library/" + msgtype
261 int8s = list(range(0, 256))
262 ret = test_int8_msg(rostype, int8s)
263 self.assertEqual(ret, bytes(bytearray(int8s)))
265 str_int8s = bytes(bytearray(int8s))
267 b64str_int8s = standard_b64encode(str_int8s).
decode(
'ascii')
268 ret = test_int8_msg(rostype, b64str_int8s)
269 self.assertEqual(ret, str_int8s)
271 for msgtype
in [
"TestUInt8FixedSizeArray16"]:
272 rostype =
"rosbridge_library/" + msgtype
274 int8s = list(range(0, 16))
275 ret = test_int8_msg(rostype, int8s)
276 self.assertEqual(ret, bytes(bytearray(int8s)))
278 str_int8s = bytes(bytearray(int8s))
280 b64str_int8s = standard_b64encode(str_int8s).
decode(
'ascii')
281 ret = test_int8_msg(rostype, b64str_int8s)
282 self.assertEqual(ret, str_int8s)
285 if __name__ ==
'__main__':
286 rosunit.unitrun(PKG, NAME, TestMessageConversion)