test_json_message_converter.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import unittest
3 import rospy
4 from rospy_message_converter import json_message_converter
5 
6 class TestJsonMessageConverter(unittest.TestCase):
7 
9  from std_msgs.msg import String
10  expected_json = '{"data": "Hello"}'
11  message = String(data = 'Hello')
12  message = serialize_deserialize(message)
13  returned_json = json_message_converter.convert_ros_message_to_json(message)
14  self.assertEqual(returned_json, expected_json)
15 
17  from std_msgs.msg import String
18  expected_json = '{"data": "Hello \\u00dcnicode"}'
19  message = String(data = u'Hello \u00dcnicode')
20  message = serialize_deserialize(message)
21  returned_json = json_message_converter.convert_ros_message_to_json(message)
22  self.assertEqual(returned_json, expected_json)
23 
25  from std_msgs.msg import Header
26  from time import time
27  now_time = rospy.Time(time())
28  expected_json1 = '{{"stamp": {{"secs": {0}, "nsecs": {1}}}, "frame_id": "my_frame", "seq": 3}}'\
29  .format(now_time.secs, now_time.nsecs)
30  expected_json2 = '{{"seq": 3, "stamp": {{"secs": {0}, "nsecs": {1}}}, "frame_id": "my_frame"}}'\
31  .format(now_time.secs, now_time.nsecs)
32  expected_json3 = '{{"frame_id": "my_frame", "seq": 3, "stamp": {{"secs": {0}, "nsecs": {1}}}}}'\
33  .format(now_time.secs, now_time.nsecs)
34  message = Header(stamp = now_time, frame_id = 'my_frame', seq = 3)
35  message = serialize_deserialize(message)
36  returned_json = json_message_converter.convert_ros_message_to_json(message)
37  self.assertTrue(returned_json == expected_json1 or
38  returned_json == expected_json2 or
39  returned_json == expected_json3)
40 
42  from rospy_message_converter.msg import Uint8ArrayTestMessage
43  input_data = [97, 98, 99, 100]
44  expected_json = '{"data": "YWJjZA=="}' # base64.b64encode("abcd") is "YWJjZA=="
45  message = Uint8ArrayTestMessage(data=input_data)
46  message = serialize_deserialize(message)
47  returned_json = json_message_converter.convert_ros_message_to_json(message)
48  self.assertEqual(returned_json, expected_json)
49 
51  from rospy_message_converter.msg import Uint8Array3TestMessage
52  input_data = [97, 98, 99]
53  expected_json = '{"data": "YWJj"}' # base64.b64encode("abc") is "YWJj"
54  message = Uint8Array3TestMessage(data=input_data)
55  message = serialize_deserialize(message)
56  returned_json = json_message_converter.convert_ros_message_to_json(message)
57  self.assertEqual(returned_json, expected_json)
58 
60  from std_msgs.msg import String
61  expected_message = String(data = 'Hello')
62  json_str = '{"data": "Hello"}'
63  message = json_message_converter.convert_json_to_ros_message('std_msgs/String', json_str)
64  expected_message = serialize_deserialize(expected_message)
65  self.assertEqual(message, expected_message)
66 
68  from std_msgs.msg import String
69  expected_message = String(data = u'Hello \u00dcnicode')
70  json_str = '{"data": "Hello \\u00dcnicode"}'
71  message = json_message_converter.convert_json_to_ros_message('std_msgs/String', json_str)
72  expected_message = serialize_deserialize(expected_message)
73  self.assertEqual(message, expected_message)
74 
76  from std_msgs.msg import Header
77  from time import time
78  now_time = rospy.Time(time())
79  expected_message = Header(
80  stamp = now_time,
81  frame_id = 'my_frame',
82  seq = 12
83  )
84  json_str = '{{"stamp": {{"secs": {0}, "nsecs": {1}}}, "frame_id": "my_frame", "seq": 12}}'\
85  .format(now_time.secs, now_time.nsecs)
86  message = json_message_converter.convert_json_to_ros_message('std_msgs/Header', json_str)
87  expected_message = serialize_deserialize(expected_message)
88  self.assertEqual(message, expected_message)
89 
91  self.assertRaises(ValueError,
92  json_message_converter.convert_json_to_ros_message,
93  'std_msgs/String',
94  '{"not_data": "Hello"}')
95 
96 
def serialize_deserialize(message):
    """
    Round-trip a message through its own serialized form and return the copy.

    The message writes itself into an in-memory byte buffer, and a fresh
    instance of the same class is then populated from those bytes. This
    mimics a message travelling between ROS nodes, so the tests operate on
    messages shaped the way they would be after reception over the network.

    It also acts as a sanity check: rospy allows assigning a value of the
    wrong type to a field (e.g. `message = String(data=42)`), but publishing
    such a message fails with `SerializationError: field data must be of type
    str`. Serializing here surfaces that kind of mistake inside the test.

    :param message: object providing `serialize(buffer)` and
        `deserialize(bytes)`, and whose class can be constructed without
        arguments
    :return: a new instance of `message`'s class filled from the serialized
        bytes
    """
    import io

    stream = io.BytesIO()
    message.serialize(stream)

    # Build an empty sibling of the same type, then fill it from the raw bytes.
    clone = message.__class__()
    clone.deserialize(stream.getvalue())
    return clone
113 
114 
# Package and test-suite names handed to rosunit when run as a script.
PKG = 'rospy_message_converter'
NAME = 'test_json_message_converter'

if __name__ == '__main__':
    # Imported only on direct execution so that merely importing this module
    # does not require rosunit.
    from rosunit import unitrun
    unitrun(PKG, NAME, TestJsonMessageConverter)


rospy_message_converter
Author(s): Brandon Alexander
autogenerated on Tue Mar 2 2021 03:04:27