00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import roslib; roslib.load_manifest('test_rospy')
00037
00038 import os
00039 import sys
00040 import struct
00041 import unittest
00042 import time
00043
00044 import test_rospy.msg
00045
00046 def callback1(data): pass
00047 def callback2(data): pass
00048
00049
00050 import rospy.impl.transport
00051 class ConnectionOverride(rospy.impl.transport.Transport):
00052 def __init__(self, endpoint_id):
00053 super(ConnectionOverride, self).__init__(rospy.impl.transport.OUTBOUND, endpoint_id)
00054 self.endpoint_id = endpoint_id
00055 self.data = ''
00056
00057 def set_cleanup_callback(self, cb): pass
00058 def write_data(self, data):
00059 self.data = self.data + data
00060
00061
00062 class TestRospyTopics(unittest.TestCase):
00063
00064 def test_Publisher(self):
00065 import rospy
00066 from rospy.impl.registration import get_topic_manager, Registration
00067 from rospy.topics import Publisher, DEFAULT_BUFF_SIZE
00068
00069
00070 name = 'foo'
00071 rname = rospy.resolve_name('foo')
00072 data_class = test_rospy.msg.Val
00073
00074
00075 for n in [None, '', 1]:
00076 try:
00077 Publisher(n, data_class)
00078 self.fail("should not allow invalid name")
00079 except ValueError: pass
00080 for d in [None, 1, TestRospyTopics]:
00081 try:
00082 Publisher(name, d)
00083 self.fail("should now allow invalid data_class")
00084 except ValueError: pass
00085 try:
00086 Publisher(name, None)
00087 self.fail("None should not be allowed for data_class")
00088 except ValueError: pass
00089
00090
00091 pub = Publisher(name, data_class)
00092 self.assertEquals(rname, pub.resolved_name)
00093
00094 self.assertEquals(rname, pub.name)
00095 self.assertEquals(data_class, pub.data_class)
00096 self.assertEquals('test_rospy/Val', pub.type)
00097 self.assertEquals(data_class._md5sum, pub.md5sum)
00098 self.assertEquals(Registration.PUB, pub.reg_type)
00099
00100
00101 impl = get_topic_manager().get_impl(Registration.PUB, rname)
00102 self.assert_(impl == pub.impl)
00103 self.assertEquals(rname, impl.resolved_name)
00104 self.assertEquals(data_class, impl.data_class)
00105 self.failIf(impl.is_latch)
00106 self.assertEquals(None, impl.latch)
00107 self.assertEquals(0, impl.seq)
00108 self.assertEquals(1, impl.ref_count)
00109 self.assertEquals('', impl.buff.getvalue())
00110 self.failIf(impl.closed)
00111 self.failIf(impl.has_connections())
00112
00113 from test_rospy.msg import Val
00114 impl.publish(Val('hello world-1'))
00115
00116
00117 self.assertEquals(0, impl.message_data_sent)
00118
00119 impl.acquire()
00120 impl.release()
00121
00122
00123
00124
00125 co1 = ConnectionOverride('co1')
00126 self.failIf(impl.has_connection('co1'))
00127 impl.add_connection(co1)
00128 self.assert_(impl.has_connection('co1'))
00129 self.assert_(impl.has_connections())
00130 impl.publish(Val('hello world-1'), connection_override=co1)
00131
00132 import cStringIO
00133 buff = cStringIO.StringIO()
00134 Val('hello world-1').serialize(buff)
00135
00136 self.assertEquals(co1.data[4:], buff.getvalue())
00137 self.assertEquals(None, impl.latch)
00138
00139
00140 pub = Publisher(name, data_class, latch=True)
00141 impl = get_topic_manager().get_impl(Registration.PUB, rname)
00142
00143 self.assert_(impl == pub.impl)
00144 self.assertEquals(True, impl.is_latch)
00145 self.assertEquals(None, impl.latch)
00146 self.assertEquals(2, impl.ref_count)
00147
00148 co2 = ConnectionOverride('co2')
00149 self.failIf(impl.has_connection('co2'))
00150 impl.add_connection(co2)
00151 for n in ['co1', 'co2']:
00152 self.assert_(impl.has_connection(n))
00153 self.assert_(impl.has_connections())
00154 v = Val('hello world-2')
00155 impl.publish(v, connection_override=co2)
00156 self.assert_(v == impl.latch)
00157
00158 buff = cStringIO.StringIO()
00159 Val('hello world-2').serialize(buff)
00160
00161 self.assertEquals(co2.data[4:], buff.getvalue())
00162
00163
00164 co3 = ConnectionOverride('co3')
00165 self.failIf(impl.has_connection('co3'))
00166 impl.add_connection(co3)
00167 for n in ['co1', 'co2', 'co3']:
00168 self.assert_(impl.has_connection(n))
00169 self.assert_(impl.has_connections())
00170 self.assertEquals(co3.data[4:], buff.getvalue())
00171
00172
00173
00174 self.assert_(impl.has_connection('co1'))
00175 impl.remove_connection(co1)
00176 self.failIf(impl.has_connection('co1'))
00177 self.assert_(impl.has_connections())
00178
00179 self.assert_([d for d in impl.dead_connections if d.endpoint_id == 'co1'])
00180
00181 self.assert_(impl.has_connection('co3'))
00182 impl.remove_connection(co3)
00183 self.failIf(impl.has_connection('co3'))
00184 self.assert_(impl.has_connections())
00185 for id in ['co1', 'co3']:
00186 self.assert_([d for d in impl.dead_connections if d.endpoint_id == id])
00187
00188 self.assert_(impl.has_connection('co2'))
00189 impl.remove_connection(co2)
00190 self.failIf(impl.has_connection('co2'))
00191 self.failIf(impl.has_connections())
00192 for id in ['co1', 'co2', 'co3']:
00193 self.assert_([d for d in impl.dead_connections if d.endpoint_id == id])
00194
00195
00196
00197 pub = Publisher('bar', data_class, latch=True)
00198 v = Val('no connection test')
00199 pub.impl.publish(v)
00200 self.assert_(v == pub.impl.latch)
00201
00202
00203 h = {'foo': 'bar', 'fuga': 'hoge'}
00204 pub = Publisher('header_test', data_class, headers=h)
00205 self.assertEquals(h, pub.impl.headers)
00206
00207 def test_Subscriber_unregister(self):
00208
00209
00210 import rospy
00211 from rospy.impl.registration import get_topic_manager, Registration
00212 from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00213
00214
00215
00216
00217 name = 'unregistertest'
00218 rname = rospy.resolve_name(name)
00219 data_class = test_rospy.msg.Val
00220
00221 sub = Subscriber(name, data_class)
00222 self.assertEquals(None, sub.callback)
00223
00224
00225
00226 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00227 self.assert_(impl == sub.impl)
00228 self.assertEquals(1, impl.ref_count)
00229 self.assertEquals([], impl.callbacks)
00230
00231
00232 sub.unregister()
00233 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00234
00235
00236 sub2 = Subscriber(name, data_class)
00237 sub3 = Subscriber(name, data_class)
00238
00239 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00240
00241 self.assert_(impl == sub2.impl)
00242 self.assert_(impl == sub3.impl)
00243
00244 self.assertEquals([], impl.callbacks)
00245 self.assertEquals(2, impl.ref_count)
00246 sub2.unregister()
00247 self.assertEquals(1, impl.ref_count)
00248
00249 sub2.unregister()
00250 self.assertEquals(1, impl.ref_count)
00251
00252 sub3.unregister()
00253 self.assertEquals(0, impl.ref_count)
00254 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00255
00256
00257 cb_args5 = 5
00258 cb_args6 = 6
00259 cb_args7 = 7
00260 sub4 = Subscriber(name, data_class, callback1)
00261
00262
00263 sub5 = Subscriber(name, data_class, callback2, cb_args5)
00264 sub6 = Subscriber(name, data_class, callback2, cb_args6)
00265 sub7 = Subscriber(name, data_class, callback2, cb_args7)
00266 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00267 self.assertEquals(4, impl.ref_count)
00268
00269 self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args6), (callback2, cb_args7)], impl.callbacks)
00270
00271 sub6.unregister()
00272 self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args7)], impl.callbacks)
00273 self.assertEquals(3, impl.ref_count)
00274 sub5.unregister()
00275 self.assertEquals([(callback1, None), (callback2, cb_args7)], impl.callbacks)
00276 self.assertEquals(2, impl.ref_count)
00277 sub4.unregister()
00278 self.assertEquals([(callback2, cb_args7)], impl.callbacks)
00279 self.assertEquals(1, impl.ref_count)
00280 sub7.unregister()
00281 self.assertEquals([], impl.callbacks)
00282 self.assertEquals(0, impl.ref_count)
00283 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00284
00285
00286 sub8 = Subscriber(name, data_class, callback1, 'hello')
00287 sub9 = Subscriber(name, data_class, callback1, 'hello')
00288 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00289 self.assertEquals([(callback1, 'hello'), (callback1, 'hello')], impl.callbacks)
00290 self.assertEquals(2, impl.ref_count)
00291 sub8.unregister()
00292 self.assertEquals([(callback1, 'hello')], impl.callbacks)
00293 self.assertEquals(1, impl.ref_count)
00294 sub9.unregister()
00295 self.assertEquals([], impl.callbacks)
00296 self.assertEquals(0, impl.ref_count)
00297
00298 def test_Subscriber(self):
00299
00300
00301
00302 import rospy
00303 from rospy.impl.registration import get_topic_manager, Registration
00304 from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00305
00306
00307
00308
00309 name = 'foo'
00310 rname = rospy.resolve_name('foo')
00311 data_class = test_rospy.msg.Val
00312
00313
00314 for n in [None, '', 1]:
00315 try:
00316 Subscriber(n, data_class)
00317 self.fail("should not allow invalid name")
00318 except ValueError: pass
00319 for d in [None, 1, TestRospyTopics]:
00320 try:
00321 Subscriber(name, d)
00322 self.fail("should now allow invalid data_class")
00323 except ValueError: pass
00324 try:
00325 Subscriber(name, None)
00326 self.fail("None should not be allowed for data_class")
00327 except ValueError: pass
00328
00329 sub = Subscriber(name, data_class)
00330 self.assertEquals(rname, sub.resolved_name)
00331 self.assertEquals(data_class, sub.data_class)
00332 self.assertEquals('test_rospy/Val', sub.type)
00333 self.assertEquals(data_class._md5sum, sub.md5sum)
00334 self.assertEquals(Registration.SUB, sub.reg_type)
00335
00336
00337 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00338 self.assert_(impl == sub.impl)
00339 self.assertEquals([], impl.callbacks)
00340 self.assertEquals(rname, impl.resolved_name)
00341 self.assertEquals(data_class, impl.data_class)
00342 self.assertEquals(None, impl.queue_size)
00343 self.assertEquals(DEFAULT_BUFF_SIZE, impl.buff_size)
00344 self.failIf(impl.tcp_nodelay)
00345 self.assertEquals(1, impl.ref_count)
00346 self.failIf(impl.closed)
00347
00348
00349 name = 'foo'
00350 data_class = test_rospy.msg.Val
00351 queue_size = 1
00352 buff_size = 1
00353 sub = Subscriber(name, data_class, callback=callback1,
00354 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=True)
00355 self.assertEquals(rname, sub.resolved_name)
00356
00357 self.assertEquals(rname, sub.name)
00358 self.assertEquals(data_class, sub.data_class)
00359
00360
00361 impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00362 self.assert_(impl == impl2)
00363 self.assertEquals([(callback1, None)], impl.callbacks)
00364 self.assertEquals(rname, impl.resolved_name)
00365 self.assertEquals(data_class, impl.data_class)
00366 self.assertEquals(queue_size, impl.queue_size)
00367 self.assertEquals(buff_size, impl.buff_size)
00368 self.assert_(impl.tcp_nodelay)
00369 self.assertEquals(2, impl.ref_count)
00370 self.failIf(impl.closed)
00371
00372
00373
00374
00375
00376 name = 'foo'
00377 data_class = test_rospy.msg.Val
00378 queue_size = 2
00379 buff_size = 2
00380 sub = Subscriber(name, data_class, callback=callback2,
00381 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=False)
00382
00383
00384 impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00385 self.assert_(impl == impl2)
00386 self.assertEquals(set([(callback1, None), (callback2, None)]), set(impl.callbacks))
00387 self.assertEquals(queue_size, impl.queue_size)
00388 self.assertEquals(buff_size, impl.buff_size)
00389 self.assert_(impl.tcp_nodelay)
00390 self.assertEquals(3, impl.ref_count)
00391 self.failIf(impl.closed)
00392
00393 if __name__ == '__main__':
00394 import rostest
00395 rostest.unitrun('test_rospy', 'test_rospy_topic', TestRospyTopics, coverage_packages=['rospy.topics'])