test_message_converter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import struct
3 import unittest
4 import rospy
5 from rospy_message_converter import message_converter
6 
7 class TestMessageConverter(unittest.TestCase):
8 
10  from rospy_message_converter.msg import TestArray
11  expected_dictionary = {
12  'data': [1.1, 2.2, 3.3]
13  }
14  message = TestArray(data = expected_dictionary['data'])
15  message = serialize_deserialize(message)
16  dictionary = message_converter.convert_ros_message_to_dictionary(message)
17  self.assertEqual(dictionary, expected_dictionary)
18 
20  from std_msgs.msg import Bool
21  expected_dictionary = { 'data': True }
22  message = Bool(data=expected_dictionary['data'])
23  message = serialize_deserialize(message)
24  dictionary = message_converter.convert_ros_message_to_dictionary(message)
25  self.assertEqual(dictionary, expected_dictionary)
26 
28  from std_msgs.msg import Byte
29  expected_dictionary = { 'data': 5 }
30  message = Byte(data=expected_dictionary['data'])
31  message = serialize_deserialize(message)
32  dictionary = message_converter.convert_ros_message_to_dictionary(message)
33  self.assertEqual(dictionary, expected_dictionary)
34 
36  from std_msgs.msg import Char
37  expected_dictionary = { 'data': 99 }
38  message = Char(data=expected_dictionary['data'])
39  message = serialize_deserialize(message)
40  dictionary = message_converter.convert_ros_message_to_dictionary(message)
41  self.assertEqual(dictionary, expected_dictionary)
42 
44  from std_msgs.msg import Duration
45  duration = rospy.rostime.Duration(33, 25)
46  expected_dictionary = {
47  'data': {
48  'secs' : duration.secs,
49  'nsecs' : duration.nsecs
50  }
51  }
52  message = Duration(data=duration)
53  message = serialize_deserialize(message)
54  dictionary = message_converter.convert_ros_message_to_dictionary(message)
55  self.assertEqual(dictionary, expected_dictionary)
56 
58  from std_msgs.msg import Empty
59  expected_dictionary = {}
60  message = Empty()
61  message = serialize_deserialize(message)
62  dictionary = message_converter.convert_ros_message_to_dictionary(message)
63  self.assertEqual(dictionary, expected_dictionary)
64 
66  from std_msgs.msg import Float32
67  expected_dictionary = { 'data': struct.unpack('<f', '\x7F\x7F\xFF\xFD')[0] }
68  message = Float32(data=expected_dictionary['data'])
69  message = serialize_deserialize(message)
70  dictionary = message_converter.convert_ros_message_to_dictionary(message)
71  self.assertEqual(dictionary, expected_dictionary)
72 
74  from std_msgs.msg import Float64
75  expected_dictionary = { 'data': struct.unpack('<d', '\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0] }
76  message = Float64(data=expected_dictionary['data'])
77  message = serialize_deserialize(message)
78  dictionary = message_converter.convert_ros_message_to_dictionary(message)
79  self.assertEqual(dictionary, expected_dictionary)
80 
82  from std_msgs.msg import Header
83  from time import time
84  now_time = rospy.Time(time())
85  expected_dictionary = {
86  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
87  'frame_id' : 'my_frame',
88  'seq': 3
89  }
90  message = Header(
91  stamp = now_time,
92  frame_id = expected_dictionary['frame_id'],
93  seq = expected_dictionary['seq']
94  )
95  message = serialize_deserialize(message)
96  dictionary = message_converter.convert_ros_message_to_dictionary(message)
97  self.assertEqual(dictionary, expected_dictionary)
98 
100  from std_msgs.msg import Int8
101  expected_dictionary = { 'data': -0x7F }
102  message = Int8(data=expected_dictionary['data'])
103  message = serialize_deserialize(message)
104  dictionary = message_converter.convert_ros_message_to_dictionary(message)
105  self.assertEqual(dictionary, expected_dictionary)
106 
108  from std_msgs.msg import UInt8
109  expected_dictionary = { 'data': 0xFF }
110  message = UInt8(data=expected_dictionary['data'])
111  message = serialize_deserialize(message)
112  dictionary = message_converter.convert_ros_message_to_dictionary(message)
113  self.assertEqual(dictionary, expected_dictionary)
114 
116  from rospy_message_converter.msg import Uint8ArrayTestMessage
117  from base64 import standard_b64encode
118  expected_data = "".join([chr(i) for i in [97, 98, 99, 100]])
119  message = Uint8ArrayTestMessage(data=expected_data)
120  message = serialize_deserialize(message)
121  dictionary = message_converter.convert_ros_message_to_dictionary(message)
122  self.assertEqual(dictionary["data"], standard_b64encode(expected_data))
123 
125  from rospy_message_converter.msg import Uint8Array3TestMessage
126  from base64 import standard_b64encode
127  expected_data = "".join([chr(i) for i in [97, 98, 99]])
128  message = Uint8Array3TestMessage(data=expected_data)
129  message = serialize_deserialize(message)
130  dictionary = message_converter.convert_ros_message_to_dictionary(message)
131  self.assertEqual(dictionary["data"], standard_b64encode(expected_data))
132 
134  from std_msgs.msg import Int16
135  expected_dictionary = { 'data': -0x7FFF }
136  message = Int16(data=expected_dictionary['data'])
137  message = serialize_deserialize(message)
138  dictionary = message_converter.convert_ros_message_to_dictionary(message)
139  self.assertEqual(dictionary, expected_dictionary)
140 
142  from std_msgs.msg import UInt16
143  expected_dictionary = { 'data': 0xFFFF }
144  message = UInt16(data=expected_dictionary['data'])
145  message = serialize_deserialize(message)
146  dictionary = message_converter.convert_ros_message_to_dictionary(message)
147  self.assertEqual(dictionary, expected_dictionary)
148 
150  from std_msgs.msg import Int32
151  expected_dictionary = { 'data': -0x7FFFFFFF }
152  message = Int32(data=expected_dictionary['data'])
153  message = serialize_deserialize(message)
154  dictionary = message_converter.convert_ros_message_to_dictionary(message)
155  self.assertEqual(dictionary, expected_dictionary)
156 
158  from std_msgs.msg import UInt32
159  expected_dictionary = { 'data': 0xFFFFFFFF }
160  message = UInt32(data=expected_dictionary['data'])
161  message = serialize_deserialize(message)
162  dictionary = message_converter.convert_ros_message_to_dictionary(message)
163  self.assertEqual(dictionary, expected_dictionary)
164 
166  from std_msgs.msg import Int64
167  expected_dictionary = { 'data': -0x7FFFFFFFFFFFFFFF }
168  message = Int64(data=expected_dictionary['data'])
169  message = serialize_deserialize(message)
170  dictionary = message_converter.convert_ros_message_to_dictionary(message)
171  self.assertEqual(dictionary, expected_dictionary)
172 
174  from std_msgs.msg import UInt64
175  expected_dictionary = { 'data': 0xFFFFFFFFFFFFFFFF }
176  message = UInt64(data=expected_dictionary['data'])
177  message = serialize_deserialize(message)
178  dictionary = message_converter.convert_ros_message_to_dictionary(message)
179  self.assertEqual(dictionary, expected_dictionary)
180 
182  from std_msgs.msg import String
183  expected_dictionary = { 'data': 'Hello' }
184  message = String(data=expected_dictionary['data'])
185  message = serialize_deserialize(message)
186  dictionary = message_converter.convert_ros_message_to_dictionary(message)
187  self.assertEqual(dictionary, expected_dictionary)
188 
190  from std_msgs.msg import Time
191  from time import time
192  now_time = rospy.Time(time())
193  expected_dictionary = {
194  'data': { 'secs': now_time.secs, 'nsecs': now_time.nsecs }
195  }
196  message = Time(data=now_time)
197  message = serialize_deserialize(message)
198  dictionary = message_converter.convert_ros_message_to_dictionary(message)
199  self.assertEqual(dictionary, expected_dictionary)
200 
202  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
203  expected_dictionary = {
204  'layout': {
205  'dim': [
206  { 'label': 'Dimension1', 'size': 12, 'stride': 7 },
207  { 'label': 'Dimension2', 'size': 24, 'stride': 14 }
208  ],
209  'data_offset': 0
210  },
211  'data': [1.1, 2.2, 3.3]
212  }
213  dimension1 = MultiArrayDimension(
214  label = expected_dictionary['layout']['dim'][0]['label'],
215  size = expected_dictionary['layout']['dim'][0]['size'],
216  stride = expected_dictionary['layout']['dim'][0]['stride']
217  )
218  dimension2 = MultiArrayDimension(
219  label = expected_dictionary['layout']['dim'][1]['label'],
220  size = expected_dictionary['layout']['dim'][1]['size'],
221  stride = expected_dictionary['layout']['dim'][1]['stride']
222  )
223  layout = MultiArrayLayout(
224  dim = [dimension1, dimension2],
225  data_offset = expected_dictionary['layout']['data_offset']
226  )
227  message = Float64MultiArray(
228  layout = layout,
229  data = expected_dictionary['data']
230  )
231  message = serialize_deserialize(message)
232  dictionary = message_converter.convert_ros_message_to_dictionary(message)
233  self.assertEqual(dictionary, expected_dictionary)
234 
236  from rospy_message_converter.msg import TestArray
237  expected_message = TestArray(data = [1.1, 2.2, 3.3, 4.4])
238  dictionary = { 'data': expected_message.data }
239  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/TestArray', dictionary)
240  expected_message = serialize_deserialize(expected_message)
241  self.assertEqual(message, expected_message)
242 
244  from std_msgs.msg import Bool
245  expected_message = Bool(data = True)
246  dictionary = { 'data': expected_message.data }
247  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
248  expected_message = serialize_deserialize(expected_message)
249  self.assertEqual(message, expected_message)
250 
252  from std_msgs.msg import Byte
253  expected_message = Byte(data = 3)
254  dictionary = { 'data': expected_message.data }
255  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Byte', dictionary)
256  expected_message = serialize_deserialize(expected_message)
257  self.assertEqual(message, expected_message)
258 
260  from std_msgs.msg import Char
261  expected_message = Char(data = 99)
262  dictionary = { 'data': expected_message.data }
263  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Char', dictionary)
264  expected_message = serialize_deserialize(expected_message)
265  self.assertEqual(message, expected_message)
266 
268  from std_msgs.msg import Duration
269  duration = rospy.rostime.Duration(33, 25)
270  expected_message = Duration(data = duration)
271  dictionary = {
272  'data': {
273  'secs' : duration.secs,
274  'nsecs' : duration.nsecs
275  }
276  }
277  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Duration', dictionary)
278  expected_message = serialize_deserialize(expected_message)
279  self.assertEqual(message, expected_message)
280 
282  from std_msgs.msg import Empty
283  expected_message = Empty()
284  dictionary = {}
285  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary)
286  expected_message = serialize_deserialize(expected_message)
287  self.assertEqual(message, expected_message)
288 
290  from std_msgs.msg import Empty
291  dictionary = {"additional_args": "should raise value error"}
292  with self.assertRaises(ValueError) as context:
293  message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary)
294  self.assertTrue("has no field named" in context.exception.message)
295 
297  from std_msgs.msg import Empty
298  expected_message = Empty()
299  dictionary = {"additional_args": "should be ignored"}
300  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary, strict_mode=False)
301  expected_message = serialize_deserialize(expected_message)
302  self.assertEqual(message, expected_message)
303 
305  from std_msgs.msg import Float32
306  expected_message = Float32(data = struct.unpack('<f', '\x7F\x7F\xFF\xFD')[0])
307  dictionary = { 'data': expected_message.data }
308  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float32', dictionary)
309  expected_message = serialize_deserialize(expected_message)
310  self.assertEqual(message, expected_message)
311 
313  from std_msgs.msg import Float64
314  expected_message = Float64(data = struct.unpack('<d', '\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0])
315  dictionary = { 'data': expected_message.data }
316  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64', dictionary)
317  expected_message = serialize_deserialize(expected_message)
318  self.assertEqual(message, expected_message)
319 
321  from std_msgs.msg import Header
322  from time import time
323  now_time = rospy.Time(time())
324  expected_message = Header(
325  stamp = now_time,
326  frame_id = 'my_frame',
327  seq = 12
328  )
329  dictionary = {
330  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
331  'frame_id' : expected_message.frame_id,
332  'seq': expected_message.seq
333  }
334  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Header', dictionary)
335  expected_message = serialize_deserialize(expected_message)
336  self.assertEqual(message, expected_message)
337 
339  from std_msgs.msg import Header
340  from time import time
341  now_time = rospy.Time(time())
342  expected_message = Header(
343  stamp = now_time,
344  frame_id = 'my_frame',
345  seq = 12
346  )
347  dictionary = {
348  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
349  'frame_id' : expected_message.frame_id,
350  'seq': expected_message.seq
351  }
352  message = message_converter.convert_dictionary_to_ros_message('Header', dictionary)
353  expected_message = serialize_deserialize(expected_message)
354  self.assertEqual(message, expected_message)
355 
357  from std_msgs.msg import Int8
358  expected_message = Int8(data = -0x7F)
359  dictionary = { 'data': expected_message.data }
360  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int8', dictionary)
361  expected_message = serialize_deserialize(expected_message)
362  self.assertEqual(message, expected_message)
363 
365  from std_msgs.msg import UInt8
366  expected_message = UInt8(data = 0xFF)
367  dictionary = { 'data': expected_message.data }
368  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt8', dictionary)
369  expected_message = serialize_deserialize(expected_message)
370  self.assertEqual(message, expected_message)
371 
373  from std_msgs.msg import Int16
374  expected_message = Int16(data = -0x7FFF)
375  dictionary = { 'data': expected_message.data }
376  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int16', dictionary)
377  expected_message = serialize_deserialize(expected_message)
378  self.assertEqual(message, expected_message)
379 
381  from std_msgs.msg import UInt16
382  expected_message = UInt16(data = 0xFFFF)
383  dictionary = { 'data': expected_message.data }
384  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt16', dictionary)
385  expected_message = serialize_deserialize(expected_message)
386  self.assertEqual(message, expected_message)
387 
389  from std_msgs.msg import Int32
390  expected_message = Int32(data = -0x7FFFFFFF)
391  dictionary = { 'data': expected_message.data }
392  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int32', dictionary)
393  expected_message = serialize_deserialize(expected_message)
394  self.assertEqual(message, expected_message)
395 
397  from std_msgs.msg import UInt32
398  expected_message = UInt32(data = 0xFFFFFFFF)
399  dictionary = { 'data': expected_message.data }
400  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt32', dictionary)
401  expected_message = serialize_deserialize(expected_message)
402  self.assertEqual(message, expected_message)
403 
405  from std_msgs.msg import Int64
406  expected_message = Int64(data = -0x7FFFFFFFFFFFFFFF)
407  dictionary = { 'data': expected_message.data }
408  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int64', dictionary)
409  expected_message = serialize_deserialize(expected_message)
410  self.assertEqual(message, expected_message)
411 
413  from std_msgs.msg import UInt64
414  expected_message = UInt64(data = 0xFFFFFFFFFFFFFFFF)
415  dictionary = { 'data': expected_message.data }
416  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt64', dictionary)
417  expected_message = serialize_deserialize(expected_message)
418  self.assertEqual(message, expected_message)
419 
421  from std_msgs.msg import String
422  expected_message = String(data = 'Hello')
423  dictionary = { 'data': expected_message.data }
424  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
425  expected_message = serialize_deserialize(expected_message)
426  self.assertEqual(message, expected_message)
427 
429  from std_msgs.msg import String
430  expected_message = String(data = 'Hello')
431  dictionary = { 'data': unicode(expected_message.data) }
432  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
433  expected_message = serialize_deserialize(expected_message)
434  self.assertEqual(message.data,expected_message.data)
435  self.assertEqual(type(message.data),type(expected_message.data))
436 
438  from std_msgs.msg import Time
439  from time import time
440  now_time = rospy.Time(time())
441  expected_message = Time(data=now_time)
442  dictionary = {
443  'data': {
444  'secs' : now_time.secs,
445  'nsecs' : now_time.nsecs
446  }
447  }
448  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Time', dictionary)
449  expected_message = serialize_deserialize(expected_message)
450  self.assertEqual(message, expected_message)
451 
453  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
454  expected_message = Float64MultiArray(
455  layout = MultiArrayLayout(
456  dim = [
457  MultiArrayDimension(label = 'Dimension1', size = 12, stride = 7),
458  MultiArrayDimension(label = 'Dimension2', size = 90, stride = 8)
459  ],
460  data_offset = 1
461  ),
462  data = [1.1, 2.2, 3.3]
463  )
464  dictionary = {
465  'layout': {
466  'dim': [
467  {
468  'label' : expected_message.layout.dim[0].label,
469  'size' : expected_message.layout.dim[0].size,
470  'stride' : expected_message.layout.dim[0].stride
471  },
472  {
473  'label' : expected_message.layout.dim[1].label,
474  'size' : expected_message.layout.dim[1].size,
475  'stride' : expected_message.layout.dim[1].stride
476  }
477  ],
478  'data_offset': expected_message.layout.data_offset
479  },
480  'data': expected_message.data
481  }
482  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64MultiArray', dictionary)
483  expected_message = serialize_deserialize(expected_message)
484  self.assertEqual(message, expected_message)
485 
487  self.assertRaises(ValueError,
488  message_converter.convert_dictionary_to_ros_message,
489  'std_msgs/Empty',
490  {'invalid_field': 1})
491 
493  from std_srvs.srv import Empty, EmptyRequest, EmptyResponse
494  expected_req = EmptyRequest()
495  expected_res = EmptyResponse()
496  dictionary_req = {}
497  dictionary_res = {}
498  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Empty', dictionary_req,
499  'request')
500  expected_req = serialize_deserialize(expected_req)
501  self.assertEqual(message, expected_req)
502  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Empty', dictionary_res,
503  'response')
504  expected_res = serialize_deserialize(expected_res)
505  self.assertEqual(message, expected_res)
506 
508  from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse
509  expected_req = SetBoolRequest(data=True)
510  expected_res = SetBoolResponse(success=True, message='Success!')
511  dictionary_req = { 'data': True }
512  dictionary_res = { 'success': True, 'message': 'Success!' }
513  message = message_converter.convert_dictionary_to_ros_message('std_srvs/SetBool', dictionary_req,
514  'request')
515  expected_req = serialize_deserialize(expected_req)
516  self.assertEqual(message, expected_req)
517  message = message_converter.convert_dictionary_to_ros_message('std_srvs/SetBool', dictionary_res,
518  'response')
519  expected_res = serialize_deserialize(expected_res)
520  self.assertEqual(message, expected_res)
521 
523  from std_srvs.srv import Trigger, TriggerRequest, TriggerResponse
524  expected_req = TriggerRequest()
525  expected_res = TriggerResponse(success=True, message='Success!')
526  dictionary_req = {}
527  dictionary_res = { 'success': True, 'message': 'Success!' }
528  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Trigger', dictionary_req,
529  'request')
530  expected_req = serialize_deserialize(expected_req)
531  self.assertEqual(message, expected_req)
532  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Trigger', dictionary_res,
533  'response')
534  expected_res = serialize_deserialize(expected_res)
535  self.assertEqual(message, expected_res)
536 
537 
539  """
540  Serialize and then deserialize a message. This simulates sending a message
541  between ROS nodes and makes sure that the ROS messages being tested are
542  actually serializable, and are in the same format as they would be received
543  over the network. In rospy, it is possible to assign an illegal data type
544  to a message field (for example, `message = String(data=42)`), but trying
545  to publish this message will throw `SerializationError: field data must be
546  of type str`. This method will expose such bugs.
547  """
548  from StringIO import StringIO
549  buff = StringIO()
550  message.serialize(buff)
551  result = message.__class__() # create new instance of same class as message
552  result.deserialize(buff.getvalue())
553  return result
554 
555 
556 PKG = 'rospy_message_converter'
557 NAME = 'test_message_converter'
558 if __name__ == '__main__':
559  import rosunit
560  rosunit.unitrun(PKG, NAME, TestMessageConverter)


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Thu Mar 5 2020 03:12:49