test_rospy_msg.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 try:
00039     from cStringIO import StringIO
00040 except ImportError:
00041     from io import StringIO
00042 import time
00043 import random
00044 
00045 import genpy
00046 
00047 class TestRospyMsg(unittest.TestCase):
00048 
00049     def test_args_kwds_to_message(self):
00050         import rospy
00051         from rospy.msg import args_kwds_to_message
00052         from test_rospy.msg import Val, Empty
00053         
00054         v = Val('hello world-1')
00055         d = args_kwds_to_message(Val, (v,), None)
00056         self.assert_(d == v)
00057         d = args_kwds_to_message(Val, ('hello world-2',), None)
00058         self.assertEquals(d.val, 'hello world-2')
00059         d = args_kwds_to_message(Val, (), {'val':'hello world-3'})
00060         self.assertEquals(d.val, 'hello world-3')
00061 
00062         # error cases
00063         try:
00064             args_kwds_to_message(Val, 'hi', val='hello world-3')
00065             self.fail("should not allow args and kwds")
00066         except TypeError: pass
00067         try:
00068             args_kwds_to_message(Empty, (Val('hola'),), None)
00069             self.fail("should raise TypeError when publishing Val msg to Empty topic")
00070         except TypeError: pass
00071 
00072     def test_serialize_message(self):
00073         import rospy.msg
00074         import rospy.rostime
00075         # have to fake-init rostime so that Header can be stamped
00076         rospy.rostime.set_rostime_initialized(True)
00077         
00078         buff = StringIO()
00079         seq = random.randint(1, 1000)
00080         
00081         #serialize_message(seq, msg)
00082         from test_rospy.msg import Val
00083 
00084         #serialize a simple 'Val' with a string in it
00085         teststr = 'foostr-%s'%time.time()
00086         val = Val(teststr)
00087 
00088         fmt = "<II%ss"%len(teststr)
00089         size = struct.calcsize(fmt) - 4
00090         valid = struct.pack(fmt, size, len(teststr), teststr)
00091         
00092         rospy.msg.serialize_message(buff, seq, val)
00093 
00094         self.assertEquals(valid, buff.getvalue())
00095         
00096         #test repeated serialization
00097         rospy.msg.serialize_message(buff, seq, val)
00098         rospy.msg.serialize_message(buff, seq, val)        
00099         self.assertEquals(valid*3, buff.getvalue())
00100 
00101         # - once more just to make sure that the buffer position is
00102         # being preserved properly
00103         buff.seek(0)
00104         rospy.msg.serialize_message(buff, seq, val)
00105         self.assertEquals(valid*3, buff.getvalue())        
00106         rospy.msg.serialize_message(buff, seq, val)
00107         self.assertEquals(valid*3, buff.getvalue()) 
00108         rospy.msg.serialize_message(buff, seq, val)        
00109         self.assertEquals(valid*3, buff.getvalue())        
00110 
00111         #test sequence parameter
00112         buff.truncate(0)
00113 
00114         from test_rospy.msg import HeaderVal
00115         t = rospy.Time.now()
00116         t.secs = t.secs - 1 # move it back in time
00117         h = rospy.Header(None, rospy.Time.now(), teststr)
00118         h.stamp = t
00119         val = HeaderVal(h, teststr)
00120         seq += 1
00121         
00122         rospy.msg.serialize_message(buff, seq, val)
00123         self.assertEquals(val.header, h)
00124         self.assertEquals(seq, h.seq)
00125         #should not have been changed
00126         self.assertEquals(t, h.stamp) 
00127         self.assertEquals(teststr, h.frame_id) 
00128         
00129         #test frame_id setting
00130         h.frame_id = None
00131         rospy.msg.serialize_message(buff, seq, val)
00132         self.assertEquals(val.header, h)
00133         self.assertEquals('0', h.frame_id) 
00134         
00135 
00136     def test_deserialize_messages(self):
00137         import rospy.msg
00138         from test_rospy.msg import Val
00139         num_tests = 10
00140         teststrs = ['foostr-%s'%random.randint(0, 10000) for i in range(0, num_tests)]
00141         valids = []
00142         for t in teststrs:
00143             fmt = "<II%ss"%len(t)
00144             size = struct.calcsize(fmt) - 4
00145             valids.append(struct.pack(fmt, size, len(t), t))
00146         data_class = Val
00147 
00148         def validate_vals(vals, teststrs=teststrs):
00149             for i, v in zip(range(0, len(vals)), vals):
00150                 self.assert_(isinstance(v, Val))
00151                 self.assertEquals(teststrs[i], v.val)        
00152         
00153         b = StringIO()
00154         msg_queue = []
00155         
00156         #test with null buff
00157         try:
00158             rospy.msg.deserialize_messages(None, msg_queue, data_class)
00159         except genpy.DeserializationError: pass
00160         #test will null msg_queue
00161         try:
00162             rospy.msg.deserialize_messages(b, None, data_class)
00163         except genpy.DeserializationError: pass
00164         #test with empty buff
00165         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00166         self.assertEquals(0, len(msg_queue))
00167         self.assertEquals(0, b.tell())
00168 
00169         #deserialize a simple value
00170         b.truncate(0)
00171         b.write(valids[0])
00172         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00173         self.assertEquals(1, len(msg_queue))
00174         validate_vals(msg_queue)
00175         # - buffer should be reset
00176         self.assertEquals(0, b.tell())         
00177         del msg_queue[:]
00178 
00179         #verify deserialize does not read past b.tell()
00180         b.truncate(0)
00181         b.write(valids[0])
00182         b.write(valids[1])        
00183         b.seek(len(valids[0]))
00184         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00185         self.assertEquals(1, len(msg_queue))
00186         validate_vals(msg_queue)
00187         # - buffer should be reset
00188         self.assertEquals(0, b.tell())        
00189 
00190         del msg_queue[:]
00191 
00192         #deserialize an incomplete message
00193         b.truncate(0)
00194         b.write(valids[0][:-1])        
00195         rospy.msg.deserialize_messages(b, msg_queue, data_class)        
00196         self.failIf(msg_queue, "deserialize of an incomplete buffer returned %s"%msg_queue)
00197         
00198         del msg_queue[:]
00199         
00200         #deserialize with extra data leftover
00201         b.truncate(0)
00202         b.write(valids[0]+'leftovers')
00203         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00204         self.assertEquals(1, len(msg_queue))
00205         validate_vals(msg_queue)
00206         # - leftovers should be pushed to the front of the buffer
00207         self.assertEquals('leftovers', b.getvalue())
00208         
00209         del msg_queue[:]
00210 
00211         #deserialize multiple values
00212         b.truncate(0)
00213         for v in valids:
00214             b.write(v)
00215         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00216         self.assertEquals(len(valids), len(msg_queue))
00217         validate_vals(msg_queue)
00218         # - buffer should be reset
00219         self.assertEquals(0, b.tell())        
00220         
00221         del msg_queue[:]
00222 
00223         #deserialize multiple values with max_msgs
00224         max_msgs = 5
00225         b.truncate(0)
00226         for v in valids:
00227             b.write(v)
00228         rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs)
00229         self.assertEquals(max_msgs, len(msg_queue))
00230         validate_vals(msg_queue)
00231         # - buffer should be have remaining msgs
00232         b2 = StringIO()
00233         for v in valids[max_msgs:]:
00234             b2.write(v)
00235         self.assertEquals(b.getvalue(), b2.getvalue())
00236 
00237         #deserialize rest and verify that msg_queue is appended to
00238         rospy.msg.deserialize_messages(b, msg_queue, data_class)
00239         self.assertEquals(len(valids), len(msg_queue))
00240         validate_vals(msg_queue)
00241         
00242         del msg_queue[:]
00243         
00244         #deserialize multiple values with queue_size
00245         queue_size = 5
00246         b.truncate(0)
00247         for v in valids:
00248             b.write(v)
00249         # fill queue with junk
00250         msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
00251         rospy.msg.deserialize_messages(b, msg_queue, data_class, queue_size=queue_size)
00252         self.assertEquals(queue_size, len(msg_queue))
00253         # - msg_queue should have the most recent values only
00254         validate_vals(msg_queue, teststrs[-queue_size:])
00255         # - buffer should be reset
00256         self.assertEquals(0, b.tell())        
00257         
00258         #deserialize multiple values with max_msgs and queue_size
00259         queue_size = 5
00260         max_msgs = 5
00261         b.truncate(0)
00262         for v in valids:
00263             b.write(v)
00264         # fill queue with junk
00265         msg_queue = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11]
00266         rospy.msg.deserialize_messages(b, msg_queue, data_class, max_msgs=max_msgs, queue_size=queue_size)
00267         self.assertEquals(queue_size, len(msg_queue))
00268         # - msg_queue should have the oldest messages 
00269         validate_vals(msg_queue)
00270         # - buffer should be have remaining msgs
00271         b2 = StringIO()
00272         for v in valids[max_msgs:]:
00273             b2.write(v)
00274         self.assertEquals(b.getvalue(), b2.getvalue())
00275 


test_rospy
Author(s): Ken Conley
autogenerated on Thu Jun 6 2019 21:10:57