2 from __future__
import print_function
7 from json
import loads, dumps
10 from cStringIO
import StringIO
12 from io
import BytesIO
as StringIO
16 from base64
import standard_b64encode, standard_b64decode
18 if sys.version_info >= (3, 0):
21 string_types = (str, unicode)
27 rospy.init_node(
"test_message_conversion")
30 """ Serializes and deserializes the inst to typecheck and ensure that 31 instances are correct """ 36 inst2.deserialize(buff.getvalue())
37 self.assertEqual(inst1, inst2)
41 if type(msg1)
in string_types
and type(msg2)
in string_types:
44 self.assertEqual(type(msg1), type(msg2))
45 if type(msg1)
in c.list_types:
46 for x, y
in zip(msg1, msg2):
48 elif type(msg1)
in c.primitive_types
or type(msg1)
in c.string_types:
49 self.assertEqual(msg1, msg2)
52 self.assertTrue(x
in msg2)
54 self.assertTrue(x
in msg1)
59 for msg
in [{
"data": data_value},
loads(
dumps({
"data": data_value}))]:
60 inst = ros_loader.get_message_instance(msgtype)
61 c.populate_instance(msg, inst)
62 self.assertEqual(inst.data, data_value)
64 extracted = c.extract_values(inst)
65 for msg2
in [extracted,
loads(
dumps(extracted))]:
67 self.assertEqual(msg[
"data"], msg2[
"data"])
68 self.assertEqual(msg2[
"data"], inst.data)
72 inst = ros_loader.get_message_instance(msgtype)
73 c.populate_instance(msg, inst)
75 extracted = c.extract_values(inst)
76 for msg2
in [extracted,
loads(
dumps(extracted))]:
81 for msg
in range(-100, 100):
82 for rostype
in [
"int8",
"int16",
"int32",
"int64"]:
83 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
84 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
86 for msg
in range(0, 200):
87 for rostype
in [
"uint8",
"uint16",
"uint32",
"uint64"]:
88 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
89 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
92 self.assertTrue(c._to_primitive_inst(
True,
"bool",
"bool", []))
93 self.assertTrue(c._to_inst(
True,
"bool",
"bool"))
94 self.assertFalse(c._to_primitive_inst(
False,
"bool",
"bool", []))
95 self.assertFalse(c._to_inst(
False,
"bool",
"bool"))
98 for msg
in [0.12341234 + i
for i
in range(-100, 100)]:
99 for rostype
in [
"float32",
"float64"]:
100 self.assertEqual(c._to_primitive_inst(msg, rostype, rostype, []), msg)
101 self.assertEqual(c._to_inst(msg, rostype, rostype), msg)
102 c._to_inst(msg, rostype, rostype)
105 for msg
in [1e9999999, -1e9999999, float(
'nan')]:
106 for rostype
in [
"float32",
"float64"]:
107 self.assertEqual(c._from_inst(msg, rostype),
None)
108 self.assertEqual(
dumps({
"data":c._from_inst(msg, rostype)}),
"{\"data\": null}")
111 int8s = range(-127, 128)
119 int16s = [-32767, 32767]
127 int32s = [-2147483647, 2147483647]
135 int64s = [-9223372036854775807, 9223372036854775807]
144 int8s = range(0, 256)
152 int16s = [32767, 32768, 65535]
160 int32s = [2147483647, 2147483648, 4294967295]
168 int64s = [4294967296, 9223372036854775807, 9223372036854775808,
169 18446744073709551615]
182 for x
in c.ros_primitive_types:
186 msg = {
"data": {
"secs": 3,
"nsecs": 5}}
187 self.
do_test(msg,
"std_msgs/Time")
189 msg = {
"times": [{
"secs": 3,
"nsecs": 5}, {
"secs": 2,
"nsecs": 7}]}
190 self.
do_test(msg,
"rosbridge_library/TestTimeArray")
193 msg = {
"data":
"now"}
194 msgtype =
"std_msgs/Time" 196 inst = ros_loader.get_message_instance(msgtype)
197 c.populate_instance(msg, inst)
198 currenttime = rospy.get_rostime()
200 extracted = c.extract_values(inst)
202 self.assertIn(
"data", extracted)
203 self.assertIn(
"secs", extracted[
"data"])
204 self.assertIn(
"nsecs", extracted[
"data"])
205 self.assertNotEqual(extracted[
"data"][
"secs"], 0)
206 self.assertLessEqual(extracted[
"data"][
"secs"], currenttime.secs)
207 self.assertGreaterEqual(currenttime.secs, extracted[
"data"][
"secs"])
210 msg = {
"data": {
"secs": 3,
"nsecs": 5}}
211 self.
do_test(msg,
"std_msgs/Duration")
213 msg = {
"durations": [{
"secs": 3,
"nsecs": 5}, {
"secs": 2,
"nsecs": 7}]}
214 self.
do_test(msg,
"rosbridge_library/TestDurationArray")
217 msg = {
"seq": 5,
"stamp": {
"secs": 12347,
"nsecs": 322304},
"frame_id":
"2394dnfnlcx;v[p234j]"}
218 self.
do_test(msg,
"std_msgs/Header")
220 msg = {
"header": msg}
221 self.
do_test(msg,
"rosbridge_library/TestHeader")
222 self.
do_test(msg,
"rosbridge_library/TestHeaderTwo")
224 msg = {
"header": [msg[
"header"], msg[
"header"], msg[
"header"]]}
225 msg[
"header"][1][
"seq"] = 6
226 msg[
"header"][2][
"seq"] = 7
227 self.
do_test(msg,
"rosbridge_library/TestHeaderArray")
230 assortedmsgs = [
"geometry_msgs/Pose",
"actionlib_msgs/GoalStatus",
231 "geometry_msgs/WrenchStamped",
"stereo_msgs/DisparityImage",
232 "nav_msgs/OccupancyGrid",
"geometry_msgs/Point32",
"std_msgs/String",
233 "trajectory_msgs/JointTrajectoryPoint",
"diagnostic_msgs/KeyValue",
234 "visualization_msgs/InteractiveMarkerUpdate",
"nav_msgs/GridCells",
235 "sensor_msgs/PointCloud2"]
236 for rostype
in assortedmsgs:
237 inst = ros_loader.get_message_instance(rostype)
238 msg = c.extract_values(inst)
241 inst2 = ros_loader.get_message_instance(rostype)
242 c.populate_instance(msg, inst2)
243 self.assertEqual(inst, inst2)
246 def test_int8_msg(rostype, data):
248 inst = ros_loader.get_message_instance(rostype)
249 c.populate_instance(msg, inst)
253 for msgtype
in [
"TestChar",
"TestUInt8"]:
254 rostype =
"rosbridge_library/" + msgtype
256 int8s = list(range(0, 256))
257 ret = test_int8_msg(rostype, int8s)
258 self.assertEqual(ret, bytes(bytearray(int8s)))
260 str_int8s = bytes(bytearray(int8s))
262 b64str_int8s = standard_b64encode(str_int8s).
decode(
'ascii')
263 ret = test_int8_msg(rostype, b64str_int8s)
264 self.assertEqual(ret, str_int8s)
266 for msgtype
in [
"TestUInt8FixedSizeArray16"]:
267 rostype =
"rosbridge_library/" + msgtype
269 int8s = list(range(0, 16))
270 ret = test_int8_msg(rostype, int8s)
271 self.assertEqual(ret, bytes(bytearray(int8s)))
273 str_int8s = bytes(bytearray(int8s))
275 b64str_int8s = standard_b64encode(str_int8s).
decode(
'ascii')
276 ret = test_int8_msg(rostype, b64str_int8s)
277 self.assertEqual(ret, str_int8s)
280 PKG =
'rosbridge_library' 281 NAME =
'test_message_conversion' 282 if __name__ ==
'__main__':
283 rostest.unitrun(PKG, NAME, TestMessageConversion)
def test_header_msg(self)
def dumps(ob, sort_keys=False)
def msgs_equal(self, msg1, msg2)
def test_duration_msg(self)
def test_signed_int_base_msgs(self)
def do_primitive_test(self, data_value, msgtype)
def test_time_msg_now(self)
def test_int_primitives(self)
def test_float_primitives(self)
def validate_instance(self, inst1)
def test_string_base_msg(self)
def test_unsigned_int_base_msgs(self)
def test_bool_primitives(self)
def test_assorted_msgs(self)
def do_test(self, orig_msg, msgtype)
def test_float_special_cases(self)
def test_bool_base_msg(self)