test_message_conversion.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys
4 import rospy
5 import rosunit
6 import unittest
7 from json import loads, dumps
8 
9 try:
10  from cStringIO import StringIO # Python 2.x
11 except ImportError:
12  from io import BytesIO as StringIO # Python 3.x
13 
14 from rosbridge_library.internal import message_conversion as c
15 from rosbridge_library.internal import ros_loader
16 from base64 import standard_b64encode, standard_b64decode
17 
18 if sys.version_info >= (3, 0):
19  string_types = (str,)
20 else:
21  string_types = (str, unicode) # noqa: F821
22 
23 
24 PKG = 'rosbridge_library'
25 NAME = 'test_message_conversion'
26 
27 
28 class TestMessageConversion(unittest.TestCase):
29 
30  def setUp(self):
31  rospy.init_node(NAME)
32 
33  def validate_instance(self, inst1):
34  """ Serializes and deserializes the inst to typecheck and ensure that
35  instances are correct """
36  # _check_types() skipped because: https://github.com/ros/genpy/issues/122
37  #inst1._check_types()
38  buff = StringIO()
39  inst1.serialize(buff)
40  inst2 = type(inst1)()
41  inst2.deserialize(buff.getvalue())
42  self.assertEqual(inst1, inst2)
43  #inst2._check_types()
44 
45  def msgs_equal(self, msg1, msg2):
46  if type(msg1) in string_types and type(msg2) in string_types:
47  pass
48  else:
49  self.assertEqual(type(msg1), type(msg2))
50  if type(msg1) in c.list_types:
51  for x, y in zip(msg1, msg2):
52  self.msgs_equal(x, y)
53  elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
54  self.assertEqual(msg1, msg2)
55  else:
56  for x in msg1:
57  self.assertTrue(x in msg2)
58  for x in msg2:
59  self.assertTrue(x in msg1)
60  for x in msg1:
61  self.msgs_equal(msg1[x], msg2[x])
62 
63  def do_primitive_test(self, data_value, msgtype):
64  for msg in [{"data": data_value}, loads(dumps({"data": data_value}))]:
65  inst = ros_loader.get_message_instance(msgtype)
66  c.populate_instance(msg, inst)
67  self.assertEqual(inst.data, data_value)
68  self.validate_instance(inst)
69  extracted = c.extract_values(inst)
70  for msg2 in [extracted, loads(dumps(extracted))]:
71  self.msgs_equal(msg, msg2)
72  self.assertEqual(msg["data"], msg2["data"])
73  self.assertEqual(msg2["data"], inst.data)
74 
75  def do_test(self, orig_msg, msgtype):
76  for msg in [orig_msg, loads(dumps(orig_msg))]:
77  inst = ros_loader.get_message_instance(msgtype)
78  c.populate_instance(msg, inst)
79  self.validate_instance(inst)
80  extracted = c.extract_values(inst)
81  for msg2 in [extracted, loads(dumps(extracted))]:
82  self.msgs_equal(msg, msg2)
83 
85  # Test raw primitives
86  for msg in range(-100, 100):
87  for rostype in ["int8", "int16", "int32", "int64"]:
88  self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
89  self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
90  # Test raw primitives
91  for msg in range(0, 200):
92  for rostype in ["uint8", "uint16", "uint32", "uint64"]:
93  self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
94  self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
95 
97  self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
98  self.assertTrue(c._to_inst(True, "bool", "bool"))
99  self.assertFalse(c._to_primitive_inst(False, "bool", "bool", []))
100  self.assertFalse(c._to_inst(False, "bool", "bool"))
101 
103  for msg in [0.12341234 + i for i in range(-100, 100)]:
104  for rostype in ["float32", "float64"]:
105  self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
106  self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
107  c._to_inst(msg, rostype, rostype)
108 
110  for msg in [1e9999999, -1e9999999, float('nan')]:
111  for rostype in ["float32", "float64"]:
112  self.assertEqual(c._from_inst(msg, rostype), None)
113  self.assertEqual(dumps({"data":c._from_inst(msg, rostype)}), "{\"data\": null}")
114 
116  int8s = range(-127, 128)
117  for int8 in int8s:
118  self.do_primitive_test(int8, "std_msgs/Byte")
119  self.do_primitive_test(int8, "std_msgs/Int8")
120  self.do_primitive_test(int8, "std_msgs/Int16")
121  self.do_primitive_test(int8, "std_msgs/Int32")
122  self.do_primitive_test(int8, "std_msgs/Int64")
123 
124  int16s = [-32767, 32767]
125  for int16 in int16s:
126  self.do_primitive_test(int16, "std_msgs/Int16")
127  self.do_primitive_test(int16, "std_msgs/Int32")
128  self.do_primitive_test(int16, "std_msgs/Int64")
129  self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Byte")
130  self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Int8")
131 
132  int32s = [-2147483647, 2147483647]
133  for int32 in int32s:
134  self.do_primitive_test(int32, "std_msgs/Int32")
135  self.do_primitive_test(int32, "std_msgs/Int64")
136  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Byte")
137  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int8")
138  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int16")
139 
140  int64s = [-9223372036854775807, 9223372036854775807]
141  for int64 in int64s:
142  self.do_primitive_test(int64, "std_msgs/Int64")
143  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Byte")
144  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int8")
145  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int16")
146  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int32")
147 
149  int8s = range(0, 256)
150  for int8 in int8s:
151  self.do_primitive_test(int8, "std_msgs/Char")
152  self.do_primitive_test(int8, "std_msgs/UInt8")
153  self.do_primitive_test(int8, "std_msgs/UInt16")
154  self.do_primitive_test(int8, "std_msgs/UInt32")
155  self.do_primitive_test(int8, "std_msgs/UInt64")
156 
157  int16s = [32767, 32768, 65535]
158  for int16 in int16s:
159  self.do_primitive_test(int16, "std_msgs/UInt16")
160  self.do_primitive_test(int16, "std_msgs/UInt32")
161  self.do_primitive_test(int16, "std_msgs/UInt64")
162  self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Char")
163  self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/UInt8")
164 
165  int32s = [2147483647, 2147483648, 4294967295]
166  for int32 in int32s:
167  self.do_primitive_test(int32, "std_msgs/UInt32")
168  self.do_primitive_test(int32, "std_msgs/UInt64")
169  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Char")
170  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt8")
171  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt16")
172 
173  int64s = [4294967296, 9223372036854775807, 9223372036854775808,
174  18446744073709551615]
175  for int64 in int64s:
176  self.do_primitive_test(int64, "std_msgs/UInt64")
177  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Char")
178  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt8")
179  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt16")
180  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt32")
181 
183  self.do_primitive_test(True, "std_msgs/Bool")
184  self.do_primitive_test(False, "std_msgs/Bool")
185 
187  for x in c.ros_primitive_types:
188  self.do_primitive_test(x, "std_msgs/String")
189 
190  def test_time_msg(self):
191  msg = {"data": {"secs": 3, "nsecs": 5}}
192  self.do_test(msg, "std_msgs/Time")
193 
194  msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
195  self.do_test(msg, "rosbridge_library/TestTimeArray")
196 
197  def test_time_msg_now(self):
198  msg = {"data": "now"}
199  msgtype = "std_msgs/Time"
200 
201  inst = ros_loader.get_message_instance(msgtype)
202  c.populate_instance(msg, inst)
203  currenttime = rospy.get_rostime()
204  self.validate_instance(inst)
205  extracted = c.extract_values(inst)
206  print(extracted)
207  self.assertIn("data", extracted)
208  self.assertIn("secs", extracted["data"])
209  self.assertIn("nsecs", extracted["data"])
210  self.assertNotEqual(extracted["data"]["secs"], 0)
211  self.assertLessEqual(extracted["data"]["secs"], currenttime.secs)
212  self.assertGreaterEqual(currenttime.secs, extracted["data"]["secs"])
213 
214  def test_duration_msg(self):
215  msg = {"data": {"secs": 3, "nsecs": 5}}
216  self.do_test(msg, "std_msgs/Duration")
217 
218  msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
219  self.do_test(msg, "rosbridge_library/TestDurationArray")
220 
221  def test_header_msg(self):
222  msg = {"seq": 5, "stamp": {"secs": 12347, "nsecs": 322304}, "frame_id": "2394dnfnlcx;v[p234j]"}
223  self.do_test(msg, "std_msgs/Header")
224 
225  msg = {"header": msg}
226  self.do_test(msg, "rosbridge_library/TestHeader")
227  self.do_test(msg, "rosbridge_library/TestHeaderTwo")
228 
229  msg = {"header": [msg["header"], msg["header"], msg["header"]]}
230  msg["header"][1]["seq"] = 6
231  msg["header"][2]["seq"] = 7
232  self.do_test(msg, "rosbridge_library/TestHeaderArray")
233 
235  assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
236  "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
237  "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
238  "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
239  "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
240  "sensor_msgs/PointCloud2"]
241  for rostype in assortedmsgs:
242  inst = ros_loader.get_message_instance(rostype)
243  msg = c.extract_values(inst)
244  self.do_test(msg, rostype)
245  l = loads(dumps(msg))
246  inst2 = ros_loader.get_message_instance(rostype)
247  c.populate_instance(msg, inst2)
248  self.assertEqual(inst, inst2)
249 
250  def test_int8array(self):
251  def test_int8_msg(rostype, data):
252  msg = {"data": data}
253  inst = ros_loader.get_message_instance(rostype)
254  c.populate_instance(msg, inst)
255  self.validate_instance(inst)
256  return inst.data
257 
258  for msgtype in ["TestChar", "TestUInt8"]:
259  rostype = "rosbridge_library/" + msgtype
260 
261  int8s = list(range(0, 256))
262  ret = test_int8_msg(rostype, int8s)
263  self.assertEqual(ret, bytes(bytearray(int8s)))
264 
265  str_int8s = bytes(bytearray(int8s))
266 
267  b64str_int8s = standard_b64encode(str_int8s).decode('ascii')
268  ret = test_int8_msg(rostype, b64str_int8s)
269  self.assertEqual(ret, str_int8s)
270 
271  for msgtype in ["TestUInt8FixedSizeArray16"]:
272  rostype = "rosbridge_library/" + msgtype
273 
274  int8s = list(range(0, 16))
275  ret = test_int8_msg(rostype, int8s)
276  self.assertEqual(ret, bytes(bytearray(int8s)))
277 
278  str_int8s = bytes(bytearray(int8s))
279 
280  b64str_int8s = standard_b64encode(str_int8s).decode('ascii')
281  ret = test_int8_msg(rostype, b64str_int8s)
282  self.assertEqual(ret, str_int8s)
283 
284 
285 if __name__ == '__main__':
286  rosunit.unitrun(PKG, NAME, TestMessageConversion)
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Fri Oct 21 2022 02:45:18