test_rospy_msg.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import os
35 import sys
36 import struct
37 import unittest
38 try:
39  from cStringIO import StringIO
40 except ImportError:
41  from io import StringIO
42 import time
43 import random
44 
45 import genpy
46 
class TestRospyMsg(unittest.TestCase):
    """Unit tests for the message helpers in ``rospy.msg``.

    Exercises ``args_kwds_to_message``, ``serialize_message`` and
    ``deserialize_messages`` with the ``test_rospy`` message types.

    NOTE(review): the ``def`` lines of the three test methods were missing
    from the listing this class was recovered from; the method names below
    are reconstructed from the functions they exercise -- confirm against
    the upstream file.
    """

    def test_args_kwds_to_message(self):
        """Check conversion of publish-style args/kwds into a message."""
        import rospy
        from rospy.msg import args_kwds_to_message
        from test_rospy.msg import Val, Empty

        # a message instance passed as the only positional argument
        v = Val('hello world-1')
        d = args_kwds_to_message(Val, (v,), None)
        self.assertEqual(d, v)
        # positional field values
        d = args_kwds_to_message(Val, ('hello world-2',), None)
        self.assertEqual(d.val, 'hello world-2')
        # keyword field values
        d = args_kwds_to_message(Val, (), {'val': 'hello world-3'})
        self.assertEqual(d.val, 'hello world-3')

        # error cases
        # - args and kwds are passed as a tuple and a dict, like every other
        #   call here; a stray ``val=`` keyword would raise TypeError from
        #   the call itself and never reach the check under test.
        try:
            args_kwds_to_message(Val, ('hi',), {'val': 'hello world-3'})
        except TypeError:
            pass
        else:
            self.fail("should not allow args and kwds")
        # - message instance of a different type than the data class
        try:
            args_kwds_to_message(Empty, (Val('hola'),), None)
        except TypeError:
            pass
        else:
            self.fail("should raise TypeError when publishing Val msg to Empty topic")

    def test_serialize_message(self):
        """Check the bytes written by serialize_message and header handling."""
        import rospy.msg
        import rospy.rostime
        # have to fake-init rostime so that Header can be stamped
        rospy.rostime.set_rostime_initialized(True)

        buff = StringIO()
        seq = random.randint(1, 1000)

        # serialize_message(seq, msg)
        from test_rospy.msg import Val

        # serialize a simple 'Val' with a string in it
        teststr = 'foostr-%s' % time.time()
        val = Val(teststr)

        # expected bytes: uint32 size prefix (which does not count itself,
        # hence the "- 4"), uint32 string length, then the string
        # NOTE(review): on Python 3 struct.pack needs bytes for the 's'
        # field and the io.StringIO fallback is a text buffer -- presumably
        # this test is only run under Python 2; confirm.
        fmt = "<II%ss" % len(teststr)
        size = struct.calcsize(fmt) - 4
        valid = struct.pack(fmt, size, len(teststr), teststr)

        rospy.msg.serialize_message(buff, seq, val)

        self.assertEqual(valid, buff.getvalue())

        # test repeated serialization
        rospy.msg.serialize_message(buff, seq, val)
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(valid * 3, buff.getvalue())

        # - once more just to make sure that the buffer position is
        #   being preserved properly
        buff.seek(0)
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(valid * 3, buff.getvalue())
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(valid * 3, buff.getvalue())
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(valid * 3, buff.getvalue())

        # test sequence parameter
        buff.truncate(0)

        from test_rospy.msg import HeaderVal
        t = rospy.Time.now()
        t.secs = t.secs - 1  # move it back in time
        h = rospy.Header(None, rospy.Time.now(), teststr)
        h.stamp = t
        val = HeaderVal(h, teststr)
        seq += 1

        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(val.header, h)
        self.assertEqual(seq, h.seq)
        # should not have been changed
        self.assertEqual(t, h.stamp)
        self.assertEqual(teststr, h.frame_id)

        # test frame_id setting
        h.frame_id = None
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(val.header, h)
        self.assertEqual('0', h.frame_id)

    def test_deserialize_messages(self):
        """Check deserialize_messages across buffer and queue scenarios."""
        import rospy.msg
        from test_rospy.msg import Val
        num_tests = 10
        teststrs = ['foostr-%s' % random.randint(0, 10000) for _ in range(num_tests)]
        # serialized form of each test string: uint32 size prefix, uint32
        # string length, then the string itself
        valids = []
        for t in teststrs:
            fmt = "<II%ss" % len(t)
            size = struct.calcsize(fmt) - 4
            valids.append(struct.pack(fmt, size, len(t), t))
        data_class = Val

        def validate_vals(vals, teststrs=teststrs):
            # each message must be a Val carrying the string at the same index
            for i, v in enumerate(vals):
                self.assertTrue(isinstance(v, Val))
                self.assertEqual(teststrs[i], v.val)

        b = StringIO()
        msg_queue = []

        # test with null buff
        # (a DeserializationError is tolerated; no exception also passes)
        try:
            rospy.msg.deserialize_messages(None, msg_queue, data_class)
        except genpy.DeserializationError:
            pass
        # test with null msg_queue
        try:
            rospy.msg.deserialize_messages(b, None, data_class)
        except genpy.DeserializationError:
            pass
        # test with empty buff
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(0, len(msg_queue))
        self.assertEqual(0, b.tell())

        # deserialize a simple value
        b.truncate(0)
        b.write(valids[0])
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        # - buffer should be reset
        self.assertEqual(0, b.tell())
        del msg_queue[:]

        # verify deserialize does not read past b.tell()
        b.truncate(0)
        b.write(valids[0])
        b.write(valids[1])
        b.seek(len(valids[0]))
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        # - buffer should be reset
        self.assertEqual(0, b.tell())

        del msg_queue[:]

        # deserialize an incomplete message
        b.truncate(0)
        b.write(valids[0][:-1])
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertFalse(msg_queue, "deserialize of an incomplete buffer returned %s" % msg_queue)

        del msg_queue[:]

        # deserialize with extra data leftover
        b.truncate(0)
        b.write(valids[0] + 'leftovers')
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        # - leftovers should be pushed to the front of the buffer
        self.assertEqual('leftovers', b.getvalue())

        del msg_queue[:]

        # deserialize multiple values
        b.truncate(0)
        for v in valids:
            b.write(v)
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(len(valids), len(msg_queue))
        validate_vals(msg_queue)
        # - buffer should be reset
        self.assertEqual(0, b.tell())

        del msg_queue[:]

        # deserialize multiple values with max_msgs
        max_msgs = 5
        b.truncate(0)
        for v in valids:
            b.write(v)
        rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs)
        self.assertEqual(max_msgs, len(msg_queue))
        validate_vals(msg_queue)
        # - buffer should have the remaining msgs
        b2 = StringIO()
        for v in valids[max_msgs:]:
            b2.write(v)
        self.assertEqual(b.getvalue(), b2.getvalue())

        # deserialize rest and verify that msg_queue is appended to
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(len(valids), len(msg_queue))
        validate_vals(msg_queue)

        del msg_queue[:]

        # deserialize multiple values with queue_size
        queue_size = 5
        b.truncate(0)
        for v in valids:
            b.write(v)
        # fill queue with junk
        msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
        rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size)
        self.assertEqual(queue_size, len(msg_queue))
        # - msg_queue should have the most recent values only
        validate_vals(msg_queue, teststrs[-queue_size:])
        # - buffer should be reset
        self.assertEqual(0, b.tell())

        # deserialize multiple values with max_msgs and queue_size
        queue_size = 5
        max_msgs = 5
        b.truncate(0)
        for v in valids:
            b.write(v)
        # fill queue with junk
        msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
        rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size)
        self.assertEqual(queue_size, len(msg_queue))
        # - msg_queue should have the oldest messages
        validate_vals(msg_queue)
        # - buffer should have the remaining msgs
        b2 = StringIO()
        for v in valids[max_msgs:]:
            b2.write(v)
        self.assertEqual(b.getvalue(), b2.getvalue())


test_rospy
Author(s): Ken Conley, Dirk Thomas
autogenerated on Mon Nov 2 2020 03:52:56