test_message_converter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import struct
3 import unittest
4 import rospy
5 from rospy_message_converter import message_converter
6 
7 class TestMessageConverter(unittest.TestCase):
8 
10  from rospy_message_converter.msg import TestArray
11  expected_dictionary = {
12  'data': [1.1, 2.2, 3.3]
13  }
14  message = TestArray(data = expected_dictionary['data'])
15  message = serialize_deserialize(message)
16  dictionary = message_converter.convert_ros_message_to_dictionary(message)
17  self.assertEqual(dictionary, expected_dictionary)
18 
20  from std_msgs.msg import Bool
21  expected_dictionary = { 'data': True }
22  message = Bool(data=expected_dictionary['data'])
23  message = serialize_deserialize(message)
24  dictionary = message_converter.convert_ros_message_to_dictionary(message)
25  self.assertEqual(dictionary, expected_dictionary)
26 
28  from std_msgs.msg import Byte
29  expected_dictionary = { 'data': 5 }
30  message = Byte(data=expected_dictionary['data'])
31  message = serialize_deserialize(message)
32  dictionary = message_converter.convert_ros_message_to_dictionary(message)
33  self.assertEqual(dictionary, expected_dictionary)
34 
36  from std_msgs.msg import Char
37  expected_dictionary = { 'data': 99 }
38  message = Char(data=expected_dictionary['data'])
39  message = serialize_deserialize(message)
40  dictionary = message_converter.convert_ros_message_to_dictionary(message)
41  self.assertEqual(dictionary, expected_dictionary)
42 
44  from std_msgs.msg import Duration
45  duration = rospy.rostime.Duration(33, 25)
46  expected_dictionary = {
47  'data': {
48  'secs' : duration.secs,
49  'nsecs' : duration.nsecs
50  }
51  }
52  message = Duration(data=duration)
53  message = serialize_deserialize(message)
54  dictionary = message_converter.convert_ros_message_to_dictionary(message)
55  self.assertEqual(dictionary, expected_dictionary)
56 
58  from std_msgs.msg import Empty
59  expected_dictionary = {}
60  message = Empty()
61  message = serialize_deserialize(message)
62  dictionary = message_converter.convert_ros_message_to_dictionary(message)
63  self.assertEqual(dictionary, expected_dictionary)
64 
66  from std_msgs.msg import Float32
67  expected_dictionary = { 'data': struct.unpack('<f', '\x7F\x7F\xFF\xFD')[0] }
68  message = Float32(data=expected_dictionary['data'])
69  message = serialize_deserialize(message)
70  dictionary = message_converter.convert_ros_message_to_dictionary(message)
71  self.assertEqual(dictionary, expected_dictionary)
72 
74  from std_msgs.msg import Float64
75  expected_dictionary = { 'data': struct.unpack('<d', '\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0] }
76  message = Float64(data=expected_dictionary['data'])
77  message = serialize_deserialize(message)
78  dictionary = message_converter.convert_ros_message_to_dictionary(message)
79  self.assertEqual(dictionary, expected_dictionary)
80 
82  from std_msgs.msg import Header
83  from time import time
84  now_time = rospy.Time(time())
85  expected_dictionary = {
86  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
87  'frame_id' : 'my_frame',
88  'seq': 3
89  }
90  message = Header(
91  stamp = now_time,
92  frame_id = expected_dictionary['frame_id'],
93  seq = expected_dictionary['seq']
94  )
95  message = serialize_deserialize(message)
96  dictionary = message_converter.convert_ros_message_to_dictionary(message)
97  self.assertEqual(dictionary, expected_dictionary)
98 
100  from std_msgs.msg import Int8
101  expected_dictionary = { 'data': -0x7F }
102  message = Int8(data=expected_dictionary['data'])
103  message = serialize_deserialize(message)
104  dictionary = message_converter.convert_ros_message_to_dictionary(message)
105  self.assertEqual(dictionary, expected_dictionary)
106 
108  from std_msgs.msg import UInt8
109  expected_dictionary = { 'data': 0xFF }
110  message = UInt8(data=expected_dictionary['data'])
111  message = serialize_deserialize(message)
112  dictionary = message_converter.convert_ros_message_to_dictionary(message)
113  self.assertEqual(dictionary, expected_dictionary)
114 
116  from rospy_message_converter.msg import Uint8ArrayTestMessage
117  from base64 import standard_b64encode
118  expected_data = "".join([chr(i) for i in [97, 98, 99, 100]])
119  message = Uint8ArrayTestMessage(data=expected_data)
120  message = serialize_deserialize(message)
121  dictionary = message_converter.convert_ros_message_to_dictionary(message)
122  self.assertEqual(dictionary["data"], standard_b64encode(expected_data))
123 
125  from rospy_message_converter.msg import Uint8Array3TestMessage
126  from base64 import standard_b64encode
127  expected_data = "".join([chr(i) for i in [97, 98, 99]])
128  message = Uint8Array3TestMessage(data=expected_data)
129  message = serialize_deserialize(message)
130  dictionary = message_converter.convert_ros_message_to_dictionary(message)
131  self.assertEqual(dictionary["data"], standard_b64encode(expected_data))
132 
134  from std_msgs.msg import Int16
135  expected_dictionary = { 'data': -0x7FFF }
136  message = Int16(data=expected_dictionary['data'])
137  message = serialize_deserialize(message)
138  dictionary = message_converter.convert_ros_message_to_dictionary(message)
139  self.assertEqual(dictionary, expected_dictionary)
140 
142  from std_msgs.msg import UInt16
143  expected_dictionary = { 'data': 0xFFFF }
144  message = UInt16(data=expected_dictionary['data'])
145  message = serialize_deserialize(message)
146  dictionary = message_converter.convert_ros_message_to_dictionary(message)
147  self.assertEqual(dictionary, expected_dictionary)
148 
150  from std_msgs.msg import Int32
151  expected_dictionary = { 'data': -0x7FFFFFFF }
152  message = Int32(data=expected_dictionary['data'])
153  message = serialize_deserialize(message)
154  dictionary = message_converter.convert_ros_message_to_dictionary(message)
155  self.assertEqual(dictionary, expected_dictionary)
156 
158  from std_msgs.msg import UInt32
159  expected_dictionary = { 'data': 0xFFFFFFFF }
160  message = UInt32(data=expected_dictionary['data'])
161  message = serialize_deserialize(message)
162  dictionary = message_converter.convert_ros_message_to_dictionary(message)
163  self.assertEqual(dictionary, expected_dictionary)
164 
166  from std_msgs.msg import Int64
167  expected_dictionary = { 'data': -0x7FFFFFFFFFFFFFFF }
168  message = Int64(data=expected_dictionary['data'])
169  message = serialize_deserialize(message)
170  dictionary = message_converter.convert_ros_message_to_dictionary(message)
171  self.assertEqual(dictionary, expected_dictionary)
172 
174  from std_msgs.msg import UInt64
175  expected_dictionary = { 'data': 0xFFFFFFFFFFFFFFFF }
176  message = UInt64(data=expected_dictionary['data'])
177  message = serialize_deserialize(message)
178  dictionary = message_converter.convert_ros_message_to_dictionary(message)
179  self.assertEqual(dictionary, expected_dictionary)
180 
182  from std_msgs.msg import String
183  expected_dictionary = { 'data': 'Hello' }
184  message = String(data=expected_dictionary['data'])
185  message = serialize_deserialize(message)
186  dictionary = message_converter.convert_ros_message_to_dictionary(message)
187  self.assertEqual(dictionary, expected_dictionary)
188 
190  from std_msgs.msg import Time
191  from time import time
192  now_time = rospy.Time(time())
193  expected_dictionary = {
194  'data': { 'secs': now_time.secs, 'nsecs': now_time.nsecs }
195  }
196  message = Time(data=now_time)
197  message = serialize_deserialize(message)
198  dictionary = message_converter.convert_ros_message_to_dictionary(message)
199  self.assertEqual(dictionary, expected_dictionary)
200 
202  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
203  expected_dictionary = {
204  'layout': {
205  'dim': [
206  { 'label': 'Dimension1', 'size': 12, 'stride': 7 },
207  { 'label': 'Dimension2', 'size': 24, 'stride': 14 }
208  ],
209  'data_offset': 0
210  },
211  'data': [1.1, 2.2, 3.3]
212  }
213  dimension1 = MultiArrayDimension(
214  label = expected_dictionary['layout']['dim'][0]['label'],
215  size = expected_dictionary['layout']['dim'][0]['size'],
216  stride = expected_dictionary['layout']['dim'][0]['stride']
217  )
218  dimension2 = MultiArrayDimension(
219  label = expected_dictionary['layout']['dim'][1]['label'],
220  size = expected_dictionary['layout']['dim'][1]['size'],
221  stride = expected_dictionary['layout']['dim'][1]['stride']
222  )
223  layout = MultiArrayLayout(
224  dim = [dimension1, dimension2],
225  data_offset = expected_dictionary['layout']['data_offset']
226  )
227  message = Float64MultiArray(
228  layout = layout,
229  data = expected_dictionary['data']
230  )
231  message = serialize_deserialize(message)
232  dictionary = message_converter.convert_ros_message_to_dictionary(message)
233  self.assertEqual(dictionary, expected_dictionary)
234 
236  from rospy_message_converter.msg import TestArray
237  expected_message = TestArray(data = [1.1, 2.2, 3.3, 4.4])
238  dictionary = { 'data': expected_message.data }
239  message = message_converter.convert_dictionary_to_ros_message('rospy_message_converter/TestArray', dictionary)
240  expected_message = serialize_deserialize(expected_message)
241  self.assertEqual(message, expected_message)
242 
244  from std_msgs.msg import Bool
245  expected_message = Bool(data = True)
246  dictionary = { 'data': expected_message.data }
247  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Bool', dictionary)
248  expected_message = serialize_deserialize(expected_message)
249  self.assertEqual(message, expected_message)
250 
252  from std_msgs.msg import Byte
253  expected_message = Byte(data = 3)
254  dictionary = { 'data': expected_message.data }
255  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Byte', dictionary)
256  expected_message = serialize_deserialize(expected_message)
257  self.assertEqual(message, expected_message)
258 
260  from std_msgs.msg import Char
261  expected_message = Char(data = 99)
262  dictionary = { 'data': expected_message.data }
263  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Char', dictionary)
264  expected_message = serialize_deserialize(expected_message)
265  self.assertEqual(message, expected_message)
266 
268  from std_msgs.msg import Duration
269  duration = rospy.rostime.Duration(33, 25)
270  expected_message = Duration(data = duration)
271  dictionary = {
272  'data': {
273  'secs' : duration.secs,
274  'nsecs' : duration.nsecs
275  }
276  }
277  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Duration', dictionary)
278  expected_message = serialize_deserialize(expected_message)
279  self.assertEqual(message, expected_message)
280 
282  from std_msgs.msg import Empty
283  expected_message = Empty()
284  dictionary = {}
285  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Empty', dictionary)
286  expected_message = serialize_deserialize(expected_message)
287  self.assertEqual(message, expected_message)
288 
290  from std_msgs.msg import Float32
291  expected_message = Float32(data = struct.unpack('<f', '\x7F\x7F\xFF\xFD')[0])
292  dictionary = { 'data': expected_message.data }
293  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float32', dictionary)
294  expected_message = serialize_deserialize(expected_message)
295  self.assertEqual(message, expected_message)
296 
298  from std_msgs.msg import Float64
299  expected_message = Float64(data = struct.unpack('<d', '\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0])
300  dictionary = { 'data': expected_message.data }
301  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64', dictionary)
302  expected_message = serialize_deserialize(expected_message)
303  self.assertEqual(message, expected_message)
304 
306  from std_msgs.msg import Header
307  from time import time
308  now_time = rospy.Time(time())
309  expected_message = Header(
310  stamp = now_time,
311  frame_id = 'my_frame',
312  seq = 12
313  )
314  dictionary = {
315  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
316  'frame_id' : expected_message.frame_id,
317  'seq': expected_message.seq
318  }
319  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Header', dictionary)
320  expected_message = serialize_deserialize(expected_message)
321  self.assertEqual(message, expected_message)
322 
324  from std_msgs.msg import Header
325  from time import time
326  now_time = rospy.Time(time())
327  expected_message = Header(
328  stamp = now_time,
329  frame_id = 'my_frame',
330  seq = 12
331  )
332  dictionary = {
333  'stamp': { 'secs': now_time.secs, 'nsecs': now_time.nsecs },
334  'frame_id' : expected_message.frame_id,
335  'seq': expected_message.seq
336  }
337  message = message_converter.convert_dictionary_to_ros_message('Header', dictionary)
338  expected_message = serialize_deserialize(expected_message)
339  self.assertEqual(message, expected_message)
340 
342  from std_msgs.msg import Int8
343  expected_message = Int8(data = -0x7F)
344  dictionary = { 'data': expected_message.data }
345  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int8', dictionary)
346  expected_message = serialize_deserialize(expected_message)
347  self.assertEqual(message, expected_message)
348 
350  from std_msgs.msg import UInt8
351  expected_message = UInt8(data = 0xFF)
352  dictionary = { 'data': expected_message.data }
353  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt8', dictionary)
354  expected_message = serialize_deserialize(expected_message)
355  self.assertEqual(message, expected_message)
356 
358  from std_msgs.msg import Int16
359  expected_message = Int16(data = -0x7FFF)
360  dictionary = { 'data': expected_message.data }
361  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int16', dictionary)
362  expected_message = serialize_deserialize(expected_message)
363  self.assertEqual(message, expected_message)
364 
366  from std_msgs.msg import UInt16
367  expected_message = UInt16(data = 0xFFFF)
368  dictionary = { 'data': expected_message.data }
369  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt16', dictionary)
370  expected_message = serialize_deserialize(expected_message)
371  self.assertEqual(message, expected_message)
372 
374  from std_msgs.msg import Int32
375  expected_message = Int32(data = -0x7FFFFFFF)
376  dictionary = { 'data': expected_message.data }
377  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int32', dictionary)
378  expected_message = serialize_deserialize(expected_message)
379  self.assertEqual(message, expected_message)
380 
382  from std_msgs.msg import UInt32
383  expected_message = UInt32(data = 0xFFFFFFFF)
384  dictionary = { 'data': expected_message.data }
385  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt32', dictionary)
386  expected_message = serialize_deserialize(expected_message)
387  self.assertEqual(message, expected_message)
388 
390  from std_msgs.msg import Int64
391  expected_message = Int64(data = -0x7FFFFFFFFFFFFFFF)
392  dictionary = { 'data': expected_message.data }
393  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Int64', dictionary)
394  expected_message = serialize_deserialize(expected_message)
395  self.assertEqual(message, expected_message)
396 
398  from std_msgs.msg import UInt64
399  expected_message = UInt64(data = 0xFFFFFFFFFFFFFFFF)
400  dictionary = { 'data': expected_message.data }
401  message = message_converter.convert_dictionary_to_ros_message('std_msgs/UInt64', dictionary)
402  expected_message = serialize_deserialize(expected_message)
403  self.assertEqual(message, expected_message)
404 
406  from std_msgs.msg import String
407  expected_message = String(data = 'Hello')
408  dictionary = { 'data': expected_message.data }
409  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
410  expected_message = serialize_deserialize(expected_message)
411  self.assertEqual(message, expected_message)
412 
414  from std_msgs.msg import String
415  expected_message = String(data = 'Hello')
416  dictionary = { 'data': unicode(expected_message.data) }
417  message = message_converter.convert_dictionary_to_ros_message('std_msgs/String', dictionary)
418  expected_message = serialize_deserialize(expected_message)
419  self.assertEqual(message.data,expected_message.data)
420  self.assertEqual(type(message.data),type(expected_message.data))
421 
423  from std_msgs.msg import Time
424  from time import time
425  now_time = rospy.Time(time())
426  expected_message = Time(data=now_time)
427  dictionary = {
428  'data': {
429  'secs' : now_time.secs,
430  'nsecs' : now_time.nsecs
431  }
432  }
433  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Time', dictionary)
434  expected_message = serialize_deserialize(expected_message)
435  self.assertEqual(message, expected_message)
436 
438  from std_msgs.msg import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
439  expected_message = Float64MultiArray(
440  layout = MultiArrayLayout(
441  dim = [
442  MultiArrayDimension(label = 'Dimension1', size = 12, stride = 7),
443  MultiArrayDimension(label = 'Dimension2', size = 90, stride = 8)
444  ],
445  data_offset = 1
446  ),
447  data = [1.1, 2.2, 3.3]
448  )
449  dictionary = {
450  'layout': {
451  'dim': [
452  {
453  'label' : expected_message.layout.dim[0].label,
454  'size' : expected_message.layout.dim[0].size,
455  'stride' : expected_message.layout.dim[0].stride
456  },
457  {
458  'label' : expected_message.layout.dim[1].label,
459  'size' : expected_message.layout.dim[1].size,
460  'stride' : expected_message.layout.dim[1].stride
461  }
462  ],
463  'data_offset': expected_message.layout.data_offset
464  },
465  'data': expected_message.data
466  }
467  message = message_converter.convert_dictionary_to_ros_message('std_msgs/Float64MultiArray', dictionary)
468  expected_message = serialize_deserialize(expected_message)
469  self.assertEqual(message, expected_message)
470 
472  self.assertRaises(ValueError,
473  message_converter.convert_dictionary_to_ros_message,
474  'std_msgs/Empty',
475  {'invalid_field': 1})
476 
478  from std_srvs.srv import Empty, EmptyRequest, EmptyResponse
479  expected_req = EmptyRequest()
480  expected_res = EmptyResponse()
481  dictionary_req = {}
482  dictionary_res = {}
483  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Empty', dictionary_req,
484  'request')
485  expected_req = serialize_deserialize(expected_req)
486  self.assertEqual(message, expected_req)
487  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Empty', dictionary_res,
488  'response')
489  expected_res = serialize_deserialize(expected_res)
490  self.assertEqual(message, expected_res)
491 
493  from std_srvs.srv import SetBool, SetBoolRequest, SetBoolResponse
494  expected_req = SetBoolRequest(data=True)
495  expected_res = SetBoolResponse(success=True, message='Success!')
496  dictionary_req = { 'data': True }
497  dictionary_res = { 'success': True, 'message': 'Success!' }
498  message = message_converter.convert_dictionary_to_ros_message('std_srvs/SetBool', dictionary_req,
499  'request')
500  expected_req = serialize_deserialize(expected_req)
501  self.assertEqual(message, expected_req)
502  message = message_converter.convert_dictionary_to_ros_message('std_srvs/SetBool', dictionary_res,
503  'response')
504  expected_res = serialize_deserialize(expected_res)
505  self.assertEqual(message, expected_res)
506 
508  from std_srvs.srv import Trigger, TriggerRequest, TriggerResponse
509  expected_req = TriggerRequest()
510  expected_res = TriggerResponse(success=True, message='Success!')
511  dictionary_req = {}
512  dictionary_res = { 'success': True, 'message': 'Success!' }
513  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Trigger', dictionary_req,
514  'request')
515  expected_req = serialize_deserialize(expected_req)
516  self.assertEqual(message, expected_req)
517  message = message_converter.convert_dictionary_to_ros_message('std_srvs/Trigger', dictionary_res,
518  'response')
519  expected_res = serialize_deserialize(expected_res)
520  self.assertEqual(message, expected_res)
521 
522 
524  """
525  Serialize and then deserialize a message. This simulates sending a message
526  between ROS nodes and makes sure that the ROS messages being tested are
527  actually serializable, and are in the same format as they would be received
528  over the network. In rospy, it is possible to assign an illegal data type
529  to a message field (for example, `message = String(data=42)`), but trying
530  to publish this message will throw `SerializationError: field data must be
531  of type str`. This method will expose such bugs.
532  """
533  from StringIO import StringIO
534  buff = StringIO()
535  message.serialize(buff)
536  result = message.__class__() # create new instance of same class as message
537  result.deserialize(buff.getvalue())
538  return result
539 
540 
541 PKG = 'rospy_message_converter'
542 NAME = 'test_message_converter'
543 if __name__ == '__main__':
544  import rosunit
545  rosunit.unitrun(PKG, NAME, TestMessageConverter)


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Mon Jun 10 2019 14:48:03