test_message_converter.py
Go to the documentation of this file.
1 # -*- coding: utf-8 -*-
2 #
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2019-2022, Martin Günther (DFKI GmbH) and others
6 # Copyright (c) 2013-2016, Brandon Alexander
7 #
8 # All rights reserved.
9 #
10 # Redistribution and use in source and binary forms, with or without
11 # modification, are permitted provided that the following conditions
12 # are met:
13 #
14 # * Redistributions of source code must retain the above copyright
15 # notice, this list of conditions and the following disclaimer.
16 # * Redistributions in binary form must reproduce the above
17 # copyright notice, this list of conditions and the following
18 # disclaimer in the documentation and/or other materials provided
19 # with the distribution.
20 # * Neither the name of this project nor the names of its
21 # contributors may be used to endorse or promote products derived
22 # from this software without specific prior written permission.
23 #
24 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
28 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
30 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
32 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
33 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
34 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35 # POSSIBILITY OF SUCH DAMAGE.
36 
37 import numpy as np
38 import struct
39 import sys
40 import unittest
41 
42 import rospy
43 from rospy.exceptions import ROSInitException
44 from rospy_message_converter import message_converter
45 
46 python3 = sys.hexversion > 0x03000000
47 
48 
49 class TestMessageConverter(unittest.TestCase):
51  from rospy_message_converter.msg import TestArray
52 
53  expected_dictionary = {'data': [1.1, 2.2, 3.3]}
54  message = TestArray(data=expected_dictionary['data'])
55  message = serialize_deserialize(message)
56  dictionary = message_converter.convert_ros_message_to_dictionary(message)
57  self.assertEqual(dictionary, expected_dictionary)
58 
60  from std_msgs.msg import Bool
61 
62  expected_dictionary = {'data': True}
63  message = Bool(data=expected_dictionary['data'])
64  message = serialize_deserialize(message)
65  dictionary = message_converter.convert_ros_message_to_dictionary(message)
66  self.assertEqual(dictionary, expected_dictionary)
67 
69  from std_msgs.msg import Byte
70 
71  expected_dictionary = {'data': 5}
72  message = Byte(data=expected_dictionary['data'])
73  message = serialize_deserialize(message)
74  dictionary = message_converter.convert_ros_message_to_dictionary(message)
75  self.assertEqual(dictionary, expected_dictionary)
76 
78  from std_msgs.msg import Char
79 
80  expected_dictionary = {'data': 99}
81  message = Char(data=expected_dictionary['data'])
82  message = serialize_deserialize(message)
83  dictionary = message_converter.convert_ros_message_to_dictionary(message)
84  self.assertEqual(dictionary, expected_dictionary)
85 
87  from std_msgs.msg import Duration
88 
89  duration = rospy.rostime.Duration(33, 25)
90  expected_dictionary = {'data': {'secs': duration.secs, 'nsecs': duration.nsecs}}
91  message = Duration(data=duration)
92  message = serialize_deserialize(message)
93  dictionary = message_converter.convert_ros_message_to_dictionary(message)
94  self.assertEqual(dictionary, expected_dictionary)
95 
97  from std_msgs.msg import Empty
98 
99  expected_dictionary = {}
100  message = Empty()
101  message = serialize_deserialize(message)
102  dictionary = message_converter.convert_ros_message_to_dictionary(message)
103  self.assertEqual(dictionary, expected_dictionary)
104 
106  from std_msgs.msg import Float32
107 
108  expected_dictionary = {'data': struct.unpack('<f', b'\x7F\x7F\xFF\xFD')[0]}
109  message = Float32(data=expected_dictionary['data'])
110  message = serialize_deserialize(message)
111  dictionary = message_converter.convert_ros_message_to_dictionary(message)
112  self.assertEqual(dictionary, expected_dictionary)
113 
115  from std_msgs.msg import Float64
116 
117  expected_dictionary = {'data': struct.unpack('<d', b'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0]}
118  message = Float64(data=expected_dictionary['data'])
119  message = serialize_deserialize(message)
120  dictionary = message_converter.convert_ros_message_to_dictionary(message)
121  self.assertEqual(dictionary, expected_dictionary)
122 
124  from std_msgs.msg import Header
125  from time import time
126 
127  now_time = rospy.Time(time())
128  expected_dictionary = {
129  'stamp': {'secs': now_time.secs, 'nsecs': now_time.nsecs},
130  'frame_id': 'my_frame',
131  'seq': 3,
132  }
133  message = Header(stamp=now_time, frame_id=expected_dictionary['frame_id'], seq=expected_dictionary['seq'])
134  message = serialize_deserialize(message)
135  dictionary = message_converter.convert_ros_message_to_dictionary(message)
136  self.assertEqual(dictionary, expected_dictionary)
137 
139  from std_msgs.msg import Int8
140 
141  expected_dictionary = {'data': -0x7F}
142  message = Int8(data=expected_dictionary['data'])
143  message = serialize_deserialize(message)
144  dictionary = message_converter.convert_ros_message_to_dictionary(message)
145  self.assertEqual(dictionary, expected_dictionary)
146 
148  from std_msgs.msg import UInt8
149 
150  expected_dictionary = {'data': 0xFF}
151  message = UInt8(data=expected_dictionary['data'])
152  message = serialize_deserialize(message)
153  dictionary = message_converter.convert_ros_message_to_dictionary(message)
154  self.assertEqual(dictionary, expected_dictionary)
155 
157  from rospy_message_converter.msg import Uint8ArrayTestMessage
158  from base64 import b64encode
159 
160  expected_data = [97, 98, 99, 100]
161  message = Uint8ArrayTestMessage(data=expected_data)
162  message = serialize_deserialize(message)
163  dictionary = message_converter.convert_ros_message_to_dictionary(message)
164  expected_data = b64encode(bytearray(expected_data)).decode('utf-8')
165  self.assertEqual(dictionary["data"], expected_data)
166 
168  from rospy_message_converter.msg import Uint8Array3TestMessage
169  from base64 import b64encode
170 
171  expected_data = [97, 98, 99]
172  message = Uint8Array3TestMessage(data=expected_data)
173  message = serialize_deserialize(message)
174  dictionary = message_converter.convert_ros_message_to_dictionary(message)
175  expected_data = b64encode(bytearray(expected_data)).decode('utf-8')
176  self.assertEqual(dictionary["data"], expected_data)
177 
179  from rospy_message_converter.msg import Uint8Array3TestMessage
180 
181  expected_data = [97, 98, 99]
182  message = Uint8Array3TestMessage(data=expected_data)
183  message = serialize_deserialize(message)
184  dictionary = message_converter.convert_ros_message_to_dictionary(message, binary_array_as_bytes=False)
185  self.assertEqual(dictionary["data"], expected_data)
186 
188  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
189 
190  expected_data = [97, 98, 99]
191  message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
192  message = serialize_deserialize(message)
193  dictionary = message_converter.convert_ros_message_to_dictionary(message, binary_array_as_bytes=False)
194  self.assertEqual(dictionary["arrays"][0]["data"], expected_data)
195 
197  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
198  from base64 import b64encode
199 
200  expected_data = bytes(bytearray([97, 98, 99]))
201  message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
202  message = serialize_deserialize(message)
203  dictionary = message_converter.convert_ros_message_to_dictionary(message)
204  self.assertEqual(dictionary["arrays"][0]["data"], b64encode(expected_data).decode('utf-8'))
205 
207  from std_msgs.msg import Int16
208 
209  expected_dictionary = {'data': -0x7FFF}
210  message = Int16(data=expected_dictionary['data'])
211  message = serialize_deserialize(message)
212  dictionary = message_converter.convert_ros_message_to_dictionary(message)
213  self.assertEqual(dictionary, expected_dictionary)
214 
216  from std_msgs.msg import UInt16
217 
218  expected_dictionary = {'data': 0xFFFF}
219  message = UInt16(data=expected_dictionary['data'])
220  message = serialize_deserialize(message)
221  dictionary = message_converter.convert_ros_message_to_dictionary(message)
222  self.assertEqual(dictionary, expected_dictionary)
223 
225  from std_msgs.msg import Int32
226 
227  expected_dictionary = {'data': -0x7FFFFFFF}
228  message = Int32(data=expected_dictionary['data'])
229  message = serialize_deserialize(message)
230  dictionary = message_converter.convert_ros_message_to_dictionary(message)
231  self.assertEqual(dictionary, expected_dictionary)
232 
234  from std_msgs.msg import UInt32
235 
236  expected_dictionary = {'data': 0xFFFFFFFF}
237  message = UInt32(data=expected_dictionary['data'])
238  message = serialize_deserialize(message)
239  dictionary = message_converter.convert_ros_message_to_dictionary(message)
240  self.assertEqual(dictionary, expected_dictionary)
241 
243  from std_msgs.msg import Int64
244 
245  expected_dictionary = {'data': -0x7FFFFFFFFFFFFFFF}
246  message = Int64(data=expected_dictionary['data'])
247  message = serialize_deserialize(message)
248  dictionary = message_converter.convert_ros_message_to_dictionary(message)
249  self.assertEqual(dictionary, expected_dictionary)
250 
252  from std_msgs.msg import UInt64
253 
254  expected_dictionary = {'data': 0xFFFFFFFFFFFFFFFF}
255  message = UInt64(data=expected_dictionary['data'])
256  message = serialize_deserialize(message)
257  dictionary = message_converter.convert_ros_message_to_dictionary(message)
258  self.assertEqual(dictionary, expected_dictionary)
259 
261  from std_msgs.msg import String
262 
263  expected_dictionary = {'data': 'Hello'}
264  message = String(data=expected_dictionary['data'])
265  message = serialize_deserialize(message)
266  dictionary = message_converter.convert_ros_message_to_dictionary(message)
267  self.assertEqual(dictionary, expected_dictionary)
268 
270  """
271  Test that strings are encoded as utf8
272  """
273  from std_msgs.msg import String
274 
275  expected_dictionary = {'data': u'Hello \u00dcnicode'}
276  message = String(data=expected_dictionary['data'])
277  message = serialize_deserialize(message)
278  dictionary = message_converter.convert_ros_message_to_dictionary(message)
279  self.assertEqual(dictionary, expected_dictionary)
280 
282  from std_msgs.msg import Time
283  from time import time
284 
285  now_time = rospy.Time(time())
286  expected_dictionary = {'data': {'secs': now_time.secs, 'nsecs': now_time.nsecs}}
287  message = Time(data=now_time)
288  message = serialize_deserialize(message)
289  dictionary = message_converter.convert_ros_message_to_dictionary(message)
290  self.assertEqual(dictionary, expected_dictionary)
291 
293  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
294 
295  expected_dictionary = {
296  'layout': {
297  'dim': [
298  {'label': 'Dimension1', 'size': 12, 'stride': 7},
299  {'label': 'Dimension2', 'size': 24, 'stride': 14},
300  ],
301  'data_offset': 0,
302  },
303  'data': [1.1, 2.2, 3.3],
304  }
305  dimension1 = MultiArrayDimension(
306  label=expected_dictionary['layout']['dim'][0]['label'],
307  size=expected_dictionary['layout']['dim'][0]['size'],
308  stride=expected_dictionary['layout']['dim'][0]['stride'],
309  )
310  dimension2 = MultiArrayDimension(
311  label=expected_dictionary['layout']['dim'][1]['label'],
312  size=expected_dictionary['layout']['dim'][1]['size'],
313  stride=expected_dictionary['layout']['dim'][1]['stride'],
314  )
315  layout = MultiArrayLayout(
316  dim=[dimension1, dimension2], data_offset=expected_dictionary['layout']['data_offset']
317  )
318  message = Float64MultiArray(layout=layout, data=expected_dictionary['data'])
319  message = serialize_deserialize(message)
320  dictionary = message_converter.convert_ros_message_to_dictionary(message)
321  self.assertEqual(dictionary, expected_dictionary)
322 
324  from std_srvs.srv import EmptyRequest, EmptyResponse
325 
326  expected_dictionary_req = {}
327  expected_dictionary_res = {}
328  request = EmptyRequest()
329  request = serialize_deserialize(request)
330  response = EmptyResponse()
331  response = serialize_deserialize(response)
332  dictionary_req = message_converter.convert_ros_message_to_dictionary(request)
333  self.assertEqual(dictionary_req, expected_dictionary_req)
334  dictionary_res = message_converter.convert_ros_message_to_dictionary(response)
335  self.assertEqual(dictionary_res, expected_dictionary_res)
336 
338  from rospy_message_converter.srv import NestedUint8ArrayTestServiceRequest, NestedUint8ArrayTestServiceResponse
339  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
340  from base64 import b64encode
341 
342  expected_data = bytes(bytearray([97, 98, 99]))
343 
344  expected_dictionary_req = {"input": {"arrays": [{"data": b64encode(expected_data).decode('utf-8')}]}}
345  expected_dictionary_res = {"output": {"arrays": [{"data": b64encode(expected_data).decode('utf-8')}]}}
346  request = NestedUint8ArrayTestServiceRequest(
347  input=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
348  )
349  request = serialize_deserialize(request)
350  response = NestedUint8ArrayTestServiceResponse(
351  output=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
352  )
353  response = serialize_deserialize(response)
354 
355  dictionary_req = message_converter.convert_ros_message_to_dictionary(request)
356  self.assertEqual(dictionary_req, expected_dictionary_req)
357  dictionary_res = message_converter.convert_ros_message_to_dictionary(response)
358  self.assertEqual(dictionary_res, expected_dictionary_res)
359 
361  from rospy_message_converter.msg import TestArray
362 
363  expected_message = TestArray(data=[1.1, 2.2, 3.3, 4.4])
364  dictionary = {'data': expected_message.data}
365  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/TestArray', dictionary)
366  expected_message = serialize_deserialize(expected_message)
367  self.assertEqual(message, expected_message)
368 
370  """
371  rospy treats uint8[] data as `bytes`, which is the Python representation for byte data. In Python 2, this is
372  the same as `str`. The `bytes` value must be base64-encoded.
373  """
374  from rospy_message_converter.msg import Uint8ArrayTestMessage
375  from base64 import b64encode
376 
377  expected_message = Uint8ArrayTestMessage(data=bytes(bytearray([97, 98, 99])))
378  dictionary = {'data': b64encode(expected_message.data)} # base64 encoding
379  message = message_converter.convert_dictionary_to_ros_message(
380  'rospy_message_converter/Uint8ArrayTestMessage', dictionary
381  )
382  expected_message = serialize_deserialize(expected_message)
383  self.assertEqual(message, expected_message)
384 
386  """
387  Even though rospy treats uint8[] data as `bytes`, rospy_message_converter also handles lists of int. In that
388  case, the input data must *not* be base64-encoded.
389  """
390  from rospy_message_converter.msg import Uint8ArrayTestMessage
391 
392  expected_message = Uint8ArrayTestMessage(data=[1, 2, 3, 4])
393  dictionary = {'data': expected_message.data} # no base64 encoding
394  message = message_converter.convert_dictionary_to_ros_message(
395  'rospy_message_converter/Uint8ArrayTestMessage', dictionary
396  )
397  expected_message = serialize_deserialize(expected_message)
398  self.assertEqual(message, expected_message)
399 
401  dictionary = {'data': [1, 2, 3, 4000]}
402  with self.assertRaises(ValueError) as context:
403  message_converter.convert_dictionary_to_ros_message(
404  'rospy_message_converter/Uint8ArrayTestMessage', dictionary
405  )
406  self.assertEqual('byte must be in range(0, 256)', context.exception.args[0])
407 
409  """
410  If the value of a uint8[] field has type `bytes`, rospy_message_converter expects that data to be
411  base64-encoded and runs b64decode on it. This test documents what happens if the value is
412  not base64-encoded.
413  """
414  from rospy_message_converter.msg import Uint8ArrayTestMessage
415  import binascii
416 
417  # this raises a TypeError, because:
418  # * b64decode removes all characters that are not in the standard alphabet ([A-Za-Z0-9+/])
419  # * this only leaves 97 (= 'a')
420  # * the length of a base64 string must be a multiple of 4 characters (if necessary, padded at the end with '=')
421  # * since the length of 'a' is not a multiple of 4, a TypeError is thrown
422  dictionary = {'data': bytes(bytearray([1, 2, 97, 4]))}
423  with self.assertRaises((TypeError, binascii.Error)) as context:
424  message_converter.convert_dictionary_to_ros_message(
425  'rospy_message_converter/Uint8ArrayTestMessage', dictionary
426  )
427  if type(context.exception) == TypeError: # python2
428  error_msg = context.exception.args[0].args[0]
429  else: # python3
430  error_msg = context.exception.args[0]
431  self.assertIn(error_msg, ['Incorrect padding', 'Non-base64 digit found'])
432 
433  dictionary = {'data': bytes(bytearray([1, 97, 97, 2, 3, 97, 4, 97]))}
434  if python3:
435  # On python3, we validate the input, so an error is raised.
436  with self.assertRaises(binascii.Error) as context:
437  message_converter.convert_dictionary_to_ros_message(
438  'rospy_message_converter/Uint8ArrayTestMessage', dictionary
439  )
440  self.assertEqual('Non-base64 digit found', context.exception.args[0])
441  else:
442  # if the dictionary contains a multiple of 4 characters from the standard alphabet, no error is raised
443  # (but the result is garbage).
444  message = message_converter.convert_dictionary_to_ros_message(
445  'rospy_message_converter/Uint8ArrayTestMessage', dictionary
446  )
447  expected_message = serialize_deserialize(Uint8ArrayTestMessage(data=bytes(bytearray([105, 166, 154]))))
448  self.assertEqual(message, expected_message)
449 
451  from rospy_message_converter.msg import Uint8Array3TestMessage
452  from base64 import b64encode
453 
454  expected_message = Uint8Array3TestMessage(data=bytes(bytearray([97, 98, 99])))
455  dictionary = {'data': b64encode(expected_message.data)}
456  message = message_converter.convert_dictionary_to_ros_message(
457  'rospy_message_converter/Uint8Array3TestMessage', dictionary
458  )
459  expected_message = serialize_deserialize(expected_message)
460  self.assertEqual(message, expected_message)
461 
463  from rospy_message_converter.msg import Uint8Array3TestMessage
464 
465  expected_message = Uint8Array3TestMessage(data=[97, 98, 99])
466  dictionary = {'data': expected_message.data}
467  message = message_converter.convert_dictionary_to_ros_message(
468  'rospy_message_converter/Uint8Array3TestMessage', dictionary
469  )
470  expected_message = serialize_deserialize(expected_message)
471  self.assertEqual(message, expected_message)
472 
474  from std_msgs.msg import Bool
475 
476  expected_message = Bool(data=True)
477  dictionary = {'data': expected_message.data}
478  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
479  expected_message = serialize_deserialize(expected_message)
480  self.assertEqual(message, expected_message)
481 
483  from std_msgs.msg import Byte
484 
485  expected_message = Byte(data=3)
486  dictionary = {'data': expected_message.data}
487  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Byte', dictionary)
488  expected_message = serialize_deserialize(expected_message)
489  self.assertEqual(message, expected_message)
490 
492  from std_msgs.msg import Char
493 
494  expected_message = Char(data=99)
495  dictionary = {'data': expected_message.data}
496  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Char', dictionary)
497  expected_message = serialize_deserialize(expected_message)
498  self.assertEqual(message, expected_message)
499 
501  from std_msgs.msg import Duration
502 
503  duration = rospy.rostime.Duration(33, 25)
504  expected_message = Duration(data=duration)
505  dictionary = {'data': {'secs': duration.secs, 'nsecs': duration.nsecs}}
506  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Duration', dictionary)
507  expected_message = serialize_deserialize(expected_message)
508  self.assertEqual(message, expected_message)
509 
511  from std_msgs.msg import Empty
512 
513  expected_message = Empty()
514  dictionary = {}
515  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary)
516  expected_message = serialize_deserialize(expected_message)
517  self.assertEqual(message, expected_message)
518 
520  dictionary = {"additional_args": "should raise value error"}
521  with self.assertRaises(ValueError) as context:
522  message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary)
523  self.assertEqual(
524  '''ROS message type "std_msgs/Empty" has no field named "additional_args"''', context.exception.args[0]
525  )
526 
528  from std_msgs.msg import Empty
529 
530  expected_message = Empty()
531  dictionary = {"additional_args": "should be ignored"}
532  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary, strict_mode=False)
533  expected_message = serialize_deserialize(expected_message)
534  self.assertEqual(message, expected_message)
535 
537  from base64 import b64encode
538 
539  expected_data = bytes(bytearray([97, 98, 99]))
540  dictionary = {"arrays": [{"data": b64encode(expected_data), "additional_args": "should raise value error"}]}
541  with self.assertRaises(ValueError) as context:
542  message_converter.convert_dictionary_to_ros_message(
543  'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary
544  )
545  self.assertEqual(
546  'ROS message type "rospy_message_converter/Uint8ArrayTestMessage" has no field named "additional_args"',
547  context.exception.args[0],
548  )
549 
551  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
552  from base64 import b64encode
553 
554  expected_data = bytes(bytearray([97, 98, 99]))
555  expected_message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
556  dictionary = {"arrays": [{"data": b64encode(expected_data), "additional_args": "should be ignored"}]}
557  message = message_converter.convert_dictionary_to_ros_message(
558  'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary, strict_mode=False
559  )
560  expected_message = serialize_deserialize(expected_message)
561  self.assertEqual(message, expected_message)
562 
564  from std_msgs.msg import Bool
565 
566  expected_message = Bool(data=False)
567  dictionary = {}
568  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
569  expected_message = serialize_deserialize(expected_message)
570  self.assertEqual(message, expected_message)
571 
573  dictionary = {}
574  with self.assertRaises(ValueError) as context:
575  message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary, check_missing_fields=True)
576  self.assertEqual('''Missing fields "{'data': 'bool'}"''', context.exception.args[0])
577 
579  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
580 
581  expected_message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=[])])
582  dictionary = {"arrays": [{}]}
583  message = message_converter.convert_dictionary_to_ros_message(
584  'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary
585  )
586  expected_message = serialize_deserialize(expected_message)
587  self.assertEqual(message, expected_message)
588 
590  dictionary = {"arrays": [{}]}
591  with self.assertRaises(ValueError) as context:
592  message_converter.convert_dictionary_to_ros_message(
593  'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary, check_missing_fields=True
594  )
595  self.assertEqual('''Missing fields "{'data': 'uint8[]'}"''', context.exception.args[0])
596 
598  dictionary = {"data": "should_be_a_bool"}
599  with self.assertRaises(TypeError) as context:
600  message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
601  self.assertTrue("Field 'data' has wrong type" in context.exception.args[0])
602 
604  from std_msgs.msg import Float32
605 
606  expected_message = Float32(data=struct.unpack('<f', b'\x7F\x7F\xFF\xFD')[0])
607  dictionary = {'data': expected_message.data}
608  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float32', dictionary)
609  expected_message = serialize_deserialize(expected_message)
610  self.assertEqual(message, expected_message)
611 
613  from std_msgs.msg import Float64
614 
615  expected_message = Float64(data=struct.unpack('<d', b'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0])
616  dictionary = {'data': expected_message.data}
617  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64', dictionary)
618  expected_message = serialize_deserialize(expected_message)
619  self.assertEqual(message, expected_message)
620 
622  from std_msgs.msg import Header
623  from time import time
624 
625  now_time = rospy.Time(time())
626  expected_message = Header(stamp=now_time, frame_id='my_frame', seq=12)
627  dictionary = {
628  'stamp': {'secs': now_time.secs, 'nsecs': now_time.nsecs},
629  'frame_id': expected_message.frame_id,
630  'seq': expected_message.seq,
631  }
632  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Header', dictionary)
633  expected_message = serialize_deserialize(expected_message)
634  self.assertEqual(message, expected_message)
635 
637  from std_msgs.msg import Header
638  from time import time
639 
640  now_time = rospy.Time(time())
641  expected_message = Header(stamp=now_time, frame_id='my_frame', seq=12)
642  dictionary = {
643  'stamp': {'secs': now_time.secs, 'nsecs': now_time.nsecs},
644  'frame_id': expected_message.frame_id,
645  'seq': expected_message.seq,
646  }
647  message = message_converter.convert_dictionary_to_ros_message('Header', dictionary)
648  expected_message = serialize_deserialize(expected_message)
649  self.assertEqual(message, expected_message)
650 
652  from std_msgs.msg import Int8
653 
654  expected_message = Int8(data=-0x7F)
655  dictionary = {'data': expected_message.data}
656  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int8', dictionary)
657  expected_message = serialize_deserialize(expected_message)
658  self.assertEqual(message, expected_message)
659 
661  from std_msgs.msg import UInt8
662 
663  expected_message = UInt8(data=0xFF)
664  dictionary = {'data': expected_message.data}
665  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt8', dictionary)
666  expected_message = serialize_deserialize(expected_message)
667  self.assertEqual(message, expected_message)
668 
670  from std_msgs.msg import Int16
671 
672  expected_message = Int16(data=-0x7FFF)
673  dictionary = {'data': expected_message.data}
674  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int16', dictionary)
675  expected_message = serialize_deserialize(expected_message)
676  self.assertEqual(message, expected_message)
677 
679  from std_msgs.msg import UInt16
680 
681  expected_message = UInt16(data=0xFFFF)
682  dictionary = {'data': expected_message.data}
683  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt16', dictionary)
684  expected_message = serialize_deserialize(expected_message)
685  self.assertEqual(message, expected_message)
686 
688  from std_msgs.msg import Int32
689 
690  expected_message = Int32(data=-0x7FFFFFFF)
691  dictionary = {'data': expected_message.data}
692  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int32', dictionary)
693  expected_message = serialize_deserialize(expected_message)
694  self.assertEqual(message, expected_message)
695 
697  from std_msgs.msg import UInt32
698 
699  expected_message = UInt32(data=0xFFFFFFFF)
700  dictionary = {'data': expected_message.data}
701  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt32', dictionary)
702  expected_message = serialize_deserialize(expected_message)
703  self.assertEqual(message, expected_message)
704 
706  from std_msgs.msg import Int64
707 
708  expected_message = Int64(data=-0x7FFFFFFFFFFFFFFF)
709  dictionary = {'data': expected_message.data}
710  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int64', dictionary)
711  expected_message = serialize_deserialize(expected_message)
712  self.assertEqual(message, expected_message)
713 
715  from std_msgs.msg import UInt64
716 
717  expected_message = UInt64(data=0xFFFFFFFFFFFFFFFF)
718  dictionary = {'data': expected_message.data}
719  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt64', dictionary)
720  expected_message = serialize_deserialize(expected_message)
721  self.assertEqual(message, expected_message)
722 
724  from std_msgs.msg import String
725 
726  expected_message = String(data='Hello')
727  dictionary = {'data': expected_message.data}
728  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
729  expected_message = serialize_deserialize(expected_message)
730  self.assertEqual(message, expected_message)
731 
733  from std_msgs.msg import String
734 
735  expected_message = String(data=u'Hello \u00dcnicode')
736  dictionary = {'data': expected_message.data}
737  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
738  expected_message = serialize_deserialize(expected_message)
739  self.assertEqual(message.data, expected_message.data)
740  self.assertEqual(type(message.data), type(expected_message.data))
741 
743  from std_msgs.msg import Time
744  from time import time
745 
746  now_time = rospy.Time(time())
747  expected_message = Time(data=now_time)
748  dictionary = {'data': {'secs': now_time.secs, 'nsecs': now_time.nsecs}}
749  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Time', dictionary)
750  expected_message = serialize_deserialize(expected_message)
751  self.assertEqual(message, expected_message)
752 
754  dictionary = {'data': 'now'}
755  with self.assertRaises(ROSInitException) as context:
756  message_converter.convert_dictionary_to_ros_message('std_msgs/Time', dictionary)
757  self.assertEqual('time is not initialized. Have you called init_node()?', context.exception.args[0])
758 
760  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
761 
762  expected_message = Float64MultiArray(
763  layout=MultiArrayLayout(
764  dim=[
765  MultiArrayDimension(label='Dimension1', size=12, stride=7),
766  MultiArrayDimension(label='Dimension2', size=90, stride=8),
767  ],
768  data_offset=1,
769  ),
770  data=[1.1, 2.2, 3.3],
771  )
772  dictionary = {
773  'layout': {
774  'dim': [
775  {
776  'label': expected_message.layout.dim[0].label,
777  'size': expected_message.layout.dim[0].size,
778  'stride': expected_message.layout.dim[0].stride,
779  },
780  {
781  'label': expected_message.layout.dim[1].label,
782  'size': expected_message.layout.dim[1].size,
783  'stride': expected_message.layout.dim[1].stride,
784  },
785  ],
786  'data_offset': expected_message.layout.data_offset,
787  },
788  'data': expected_message.data,
789  }
790  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64MultiArray', dictionary)
791  expected_message = serialize_deserialize(expected_message)
792  self.assertEqual(message, expected_message)
793 
795  self.assertRaises(
796  ValueError, message_converter.convert_dictionary_to_ros_message, 'std_msgs/Empty', {'invalid_field': 1}
797  )
798 
800  from std_srvs.srv import EmptyRequest, EmptyResponse
801 
802  expected_req = EmptyRequest()
803  expected_res = EmptyResponse()
804  dictionary_req = {}
805  dictionary_res = {}
806  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Empty', dictionary_req, 'request')
807  expected_req = serialize_deserialize(expected_req)
808  self.assertEqual(message, expected_req)
809  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Empty', dictionary_res, 'response')
810  expected_res = serialize_deserialize(expected_res)
811  self.assertEqual(message, expected_res)
812 
814  from rospy_message_converter.srv import NestedUint8ArrayTestServiceRequest, NestedUint8ArrayTestServiceResponse
815  from rospy_message_converter.msg import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
816  from base64 import b64encode
817 
818  expected_data = bytes(bytearray([97, 98, 99]))
819  expected_req = NestedUint8ArrayTestServiceRequest(
820  input=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
821  )
822  expected_req = serialize_deserialize(expected_req)
823  expected_res = NestedUint8ArrayTestServiceResponse(
824  output=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
825  )
826  expected_res = serialize_deserialize(expected_res)
827 
828  dictionary_req = {"input": {"arrays": [{"data": b64encode(expected_data)}]}}
829  dictionary_res = {"output": {"arrays": [{"data": b64encode(expected_data)}]}}
830  message = message_converter.convert_dictionary_to_ros_message(
831  'rospy_message_converter/NestedUint8ArrayTestService', dictionary_req, 'request'
832  )
833  self.assertEqual(message, expected_req)
834  message = message_converter.convert_dictionary_to_ros_message(
835  'rospy_message_converter/NestedUint8ArrayTestService', dictionary_res, 'response'
836  )
837  self.assertEqual(message, expected_res)
838 
840  from std_srvs.srv import SetBoolRequest, SetBoolResponse
841 
842  expected_req = SetBoolRequest(data=True)
843  expected_res = SetBoolResponse(success=True, message='Success!')
844  dictionary_req = {'data': True}
845  dictionary_res = {'success': True, 'message': 'Success!'}
846  message = message_converter.convert_dictionary_to_ros_message('std_srvs/SetBool', dictionary_req, 'request')
847  expected_req = serialize_deserialize(expected_req)
848  self.assertEqual(message, expected_req)
849  message = message_converter.convert_dictionary_to_ros_message('std_srvs/SetBool', dictionary_res, 'response')
850  expected_res = serialize_deserialize(expected_res)
851  self.assertEqual(message, expected_res)
852 
854  from std_srvs.srv import TriggerRequest, TriggerResponse
855 
856  expected_req = TriggerRequest()
857  expected_res = TriggerResponse(success=True, message='Success!')
858  dictionary_req = {}
859  dictionary_res = {'success': True, 'message': 'Success!'}
860  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Trigger', dictionary_req, 'request')
861  expected_req = serialize_deserialize(expected_req)
862  self.assertEqual(message, expected_req)
863  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Trigger', dictionary_res, 'response')
864  expected_res = serialize_deserialize(expected_res)
865  self.assertEqual(message, expected_res)
866 
868  with self.assertRaises(ValueError) as context:
869  message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', {}, kind='invalid')
870  self.assertEqual('Unknown kind "invalid".', context.exception.args[0])
871 
873  from std_msgs.msg import Byte, Char, Float32, Float64, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64
874 
875  numpy_numeric_types = [
876  np.int8,
877  np.int16,
878  np.int32,
879  np.int64,
880  np.uint8,
881  np.uint16,
882  np.uint32,
883  np.uint64,
884  np.float32,
885  np.float64,
886  ]
887  min_values = [
888  np.iinfo(t).min for t in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]
889  ] + [np.finfo(t).min for t in [np.float32, np.float64]]
890  max_values = [
891  np.iinfo(t).max for t in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]
892  ] + [np.finfo(t).max for t in [np.float32, np.float64]]
893  numeric_limits = {
894  num_type: (min_val, max_val)
895  for (num_type, min_val, max_val) in zip(numpy_numeric_types, min_values, max_values)
896  }
897  ros_to_numpy_type_map = {
898  Float32: [np.float32, np.int8, np.int16, np.uint8, np.uint16],
899  Float64: [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
900  Int8: [np.int8],
901  Int16: [np.int8, np.int16, np.uint8],
902  Int32: [np.int8, np.int16, np.int32, np.uint8, np.uint16],
903  Int64: [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
904  UInt8: [np.uint8],
905  UInt16: [np.uint8, np.uint16],
906  UInt32: [np.uint8, np.uint16, np.uint32],
907  UInt64: [np.uint8, np.uint16, np.uint32, np.uint64],
908  Byte: [np.int8],
909  Char: [np.uint8],
910  }
911  for ros_type, valid_numpy_types in ros_to_numpy_type_map.items():
912  for numpy_type in valid_numpy_types:
913  for value in numeric_limits[numpy_type]:
914  expected_message = ros_type(data=numpy_type(value))
915  dictionary = {'data': numpy_type(value)}
916  message = message_converter.convert_dictionary_to_ros_message(expected_message._type, dictionary)
917  expected_message = serialize_deserialize(expected_message)
918  self.assertEqual(message, expected_message)
919 
920  for wrong_numpy_type in [t for t in numpy_numeric_types if t not in valid_numpy_types]:
921  for value in numeric_limits[wrong_numpy_type]:
922  with self.assertRaises(TypeError) as context:
923  expected_message = ros_type(data=wrong_numpy_type(value))
924  dictionary = {'data': wrong_numpy_type(value)}
925  message_converter.convert_dictionary_to_ros_message(expected_message._type, dictionary)
926  self.assertTrue("Field 'data' has wrong type" in context.exception.args[0])
927 
928  # -------- Tests for None: --------
930  from rospy_message_converter.msg import TestArray
931 
932  expected_message = TestArray(data=[])
933  dictionary = {'data': None}
934  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/TestArray', dictionary)
935  expected_message = serialize_deserialize(expected_message)
936  self.assertEqual(message, expected_message)
937 
939  from rospy_message_converter.msg import Uint8ArrayTestMessage
940 
941  expected_message = Uint8ArrayTestMessage(data=bytes())
942  dictionary = {'data': None}
943  message = message_converter.convert_dictionary_to_ros_message(
944  'rospy_message_converter/Uint8ArrayTestMessage', dictionary
945  )
946  expected_message = serialize_deserialize(expected_message)
947  self.assertEqual(message, expected_message)
948 
950  from rospy_message_converter.msg import Uint8Array3TestMessage
951 
952  expected_message = Uint8Array3TestMessage(data=[0, 0, 0])
953  dictionary = {'data': None}
954  message = message_converter.convert_dictionary_to_ros_message(
955  'rospy_message_converter/Uint8Array3TestMessage', dictionary
956  )
957  expected_message = serialize_deserialize(expected_message)
958  self.assertEqual(message, expected_message)
959 
961  from std_msgs.msg import Bool
962 
963  expected_message = Bool(data=False)
964  dictionary = {'data': None}
965  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
966  expected_message = serialize_deserialize(expected_message)
967  self.assertEqual(message, expected_message)
968 
970  from std_msgs.msg import Byte
971 
972  expected_message = Byte(data=0)
973  dictionary = {'data': None}
974  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Byte', dictionary)
975  expected_message = serialize_deserialize(expected_message)
976  self.assertEqual(message, expected_message)
977 
979  from std_msgs.msg import Char
980 
981  expected_message = Char(data=0)
982  dictionary = {'data': None}
983  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Char', dictionary)
984  expected_message = serialize_deserialize(expected_message)
985  self.assertEqual(message, expected_message)
986 
988  from std_msgs.msg import Duration
989 
990  expected_message = Duration()
991  dictionary = {'data': None}
992  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Duration', dictionary)
993  expected_message = serialize_deserialize(expected_message)
994  self.assertEqual(message, expected_message)
995 
997  from std_msgs.msg import Duration
998 
999  expected_message = Duration(data=rospy.rostime.Duration())
1000  dictionary = {'data': {'secs': None, 'nsecs': None}}
1001  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Duration', dictionary)
1002  expected_message = serialize_deserialize(expected_message)
1003  self.assertEqual(message, expected_message)
1004 
1006  from std_msgs.msg import Float32
1007 
1008  expected_message = Float32(data=0.0)
1009  dictionary = {'data': None}
1010  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float32', dictionary)
1011  expected_message = serialize_deserialize(expected_message)
1012  self.assertEqual(message, expected_message)
1013 
1015  from std_msgs.msg import Float64
1016 
1017  expected_message = Float64(data=0.0)
1018  dictionary = {'data': None}
1019  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64', dictionary)
1020  expected_message = serialize_deserialize(expected_message)
1021  self.assertEqual(message, expected_message)
1022 
1024  from std_msgs.msg import Header
1025 
1026  expected_message = Header(stamp=rospy.Time(), frame_id='', seq=12)
1027  dictionary = {'stamp': {'secs': None, 'nsecs': 0.0}, 'frame_id': None, 'seq': expected_message.seq}
1028  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Header', dictionary)
1029  expected_message = serialize_deserialize(expected_message)
1030  self.assertEqual(message, expected_message)
1031 
1033  from std_msgs.msg import Int8
1034 
1035  expected_message = Int8(data=0)
1036  dictionary = {'data': None}
1037  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int8', dictionary)
1038  expected_message = serialize_deserialize(expected_message)
1039  self.assertEqual(message, expected_message)
1040 
1042  from std_msgs.msg import UInt8
1043 
1044  expected_message = UInt8(data=0)
1045  dictionary = {'data': None}
1046  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt8', dictionary)
1047  expected_message = serialize_deserialize(expected_message)
1048  self.assertEqual(message, expected_message)
1049 
1051  from std_msgs.msg import Int16
1052 
1053  expected_message = Int16(data=0)
1054  dictionary = {'data': None}
1055  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int16', dictionary)
1056  expected_message = serialize_deserialize(expected_message)
1057  self.assertEqual(message, expected_message)
1058 
1060  from std_msgs.msg import UInt16
1061 
1062  expected_message = UInt16(data=0)
1063  dictionary = {'data': None}
1064  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt16', dictionary)
1065  expected_message = serialize_deserialize(expected_message)
1066  self.assertEqual(message, expected_message)
1067 
1069  from std_msgs.msg import Int32
1070 
1071  expected_message = Int32(data=0)
1072  dictionary = {'data': None}
1073  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int32', dictionary)
1074  expected_message = serialize_deserialize(expected_message)
1075  self.assertEqual(message, expected_message)
1076 
1078  from std_msgs.msg import UInt32
1079 
1080  expected_message = UInt32(data=0)
1081  dictionary = {'data': None}
1082  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt32', dictionary)
1083  expected_message = serialize_deserialize(expected_message)
1084  self.assertEqual(message, expected_message)
1085 
1087  from std_msgs.msg import Int64
1088 
1089  expected_message = Int64(data=0)
1090  dictionary = {'data': None}
1091  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int64', dictionary)
1092  expected_message = serialize_deserialize(expected_message)
1093  self.assertEqual(message, expected_message)
1094 
1096  from std_msgs.msg import UInt64
1097 
1098  expected_message = UInt64(data=0)
1099  dictionary = {'data': None}
1100  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt64', dictionary)
1101  expected_message = serialize_deserialize(expected_message)
1102  self.assertEqual(message, expected_message)
1103 
1105  from std_msgs.msg import String
1106 
1107  expected_message = String(data='')
1108  dictionary = {'data': None}
1109  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
1110  expected_message = serialize_deserialize(expected_message)
1111  self.assertEqual(message, expected_message)
1112 
1114  from std_msgs.msg import Time
1115 
1116  expected_message = Time()
1117  dictionary = {'data': None}
1118  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Time', dictionary)
1119  expected_message = serialize_deserialize(expected_message)
1120  self.assertEqual(message, expected_message)
1121 
1123  from std_msgs.msg import Time
1124 
1125  test_time = rospy.Time.from_sec(0.123456)
1126  expected_message = Time(data=test_time)
1127  dictionary = {'data': {'secs': None, 'nsecs': test_time.nsecs}}
1128  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Time', dictionary)
1129  expected_message = serialize_deserialize(expected_message)
1130  self.assertEqual(message, expected_message)
1131 
1133  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
1134 
1135  expected_message = Float64MultiArray(
1136  layout=MultiArrayLayout(
1137  dim=[
1138  MultiArrayDimension(label='', size=0, stride=0),
1139  MultiArrayDimension(label='Dimension2', size=90, stride=8),
1140  ],
1141  data_offset=1,
1142  ),
1143  data=[1.1, 2.2, 3.3],
1144  )
1145  dictionary = {
1146  'layout': {
1147  'dim': [
1148  None,
1149  {
1150  'label': expected_message.layout.dim[1].label,
1151  'size': expected_message.layout.dim[1].size,
1152  'stride': expected_message.layout.dim[1].stride,
1153  },
1154  ],
1155  'data_offset': expected_message.layout.data_offset,
1156  },
1157  'data': expected_message.data,
1158  }
1159  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64MultiArray', dictionary)
1160  expected_message = serialize_deserialize(expected_message)
1161  self.assertEqual(message, expected_message)
1162 
1163 
1165  """
1166  Serialize and then deserialize a message. This simulates sending a message
1167  between ROS nodes and makes sure that the ROS messages being tested are
1168  actually serializable, and are in the same format as they would be received
1169  over the network. In rospy, it is possible to assign an illegal data type
1170  to a message field (for example, `message = String(data=42)`), but trying
1171  to publish this message will throw `SerializationError: field data must be
1172  of type str`. This method will expose such bugs.
1173  """
1174  from io import BytesIO
1175 
1176  buff = BytesIO()
1177  message.serialize(buff)
1178  result = message.__class__() # create new instance of same class as message
1179  result.deserialize(buff.getvalue())
1180  return result
1181 
1182 
1183 PKG = 'rospy_message_converter'
1184 NAME = 'test_message_converter'
1185 if __name__ == '__main__':
1186  import rosunit
1187 
1188  rosunit.unitrun(PKG, NAME, TestMessageConverter)


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Thu Dec 22 2022 03:33:21