00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 PKG = 'test_roslib_comm'
00034 import roslib; roslib.load_manifest(PKG)
00035
00036 import os
00037 import sys
00038 import time
00039 import unittest
00040 import traceback
00041
00042 import roslib.message
00043 from roslib.rostime import Time, Duration
00044 import rosunit
00045
00046
00047
00048 class MessageTest(unittest.TestCase):
00049
00050 def test_check_types_Header(self):
00051
00052
00053
00054 from test_roslib_comm.msg import HeaderTest
00055 x = HeaderTest()
00056 x._check_types()
00057
00058 def test_Message_check_types(self):
00059
00060
00061 from std_msgs.msg import String, UInt16MultiArray, MultiArrayLayout, MultiArrayDimension
00062 from roslib.message import SerializationError
00063
00064 correct = [String(), String('foo'), String(''), String(data='data'),
00065 UInt16MultiArray(),
00066 UInt16MultiArray(MultiArrayLayout(), []),
00067 UInt16MultiArray(MultiArrayLayout(data_offset=1), [1, 2, 3]),
00068 UInt16MultiArray(layout=MultiArrayLayout(data_offset=1)),
00069 UInt16MultiArray(layout=MultiArrayLayout(dim=[])),
00070 UInt16MultiArray(layout=MultiArrayLayout(dim=[MultiArrayDimension()])),
00071 UInt16MultiArray(data=[1, 2, 3]),
00072 ]
00073 for t in correct:
00074 t._check_types()
00075 for t in correct:
00076 try:
00077 t._check_types(exc=Exception())
00078 self.fail("should have raised wrapped exc")
00079 except SerializationError:
00080 pass
00081
00082 wrong = [String(1), String(data=1),
00083 UInt16MultiArray(1, []),
00084 UInt16MultiArray(MultiArrayLayout(), 1),
00085 UInt16MultiArray(String(), []),
00086 UInt16MultiArray(layout=MultiArrayLayout(dim=[1])),
00087 UInt16MultiArray(layout=MultiArrayLayout(data_offset='')),
00088 ]
00089 for t in wrong:
00090 try:
00091 t._check_types()
00092 self.fail("should have raised")
00093 except SerializationError:
00094 pass
00095
00096 def test_check_types_valid(self):
00097 '''Test directly a bunch of valid combinations to check_types.
00098
00099 check_type will throw an exception when it fails
00100 '''
00101 roslib.message.check_type('test', 'uint8[]', 'byteDataIsAStringInPy')
00102 roslib.message.check_type('test', 'char[]', 'byteDataIsAStringInPy')
00103 roslib.message.check_type('test', 'uint8[]', [3,4,5])
00104 roslib.message.check_type('test', 'uint8[]', (3,4,5))
00105 roslib.message.check_type('test', 'char[]', [3,4,5])
00106 roslib.message.check_type('test', 'int32[]', [3,4,5])
00107 roslib.message.check_type('test', 'int32', -5)
00108 roslib.message.check_type('test', 'int64', -5)
00109 roslib.message.check_type('test', 'int16', -5)
00110 roslib.message.check_type('test', 'int8', -5)
00111 roslib.message.check_type('test', 'uint32', 5)
00112 roslib.message.check_type('test', 'uint64', 5)
00113 roslib.message.check_type('test', 'uint16', 5)
00114 roslib.message.check_type('test', 'uint8', 5)
00115 roslib.message.check_type('test', 'bool', True)
00116 roslib.message.check_type('test', 'bool', False)
00117 roslib.message.check_type('test', 'bool', 0)
00118 roslib.message.check_type('test', 'bool', 1)
00119 roslib.message.check_type('test', 'string', 'IAmAString')
00120 roslib.message.check_type('test', 'time', Time())
00121 roslib.message.check_type('test', 'duration', Duration(5))
00122
00123 def test_check_types_invalid(self):
00124 from roslib.message import SerializationError
00125 self.assertRaises(SerializationError, roslib.message.check_type,
00126 'test', 'int32[]', 'someString')
00127 self.assertRaises(SerializationError, roslib.message.check_type,
00128 'test', 'uint32[]', [3, -2, 4])
00129 self.assertRaises(SerializationError, roslib.message.check_type,
00130 'test', 'uint8', -2)
00131 self.assertRaises(SerializationError, roslib.message.check_type,
00132 'test', 'uint16', -2)
00133 self.assertRaises(SerializationError, roslib.message.check_type,
00134 'test', 'uint32', -2)
00135 self.assertRaises(SerializationError, roslib.message.check_type,
00136 'test', 'uint64', -2)
00137 self.assertRaises(SerializationError, roslib.message.check_type,
00138 'test', 'bool', -2)
00139 self.assertRaises(SerializationError, roslib.message.check_type,
00140 'test', 'bool', 2)
00141 self.assertRaises(SerializationError, roslib.message.check_type,
00142 'test', 'string', u'UnicodeString')
00143
00144 def test_Message(self):
00145 import cStringIO
00146 from roslib.message import Message, SerializationError
00147 self.assert_(isinstance(Message(), Message))
00148 m = Message()
00149 b = cStringIO.StringIO()
00150 m.serialize(b)
00151 m.deserialize('')
00152
00153
00154 try:
00155 Message(1, 2, 3, one=1, two=2, three=3)
00156 self.fail("Message should not allow *args and **kwds")
00157 except TypeError: pass
00158 try:
00159 Message()._get_types()
00160 self.fail("_get_types() should not be callable on abstract Message instance")
00161 except: pass
00162
00163
00164 class M1(Message):
00165 __slots__ = []
00166 _slot_types=[]
00167 def __init__(self, *args, **kwds):
00168 super(M1, self).__init__(*args, **kwds)
00169 def _get_types(self): return []
00170
00171
00172 self.assertEquals('', str(M1()))
00173
00174 M1()._check_types()
00175
00176 try:
00177 M1()._check_types(Exception("test"))
00178 self.fail("_check_types must fail if explicitly provided an exception")
00179 except SerializationError: pass
00180
00181
00182 class M2(Message):
00183 __slots__ = ['a', 'b']
00184 _slot_types=['int32', 'int32']
00185 def _get_types(self): return ['int32', 'int32']
00186 def __init__(self, *args, **kwds):
00187 super(M2, self).__init__(*args, **kwds)
00188 self.assertEquals('a: 1\nb: 2', str(M2(1, 2)))
00189
00190 M2(1, 2)._check_types()
00191 M2(a=1, b=2)._check_types()
00192 invalid = [M2(a=1), M2('1', '2'), M2(1, '2'), M2(1., 2.), M2(None, 2)]
00193 for m in invalid:
00194 try:
00195 m._check_types()
00196 self.fail("check_types for %s should have failed"%m)
00197 except SerializationError: pass
00198
00199
00200 valid = [
00201 ((), {}, M1),
00202 ((), {}, M2),
00203 ((1, 2), {}, M2),
00204 ((), {'a': 1, 'b': 2}, M2),
00205 ((), {'a': 1}, M2),((), {'b': 2}, M2),
00206 ]
00207 invalid = [
00208 ((1,), {}, M1),
00209 ((), {'one': 1}, M1),
00210 ((1), {}, M2),((1, 2, 3), {}, M2),
00211 ((), {'c': 1}, M2),((), {'a': 1, 'b': 2, 'c': 1}, M2),
00212 ]
00213 for args, kwds, cls in valid:
00214 cls(*args, **kwds)
00215 val = time.time()
00216 val2 = time.time()
00217 self.assertEquals(val, M2(val, 2).a)
00218 self.assertEquals(val, M2(1, val).b)
00219 self.assertEquals(val, M2(a=val).a)
00220 self.assertEquals(None, M2(a=val).b)
00221 self.assertEquals(None, M2(b=val).a)
00222 self.assertEquals(val, M2(b=val).b)
00223 self.assertEquals(val, M2(a=val, b=val2).a)
00224 self.assertEquals(val2, M2(a=val, b=val2).b)
00225 for args, kwds, cls in invalid:
00226 try:
00227 cls(*args, **kwds)
00228 self.fail("Message should have failed for cls[%s] *args[%s] and **kwds[%s]"%(cls, args, kwds))
00229 except: pass
00230
00231 def test_strify_message(self):
00232
00233 from roslib.message import Message, strify_message
00234 class M1(Message):
00235 __slots__ = []
00236 _slot_types = []
00237 def __init__(self): pass
00238 self.assertEquals('', strify_message(M1()))
00239 class M2(Message):
00240 __slots__ = ['str', 'int', 'float', 'bool', 'list']
00241 _slot_types = ['string', 'int32', 'float32', 'bool', 'int32[]']
00242 def __init__(self, str_, int_, float_, bool_, list_):
00243 self.str = str_
00244 self.int = int_
00245 self.float = float_
00246 self.bool = bool_
00247 self.list = list_
00248
00249 self.assertEquals("""str: string
00250 int: 123456789101112
00251 float: 5678.0
00252 bool: True
00253 list: [1, 2, 3]""", strify_message(M2('string', 123456789101112, 5678., True, [1,2,3])))
00254
00255 self.assertEquals("""str: ''
00256 int: -1
00257 float: 0.0
00258 bool: False
00259 list: []""", strify_message(M2('', -1, 0., False, [])))
00260
00261 class M3(Message):
00262 __slots__ = ['m2']
00263 _slot_types=['M1']
00264 def __init__(self, m2):
00265 self.m2 = m2
00266 self.assertEquals("""m2:
00267 str: string
00268 int: -1
00269 float: 0.0
00270 bool: False
00271 list: []""", strify_message(M3(M2('string', -1, 0., False, []))))
00272
00273
00274 class M4(Message):
00275 __slots__ = ['m2s']
00276 _slot_types=['M2[]']
00277 def __init__(self, m2s):
00278 self.m2s = m2s
00279
00280 self.assertEquals("""m2s:
00281 -
00282 str: string
00283 int: 1234
00284 float: 5678.0
00285 bool: True
00286 list: [1, 2, 3]
00287 -
00288 str: string
00289 int: -1
00290 float: 0.0
00291 bool: False
00292 list: []""", strify_message(M4([
00293 M2('string', 1234, 5678., True, [1,2,3]),
00294 M2('string', -1, 0., False, []),
00295 ])))
00296
00297 from roslib.rostime import Time, Duration
00298 class M5(Message):
00299 __slots__ = ['t', 'd']
00300 _slot_types=['time', 'duration']
00301 def __init__(self, t, d):
00302 self.t = t
00303 self.d = d
00304 self.assertEquals("""t:
00305 secs: 987
00306 nsecs: 654
00307 d:
00308 secs: 123
00309 nsecs: 456""", strify_message(M5(Time(987, 654), Duration(123, 456))))
00310
00311
00312 self.assertEquals("set([1])", strify_message(set([1])))
00313
00314 def test_strify_yaml(self):
00315 import yaml
00316 def roundtrip(m):
00317 yaml_text = strify_message(m)
00318 print yaml_text
00319 loaded = yaml.load(yaml_text)
00320 print "loaded", loaded
00321 new_inst = m.__class__()
00322 if loaded is not None:
00323 fill_message_args(new_inst, [loaded])
00324 else:
00325 fill_message_args(new_inst, [])
00326 return new_inst
00327
00328
00329
00330 from roslib.message import Message, strify_message, fill_message_args
00331 class M1(Message):
00332 __slots__ = []
00333 _slot_types=[]
00334 def __init__(self): pass
00335 self.assertEquals(M1(), roundtrip(M1()))
00336
00337 class M2(Message):
00338 __slots__ = ['str', 'int', 'float', 'bool', 'list']
00339 _slot_types = ['string', 'int32', 'float32', 'bool', 'int32[]']
00340 def __init__(self, str_=None, int_=None, float_=None, bool_=None, list_=None):
00341 self.str = str_
00342 self.int = int_
00343 self.float = float_
00344 self.bool = bool_
00345 self.list = list_
00346
00347 val = M2('string', 123456789101112, 5678., True, [1,2,3])
00348 self.assertEquals(val, roundtrip(val))
00349
00350 val = M2('', -1, 0., False, [])
00351 self.assertEquals(val, roundtrip(val))
00352
00353 class M3(Message):
00354 __slots__ = ['m2']
00355 _slot_types=['test_roslib/M2']
00356 def __init__(self, m2=None):
00357 self.m2 = m2 or M2()
00358
00359 val = M3(M2('string', -1, 0., False, []))
00360 self.assertEquals(val, roundtrip(val))
00361
00362
00363 from test_roslib_comm.msg import ArrayOfMsgs
00364 from std_msgs.msg import String, Time, MultiArrayLayout, MultiArrayDimension
00365 dims1 = [MultiArrayDimension(*args) for args in [('', 0, 0), ('x', 1, 2), ('y of z', 3, 4)]]
00366 dims2 = [MultiArrayDimension('hello world', 91280, 1983274)]
00367 times = [Time(roslib.rostime.Time(*args)) for args in [(0,), (12345, 6789), (1, 1)]]
00368 val = ArrayOfMsgs([String(''), String('foo'), String('bar of soap')],
00369 times,
00370 [MultiArrayLayout(dims1, 0), MultiArrayLayout(dims2, 12354)],
00371 )
00372 self.assertEquals(val, roundtrip(val))
00373
00374
00375 def test_ServiceDefinition(self):
00376 from roslib.message import ServiceDefinition
00377 self.assert_(isinstance(ServiceDefinition(), ServiceDefinition))
00378
00379 def test_check_type(self):
00380
00381
00382
00383 from roslib.message import check_type, SerializationError
00384 from roslib.rostime import Time, Duration
00385 valids = [
00386 ('byte', 1), ('byte', -1),
00387 ('string', ''), ('string', 'a string of text'),
00388 ('int32[]', []),
00389 ('int32[]', [1, 2, 3, 4]),
00390 ('time', Time()), ('time', Time.from_seconds(1.0)),
00391 ('time', Time(10000)), ('time', Time(1000, -100)),
00392 ('duration', Duration()),('duration', Duration()),
00393 ('duration', Duration(100)), ('duration', Duration(-100, -100)),
00394 ]
00395
00396 for t, v in valids:
00397 try:
00398 check_type('n', t, v)
00399 except Exception, e:
00400 traceback.print_exc()
00401 raise Exception("failure type[%s] value[%s]: %s"%(t, v, str(e)))
00402
00403 invalids = [
00404 ('byte', 129), ('byte', -129), ('byte', 'byte'), ('byte', 1.0),
00405 ('string', 1),
00406 ('uint32', -1),
00407 ('int8', 112312), ('int8', -112312),
00408 ('uint8', -1), ('uint8', 112312),
00409 ('int32', '1'), ('int32', 1.),
00410 ('int32[]', 1), ('int32[]', [1., 2.]), ('int32[]', [1, 2.]),
00411 ('duration', 1), ('time', 1),
00412 ]
00413 for t, v in invalids:
00414 try:
00415 check_type('n', t, v)
00416 self.fail("check_type[%s, %s] should have failed"%(t, v))
00417 except SerializationError: pass
00418
00419 def test_get_message_class(self):
00420 from roslib.message import get_message_class
00421
00422 try:
00423 self.assertEquals(None, get_message_class('String'))
00424 self.fail("should have thrown ValueError")
00425 except ValueError: pass
00426
00427 self.assertEquals(None, get_message_class('fake/Fake'))
00428
00429 self.assertEquals(None, get_message_class('roslib/Fake'))
00430
00431 self.assertEquals(None, get_message_class('genmsg_cpp/Fake'))
00432
00433 import rosgraph_msgs.msg
00434 import std_msgs.msg
00435 self.assertEquals(std_msgs.msg.Header, get_message_class('Header'))
00436 self.assertEquals(std_msgs.msg.Header, get_message_class('std_msgs/Header'))
00437 self.assertEquals(rosgraph_msgs.msg.Log, get_message_class('rosgraph_msgs/Log'))
00438
00439 def test_fill_message_args_embed_time(self):
00440 from roslib.rostime import Time, Duration
00441 from roslib.message import fill_message_args
00442 from test_roslib_comm.msg import FillEmbedTime
00443
00444
00445
00446
00447
00448
00449
00450
00451 tests = [
00452
00453 ]
00454 m = FillEmbedTime()
00455 fill_message_args(m, [{}])
00456 self.assertEquals(m.t, Time())
00457 self.assertEquals(m.d, Duration())
00458 self.assertEquals(m.str_msg.data, '')
00459 self.assertEquals(m.str_msg_array, [])
00460 self.assertEquals(m.i32, 0)
00461
00462
00463
00464 equiv = [
00465 [[10, 20], [30, 40], ['foo'], [['bar'], ['baz']], 32],
00466 [{'secs': 10, 'nsecs': 20}, {'secs': 30, 'nsecs': 40}, ['foo'], [['bar'], ['baz']], 32],
00467 [[10, 20], [30, 40], {'data': 'foo'}, [['bar'], ['baz']], 32],
00468 [[10, 20], [30, 40], ['foo'], [{'data': 'bar'}, {'data': 'baz'}], 32],
00469
00470 [{'t': [10, 20], 'd': [30, 40], 'str_msg': {'data': 'foo'}, 'str_msg_array': [{'data': 'bar'}, {'data': 'baz'}], 'i32': 32}],
00471 [{'t': {'secs': 10, 'nsecs': 20}, 'd': [30, 40], 'str_msg': {'data': 'foo'}, 'str_msg_array': [{'data': 'bar'}, {'data': 'baz'}], 'i32': 32}],
00472 ]
00473 for test in equiv:
00474 m = FillEmbedTime()
00475 try:
00476 fill_message_args(m, test)
00477 except Exception, e:
00478 self.fail("failed to fill with : %s\n%s"%(str(test), traceback.format_exc()))
00479
00480 self.assertEquals(m.t, Time(10, 20))
00481 self.assertEquals(m.d, Duration(30, 40))
00482 self.assertEquals(m.str_msg.data, 'foo')
00483 self.assertEquals(len(m.str_msg_array), 2, m.str_msg_array)
00484 self.assertEquals(m.str_msg_array[0].data, 'bar')
00485 self.assertEquals(m.str_msg_array[1].data, 'baz')
00486 self.assertEquals(m.i32, 32)
00487
00488
00489
00490
00491 m = FillEmbedTime()
00492 fill_message_args(m, [10000000020, 30000000040, ['foo'], [['bar'], ['baz']], 32])
00493 self.assertEquals(10, m.t.secs)
00494 self.assert_(abs(20 - m.t.nsecs) < 2)
00495 self.assertEquals(30, m.d.secs)
00496 self.assert_(abs(40 - m.d.nsecs) < 2)
00497 self.assertEquals(len(m.str_msg_array), 2, m.str_msg_array)
00498 self.assertEquals(m.str_msg_array[0].data, 'bar')
00499 self.assertEquals(m.str_msg_array[1].data, 'baz')
00500 self.assertEquals(m.i32, 32)
00501
00502 bad = [
00503
00504 [[10, 20], [30, 40], ['foo'], [['bar'], ['baz']]],
00505 [[10], [30, 40], ['foo'], [['bar'], ['baz']], 32],
00506 [[10, 20], [30], ['foo'], [['bar'], ['baz']], 32],
00507 [[10, 20], [30, 40], [], [['bar'], ['baz']], 32],
00508 [[10, 20], [30, 40], ['foo'], [['bar'], []], 32],
00509
00510
00511 [[10, 20], [30, 40], ['foo'], [['bar'], ['baz']], 32, 64],
00512 [[10, 20, 30], [30, 40], ['foo'], [['bar'], ['baz']], 32],
00513 [[10, 20], [30, 40, 50], ['foo'], [['bar'], ['baz']], 32],
00514 [[10, 20], [30, 40], ['foo', 'bar'], [['bar'], ['baz']], 32],
00515 [[10, 20], [30, 40], ['foo'], [['bar', 'baz'], ['baz']], 32],
00516 [[10, 20], [30, 40], ['foo'], [['bar'], ['baz', 'car']], 32],
00517
00518
00519 [{'secs': 10, 'nsecs': 20, 'foo': 1}, {'secs': 30, 'nsecs': 40}, ['foo'], [['bar'], ['baz']], 32],
00520 [{'secs': 10, 'nsecs': 20}, {'secs': 30, 'nsecs': 40, 'foo': 1}, ['foo'], [['bar'], ['baz']], 32],
00521 [[10, 20], [30, 40], {'data': 'foo', 'fata': 1}, [['bar'], ['baz']], 32],
00522 [[10, 20], [30, 40], ['foo'], [{'data': 'bar'}, {'beta': 'baz'}], 32],
00523 [{'t': [10, 20], 'd': [30, 40], 'str_msg': {'data': 'foo'}, 'str_msg_array': [{'data': 'bar'}, {'data': 'baz'}], 'i32': 32, 'i64': 64}],
00524 ]
00525 for b in bad:
00526 failed = True
00527 try:
00528 m = FillEmbedTime()
00529 fill_message_args(m, b)
00530 except roslib.message.ROSMessageException:
00531 failed = False
00532 self.failIf(failed, "fill_message_args should have failed: %s"%str(b))
00533
00534
00535
00536 def test_fill_message_args_simple(self):
00537 from roslib.message import fill_message_args
00538 from test_roslib_comm.msg import FillSimple
00539
00540
00541
00542
00543
00544 for v in [[], {}]:
00545 try:
00546 fill_message_args(object(), v)
00547 self.fail("should have raised ValueError")
00548 except ValueError: pass
00549 try:
00550 m = FillSimple()
00551
00552 roslib.message._fill_message_args(m, 1, {}, '')
00553 self.fail("should have raised ValueError for bad msg_args")
00554 except ValueError: pass
00555
00556 simple_tests = [
00557 [1, 'foo', [], True],
00558 [1, 'foo', [1, 2, 3, 4], False],
00559 ]
00560 for test in simple_tests:
00561 m = FillSimple()
00562 fill_message_args(m, test)
00563 self.assertEquals(m.i32, test[0])
00564 self.assertEquals(m.str, test[1])
00565 self.assertEquals(m.i32_array, test[2])
00566 self.assertEquals(m.b, test[3])
00567
00568
00569 m = FillSimple()
00570 fill_message_args(m, [{}])
00571 self.assertEquals(m.i32, 0)
00572 self.assertEquals(m.str, '')
00573 self.assertEquals(m.i32_array, [])
00574 self.assertEquals(m.b, False)
00575
00576 m = FillSimple()
00577 fill_message_args(m, [{'i32': 10}])
00578 self.assertEquals(m.i32, 10)
00579 self.assertEquals(m.str, '')
00580 self.assertEquals(m.i32_array, [])
00581 self.assertEquals(m.b, False)
00582
00583 m = FillSimple()
00584 fill_message_args(m, [{'str': 'hello', 'i32_array': [1, 2, 3]}])
00585 self.assertEquals(m.i32, 0)
00586 self.assertEquals(m.str, 'hello')
00587 self.assertEquals(m.i32_array, [1, 2, 3])
00588 self.assertEquals(m.b, False)
00589
00590
00591 bad = [
00592
00593 [{'bad': 1, 'str': 'hello', 'i32_array': [1, 2, 3]}],
00594
00595 [1, 'foo', [1, 2, 3]],
00596
00597 [1, 'foo', [1, 2, 3], True, 1],
00598
00599 [1, 'foo', 1, True],
00600 ]
00601 for b in bad:
00602 failed = True
00603 try:
00604 m = FillSimple()
00605 fill_message_args(m, b)
00606 except roslib.message.ROSMessageException:
00607 failed = False
00608 self.failIf(failed, "fill_message_args should have failed: %s"%str(b))
00609
00610
00611 def test_get_service_class(self):
00612 from roslib.message import get_service_class
00613
00614
00615 self.assertEquals(None, get_service_class('fake/Fake'))
00616
00617 self.assertEquals(None, get_service_class('roslib/Fake'))
00618
00619 self.assertEquals(None, get_service_class('genmsg_cpp/Fake'))
00620
00621 import std_srvs.srv
00622 self.assertEquals(std_srvs.srv.Empty, get_service_class('std_srvs/Empty'))
00623
00624 if __name__ == '__main__':
00625 rosunit.unitrun(PKG, 'test_message', MessageTest, coverage_packages=['roslib.message'])
00626