test_message_converter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import numpy as np
3 import struct
4 import sys
5 import unittest
6 
7 import rospy
8 from rospy.exceptions import ROSInitException
9 from rospy_message_converter import message_converter
10 
11 python3 = (sys.hexversion > 0x03000000)
12 
13 class TestMessageConverter(unittest.TestCase):
14 
16  from rospy_message_converter.msg import TestArray
17  expected_dictionary = {
18  'data': [1.1, 2.2, 3.3]
19  }
20  message = TestArray(data = expected_dictionary['data'])
21  message = serialize_deserialize(message)
22  dictionary = message_converter.convert_ros_message_to_dictionary(message)
23  self.assertEqual(dictionary, expected_dictionary)
24 
26  from std_msgs.msg import Bool
27  expected_dictionary = { 'data': True }
28  message = Bool(data=expected_dictionary['data'])
29  message = serialize_deserialize(message)
30  dictionary = message_converter.convert_ros_message_to_dictionary(message)
31  self.assertEqual(dictionary, expected_dictionary)
32 
34  from std_msgs.msg import Byte
35  expected_dictionary = { 'data': 5 }
36  message = Byte(data=expected_dictionary['data'])
37  message = serialize_deserialize(message)
38  dictionary = message_converter.convert_ros_message_to_dictionary(message)
39  self.assertEqual(dictionary, expected_dictionary)
40 
42  from std_msgs.msg import Char
43  expected_dictionary = { 'data': 99 }
44  message = Char(data=expected_dictionary['data'])
45  message = serialize_deserialize(message)
46  dictionary = message_converter.convert_ros_message_to_dictionary(message)
47  self.assertEqual(dictionary, expected_dictionary)
48 
50  from std_msgs.msg import Duration
51  duration = rospy.rostime.Duration(33, 25)
52  expected_dictionary = {
53  'data': {
54  'secs' : duration.secs,
55  'nsecs' : duration.nsecs
56  }
57  }
58  message = Duration(data=duration)
59  message = serialize_deserialize(message)
60  dictionary = message_converter.convert_ros_message_to_dictionary(message)
61  self.assertEqual(dictionary, expected_dictionary)
62 
64  from std_msgs.msg import Empty
65  expected_dictionary = {}
66  message = Empty()
67  message = serialize_deserialize(message)
68  dictionary = message_converter.convert_ros_message_to_dictionary(message)
69  self.assertEqual(dictionary, expected_dictionary)
70 
72  from std_msgs.msg import Float32
73  expected_dictionary = { 'data': struct.unpack('<f', b'\x7F\x7F\xFF\xFD')[0] }
74  message = Float32(data=expected_dictionary['data'])
75  message = serialize_deserialize(message)
76  dictionary = message_converter.convert_ros_message_to_dictionary(message)
77  self.assertEqual(dictionary, expected_dictionary)
78 
80  from std_msgs.msg import Float64
81  expected_dictionary = { 'data': struct.unpack('<d', b'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0] }
82  message = Float64(data=expected_dictionary['data'])
83  message = serialize_deserialize(message)
84  dictionary = message_converter.convert_ros_message_to_dictionary(message)
85  self.assertEqual(dictionary, expected_dictionary)
86 
88  from std_msgs.msg import Header
89  from time import time
90  now_time = rospy.Time(time())
91  expected_dictionary = {
92  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
93  'frame_id' : 'my_frame',
94  'seq': 3
95  }
96  message = Header(
97  stamp = now_time,
98  frame_id = expected_dictionary['frame_id'],
99  seq = expected_dictionary['seq']
100  )
101  message = serialize_deserialize(message)
102  dictionary = message_converter.convert_ros_message_to_dictionary(message)
103  self.assertEqual(dictionary, expected_dictionary)
104 
106  from std_msgs.msg import Int8
107  expected_dictionary = { 'data': -0x7F }
108  message = Int8(data=expected_dictionary['data'])
109  message = serialize_deserialize(message)
110  dictionary = message_converter.convert_ros_message_to_dictionary(message)
111  self.assertEqual(dictionary, expected_dictionary)
112 
114  from std_msgs.msg import UInt8
115  expected_dictionary = { 'data': 0xFF }
116  message = UInt8(data=expected_dictionary['data'])
117  message = serialize_deserialize(message)
118  dictionary = message_converter.convert_ros_message_to_dictionary(message)
119  self.assertEqual(dictionary, expected_dictionary)
120 
122  from rospy_message_converter.msg import Uint8ArrayTestMessage
123  from base64 import b64encode
124  expected_data = [97, 98, 99, 100]
125  message = Uint8ArrayTestMessage(data=expected_data)
126  message = serialize_deserialize(message)
127  dictionary = message_converter.convert_ros_message_to_dictionary(message)
128  expected_data = b64encode(bytearray(expected_data)).decode('utf-8')
129  self.assertEqual(dictionary["data"], expected_data)
130 
132  from rospy_message_converter.msg import Uint8Array3TestMessage
133  from base64 import b64encode
134  expected_data = [97, 98, 99]
135  message = Uint8Array3TestMessage(data=expected_data)
136  message = serialize_deserialize(message)
137  dictionary = message_converter.convert_ros_message_to_dictionary(message)
138  expected_data = b64encode(bytearray(expected_data)).decode('utf-8')
139  self.assertEqual(dictionary["data"], expected_data)
140 
142  from rospy_message_converter.msg import Uint8Array3TestMessage
143  expected_data = [97, 98, 99]
144  message = Uint8Array3TestMessage(data=expected_data)
145  message = serialize_deserialize(message)
146  dictionary = message_converter.convert_ros_message_to_dictionary(message, binary_array_as_bytes=False)
147  self.assertEqual(dictionary["data"], expected_data)
148 
150  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
151  expected_data = [97, 98, 99]
152  message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
153  message = serialize_deserialize(message)
154  dictionary = message_converter.convert_ros_message_to_dictionary(message, binary_array_as_bytes=False)
155  self.assertEqual(dictionary["arrays"][0]["data"], expected_data)
156 
158  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
159  from base64 import b64encode
160  expected_data = bytes(bytearray([97, 98, 99]))
161  message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
162  message = serialize_deserialize(message)
163  dictionary = message_converter.convert_ros_message_to_dictionary(message)
164  self.assertEqual(dictionary["arrays"][0]["data"], b64encode(expected_data).decode('utf-8'))
165 
167  from std_msgs.msg import Int16
168  expected_dictionary = { 'data': -0x7FFF }
169  message = Int16(data=expected_dictionary['data'])
170  message = serialize_deserialize(message)
171  dictionary = message_converter.convert_ros_message_to_dictionary(message)
172  self.assertEqual(dictionary, expected_dictionary)
173 
175  from std_msgs.msg import UInt16
176  expected_dictionary = { 'data': 0xFFFF }
177  message = UInt16(data=expected_dictionary['data'])
178  message = serialize_deserialize(message)
179  dictionary = message_converter.convert_ros_message_to_dictionary(message)
180  self.assertEqual(dictionary, expected_dictionary)
181 
183  from std_msgs.msg import Int32
184  expected_dictionary = { 'data': -0x7FFFFFFF }
185  message = Int32(data=expected_dictionary['data'])
186  message = serialize_deserialize(message)
187  dictionary = message_converter.convert_ros_message_to_dictionary(message)
188  self.assertEqual(dictionary, expected_dictionary)
189 
191  from std_msgs.msg import UInt32
192  expected_dictionary = { 'data': 0xFFFFFFFF }
193  message = UInt32(data=expected_dictionary['data'])
194  message = serialize_deserialize(message)
195  dictionary = message_converter.convert_ros_message_to_dictionary(message)
196  self.assertEqual(dictionary, expected_dictionary)
197 
199  from std_msgs.msg import Int64
200  expected_dictionary = { 'data': -0x7FFFFFFFFFFFFFFF }
201  message = Int64(data=expected_dictionary['data'])
202  message = serialize_deserialize(message)
203  dictionary = message_converter.convert_ros_message_to_dictionary(message)
204  self.assertEqual(dictionary, expected_dictionary)
205 
207  from std_msgs.msg import UInt64
208  expected_dictionary = { 'data': 0xFFFFFFFFFFFFFFFF }
209  message = UInt64(data=expected_dictionary['data'])
210  message = serialize_deserialize(message)
211  dictionary = message_converter.convert_ros_message_to_dictionary(message)
212  self.assertEqual(dictionary, expected_dictionary)
213 
215  from std_msgs.msg import String
216  expected_dictionary = { 'data': 'Hello' }
217  message = String(data=expected_dictionary['data'])
218  message = serialize_deserialize(message)
219  dictionary = message_converter.convert_ros_message_to_dictionary(message)
220  self.assertEqual(dictionary, expected_dictionary)
221 
223  """
224  Test that strings are encoded as utf8
225  """
226  from std_msgs.msg import String
227  expected_dictionary = { 'data': u'Hello \u00dcnicode' }
228  message = String(data=expected_dictionary['data'])
229  message = serialize_deserialize(message)
230  dictionary = message_converter.convert_ros_message_to_dictionary(message)
231  self.assertEqual(dictionary, expected_dictionary)
232 
234  from std_msgs.msg import Time
235  from time import time
236  now_time = rospy.Time(time())
237  expected_dictionary = {
238  'data': { 'secs': now_time.secs, 'nsecs': now_time.nsecs }
239  }
240  message = Time(data=now_time)
241  message = serialize_deserialize(message)
242  dictionary = message_converter.convert_ros_message_to_dictionary(message)
243  self.assertEqual(dictionary, expected_dictionary)
244 
246  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
247  expected_dictionary = {
248  'layout': {
249  'dim': [
250  { 'label': 'Dimension1', 'size': 12, 'stride': 7 },
251  { 'label': 'Dimension2', 'size': 24, 'stride': 14 }
252  ],
253  'data_offset': 0
254  },
255  'data': [1.1, 2.2, 3.3]
256  }
257  dimension1 = MultiArrayDimension(
258  label = expected_dictionary['layout']['dim'][0]['label'],
259  size = expected_dictionary['layout']['dim'][0]['size'],
260  stride = expected_dictionary['layout']['dim'][0]['stride']
261  )
262  dimension2 = MultiArrayDimension(
263  label = expected_dictionary['layout']['dim'][1]['label'],
264  size = expected_dictionary['layout']['dim'][1]['size'],
265  stride = expected_dictionary['layout']['dim'][1]['stride']
266  )
267  layout = MultiArrayLayout(
268  dim = [dimension1, dimension2],
269  data_offset = expected_dictionary['layout']['data_offset']
270  )
271  message = Float64MultiArray(
272  layout = layout,
273  data = expected_dictionary['data']
274  )
275  message = serialize_deserialize(message)
276  dictionary = message_converter.convert_ros_message_to_dictionary(message)
277  self.assertEqual(dictionary, expected_dictionary)
278 
280  from std_srvs.srv import EmptyRequest, EmptyResponse
281  expected_dictionary_req = {}
282  expected_dictionary_res = {}
283  request = EmptyRequest()
284  request = serialize_deserialize(request)
285  response = EmptyResponse()
286  response = serialize_deserialize(response)
287  dictionary_req = message_converter.convert_ros_message_to_dictionary(request)
288  self.assertEqual(dictionary_req, expected_dictionary_req)
289  dictionary_res = message_converter.convert_ros_message_to_dictionary(response)
290  self.assertEqual(dictionary_res, expected_dictionary_res)
291 
293  from rospy_message_converter.srv import NestedUint8ArrayTestServiceRequest, NestedUint8ArrayTestServiceResponse
294  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
295  from base64 import b64encode
296  expected_data = bytes(bytearray([97, 98, 99]))
297 
298  expected_dictionary_req = {"input": {"arrays": [{"data": b64encode(expected_data).decode('utf-8')}]}}
299  expected_dictionary_res = {"output": {"arrays": [{"data": b64encode(expected_data).decode('utf-8')}]}}
300  request = NestedUint8ArrayTestServiceRequest(
301  input=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)]))
302  request = serialize_deserialize(request)
303  response = NestedUint8ArrayTestServiceResponse(
304  output=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)]))
305  response = serialize_deserialize(response)
306 
307  dictionary_req = message_converter.convert_ros_message_to_dictionary(request)
308  self.assertEqual(dictionary_req, expected_dictionary_req)
309  dictionary_res = message_converter.convert_ros_message_to_dictionary(response)
310  self.assertEqual(dictionary_res, expected_dictionary_res)
311 
313  from rospy_message_converter.msg import TestArray
314  expected_message = TestArray(data = [1.1, 2.2, 3.3, 4.4])
315  dictionary = { 'data': expected_message.data }
316  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/TestArray', dictionary)
317  expected_message = serialize_deserialize(expected_message)
318  self.assertEqual(message, expected_message)
319 
321  """
322  rospy treats uint8[] data as `bytes`, which is the Python representation for byte data. In Python 2, this is
323  the same as `str`. The `bytes` value must be base64-encoded.
324  """
325  from rospy_message_converter.msg import Uint8ArrayTestMessage
326  from base64 import b64encode
327  expected_message = Uint8ArrayTestMessage(data=bytes(bytearray([97, 98, 99])))
328  dictionary = {'data': b64encode(expected_message.data)} # base64 encoding
329  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/Uint8ArrayTestMessage',
330  dictionary)
331  expected_message = serialize_deserialize(expected_message)
332  self.assertEqual(message, expected_message)
333 
335  """
336  Even though rospy treats uint8[] data as `bytes`, rospy_message_converter also handles lists of int. In that
337  case, the input data must *not* be base64-encoded.
338  """
339  from rospy_message_converter.msg import Uint8ArrayTestMessage
340  expected_message = Uint8ArrayTestMessage(data=[1, 2, 3, 4])
341  dictionary = {'data': expected_message.data} # no base64 encoding
342  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/Uint8ArrayTestMessage',
343  dictionary)
344  expected_message = serialize_deserialize(expected_message)
345  self.assertEqual(message, expected_message)
346 
348  dictionary = {'data': [1, 2, 3, 4000]}
349  with self.assertRaises(ValueError) as context:
350  message_converter.convert_dictionary_to_ros_message('rospy_message_converter/Uint8ArrayTestMessage',
351  dictionary)
352  self.assertEqual('byte must be in range(0, 256)', context.exception.args[0])
353 
355  """
356  If the value of a uint8[] field has type `bytes`, rospy_message_converter expects that data to be
357  base64-encoded and runs b64decode on it. This test documents what happens if the value is
358  not base64-encoded.
359  """
360  from rospy_message_converter.msg import Uint8ArrayTestMessage
361  import binascii
362 
363  # this raises a TypeError, because:
364  # * b64decode removes all characters that are not in the standard alphabet ([A-Za-Z0-9+/])
365  # * this only leaves 97 (= 'a')
366  # * the length of a base64 string must be a multiple of 4 characters (if necessary, padded at the end with '=')
367  # * since the length of 'a' is not a multiple of 4, a TypeError is thrown
368  dictionary = {'data': bytes(bytearray([1, 2, 97, 4]))}
369  with self.assertRaises((TypeError, binascii.Error)) as context:
370  message_converter.convert_dictionary_to_ros_message('rospy_message_converter/Uint8ArrayTestMessage',
371  dictionary)
372  if type(context.exception) == TypeError: # python2
373  error_msg = context.exception.args[0].args[0]
374  else: # python3
375  error_msg = context.exception.args[0]
376  self.assertIn(error_msg, ['Incorrect padding', 'Non-base64 digit found'])
377 
378  dictionary = {'data': bytes(bytearray([1, 97, 97, 2, 3, 97, 4, 97]))}
379  if python3:
380  # On python3, we validate the input, so an error is raised.
381  with self.assertRaises(binascii.Error) as context:
382  message_converter.convert_dictionary_to_ros_message('rospy_message_converter/Uint8ArrayTestMessage',
383  dictionary)
384  self.assertEqual('Non-base64 digit found', context.exception.args[0])
385  else:
386  # if the dictionary contains a multiple of 4 characters from the standard alphabet, no error is raised
387  # (but the result is garbage).
388  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/Uint8ArrayTestMessage',
389  dictionary)
390  expected_message = serialize_deserialize(Uint8ArrayTestMessage(data=bytes(bytearray([105, 166, 154]))))
391  self.assertEqual(message, expected_message)
392 
394  from rospy_message_converter.msg import Uint8Array3TestMessage
395  from base64 import b64encode
396  expected_message = Uint8Array3TestMessage(data=bytes(bytearray([97, 98, 99])))
397  dictionary = {'data': b64encode(expected_message.data)}
398  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/Uint8Array3TestMessage', dictionary)
399  expected_message = serialize_deserialize(expected_message)
400  self.assertEqual(message, expected_message)
401 
403  from rospy_message_converter.msg import Uint8Array3TestMessage
404  expected_message = Uint8Array3TestMessage(data=[97, 98, 99])
405  dictionary = {'data': expected_message.data}
406  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/Uint8Array3TestMessage', dictionary)
407  expected_message = serialize_deserialize(expected_message)
408  self.assertEqual(message, expected_message)
409 
411  from std_msgs.msg import Bool
412  expected_message = Bool(data = True)
413  dictionary = { 'data': expected_message.data }
414  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
415  expected_message = serialize_deserialize(expected_message)
416  self.assertEqual(message, expected_message)
417 
419  from std_msgs.msg import Byte
420  expected_message = Byte(data = 3)
421  dictionary = { 'data': expected_message.data }
422  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Byte', dictionary)
423  expected_message = serialize_deserialize(expected_message)
424  self.assertEqual(message, expected_message)
425 
427  from std_msgs.msg import Char
428  expected_message = Char(data = 99)
429  dictionary = { 'data': expected_message.data }
430  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Char', dictionary)
431  expected_message = serialize_deserialize(expected_message)
432  self.assertEqual(message, expected_message)
433 
435  from std_msgs.msg import Duration
436  duration = rospy.rostime.Duration(33, 25)
437  expected_message = Duration(data = duration)
438  dictionary = {
439  'data': {
440  'secs' : duration.secs,
441  'nsecs' : duration.nsecs
442  }
443  }
444  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Duration', dictionary)
445  expected_message = serialize_deserialize(expected_message)
446  self.assertEqual(message, expected_message)
447 
449  from std_msgs.msg import Empty
450  expected_message = Empty()
451  dictionary = {}
452  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary)
453  expected_message = serialize_deserialize(expected_message)
454  self.assertEqual(message, expected_message)
455 
457  dictionary = {"additional_args": "should raise value error"}
458  with self.assertRaises(ValueError) as context:
459  message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary)
460  self.assertEqual('''ROS message type "std_msgs/Empty" has no field named "additional_args"''',
461  context.exception.args[0])
462 
464  from std_msgs.msg import Empty
465  expected_message = Empty()
466  dictionary = {"additional_args": "should be ignored"}
467  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary, strict_mode=False)
468  expected_message = serialize_deserialize(expected_message)
469  self.assertEqual(message, expected_message)
470 
472  from base64 import b64encode
473  expected_data = bytes(bytearray([97, 98, 99]))
474  dictionary = {"arrays": [{"data": b64encode(expected_data), "additional_args": "should raise value error"}]}
475  with self.assertRaises(ValueError) as context:
476  message_converter.convert_dictionary_to_ros_message('rospy_message_converter/NestedUint8ArrayTestMessage',
477  dictionary)
478  self.assertEqual(
479  'ROS message type "rospy_message_converter/Uint8ArrayTestMessage" has no field named "additional_args"',
480  context.exception.args[0])
481 
483  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
484  from base64 import b64encode
485  expected_data = bytes(bytearray([97, 98, 99]))
486  expected_message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
487  dictionary = {"arrays": [{"data": b64encode(expected_data), "additional_args": "should be ignored"}]}
488  message = message_converter.convert_dictionary_to_ros_message(
489  'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary, strict_mode=False)
490  expected_message = serialize_deserialize(expected_message)
491  self.assertEqual(message, expected_message)
492 
494  from std_msgs.msg import Bool
495  expected_message = Bool(data=False)
496  dictionary = {}
497  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
498  expected_message = serialize_deserialize(expected_message)
499  self.assertEqual(message, expected_message)
500 
502  dictionary = {}
503  with self.assertRaises(ValueError) as context:
504  message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary, check_missing_fields=True)
505  self.assertEqual('''Missing fields "{'data': 'bool'}"''',
506  context.exception.args[0])
507 
509  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
510  expected_message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=[])])
511  dictionary = {"arrays": [{}]}
512  message = message_converter.convert_dictionary_to_ros_message(
513  'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary)
514  expected_message = serialize_deserialize(expected_message)
515  self.assertEqual(message, expected_message)
516 
518  dictionary = {"arrays": [{}]}
519  with self.assertRaises(ValueError) as context:
520  message_converter.convert_dictionary_to_ros_message('rospy_message_converter/NestedUint8ArrayTestMessage',
521  dictionary, check_missing_fields=True)
522  self.assertEqual('''Missing fields "{'data': 'uint8[]'}"''',
523  context.exception.args[0])
524 
526  dictionary = {"data": "should_be_a_bool"}
527  with self.assertRaises(TypeError) as context:
528  message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
529  self.assertTrue("Field 'data' has wrong type" in context.exception.args[0])
530 
532  from std_msgs.msg import Float32
533  expected_message = Float32(data = struct.unpack('<f', b'\x7F\x7F\xFF\xFD')[0])
534  dictionary = { 'data': expected_message.data }
535  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float32', dictionary)
536  expected_message = serialize_deserialize(expected_message)
537  self.assertEqual(message, expected_message)
538 
540  from std_msgs.msg import Float64
541  expected_message = Float64(data = struct.unpack('<d', b'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0])
542  dictionary = { 'data': expected_message.data }
543  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64', dictionary)
544  expected_message = serialize_deserialize(expected_message)
545  self.assertEqual(message, expected_message)
546 
548  from std_msgs.msg import Header
549  from time import time
550  now_time = rospy.Time(time())
551  expected_message = Header(
552  stamp = now_time,
553  frame_id = 'my_frame',
554  seq = 12
555  )
556  dictionary = {
557  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
558  'frame_id' : expected_message.frame_id,
559  'seq': expected_message.seq
560  }
561  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Header', dictionary)
562  expected_message = serialize_deserialize(expected_message)
563  self.assertEqual(message, expected_message)
564 
566  from std_msgs.msg import Header
567  from time import time
568  now_time = rospy.Time(time())
569  expected_message = Header(
570  stamp = now_time,
571  frame_id = 'my_frame',
572  seq = 12
573  )
574  dictionary = {
575  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
576  'frame_id' : expected_message.frame_id,
577  'seq': expected_message.seq
578  }
579  message = message_converter.convert_dictionary_to_ros_message('Header', dictionary)
580  expected_message = serialize_deserialize(expected_message)
581  self.assertEqual(message, expected_message)
582 
584  from std_msgs.msg import Int8
585  expected_message = Int8(data = -0x7F)
586  dictionary = { 'data': expected_message.data }
587  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int8', dictionary)
588  expected_message = serialize_deserialize(expected_message)
589  self.assertEqual(message, expected_message)
590 
592  from std_msgs.msg import UInt8
593  expected_message = UInt8(data = 0xFF)
594  dictionary = { 'data': expected_message.data }
595  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt8', dictionary)
596  expected_message = serialize_deserialize(expected_message)
597  self.assertEqual(message, expected_message)
598 
600  from std_msgs.msg import Int16
601  expected_message = Int16(data = -0x7FFF)
602  dictionary = { 'data': expected_message.data }
603  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int16', dictionary)
604  expected_message = serialize_deserialize(expected_message)
605  self.assertEqual(message, expected_message)
606 
608  from std_msgs.msg import UInt16
609  expected_message = UInt16(data = 0xFFFF)
610  dictionary = { 'data': expected_message.data }
611  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt16', dictionary)
612  expected_message = serialize_deserialize(expected_message)
613  self.assertEqual(message, expected_message)
614 
616  from std_msgs.msg import Int32
617  expected_message = Int32(data = -0x7FFFFFFF)
618  dictionary = { 'data': expected_message.data }
619  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int32', dictionary)
620  expected_message = serialize_deserialize(expected_message)
621  self.assertEqual(message, expected_message)
622 
624  from std_msgs.msg import UInt32
625  expected_message = UInt32(data = 0xFFFFFFFF)
626  dictionary = { 'data': expected_message.data }
627  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt32', dictionary)
628  expected_message = serialize_deserialize(expected_message)
629  self.assertEqual(message, expected_message)
630 
632  from std_msgs.msg import Int64
633  expected_message = Int64(data = -0x7FFFFFFFFFFFFFFF)
634  dictionary = { 'data': expected_message.data }
635  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int64', dictionary)
636  expected_message = serialize_deserialize(expected_message)
637  self.assertEqual(message, expected_message)
638 
640  from std_msgs.msg import UInt64
641  expected_message = UInt64(data = 0xFFFFFFFFFFFFFFFF)
642  dictionary = { 'data': expected_message.data }
643  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt64', dictionary)
644  expected_message = serialize_deserialize(expected_message)
645  self.assertEqual(message, expected_message)
646 
648  from std_msgs.msg import String
649  expected_message = String(data = 'Hello')
650  dictionary = { 'data': expected_message.data }
651  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
652  expected_message = serialize_deserialize(expected_message)
653  self.assertEqual(message, expected_message)
654 
656  from std_msgs.msg import String
657  expected_message = String(data = u'Hello \u00dcnicode')
658  dictionary = { 'data': expected_message.data }
659  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
660  expected_message = serialize_deserialize(expected_message)
661  self.assertEqual(message.data,expected_message.data)
662  self.assertEqual(type(message.data),type(expected_message.data))
663 
665  from std_msgs.msg import Time
666  from time import time
667  now_time = rospy.Time(time())
668  expected_message = Time(data=now_time)
669  dictionary = {
670  'data': {
671  'secs' : now_time.secs,
672  'nsecs' : now_time.nsecs
673  }
674  }
675  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Time', dictionary)
676  expected_message = serialize_deserialize(expected_message)
677  self.assertEqual(message, expected_message)
678 
680  dictionary = {
681  'data': 'now'
682  }
683  with self.assertRaises(ROSInitException) as context:
684  message_converter.convert_dictionary_to_ros_message('std_msgs/Time', dictionary)
685  self.assertEqual('time is not initialized. Have you called init_node()?', context.exception.args[0])
686 
688  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
689  expected_message = Float64MultiArray(
690  layout = MultiArrayLayout(
691  dim = [
692  MultiArrayDimension(label = 'Dimension1', size = 12, stride = 7),
693  MultiArrayDimension(label = 'Dimension2', size = 90, stride = 8)
694  ],
695  data_offset = 1
696  ),
697  data = [1.1, 2.2, 3.3]
698  )
699  dictionary = {
700  'layout': {
701  'dim': [
702  {
703  'label' : expected_message.layout.dim[0].label,
704  'size' : expected_message.layout.dim[0].size,
705  'stride' : expected_message.layout.dim[0].stride
706  },
707  {
708  'label' : expected_message.layout.dim[1].label,
709  'size' : expected_message.layout.dim[1].size,
710  'stride' : expected_message.layout.dim[1].stride
711  }
712  ],
713  'data_offset': expected_message.layout.data_offset
714  },
715  'data': expected_message.data
716  }
717  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64MultiArray', dictionary)
718  expected_message = serialize_deserialize(expected_message)
719  self.assertEqual(message, expected_message)
720 
722  self.assertRaises(ValueError,
723  message_converter.convert_dictionary_to_ros_message,
724  'std_msgs/Empty',
725  {'invalid_field': 1})
726 
728  from std_srvs.srv import EmptyRequest, EmptyResponse
729  expected_req = EmptyRequest()
730  expected_res = EmptyResponse()
731  dictionary_req = {}
732  dictionary_res = {}
733  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Empty', dictionary_req,
734  'request')
735  expected_req = serialize_deserialize(expected_req)
736  self.assertEqual(message, expected_req)
737  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Empty', dictionary_res,
738  'response')
739  expected_res = serialize_deserialize(expected_res)
740  self.assertEqual(message, expected_res)
741 
743  from rospy_message_converter.srv import NestedUint8ArrayTestServiceRequest, NestedUint8ArrayTestServiceResponse
744  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
745  from base64 import b64encode
746  expected_data = bytes(bytearray([97, 98, 99]))
747  expected_req = NestedUint8ArrayTestServiceRequest(
748  input=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)]))
749  expected_req = serialize_deserialize(expected_req)
750  expected_res = NestedUint8ArrayTestServiceResponse(
751  output=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)]))
752  expected_res = serialize_deserialize(expected_res)
753 
754  dictionary_req = {"input": {"arrays": [{"data": b64encode(expected_data)}]}}
755  dictionary_res = {"output": {"arrays": [{"data": b64encode(expected_data)}]}}
756  message = message_converter.convert_dictionary_to_ros_message(
757  'rospy_message_converter/NestedUint8ArrayTestService', dictionary_req, 'request')
758  self.assertEqual(message, expected_req)
759  message = message_converter.convert_dictionary_to_ros_message(
760  'rospy_message_converter/NestedUint8ArrayTestService', dictionary_res, 'response')
761  self.assertEqual(message, expected_res)
762 
764  from std_srvs.srv import SetBoolRequest, SetBoolResponse
765  expected_req = SetBoolRequest(data=True)
766  expected_res = SetBoolResponse(success=True, message='Success!')
767  dictionary_req = { 'data': True }
768  dictionary_res = { 'success': True, 'message': 'Success!' }
769  message = message_converter.convert_dictionary_to_ros_message('std_srvs/SetBool', dictionary_req,
770  'request')
771  expected_req = serialize_deserialize(expected_req)
772  self.assertEqual(message, expected_req)
773  message = message_converter.convert_dictionary_to_ros_message('std_srvs/SetBool', dictionary_res,
774  'response')
775  expected_res = serialize_deserialize(expected_res)
776  self.assertEqual(message, expected_res)
777 
779  from std_srvs.srv import TriggerRequest, TriggerResponse
780  expected_req = TriggerRequest()
781  expected_res = TriggerResponse(success=True, message='Success!')
782  dictionary_req = {}
783  dictionary_res = { 'success': True, 'message': 'Success!' }
784  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Trigger', dictionary_req,
785  'request')
786  expected_req = serialize_deserialize(expected_req)
787  self.assertEqual(message, expected_req)
788  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Trigger', dictionary_res,
789  'response')
790  expected_res = serialize_deserialize(expected_res)
791  self.assertEqual(message, expected_res)
792 
794  with self.assertRaises(ValueError) as context:
795  message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', {}, kind='invalid')
796  self.assertEqual('Unknown kind "invalid".', context.exception.args[0])
797 
799  from std_msgs.msg import Byte, Char, Float32, Float64, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64
800  numpy_numeric_types = [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64, np.float32, np.float64]
801  min_values = [np.iinfo(t).min for t in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]] \
802  + [np.finfo(t).min for t in [np.float32, np.float64]]
803  max_values = [np.iinfo(t).max for t in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]] \
804  + [np.finfo(t).max for t in [np.float32, np.float64]]
805  numeric_limits = {num_type: (min_val, max_val) for (num_type, min_val, max_val) in zip(numpy_numeric_types, min_values, max_values)}
806  ros_to_numpy_type_map = {
807  Float32 : [np.float32, np.int8, np.int16, np.uint8, np.uint16],
808  Float64 : [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
809  Int8 : [np.int8],
810  Int16 : [np.int8, np.int16, np.uint8],
811  Int32 : [np.int8, np.int16, np.int32, np.uint8, np.uint16],
812  Int64 : [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
813  UInt8 : [np.uint8],
814  UInt16 : [np.uint8, np.uint16],
815  UInt32 : [np.uint8, np.uint16, np.uint32],
816  UInt64 : [np.uint8, np.uint16, np.uint32, np.uint64],
817  Byte : [np.int8],
818  Char : [np.uint8]
819  }
820  for ros_type, valid_numpy_types in ros_to_numpy_type_map.items():
821  for numpy_type in valid_numpy_types:
822  for value in numeric_limits[numpy_type]:
823  expected_message = ros_type(data=numpy_type(value))
824  dictionary = {
825  'data': numpy_type(value)
826  }
827  message = message_converter.convert_dictionary_to_ros_message(expected_message._type, dictionary)
828  expected_message = serialize_deserialize(expected_message)
829  self.assertEqual(message, expected_message)
830 
831  for wrong_numpy_type in [t for t in numpy_numeric_types if t not in valid_numpy_types]:
832  for value in numeric_limits[wrong_numpy_type]:
833  with self.assertRaises(TypeError) as context:
834  expected_message = ros_type(data=wrong_numpy_type(value))
835  dictionary = {
836  'data': wrong_numpy_type(value)
837  }
838  message_converter.convert_dictionary_to_ros_message(expected_message._type, dictionary)
839  self.assertTrue("Field 'data' has wrong type" in context.exception.args[0])
840 
841 
843  """
844  Serialize and then deserialize a message. This simulates sending a message
845  between ROS nodes and makes sure that the ROS messages being tested are
846  actually serializable, and are in the same format as they would be received
847  over the network. In rospy, it is possible to assign an illegal data type
848  to a message field (for example, `message = String(data=42)`), but trying
849  to publish this message will throw `SerializationError: field data must be
850  of type str`. This method will expose such bugs.
851  """
852  from io import BytesIO
853  buff = BytesIO()
854  message.serialize(buff)
855  result = message.__class__() # create new instance of same class as message
856  result.deserialize(buff.getvalue())
857  return result
858 
859 
860 PKG = 'rospy_message_converter'
861 NAME = 'test_message_converter'
862 if __name__ == '__main__':
863  import rosunit
864  rosunit.unitrun(PKG, NAME, TestMessageConverter)


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Tue Mar 2 2021 03:04:27