test_rospy_msg.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 try:
39  from cStringIO import StringIO
40 except ImportError:
41  from io import BytesIO as StringIO
42 import time
43 import random
44 
45 import genpy
46 
class TestRospyMsg(unittest.TestCase):
    """Unit tests for the message helpers exposed by ``rospy.msg``.

    Exercises ``args_kwds_to_message``, ``serialize_message`` and
    ``deserialize_messages`` with the ``test_rospy`` message types. The
    ROS imports are done inside each test so that merely importing this
    module does not require them.
    """

    def test_args_kwds_to_message(self):
        """Build ``Val`` messages from args / kwds and check the error cases."""
        import rospy
        from rospy.msg import args_kwds_to_message
        from test_rospy.msg import Val, Empty

        # an existing message instance passed as the single positional arg
        v = Val('hello world-1')
        d = args_kwds_to_message(Val, (v,), None)
        self.assertTrue(d == v)
        # a single positional field value
        d = args_kwds_to_message(Val, ('hello world-2',), None)
        self.assertEqual(d.val, 'hello world-2')
        # keyword field values only
        d = args_kwds_to_message(Val, (), {'val': 'hello world-3'})
        self.assertEqual(d.val, 'hello world-3')

        # error cases
        # Supply both args and kwds through the same three-positional call
        # form used above. Passing ``val=...`` as a Python keyword would be
        # rejected by the call itself and never reach the args/kwds check.
        try:
            args_kwds_to_message(Val, ('hi',), {'val': 'hello world-3'})
        except TypeError:
            pass
        else:
            self.fail("should not allow args and kwds")
        try:
            args_kwds_to_message(Empty, (Val('hola'),), None)
        except TypeError:
            pass
        else:
            self.fail("should raise TypeError when publishing Val msg to Empty topic")

    def test_serialize_message(self):
        """Check the bytes written by ``serialize_message`` and header stamping."""
        import rospy.msg
        import rospy.rostime
        # have to fake-init rostime so that Header can be stamped
        rospy.rostime.set_rostime_initialized(True)

        buff = StringIO()
        seq = random.randint(1, 1000)

        from test_rospy.msg import Val

        # serialize a simple 'Val' with a string in it
        teststr = 'foostr-%s' % time.time()
        val = Val(teststr)

        # expected wire bytes: <total size><string length><string bytes>;
        # the leading size field does not count its own 4 bytes
        fmt = "<II%ss" % len(teststr)
        size = struct.calcsize(fmt) - 4
        valid = struct.pack(fmt, size, len(teststr), teststr.encode())

        rospy.msg.serialize_message(buff, seq, val)

        self.assertEqual(valid, buff.getvalue())

        # test repeated serialization
        rospy.msg.serialize_message(buff, seq, val)
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(valid * 3, buff.getvalue())

        # - once more just to make sure that the buffer position is
        #   being preserved properly: after rewinding, each call should
        #   overwrite one message in place and leave the content unchanged
        buff.seek(0)
        for _ in range(3):
            rospy.msg.serialize_message(buff, seq, val)
            self.assertEqual(valid * 3, buff.getvalue())

        # test sequence parameter
        buff.truncate(0)
        # truncate() does not move the stream position on io.BytesIO, so
        # rewind explicitly to start from a genuinely empty buffer
        buff.seek(0)

        from test_rospy.msg import HeaderVal
        t = rospy.Time.now()
        t.secs = t.secs - 1  # move it back in time
        h = rospy.Header(None, rospy.Time.now(), teststr)
        h.stamp = t
        val = HeaderVal(h, teststr)
        seq += 1

        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(val.header, h)
        self.assertEqual(seq, h.seq)
        # should not have been changed
        self.assertEqual(t, h.stamp)
        self.assertEqual(teststr, h.frame_id)

        # test frame_id setting: a None frame_id is expected to come back as '0'
        h.frame_id = None
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(val.header, h)
        self.assertEqual('0', h.frame_id)

    def test_deserialize_messages(self):
        """Check ``deserialize_messages`` on empty, partial and multi-message buffers."""
        import rospy.msg
        from test_rospy.msg import Val
        num_tests = 10
        teststrs = ['foostr-%s' % random.randint(0, 10000) for i in range(0, num_tests)]
        # pre-pack the expected wire form of every test string
        valids = []
        for t in teststrs:
            fmt = "<II%ss" % len(t)
            size = struct.calcsize(fmt) - 4
            valids.append(struct.pack(fmt, size, len(t), t.encode()))
        data_class = Val

        def validate_vals(vals, teststrs=teststrs):
            """Assert each item of vals is a Val holding the matching test string."""
            for i, v in enumerate(vals):
                self.assertTrue(isinstance(v, Val))
                self.assertEqual(teststrs[i], v.val)

        b = StringIO()
        msg_queue = []

        # test with null buff
        # (no self.fail here: the call is only required not to raise anything
        # other than DeserializationError)
        try:
            rospy.msg.deserialize_messages(None, msg_queue, data_class)
        except genpy.DeserializationError:
            pass
        # test with null msg_queue (same tolerance as above)
        try:
            rospy.msg.deserialize_messages(b, None, data_class)
        except genpy.DeserializationError:
            pass
        # test with empty buff
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(0, len(msg_queue))
        self.assertEqual(0, b.tell())

        # deserialize a simple value
        b.truncate(0)
        b.write(valids[0])
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        # - buffer should be reset
        self.assertEqual(0, b.tell())
        del msg_queue[:]

        # verify deserialize does not read past b.tell()
        b.truncate(0)
        b.write(valids[0])
        b.write(valids[1])
        b.seek(len(valids[0]))
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        # - buffer should be reset
        self.assertEqual(0, b.tell())

        del msg_queue[:]

        # deserialize an incomplete message
        b.truncate(0)
        b.write(valids[0][:-1])
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertFalse(msg_queue, "deserialize of an incomplete buffer returned %s" % msg_queue)

        del msg_queue[:]

        # deserialize with extra data leftover
        b.truncate(0)
        b.seek(0)
        b.write(valids[0] + b'leftovers')
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        # - leftovers should be pushed to the front of the buffer
        self.assertEqual(b'leftovers', b.getvalue())

        del msg_queue[:]

        # deserialize multiple values
        b.truncate(0)
        b.seek(0)
        for v in valids:
            b.write(v)
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(len(valids), len(msg_queue))
        validate_vals(msg_queue)
        # - buffer should be reset
        self.assertEqual(0, b.tell())

        del msg_queue[:]

        # deserialize multiple values with max_msgs
        max_msgs = 5
        b.truncate(0)
        for v in valids:
            b.write(v)
        rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs)
        self.assertEqual(max_msgs, len(msg_queue))
        validate_vals(msg_queue)
        # - buffer should have the remaining msgs
        b2 = StringIO()
        for v in valids[max_msgs:]:
            b2.write(v)
        self.assertEqual(b.getvalue(), b2.getvalue())

        # deserialize rest and verify that msg_queue is appended to
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(len(valids), len(msg_queue))
        validate_vals(msg_queue)

        del msg_queue[:]

        # deserialize multiple values with queue_size
        queue_size = 5
        b.truncate(0)
        for v in valids:
            b.write(v)
        # fill queue with junk
        msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
        rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size)
        self.assertEqual(queue_size, len(msg_queue))
        # - msg_queue should have the most recent values only
        validate_vals(msg_queue, teststrs[-queue_size:])
        # - buffer should be reset
        self.assertEqual(0, b.tell())

        # deserialize multiple values with max_msgs and queue_size
        queue_size = 5
        max_msgs = 5
        b.truncate(0)
        for v in valids:
            b.write(v)
        # fill queue with junk
        msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
        rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size)
        self.assertEqual(queue_size, len(msg_queue))
        # - msg_queue should have the oldest messages
        validate_vals(msg_queue)
        # - buffer should have the remaining msgs
        b2 = StringIO()
        for v in valids[max_msgs:]:
            b2.write(v)
        self.assertEqual(b.getvalue(), b2.getvalue())
277 
unit.test_rospy_msg.TestRospyMsg.test_args_kwds_to_message
def test_args_kwds_to_message(self)
Definition: test_rospy_msg.py:49
unit.test_rospy_msg.TestRospyMsg.test_serialize_message
def test_serialize_message(self)
Definition: test_rospy_msg.py:72
unit.test_rospy_msg.TestRospyMsg
Definition: test_rospy_msg.py:47
unit.test_rospy_msg.TestRospyMsg.test_deserialize_messages
def test_deserialize_messages(self)
Definition: test_rospy_msg.py:136


test_rospy
Author(s): Ken Conley, Dirk Thomas, Jacob Perron
autogenerated on Tue May 20 2025 03:00:44