5 from rospy_message_converter
import message_converter
10 from rospy_message_converter.msg
import TestArray
11 expected_dictionary = {
12 'data': [1.1, 2.2, 3.3]
14 message = TestArray(data = expected_dictionary[
'data'])
16 dictionary = message_converter.convert_ros_message_to_dictionary(message)
17 self.assertEqual(dictionary, expected_dictionary)
20 from std_msgs.msg
import Bool
21 expected_dictionary = {
'data':
True }
22 message = Bool(data=expected_dictionary[
'data'])
24 dictionary = message_converter.convert_ros_message_to_dictionary(message)
25 self.assertEqual(dictionary, expected_dictionary)
28 from std_msgs.msg
import Byte
29 expected_dictionary = {
'data': 5 }
30 message = Byte(data=expected_dictionary[
'data'])
32 dictionary = message_converter.convert_ros_message_to_dictionary(message)
33 self.assertEqual(dictionary, expected_dictionary)
36 from std_msgs.msg
import Char
37 expected_dictionary = {
'data': 99 }
38 message = Char(data=expected_dictionary[
'data'])
40 dictionary = message_converter.convert_ros_message_to_dictionary(message)
41 self.assertEqual(dictionary, expected_dictionary)
44 from std_msgs.msg
import Duration
45 duration = rospy.rostime.Duration(33, 25)
46 expected_dictionary = {
48 'secs' : duration.secs,
49 'nsecs' : duration.nsecs
52 message = Duration(data=duration)
54 dictionary = message_converter.convert_ros_message_to_dictionary(message)
55 self.assertEqual(dictionary, expected_dictionary)
58 from std_msgs.msg
import Empty
59 expected_dictionary = {}
62 dictionary = message_converter.convert_ros_message_to_dictionary(message)
63 self.assertEqual(dictionary, expected_dictionary)
66 from std_msgs.msg
import Float32
67 expected_dictionary = {
'data': struct.unpack(
'<f',
'\x7F\x7F\xFF\xFD')[0] }
68 message = Float32(data=expected_dictionary[
'data'])
70 dictionary = message_converter.convert_ros_message_to_dictionary(message)
71 self.assertEqual(dictionary, expected_dictionary)
74 from std_msgs.msg
import Float64
75 expected_dictionary = {
'data': struct.unpack(
'<d',
'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0] }
76 message = Float64(data=expected_dictionary[
'data'])
78 dictionary = message_converter.convert_ros_message_to_dictionary(message)
79 self.assertEqual(dictionary, expected_dictionary)
82 from std_msgs.msg
import Header
84 now_time = rospy.Time(time())
85 expected_dictionary = {
86 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs },
87 'frame_id' :
'my_frame',
92 frame_id = expected_dictionary[
'frame_id'],
93 seq = expected_dictionary[
'seq']
96 dictionary = message_converter.convert_ros_message_to_dictionary(message)
97 self.assertEqual(dictionary, expected_dictionary)
100 from std_msgs.msg
import Int8
101 expected_dictionary = {
'data': -0x7F }
102 message = Int8(data=expected_dictionary[
'data'])
104 dictionary = message_converter.convert_ros_message_to_dictionary(message)
105 self.assertEqual(dictionary, expected_dictionary)
108 from std_msgs.msg
import UInt8
109 expected_dictionary = {
'data': 0xFF }
110 message = UInt8(data=expected_dictionary[
'data'])
112 dictionary = message_converter.convert_ros_message_to_dictionary(message)
113 self.assertEqual(dictionary, expected_dictionary)
116 from rospy_message_converter.msg
import Uint8ArrayTestMessage
117 from base64
import standard_b64encode
118 expected_data =
"".join([chr(i)
for i
in [97, 98, 99, 100]])
119 message = Uint8ArrayTestMessage(data=expected_data)
121 dictionary = message_converter.convert_ros_message_to_dictionary(message)
122 self.assertEqual(dictionary[
"data"], standard_b64encode(expected_data))
125 from rospy_message_converter.msg
import Uint8Array3TestMessage
126 from base64
import standard_b64encode
127 expected_data =
"".join([chr(i)
for i
in [97, 98, 99]])
128 message = Uint8Array3TestMessage(data=expected_data)
130 dictionary = message_converter.convert_ros_message_to_dictionary(message)
131 self.assertEqual(dictionary[
"data"], standard_b64encode(expected_data))
134 from std_msgs.msg
import Int16
135 expected_dictionary = {
'data': -0x7FFF }
136 message = Int16(data=expected_dictionary[
'data'])
138 dictionary = message_converter.convert_ros_message_to_dictionary(message)
139 self.assertEqual(dictionary, expected_dictionary)
142 from std_msgs.msg
import UInt16
143 expected_dictionary = {
'data': 0xFFFF }
144 message = UInt16(data=expected_dictionary[
'data'])
146 dictionary = message_converter.convert_ros_message_to_dictionary(message)
147 self.assertEqual(dictionary, expected_dictionary)
150 from std_msgs.msg
import Int32
151 expected_dictionary = {
'data': -0x7FFFFFFF }
152 message = Int32(data=expected_dictionary[
'data'])
154 dictionary = message_converter.convert_ros_message_to_dictionary(message)
155 self.assertEqual(dictionary, expected_dictionary)
158 from std_msgs.msg
import UInt32
159 expected_dictionary = {
'data': 0xFFFFFFFF }
160 message = UInt32(data=expected_dictionary[
'data'])
162 dictionary = message_converter.convert_ros_message_to_dictionary(message)
163 self.assertEqual(dictionary, expected_dictionary)
166 from std_msgs.msg
import Int64
167 expected_dictionary = {
'data': -0x7FFFFFFFFFFFFFFF }
168 message = Int64(data=expected_dictionary[
'data'])
170 dictionary = message_converter.convert_ros_message_to_dictionary(message)
171 self.assertEqual(dictionary, expected_dictionary)
174 from std_msgs.msg
import UInt64
175 expected_dictionary = {
'data': 0xFFFFFFFFFFFFFFFF }
176 message = UInt64(data=expected_dictionary[
'data'])
178 dictionary = message_converter.convert_ros_message_to_dictionary(message)
179 self.assertEqual(dictionary, expected_dictionary)
182 from std_msgs.msg
import String
183 expected_dictionary = {
'data':
'Hello' }
184 message = String(data=expected_dictionary[
'data'])
186 dictionary = message_converter.convert_ros_message_to_dictionary(message)
187 self.assertEqual(dictionary, expected_dictionary)
190 from std_msgs.msg
import Time
191 from time
import time
192 now_time = rospy.Time(time())
193 expected_dictionary = {
194 'data': {
'secs': now_time.secs,
'nsecs': now_time.nsecs }
196 message = Time(data=now_time)
198 dictionary = message_converter.convert_ros_message_to_dictionary(message)
199 self.assertEqual(dictionary, expected_dictionary)
202 from std_msgs.msg
import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
203 expected_dictionary = {
206 {
'label':
'Dimension1',
'size': 12,
'stride': 7 },
207 {
'label':
'Dimension2',
'size': 24,
'stride': 14 }
211 'data': [1.1, 2.2, 3.3]
213 dimension1 = MultiArrayDimension(
214 label = expected_dictionary[
'layout'][
'dim'][0][
'label'],
215 size = expected_dictionary[
'layout'][
'dim'][0][
'size'],
216 stride = expected_dictionary[
'layout'][
'dim'][0][
'stride']
218 dimension2 = MultiArrayDimension(
219 label = expected_dictionary[
'layout'][
'dim'][1][
'label'],
220 size = expected_dictionary[
'layout'][
'dim'][1][
'size'],
221 stride = expected_dictionary[
'layout'][
'dim'][1][
'stride']
223 layout = MultiArrayLayout(
224 dim = [dimension1, dimension2],
225 data_offset = expected_dictionary[
'layout'][
'data_offset']
227 message = Float64MultiArray(
229 data = expected_dictionary[
'data']
232 dictionary = message_converter.convert_ros_message_to_dictionary(message)
233 self.assertEqual(dictionary, expected_dictionary)
236 from rospy_message_converter.msg
import TestArray
237 expected_message = TestArray(data = [1.1, 2.2, 3.3, 4.4])
238 dictionary = {
'data': expected_message.data }
239 message = message_converter.convert_dictionary_to_ros_message(
'rospy_message_converter/TestArray', dictionary)
241 self.assertEqual(message, expected_message)
244 from std_msgs.msg
import Bool
245 expected_message = Bool(data =
True)
246 dictionary = {
'data': expected_message.data }
247 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Bool', dictionary)
249 self.assertEqual(message, expected_message)
252 from std_msgs.msg
import Byte
253 expected_message = Byte(data = 3)
254 dictionary = {
'data': expected_message.data }
255 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Byte', dictionary)
257 self.assertEqual(message, expected_message)
260 from std_msgs.msg
import Char
261 expected_message = Char(data = 99)
262 dictionary = {
'data': expected_message.data }
263 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Char', dictionary)
265 self.assertEqual(message, expected_message)
268 from std_msgs.msg
import Duration
269 duration = rospy.rostime.Duration(33, 25)
270 expected_message = Duration(data = duration)
273 'secs' : duration.secs,
274 'nsecs' : duration.nsecs
277 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Duration', dictionary)
279 self.assertEqual(message, expected_message)
282 from std_msgs.msg
import Empty
283 expected_message = Empty()
285 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Empty', dictionary)
287 self.assertEqual(message, expected_message)
290 from std_msgs.msg
import Float32
291 expected_message = Float32(data = struct.unpack(
'<f',
'\x7F\x7F\xFF\xFD')[0])
292 dictionary = {
'data': expected_message.data }
293 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float32', dictionary)
295 self.assertEqual(message, expected_message)
298 from std_msgs.msg
import Float64
299 expected_message = Float64(data = struct.unpack(
'<d',
'\x7F\xEF\xFF\xFF\xFF\xFF\xFF\xFD')[0])
300 dictionary = {
'data': expected_message.data }
301 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float64', dictionary)
303 self.assertEqual(message, expected_message)
306 from std_msgs.msg
import Header
307 from time
import time
308 now_time = rospy.Time(time())
309 expected_message =
Header(
311 frame_id =
'my_frame',
315 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs },
316 'frame_id' : expected_message.frame_id,
317 'seq': expected_message.seq
319 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Header', dictionary)
321 self.assertEqual(message, expected_message)
324 from std_msgs.msg
import Header
325 from time
import time
326 now_time = rospy.Time(time())
327 expected_message =
Header(
329 frame_id =
'my_frame',
333 'stamp': {
'secs': now_time.secs,
'nsecs': now_time.nsecs },
334 'frame_id' : expected_message.frame_id,
335 'seq': expected_message.seq
337 message = message_converter.convert_dictionary_to_ros_message(
'Header', dictionary)
339 self.assertEqual(message, expected_message)
342 from std_msgs.msg
import Int8
343 expected_message = Int8(data = -0x7F)
344 dictionary = {
'data': expected_message.data }
345 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int8', dictionary)
347 self.assertEqual(message, expected_message)
350 from std_msgs.msg
import UInt8
351 expected_message = UInt8(data = 0xFF)
352 dictionary = {
'data': expected_message.data }
353 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt8', dictionary)
355 self.assertEqual(message, expected_message)
358 from std_msgs.msg
import Int16
359 expected_message = Int16(data = -0x7FFF)
360 dictionary = {
'data': expected_message.data }
361 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int16', dictionary)
363 self.assertEqual(message, expected_message)
366 from std_msgs.msg
import UInt16
367 expected_message = UInt16(data = 0xFFFF)
368 dictionary = {
'data': expected_message.data }
369 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt16', dictionary)
371 self.assertEqual(message, expected_message)
374 from std_msgs.msg
import Int32
375 expected_message = Int32(data = -0x7FFFFFFF)
376 dictionary = {
'data': expected_message.data }
377 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int32', dictionary)
379 self.assertEqual(message, expected_message)
382 from std_msgs.msg
import UInt32
383 expected_message = UInt32(data = 0xFFFFFFFF)
384 dictionary = {
'data': expected_message.data }
385 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt32', dictionary)
387 self.assertEqual(message, expected_message)
390 from std_msgs.msg
import Int64
391 expected_message = Int64(data = -0x7FFFFFFFFFFFFFFF)
392 dictionary = {
'data': expected_message.data }
393 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Int64', dictionary)
395 self.assertEqual(message, expected_message)
398 from std_msgs.msg
import UInt64
399 expected_message = UInt64(data = 0xFFFFFFFFFFFFFFFF)
400 dictionary = {
'data': expected_message.data }
401 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/UInt64', dictionary)
403 self.assertEqual(message, expected_message)
406 from std_msgs.msg
import String
407 expected_message = String(data =
'Hello')
408 dictionary = {
'data': expected_message.data }
409 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/String', dictionary)
411 self.assertEqual(message, expected_message)
414 from std_msgs.msg
import String
415 expected_message = String(data =
'Hello')
416 dictionary = {
'data': unicode(expected_message.data) }
417 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/String', dictionary)
419 self.assertEqual(message.data,expected_message.data)
420 self.assertEqual(type(message.data),type(expected_message.data))
423 from std_msgs.msg
import Time
424 from time
import time
425 now_time = rospy.Time(time())
426 expected_message = Time(data=now_time)
429 'secs' : now_time.secs,
430 'nsecs' : now_time.nsecs
433 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Time', dictionary)
435 self.assertEqual(message, expected_message)
438 from std_msgs.msg
import Float64MultiArray, MultiArrayLayout, MultiArrayDimension
439 expected_message = Float64MultiArray(
440 layout = MultiArrayLayout(
442 MultiArrayDimension(label =
'Dimension1', size = 12, stride = 7),
443 MultiArrayDimension(label =
'Dimension2', size = 90, stride = 8)
447 data = [1.1, 2.2, 3.3]
453 'label' : expected_message.layout.dim[0].label,
454 'size' : expected_message.layout.dim[0].size,
455 'stride' : expected_message.layout.dim[0].stride
458 'label' : expected_message.layout.dim[1].label,
459 'size' : expected_message.layout.dim[1].size,
460 'stride' : expected_message.layout.dim[1].stride
463 'data_offset': expected_message.layout.data_offset
465 'data': expected_message.data
467 message = message_converter.convert_dictionary_to_ros_message(
'std_msgs/Float64MultiArray', dictionary)
469 self.assertEqual(message, expected_message)
472 self.assertRaises(ValueError,
473 message_converter.convert_dictionary_to_ros_message,
475 {
'invalid_field': 1})
478 from std_srvs.srv
import Empty, EmptyRequest, EmptyResponse
479 expected_req = EmptyRequest()
480 expected_res = EmptyResponse()
483 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Empty', dictionary_req,
486 self.assertEqual(message, expected_req)
487 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Empty', dictionary_res,
490 self.assertEqual(message, expected_res)
493 from std_srvs.srv
import SetBool, SetBoolRequest, SetBoolResponse
494 expected_req = SetBoolRequest(data=
True)
495 expected_res = SetBoolResponse(success=
True, message=
'Success!')
496 dictionary_req = {
'data':
True }
497 dictionary_res = {
'success':
True,
'message':
'Success!' }
498 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/SetBool', dictionary_req,
501 self.assertEqual(message, expected_req)
502 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/SetBool', dictionary_res,
505 self.assertEqual(message, expected_res)
508 from std_srvs.srv
import Trigger, TriggerRequest, TriggerResponse
509 expected_req = TriggerRequest()
510 expected_res = TriggerResponse(success=
True, message=
'Success!')
512 dictionary_res = {
'success':
True,
'message':
'Success!' }
513 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Trigger', dictionary_req,
516 self.assertEqual(message, expected_req)
517 message = message_converter.convert_dictionary_to_ros_message(
'std_srvs/Trigger', dictionary_res,
520 self.assertEqual(message, expected_res)
525 Serialize and then deserialize a message. This simulates sending a message 526 between ROS nodes and makes sure that the ROS messages being tested are 527 actually serializable, and are in the same format as they would be received 528 over the network. In rospy, it is possible to assign an illegal data type 529 to a message field (for example, `message = String(data=42)`), but trying 530 to publish this message will throw `SerializationError: field data must be 531 of type str`. This method will expose such bugs. 533 from StringIO
import StringIO
535 message.serialize(buff)
536 result = message.__class__()
537 result.deserialize(buff.getvalue())
541 PKG =
'rospy_message_converter' 542 NAME =
'test_message_converter' 543 if __name__ ==
'__main__':
545 rosunit.unitrun(PKG, NAME, TestMessageConverter)
def test_ros_message_with_float32(self)
def test_ros_message_with_child_message(self)
def test_ros_message_with_float64(self)
def test_ros_message_with_time(self)
def test_ros_message_with_uint64(self)
def serialize_deserialize(message)
def test_ros_message_with_char(self)
def test_dictionary_with_unicode(self)
def test_dictionary_with_empty_service(self)
def test_dictionary_with_trigger_service(self)
def test_dictionary_with_float64(self)
def test_ros_message_with_uint8_array(self)
def test_dictionary_with_byte(self)
def test_dictionary_with_int8(self)
def test_dictionary_with_child_message(self)
def test_dictionary_with_string(self)
def test_dictionary_with_int64(self)
def test_ros_message_with_array(self)
def test_dictionary_with_uint64(self)
def test_dictionary_with_float32(self)
def test_ros_message_with_string(self)
def test_ros_message_with_uint32(self)
def test_dictionary_with_uint32(self)
def test_ros_message_with_byte(self)
def test_dictionary_with_invalid_message_fields(self)
def test_ros_message_with_header(self)
def test_ros_message_with_3uint8_array(self)
def test_ros_message_with_int32(self)
def test_ros_message_with_int64(self)
def test_ros_message_with_uint8(self)
def test_dictionary_with_uint16(self)
def test_dictionary_with_time(self)
def test_dictionary_with_int16(self)
def test_ros_message_with_int8(self)
def test_ros_message_with_duration(self)
def test_dictionary_with_empty(self)
def test_dictionary_with_char(self)
def test_dictionary_with_bool(self)
def test_dictionary_with_header_with_no_prefix(self)
def test_dictionary_with_duration(self)
def test_dictionary_with_int32(self)
def test_dictionary_with_header(self)
def test_ros_message_with_int16(self)
def test_dictionary_with_uint8(self)
def test_ros_message_with_empty(self)
def test_dictionary_with_setbool_service(self)
def test_ros_message_with_uint16(self)
def test_dictionary_with_array(self)
def test_ros_message_with_bool(self)