00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rospy')
00037
00038 import os
00039 import sys
00040 import struct
00041 import unittest
00042 from cStringIO import StringIO
00043 import time
00044 import random
00045
class TestRospyMsg(unittest.TestCase):
    """Unit tests for the helper functions exposed by ``rospy.msg``.

    Covers ``args_kwds_to_message``, ``serialize_message`` and
    ``deserialize_messages`` using the ``Val`` and ``HeaderVal`` test
    message types from ``test_rospy.msg``.
    """

    @staticmethod
    def _pack_val(text):
        """Return the bytes these tests expect for a message holding *text*.

        Layout (little-endian): a uint32 holding the number of bytes that
        follow it, a uint32 holding ``len(text)``, then the raw characters
        of *text*.
        """
        fmt = "<II%ss" % len(text)
        # The leading size field does not count its own four bytes.
        size = struct.calcsize(fmt) - 4
        return struct.pack(fmt, size, len(text), text)

    def test_args_kwds_to_message(self):
        """Check message construction from an instance, from args, from kwds."""
        from rospy.msg import args_kwds_to_message
        from test_rospy.msg import Val

        # An existing message instance passed as the sole arg.
        v = Val('hello world-1')
        d = args_kwds_to_message(Val, (v,), None)
        self.assertEqual(d, v)

        # Positional constructor arguments.
        d = args_kwds_to_message(Val, ('hello world-2',), None)
        self.assertEqual(d.val, 'hello world-2')

        # Keyword constructor arguments.
        d = args_kwds_to_message(Val, (), {'val': 'hello world-3'})
        self.assertEqual(d.val, 'hello world-3')

        # Error case: supplying both args and kwds must be rejected.
        # Both are passed positionally, matching the calls above, so that the
        # TypeError comes from args_kwds_to_message's own validation rather
        # than from an unexpected keyword argument on the call itself.
        try:
            args_kwds_to_message(Val, ('hi',), {'val': 'hello world-3'})
        except TypeError:
            pass
        else:
            self.fail("should not allow args and kwds")

    def test_serialize_message(self):
        """Check serialize_message output, buffer handling and header stamping."""
        import rospy.msg
        import rospy.rostime

        # Mark rostime as initialized so rospy.Time.now() can be used below.
        rospy.rostime.set_rostime_initialized(True)

        buff = StringIO()
        seq = random.randint(1, 1000)

        from test_rospy.msg import Val

        # Serialize a simple 'Val' with a string in it.
        teststr = 'foostr-%s' % time.time()
        val = Val(teststr)
        valid = self._pack_val(teststr)

        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(valid, buff.getvalue())

        # Repeated serialization appends to the buffer.
        rospy.msg.serialize_message(buff, seq, val)
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(valid * 3, buff.getvalue())

        # After rewinding, three further writes of the same message must
        # leave the buffer contents equal to three copies.
        buff.seek(0)
        for _ in range(3):
            rospy.msg.serialize_message(buff, seq, val)
            self.assertEqual(valid * 3, buff.getvalue())

        # Sequence parameter: serializing a message with a header is
        # expected to write seq into that header.
        buff.truncate(0)

        from test_rospy.msg import HeaderVal
        t = rospy.Time.now()
        t.secs = t.secs - 1  # move the stamp back in time
        h = rospy.Header(None, rospy.Time.now(), teststr)
        h.stamp = t
        val = HeaderVal(h, teststr)
        seq += 1

        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(val.header, h)
        self.assertEqual(seq, h.seq)
        # A stamp and frame_id that were set explicitly must be unchanged.
        self.assertEqual(t, h.stamp)
        self.assertEqual(teststr, h.frame_id)

        # A frame_id of None is expected to become '0' after serialization.
        h.frame_id = None
        rospy.msg.serialize_message(buff, seq, val)
        self.assertEqual(val.header, h)
        self.assertEqual('0', h.frame_id)

    def test_deserialize_messages(self):
        """Check deserialize_messages on empty, partial, surplus and bulk input."""
        import rospy.msg
        from test_rospy.msg import Val

        num_tests = 10
        teststrs = ['foostr-%s' % random.randint(0, 10000)
                    for _ in range(num_tests)]
        valids = [self._pack_val(t) for t in teststrs]
        data_class = Val

        def validate_vals(vals, teststrs=teststrs):
            # Each deserialized value must be a Val carrying the string at
            # the same index of teststrs.
            for i, v in enumerate(vals):
                self.assertTrue(isinstance(v, Val))
                self.assertEqual(teststrs[i], v.val)

        b = StringIO()
        msg_queue = []

        # None buffer: a DeserializationError is tolerated, not required.
        try:
            rospy.msg.deserialize_messages(None, msg_queue, data_class)
        except roslib.message.DeserializationError:
            pass

        # None msg_queue: likewise tolerated.
        try:
            rospy.msg.deserialize_messages(b, None, data_class)
        except roslib.message.DeserializationError:
            pass

        # Empty buffer yields nothing and leaves the position at 0.
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(0, len(msg_queue))
        self.assertEqual(0, b.tell())

        # Deserialize a single value.
        b.truncate(0)
        b.write(valids[0])
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        # The buffer position should be back at the start.
        self.assertEqual(0, b.tell())
        del msg_queue[:]

        # Deserialization must not read past b.tell().
        b.truncate(0)
        b.write(valids[0])
        b.write(valids[1])
        b.seek(len(valids[0]))
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        self.assertEqual(0, b.tell())
        del msg_queue[:]

        # An incomplete message produces no output.
        b.truncate(0)
        b.write(valids[0][:-1])
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertFalse(msg_queue, "deserialize of an incomplete buffer returned %s" % msg_queue)
        del msg_queue[:]

        # Extra trailing data is left behind as the buffer's sole contents.
        b.truncate(0)
        b.write(valids[0] + 'leftovers')
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(1, len(msg_queue))
        validate_vals(msg_queue)
        self.assertEqual('leftovers', b.getvalue())
        del msg_queue[:]

        # Multiple values in one buffer.
        b.truncate(0)
        for v in valids:
            b.write(v)
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(len(valids), len(msg_queue))
        validate_vals(msg_queue)
        self.assertEqual(0, b.tell())
        del msg_queue[:]

        # Multiple values with max_msgs: only the first max_msgs are taken
        # and the remaining messages stay in the buffer.
        max_msgs = 5
        b.truncate(0)
        for v in valids:
            b.write(v)
        rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs)
        self.assertEqual(max_msgs, len(msg_queue))
        validate_vals(msg_queue)

        b2 = StringIO()
        for v in valids[max_msgs:]:
            b2.write(v)
        self.assertEqual(b.getvalue(), b2.getvalue())

        # A second call drains the rest onto the same queue.
        rospy.msg.deserialize_messages(b, msg_queue, data_class)
        self.assertEqual(len(valids), len(msg_queue))
        validate_vals(msg_queue)
        del msg_queue[:]

        # Multiple values with queue_size: only the newest queue_size
        # messages are kept, and pre-existing queue entries are dropped.
        queue_size = 5
        b.truncate(0)
        for v in valids:
            b.write(v)

        msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
        rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size)
        self.assertEqual(queue_size, len(msg_queue))
        validate_vals(msg_queue, teststrs[-queue_size:])
        self.assertEqual(0, b.tell())

        # max_msgs and queue_size together: the first max_msgs messages are
        # queued and the remainder stays in the buffer.
        queue_size = 5
        max_msgs = 5
        b.truncate(0)
        for v in valids:
            b.write(v)

        msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
        rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size)
        self.assertEqual(queue_size, len(msg_queue))
        validate_vals(msg_queue)

        b2 = StringIO()
        for v in valids[max_msgs:]:
            b2.write(v)
        self.assertEqual(b.getvalue(), b2.getvalue())
00271
if __name__ == '__main__':
    # Script entry point: hand the test case to rostest's unit-test runner,
    # naming rospy.msg as the package to collect coverage for.
    import rostest

    package_name = 'test_rospy'
    test_script = sys.argv[0]
    rostest.unitrun(package_name, test_script, TestRospyMsg, coverage_packages=['rospy.msg'])