00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 try:
00039 from cStringIO import StringIO
00040 except ImportError:
00041 from io import StringIO
00042 import time
00043 import random
00044
00045 import genpy
00046
00047 class TestRospyMsg(unittest.TestCase):
00048
00049 def test_args_kwds_to_message(self):
00050 import rospy
00051 from rospy.msg import args_kwds_to_message
00052 from test_rospy.msg import Val, Empty
00053
00054 v = Val('hello world-1')
00055 d = args_kwds_to_message(Val, (v,), None)
00056 self.assert_(d == v)
00057 d = args_kwds_to_message(Val, ('hello world-2',), None)
00058 self.assertEquals(d.val, 'hello world-2')
00059 d = args_kwds_to_message(Val, (), {'val':'hello world-3'})
00060 self.assertEquals(d.val, 'hello world-3')
00061
00062
00063 try:
00064 args_kwds_to_message(Val, 'hi', val='hello world-3')
00065 self.fail("should not allow args and kwds")
00066 except TypeError: pass
00067 try:
00068 args_kwds_to_message(Empty, (Val('hola'),), None)
00069 self.fail("should raise TypeError when publishing Val msg to Empty topic")
00070 except TypeError: pass
00071
00072 def test_serialize_message(self):
00073 import rospy.msg
00074 import rospy.rostime
00075
00076 rospy.rostime.set_rostime_initialized(True)
00077
00078 buff = StringIO()
00079 seq = random.randint(1, 1000)
00080
00081
00082 from test_rospy.msg import Val
00083
00084
00085 teststr = 'foostr-%s'%time.time()
00086 val = Val(teststr)
00087
00088 fmt = "<II%ss"%len(teststr)
00089 size = struct.calcsize(fmt) - 4
00090 valid = struct.pack(fmt, size, len(teststr), teststr)
00091
00092 rospy.msg.serialize_message(buff, seq, val)
00093
00094 self.assertEquals(valid, buff.getvalue())
00095
00096
00097 rospy.msg.serialize_message(buff, seq, val)
00098 rospy.msg.serialize_message(buff, seq, val)
00099 self.assertEquals(valid*3, buff.getvalue())
00100
00101
00102
00103 buff.seek(0)
00104 rospy.msg.serialize_message(buff, seq, val)
00105 self.assertEquals(valid*3, buff.getvalue())
00106 rospy.msg.serialize_message(buff, seq, val)
00107 self.assertEquals(valid*3, buff.getvalue())
00108 rospy.msg.serialize_message(buff, seq, val)
00109 self.assertEquals(valid*3, buff.getvalue())
00110
00111
00112 buff.truncate(0)
00113
00114 from test_rospy.msg import HeaderVal
00115 t = rospy.Time.now()
00116 t.secs = t.secs - 1
00117 h = rospy.Header(None, rospy.Time.now(), teststr)
00118 h.stamp = t
00119 val = HeaderVal(h, teststr)
00120 seq += 1
00121
00122 rospy.msg.serialize_message(buff, seq, val)
00123 self.assertEquals(val.header, h)
00124 self.assertEquals(seq, h.seq)
00125
00126 self.assertEquals(t, h.stamp)
00127 self.assertEquals(teststr, h.frame_id)
00128
00129
00130 h.frame_id = None
00131 rospy.msg.serialize_message(buff, seq, val)
00132 self.assertEquals(val.header, h)
00133 self.assertEquals('0', h.frame_id)
00134
00135
00136 def test_deserialize_messages(self):
00137 import rospy.msg
00138 from test_rospy.msg import Val
00139 num_tests = 10
00140 teststrs = ['foostr-%s'%random.randint(0, 10000) for i in range(0, num_tests)]
00141 valids = []
00142 for t in teststrs:
00143 fmt = "<II%ss"%len(t)
00144 size = struct.calcsize(fmt) - 4
00145 valids.append(struct.pack(fmt, size, len(t), t))
00146 data_class = Val
00147
00148 def validate_vals(vals, teststrs=teststrs):
00149 for i, v in zip(range(0, len(vals)), vals):
00150 self.assert_(isinstance(v, Val))
00151 self.assertEquals(teststrs[i], v.val)
00152
00153 b = StringIO()
00154 msg_queue = []
00155
00156
00157 try:
00158 rospy.msg.deserialize_messages(None, msg_queue, data_class)
00159 except genpy.DeserializationError: pass
00160
00161 try:
00162 rospy.msg.deserialize_messages(b, None, data_class)
00163 except genpy.DeserializationError: pass
00164
00165 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00166 self.assertEquals(0, len(msg_queue))
00167 self.assertEquals(0, b.tell())
00168
00169
00170 b.truncate(0)
00171 b.write(valids[0])
00172 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00173 self.assertEquals(1, len(msg_queue))
00174 validate_vals(msg_queue)
00175
00176 self.assertEquals(0, b.tell())
00177 del msg_queue[:]
00178
00179
00180 b.truncate(0)
00181 b.write(valids[0])
00182 b.write(valids[1])
00183 b.seek(len(valids[0]))
00184 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00185 self.assertEquals(1, len(msg_queue))
00186 validate_vals(msg_queue)
00187
00188 self.assertEquals(0, b.tell())
00189
00190 del msg_queue[:]
00191
00192
00193 b.truncate(0)
00194 b.write(valids[0][:-1])
00195 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00196 self.failIf(msg_queue, "deserialize of an incomplete buffer returned %s"%msg_queue)
00197
00198 del msg_queue[:]
00199
00200
00201 b.truncate(0)
00202 b.write(valids[0]+'leftovers')
00203 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00204 self.assertEquals(1, len(msg_queue))
00205 validate_vals(msg_queue)
00206
00207 self.assertEquals('leftovers', b.getvalue())
00208
00209 del msg_queue[:]
00210
00211
00212 b.truncate(0)
00213 for v in valids:
00214 b.write(v)
00215 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00216 self.assertEquals(len(valids), len(msg_queue))
00217 validate_vals(msg_queue)
00218
00219 self.assertEquals(0, b.tell())
00220
00221 del msg_queue[:]
00222
00223
00224 max_msgs = 5
00225 b.truncate(0)
00226 for v in valids:
00227 b.write(v)
00228 rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs)
00229 self.assertEquals(max_msgs, len(msg_queue))
00230 validate_vals(msg_queue)
00231
00232 b2 = StringIO()
00233 for v in valids[max_msgs:]:
00234 b2.write(v)
00235 self.assertEquals(b.getvalue(), b2.getvalue())
00236
00237
00238 rospy.msg.deserialize_messages(b, msg_queue, data_class)
00239 self.assertEquals(len(valids), len(msg_queue))
00240 validate_vals(msg_queue)
00241
00242 del msg_queue[:]
00243
00244
00245 queue_size = 5
00246 b.truncate(0)
00247 for v in valids:
00248 b.write(v)
00249
00250 msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
00251 rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size)
00252 self.assertEquals(queue_size, len(msg_queue))
00253
00254 validate_vals(msg_queue, teststrs[-queue_size:])
00255
00256 self.assertEquals(0, b.tell())
00257
00258
00259 queue_size = 5
00260 max_msgs = 5
00261 b.truncate(0)
00262 for v in valids:
00263 b.write(v)
00264
00265 msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
00266 rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size)
00267 self.assertEquals(queue_size, len(msg_queue))
00268
00269 validate_vals(msg_queue)
00270
00271 b2 = StringIO()
00272 for v in valids[max_msgs:]:
00273 b2.write(v)
00274 self.assertEquals(b.getvalue(), b2.getvalue())
00275