00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 from cStringIO import StringIO
00039 import time
00040 import random
00041
00042 import genpy
00043
00044 class TestRospyMsg(unittest.TestCase):
00045
00046 def test_args_kwds_to_message(self):
00047 import rospy
00048 from rospy.msg import args_kwds_to_message
00049 from test_rospy.msg import Val
00050
00051 v = Val('hello world-1')
00052 d = args_kwds_to_message(Val, (v,), None)
00053 self.assert_(d == v)
00054 d = args_kwds_to_message(Val, ('hello world-2',), None)
00055 self.assertEquals(d.val, 'hello world-2')
00056 d = args_kwds_to_message(Val, (), {'val':'hello world-3'})
00057 self.assertEquals(d.val, 'hello world-3')
00058
00059
00060 try:
00061 args_kwds_to_message(Val, 'hi', val='hello world-3')
00062 self.fail("should not allow args and kwds")
00063 except TypeError: pass
00064
00065 def test_serialize_message(self):
00066 import rospy.msg
00067 import rospy.rostime
00068
00069 rospy.rostime.set_rostime_initialized(True)
00070
00071 buff = StringIO()
00072 seq = random.randint(1, 1000)
00073
00074
00075 from test_rospy.msg import Val
00076
00077
00078 teststr = 'foostr-%s'%time.time()
00079 val = Val(teststr)
00080
00081 fmt = "<II%ss"%len(teststr)
00082 size = struct.calcsize(fmt) - 4
00083 valid = struct.pack(fmt, size, len(teststr), teststr)
00084
00085 rospy.msg.serialize_message(buff, seq, val)
00086
00087 self.assertEquals(valid, buff.getvalue())
00088
00089
00090 rospy.msg.serialize_message(buff, seq, val)
00091 rospy.msg.serialize_message(buff, seq, val)
00092 self.assertEquals(valid*3, buff.getvalue())
00093
00094
00095
00096 buff.seek(0)
00097 rospy.msg.serialize_message(buff, seq, val)
00098 self.assertEquals(valid*3, buff.getvalue())
00099 rospy.msg.serialize_message(buff, seq, val)
00100 self.assertEquals(valid*3, buff.getvalue())
00101 rospy.msg.serialize_message(buff, seq, val)
00102 self.assertEquals(valid*3, buff.getvalue())
00103
00104
00105 buff.truncate(0)
00106
00107 from test_rospy.msg import HeaderVal
00108 t = rospy.Time.now()
00109 t.secs = t.secs - 1
00110 h = rospy.Header(None, rospy.Time.now(), teststr)
00111 h.stamp = t
00112 val = HeaderVal(h, teststr)
00113 seq += 1
00114
00115 rospy.msg.serialize_message(buff, seq, val)
00116 self.assertEquals(val.header, h)
00117 self.assertEquals(seq, h.seq)
00118
00119 self.assertEquals(t, h.stamp)
00120 self.assertEquals(teststr, h.frame_id)
00121
00122
00123 h.frame_id = None
00124 rospy.msg.serialize_message(buff, seq, val)
00125 self.assertEquals(val.header, h)
00126 self.assertEquals('0', h.frame_id)
00127
00128
00129 def test_deserialize_messages(self):
00130 import rospy.msg
00131 from test_rospy.msg import Val
00132 num_tests = 10
00133 teststrs = ['foostr-%s'%random.randint(0, 10000) for i in xrange(0, num_tests)]
00134 valids = []
00135 for t in teststrs:
00136 fmt = "<II%ss"%len(t)
00137 size = struct.calcsize(fmt) - 4
00138 valids.append(struct.pack(fmt, size, len(t), t))
00139 data_class = Val
00140
00141 def validate_vals(vals, teststrs=teststrs):
00142 for i, v in zip(range(0, len(vals)), vals):
00143 self.assert_(isinstance(v, Val))
00144 self.assertEquals(teststrs[i], v.val)
00145
00146 b = StringIO()
00147 msg_queue = []
00148
00149
00150 try:
00151 rospy.msg.deserialize_messages(None, msg_queue, data_class)
00152 except genpy.DeserializationError: pass
00153
00154 try:
00155 rospy.msg.deserialize_messages(b, None, data_class)
00156 except genpy.DeserializationError: pass
00157
00158 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00159 self.assertEquals(0, len(msg_queue))
00160 self.assertEquals(0, b.tell())
00161
00162
00163 b.truncate(0)
00164 b.write(valids[0])
00165 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00166 self.assertEquals(1, len(msg_queue))
00167 validate_vals(msg_queue)
00168
00169 self.assertEquals(0, b.tell())
00170 del msg_queue[:]
00171
00172
00173 b.truncate(0)
00174 b.write(valids[0])
00175 b.write(valids[1])
00176 b.seek(len(valids[0]))
00177 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00178 self.assertEquals(1, len(msg_queue))
00179 validate_vals(msg_queue)
00180
00181 self.assertEquals(0, b.tell())
00182
00183 del msg_queue[:]
00184
00185
00186 b.truncate(0)
00187 b.write(valids[0][:-1])
00188 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00189 self.failIf(msg_queue, "deserialize of an incomplete buffer returned %s"%msg_queue)
00190
00191 del msg_queue[:]
00192
00193
00194 b.truncate(0)
00195 b.write(valids[0]+'leftovers')
00196 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00197 self.assertEquals(1, len(msg_queue))
00198 validate_vals(msg_queue)
00199
00200 self.assertEquals('leftovers', b.getvalue())
00201
00202 del msg_queue[:]
00203
00204
00205 b.truncate(0)
00206 for v in valids:
00207 b.write(v)
00208 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00209 self.assertEquals(len(valids), len(msg_queue))
00210 validate_vals(msg_queue)
00211
00212 self.assertEquals(0, b.tell())
00213
00214 del msg_queue[:]
00215
00216
00217 max_msgs = 5
00218 b.truncate(0)
00219 for v in valids:
00220 b.write(v)
00221 rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs)
00222 self.assertEquals(max_msgs, len(msg_queue))
00223 validate_vals(msg_queue)
00224
00225 b2 = StringIO()
00226 for v in valids[max_msgs:]:
00227 b2.write(v)
00228 self.assertEquals(b.getvalue(), b2.getvalue())
00229
00230
00231 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00232 self.assertEquals(len(valids), len(msg_queue))
00233 validate_vals(msg_queue)
00234
00235 del msg_queue[:]
00236
00237
00238 queue_size = 5
00239 b.truncate(0)
00240 for v in valids:
00241 b.write(v)
00242
00243 msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
00244 rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size)
00245 self.assertEquals(queue_size, len(msg_queue))
00246
00247 validate_vals(msg_queue, teststrs[-queue_size:])
00248
00249 self.assertEquals(0, b.tell())
00250
00251
00252 queue_size = 5
00253 max_msgs = 5
00254 b.truncate(0)
00255 for v in valids:
00256 b.write(v)
00257
00258 msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
00259 rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size)
00260 self.assertEquals(queue_size, len(msg_queue))
00261
00262 validate_vals(msg_queue)
00263
00264 b2 = StringIO()
00265 for v in valids[max_msgs:]:
00266 b2.write(v)
00267 self.assertEquals(b.getvalue(), b2.getvalue())
00268