00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'test_roslib_comm'
00037 NAME = 'test_genpy'
00038 import roslib; roslib.load_manifest(PKG)
00039
00040 import sys
00041 import unittest
00042 import cStringIO
00043 import time
00044
00045 import rosunit
00046
00047
00048
00049
00050
00051 class TestGenpy(unittest.TestCase):
00052
00053 def setUp(self):
00054 pass
00055
00056
00057 def test_reduce_pattern(self):
00058 tests = [
00059 ('', ''),
00060 ('hhhh', '4h'),
00061 ('hhhhi', '4hi'),
00062 ('hhhhiiiibbb', '4h4i3b'),
00063 ('1h2h3h', '1h2h3h'),
00064 ('hIi', 'hIi'),
00065 ('66h', '66h'),
00066 ('%ss', '%ss'),
00067 ('<I', '<I'),
00068 ('<11s', '<11s'),
00069 ]
00070 from roslib.genpy import reduce_pattern
00071 for input, result in tests:
00072 self.assertEquals(result, reduce_pattern(input))
00073 def test_is_simple(self):
00074 from roslib.genpy import is_simple
00075 for t in ['uint8', 'int8', 'uint16', 'int16', 'uint32', 'int32', 'uint64', 'int64', 'float32', 'float64', 'byte', 'char']:
00076 self.assert_(is_simple(t))
00077 def test_SIMPLE_TYPES(self):
00078 import roslib.msgs
00079 import roslib.genpy
00080
00081 self.assertEquals(set(['string', 'time', 'duration']),
00082 set(roslib.msgs.BUILTIN_TYPES) - set(roslib.genpy.SIMPLE_TYPES))
00083 def test_is_special(self):
00084 from roslib.genpy import is_special
00085 for t in ['time', 'duration', 'Header']:
00086 self.assert_(is_special(t))
00087 def test_Simple(self):
00088 from roslib.genpy import get_special
00089 self.assertEquals('import roslib.rostime', get_special('time').import_str)
00090 self.assertEquals('import roslib.rostime', get_special('duration').import_str)
00091 self.assertEquals('import std_msgs.msg', get_special('Header').import_str)
00092
00093 self.assertEquals('roslib.rostime.Time()', get_special('time').constructor)
00094 self.assertEquals('roslib.rostime.Duration()', get_special('duration').constructor)
00095 self.assertEquals('std_msgs.msg._Header.Header()', get_special('Header').constructor)
00096
00097 self.assertEquals('self.foo.canon()', get_special('time').get_post_deserialize('self.foo'))
00098 self.assertEquals('bar.canon()', get_special('time').get_post_deserialize('bar'))
00099 self.assertEquals('self.foo.canon()', get_special('duration').get_post_deserialize('self.foo'))
00100 self.assertEquals(None, get_special('Header').get_post_deserialize('self.foo'))
00101
00102 def test_compute_post_deserialize(self):
00103 from roslib.genpy import compute_post_deserialize
00104 self.assertEquals('self.bar.canon()', compute_post_deserialize('time', 'self.bar'))
00105 self.assertEquals('self.bar.canon()', compute_post_deserialize('duration', 'self.bar'))
00106 self.assertEquals(None, compute_post_deserialize('Header', 'self.bar'))
00107
00108 self.assertEquals(None, compute_post_deserialize('int8', 'self.bar'))
00109 self.assertEquals(None, compute_post_deserialize('string', 'self.bar'))
00110
00111 def test_compute_struct_pattern(self):
00112 from roslib.genpy import compute_struct_pattern
00113 self.assertEquals(None, compute_struct_pattern(None))
00114 self.assertEquals(None, compute_struct_pattern([]))
00115
00116 self.assertEquals(None, compute_struct_pattern(['string']))
00117 self.assertEquals(None, compute_struct_pattern(['uint32', 'string']))
00118 self.assertEquals(None, compute_struct_pattern(['string', 'int32']))
00119
00120 self.assertEquals(None, compute_struct_pattern(['uint32[]']))
00121 self.assertEquals(None, compute_struct_pattern(['uint32[1]']))
00122
00123 self.assertEquals("B", compute_struct_pattern(['uint8']))
00124 self.assertEquals("BB", compute_struct_pattern(['uint8', 'uint8']))
00125 self.assertEquals("B", compute_struct_pattern(['char']))
00126 self.assertEquals("BB", compute_struct_pattern(['char', 'char']))
00127 self.assertEquals("b", compute_struct_pattern(['byte']))
00128 self.assertEquals("bb", compute_struct_pattern(['byte', 'byte']))
00129 self.assertEquals("b", compute_struct_pattern(['int8']))
00130 self.assertEquals("bb", compute_struct_pattern(['int8', 'int8']))
00131 self.assertEquals("H", compute_struct_pattern(['uint16']))
00132 self.assertEquals("HH", compute_struct_pattern(['uint16', 'uint16']))
00133 self.assertEquals("h", compute_struct_pattern(['int16']))
00134 self.assertEquals("hh", compute_struct_pattern(['int16', 'int16']))
00135 self.assertEquals("I", compute_struct_pattern(['uint32']))
00136 self.assertEquals("II", compute_struct_pattern(['uint32', 'uint32']))
00137 self.assertEquals("i", compute_struct_pattern(['int32']))
00138 self.assertEquals("ii", compute_struct_pattern(['int32', 'int32']))
00139 self.assertEquals("Q", compute_struct_pattern(['uint64']))
00140 self.assertEquals("QQ", compute_struct_pattern(['uint64', 'uint64']))
00141 self.assertEquals("q", compute_struct_pattern(['int64']))
00142 self.assertEquals("qq", compute_struct_pattern(['int64', 'int64']))
00143 self.assertEquals("f", compute_struct_pattern(['float32']))
00144 self.assertEquals("ff", compute_struct_pattern(['float32', 'float32']))
00145 self.assertEquals("d", compute_struct_pattern(['float64']))
00146 self.assertEquals("dd", compute_struct_pattern(['float64', 'float64']))
00147
00148 self.assertEquals("bBhHiIqQfd", compute_struct_pattern(['int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64', 'float32', 'float64']))
00149
00150 def test_flatten(self):
00151 from roslib.msgs import register, MsgSpec
00152 from roslib.genpy import flatten
00153
00154 simple = MsgSpec(['string'], ['data'], [], 'string data\n')
00155 simple2 = MsgSpec(['string', 'int32'], ['data', 'data2'], [], 'string data\nint32 data2\n')
00156 self.assertEquals(simple, flatten(simple))
00157 self.assertEquals(simple2, flatten(simple2))
00158
00159 b1 = MsgSpec(['int8'], ['data'], [], 'X')
00160 b2 = MsgSpec(['f_msgs/Base'], ['data'], [], 'X')
00161 b3 = MsgSpec(['f_msgs/Base2', 'f_msgs/Base2'], ['data3', 'data4'], [], 'X')
00162 b4 = MsgSpec(['f_msgs/Base3', 'f_msgs/Base3'], ['dataA', 'dataB'], [], 'X')
00163 register('f_msgs/Base', b1)
00164 register('f_msgs/Base2', b2)
00165 register('f_msgs/Base3', b3)
00166 register('f_msgs/Base4', b4)
00167
00168 self.assertEquals(MsgSpec(['int8'], ['data.data'], [], 'X'), flatten(b2))
00169 self.assertEquals(MsgSpec(['int8', 'int8'], ['data3.data.data', 'data4.data.data'], [], 'X'), flatten(b3))
00170 self.assertEquals(MsgSpec(['int8', 'int8', 'int8', 'int8'],
00171 ['dataA.data3.data.data', 'dataA.data4.data.data', 'dataB.data3.data.data', 'dataB.data4.data.data'],
00172 [], 'X'), flatten(b4))
00173
00174 def test_numpy_dtype(self):
00175 for t in roslib.genpy.SIMPLE_TYPES:
00176 self.assert_(t in roslib.genpy._NUMPY_DTYPE)
00177
00178 def test_default_value(self):
00179 from roslib.msgs import register, MsgSpec
00180 from roslib.genpy import default_value
00181
00182 register('fake_msgs/String', MsgSpec(['string'], ['data'], [], 'string data\n'))
00183 register('fake_msgs/ThreeNums', MsgSpec(['int32', 'int32', 'int32'], ['x', 'y', 'z'], [], 'int32 x\nint32 y\nint32 z\n'))
00184
00185
00186 for t in roslib.msgs.BUILTIN_TYPES:
00187 self.assert_(type(default_value(t, 'roslib')) == str)
00188
00189
00190 for t in ['uint8', 'int8', 'uint16', 'int16', 'uint32', 'int32', 'uint64', 'int64', 'byte', 'char']:
00191 self.assertEquals('0', default_value(t, 'std_msgs'))
00192 self.assertEquals('0', default_value(t, 'roslib'))
00193 for t in ['float32', 'float64']:
00194 self.assertEquals('0.', default_value(t, 'std_msgs'))
00195 self.assertEquals('0.', default_value(t, 'roslib'))
00196 self.assertEquals("''", default_value('string', 'roslib'))
00197
00198
00199 self.assertEquals('roslib.rostime.Time()', default_value('time', 'roslib'))
00200 self.assertEquals('roslib.rostime.Duration()', default_value('duration', 'roslib'))
00201 self.assertEquals('std_msgs.msg._Header.Header()', default_value('Header', 'roslib'))
00202
00203 self.assertEquals('roslib.rostime.Time()', default_value('time', 'std_msgs'))
00204 self.assertEquals('roslib.rostime.Duration()', default_value('duration', 'std_msgs'))
00205 self.assertEquals('std_msgs.msg._Header.Header()', default_value('Header', 'std_msgs'))
00206
00207
00208
00209 self.assertEquals(None, default_value("unknown_msgs/Foo", "unknown_msgs"))
00210
00211 self.assertEquals(None, default_value('ThreeNums', 'std_msgs'))
00212
00213
00214 self.assertEquals('fake_msgs.msg.String()', default_value('fake_msgs/String', 'std_msgs'))
00215 self.assertEquals('fake_msgs.msg.String()', default_value('fake_msgs/String', 'fake_msgs'))
00216 self.assertEquals('fake_msgs.msg.String()', default_value('String', 'fake_msgs'))
00217 self.assertEquals('fake_msgs.msg.ThreeNums()', default_value('fake_msgs/ThreeNums', 'roslib'))
00218 self.assertEquals('fake_msgs.msg.ThreeNums()', default_value('fake_msgs/ThreeNums', 'fake_msgs'))
00219 self.assertEquals('fake_msgs.msg.ThreeNums()', default_value('ThreeNums', 'fake_msgs'))
00220
00221
00222 for t in ['int8', 'uint16', 'int16', 'uint32', 'int32', 'uint64', 'int64', 'float32', 'float64', 'char']:
00223 self.assertEquals('[]', default_value(t+'[]', 'std_msgs'))
00224 self.assertEquals('[]', default_value(t+'[]', 'roslib'))
00225
00226 self.assertEquals("''", default_value('uint8[]', 'roslib'))
00227 self.assertEquals("''", default_value('byte[]', 'roslib'))
00228
00229
00230 for t in ['float32', 'float64']:
00231 self.assertEquals('[0.,0.,0.]', default_value(t+'[3]', 'std_msgs'))
00232 self.assertEquals('[0.]', default_value(t+'[1]', 'std_msgs'))
00233 for t in ['int8', 'uint16', 'int16', 'uint32', 'int32', 'uint64', 'int64', 'char']:
00234 self.assertEquals('[0,0,0,0]', default_value(t+'[4]', 'std_msgs'))
00235 self.assertEquals('[0]', default_value(t+'[1]', 'roslib'))
00236
00237 self.assertEquals("chr(0)*1", default_value('uint8[1]', 'roslib'))
00238 self.assertEquals("chr(0)*4", default_value('uint8[4]', 'roslib'))
00239 self.assertEquals("chr(0)*1", default_value('byte[1]', 'roslib'))
00240 self.assertEquals("chr(0)*4", default_value('byte[4]', 'roslib'))
00241
00242 self.assertEquals('[]', default_value('fake_msgs/String[]', 'std_msgs'))
00243 self.assertEquals('[fake_msgs.msg.String(),fake_msgs.msg.String()]', default_value('fake_msgs/String[2]', 'std_msgs'))
00244
00245 def test_make_python_safe(self):
00246 from roslib.msgs import register, MsgSpec, Constant
00247 from roslib.genpy import make_python_safe
00248 s = MsgSpec(['int32', 'int32', 'int32', 'int32'], ['ok', 'if', 'self', 'fine'],
00249 [Constant('int32', 'if', '1', '1'), Constant('int32', 'okgo', '1', '1')],
00250 'x')
00251 s2 = make_python_safe(s)
00252 self.assertNotEquals(s, s2)
00253 self.assertEquals(['ok', 'if_', 'self_', 'fine'], s2.names)
00254 self.assertEquals(s2.types, s.types)
00255 self.assertEquals([Constant('int32', 'if_', '1', '1'), Constant('int32', 'okgo', '1', '1')], s2.constants)
00256 self.assertEquals(s2.text, s.text)
00257
00258 def test_compute_pkg_type(self):
00259 from roslib.genpy import compute_pkg_type, MsgGenerationException
00260 try:
00261 compute_pkg_type('std_msgs', 'really/bad/std_msgs/String')
00262 except MsgGenerationException: pass
00263 self.assertEquals(('std_msgs', 'String'), compute_pkg_type('std_msgs', 'std_msgs/String'))
00264 self.assertEquals(('std_msgs', 'String'), compute_pkg_type('foo', 'std_msgs/String'))
00265 self.assertEquals(('std_msgs', 'String'), compute_pkg_type('std_msgs', 'String'))
00266
00267 def test_compute_import(self):
00268 from roslib.msgs import register, MsgSpec
00269 from roslib.genpy import compute_import
00270
00271 self.assertEquals([], compute_import('foo', 'bar'))
00272 self.assertEquals([], compute_import('foo', 'int32'))
00273
00274 register('ci_msgs/Base', MsgSpec(['int8'], ['data'], [], 'int8 data\n'))
00275 register('ci2_msgs/Base2', MsgSpec(['ci_msgs/Base'], ['data2'], [], 'ci_msgs/Base data2\n'))
00276 register('ci3_msgs/Base3', MsgSpec(['ci2_msgs/Base2'], ['data3'], [], 'ci2_msgs/Base2 data3\n'))
00277 register('ci4_msgs/Base', MsgSpec(['int8'], ['data'], [], 'int8 data\n'))
00278 register('ci4_msgs/Base4', MsgSpec(['ci2_msgs/Base2', 'ci3_msgs/Base3', 'ci4_msgs/Base'],
00279 ['data4a', 'data4b', 'data4c'],
00280 [], 'ci2_msgs/Base2 data4a\nci3_msgs/Base3 data4b\nci4_msgs/Base data4c\n'))
00281
00282 register('ci5_msgs/Base', MsgSpec(['time'], ['data'], [], 'time data\n'))
00283
00284 self.assertEquals(['import ci_msgs.msg'], compute_import('foo', 'ci_msgs/Base'))
00285 self.assertEquals(['import ci_msgs.msg'], compute_import('ci_msgs', 'ci_msgs/Base'))
00286 self.assertEquals(['import ci2_msgs.msg', 'import ci_msgs.msg'], compute_import('ci2_msgs', 'ci2_msgs/Base2'))
00287 self.assertEquals(['import ci2_msgs.msg', 'import ci_msgs.msg'], compute_import('foo', 'ci2_msgs/Base2'))
00288 self.assertEquals(['import ci3_msgs.msg', 'import ci2_msgs.msg', 'import ci_msgs.msg'], compute_import('ci3_msgs', 'ci3_msgs/Base3'))
00289
00290 self.assertEquals(set(['import ci4_msgs.msg', 'import ci3_msgs.msg', 'import ci2_msgs.msg', 'import ci_msgs.msg']),
00291 set(compute_import('foo', 'ci4_msgs/Base4')))
00292 self.assertEquals(set(['import ci4_msgs.msg', 'import ci3_msgs.msg', 'import ci2_msgs.msg', 'import ci_msgs.msg']),
00293 set(compute_import('ci4_msgs', 'ci4_msgs/Base4')))
00294
00295 self.assertEquals(['import ci4_msgs.msg'], compute_import('foo', 'ci4_msgs/Base'))
00296 self.assertEquals(['import ci4_msgs.msg'], compute_import('ci4_msgs', 'ci4_msgs/Base'))
00297 self.assertEquals(['import ci4_msgs.msg'], compute_import('ci4_msgs', 'Base'))
00298
00299 self.assertEquals(['import ci5_msgs.msg', 'import roslib.rostime'], compute_import('foo', 'ci5_msgs/Base'))
00300
00301 def test_get_registered_ex(self):
00302 from roslib.msgs import MsgSpec, register
00303 from roslib.genpy import MsgGenerationException, get_registered_ex
00304 s = MsgSpec(['string'], ['data'], [], 'string data\n')
00305 register('tgr_msgs/String', s)
00306 self.assertEquals(s, get_registered_ex('tgr_msgs/String'))
00307 try:
00308 get_registered_ex('bad_msgs/String')
00309 except MsgGenerationException: pass
00310
00311 def test_compute_constructor(self):
00312 from roslib.msgs import register, MsgSpec
00313 from roslib.genpy import compute_constructor
00314 register('fake_msgs/String', MsgSpec(['string'], ['data'], [], 'string data\n'))
00315 register('fake_msgs/ThreeNums', MsgSpec(['int32', 'int32', 'int32'], ['x', 'y', 'z'], [], 'int32 x\nint32 y\nint32 z\n'))
00316
00317
00318 self.assertEquals('roslib.rostime.Time()', compute_constructor('roslib', 'time'))
00319 self.assertEquals('roslib.rostime.Duration()', compute_constructor('roslib', 'duration'))
00320 self.assertEquals('std_msgs.msg._Header.Header()', compute_constructor('std_msgs', 'Header'))
00321
00322 self.assertEquals('roslib.rostime.Time()', compute_constructor('std_msgs', 'time'))
00323 self.assertEquals('roslib.rostime.Duration()', compute_constructor('std_msgs', 'duration'))
00324
00325
00326
00327 self.assertEquals(None, compute_constructor("unknown_msgs", "unknown_msgs/Foo"))
00328 self.assertEquals(None, compute_constructor("unknown_msgs", "Foo"))
00329
00330 self.assertEquals(None, compute_constructor('std_msgs', 'ThreeNums'))
00331
00332
00333 self.assertEquals('fake_msgs.msg.String()', compute_constructor('std_msgs', 'fake_msgs/String'))
00334 self.assertEquals('fake_msgs.msg.String()', compute_constructor('fake_msgs', 'fake_msgs/String'))
00335 self.assertEquals('fake_msgs.msg.String()', compute_constructor('fake_msgs', 'String'))
00336 self.assertEquals('fake_msgs.msg.ThreeNums()', compute_constructor('fake_msgs', 'fake_msgs/ThreeNums'))
00337 self.assertEquals('fake_msgs.msg.ThreeNums()', compute_constructor('fake_msgs', 'fake_msgs/ThreeNums'))
00338 self.assertEquals('fake_msgs.msg.ThreeNums()', compute_constructor('fake_msgs', 'ThreeNums'))
00339
00340 def test_pack(self):
00341 from roslib.genpy import pack
00342 self.assertEquals("buff.write(_struct_3lL3bB.pack(foo, bar))", pack('lllLbbbB', 'foo, bar'))
00343
00344 def test_pack2(self):
00345 from roslib.genpy import pack2
00346 self.assertEquals('buff.write(struct.pack(patt_name, foo, bar))', pack2('patt_name', 'foo, bar'))
00347
00348 def test_unpack(self):
00349 from roslib.genpy import unpack
00350 self.assertEquals("var_x = _struct_I3if2I.unpack(bname)", unpack('var_x', 'IiiifII', 'bname'))
00351
00352 def test_unpack2(self):
00353 from roslib.genpy import unpack2
00354 self.assertEquals('x = struct.unpack(patt, b)', unpack2('x', 'patt', 'b'))
00355
00356 def test_generate_dynamic(self):
00357 from roslib.genpy import generate_dynamic
00358 msgs = generate_dynamic("gd_msgs/EasyString", "string data\n")
00359 self.assertEquals(['gd_msgs/EasyString'], msgs.keys())
00360 m_cls = msgs['gd_msgs/EasyString']
00361 m_instance = m_cls()
00362 m_instance.data = 'foo'
00363 buff = cStringIO.StringIO()
00364 m_instance.serialize(buff)
00365 m_cls().deserialize(buff.getvalue())
00366
00367
00368 msgs = generate_dynamic("gd_msgs/MoveArmState", """Header header
00369 probot_msgs/ControllerStatus status
00370
00371 #Current arm configuration
00372 probot_msgs/JointState[] configuration
00373 #Goal arm configuration
00374 probot_msgs/JointState[] goal
00375
00376 ================================================================================
00377 MSG: std_msgs/Header
00378 #Standard metadata for higher-level flow data types
00379 #sequence ID: consecutively increasing ID
00380 uint32 seq
00381 #Two-integer timestamp that is expressed as:
00382 # * stamp.secs: seconds (stamp_secs) since epoch
00383 # * stamp.nsecs: nanoseconds since stamp_secs
00384 # time-handling sugar is provided by the client library
00385 time stamp
00386 #Frame this data is associated with
00387 # 0: no frame
00388 # 1: global frame
00389 string frame_id
00390
00391 ================================================================================
00392 MSG: probot_msgs/ControllerStatus
00393 # This message defines the expected format for Controller Statuss messages
00394 # Embed this in the feedback state message of highlevel controllers
00395 byte UNDEFINED=0
00396 byte SUCCESS=1
00397 byte ABORTED=2
00398 byte PREEMPTED=3
00399 byte ACTIVE=4
00400
00401 # Status of the controller = {UNDEFINED, SUCCESS, ABORTED, PREEMPTED, ACTIVE}
00402 byte value
00403
00404 #Comment for debug
00405 string comment
00406 ================================================================================
00407 MSG: probot_msgs/JointState
00408 string name
00409 float64 position
00410 float64 velocity
00411 float64 applied_effort
00412 float64 commanded_effort
00413 byte is_calibrated
00414
00415 """)
00416 self.assertEquals(set(['gd_msgs/MoveArmState', 'probot_msgs/JointState', 'probot_msgs/ControllerStatus', 'std_msgs/Header']),
00417 set(msgs.keys()))
00418 import roslib.rostime
00419 import time
00420 m_instance1 = msgs['std_msgs/Header']()
00421 m_instance2 = msgs['std_msgs/Header'](stamp=roslib.rostime.Time.from_seconds(time.time()), frame_id='foo-%s'%time.time(), seq=12391)
00422 self._test_ser_deser(m_instance2, m_instance1)
00423
00424 m_instance1 = msgs['probot_msgs/ControllerStatus']()
00425 m_instance2 = msgs['probot_msgs/ControllerStatus'](value=4, comment=str(time.time()))
00426 d = {'UNDEFINED':0,'SUCCESS':1,'ABORTED':2,'PREEMPTED':3,'ACTIVE':4}
00427 for k, v in d.iteritems():
00428 self.assertEquals(v, getattr(m_instance1, k))
00429 self._test_ser_deser(m_instance2, m_instance1)
00430
00431 m_instance1 = msgs['probot_msgs/JointState']()
00432 m_instance2 = msgs['probot_msgs/JointState'](position=time.time(), velocity=time.time(), applied_effort=time.time(), commanded_effort=time.time(), is_calibrated=2)
00433 self._test_ser_deser(m_instance2, m_instance1)
00434
00435 m_instance1 = msgs['gd_msgs/MoveArmState']()
00436 js = msgs['probot_msgs/JointState']
00437 config = []
00438 goal = []
00439
00440 for i in range(0, 10):
00441 config.append(js(position=time.time(), velocity=time.time(), applied_effort=time.time(), commanded_effort=time.time(), is_calibrated=2))
00442 goal.append(js(position=time.time(), velocity=time.time(), applied_effort=time.time(), commanded_effort=time.time(), is_calibrated=2))
00443 m_instance2 = msgs['gd_msgs/MoveArmState'](header=msgs['std_msgs/Header'](),
00444 status=msgs['probot_msgs/ControllerStatus'](),
00445 configuration=config, goal=goal)
00446 self._test_ser_deser(m_instance2, m_instance1)
00447
00448 def _test_ser_deser(self, m_instance1, m_instance2):
00449 buff = cStringIO.StringIO()
00450 m_instance1.serialize(buff)
00451 m_instance2.deserialize(buff.getvalue())
00452 self.assertEquals(m_instance1, m_instance2)
00453
00454 def test_len_serializer_generator(self):
00455
00456 from roslib.genpy import len_serializer_generator
00457
00458
00459 g = len_serializer_generator('foo', True, True)
00460 self.assertEquals('length = len(foo)', '\n'.join(g))
00461
00462 g = len_serializer_generator('foo', False, True)
00463 self.assertEquals("length = len(foo)\nbuff.write(_struct_I.pack(length))", '\n'.join(g))
00464
00465
00466 val = """start = end
00467 end += 4
00468 (length,) = _struct_I.unpack(str[start:end])"""
00469
00470 g = len_serializer_generator('foo', True, False)
00471 self.assertEquals(val, '\n'.join(g))
00472 g = len_serializer_generator('foo', False, False)
00473 self.assertEquals(val, '\n'.join(g))
00474
00475 def test_string_serializer_generator(self):
00476
00477 from roslib.genpy import string_serializer_generator
00478
00479 g = string_serializer_generator('foo', 'string', 'var_name', True)
00480 self.assertEquals("""length = len(var_name)
00481 buff.write(struct.pack('<I%ss'%length, length, var_name))""", '\n'.join(g))
00482
00483 for t in ['uint8[]', 'byte[]', 'uint8[10]', 'byte[20]']:
00484 g = string_serializer_generator('foo', 'uint8[]', 'b_name', True)
00485 self.assertEquals("""length = len(b_name)
00486 # - if encoded as a list instead, serialize as bytes instead of string
00487 if type(b_name) in [list, tuple]:
00488 buff.write(struct.pack('<I%sB'%length, length, *b_name))
00489 else:
00490 buff.write(struct.pack('<I%ss'%length, length, b_name))""", '\n'.join(g))
00491
00492
00493 val = """start = end
00494 end += 4
00495 (length,) = _struct_I.unpack(str[start:end])
00496 start = end
00497 end += length
00498 var_name = str[start:end]"""
00499
00500 g = string_serializer_generator('foo', 'string', 'var_name', False)
00501 self.assertEquals(val, '\n'.join(g))
00502
00503 if __name__ == '__main__':
00504 rosunit.unitrun(PKG, NAME, TestGenpy, sys.argv, coverage_packages=['roslib.genpy'])