8 from rospy.exceptions
import ROSInitException
9 from rospy_message_converter
import message_converter
11 python3 = (sys.hexversion > 0x03000000)
16 from rospy_message_converter.msg
import TestArray
17 expected_dictionary = {
18 'data': [1.1, 2.2, 3.3]
20 message = TestArray(data = expected_dictionary[
'data'])
22 dictionary = message_converter.convert_ros_message_to_dictionary(message)
23 self.assertEqual(dictionary, expected_dictionary)
26 from std_msgs.msg
import Bool
27 expected_dictionary = {
'data':
True }
28 message = Bool(data=expected_dictionary[
'data'])
30 dictionary = message_converter.convert_ros_message_to_dictionary(message)
31 self.assertEqual(dictionary, expected_dictionary)
34 from std_msgs.msg
import Byte
35 expected_dictionary = {
'data': 5 }
36 message = Byte(data=expected_dictionary[
'data'])
38 dictionary = message_converter.convert_ros_message_to_dictionary(message)
39 self.assertEqual(dictionary, expected_dictionary)
42 from std_msgs.msg
import Char
43 expected_dictionary = {
'data': 99 }
44 message = Char(data=expected_dictionary[
'data'])
46 dictionary = message_converter.convert_ros_message_to_dictionary(message)
47 self.assertEqual(dictionary, expected_dictionary)
50 from std_msgs.msg
import Duration
51 duration = rospy.rostime.Duration(33, 25)
52 expected_dictionary = {
54 'secs' : duration.secs,
55 'nsecs' : duration.nsecs
58 message = Duration(data=duration)
60 dictionary = message_converter.convert_ros_message_to_dictionary(message)
61 self.assertEqual(dictionary, expected_dictionary)
64 from std_msgs.msg
import Empty
65 expected_dictionary = {}
68 dictionary = message_converter.convert_ros_message_to_dictionary(message)
69 self.assertEqual(dictionary, expected_dictionary)
72 from std_msgs.msg
import Float32
73 expected_dictionary = {
'data': struct.unpack(
'<f', b
'\x7F\x7F\xFF\xFD')[0] }
74 message = Float32(data=expected_dictionary[
'data'])
76 dictionary = message_converter.convert_ros_message_to_dictionary(message)
77 self.assertEqual(dictionary, expected_dictionary)
80 from std_msgs.msg
import Float64
81 expected_dictionary = {
'data': struct.unpack(
'<d', b
'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0] }
82 message = Float64(data=expected_dictionary[
'data'])
84 dictionary = message_converter.convert_ros_message_to_dictionary(message)
85 self.assertEqual(dictionary, expected_dictionary)
88 from std_msgs.msg
import Header
90 now_time = rospy.Time(time())
91 expected_dictionary = {
92 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs },
93 'frame_id' :
'my_frame',
98 frame_id = expected_dictionary[
'frame_id'],
99 seq = expected_dictionary[
'seq']
102 dictionary = message_converter.convert_ros_message_to_dictionary(message)
103 self.assertEqual(dictionary, expected_dictionary)
106 from std_msgs.msg
import Int8
107 expected_dictionary = {
'data': -0x7F }
108 message = Int8(data=expected_dictionary[
'data'])
110 dictionary = message_converter.convert_ros_message_to_dictionary(message)
111 self.assertEqual(dictionary, expected_dictionary)
114 from std_msgs.msg
import UInt8
115 expected_dictionary = {
'data': 0xFF }
116 message = UInt8(data=expected_dictionary[
'data'])
118 dictionary = message_converter.convert_ros_message_to_dictionary(message)
119 self.assertEqual(dictionary, expected_dictionary)
122 from rospy_message_converter.msg
import Uint8ArrayTestMessage
123 from base64
import b64encode
124 expected_data = [97, 98, 99, 100]
125 message = Uint8ArrayTestMessage(data=expected_data)
127 dictionary = message_converter.convert_ros_message_to_dictionary(message)
128 expected_data = b64encode(bytearray(expected_data)).decode(
'utf-8')
129 self.assertEqual(dictionary[
"data"], expected_data)
132 from rospy_message_converter.msg
import Uint8Array3TestMessage
133 from base64
import b64encode
134 expected_data = [97, 98, 99]
135 message = Uint8Array3TestMessage(data=expected_data)
137 dictionary = message_converter.convert_ros_message_to_dictionary(message)
138 expected_data = b64encode(bytearray(expected_data)).decode(
'utf-8')
139 self.assertEqual(dictionary[
"data"], expected_data)
142 from rospy_message_converter.msg
import Uint8Array3TestMessage
143 expected_data = [97, 98, 99]
144 message = Uint8Array3TestMessage(data=expected_data)
146 dictionary = message_converter.convert_ros_message_to_dictionary(message, binary_array_as_bytes=
False)
147 self.assertEqual(dictionary[
"data"], expected_data)
150 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
151 expected_data = [97, 98, 99]
152 message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
154 dictionary = message_converter.convert_ros_message_to_dictionary(message, binary_array_as_bytes=
False)
155 self.assertEqual(dictionary[
"arrays"][0][
"data"], expected_data)
158 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
159 from base64
import b64encode
160 expected_data = bytes(bytearray([97, 98, 99]))
161 message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
163 dictionary = message_converter.convert_ros_message_to_dictionary(message)
164 self.assertEqual(dictionary[
"arrays"][0][
"data"], b64encode(expected_data).decode(
'utf-8'))
167 from std_msgs.msg
import Int16
168 expected_dictionary = {
'data': -0x7FFF }
169 message = Int16(data=expected_dictionary[
'data'])
171 dictionary = message_converter.convert_ros_message_to_dictionary(message)
172 self.assertEqual(dictionary, expected_dictionary)
175 from std_msgs.msg
import UInt16
176 expected_dictionary = {
'data': 0xFFFF }
177 message = UInt16(data=expected_dictionary[
'data'])
179 dictionary = message_converter.convert_ros_message_to_dictionary(message)
180 self.assertEqual(dictionary, expected_dictionary)
183 from std_msgs.msg
import Int32
184 expected_dictionary = {
'data': -0x7FFFFFFF }
185 message = Int32(data=expected_dictionary[
'data'])
187 dictionary = message_converter.convert_ros_message_to_dictionary(message)
188 self.assertEqual(dictionary, expected_dictionary)
191 from std_msgs.msg
import UInt32
192 expected_dictionary = {
'data': 0xFFFFFFFF }
193 message = UInt32(data=expected_dictionary[
'data'])
195 dictionary = message_converter.convert_ros_message_to_dictionary(message)
196 self.assertEqual(dictionary, expected_dictionary)
199 from std_msgs.msg
import Int64
200 expected_dictionary = {
'data': -0x7FFFFFFFFFFFFFFF }
201 message = Int64(data=expected_dictionary[
'data'])
203 dictionary = message_converter.convert_ros_message_to_dictionary(message)
204 self.assertEqual(dictionary, expected_dictionary)
207 from std_msgs.msg
import UInt64
208 expected_dictionary = {
'data': 0xFFFFFFFFFFFFFFFF }
209 message = UInt64(data=expected_dictionary[
'data'])
211 dictionary = message_converter.convert_ros_message_to_dictionary(message)
212 self.assertEqual(dictionary, expected_dictionary)
215 from std_msgs.msg
import String
216 expected_dictionary = {
'data':
'Hello' }
217 message = String(data=expected_dictionary[
'data'])
219 dictionary = message_converter.convert_ros_message_to_dictionary(message)
220 self.assertEqual(dictionary, expected_dictionary)
224 Test that strings are encoded as utf8 226 from std_msgs.msg
import String
227 expected_dictionary = {
'data':
u'Hello \u00dcnicode' }
228 message = String(data=expected_dictionary[
'data'])
230 dictionary = message_converter.convert_ros_message_to_dictionary(message)
231 self.assertEqual(dictionary, expected_dictionary)
234 from std_msgs.msg
import Time
235 from time
import time
236 now_time = rospy.Time(time())
237 expected_dictionary = {
238 'data': {
'secs': now_time.secs,
'nsecs': now_time.nsecs }
240 message = Time(data=now_time)
242 dictionary = message_converter.convert_ros_message_to_dictionary(message)
243 self.assertEqual(dictionary, expected_dictionary)
246 from std_msgs.msg
import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
247 expected_dictionary = {
250 {
'label':
'Dimension1',
'size': 12,
'stride': 7 },
251 {
'label':
'Dimension2',
'size': 24,
'stride': 14 }
255 'data': [1.1, 2.2, 3.3]
257 dimension1 = MultiArrayDimension(
258 label = expected_dictionary[
'layout'][
'dim'][0][
'label'],
259 size = expected_dictionary[
'layout'][
'dim'][0][
'size'],
260 stride = expected_dictionary[
'layout'][
'dim'][0][
'stride']
262 dimension2 = MultiArrayDimension(
263 label = expected_dictionary[
'layout'][
'dim'][1][
'label'],
264 size = expected_dictionary[
'layout'][
'dim'][1][
'size'],
265 stride = expected_dictionary[
'layout'][
'dim'][1][
'stride']
267 layout = MultiArrayLayout(
268 dim = [dimension1, dimension2],
269 data_offset = expected_dictionary[
'layout'][
'data_offset']
271 message = Float64MultiArray(
273 data = expected_dictionary[
'data']
276 dictionary = message_converter.convert_ros_message_to_dictionary(message)
277 self.assertEqual(dictionary, expected_dictionary)
280 from std_srvs.srv
import EmptyRequest, EmptyResponse
281 expected_dictionary_req = {}
282 expected_dictionary_res = {}
283 request = EmptyRequest()
285 response = EmptyResponse()
287 dictionary_req = message_converter.convert_ros_message_to_dictionary(request)
288 self.assertEqual(dictionary_req, expected_dictionary_req)
289 dictionary_res = message_converter.convert_ros_message_to_dictionary(response)
290 self.assertEqual(dictionary_res, expected_dictionary_res)
293 from rospy_message_converter.srv
import NestedUint8ArrayTestServiceRequest, NestedUint8ArrayTestServiceResponse
294 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
295 from base64
import b64encode
296 expected_data = bytes(bytearray([97, 98, 99]))
298 expected_dictionary_req = {
"input": {
"arrays": [{
"data": b64encode(expected_data).decode(
'utf-8')}]}}
299 expected_dictionary_res = {
"output": {
"arrays": [{
"data": b64encode(expected_data).decode(
'utf-8')}]}}
300 request = NestedUint8ArrayTestServiceRequest(
301 input=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)]))
303 response = NestedUint8ArrayTestServiceResponse(
304 output=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)]))
307 dictionary_req = message_converter.convert_ros_message_to_dictionary(request)
308 self.assertEqual(dictionary_req, expected_dictionary_req)
309 dictionary_res = message_converter.convert_ros_message_to_dictionary(response)
310 self.assertEqual(dictionary_res, expected_dictionary_res)
313 from rospy_message_converter.msg
import TestArray
314 expected_message = TestArray(data = [1.1, 2.2, 3.3, 4.4])
315 dictionary = {
'data': expected_message.data }
316 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/TestArray', dictionary)
318 self.assertEqual(message, expected_message)
322 rospy treats uint8[] data as `bytes`, which is the Python representation for byte data. In Python 2, this is 323 the same as `str`. The `bytes` value must be base64-encoded. 325 from rospy_message_converter.msg
import Uint8ArrayTestMessage
326 from base64
import b64encode
327 expected_message = Uint8ArrayTestMessage(data=bytes(bytearray([97, 98, 99])))
328 dictionary = {
'data': b64encode(expected_message.data)}
329 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/Uint8ArrayTestMessage',
332 self.assertEqual(message, expected_message)
336 Even though rospy treats uint8[] data as `bytes`, rospy_message_converter also handles lists of int. In that 337 case, the input data must *not* be base64-encoded. 339 from rospy_message_converter.msg
import Uint8ArrayTestMessage
340 expected_message = Uint8ArrayTestMessage(data=[1, 2, 3, 4])
341 dictionary = {
'data': expected_message.data}
342 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/Uint8ArrayTestMessage',
345 self.assertEqual(message, expected_message)
348 dictionary = {
'data': [1, 2, 3, 4000]}
349 with self.assertRaises(ValueError)
as context:
350 message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/Uint8ArrayTestMessage',
352 self.assertEqual(
'byte must be in range(0, 256)', context.exception.args[0])
356 If the value of a uint8[] field has type `bytes`, rospy_message_converter expects that data to be 357 base64-encoded and runs b64decode on it. This test documents what happens if the value is 360 from rospy_message_converter.msg
import Uint8ArrayTestMessage
368 dictionary = {
'data': bytes(bytearray([1, 2, 97, 4]))}
369 with self.assertRaises((TypeError, binascii.Error))
as context:
370 message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/Uint8ArrayTestMessage',
372 if type(context.exception) == TypeError:
373 error_msg = context.exception.args[0].args[0]
375 error_msg = context.exception.args[0]
376 self.assertIn(error_msg, [
'Incorrect padding',
'Non-base64 digit found'])
378 dictionary = {
'data': bytes(bytearray([1, 97, 97, 2, 3, 97, 4, 97]))}
381 with self.assertRaises(binascii.Error)
as context:
382 message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/Uint8ArrayTestMessage',
384 self.assertEqual(
'Non-base64 digit found', context.exception.args[0])
388 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/Uint8ArrayTestMessage',
390 expected_message =
serialize_deserialize(Uint8ArrayTestMessage(data=bytes(bytearray([105, 166, 154]))))
391 self.assertEqual(message, expected_message)
394 from rospy_message_converter.msg
import Uint8Array3TestMessage
395 from base64
import b64encode
396 expected_message = Uint8Array3TestMessage(data=bytes(bytearray([97, 98, 99])))
397 dictionary = {
'data': b64encode(expected_message.data)}
398 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/Uint8Array3TestMessage', dictionary)
400 self.assertEqual(message, expected_message)
403 from rospy_message_converter.msg
import Uint8Array3TestMessage
404 expected_message = Uint8Array3TestMessage(data=[97, 98, 99])
405 dictionary = {
'data': expected_message.data}
406 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/Uint8Array3TestMessage', dictionary)
408 self.assertEqual(message, expected_message)
411 from std_msgs.msg
import Bool
412 expected_message = Bool(data =
True)
413 dictionary = {
'data': expected_message.data }
414 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary)
416 self.assertEqual(message, expected_message)
419 from std_msgs.msg
import Byte
420 expected_message = Byte(data = 3)
421 dictionary = {
'data': expected_message.data }
422 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Byte', dictionary)
424 self.assertEqual(message, expected_message)
427 from std_msgs.msg
import Char
428 expected_message = Char(data = 99)
429 dictionary = {
'data': expected_message.data }
430 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Char', dictionary)
432 self.assertEqual(message, expected_message)
435 from std_msgs.msg
import Duration
436 duration = rospy.rostime.Duration(33, 25)
437 expected_message = Duration(data = duration)
440 'secs' : duration.secs,
441 'nsecs' : duration.nsecs
444 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Duration', dictionary)
446 self.assertEqual(message, expected_message)
449 from std_msgs.msg
import Empty
450 expected_message = Empty()
452 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', dictionary)
454 self.assertEqual(message, expected_message)
457 dictionary = {
"additional_args":
"should raise value error"}
458 with self.assertRaises(ValueError)
as context:
459 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', dictionary)
460 self.assertEqual(
'''ROS message type "std_msgs/Empty" has no field named "additional_args"''',
461 context.exception.args[0])
464 from std_msgs.msg
import Empty
465 expected_message = Empty()
466 dictionary = {
"additional_args":
"should be ignored"}
467 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', dictionary, strict_mode=
False)
469 self.assertEqual(message, expected_message)
472 from base64
import b64encode
473 expected_data = bytes(bytearray([97, 98, 99]))
474 dictionary = {
"arrays": [{
"data": b64encode(expected_data),
"additional_args":
"should raise value error"}]}
475 with self.assertRaises(ValueError)
as context:
476 message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/NestedUint8ArrayTestMessage',
479 'ROS message type "rospy_message_converter/Uint8ArrayTestMessage" has no field named "additional_args"',
480 context.exception.args[0])
483 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
484 from base64
import b64encode
485 expected_data = bytes(bytearray([97, 98, 99]))
486 expected_message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)])
487 dictionary = {
"arrays": [{
"data": b64encode(expected_data),
"additional_args":
"should be ignored"}]}
488 message = message_converter.convert_dictionary_to_ros_message(
489 'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary, strict_mode=
False)
491 self.assertEqual(message, expected_message)
494 from std_msgs.msg
import Bool
495 expected_message = Bool(data=
False)
497 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary)
499 self.assertEqual(message, expected_message)
503 with self.assertRaises(ValueError)
as context:
504 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary, check_missing_fields=
True)
505 self.assertEqual(
'''Missing fields "{'data': 'bool'}"''',
506 context.exception.args[0])
509 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
510 expected_message = NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=[])])
511 dictionary = {
"arrays": [{}]}
512 message = message_converter.convert_dictionary_to_ros_message(
513 'rospy_message_converter/NestedUint8ArrayTestMessage', dictionary)
515 self.assertEqual(message, expected_message)
518 dictionary = {
"arrays": [{}]}
519 with self.assertRaises(ValueError)
as context:
520 message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/NestedUint8ArrayTestMessage',
521 dictionary, check_missing_fields=
True)
522 self.assertEqual(
'''Missing fields "{'data': 'uint8[]'}"''',
523 context.exception.args[0])
526 dictionary = {
"data":
"should_be_a_bool"}
527 with self.assertRaises(TypeError)
as context:
528 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary)
529 self.assertTrue(
"Field 'data' has wrong type" in context.exception.args[0])
532 from std_msgs.msg
import Float32
533 expected_message = Float32(data = struct.unpack(
'<f', b
'\x7F\x7F\xFF\xFD')[0])
534 dictionary = {
'data': expected_message.data }
535 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float32', dictionary)
537 self.assertEqual(message, expected_message)
540 from std_msgs.msg
import Float64
541 expected_message = Float64(data = struct.unpack(
'<d', b
'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0])
542 dictionary = {
'data': expected_message.data }
543 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float64', dictionary)
545 self.assertEqual(message, expected_message)
548 from std_msgs.msg
import Header
549 from time
import time
550 now_time = rospy.Time(time())
551 expected_message =
Header(
553 frame_id =
'my_frame',
557 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs },
558 'frame_id' : expected_message.frame_id,
559 'seq': expected_message.seq
561 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Header', dictionary)
563 self.assertEqual(message, expected_message)
566 from std_msgs.msg
import Header
567 from time
import time
568 now_time = rospy.Time(time())
569 expected_message =
Header(
571 frame_id =
'my_frame',
575 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs },
576 'frame_id' : expected_message.frame_id,
577 'seq': expected_message.seq
579 message = message_converter.convert_dictionary_to_ros_message(
'Header', dictionary)
581 self.assertEqual(message, expected_message)
584 from std_msgs.msg
import Int8
585 expected_message = Int8(data = -0x7F)
586 dictionary = {
'data': expected_message.data }
587 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int8', dictionary)
589 self.assertEqual(message, expected_message)
592 from std_msgs.msg
import UInt8
593 expected_message = UInt8(data = 0xFF)
594 dictionary = {
'data': expected_message.data }
595 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt8', dictionary)
597 self.assertEqual(message, expected_message)
600 from std_msgs.msg
import Int16
601 expected_message = Int16(data = -0x7FFF)
602 dictionary = {
'data': expected_message.data }
603 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int16', dictionary)
605 self.assertEqual(message, expected_message)
608 from std_msgs.msg
import UInt16
609 expected_message = UInt16(data = 0xFFFF)
610 dictionary = {
'data': expected_message.data }
611 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt16', dictionary)
613 self.assertEqual(message, expected_message)
616 from std_msgs.msg
import Int32
617 expected_message = Int32(data = -0x7FFFFFFF)
618 dictionary = {
'data': expected_message.data }
619 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int32', dictionary)
621 self.assertEqual(message, expected_message)
624 from std_msgs.msg
import UInt32
625 expected_message = UInt32(data = 0xFFFFFFFF)
626 dictionary = {
'data': expected_message.data }
627 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt32', dictionary)
629 self.assertEqual(message, expected_message)
632 from std_msgs.msg
import Int64
633 expected_message = Int64(data = -0x7FFFFFFFFFFFFFFF)
634 dictionary = {
'data': expected_message.data }
635 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int64', dictionary)
637 self.assertEqual(message, expected_message)
640 from std_msgs.msg
import UInt64
641 expected_message = UInt64(data = 0xFFFFFFFFFFFFFFFF)
642 dictionary = {
'data': expected_message.data }
643 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt64', dictionary)
645 self.assertEqual(message, expected_message)
648 from std_msgs.msg
import String
649 expected_message = String(data =
'Hello')
650 dictionary = {
'data': expected_message.data }
651 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/String', dictionary)
653 self.assertEqual(message, expected_message)
656 from std_msgs.msg
import String
657 expected_message = String(data =
u'Hello \u00dcnicode')
658 dictionary = {
'data': expected_message.data }
659 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/String', dictionary)
661 self.assertEqual(message.data,expected_message.data)
662 self.assertEqual(type(message.data),type(expected_message.data))
665 from std_msgs.msg
import Time
666 from time
import time
667 now_time = rospy.Time(time())
668 expected_message = Time(data=now_time)
671 'secs' : now_time.secs,
672 'nsecs' : now_time.nsecs
675 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Time', dictionary)
677 self.assertEqual(message, expected_message)
683 with self.assertRaises(ROSInitException)
as context:
684 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Time', dictionary)
685 self.assertEqual(
'time is not initialized. Have you called init_node()?', context.exception.args[0])
688 from std_msgs.msg
import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
689 expected_message = Float64MultiArray(
690 layout = MultiArrayLayout(
692 MultiArrayDimension(label =
'Dimension1', size = 12, stride = 7),
693 MultiArrayDimension(label =
'Dimension2', size = 90, stride = 8)
697 data = [1.1, 2.2, 3.3]
703 'label' : expected_message.layout.dim[0].label,
704 'size' : expected_message.layout.dim[0].size,
705 'stride' : expected_message.layout.dim[0].stride
708 'label' : expected_message.layout.dim[1].label,
709 'size' : expected_message.layout.dim[1].size,
710 'stride' : expected_message.layout.dim[1].stride
713 'data_offset': expected_message.layout.data_offset
715 'data': expected_message.data
717 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float64MultiArray', dictionary)
719 self.assertEqual(message, expected_message)
722 self.assertRaises(ValueError,
723 message_converter.convert_dictionary_to_ros_message,
725 {
'invalid_field': 1})
728 from std_srvs.srv
import EmptyRequest, EmptyResponse
729 expected_req = EmptyRequest()
730 expected_res = EmptyResponse()
733 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Empty', dictionary_req,
736 self.assertEqual(message, expected_req)
737 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Empty', dictionary_res,
740 self.assertEqual(message, expected_res)
743 from rospy_message_converter.srv
import NestedUint8ArrayTestServiceRequest, NestedUint8ArrayTestServiceResponse
744 from rospy_message_converter.msg
import NestedUint8ArrayTestMessage, Uint8ArrayTestMessage
745 from base64
import b64encode
746 expected_data = bytes(bytearray([97, 98, 99]))
747 expected_req = NestedUint8ArrayTestServiceRequest(
748 input=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)]))
750 expected_res = NestedUint8ArrayTestServiceResponse(
751 output=NestedUint8ArrayTestMessage(arrays=[Uint8ArrayTestMessage(data=expected_data)]))
754 dictionary_req = {
"input": {
"arrays": [{
"data": b64encode(expected_data)}]}}
755 dictionary_res = {
"output": {
"arrays": [{
"data": b64encode(expected_data)}]}}
756 message = message_converter.convert_dictionary_to_ros_message(
757 'rospy_message_converter/NestedUint8ArrayTestService', dictionary_req,
'request')
758 self.assertEqual(message, expected_req)
759 message = message_converter.convert_dictionary_to_ros_message(
760 'rospy_message_converter/NestedUint8ArrayTestService', dictionary_res,
'response')
761 self.assertEqual(message, expected_res)
764 from std_srvs.srv
import SetBoolRequest, SetBoolResponse
765 expected_req = SetBoolRequest(data=
True)
766 expected_res = SetBoolResponse(success=
True, message=
'Success!')
767 dictionary_req = {
'data':
True }
768 dictionary_res = {
'success':
True,
'message':
'Success!' }
769 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/SetBool', dictionary_req,
772 self.assertEqual(message, expected_req)
773 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/SetBool', dictionary_res,
776 self.assertEqual(message, expected_res)
779 from std_srvs.srv
import TriggerRequest, TriggerResponse
780 expected_req = TriggerRequest()
781 expected_res = TriggerResponse(success=
True, message=
'Success!')
783 dictionary_res = {
'success':
True,
'message':
'Success!' }
784 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Trigger', dictionary_req,
787 self.assertEqual(message, expected_req)
788 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Trigger', dictionary_res,
791 self.assertEqual(message, expected_res)
794 with self.assertRaises(ValueError)
as context:
795 message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', {}, kind=
'invalid')
796 self.assertEqual(
'Unknown kind "invalid".', context.exception.args[0])
799 from std_msgs.msg
import Byte, Char, Float32, Float64, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64
800 numpy_numeric_types = [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64, np.float32, np.float64]
801 min_values = [np.iinfo(t).min
for t
in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]] \
802 + [np.finfo(t).min
for t
in [np.float32, np.float64]]
803 max_values = [np.iinfo(t).max
for t
in [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64]] \
804 + [np.finfo(t).max
for t
in [np.float32, np.float64]]
805 numeric_limits = {num_type: (min_val, max_val)
for (num_type, min_val, max_val)
in zip(numpy_numeric_types, min_values, max_values)}
806 ros_to_numpy_type_map = {
807 Float32 : [np.float32, np.int8, np.int16, np.uint8, np.uint16],
808 Float64 : [np.float32, np.float64, np.int8, np.int16, np.int32, np.uint8, np.uint16, np.uint32],
810 Int16 : [np.int8, np.int16, np.uint8],
811 Int32 : [np.int8, np.int16, np.int32, np.uint8, np.uint16],
812 Int64 : [np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32],
814 UInt16 : [np.uint8, np.uint16],
815 UInt32 : [np.uint8, np.uint16, np.uint32],
816 UInt64 : [np.uint8, np.uint16, np.uint32, np.uint64],
820 for ros_type, valid_numpy_types
in ros_to_numpy_type_map.items():
821 for numpy_type
in valid_numpy_types:
822 for value
in numeric_limits[numpy_type]:
823 expected_message = ros_type(data=numpy_type(value))
825 'data': numpy_type(value)
827 message = message_converter.convert_dictionary_to_ros_message(expected_message._type, dictionary)
829 self.assertEqual(message, expected_message)
831 for wrong_numpy_type
in [t
for t
in numpy_numeric_types
if t
not in valid_numpy_types]:
832 for value
in numeric_limits[wrong_numpy_type]:
833 with self.assertRaises(TypeError)
as context:
834 expected_message = ros_type(data=wrong_numpy_type(value))
836 'data': wrong_numpy_type(value)
838 message_converter.convert_dictionary_to_ros_message(expected_message._type, dictionary)
839 self.assertTrue(
"Field 'data' has wrong type" in context.exception.args[0])
844 Serialize and then deserialize a message. This simulates sending a message 845 between ROS nodes and makes sure that the ROS messages being tested are 846 actually serializable, and are in the same format as they would be received 847 over the network. In rospy, it is possible to assign an illegal data type 848 to a message field (for example, `message = String(data=42)`), but trying 849 to publish this message will throw `SerializationError: field data must be 850 of type str`. This method will expose such bugs. 852 from io
import BytesIO
854 message.serialize(buff)
855 result = message.__class__()
856 result.deserialize(buff.getvalue())
860 PKG =
'rospy_message_converter' 861 NAME =
'test_message_converter' 862 if __name__ ==
'__main__':
864 rosunit.unitrun(PKG, NAME, TestMessageConverter)
def test_dictionary_with_numpy_conversions(self)
def test_ros_message_with_float32(self)
def test_ros_message_with_child_message(self)
def test_dictionary_with_empty_additional_args_strict_mode(self)
def test_ros_message_with_float64(self)
def test_dictionary_with_missing_field_checked(self)
def test_dictionary_with_missing_field_unchecked(self)
def test_dictionary_with_invalid_kind(self)
def test_ros_message_with_3uint8_array_binary_array_as_array(self)
def test_ros_message_with_time(self)
def test_dictionary_with_uint8_array_list(self)
def test_dictionary_with_3uint8_array_bytes(self)
def test_ros_message_with_uint64(self)
def test_dictionary_with_nested_additional_args_forgiving(self)
def serialize_deserialize(message)
def test_dictionary_with_uint8_array_bytes_unencoded(self)
def test_ros_message_with_char(self)
def test_dictionary_with_unicode(self)
def test_dictionary_with_empty_service(self)
def test_dictionary_with_trigger_service(self)
def test_dictionary_with_float64(self)
def test_ros_message_with_uint8_array(self)
def test_dictionary_with_nested_additional_args_strict_mode(self)
def test_dictionary_with_byte(self)
def test_dictionary_with_int8(self)
def test_dictionary_with_child_message(self)
def test_dictionary_with_uint8_array_bytes(self)
def test_dictionary_with_string(self)
def test_dictionary_with_int64(self)
def test_ros_message_with_unicode(self)
def test_dictionary_with_3uint8_array_list(self)
def test_ros_message_with_array(self)
def test_dictionary_with_uint8_array_list_invalid(self)
def test_dictionary_with_uint64(self)
def test_dictionary_with_float32(self)
def test_dictionary_with_nested_missing_field_checked(self)
def test_ros_message_with_string(self)
def test_ros_message_with_uint32(self)
def test_dictionary_with_uint32(self)
def test_ros_message_with_byte(self)
def test_dictionary_with_invalid_message_fields(self)
def test_ros_message_with_header(self)
def test_ros_message_with_3uint8_array(self)
def test_ros_message_with_int32(self)
def test_ros_message_with_int64(self)
def test_ros_message_with_uint8(self)
def test_dictionary_with_uint16(self)
def test_dictionary_with_time(self)
def test_dictionary_with_nested_missing_field_unchecked(self)
def test_dictionary_with_int16(self)
def test_ros_message_with_int8(self)
def test_ros_message_with_duration(self)
def test_dictionary_with_empty(self)
def test_dictionary_with_nested_service(self)
def test_dictionary_with_char(self)
def test_dictionary_with_bool(self)
def test_dictionary_with_header_with_no_prefix(self)
def test_dictionary_with_duration(self)
def test_dictionary_with_int32(self)
def test_ros_message_with_nested_uint8_array_binary_array_as_array(self)
def test_dictionary_with_header(self)
def test_ros_message_with_int16(self)
def test_dictionary_with_uint8(self)
def test_ros_message_with_empty(self)
def test_dictionary_with_setbool_service(self)
def test_dictionary_with_empty_additional_args_forgiving(self)
def test_ros_message_with_empty_service(self)
def test_ros_message_with_uint16(self)
def test_dictionary_with_time_now(self)
def test_dictionary_with_wrong_type(self)
def test_ros_message_with_nested_uint8_array(self)
def test_dictionary_with_array(self)
def test_ros_message_with_bool(self)
def test_ros_message_with_nested_service(self)