39 from cStringIO
import StringIO
41 from io
import StringIO
51 from rospy.msg
import args_kwds_to_message
52 from test_rospy.msg
import Val, Empty
54 v = Val(
'hello world-1')
55 d = args_kwds_to_message(Val, (v,),
None)
57 d = args_kwds_to_message(Val, (
'hello world-2',),
None)
58 self.assertEquals(d.val,
'hello world-2')
59 d = args_kwds_to_message(Val, (), {
'val':
'hello world-3'})
60 self.assertEquals(d.val,
'hello world-3')
64 args_kwds_to_message(Val,
'hi', val=
'hello world-3')
65 self.fail(
"should not allow args and kwds")
66 except TypeError:
pass 68 args_kwds_to_message(Empty, (Val(
'hola'),),
None)
69 self.fail(
"should raise TypeError when publishing Val msg to Empty topic")
70 except TypeError:
pass 76 rospy.rostime.set_rostime_initialized(
True)
79 seq = random.randint(1, 1000)
82 from test_rospy.msg
import Val
85 teststr =
'foostr-%s'%time.time()
88 fmt =
"<II%ss"%len(teststr)
89 size = struct.calcsize(fmt) - 4
90 valid = struct.pack(fmt, size, len(teststr), teststr)
92 rospy.msg.serialize_message(buff, seq, val)
94 self.assertEquals(valid, buff.getvalue())
97 rospy.msg.serialize_message(buff, seq, val)
98 rospy.msg.serialize_message(buff, seq, val)
99 self.assertEquals(valid*3, buff.getvalue())
104 rospy.msg.serialize_message(buff, seq, val)
105 self.assertEquals(valid*3, buff.getvalue())
106 rospy.msg.serialize_message(buff, seq, val)
107 self.assertEquals(valid*3, buff.getvalue())
108 rospy.msg.serialize_message(buff, seq, val)
109 self.assertEquals(valid*3, buff.getvalue())
114 from test_rospy.msg
import HeaderVal
117 h = rospy.Header(
None, rospy.Time.now(), teststr)
119 val = HeaderVal(h, teststr)
122 rospy.msg.serialize_message(buff, seq, val)
123 self.assertEquals(val.header, h)
124 self.assertEquals(seq, h.seq)
126 self.assertEquals(t, h.stamp)
127 self.assertEquals(teststr, h.frame_id)
131 rospy.msg.serialize_message(buff, seq, val)
132 self.assertEquals(val.header, h)
133 self.assertEquals(
'0', h.frame_id)
138 from test_rospy.msg
import Val
140 teststrs = [
'foostr-%s'%random.randint(0, 10000)
for i
in range(0, num_tests)]
143 fmt =
"<II%ss"%len(t)
144 size = struct.calcsize(fmt) - 4
145 valids.append(struct.pack(fmt, size, len(t), t))
148 def validate_vals(vals, teststrs=teststrs):
149 for i, v
in zip(range(0, len(vals)), vals):
150 self.assert_(isinstance(v, Val))
151 self.assertEquals(teststrs[i], v.val)
158 rospy.msg.deserialize_messages(
None, msg_queue, data_class)
159 except genpy.DeserializationError:
pass 162 rospy.msg.deserialize_messages(b,
None, data_class)
163 except genpy.DeserializationError:
pass 165 rospy.msg.deserialize_messages(b, msg_queue, data_class)
166 self.assertEquals(0, len(msg_queue))
167 self.assertEquals(0, b.tell())
172 rospy.msg.deserialize_messages(b, msg_queue, data_class)
173 self.assertEquals(1, len(msg_queue))
174 validate_vals(msg_queue)
176 self.assertEquals(0, b.tell())
183 b.seek(len(valids[0]))
184 rospy.msg.deserialize_messages(b, msg_queue, data_class)
185 self.assertEquals(1, len(msg_queue))
186 validate_vals(msg_queue)
188 self.assertEquals(0, b.tell())
194 b.write(valids[0][:-1])
195 rospy.msg.deserialize_messages(b, msg_queue, data_class)
196 self.failIf(msg_queue,
"deserialize of an incomplete buffer returned %s"%msg_queue)
202 b.write(valids[0]+
'leftovers')
203 rospy.msg.deserialize_messages(b, msg_queue, data_class)
204 self.assertEquals(1, len(msg_queue))
205 validate_vals(msg_queue)
207 self.assertEquals(
'leftovers', b.getvalue())
215 rospy.msg.deserialize_messages(b, msg_queue, data_class)
216 self.assertEquals(len(valids), len(msg_queue))
217 validate_vals(msg_queue)
219 self.assertEquals(0, b.tell())
228 rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs)
229 self.assertEquals(max_msgs, len(msg_queue))
230 validate_vals(msg_queue)
233 for v
in valids[max_msgs:]:
235 self.assertEquals(b.getvalue(), b2.getvalue())
238 rospy.msg.deserialize_messages(b, msg_queue, data_class)
239 self.assertEquals(len(valids), len(msg_queue))
240 validate_vals(msg_queue)
250 msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
251 rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size)
252 self.assertEquals(queue_size, len(msg_queue))
254 validate_vals(msg_queue, teststrs[-queue_size:])
256 self.assertEquals(0, b.tell())
265 msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
266 rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size)
267 self.assertEquals(queue_size, len(msg_queue))
269 validate_vals(msg_queue)
272 for v
in valids[max_msgs:]:
274 self.assertEquals(b.getvalue(), b2.getvalue())
def test_args_kwds_to_message(self)
def test_serialize_message(self)
def test_deserialize_messages(self)