4 from rospy_message_converter
import json_message_converter
9 from std_msgs.msg
import String
10 expected_json =
'{"data": "Hello"}' 11 message = String(data =
'Hello')
13 returned_json = json_message_converter.convert_ros_message_to_json(message)
14 self.assertEqual(returned_json, expected_json)
17 from std_msgs.msg
import String
18 expected_json =
'{"data": "Hello \\u00dcnicode"}' 19 message = String(data =
u'Hello \u00dcnicode')
21 returned_json = json_message_converter.convert_ros_message_to_json(message)
22 self.assertEqual(returned_json, expected_json)
25 from std_msgs.msg
import Header
27 now_time = rospy.Time(time())
28 expected_json1 =
'{{"stamp": {{"secs": {0}, "nsecs": {1}}}, "frame_id": "my_frame", "seq": 3}}'\
29 .format(now_time.secs, now_time.nsecs)
30 expected_json2 =
'{{"seq": 3, "stamp": {{"secs": {0}, "nsecs": {1}}}, "frame_id": "my_frame"}}'\
31 .format(now_time.secs, now_time.nsecs)
32 expected_json3 =
'{{"frame_id": "my_frame", "seq": 3, "stamp": {{"secs": {0}, "nsecs": {1}}}}}'\
33 .format(now_time.secs, now_time.nsecs)
34 message =
Header(stamp = now_time, frame_id =
'my_frame', seq = 3)
36 returned_json = json_message_converter.convert_ros_message_to_json(message)
37 self.assertTrue(returned_json == expected_json1
or 38 returned_json == expected_json2
or 39 returned_json == expected_json3)
42 from rospy_message_converter.msg
import Uint8ArrayTestMessage
43 input_data = [97, 98, 99, 100]
44 expected_json =
'{"data": "YWJjZA=="}' 45 message = Uint8ArrayTestMessage(data=input_data)
47 returned_json = json_message_converter.convert_ros_message_to_json(message)
48 self.assertEqual(returned_json, expected_json)
51 from rospy_message_converter.msg
import Uint8Array3TestMessage
52 input_data = [97, 98, 99]
53 expected_json =
'{"data": "YWJj"}' 54 message = Uint8Array3TestMessage(data=input_data)
56 returned_json = json_message_converter.convert_ros_message_to_json(message)
57 self.assertEqual(returned_json, expected_json)
60 from std_msgs.msg
import String
61 expected_message = String(data =
'Hello')
62 json_str =
'{"data": "Hello"}' 63 message = json_message_converter.convert_json_to_ros_message(
'std_msgs/String', json_str)
65 self.assertEqual(message, expected_message)
68 from std_msgs.msg
import String
69 expected_message = String(data =
u'Hello \u00dcnicode')
70 json_str =
'{"data": "Hello \\u00dcnicode"}' 71 message = json_message_converter.convert_json_to_ros_message(
'std_msgs/String', json_str)
73 self.assertEqual(message, expected_message)
76 from std_msgs.msg
import Header
78 now_time = rospy.Time(time())
81 frame_id =
'my_frame',
84 json_str =
'{{"stamp": {{"secs": {0}, "nsecs": {1}}}, "frame_id": "my_frame", "seq": 12}}'\
85 .format(now_time.secs, now_time.nsecs)
86 message = json_message_converter.convert_json_to_ros_message(
'std_msgs/Header', json_str)
88 self.assertEqual(message, expected_message)
91 self.assertRaises(ValueError,
92 json_message_converter.convert_json_to_ros_message,
94 '{"not_data": "Hello"}')
99 Serialize and then deserialize a message. This simulates sending a message 100 between ROS nodes and makes sure that the ROS messages being tested are 101 actually serializable, and are in the same format as they would be received 102 over the network. In rospy, it is possible to assign an illegal data type 103 to a message field (for example, `message = String(data=42)`), but trying 104 to publish this message will throw `SerializationError: field data must be 105 of type str`. This method will expose such bugs. 107 from io
import BytesIO
109 message.serialize(buff)
110 result = message.__class__()
111 result.deserialize(buff.getvalue())
115 PKG =
'rospy_message_converter' 116 NAME =
'test_json_message_converter' 117 if __name__ ==
'__main__':
119 rosunit.unitrun(PKG, NAME, TestJsonMessageConverter)
def test_json_with_invalid_message_fields(self)
def test_json_with_string_unicode(self)
def test_ros_message_with_uint8_array(self)
def test_json_with_string(self)
def test_ros_message_with_header(self)
def test_json_with_header(self)
def test_ros_message_with_string(self)
def serialize_deserialize(message)
def test_ros_message_with_string_unicode(self)
def test_ros_message_with_3uint8_array(self)