$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 PKG = 'test_roslib_comm' 00037 NAME = 'test_genpy' 00038 import roslib; roslib.load_manifest(PKG) 00039 00040 import sys 00041 import unittest 00042 import cStringIO 00043 import time 00044 00045 import rosunit 00046 00047 # NOTE: roslib.genpy is in the roslib package to prevent circular 00048 # dependencies on messages in the roslib package 00049 # (Header/Time/Duration) 00050 00051 class TestGenpy(unittest.TestCase): 00052 00053 def setUp(self): 00054 pass 00055 00056 ## Test genpy.reduce_pattern 00057 def test_reduce_pattern(self): 00058 tests = [ 00059 ('', ''), 00060 ('hhhh', '4h'), 00061 ('hhhhi', '4hi'), 00062 ('hhhhiiiibbb', '4h4i3b'), 00063 ('1h2h3h', '1h2h3h'), 00064 ('hIi', 'hIi'), 00065 ('66h', '66h'), 00066 ('%ss', '%ss'), #don't reduce strings with format chars in them 00067 ('<I', '<I'), 00068 ('<11s', '<11s'), 00069 ] 00070 from roslib.genpy import reduce_pattern 00071 for input, result in tests: 00072 self.assertEquals(result, reduce_pattern(input)) 00073 def test_is_simple(self): 00074 from roslib.genpy import is_simple 00075 for t in ['uint8', 'int8', 'uint16', 'int16', 'uint32', 'int32', 'uint64', 'int64', 'float32', 'float64']: 00076 self.assert_(is_simple(t)) 00077 def test_SIMPLE_TYPES(self): 00078 import roslib.msgs 00079 import roslib.genpy 00080 # tripwire to make sure we don't add builtin types without making sure that simple types has been updated 00081 self.assertEquals(set(['string', 'time', 'duration']), 00082 set(roslib.msgs.BUILTIN_TYPES) - set(roslib.genpy.SIMPLE_TYPES)) 00083 def test_is_special(self): 00084 from roslib.genpy import is_special 00085 for t in ['time', 'duration', 'Header']: 00086 self.assert_(is_special(t)) 00087 def test_Simple(self): 00088 from roslib.genpy import get_special 00089 self.assertEquals('import roslib.rostime', get_special('time').import_str) 00090 self.assertEquals('import roslib.rostime', get_special('duration').import_str) 00091 self.assertEquals('import std_msgs.msg', get_special('Header').import_str) 00092 00093 self.assertEquals('roslib.rostime.Time()', get_special('time').constructor) 00094 self.assertEquals('roslib.rostime.Duration()', get_special('duration').constructor) 00095 self.assertEquals('std_msgs.msg._Header.Header()', get_special('Header').constructor) 00096 00097 self.assertEquals('self.foo.canon()', get_special('time').get_post_deserialize('self.foo')) 00098 self.assertEquals('bar.canon()', get_special('time').get_post_deserialize('bar')) 00099 self.assertEquals('self.foo.canon()', get_special('duration').get_post_deserialize('self.foo')) 00100 self.assertEquals(None, get_special('Header').get_post_deserialize('self.foo')) 00101 00102 def test_compute_post_deserialize(self): 00103 from roslib.genpy import compute_post_deserialize 00104 self.assertEquals('self.bar.canon()', compute_post_deserialize('time', 'self.bar')) 00105 self.assertEquals('self.bar.canon()', compute_post_deserialize('duration', 'self.bar')) 00106 self.assertEquals(None, compute_post_deserialize('Header', 'self.bar')) 00107 00108 self.assertEquals(None, compute_post_deserialize('int8', 'self.bar')) 00109 self.assertEquals(None, compute_post_deserialize('string', 'self.bar')) 00110 00111 def test_compute_struct_pattern(self): 00112 from roslib.genpy import compute_struct_pattern 00113 self.assertEquals(None, compute_struct_pattern(None)) 00114 self.assertEquals(None, compute_struct_pattern([])) 00115 # string should immediately bork any simple types 00116 self.assertEquals(None, compute_struct_pattern(['string'])) 00117 self.assertEquals(None, compute_struct_pattern(['uint32', 'string'])) 00118 self.assertEquals(None, compute_struct_pattern(['string', 'int32'])) 00119 # array types should not compute 00120 self.assertEquals(None, compute_struct_pattern(['uint32[]'])) 00121 self.assertEquals(None, compute_struct_pattern(['uint32[1]'])) 00122 00123 self.assertEquals("B", compute_struct_pattern(['uint8'])) 00124 self.assertEquals("BB", compute_struct_pattern(['uint8', 'uint8'])) 00125 self.assertEquals("B", compute_struct_pattern(['char'])) 00126 self.assertEquals("BB", compute_struct_pattern(['char', 'char'])) 00127 self.assertEquals("b", compute_struct_pattern(['byte'])) 00128 self.assertEquals("bb", compute_struct_pattern(['byte', 'byte'])) 00129 self.assertEquals("b", compute_struct_pattern(['int8'])) 00130 self.assertEquals("bb", compute_struct_pattern(['int8', 'int8'])) 00131 self.assertEquals("H", compute_struct_pattern(['uint16'])) 00132 self.assertEquals("HH", compute_struct_pattern(['uint16', 'uint16'])) 00133 self.assertEquals("h", compute_struct_pattern(['int16'])) 00134 self.assertEquals("hh", compute_struct_pattern(['int16', 'int16'])) 00135 self.assertEquals("I", compute_struct_pattern(['uint32'])) 00136 self.assertEquals("II", compute_struct_pattern(['uint32', 'uint32'])) 00137 self.assertEquals("i", compute_struct_pattern(['int32'])) 00138 self.assertEquals("ii", compute_struct_pattern(['int32', 'int32'])) 00139 self.assertEquals("Q", compute_struct_pattern(['uint64'])) 00140 self.assertEquals("QQ", compute_struct_pattern(['uint64', 'uint64'])) 00141 self.assertEquals("q", compute_struct_pattern(['int64'])) 00142 self.assertEquals("qq", compute_struct_pattern(['int64', 'int64'])) 00143 self.assertEquals("f", compute_struct_pattern(['float32'])) 00144 self.assertEquals("ff", compute_struct_pattern(['float32', 'float32'])) 00145 self.assertEquals("d", compute_struct_pattern(['float64'])) 00146 self.assertEquals("dd", compute_struct_pattern(['float64', 'float64'])) 00147 00148 self.assertEquals("bBhHiIqQfd", compute_struct_pattern(['int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64', 'float32', 'float64'])) 00149 00150 def test_flatten(self): 00151 from roslib.msgs import register, MsgSpec 00152 from roslib.genpy import flatten 00153 00154 simple = MsgSpec(['string'], ['data'], [], 'string data\n') 00155 simple2 = MsgSpec(['string', 'int32'], ['data', 'data2'], [], 'string data\nint32 data2\n') 00156 self.assertEquals(simple, flatten(simple)) 00157 self.assertEquals(simple2, flatten(simple2)) 00158 00159 b1 = MsgSpec(['int8'], ['data'], [], 'X') 00160 b2 = MsgSpec(['f_msgs/Base'], ['data'], [], 'X') 00161 b3 = MsgSpec(['f_msgs/Base2', 'f_msgs/Base2'], ['data3', 'data4'], [], 'X') 00162 b4 = MsgSpec(['f_msgs/Base3', 'f_msgs/Base3'], ['dataA', 'dataB'], [], 'X') 00163 register('f_msgs/Base', b1) 00164 register('f_msgs/Base2', b2) 00165 register('f_msgs/Base3', b3) 00166 register('f_msgs/Base4', b4) 00167 00168 self.assertEquals(MsgSpec(['int8'], ['data.data'], [], 'X'), flatten(b2)) 00169 self.assertEquals(MsgSpec(['int8', 'int8'], ['data3.data.data', 'data4.data.data'], [], 'X'), flatten(b3)) 00170 self.assertEquals(MsgSpec(['int8', 'int8', 'int8', 'int8'], 00171 ['dataA.data3.data.data', 'dataA.data4.data.data', 'dataB.data3.data.data', 'dataB.data4.data.data'], 00172 [], 'X'), flatten(b4)) 00173 00174 def test_numpy_dtype(self): 00175 for t in roslib.genpy.SIMPLE_TYPES: 00176 self.assert_(t in roslib.genpy._NUMPY_DTYPE) 00177 00178 def test_default_value(self): 00179 from roslib.msgs import register, MsgSpec 00180 from roslib.genpy import default_value 00181 00182 register('fake_msgs/String', MsgSpec(['string'], ['data'], [], 'string data\n')) 00183 register('fake_msgs/ThreeNums', MsgSpec(['int32', 'int32', 'int32'], ['x', 'y', 'z'], [], 'int32 x\nint32 y\nint32 z\n')) 00184 00185 # trip-wire: make sure all builtins have a default value 00186 for t in roslib.msgs.BUILTIN_TYPES: 00187 self.assert_(type(default_value(t, 'roslib')) == str) 00188 00189 # simple types first 00190 for t in ['uint8', 'int8', 'uint16', 'int16', 'uint32', 'int32', 'uint64', 'int64', 'byte', 'char']: 00191 self.assertEquals('0', default_value(t, 'std_msgs')) 00192 self.assertEquals('0', default_value(t, 'roslib')) 00193 for t in ['float32', 'float64']: 00194 self.assertEquals('0.', default_value(t, 'std_msgs')) 00195 self.assertEquals('0.', default_value(t, 'roslib')) 00196 self.assertEquals("''", default_value('string', 'roslib')) 00197 00198 # builtin specials 00199 self.assertEquals('roslib.rostime.Time()', default_value('time', 'roslib')) 00200 self.assertEquals('roslib.rostime.Duration()', default_value('duration', 'roslib')) 00201 self.assertEquals('std_msgs.msg._Header.Header()', default_value('Header', 'roslib')) 00202 00203 self.assertEquals('roslib.rostime.Time()', default_value('time', 'std_msgs')) 00204 self.assertEquals('roslib.rostime.Duration()', default_value('duration', 'std_msgs')) 00205 self.assertEquals('std_msgs.msg._Header.Header()', default_value('Header', 'std_msgs')) 00206 00207 # generic instances 00208 # - unregistered type 00209 self.assertEquals(None, default_value("unknown_msgs/Foo", "unknown_msgs")) 00210 # - wrong context 00211 self.assertEquals(None, default_value('ThreeNums', 'std_msgs')) 00212 00213 # - registered types 00214 self.assertEquals('fake_msgs.msg.String()', default_value('fake_msgs/String', 'std_msgs')) 00215 self.assertEquals('fake_msgs.msg.String()', default_value('fake_msgs/String', 'fake_msgs')) 00216 self.assertEquals('fake_msgs.msg.String()', default_value('String', 'fake_msgs')) 00217 self.assertEquals('fake_msgs.msg.ThreeNums()', default_value('fake_msgs/ThreeNums', 'roslib')) 00218 self.assertEquals('fake_msgs.msg.ThreeNums()', default_value('fake_msgs/ThreeNums', 'fake_msgs')) 00219 self.assertEquals('fake_msgs.msg.ThreeNums()', default_value('ThreeNums', 'fake_msgs')) 00220 00221 # var-length arrays always default to empty arrays... except for byte and uint8 which are strings 00222 for t in ['int8', 'uint16', 'int16', 'uint32', 'int32', 'uint64', 'int64', 'float32', 'float64']: 00223 self.assertEquals('[]', default_value(t+'[]', 'std_msgs')) 00224 self.assertEquals('[]', default_value(t+'[]', 'roslib')) 00225 00226 self.assertEquals("''", default_value('uint8[]', 'roslib')) 00227 00228 # fixed-length arrays should be zero-filled... except for uint8 which is a string 00229 for t in ['float32', 'float64']: 00230 self.assertEquals('[0.,0.,0.]', default_value(t+'[3]', 'std_msgs')) 00231 self.assertEquals('[0.]', default_value(t+'[1]', 'std_msgs')) 00232 for t in ['int8', 'uint16', 'int16', 'uint32', 'int32', 'uint64', 'int64']: 00233 self.assertEquals('[0,0,0,0]', default_value(t+'[4]', 'std_msgs')) 00234 self.assertEquals('[0]', default_value(t+'[1]', 'roslib')) 00235 00236 self.assertEquals("chr(0)*1", default_value('uint8[1]', 'roslib')) 00237 self.assertEquals("chr(0)*4", default_value('uint8[4]', 'roslib')) 00238 00239 self.assertEquals('[]', default_value('fake_msgs/String[]', 'std_msgs')) 00240 self.assertEquals('[fake_msgs.msg.String(),fake_msgs.msg.String()]', default_value('fake_msgs/String[2]', 'std_msgs')) 00241 00242 def test_make_python_safe(self): 00243 from roslib.msgs import register, MsgSpec, Constant 00244 from roslib.genpy import make_python_safe 00245 s = MsgSpec(['int32', 'int32', 'int32', 'int32'], ['ok', 'if', 'self', 'fine'], 00246 [Constant('int32', 'if', '1', '1'), Constant('int32', 'okgo', '1', '1')], 00247 'x') 00248 s2 = make_python_safe(s) 00249 self.assertNotEquals(s, s2) 00250 self.assertEquals(['ok', 'if_', 'self_', 'fine'], s2.names) 00251 self.assertEquals(s2.types, s.types) 00252 self.assertEquals([Constant('int32', 'if_', '1', '1'), Constant('int32', 'okgo', '1', '1')], s2.constants) 00253 self.assertEquals(s2.text, s.text) 00254 00255 def test_compute_pkg_type(self): 00256 from roslib.genpy import compute_pkg_type, MsgGenerationException 00257 try: 00258 compute_pkg_type('std_msgs', 'really/bad/std_msgs/String') 00259 except MsgGenerationException: pass 00260 self.assertEquals(('std_msgs', 'String'), compute_pkg_type('std_msgs', 'std_msgs/String')) 00261 self.assertEquals(('std_msgs', 'String'), compute_pkg_type('foo', 'std_msgs/String')) 00262 self.assertEquals(('std_msgs', 'String'), compute_pkg_type('std_msgs', 'String')) 00263 00264 def test_compute_import(self): 00265 from roslib.msgs import register, MsgSpec 00266 from roslib.genpy import compute_import 00267 00268 self.assertEquals([], compute_import('foo', 'bar')) 00269 self.assertEquals([], compute_import('foo', 'int32')) 00270 00271 register('ci_msgs/Base', MsgSpec(['int8'], ['data'], [], 'int8 data\n')) 00272 register('ci2_msgs/Base2', MsgSpec(['ci_msgs/Base'], ['data2'], [], 'ci_msgs/Base data2\n')) 00273 register('ci3_msgs/Base3', MsgSpec(['ci2_msgs/Base2'], ['data3'], [], 'ci2_msgs/Base2 data3\n')) 00274 register('ci4_msgs/Base', MsgSpec(['int8'], ['data'], [], 'int8 data\n')) 00275 register('ci4_msgs/Base4', MsgSpec(['ci2_msgs/Base2', 'ci3_msgs/Base3', 'ci4_msgs/Base'], 00276 ['data4a', 'data4b', 'data4c'], 00277 [], 'ci2_msgs/Base2 data4a\nci3_msgs/Base3 data4b\nci4_msgs/Base data4c\n')) 00278 00279 register('ci5_msgs/Base', MsgSpec(['time'], ['data'], [], 'time data\n')) 00280 00281 self.assertEquals(['import ci_msgs.msg'], compute_import('foo', 'ci_msgs/Base')) 00282 self.assertEquals(['import ci_msgs.msg'], compute_import('ci_msgs', 'ci_msgs/Base')) 00283 self.assertEquals(['import ci2_msgs.msg', 'import ci_msgs.msg'], compute_import('ci2_msgs', 'ci2_msgs/Base2')) 00284 self.assertEquals(['import ci2_msgs.msg', 'import ci_msgs.msg'], compute_import('foo', 'ci2_msgs/Base2')) 00285 self.assertEquals(['import ci3_msgs.msg', 'import ci2_msgs.msg', 'import ci_msgs.msg'], compute_import('ci3_msgs', 'ci3_msgs/Base3')) 00286 00287 self.assertEquals(set(['import ci4_msgs.msg', 'import ci3_msgs.msg', 'import ci2_msgs.msg', 'import ci_msgs.msg']), 00288 set(compute_import('foo', 'ci4_msgs/Base4'))) 00289 self.assertEquals(set(['import ci4_msgs.msg', 'import ci3_msgs.msg', 'import ci2_msgs.msg', 'import ci_msgs.msg']), 00290 set(compute_import('ci4_msgs', 'ci4_msgs/Base4'))) 00291 00292 self.assertEquals(['import ci4_msgs.msg'], compute_import('foo', 'ci4_msgs/Base')) 00293 self.assertEquals(['import ci4_msgs.msg'], compute_import('ci4_msgs', 'ci4_msgs/Base')) 00294 self.assertEquals(['import ci4_msgs.msg'], compute_import('ci4_msgs', 'Base')) 00295 00296 self.assertEquals(['import ci5_msgs.msg', 'import roslib.rostime'], compute_import('foo', 'ci5_msgs/Base')) 00297 00298 def test_get_registered_ex(self): 00299 from roslib.msgs import MsgSpec, register 00300 from roslib.genpy import MsgGenerationException, get_registered_ex 00301 s = MsgSpec(['string'], ['data'], [], 'string data\n') 00302 register('tgr_msgs/String', s) 00303 self.assertEquals(s, get_registered_ex('tgr_msgs/String')) 00304 try: 00305 get_registered_ex('bad_msgs/String') 00306 except MsgGenerationException: pass 00307 00308 def test_compute_constructor(self): 00309 from roslib.msgs import register, MsgSpec 00310 from roslib.genpy import compute_constructor 00311 register('fake_msgs/String', MsgSpec(['string'], ['data'], [], 'string data\n')) 00312 register('fake_msgs/ThreeNums', MsgSpec(['int32', 'int32', 'int32'], ['x', 'y', 'z'], [], 'int32 x\nint32 y\nint32 z\n')) 00313 00314 # builtin specials 00315 self.assertEquals('roslib.rostime.Time()', compute_constructor('roslib', 'time')) 00316 self.assertEquals('roslib.rostime.Duration()', compute_constructor('roslib', 'duration')) 00317 self.assertEquals('std_msgs.msg._Header.Header()', compute_constructor('std_msgs', 'Header')) 00318 00319 self.assertEquals('roslib.rostime.Time()', compute_constructor('std_msgs', 'time')) 00320 self.assertEquals('roslib.rostime.Duration()', compute_constructor('std_msgs', 'duration')) 00321 00322 # generic instances 00323 # - unregistered type 00324 self.assertEquals(None, compute_constructor("unknown_msgs", "unknown_msgs/Foo")) 00325 self.assertEquals(None, compute_constructor("unknown_msgs", "Foo")) 00326 # - wrong context 00327 self.assertEquals(None, compute_constructor('std_msgs', 'ThreeNums')) 00328 00329 # - registered types 00330 self.assertEquals('fake_msgs.msg.String()', compute_constructor('std_msgs', 'fake_msgs/String')) 00331 self.assertEquals('fake_msgs.msg.String()', compute_constructor('fake_msgs', 'fake_msgs/String')) 00332 self.assertEquals('fake_msgs.msg.String()', compute_constructor('fake_msgs', 'String')) 00333 self.assertEquals('fake_msgs.msg.ThreeNums()', compute_constructor('fake_msgs', 'fake_msgs/ThreeNums')) 00334 self.assertEquals('fake_msgs.msg.ThreeNums()', compute_constructor('fake_msgs', 'fake_msgs/ThreeNums')) 00335 self.assertEquals('fake_msgs.msg.ThreeNums()', compute_constructor('fake_msgs', 'ThreeNums')) 00336 00337 def test_pack(self): 00338 from roslib.genpy import pack 00339 self.assertEquals("buff.write(_struct_3lL3bB.pack(foo, bar))", pack('lllLbbbB', 'foo, bar')) 00340 00341 def test_pack2(self): 00342 from roslib.genpy import pack2 00343 self.assertEquals('buff.write(struct.pack(patt_name, foo, bar))', pack2('patt_name', 'foo, bar')) 00344 00345 def test_unpack(self): 00346 from roslib.genpy import unpack 00347 self.assertEquals("var_x = _struct_I3if2I.unpack(bname)", unpack('var_x', 'IiiifII', 'bname')) 00348 00349 def test_unpack2(self): 00350 from roslib.genpy import unpack2 00351 self.assertEquals('x = struct.unpack(patt, b)', unpack2('x', 'patt', 'b')) 00352 00353 def test_generate_dynamic(self): 00354 from roslib.genpy import generate_dynamic 00355 msgs = generate_dynamic("gd_msgs/EasyString", "string data\n") 00356 self.assertEquals(['gd_msgs/EasyString'], msgs.keys()) 00357 m_cls = msgs['gd_msgs/EasyString'] 00358 m_instance = m_cls() 00359 m_instance.data = 'foo' 00360 buff = cStringIO.StringIO() 00361 m_instance.serialize(buff) 00362 m_cls().deserialize(buff.getvalue()) 00363 00364 # 'probot_msgs' is a test for #1183, failure if the package no longer exists 00365 msgs = generate_dynamic("gd_msgs/MoveArmState", """Header header 00366 probot_msgs/ControllerStatus status 00367 00368 #Current arm configuration 00369 probot_msgs/JointState[] configuration 00370 #Goal arm configuration 00371 probot_msgs/JointState[] goal 00372 00373 ================================================================================ 00374 MSG: std_msgs/Header 00375 #Standard metadata for higher-level flow data types 00376 #sequence ID: consecutively increasing ID 00377 uint32 seq 00378 #Two-integer timestamp that is expressed as: 00379 # * stamp.secs: seconds (stamp_secs) since epoch 00380 # * stamp.nsecs: nanoseconds since stamp_secs 00381 # time-handling sugar is provided by the client library 00382 time stamp 00383 #Frame this data is associated with 00384 # 0: no frame 00385 # 1: global frame 00386 string frame_id 00387 00388 ================================================================================ 00389 MSG: probot_msgs/ControllerStatus 00390 # This message defines the expected format for Controller Statuss messages 00391 # Embed this in the feedback state message of highlevel controllers 00392 byte UNDEFINED=0 00393 byte SUCCESS=1 00394 byte ABORTED=2 00395 byte PREEMPTED=3 00396 byte ACTIVE=4 00397 00398 # Status of the controller = {UNDEFINED, SUCCESS, ABORTED, PREEMPTED, ACTIVE} 00399 byte value 00400 00401 #Comment for debug 00402 string comment 00403 ================================================================================ 00404 MSG: probot_msgs/JointState 00405 string name 00406 float64 position 00407 float64 velocity 00408 float64 applied_effort 00409 float64 commanded_effort 00410 byte is_calibrated 00411 00412 """) 00413 self.assertEquals(set(['gd_msgs/MoveArmState', 'probot_msgs/JointState', 'probot_msgs/ControllerStatus', 'std_msgs/Header']), 00414 set(msgs.keys())) 00415 import roslib.rostime 00416 import time 00417 m_instance1 = msgs['std_msgs/Header']() # make sure default constructor works 00418 m_instance2 = msgs['std_msgs/Header'](stamp=roslib.rostime.Time.from_sec(time.time()), frame_id='foo-%s'%time.time(), seq=12390) 00419 self._test_ser_deser(m_instance2, m_instance1) 00420 00421 m_instance1 = msgs['probot_msgs/ControllerStatus']() 00422 m_instance2 = msgs['probot_msgs/ControllerStatus'](value=4, comment=str(time.time())) 00423 d = {'UNDEFINED':0,'SUCCESS':1,'ABORTED':2,'PREEMPTED':3,'ACTIVE':4} 00424 for k, v in d.iteritems(): 00425 self.assertEquals(v, getattr(m_instance1, k)) 00426 self._test_ser_deser(m_instance2, m_instance1) 00427 00428 m_instance1 = msgs['probot_msgs/JointState']() 00429 m_instance2 = msgs['probot_msgs/JointState'](position=time.time(), velocity=time.time(), applied_effort=time.time(), commanded_effort=time.time(), is_calibrated=2) 00430 self._test_ser_deser(m_instance2, m_instance1) 00431 00432 m_instance1 = msgs['gd_msgs/MoveArmState']() 00433 js = msgs['probot_msgs/JointState'] 00434 config = [] 00435 goal = [] 00436 # generate some data for config/goal 00437 for i in range(0, 10): 00438 config.append(js(position=time.time(), velocity=time.time(), applied_effort=time.time(), commanded_effort=time.time(), is_calibrated=2)) 00439 goal.append(js(position=time.time(), velocity=time.time(), applied_effort=time.time(), commanded_effort=time.time(), is_calibrated=2)) 00440 m_instance2 = msgs['gd_msgs/MoveArmState'](header=msgs['std_msgs/Header'](), 00441 status=msgs['probot_msgs/ControllerStatus'](), 00442 configuration=config, goal=goal) 00443 self._test_ser_deser(m_instance2, m_instance1) 00444 00445 def _test_ser_deser(self, m_instance1, m_instance2): 00446 buff = cStringIO.StringIO() 00447 m_instance1.serialize(buff) 00448 m_instance2.deserialize(buff.getvalue()) 00449 self.assertEquals(m_instance1, m_instance2) 00450 00451 def test_len_serializer_generator(self): 00452 # generator tests are mainly tripwires/coverage tests 00453 from roslib.genpy import len_serializer_generator 00454 # Test Serializers 00455 # string serializer simply initializes local var 00456 g = len_serializer_generator('foo', True, True) 00457 self.assertEquals('length = len(foo)', '\n'.join(g)) 00458 # array len serializer writes var 00459 g = len_serializer_generator('foo', False, True) 00460 self.assertEquals("length = len(foo)\nbuff.write(_struct_I.pack(length))", '\n'.join(g)) 00461 00462 # Test Deserializers 00463 val = """start = end 00464 end += 4 00465 (length,) = _struct_I.unpack(str[start:end])""" 00466 # string serializer and array serializer are identical 00467 g = len_serializer_generator('foo', True, False) 00468 self.assertEquals(val, '\n'.join(g)) 00469 g = len_serializer_generator('foo', False, False) 00470 self.assertEquals(val, '\n'.join(g)) 00471 00472 def test_string_serializer_generator(self): 00473 # generator tests are mainly tripwires/coverage tests 00474 from roslib.genpy import string_serializer_generator 00475 # Test Serializers 00476 g = string_serializer_generator('foo', 'string', 'var_name', True) 00477 self.assertEquals("""length = len(var_name) 00478 buff.write(struct.pack('<I%ss'%length, length, var_name))""", '\n'.join(g)) 00479 00480 for t in ['uint8[]', 'byte[]', 'uint8[10]', 'byte[20]']: 00481 g = string_serializer_generator('foo', 'uint8[]', 'b_name', True) 00482 self.assertEquals("""length = len(b_name) 00483 # - if encoded as a list instead, serialize as bytes instead of string 00484 if type(b_name) in [list, tuple]: 00485 buff.write(struct.pack('<I%sB'%length, length, *b_name)) 00486 else: 00487 buff.write(struct.pack('<I%ss'%length, length, b_name))""", '\n'.join(g)) 00488 00489 # Test Deserializers 00490 val = """start = end 00491 end += 4 00492 (length,) = _struct_I.unpack(str[start:end]) 00493 start = end 00494 end += length 00495 var_name = str[start:end]""" 00496 # string serializer and array serializer are identical 00497 g = string_serializer_generator('foo', 'string', 'var_name', False) 00498 self.assertEquals(val, '\n'.join(g)) 00499 00500 if __name__ == '__main__': 00501 rosunit.unitrun(PKG, NAME, TestGenpy, sys.argv, coverage_packages=['roslib.genpy'])