$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 import roslib; roslib.load_manifest('test_rospy') 00037 00038 import os 00039 import sys 00040 import struct 00041 import unittest 00042 from cStringIO import StringIO 00043 import time 00044 import random 00045 00046 class TestRospyMsg(unittest.TestCase): 00047 00048 def test_args_kwds_to_message(self): 00049 import rospy 00050 from rospy.msg import args_kwds_to_message 00051 from test_rospy.msg import Val 00052 00053 v = Val('hello world-1') 00054 d = args_kwds_to_message(Val, (v,), None) 00055 self.assert_(d == v) 00056 d = args_kwds_to_message(Val, ('hello world-2',), None) 00057 self.assertEquals(d.val, 'hello world-2') 00058 d = args_kwds_to_message(Val, (), {'val':'hello world-3'}) 00059 self.assertEquals(d.val, 'hello world-3') 00060 00061 # error cases 00062 try: 00063 args_kwds_to_message(Val, 'hi', val='hello world-3') 00064 self.fail("should not allow args and kwds") 00065 except TypeError: pass 00066 00067 def test_serialize_message(self): 00068 import rospy.msg 00069 import rospy.rostime 00070 # have to fake-init rostime so that Header can be stamped 00071 rospy.rostime.set_rostime_initialized(True) 00072 00073 buff = StringIO() 00074 seq = random.randint(1, 1000) 00075 00076 #serialize_message(seq, msg) 00077 from test_rospy.msg import Val 00078 00079 #serialize a simple 'Val' with a string in it 00080 teststr = 'foostr-%s'%time.time() 00081 val = Val(teststr) 00082 00083 fmt = "<II%ss"%len(teststr) 00084 size = struct.calcsize(fmt) - 4 00085 valid = struct.pack(fmt, size, len(teststr), teststr) 00086 00087 rospy.msg.serialize_message(buff, seq, val) 00088 00089 self.assertEquals(valid, buff.getvalue()) 00090 00091 #test repeated serialization 00092 rospy.msg.serialize_message(buff, seq, val) 00093 rospy.msg.serialize_message(buff, seq, val) 00094 self.assertEquals(valid*3, buff.getvalue()) 00095 00096 # - once more just to make sure that the buffer position is 00097 # being preserved properly 00098 buff.seek(0) 00099 rospy.msg.serialize_message(buff, seq, val) 00100 self.assertEquals(valid*3, buff.getvalue()) 00101 rospy.msg.serialize_message(buff, seq, val) 00102 self.assertEquals(valid*3, buff.getvalue()) 00103 rospy.msg.serialize_message(buff, seq, val) 00104 self.assertEquals(valid*3, buff.getvalue()) 00105 00106 #test sequence parameter 00107 buff.truncate(0) 00108 00109 from test_rospy.msg import HeaderVal 00110 t = rospy.Time.now() 00111 t.secs = t.secs - 1 # move it back in time 00112 h = rospy.Header(None, rospy.Time.now(), teststr) 00113 h.stamp = t 00114 val = HeaderVal(h, teststr) 00115 seq += 1 00116 00117 rospy.msg.serialize_message(buff, seq, val) 00118 self.assertEquals(val.header, h) 00119 self.assertEquals(seq, h.seq) 00120 #should not have been changed 00121 self.assertEquals(t, h.stamp) 00122 self.assertEquals(teststr, h.frame_id) 00123 00124 #test frame_id setting 00125 h.frame_id = None 00126 rospy.msg.serialize_message(buff, seq, val) 00127 self.assertEquals(val.header, h) 00128 self.assertEquals('0', h.frame_id) 00129 00130 00131 def test_deserialize_messages(self): 00132 import rospy.msg 00133 from test_rospy.msg import Val 00134 num_tests = 10 00135 teststrs = ['foostr-%s'%random.randint(0, 10000) for i in xrange(0, num_tests)] 00136 valids = [] 00137 for t in teststrs: 00138 fmt = "<II%ss"%len(t) 00139 size = struct.calcsize(fmt) - 4 00140 valids.append(struct.pack(fmt, size, len(t), t)) 00141 data_class = Val 00142 00143 def validate_vals(vals, teststrs=teststrs): 00144 for i, v in zip(range(0, len(vals)), vals): 00145 self.assert_(isinstance(v, Val)) 00146 self.assertEquals(teststrs[i], v.val) 00147 00148 b = StringIO() 00149 msg_queue = [] 00150 00151 #test with null buff 00152 try: 00153 rospy.msg.deserialize_messages(None, msg_queue, data_class) 00154 except roslib.message.DeserializationError: pass 00155 #test will null msg_queue 00156 try: 00157 rospy.msg.deserialize_messages(b, None, data_class) 00158 except roslib.message.DeserializationError: pass 00159 #test with empty buff 00160 rospy.msg.deserialize_messages(b, msg_queue, data_class) 00161 self.assertEquals(0, len(msg_queue)) 00162 self.assertEquals(0, b.tell()) 00163 00164 #deserialize a simple value 00165 b.truncate(0) 00166 b.write(valids[0]) 00167 rospy.msg.deserialize_messages(b, msg_queue, data_class) 00168 self.assertEquals(1, len(msg_queue)) 00169 validate_vals(msg_queue) 00170 # - buffer should be reset 00171 self.assertEquals(0, b.tell()) 00172 del msg_queue[:] 00173 00174 #verify deserialize does not read past b.tell() 00175 b.truncate(0) 00176 b.write(valids[0]) 00177 b.write(valids[1]) 00178 b.seek(len(valids[0])) 00179 rospy.msg.deserialize_messages(b, msg_queue, data_class) 00180 self.assertEquals(1, len(msg_queue)) 00181 validate_vals(msg_queue) 00182 # - buffer should be reset 00183 self.assertEquals(0, b.tell()) 00184 00185 del msg_queue[:] 00186 00187 #deserialize an incomplete message 00188 b.truncate(0) 00189 b.write(valids[0][:-1]) 00190 rospy.msg.deserialize_messages(b, msg_queue, data_class) 00191 self.failIf(msg_queue, "deserialize of an incomplete buffer returned %s"%msg_queue) 00192 00193 del msg_queue[:] 00194 00195 #deserialize with extra data leftover 00196 b.truncate(0) 00197 b.write(valids[0]+'leftovers') 00198 rospy.msg.deserialize_messages(b, msg_queue, data_class) 00199 self.assertEquals(1, len(msg_queue)) 00200 validate_vals(msg_queue) 00201 # - leftovers should be pushed to the front of the buffer 00202 self.assertEquals('leftovers', b.getvalue()) 00203 00204 del msg_queue[:] 00205 00206 #deserialize multiple values 00207 b.truncate(0) 00208 for v in valids: 00209 b.write(v) 00210 rospy.msg.deserialize_messages(b, msg_queue, data_class) 00211 self.assertEquals(len(valids), len(msg_queue)) 00212 validate_vals(msg_queue) 00213 # - buffer should be reset 00214 self.assertEquals(0, b.tell()) 00215 00216 del msg_queue[:] 00217 00218 #deserialize multiple values with max_msgs 00219 max_msgs = 5 00220 b.truncate(0) 00221 for v in valids: 00222 b.write(v) 00223 rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs) 00224 self.assertEquals(max_msgs, len(msg_queue)) 00225 validate_vals(msg_queue) 00226 # - buffer should be have remaining msgs 00227 b2 = StringIO() 00228 for v in valids[max_msgs:]: 00229 b2.write(v) 00230 self.assertEquals(b.getvalue(), b2.getvalue()) 00231 00232 #deserialize rest and verify that msg_queue is appended to 00233 rospy.msg.deserialize_messages(b, msg_queue, data_class) 00234 self.assertEquals(len(valids), len(msg_queue)) 00235 validate_vals(msg_queue) 00236 00237 del msg_queue[:] 00238 00239 #deserialize multiple values with queue_size 00240 queue_size = 5 00241 b.truncate(0) 00242 for v in valids: 00243 b.write(v) 00244 # fill queue with junk 00245 msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11] 00246 rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size) 00247 self.assertEquals(queue_size, len(msg_queue)) 00248 # - msg_queue should have the most recent values only 00249 validate_vals(msg_queue, teststrs[-queue_size:]) 00250 # - buffer should be reset 00251 self.assertEquals(0, b.tell()) 00252 00253 #deserialize multiple values with max_msgs and queue_size 00254 queue_size = 5 00255 max_msgs = 5 00256 b.truncate(0) 00257 for v in valids: 00258 b.write(v) 00259 # fill queue with junk 00260 msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11] 00261 rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size) 00262 self.assertEquals(queue_size, len(msg_queue)) 00263 # - msg_queue should have the oldest messages 00264 validate_vals(msg_queue) 00265 # - buffer should be have remaining msgs 00266 b2 = StringIO() 00267 for v in valids[max_msgs:]: 00268 b2.write(v) 00269 self.assertEquals(b.getvalue(), b2.getvalue()) 00270 00271 00272 if __name__ == '__main__': 00273 import rostest 00274 rostest.unitrun('test_rospy', sys.argv[0], TestRospyMsg, coverage_packages=['rospy.msg'])