test_message_conversion.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys
4 import rospy
5 import rostest
6 import unittest
7 from json import loads, dumps
8 
9 try:
10  from cStringIO import StringIO # Python 2.x
11 except ImportError:
12  from io import BytesIO as StringIO # Python 3.x
13 
14 from rosbridge_library.internal import message_conversion as c
15 from rosbridge_library.internal import ros_loader
16 from base64 import standard_b64encode, standard_b64decode
17 
18 if sys.version_info >= (3, 0):
19  string_types = (str,)
20 else:
21  string_types = (str, unicode)
22 
23 
24 class TestMessageConversion(unittest.TestCase):
25 
26  def setUp(self):
27  rospy.init_node("test_message_conversion")
28 
29  def validate_instance(self, inst1):
30  """ Serializes and deserializes the inst to typecheck and ensure that
31  instances are correct """
32  inst1._check_types()
33  buff = StringIO()
34  inst1.serialize(buff)
35  inst2 = type(inst1)()
36  inst2.deserialize(buff.getvalue())
37  self.assertEqual(inst1, inst2)
38  inst2._check_types()
39 
40  def msgs_equal(self, msg1, msg2):
41  if type(msg1) in string_types and type(msg2) in string_types:
42  pass
43  else:
44  self.assertEqual(type(msg1), type(msg2))
45  if type(msg1) in c.list_types:
46  for x, y in zip(msg1, msg2):
47  self.msgs_equal(x, y)
48  elif type(msg1) in c.primitive_types or type(msg1) in c.string_types:
49  self.assertEqual(msg1, msg2)
50  else:
51  for x in msg1:
52  self.assertTrue(x in msg2)
53  for x in msg2:
54  self.assertTrue(x in msg1)
55  for x in msg1:
56  self.msgs_equal(msg1[x], msg2[x])
57 
58  def do_primitive_test(self, data_value, msgtype):
59  for msg in [{"data": data_value}, loads(dumps({"data": data_value}))]:
60  inst = ros_loader.get_message_instance(msgtype)
61  c.populate_instance(msg, inst)
62  self.assertEqual(inst.data, data_value)
63  self.validate_instance(inst)
64  extracted = c.extract_values(inst)
65  for msg2 in [extracted, loads(dumps(extracted))]:
66  self.msgs_equal(msg, msg2)
67  self.assertEqual(msg["data"], msg2["data"])
68  self.assertEqual(msg2["data"], inst.data)
69 
70  def do_test(self, orig_msg, msgtype):
71  for msg in [orig_msg, loads(dumps(orig_msg))]:
72  inst = ros_loader.get_message_instance(msgtype)
73  c.populate_instance(msg, inst)
74  self.validate_instance(inst)
75  extracted = c.extract_values(inst)
76  for msg2 in [extracted, loads(dumps(extracted))]:
77  self.msgs_equal(msg, msg2)
78 
80  # Test raw primitives
81  for msg in range(-100, 100):
82  for rostype in ["int8", "int16", "int32", "int64"]:
83  self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
84  self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
85  # Test raw primitives
86  for msg in range(0, 200):
87  for rostype in ["uint8", "uint16", "uint32", "uint64"]:
88  self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
89  self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
90 
92  self.assertTrue(c._to_primitive_inst(True, "bool", "bool", []))
93  self.assertTrue(c._to_inst(True, "bool", "bool"))
94  self.assertFalse(c._to_primitive_inst(False, "bool", "bool", []))
95  self.assertFalse(c._to_inst(False, "bool", "bool"))
96 
98  for msg in [0.12341234 + i for i in range(-100, 100)]:
99  for rostype in ["float32", "float64"]:
100  self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
101  self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
102  c._to_inst(msg, rostype, rostype)
103 
105  for msg in [1e9999999, -1e9999999, float('nan')]:
106  for rostype in ["float32", "float64"]:
107  self.assertEqual(c._from_inst(msg, rostype), None)
108  self.assertEqual(dumps({"data":c._from_inst(msg, rostype)}), "{\"data\": null}")
109 
111  int8s = range(-127, 128)
112  for int8 in int8s:
113  self.do_primitive_test(int8, "std_msgs/Byte")
114  self.do_primitive_test(int8, "std_msgs/Int8")
115  self.do_primitive_test(int8, "std_msgs/Int16")
116  self.do_primitive_test(int8, "std_msgs/Int32")
117  self.do_primitive_test(int8, "std_msgs/Int64")
118 
119  int16s = [-32767, 32767]
120  for int16 in int16s:
121  self.do_primitive_test(int16, "std_msgs/Int16")
122  self.do_primitive_test(int16, "std_msgs/Int32")
123  self.do_primitive_test(int16, "std_msgs/Int64")
124  self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Byte")
125  self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Int8")
126 
127  int32s = [-2147483647, 2147483647]
128  for int32 in int32s:
129  self.do_primitive_test(int32, "std_msgs/Int32")
130  self.do_primitive_test(int32, "std_msgs/Int64")
131  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Byte")
132  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int8")
133  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Int16")
134 
135  int64s = [-9223372036854775807, 9223372036854775807]
136  for int64 in int64s:
137  self.do_primitive_test(int64, "std_msgs/Int64")
138  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Byte")
139  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int8")
140  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int16")
141  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Int32")
142 
144  int8s = range(0, 256)
145  for int8 in int8s:
146  self.do_primitive_test(int8, "std_msgs/Char")
147  self.do_primitive_test(int8, "std_msgs/UInt8")
148  self.do_primitive_test(int8, "std_msgs/UInt16")
149  self.do_primitive_test(int8, "std_msgs/UInt32")
150  self.do_primitive_test(int8, "std_msgs/UInt64")
151 
152  int16s = [32767, 32768, 65535]
153  for int16 in int16s:
154  self.do_primitive_test(int16, "std_msgs/UInt16")
155  self.do_primitive_test(int16, "std_msgs/UInt32")
156  self.do_primitive_test(int16, "std_msgs/UInt64")
157  self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/Char")
158  self.assertRaises(Exception, self.do_primitive_test, int16, "std_msgs/UInt8")
159 
160  int32s = [2147483647, 2147483648, 4294967295]
161  for int32 in int32s:
162  self.do_primitive_test(int32, "std_msgs/UInt32")
163  self.do_primitive_test(int32, "std_msgs/UInt64")
164  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/Char")
165  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt8")
166  self.assertRaises(Exception, self.do_primitive_test, int32, "std_msgs/UInt16")
167 
168  int64s = [4294967296, 9223372036854775807, 9223372036854775808,
169  18446744073709551615]
170  for int64 in int64s:
171  self.do_primitive_test(int64, "std_msgs/UInt64")
172  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/Char")
173  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt8")
174  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt16")
175  self.assertRaises(Exception, self.do_primitive_test, int64, "std_msgs/UInt32")
176 
178  self.do_primitive_test(True, "std_msgs/Bool")
179  self.do_primitive_test(False, "std_msgs/Bool")
180 
182  for x in c.ros_primitive_types:
183  self.do_primitive_test(x, "std_msgs/String")
184 
185  def test_time_msg(self):
186  msg = {"data": {"secs": 3, "nsecs": 5}}
187  self.do_test(msg, "std_msgs/Time")
188 
189  msg = {"times": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
190  self.do_test(msg, "rosbridge_library/TestTimeArray")
191 
192  def test_time_msg_now(self):
193  msg = {"data": "now"}
194  msgtype = "std_msgs/Time"
195 
196  inst = ros_loader.get_message_instance(msgtype)
197  c.populate_instance(msg, inst)
198  currenttime = rospy.get_rostime()
199  self.validate_instance(inst)
200  extracted = c.extract_values(inst)
201  print(extracted)
202  self.assertIn("data", extracted)
203  self.assertIn("secs", extracted["data"])
204  self.assertIn("nsecs", extracted["data"])
205  self.assertNotEqual(extracted["data"]["secs"], 0)
206  self.assertLessEqual(extracted["data"]["secs"], currenttime.secs)
207  self.assertGreaterEqual(currenttime.secs, extracted["data"]["secs"])
208 
209  def test_duration_msg(self):
210  msg = {"data": {"secs": 3, "nsecs": 5}}
211  self.do_test(msg, "std_msgs/Duration")
212 
213  msg = {"durations": [{"secs": 3, "nsecs": 5}, {"secs": 2, "nsecs": 7}]}
214  self.do_test(msg, "rosbridge_library/TestDurationArray")
215 
216  def test_header_msg(self):
217  msg = {"seq": 5, "stamp": {"secs": 12347, "nsecs": 322304}, "frame_id": "2394dnfnlcx;v[p234j]"}
218  self.do_test(msg, "std_msgs/Header")
219 
220  msg = {"header": msg}
221  self.do_test(msg, "rosbridge_library/TestHeader")
222  self.do_test(msg, "rosbridge_library/TestHeaderTwo")
223 
224  msg = {"header": [msg["header"], msg["header"], msg["header"]]}
225  msg["header"][1]["seq"] = 6
226  msg["header"][2]["seq"] = 7
227  self.do_test(msg, "rosbridge_library/TestHeaderArray")
228 
230  assortedmsgs = ["geometry_msgs/Pose", "actionlib_msgs/GoalStatus",
231  "geometry_msgs/WrenchStamped", "stereo_msgs/DisparityImage",
232  "nav_msgs/OccupancyGrid", "geometry_msgs/Point32", "std_msgs/String",
233  "trajectory_msgs/JointTrajectoryPoint", "diagnostic_msgs/KeyValue",
234  "visualization_msgs/InteractiveMarkerUpdate", "nav_msgs/GridCells",
235  "sensor_msgs/PointCloud2"]
236  for rostype in assortedmsgs:
237  inst = ros_loader.get_message_instance(rostype)
238  msg = c.extract_values(inst)
239  self.do_test(msg, rostype)
240  l = loads(dumps(msg))
241  inst2 = ros_loader.get_message_instance(rostype)
242  c.populate_instance(msg, inst2)
243  self.assertEqual(inst, inst2)
244 
245  def test_int8array(self):
246  def test_int8_msg(rostype, data):
247  msg = {"data": data}
248  inst = ros_loader.get_message_instance(rostype)
249  c.populate_instance(msg, inst)
250  self.validate_instance(inst)
251  return inst.data
252 
253  for msgtype in ["TestChar", "TestUInt8"]:
254  rostype = "rosbridge_library/" + msgtype
255 
256  int8s = list(range(0, 256))
257  ret = test_int8_msg(rostype, int8s)
258  self.assertEqual(ret, bytes(bytearray(int8s)))
259 
260  str_int8s = bytes(bytearray(int8s))
261 
262  b64str_int8s = standard_b64encode(str_int8s).decode('ascii')
263  ret = test_int8_msg(rostype, b64str_int8s)
264  self.assertEqual(ret, str_int8s)
265 
266  for msgtype in ["TestUInt8FixedSizeArray16"]:
267  rostype = "rosbridge_library/" + msgtype
268 
269  int8s = list(range(0, 16))
270  ret = test_int8_msg(rostype, int8s)
271  self.assertEqual(ret, bytes(bytearray(int8s)))
272 
273  str_int8s = bytes(bytearray(int8s))
274 
275  b64str_int8s = standard_b64encode(str_int8s).decode('ascii')
276  ret = test_int8_msg(rostype, b64str_int8s)
277  self.assertEqual(ret, str_int8s)
278 
279 
280 PKG = 'rosbridge_library'
281 NAME = 'test_message_conversion'
282 if __name__ == '__main__':
283  rostest.unitrun(PKG, NAME, TestMessageConversion)
def dumps(ob, sort_keys=False)
Definition: cbor.py:223


rosbridge_library
Author(s): Jonathan Mace
autogenerated on Wed Jun 3 2020 03:55:14