test_rospy_msg.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 from cStringIO import StringIO
00039 import time
00040 import random
00041 
00042 import genpy
00043 
00044 class TestRospyMsg(unittest.TestCase):
00045 
00046     def test_args_kwds_to_message(self):
00047         import rospy
00048         from rospy.msg import args_kwds_to_message
00049         from test_rospy.msg import Val
00050         
00051         v = Val('hello world-1')
00052         d = args_kwds_to_message(Val, (v,), None)
00053         self.assert_(d == v)
00054         d = args_kwds_to_message(Val, ('hello world-2',), None)
00055         self.assertEquals(d.val, 'hello world-2')
00056         d = args_kwds_to_message(Val, (), {'val':'hello world-3'})
00057         self.assertEquals(d.val, 'hello world-3')
00058 
00059         # error cases
00060         try:
00061             args_kwds_to_message(Val, 'hi', val='hello world-3')
00062             self.fail("should not allow args and kwds")
00063         except TypeError: pass
00064 
00065     def test_serialize_message(self):
00066         import rospy.msg
00067         import rospy.rostime
00068         # have to fake-init rostime so that Header can be stamped
00069         rospy.rostime.set_rostime_initialized(True)
00070         
00071         buff = StringIO()
00072         seq = random.randint(1, 1000)
00073         
00074         #serialize_message(seq, msg)
00075         from test_rospy.msg import Val
00076 
00077         #serialize a simple 'Val' with a string in it
00078         teststr = 'foostr-%s'%time.time()
00079         val = Val(teststr)
00080 
00081         fmt = "<II%ss"%len(teststr)
00082         size = struct.calcsize(fmt) - 4
00083         valid = struct.pack(fmt, size, len(teststr), teststr)
00084         
00085         rospy.msg.serialize_message(buff, seq, val)
00086 
00087         self.assertEquals(valid, buff.getvalue())
00088         
00089         #test repeated serialization
00090         rospy.msg.serialize_message(buff, seq, val)
00091         rospy.msg.serialize_message(buff, seq, val)        
00092         self.assertEquals(valid*3, buff.getvalue())
00093 
00094         # - once more just to make sure that the buffer position is
00095         # being preserved properly
00096         buff.seek(0)
00097         rospy.msg.serialize_message(buff, seq, val)
00098         self.assertEquals(valid*3, buff.getvalue())        
00099         rospy.msg.serialize_message(buff, seq, val)
00100         self.assertEquals(valid*3, buff.getvalue()) 
00101         rospy.msg.serialize_message(buff, seq, val)        
00102         self.assertEquals(valid*3, buff.getvalue())        
00103 
00104         #test sequence parameter
00105         buff.truncate(0)
00106 
00107         from test_rospy.msg import HeaderVal
00108         t = rospy.Time.now()
00109         t.secs = t.secs - 1 # move it back in time
00110         h = rospy.Header(None, rospy.Time.now(), teststr)
00111         h.stamp = t
00112         val = HeaderVal(h, teststr)
00113         seq += 1
00114         
00115         rospy.msg.serialize_message(buff, seq, val)
00116         self.assertEquals(val.header, h)
00117         self.assertEquals(seq, h.seq)
00118         #should not have been changed
00119         self.assertEquals(t, h.stamp) 
00120         self.assertEquals(teststr, h.frame_id) 
00121         
00122         #test frame_id setting
00123         h.frame_id = None
00124         rospy.msg.serialize_message(buff, seq, val)
00125         self.assertEquals(val.header, h)
00126         self.assertEquals('0', h.frame_id) 
00127         
00128 
00129     def test_deserialize_messages(self):
00130         import rospy.msg
00131         from test_rospy.msg import Val
00132         num_tests = 10
00133         teststrs = ['foostr-%s'%random.randint(0, 10000) for i in xrange(0, num_tests)]
00134         valids = []
00135         for t in teststrs:
00136             fmt = "<II%ss"%len(t)
00137             size = struct.calcsize(fmt) - 4
00138             valids.append(struct.pack(fmt, size, len(t), t))
00139         data_class = Val
00140 
00141         def validate_vals(vals, teststrs=teststrs):
00142             for i, v in zip(range(0, len(vals)), vals):
00143                 self.assert_(isinstance(v, Val))
00144                 self.assertEquals(teststrs[i], v.val)        
00145         
00146         b = StringIO()
00147         msg_queue = []
00148         
00149         #test with null buff
00150         try:
00151             rospy.msg.deserialize_messages(None, msg_queue, data_class)
00152         except genpy.DeserializationError: pass
00153         #test will null msg_queue
00154         try:
00155             rospy.msg.deserialize_messages(b, None, data_class)
00156         except genpy.DeserializationError: pass
00157         #test with empty buff
00158         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00159         self.assertEquals(0, len(msg_queue))
00160         self.assertEquals(0, b.tell())
00161 
00162         #deserialize a simple value
00163         b.truncate(0)
00164         b.write(valids[0])
00165         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00166         self.assertEquals(1, len(msg_queue))
00167         validate_vals(msg_queue)
00168         # - buffer should be reset
00169         self.assertEquals(0, b.tell())         
00170         del msg_queue[:]
00171 
00172         #verify deserialize does not read past b.tell()
00173         b.truncate(0)
00174         b.write(valids[0])
00175         b.write(valids[1])        
00176         b.seek(len(valids[0]))
00177         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00178         self.assertEquals(1, len(msg_queue))
00179         validate_vals(msg_queue)
00180         # - buffer should be reset
00181         self.assertEquals(0, b.tell())        
00182 
00183         del msg_queue[:]
00184 
00185         #deserialize an incomplete message
00186         b.truncate(0)
00187         b.write(valids[0][:-1])        
00188         rospy.msg.deserialize_messages(b, msg_queue, data_class)        
00189         self.failIf(msg_queue, "deserialize of an incomplete buffer returned %s"%msg_queue)
00190         
00191         del msg_queue[:]
00192         
00193         #deserialize with extra data leftover
00194         b.truncate(0)
00195         b.write(valids[0]+'leftovers')
00196         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00197         self.assertEquals(1, len(msg_queue))
00198         validate_vals(msg_queue)
00199         # - leftovers should be pushed to the front of the buffer
00200         self.assertEquals('leftovers', b.getvalue())
00201         
00202         del msg_queue[:]
00203 
00204         #deserialize multiple values
00205         b.truncate(0)
00206         for v in valids:
00207             b.write(v)
00208         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00209         self.assertEquals(len(valids), len(msg_queue))
00210         validate_vals(msg_queue)
00211         # - buffer should be reset
00212         self.assertEquals(0, b.tell())        
00213         
00214         del msg_queue[:]
00215 
00216         #deserialize multiple values with max_msgs
00217         max_msgs = 5
00218         b.truncate(0)
00219         for v in valids:
00220             b.write(v)
00221         rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs)
00222         self.assertEquals(max_msgs, len(msg_queue))
00223         validate_vals(msg_queue)
00224         # - buffer should be have remaining msgs
00225         b2 = StringIO()
00226         for v in valids[max_msgs:]:
00227             b2.write(v)
00228         self.assertEquals(b.getvalue(), b2.getvalue())
00229 
00230         #deserialize rest and verify that msg_queue is appended to
00231         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00232         self.assertEquals(len(valids), len(msg_queue))
00233         validate_vals(msg_queue)
00234         
00235         del msg_queue[:]
00236         
00237         #deserialize multiple values with queue_size
00238         queue_size = 5
00239         b.truncate(0)
00240         for v in valids:
00241             b.write(v)
00242         # fill queue with junk
00243         msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
00244         rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size)
00245         self.assertEquals(queue_size, len(msg_queue))
00246         # - msg_queue should have the most recent values only
00247         validate_vals(msg_queue, teststrs[-queue_size:])
00248         # - buffer should be reset
00249         self.assertEquals(0, b.tell())        
00250         
00251         #deserialize multiple values with max_msgs and queue_size
00252         queue_size = 5
00253         max_msgs = 5
00254         b.truncate(0)
00255         for v in valids:
00256             b.write(v)
00257         # fill queue with junk
00258         msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
00259         rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size)
00260         self.assertEquals(queue_size, len(msg_queue))
00261         # - msg_queue should have the oldest messages 
00262         validate_vals(msg_queue)
00263         # - buffer should be have remaining msgs
00264         b2 = StringIO()
00265         for v in valids[max_msgs:]:
00266             b2.write(v)
00267         self.assertEquals(b.getvalue(), b2.getvalue())
00268 


test_rospy
Author(s): Ken Conley
autogenerated on Fri Aug 28 2015 12:33:56