43 from rospy.exceptions
import ROSInitException
44 from rospy_message_converter
import message_converter
46 python3 = sys.hexversion > 0x03000000
51 from rospy_message_converter.msg
import TestArray
53 expected_dictionary = {
'data': [1.1, 2.2, 3.3]}
54 message = TestArray(data=expected_dictionary[
'data'])
56 dictionary = message_converter.convert_ros_message_to_dictionary(message)
57 self.assertEqual(dictionary, expected_dictionary)
60 from std_msgs.msg
import Bool
62 expected_dictionary = {
'data':
True}
63 message = Bool(data=expected_dictionary[
'data'])
65 dictionary = message_converter.convert_ros_message_to_dictionary(message)
66 self.assertEqual(dictionary, expected_dictionary)
69 from std_msgs.msg
import Byte
71 expected_dictionary = {
'data': 5}
72 message = Byte(data=expected_dictionary[
'data'])
74 dictionary = message_converter.convert_ros_message_to_dictionary(message)
75 self.assertEqual(dictionary, expected_dictionary)
78 from std_msgs.msg
import Char
80 expected_dictionary = {
'data': 99}
81 message = Char(data=expected_dictionary[
'data'])
83 dictionary = message_converter.convert_ros_message_to_dictionary(message)
84 self.assertEqual(dictionary, expected_dictionary)
87 from std_msgs.msg
import Duration
89 duration = rospy.rostime.Duration(33, 25)
90 expected_dictionary = {
'data': {
'secs': duration.secs,
'nsecs': duration.nsecs}}
91 message = Duration(data=duration)
93 dictionary = message_converter.convert_ros_message_to_dictionary(message)
94 self.assertEqual(dictionary, expected_dictionary)
97 from std_msgs.msg
import Empty
99 expected_dictionary = {}
102 dictionary = message_converter.convert_ros_message_to_dictionary(message)
103 self.assertEqual(dictionary, expected_dictionary)
106 from std_msgs.msg
import Float32
108 expected_dictionary = {
'data': struct.unpack(
'<f', b
'\x7F\x7F\xFF\xFD')[0]}
109 message = Float32(data=expected_dictionary[
'data'])
111 dictionary = message_converter.convert_ros_message_to_dictionary(message)
112 self.assertEqual(dictionary, expected_dictionary)
115 from std_msgs.msg
import Float64
117 expected_dictionary = {
'data': struct.unpack(
'<d', b
'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0]}
118 message = Float64(data=expected_dictionary[
'data'])
120 dictionary = message_converter.convert_ros_message_to_dictionary(message)
121 self.assertEqual(dictionary, expected_dictionary)
124 from std_msgs.msg
import Header
125 from time
import time
127 now_time = rospy.Time(time())
128 expected_dictionary = {
129 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs},
130 'frame_id':
'my_frame',
133 message =
Header(stamp=now_time, frame_id=expected_dictionary[
'frame_id'], seq=expected_dictionary[
'seq'])
135 dictionary = message_converter.convert_ros_message_to_dictionary(message)
136 self.assertEqual(dictionary, expected_dictionary)
139 from std_msgs.msg
import Int8
141 expected_dictionary = {
'data': -0x7F}
142 message = Int8(data=expected_dictionary[
'data'])
144 dictionary = message_converter.convert_ros_message_to_dictionary(message)
145 self.assertEqual(dictionary, expected_dictionary)
148 from std_msgs.msg
import UInt8
150 expected_dictionary = {
'data': 0xFF}
151 message = UInt8(data=expected_dictionary[
'data'])
153 dictionary = message_converter.convert_ros_message_to_dictionary(message)
154 self.assertEqual(dictionary, expected_dictionary)
157 from rospy_message_converter.msg
import Uint8ArrayTestMessage
158 from base64
import b64encode
160 expected_data = [97, 98, 99, 100]
161 message = Uint8ArrayTestMessage(data=expected_data)
163 dictionary = message_converter.convert_ros_message_to_dictionary(message)
164 expected_data = b64encode(bytearray(expected_data)).decode(
'utf-8')
165 self.assertEqual(dictionary[
"data"], expected_data)
168 from rospy_message_converter.msg
import Uint8Array3TestMessage
169 from base64
import b64encode
171 expected_data = [97, 98, 99]
172 message = Uint8Array3TestMessage(data=expected_data)
174 dictionary = message_converter.convert_ros_message_to_dictionary(message)
175 expected_data = b64encode(bytearray(expected_data)).decode(
'utf-8')
176 self.assertEqual(dictionary[
"data"], expected_data)
179 from rospy_message_converter.msg
import Uint8Array3TestMessage
181 expected_data = [97, 98, 99]
182 message = Uint8Array3TestMessage(data=expected_data)
184 dictionary = message_converter.convert_ros_message_to_dictionary(message, binary_array_as_bytes=
False)
185 self.assertEqual(dictionary[
"data"], expected_data)
188 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
190 expected_data = [97, 98, 99]
191 message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
193 dictionary = message_converter.convert_ros_message_to_dictionary(message, binary_array_as_bytes=
False)
194 self.assertEqual(dictionary[
"arrays"][0][
"data"], expected_data)
197 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
198 from base64
import b64encode
200 expected_data = bytes(bytearray([97, 98, 99]))
201 message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
203 dictionary = message_converter.convert_ros_message_to_dictionary(message)
204 self.assertEqual(dictionary[
"arrays"][0][
"data"], b64encode(expected_data).decode(
'utf-8'))
207 from std_msgs.msg
import Int16
209 expected_dictionary = {
'data': -0x7FFF}
210 message = Int16(data=expected_dictionary[
'data'])
212 dictionary = message_converter.convert_ros_message_to_dictionary(message)
213 self.assertEqual(dictionary, expected_dictionary)
216 from std_msgs.msg
import UInt16
218 expected_dictionary = {
'data': 0xFFFF}
219 message = UInt16(data=expected_dictionary[
'data'])
221 dictionary = message_converter.convert_ros_message_to_dictionary(message)
222 self.assertEqual(dictionary, expected_dictionary)
225 from std_msgs.msg
import Int32
227 expected_dictionary = {
'data': -0x7FFFFFFF}
228 message = Int32(data=expected_dictionary[
'data'])
230 dictionary = message_converter.convert_ros_message_to_dictionary(message)
231 self.assertEqual(dictionary, expected_dictionary)
234 from std_msgs.msg
import UInt32
236 expected_dictionary = {
'data': 0xFFFFFFFF}
237 message = UInt32(data=expected_dictionary[
'data'])
239 dictionary = message_converter.convert_ros_message_to_dictionary(message)
240 self.assertEqual(dictionary, expected_dictionary)
243 from std_msgs.msg
import Int64
245 expected_dictionary = {
'data': -0x7FFFFFFFFFFFFFFF}
246 message = Int64(data=expected_dictionary[
'data'])
248 dictionary = message_converter.convert_ros_message_to_dictionary(message)
249 self.assertEqual(dictionary, expected_dictionary)
252 from std_msgs.msg
import UInt64
254 expected_dictionary = {
'data': 0xFFFFFFFFFFFFFFFF}
255 message = UInt64(data=expected_dictionary[
'data'])
257 dictionary = message_converter.convert_ros_message_to_dictionary(message)
258 self.assertEqual(dictionary, expected_dictionary)
261 from std_msgs.msg
import String
263 expected_dictionary = {
'data':
'Hello'}
264 message = String(data=expected_dictionary[
'data'])
266 dictionary = message_converter.convert_ros_message_to_dictionary(message)
267 self.assertEqual(dictionary, expected_dictionary)
271 Test that strings are encoded as utf8 273 from std_msgs.msg
import String
275 expected_dictionary = {
'data':
u'Hello \u00dcnicode'}
276 message = String(data=expected_dictionary[
'data'])
278 dictionary = message_converter.convert_ros_message_to_dictionary(message)
279 self.assertEqual(dictionary, expected_dictionary)
282 from std_msgs.msg
import Time
283 from time
import time
285 now_time = rospy.Time(time())
286 expected_dictionary = {
'data': {
'secs': now_time.secs,
'nsecs': now_time.nsecs}}
287 message = Time(data=now_time)
289 dictionary = message_converter.convert_ros_message_to_dictionary(message)
290 self.assertEqual(dictionary, expected_dictionary)
293 from std_msgs.msg
import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
295 expected_dictionary = {
298 {
'label':
'Dimension1',
'size': 12,
'stride': 7},
299 {
'label':
'Dimension2',
'size': 24,
'stride': 14},
303 'data': [1.1, 2.2, 3.3],
305 dimension1 = MultiArrayDimension(
306 label=expected_dictionary[
'layout'][
'dim'][0][
'label'],
307 size=expected_dictionary[
'layout'][
'dim'][0][
'size'],
308 stride=expected_dictionary[
'layout'][
'dim'][0][
'stride'],
310 dimension2 = MultiArrayDimension(
311 label=expected_dictionary[
'layout'][
'dim'][1][
'label'],
312 size=expected_dictionary[
'layout'][
'dim'][1][
'size'],
313 stride=expected_dictionary[
'layout'][
'dim'][1][
'stride'],
315 layout = MultiArrayLayout(
316 dim=[dimension1, dimension2], data_offset=expected_dictionary[
'layout'][
'data_offset']
318 message = Float64MultiArray(layout=layout, data=expected_dictionary[
'data'])
320 dictionary = message_converter.convert_ros_message_to_dictionary(message)
321 self.assertEqual(dictionary, expected_dictionary)
324 from std_srvs.srv
import EmptyRequest, EmptyResponse
326 expected_dictionary_req = {}
327 expected_dictionary_res = {}
328 request = EmptyRequest()
330 response = EmptyResponse()
332 dictionary_req = message_converter.convert_ros_message_to_dictionary(request)
333 self.assertEqual(dictionary_req, expected_dictionary_req)
334 dictionary_res = message_converter.convert_ros_message_to_dictionary(response)
335 self.assertEqual(dictionary_res, expected_dictionary_res)
338 from rospy_message_converter.srv
import NestedUint8ArrayTestServiceRequest, NestedUint8ArrayTestServiceResponse
339 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
340 from base64
import b64encode
342 expected_data = bytes(bytearray([97, 98, 99]))
344 expected_dictionary_req = {
"input": {
"arrays": [{
"data": b64encode(expected_data).decode(
'utf-8')}]}}
345 expected_dictionary_res = {
"output": {
"arrays": [{
"data": b64encode(expected_data).decode(
'utf-8')}]}}
346 request = NestedUint8ArrayTestServiceRequest(
347 input=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
350 response = NestedUint8ArrayTestServiceResponse(
351 output=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
355 dictionary_req = message_converter.convert_ros_message_to_dictionary(request)
356 self.assertEqual(dictionary_req, expected_dictionary_req)
357 dictionary_res = message_converter.convert_ros_message_to_dictionary(response)
358 self.assertEqual(dictionary_res, expected_dictionary_res)
361 from rospy_message_converter.msg
import TestArray
363 expected_message = TestArray(data=[1.1, 2.2, 3.3, 4.4])
364 dictionary = {
'data': expected_message.data}
365 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/TestArray', dictionary)
367 self.assertEqual(message, expected_message)
371 rospy treats uint8[] data as `bytes`, which is the Python representation for byte data. In Python 2, this is 372 the same as `str`. The `bytes` value must be base64-encoded. 374 from rospy_message_converter.msg
import Uint8ArrayTestMessage
375 from base64
import b64encode
377 expected_message = Uint8ArrayTestMessage(data=bytes(bytearray([97, 98, 99])))
378 dictionary = {
'data': b64encode(expected_message.data)}
379 message = message_converter.convert_dictionary_to_ros_message(
380 'rospy_message_converter/Uint8ArrayTestMessage', dictionary
383 self.assertEqual(message, expected_message)
387 Even though rospy treats uint8[] data as `bytes`, rospy_message_converter also handles lists of int. In that 388 case, the input data must *not* be base64-encoded. 390 from rospy_message_converter.msg
import Uint8ArrayTestMessage
392 expected_message = Uint8ArrayTestMessage(data=[1, 2, 3, 4])
393 dictionary = {
'data': expected_message.data}
394 message = message_converter.convert_dictionary_to_ros_message(
395 'rospy_message_converter/Uint8ArrayTestMessage', dictionary
398 self.assertEqual(message, expected_message)
401 dictionary = {
'data': [1, 2, 3, 4000]}
402 with self.assertRaises(ValueError)
as context:
403 message_converter.convert_dictionary_to_ros_message(
404 'rospy_message_converter/Uint8ArrayTestMessage', dictionary
406 self.assertEqual(
'byte must be in range(0, 256)', context.exception.args[0])
410 If the value of a uint8[] field has type `bytes`, rospy_message_converter expects that data to be 411 base64-encoded and runs b64decode on it. This test documents what happens if the value is 414 from rospy_message_converter.msg
import Uint8ArrayTestMessage
422 dictionary = {
'data': bytes(bytearray([1, 2, 97, 4]))}
423 with self.assertRaises((TypeError, binascii.Error))
as context:
424 message_converter.convert_dictionary_to_ros_message(
425 'rospy_message_converter/Uint8ArrayTestMessage', dictionary
427 if type(context.exception) == TypeError:
428 error_msg = context.exception.args[0].args[0]
430 error_msg = context.exception.args[0]
431 self.assertIn(error_msg, [
'Incorrect padding',
'Non-base64 digit found'])
433 dictionary = {
'data': bytes(bytearray([1, 97, 97, 2, 3, 97, 4, 97]))}
436 with self.assertRaises(binascii.Error)
as context:
437 message_converter.convert_dictionary_to_ros_message(
438 'rospy_message_converter/Uint8ArrayTestMessage', dictionary
440 self.assertEqual(
'Non-base64 digit found', context.exception.args[0])
444 message = message_converter.convert_dictionary_to_ros_message(
445 'rospy_message_converter/Uint8ArrayTestMessage', dictionary
447 expected_message =
serialize_deserialize(Uint8ArrayTestMessage(data=bytes(bytearray([105, 166, 154]))))
448 self.assertEqual(message, expected_message)
451 from rospy_message_converter.msg
import Uint8Array3TestMessage
452 from base64
import b64encode
454 expected_message = Uint8Array3TestMessage(data=bytes(bytearray([97, 98, 99])))
455 dictionary = {
'data': b64encode(expected_message.data)}
456 message = message_converter.convert_dictionary_to_ros_message(
457 'rospy_message_converter/Uint8Array3TestMessage', dictionary
460 self.assertEqual(message, expected_message)
463 from rospy_message_converter.msg
import Uint8Array3TestMessage
465 expected_message = Uint8Array3TestMessage(data=[97, 98, 99])
466 dictionary = {
'data': expected_message.data}
467 message = message_converter.convert_dictionary_to_ros_message(
468 'rospy_message_converter/Uint8Array3TestMessage', dictionary
471 self.assertEqual(message, expected_message)
474 from std_msgs.msg
import Bool
476 expected_message = Bool(data=
True)
477 dictionary = {
'data': expected_message.data}
478 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary)
480 self.assertEqual(message, expected_message)
483 from std_msgs.msg
import Byte
485 expected_message = Byte(data=3)
486 dictionary = {
'data': expected_message.data}
487 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Byte', dictionary)
489 self.assertEqual(message, expected_message)
492 from std_msgs.msg
import Char
494 expected_message = Char(data=99)
495 dictionary = {
'data': expected_message.data}
496 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Char', dictionary)
498 self.assertEqual(message, expected_message)
501 from std_msgs.msg
import Duration
503 duration = rospy.rostime.Duration(33, 25)
504 expected_message = Duration(data=duration)
505 dictionary = {
'data': {
'secs': duration.secs,
'nsecs': duration.nsecs}}
506 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Duration', dictionary)
508 self.assertEqual(message, expected_message)
511 from std_msgs.msg
import Empty
513 expected_message = Empty()
515 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', dictionary)
517 self.assertEqual(message, expected_message)
520 dictionary = {
"additional_args":
"should raise value error"}
521 with self.assertRaises(ValueError)
as context:
522 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', dictionary)
524 '''ROS message type "std_msgs/Empty" has no field named "additional_args"''', context.exception.args[0]
528 from std_msgs.msg
import Empty
530 expected_message = Empty()
531 dictionary = {
"additional_args":
"should be ignored"}
532 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', dictionary, strict_mode=
False)
534 self.assertEqual(message, expected_message)
537 from base64
import b64encode
539 expected_data = bytes(bytearray([97, 98, 99]))
540 dictionary = {
"arrays": [{
"data": b64encode(expected_data),
"additional_args":
"should raise value error"}]}
541 with self.assertRaises(ValueError)
as context:
542 message_converter.convert_dictionary_to_ros_message(
543 'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary
546 'ROS message type "rospy_message_converter/Uint8ArrayTestMessage" has no field named "additional_args"',
547 context.exception.args[0],
551 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
552 from base64
import b64encode
554 expected_data = bytes(bytearray([97, 98, 99]))
555 expected_message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
556 dictionary = {
"arrays": [{
"data": b64encode(expected_data),
"additional_args":
"should be ignored"}]}
557 message = message_converter.convert_dictionary_to_ros_message(
558 'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary, strict_mode=
False 561 self.assertEqual(message, expected_message)
564 from std_msgs.msg
import Bool
566 expected_message = Bool(data=
False)
568 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary)
570 self.assertEqual(message, expected_message)
574 with self.assertRaises(ValueError)
as context:
575 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary, check_missing_fields=
True)
576 self.assertEqual(
'''Missing fields "{'data': 'bool'}"''', context.exception.args[0])
579 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
581 expected_message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=[])])
582 dictionary = {
"arrays": [{}]}
583 message = message_converter.convert_dictionary_to_ros_message(
584 'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary
587 self.assertEqual(message, expected_message)
590 dictionary = {
"arrays": [{}]}
591 with self.assertRaises(ValueError)
as context:
592 message_converter.convert_dictionary_to_ros_message(
593 'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary, check_missing_fields=
True 595 self.assertEqual(
'''Missing fields "{'data': 'uint8[]'}"''', context.exception.args[0])
598 dictionary = {
"data":
"should_be_a_bool"}
599 with self.assertRaises(TypeError)
as context:
600 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary)
601 self.assertTrue(
"Field 'data' has wrong type" in context.exception.args[0])
604 from std_msgs.msg
import Float32
606 expected_message = Float32(data=struct.unpack(
'<f', b
'\x7F\x7F\xFF\xFD')[0])
607 dictionary = {
'data': expected_message.data}
608 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float32', dictionary)
610 self.assertEqual(message, expected_message)
613 from std_msgs.msg
import Float64
615 expected_message = Float64(data=struct.unpack(
'<d', b
'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0])
616 dictionary = {
'data': expected_message.data}
617 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float64', dictionary)
619 self.assertEqual(message, expected_message)
622 from std_msgs.msg
import Header
623 from time
import time
625 now_time = rospy.Time(time())
626 expected_message =
Header(stamp=now_time, frame_id=
'my_frame', seq=12)
628 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs},
629 'frame_id': expected_message.frame_id,
630 'seq': expected_message.seq,
632 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Header', dictionary)
634 self.assertEqual(message, expected_message)
637 from std_msgs.msg
import Header
638 from time
import time
640 now_time = rospy.Time(time())
641 expected_message =
Header(stamp=now_time, frame_id=
'my_frame', seq=12)
643 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs},
644 'frame_id': expected_message.frame_id,
645 'seq': expected_message.seq,
647 message = message_converter.convert_dictionary_to_ros_message(
'Header', dictionary)
649 self.assertEqual(message, expected_message)
652 from std_msgs.msg
import Int8
654 expected_message = Int8(data=-0x7F)
655 dictionary = {
'data': expected_message.data}
656 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int8', dictionary)
658 self.assertEqual(message, expected_message)
661 from std_msgs.msg
import UInt8
663 expected_message = UInt8(data=0xFF)
664 dictionary = {
'data': expected_message.data}
665 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt8', dictionary)
667 self.assertEqual(message, expected_message)
670 from std_msgs.msg
import Int16
672 expected_message = Int16(data=-0x7FFF)
673 dictionary = {
'data': expected_message.data}
674 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int16', dictionary)
676 self.assertEqual(message, expected_message)
679 from std_msgs.msg
import UInt16
681 expected_message = UInt16(data=0xFFFF)
682 dictionary = {
'data': expected_message.data}
683 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt16', dictionary)
685 self.assertEqual(message, expected_message)
688 from std_msgs.msg
import Int32
690 expected_message = Int32(data=-0x7FFFFFFF)
691 dictionary = {
'data': expected_message.data}
692 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int32', dictionary)
694 self.assertEqual(message, expected_message)
697 from std_msgs.msg
import UInt32
699 expected_message = UInt32(data=0xFFFFFFFF)
700 dictionary = {
'data': expected_message.data}
701 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt32', dictionary)
703 self.assertEqual(message, expected_message)
706 from std_msgs.msg
import Int64
708 expected_message = Int64(data=-0x7FFFFFFFFFFFFFFF)
709 dictionary = {
'data': expected_message.data}
710 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int64', dictionary)
712 self.assertEqual(message, expected_message)
715 from std_msgs.msg
import UInt64
717 expected_message = UInt64(data=0xFFFFFFFFFFFFFFFF)
718 dictionary = {
'data': expected_message.data}
719 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt64', dictionary)
721 self.assertEqual(message, expected_message)
724 from std_msgs.msg
import String
726 expected_message = String(data=
'Hello')
727 dictionary = {
'data': expected_message.data}
728 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/String', dictionary)
730 self.assertEqual(message, expected_message)
733 from std_msgs.msg
import String
735 expected_message = String(data=
u'Hello \u00dcnicode')
736 dictionary = {
'data': expected_message.data}
737 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/String', dictionary)
739 self.assertEqual(message.data, expected_message.data)
740 self.assertEqual(type(message.data), type(expected_message.data))
743 from std_msgs.msg
import Time
744 from time
import time
746 now_time = rospy.Time(time())
747 expected_message = Time(data=now_time)
748 dictionary = {
'data': {
'secs': now_time.secs,
'nsecs': now_time.nsecs}}
749 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Time', dictionary)
751 self.assertEqual(message, expected_message)
754 dictionary = {
'data':
'now'}
755 with self.assertRaises(ROSInitException)
as context:
756 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Time', dictionary)
757 self.assertEqual(
'time is not initialized. Have you called init_node()?', context.exception.args[0])
760 from std_msgs.msg
import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
762 expected_message = Float64MultiArray(
763 layout=MultiArrayLayout(
765 MultiArrayDimension(label=
'Dimension1', size=12, stride=7),
766 MultiArrayDimension(label=
'Dimension2', size=90, stride=8),
770 data=[1.1, 2.2, 3.3],
776 'label': expected_message.layout.dim[0].label,
777 'size': expected_message.layout.dim[0].size,
778 'stride': expected_message.layout.dim[0].stride,
781 'label': expected_message.layout.dim[1].label,
782 'size': expected_message.layout.dim[1].size,
783 'stride': expected_message.layout.dim[1].stride,
786 'data_offset': expected_message.layout.data_offset,
788 'data': expected_message.data,
790 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float64MultiArray', dictionary)
792 self.assertEqual(message, expected_message)
796 ValueError, message_converter.convert_dictionary_to_ros_message,
'std_msgs/Empty', {
'invalid_field': 1}
800 from std_srvs.srv
import EmptyRequest, EmptyResponse
802 expected_req = EmptyRequest()
803 expected_res = EmptyResponse()
806 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Empty', dictionary_req,
'request')
808 self.assertEqual(message, expected_req)
809 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Empty', dictionary_res,
'response')
811 self.assertEqual(message, expected_res)
814 from rospy_message_converter.srv
import NestedUint8ArrayTestServiceRequest, NestedUint8ArrayTestServiceResponse
815 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
816 from base64
import b64encode
818 expected_data = bytes(bytearray([97, 98, 99]))
819 expected_req = NestedUint8ArrayTestServiceRequest(
820 input=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
823 expected_res = NestedUint8ArrayTestServiceResponse(
824 output=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
828 dictionary_req = {
"input": {
"arrays": [{
"data": b64encode(expected_data)}]}}
829 dictionary_res = {
"output": {
"arrays": [{
"data": b64encode(expected_data)}]}}
830 message = message_converter.convert_dictionary_to_ros_message(
831 'rospy_message_converter/NestedUint8ArrayTestService', dictionary_req,
'request' 833 self.assertEqual(message, expected_req)
834 message = message_converter.convert_dictionary_to_ros_message(
835 'rospy_message_converter/NestedUint8ArrayTestService', dictionary_res,
'response' 837 self.assertEqual(message, expected_res)
840 from std_srvs.srv
import SetBoolRequest, SetBoolResponse
842 expected_req = SetBoolRequest(data=
True)
843 expected_res = SetBoolResponse(success=
True, message=
'Success!')
844 dictionary_req = {
'data':
True}
845 dictionary_res = {
'success':
True,
'message':
'Success!'}
846 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/SetBool', dictionary_req,
'request')
848 self.assertEqual(message, expected_req)
849 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/SetBool', dictionary_res,
'response')
851 self.assertEqual(message, expected_res)
854 from std_srvs.srv
import TriggerRequest, TriggerResponse
856 expected_req = TriggerRequest()
857 expected_res = TriggerResponse(success=
True, message=
'Success!')
859 dictionary_res = {
'success':
True,
'message':
'Success!'}
860 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Trigger', dictionary_req,
'request')
862 self.assertEqual(message, expected_req)
863 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Trigger', dictionary_res,
'response')
865 self.assertEqual(message, expected_res)
868 with self.assertRaises(ValueError)
as context:
869 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', {}, kind=
'invalid')
870 self.assertEqual(
'Unknown kind "invalid".', context.exception.args[0])
873 from std_msgs.msg
import Byte, Char, Float32, Float64, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64
875 numpy_numeric_types = [
888 np.iinfo(t).min
for t
in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]
889 ] + [np.finfo(t).min
for t
in [np.float32, np.float64]]
891 np.iinfo(t).max
for t
in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]
892 ] + [np.finfo(t).max
for t
in [np.float32, np.float64]]
894 num_type: (min_val, max_val)
895 for (num_type, min_val, max_val)
in zip(numpy_numeric_types, min_values, max_values)
897 ros_to_numpy_type_map = {
898 Float32: [np.float32, np.int8, np.int16, np.uint8, np.uint16],
899 Float64: [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
901 Int16: [np.int8, np.int16, np.uint8],
902 Int32: [np.int8, np.int16, np.int32, np.uint8, np.uint16],
903 Int64: [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
905 UInt16: [np.uint8, np.uint16],
906 UInt32: [np.uint8, np.uint16, np.uint32],
907 UInt64: [np.uint8, np.uint16, np.uint32, np.uint64],
911 for ros_type, valid_numpy_types
in ros_to_numpy_type_map.items():
912 for numpy_type
in valid_numpy_types:
913 for value
in numeric_limits[numpy_type]:
914 expected_message = ros_type(data=numpy_type(value))
915 dictionary = {
'data': numpy_type(value)}
916 message = message_converter.convert_dictionary_to_ros_message(expected_message._type, dictionary)
918 self.assertEqual(message, expected_message)
920 for wrong_numpy_type
in [t
for t
in numpy_numeric_types
if t
not in valid_numpy_types]:
921 for value
in numeric_limits[wrong_numpy_type]:
922 with self.assertRaises(TypeError)
as context:
923 expected_message = ros_type(data=wrong_numpy_type(value))
924 dictionary = {
'data': wrong_numpy_type(value)}
925 message_converter.convert_dictionary_to_ros_message(expected_message._type, dictionary)
926 self.assertTrue(
"Field 'data' has wrong type" in context.exception.args[0])
930 from rospy_message_converter.msg
import TestArray
932 expected_message = TestArray(data=[])
933 dictionary = {
'data':
None}
934 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/TestArray', dictionary)
936 self.assertEqual(message, expected_message)
939 from rospy_message_converter.msg
import Uint8ArrayTestMessage
941 expected_message = Uint8ArrayTestMessage(data=bytes())
942 dictionary = {
'data':
None}
943 message = message_converter.convert_dictionary_to_ros_message(
944 'rospy_message_converter/Uint8ArrayTestMessage', dictionary
947 self.assertEqual(message, expected_message)
950 from rospy_message_converter.msg
import Uint8Array3TestMessage
952 expected_message = Uint8Array3TestMessage(data=[0, 0, 0])
953 dictionary = {
'data':
None}
954 message = message_converter.convert_dictionary_to_ros_message(
955 'rospy_message_converter/Uint8Array3TestMessage', dictionary
958 self.assertEqual(message, expected_message)
961 from std_msgs.msg
import Bool
963 expected_message = Bool(data=
False)
964 dictionary = {
'data':
None}
965 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary)
967 self.assertEqual(message, expected_message)
970 from std_msgs.msg
import Byte
972 expected_message = Byte(data=0)
973 dictionary = {
'data':
None}
974 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Byte', dictionary)
976 self.assertEqual(message, expected_message)
979 from std_msgs.msg
import Char
981 expected_message = Char(data=0)
982 dictionary = {
'data':
None}
983 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Char', dictionary)
985 self.assertEqual(message, expected_message)
988 from std_msgs.msg
import Duration
990 expected_message = Duration()
991 dictionary = {
'data':
None}
992 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Duration', dictionary)
994 self.assertEqual(message, expected_message)
997 from std_msgs.msg
import Duration
999 expected_message = Duration(data=rospy.rostime.Duration())
1000 dictionary = {
'data': {
'secs':
None,
'nsecs':
None}}
1001 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Duration', dictionary)
1003 self.assertEqual(message, expected_message)
1006 from std_msgs.msg
import Float32
1008 expected_message = Float32(data=0.0)
1009 dictionary = {
'data':
None}
1010 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float32', dictionary)
1012 self.assertEqual(message, expected_message)
1015 from std_msgs.msg
import Float64
1017 expected_message = Float64(data=0.0)
1018 dictionary = {
'data':
None}
1019 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float64', dictionary)
1021 self.assertEqual(message, expected_message)
1024 from std_msgs.msg
import Header
1026 expected_message =
Header(stamp=rospy.Time(), frame_id=
'', seq=12)
1027 dictionary = {
'stamp': {
'secs':
None,
'nsecs': 0.0},
'frame_id':
None,
'seq': expected_message.seq}
1028 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Header', dictionary)
1030 self.assertEqual(message, expected_message)
1033 from std_msgs.msg
import Int8
1035 expected_message = Int8(data=0)
1036 dictionary = {
'data':
None}
1037 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int8', dictionary)
1039 self.assertEqual(message, expected_message)
1042 from std_msgs.msg
import UInt8
1044 expected_message = UInt8(data=0)
1045 dictionary = {
'data':
None}
1046 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt8', dictionary)
1048 self.assertEqual(message, expected_message)
1051 from std_msgs.msg
import Int16
1053 expected_message = Int16(data=0)
1054 dictionary = {
'data':
None}
1055 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int16', dictionary)
1057 self.assertEqual(message, expected_message)
1060 from std_msgs.msg
import UInt16
1062 expected_message = UInt16(data=0)
1063 dictionary = {
'data':
None}
1064 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt16', dictionary)
1066 self.assertEqual(message, expected_message)
1069 from std_msgs.msg
import Int32
1071 expected_message = Int32(data=0)
1072 dictionary = {
'data':
None}
1073 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int32', dictionary)
1075 self.assertEqual(message, expected_message)
1078 from std_msgs.msg
import UInt32
1080 expected_message = UInt32(data=0)
1081 dictionary = {
'data':
None}
1082 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt32', dictionary)
1084 self.assertEqual(message, expected_message)
1087 from std_msgs.msg
import Int64
1089 expected_message = Int64(data=0)
1090 dictionary = {
'data':
None}
1091 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int64', dictionary)
1093 self.assertEqual(message, expected_message)
1096 from std_msgs.msg
import UInt64
1098 expected_message = UInt64(data=0)
1099 dictionary = {
'data':
None}
1100 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt64', dictionary)
1102 self.assertEqual(message, expected_message)
1105 from std_msgs.msg
import String
1107 expected_message = String(data=
'')
1108 dictionary = {
'data':
None}
1109 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/String', dictionary)
1111 self.assertEqual(message, expected_message)
1114 from std_msgs.msg
import Time
1116 expected_message = Time()
1117 dictionary = {
'data':
None}
1118 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Time', dictionary)
1120 self.assertEqual(message, expected_message)
1123 from std_msgs.msg
import Time
1125 test_time = rospy.Time.from_sec(0.123456)
1126 expected_message = Time(data=test_time)
1127 dictionary = {
'data': {
'secs':
None,
'nsecs': test_time.nsecs}}
1128 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Time', dictionary)
1130 self.assertEqual(message, expected_message)
1133 from std_msgs.msg
import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
1135 expected_message = Float64MultiArray(
1136 layout=MultiArrayLayout(
1138 MultiArrayDimension(label=
'', size=0, stride=0),
1139 MultiArrayDimension(label=
'Dimension2', size=90, stride=8),
1143 data=[1.1, 2.2, 3.3],
1150 'label': expected_message.layout.dim[1].label,
1151 'size': expected_message.layout.dim[1].size,
1152 'stride': expected_message.layout.dim[1].stride,
1155 'data_offset': expected_message.layout.data_offset,
1157 'data': expected_message.data,
1159 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float64MultiArray', dictionary)
1161 self.assertEqual(message, expected_message)
1166 Serialize and then deserialize a message. This simulates sending a message 1167 between ROS nodes and makes sure that the ROS messages being tested are 1168 actually serializable, and are in the same format as they would be received 1169 over the network. In rospy, it is possible to assign an illegal data type 1170 to a message field (for example, `message = String(data=42)`), but trying 1171 to publish this message will throw `SerializationError: field data must be 1172 of type str`. This method will expose such bugs. 1174 from io
import BytesIO
1177 message.serialize(buff)
1178 result = message.__class__()
1179 result.deserialize(buff.getvalue())
1183 PKG =
'rospy_message_converter' 1184 NAME =
'test_message_converter' 1185 if __name__ ==
'__main__':
1188 rosunit.unitrun(PKG, NAME, TestMessageConverter)
def test_dictionary_with_numpy_conversions(self)
def test_ros_message_with_float32(self)
def test_ros_message_with_child_message(self)
def test_dictionary_with_bool_none(self)
def test_dictionary_with_empty_additional_args_strict_mode(self)
def test_ros_message_with_float64(self)
def test_dictionary_with_missing_field_checked(self)
def test_dictionary_with_missing_field_unchecked(self)
def test_dictionary_with_invalid_kind(self)
def test_dictionary_with_uint64_none(self)
def test_ros_message_with_3uint8_array_binary_array_as_array(self)
def test_ros_message_with_time(self)
def test_dictionary_with_uint8_array_list(self)
def test_dictionary_with_3uint8_array_bytes(self)
def test_ros_message_with_uint64(self)
def test_dictionary_with_nested_additional_args_forgiving(self)
def serialize_deserialize(message)
def test_dictionary_with_int64_none(self)
def test_dictionary_with_uint8_array_bytes_unencoded(self)
def test_dictionary_with_float32_none(self)
def test_ros_message_with_char(self)
def test_dictionary_with_unicode(self)
def test_dictionary_with_empty_service(self)
def test_dictionary_with_trigger_service(self)
def test_dictionary_with_float64(self)
def test_ros_message_with_uint8_array(self)
def test_dictionary_with_nested_additional_args_strict_mode(self)
def test_dictionary_with_byte(self)
def test_dictionary_with_int8(self)
def test_dictionary_with_child_message(self)
def test_dictionary_with_uint8_array_bytes(self)
def test_dictionary_with_string(self)
def test_dictionary_with_int64(self)
def test_ros_message_with_unicode(self)
def test_dictionary_with_3uint8_array_list(self)
def test_ros_message_with_array(self)
def test_dictionary_with_uint8_array_list_invalid(self)
def test_dictionary_with_uint64(self)
def test_dictionary_with_uint32_none(self)
def test_dictionary_with_float32(self)
def test_dictionary_with_nested_missing_field_checked(self)
def test_dictionary_with_int8_none(self)
def test_dictionary_with_duration_none(self)
def test_ros_message_with_string(self)
def test_ros_message_with_uint32(self)
def test_dictionary_with_string_none(self)
def test_dictionary_with_uint32(self)
def test_ros_message_with_byte(self)
def test_dictionary_with_invalid_message_fields(self)
def test_dictionary_with_array_none(self)
def test_ros_message_with_header(self)
def test_ros_message_with_3uint8_array(self)
def test_ros_message_with_int32(self)
def test_ros_message_with_int64(self)
def test_ros_message_with_uint8(self)
def test_dictionary_with_int32_none(self)
def test_dictionary_with_uint16(self)
def test_dictionary_with_uint8_array_none(self)
def test_dictionary_with_time(self)
def test_dictionary_with_nested_missing_field_unchecked(self)
def test_dictionary_with_int16(self)
def test_dictionary_with_int16_none(self)
def test_dictionary_with_char_none(self)
def test_dictionary_with_time_none(self)
def test_ros_message_with_int8(self)
def test_dictionary_with_duration_nested_none(self)
def test_dictionary_with_3uint8_array_none(self)
def test_ros_message_with_duration(self)
def test_dictionary_with_empty(self)
def test_dictionary_with_nested_service(self)
def test_dictionary_with_char(self)
def test_dictionary_with_bool(self)
def test_dictionary_with_header_with_no_prefix(self)
def test_dictionary_with_time_nested_none(self)
def test_dictionary_with_byte_none(self)
def test_dictionary_with_duration(self)
def test_dictionary_with_int32(self)
def test_dictionary_with_uint8_none(self)
def test_ros_message_with_nested_uint8_array_binary_array_as_array(self)
def test_dictionary_with_header(self)
def test_dictionary_with_float64_none(self)
def test_dictionary_with_uint16_none(self)
def test_ros_message_with_int16(self)
def test_dictionary_with_uint8(self)
def test_ros_message_with_empty(self)
def test_dictionary_with_setbool_service(self)
def test_dictionary_with_empty_additional_args_forgiving(self)
def test_dictionary_with_child_message_none(self)
def test_ros_message_with_empty_service(self)
def test_ros_message_with_uint16(self)
def test_dictionary_with_header_none(self)
def test_dictionary_with_time_now(self)
def test_dictionary_with_wrong_type(self)
def test_ros_message_with_nested_uint8_array(self)
def test_dictionary_with_array(self)
def test_ros_message_with_bool(self)
def test_ros_message_with_nested_service(self)