00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039
00040 import test_rospy.msg
00041
00042 def callback1(data): pass
00043 def callback2(data): pass
00044
00045
00046 import rospy.impl.transport
00047 class ConnectionOverride(rospy.impl.transport.Transport):
00048 def __init__(self, endpoint_id):
00049 super(ConnectionOverride, self).__init__(rospy.impl.transport.OUTBOUND, endpoint_id)
00050 self.endpoint_id = endpoint_id
00051 self.data = ''
00052
00053 def set_cleanup_callback(self, cb): pass
00054 def write_data(self, data):
00055 self.data = self.data + data
00056
00057
00058 class TestRospyTopics(unittest.TestCase):
00059
00060 def test_add_connection(self):
00061 from rospy.topics import _TopicImpl
00062 from std_msgs.msg import String
00063 t = _TopicImpl('/foo', String)
00064 c1 = ConnectionOverride('c1')
00065 c1b = ConnectionOverride('c1')
00066 c2 = ConnectionOverride('c2')
00067 c3 = ConnectionOverride('c3')
00068 assert not t.has_connection('c1')
00069 assert t.get_num_connections() == 0
00070 t.add_connection(c1)
00071 assert t.get_num_connections() == 1
00072 assert t.has_connection('c1')
00073 t.add_connection(c1b)
00074 assert t.get_num_connections() == 1
00075 assert t.has_connection('c1')
00076
00077 t.remove_connection(c1)
00078 assert t.has_connection('c1')
00079 t.remove_connection(c1b)
00080 assert not t.has_connection('c1')
00081
00082 t.close()
00083 assert t.get_num_connections() == 0
00084
00085 t = _TopicImpl('/foo', String)
00086 t.add_connection(c1)
00087 t.add_connection(c2)
00088 t.add_connection(c3)
00089 for x in ['c1', 'c2', 'c3']:
00090 assert t.has_connection(x)
00091 assert t.get_num_connections() == 3
00092
00093 t.close()
00094 assert t.get_num_connections() == 0
00095
00096 def test_Publisher(self):
00097 import rospy
00098 from rospy.impl.registration import get_topic_manager, Registration
00099 from rospy.topics import Publisher, DEFAULT_BUFF_SIZE
00100
00101
00102 name = 'foo'
00103 rname = rospy.resolve_name('foo')
00104 data_class = test_rospy.msg.Val
00105
00106
00107 for n in [None, '', 1]:
00108 try:
00109 Publisher(n, data_class)
00110 self.fail("should not allow invalid name")
00111 except ValueError: pass
00112 for d in [None, 1, TestRospyTopics]:
00113 try:
00114 Publisher(name, d)
00115 self.fail("should now allow invalid data_class")
00116 except ValueError: pass
00117 try:
00118 Publisher(name, None)
00119 self.fail("None should not be allowed for data_class")
00120 except ValueError: pass
00121
00122
00123 pub = Publisher(name, data_class)
00124 self.assertEquals(rname, pub.resolved_name)
00125
00126 self.assertEquals(rname, pub.name)
00127 self.assertEquals(data_class, pub.data_class)
00128 self.assertEquals('test_rospy/Val', pub.type)
00129 self.assertEquals(data_class._md5sum, pub.md5sum)
00130 self.assertEquals(Registration.PUB, pub.reg_type)
00131
00132
00133 impl = get_topic_manager().get_impl(Registration.PUB, rname)
00134 self.assert_(impl == pub.impl)
00135 self.assertEquals(rname, impl.resolved_name)
00136 self.assertEquals(data_class, impl.data_class)
00137 self.failIf(impl.is_latch)
00138 self.assertEquals(None, impl.latch)
00139 self.assertEquals(0, impl.seq)
00140 self.assertEquals(1, impl.ref_count)
00141 self.assertEquals('', impl.buff.getvalue())
00142 self.failIf(impl.closed)
00143 self.failIf(impl.has_connections())
00144
00145 from test_rospy.msg import Val
00146 impl.publish(Val('hello world-1'))
00147
00148
00149 self.assertEquals(0, impl.message_data_sent)
00150
00151 impl.acquire()
00152 impl.release()
00153
00154
00155
00156
00157 co1 = ConnectionOverride('co1')
00158 self.failIf(impl.has_connection('co1'))
00159 impl.add_connection(co1)
00160 self.assert_(impl.has_connection('co1'))
00161 self.assert_(impl.has_connections())
00162 impl.publish(Val('hello world-1'), connection_override=co1)
00163
00164 try:
00165 from cStringIO import StringIO
00166 except ImportError:
00167 from io import StringIO
00168 buff = StringIO()
00169 Val('hello world-1').serialize(buff)
00170
00171 self.assertEquals(co1.data[4:], buff.getvalue())
00172 self.assertEquals(None, impl.latch)
00173
00174
00175 pub = Publisher(name, data_class, latch=True)
00176 impl = get_topic_manager().get_impl(Registration.PUB, rname)
00177
00178 self.assert_(impl == pub.impl)
00179 self.assertEquals(True, impl.is_latch)
00180 self.assertEquals(None, impl.latch)
00181 self.assertEquals(2, impl.ref_count)
00182
00183 co2 = ConnectionOverride('co2')
00184 self.failIf(impl.has_connection('co2'))
00185 impl.add_connection(co2)
00186 for n in ['co1', 'co2']:
00187 self.assert_(impl.has_connection(n))
00188 self.assert_(impl.has_connections())
00189 v = Val('hello world-2')
00190 impl.publish(v, connection_override=co2)
00191 self.assert_(v == impl.latch)
00192
00193 buff = StringIO()
00194 Val('hello world-2').serialize(buff)
00195
00196 self.assertEquals(co2.data[4:], buff.getvalue())
00197
00198
00199 co3 = ConnectionOverride('co3')
00200 self.failIf(impl.has_connection('co3'))
00201 impl.add_connection(co3)
00202 for n in ['co1', 'co2', 'co3']:
00203 self.assert_(impl.has_connection(n))
00204 self.assert_(impl.has_connections())
00205 self.assertEquals(co3.data[4:], buff.getvalue())
00206
00207
00208
00209 self.assert_(impl.has_connection('co1'))
00210 impl.remove_connection(co1)
00211 self.failIf(impl.has_connection('co1'))
00212 self.assert_(impl.has_connections())
00213
00214 self.assert_(impl.has_connection('co3'))
00215 impl.remove_connection(co3)
00216 self.failIf(impl.has_connection('co3'))
00217 self.assert_(impl.has_connections())
00218
00219 self.assert_(impl.has_connection('co2'))
00220 impl.remove_connection(co2)
00221 self.failIf(impl.has_connection('co2'))
00222 self.failIf(impl.has_connections())
00223
00224
00225 pub = Publisher('bar', data_class, latch=True)
00226 v = Val('no connection test')
00227 pub.impl.publish(v)
00228 self.assert_(v == pub.impl.latch)
00229
00230
00231 h = {'foo': 'bar', 'fuga': 'hoge'}
00232 pub = Publisher('header_test', data_class, headers=h)
00233 self.assertEquals(h, pub.impl.headers)
00234
00235 def test_Subscriber_unregister(self):
00236
00237
00238 import rospy
00239 from rospy.impl.registration import get_topic_manager, Registration
00240 from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00241
00242
00243
00244
00245 name = 'unregistertest'
00246 rname = rospy.resolve_name(name)
00247 data_class = test_rospy.msg.Val
00248
00249 sub = Subscriber(name, data_class)
00250 self.assertEquals(None, sub.callback)
00251
00252
00253
00254 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00255 self.assert_(impl == sub.impl)
00256 self.assertEquals(1, impl.ref_count)
00257 self.assertEquals([], impl.callbacks)
00258
00259
00260 sub.unregister()
00261 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00262
00263
00264 sub2 = Subscriber(name, data_class)
00265 sub3 = Subscriber(name, data_class)
00266
00267 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00268
00269 self.assert_(impl == sub2.impl)
00270 self.assert_(impl == sub3.impl)
00271
00272 self.assertEquals([], impl.callbacks)
00273 self.assertEquals(2, impl.ref_count)
00274 sub2.unregister()
00275 self.assertEquals(1, impl.ref_count)
00276
00277 sub2.unregister()
00278 self.assertEquals(1, impl.ref_count)
00279
00280 sub3.unregister()
00281 self.assertEquals(0, impl.ref_count)
00282 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00283
00284
00285 cb_args5 = 5
00286 cb_args6 = 6
00287 cb_args7 = 7
00288 sub4 = Subscriber(name, data_class, callback1)
00289
00290
00291 sub5 = Subscriber(name, data_class, callback2, cb_args5)
00292 sub6 = Subscriber(name, data_class, callback2, cb_args6)
00293 sub7 = Subscriber(name, data_class, callback2, cb_args7)
00294 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00295 self.assertEquals(4, impl.ref_count)
00296
00297 self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args6), (callback2, cb_args7)], impl.callbacks)
00298
00299 sub6.unregister()
00300 self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args7)], impl.callbacks)
00301 self.assertEquals(3, impl.ref_count)
00302 sub5.unregister()
00303 self.assertEquals([(callback1, None), (callback2, cb_args7)], impl.callbacks)
00304 self.assertEquals(2, impl.ref_count)
00305 sub4.unregister()
00306 self.assertEquals([(callback2, cb_args7)], impl.callbacks)
00307 self.assertEquals(1, impl.ref_count)
00308 sub7.unregister()
00309 self.assertEquals([], impl.callbacks)
00310 self.assertEquals(0, impl.ref_count)
00311 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00312
00313
00314 sub8 = Subscriber(name, data_class, callback1, 'hello')
00315 sub9 = Subscriber(name, data_class, callback1, 'hello')
00316 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00317 self.assertEquals([(callback1, 'hello'), (callback1, 'hello')], impl.callbacks)
00318 self.assertEquals(2, impl.ref_count)
00319 sub8.unregister()
00320 self.assertEquals([(callback1, 'hello')], impl.callbacks)
00321 self.assertEquals(1, impl.ref_count)
00322 sub9.unregister()
00323 self.assertEquals([], impl.callbacks)
00324 self.assertEquals(0, impl.ref_count)
00325
00326 def test_Subscriber(self):
00327
00328
00329
00330 import rospy
00331 from rospy.impl.registration import get_topic_manager, Registration
00332 from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00333
00334
00335
00336
00337 name = 'foo'
00338 rname = rospy.resolve_name('foo')
00339 data_class = test_rospy.msg.Val
00340
00341
00342 for n in [None, '', 1]:
00343 try:
00344 Subscriber(n, data_class)
00345 self.fail("should not allow invalid name")
00346 except ValueError: pass
00347 for d in [None, 1, TestRospyTopics]:
00348 try:
00349 Subscriber(name, d)
00350 self.fail("should now allow invalid data_class")
00351 except ValueError: pass
00352 try:
00353 Subscriber(name, None)
00354 self.fail("None should not be allowed for data_class")
00355 except ValueError: pass
00356
00357 sub = Subscriber(name, data_class)
00358 self.assertEquals(rname, sub.resolved_name)
00359 self.assertEquals(data_class, sub.data_class)
00360 self.assertEquals('test_rospy/Val', sub.type)
00361 self.assertEquals(data_class._md5sum, sub.md5sum)
00362 self.assertEquals(Registration.SUB, sub.reg_type)
00363
00364
00365 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00366 self.assert_(impl == sub.impl)
00367 self.assertEquals([], impl.callbacks)
00368 self.assertEquals(rname, impl.resolved_name)
00369 self.assertEquals(data_class, impl.data_class)
00370 self.assertEquals(None, impl.queue_size)
00371 self.assertEquals(DEFAULT_BUFF_SIZE, impl.buff_size)
00372 self.failIf(impl.tcp_nodelay)
00373 self.assertEquals(1, impl.ref_count)
00374 self.failIf(impl.closed)
00375
00376
00377 name = 'foo'
00378 data_class = test_rospy.msg.Val
00379 queue_size = 1
00380 buff_size = 1
00381 sub = Subscriber(name, data_class, callback=callback1,
00382 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=True)
00383 self.assertEquals(rname, sub.resolved_name)
00384
00385 self.assertEquals(rname, sub.name)
00386 self.assertEquals(data_class, sub.data_class)
00387
00388
00389 impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00390 self.assert_(impl == impl2)
00391 self.assertEquals([(callback1, None)], impl.callbacks)
00392 self.assertEquals(rname, impl.resolved_name)
00393 self.assertEquals(data_class, impl.data_class)
00394 self.assertEquals(queue_size, impl.queue_size)
00395 self.assertEquals(buff_size, impl.buff_size)
00396 self.assert_(impl.tcp_nodelay)
00397 self.assertEquals(2, impl.ref_count)
00398 self.failIf(impl.closed)
00399
00400
00401
00402
00403
00404 name = 'foo'
00405 data_class = test_rospy.msg.Val
00406 queue_size = 2
00407 buff_size = 2
00408 sub = Subscriber(name, data_class, callback=callback2,
00409 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=False)
00410
00411
00412 impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00413 self.assert_(impl == impl2)
00414 self.assertEquals(set([(callback1, None), (callback2, None)]), set(impl.callbacks))
00415 self.assertEquals(queue_size, impl.queue_size)
00416 self.assertEquals(buff_size, impl.buff_size)
00417 self.assert_(impl.tcp_nodelay)
00418 self.assertEquals(3, impl.ref_count)
00419 self.failIf(impl.closed)
00420
00421 def test_Poller(self):
00422
00423 from rospy.topics import Poller
00424 p = Poller()
00425 p.add_fd(1)
00426 for x in p.error_iter():
00427 pass
00428 p.remove_fd(1)
00429 for x in p.error_iter():
00430 pass
00431