00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import os
00035 import sys
00036 import struct
00037 import unittest
00038 import time
00039
00040 import test_rospy.msg
00041
00042 def callback1(data): pass
00043 def callback2(data): pass
00044
00045
00046 import rospy.impl.transport
00047 class ConnectionOverride(rospy.impl.transport.Transport):
00048 def __init__(self, endpoint_id):
00049 super(ConnectionOverride, self).__init__(rospy.impl.transport.OUTBOUND, endpoint_id)
00050 self.endpoint_id = endpoint_id
00051 self.data = ''
00052
00053 def set_cleanup_callback(self, cb): pass
00054 def write_data(self, data):
00055 self.data = self.data + data
00056
00057
00058 class TestRospyTopics(unittest.TestCase):
00059
00060 def test_add_connection(self):
00061 from rospy.topics import _TopicImpl
00062 from std_msgs.msg import String
00063 t = _TopicImpl('/foo', String)
00064 c1 = ConnectionOverride('c1')
00065 c1b = ConnectionOverride('c1')
00066 c2 = ConnectionOverride('c2')
00067 c3 = ConnectionOverride('c3')
00068 assert not t.has_connection('c1')
00069 assert t.get_num_connections() == 0
00070 t.add_connection(c1)
00071 assert t.get_num_connections() == 1
00072 assert t.has_connection('c1')
00073 t.add_connection(c1b)
00074 assert t.get_num_connections() == 1
00075 assert t.has_connection('c1')
00076
00077 t.remove_connection(c1)
00078 assert t.has_connection('c1')
00079 t.remove_connection(c1b)
00080 assert not t.has_connection('c1')
00081
00082 t.close()
00083 assert t.get_num_connections() == 0
00084
00085 t = _TopicImpl('/foo', String)
00086 t.add_connection(c1)
00087 t.add_connection(c2)
00088 t.add_connection(c3)
00089 for x in ['c1', 'c2', 'c3']:
00090 assert t.has_connection(x)
00091 assert t.get_num_connections() == 3
00092
00093 t.close()
00094 assert t.get_num_connections() == 0
00095
00096 def test_Publisher(self):
00097 import rospy
00098 from rospy.impl.registration import get_topic_manager, Registration
00099 from rospy.topics import Publisher, DEFAULT_BUFF_SIZE
00100
00101
00102 name = 'foo'
00103 rname = rospy.resolve_name('foo')
00104 data_class = test_rospy.msg.Val
00105
00106
00107 for n in [None, '', 1]:
00108 try:
00109 Publisher(n, data_class)
00110 self.fail("should not allow invalid name")
00111 except ValueError: pass
00112 for d in [None, 1, TestRospyTopics]:
00113 try:
00114 Publisher(name, d)
00115 self.fail("should now allow invalid data_class")
00116 except ValueError: pass
00117 try:
00118 Publisher(name, None)
00119 self.fail("None should not be allowed for data_class")
00120 except ValueError: pass
00121
00122
00123 pub = Publisher(name, data_class)
00124 self.assertEquals(rname, pub.resolved_name)
00125
00126 self.assertEquals(rname, pub.name)
00127 self.assertEquals(data_class, pub.data_class)
00128 self.assertEquals('test_rospy/Val', pub.type)
00129 self.assertEquals(data_class._md5sum, pub.md5sum)
00130 self.assertEquals(Registration.PUB, pub.reg_type)
00131
00132
00133 impl = get_topic_manager().get_impl(Registration.PUB, rname)
00134 self.assert_(impl == pub.impl)
00135 self.assertEquals(rname, impl.resolved_name)
00136 self.assertEquals(data_class, impl.data_class)
00137 self.failIf(impl.is_latch)
00138 self.assertEquals(None, impl.latch)
00139 self.assertEquals(0, impl.seq)
00140 self.assertEquals(1, impl.ref_count)
00141 self.assertEquals('', impl.buff.getvalue())
00142 self.failIf(impl.closed)
00143 self.failIf(impl.has_connections())
00144
00145 from test_rospy.msg import Val
00146 impl.publish(Val('hello world-1'))
00147
00148
00149 self.assertEquals(0, impl.message_data_sent)
00150
00151 impl.acquire()
00152 impl.release()
00153
00154
00155
00156
00157 co1 = ConnectionOverride('co1')
00158 self.failIf(impl.has_connection('co1'))
00159 impl.add_connection(co1)
00160 self.assert_(impl.has_connection('co1'))
00161 self.assert_(impl.has_connections())
00162 impl.publish(Val('hello world-1'), connection_override=co1)
00163
00164 import cStringIO
00165 buff = cStringIO.StringIO()
00166 Val('hello world-1').serialize(buff)
00167
00168 self.assertEquals(co1.data[4:], buff.getvalue())
00169 self.assertEquals(None, impl.latch)
00170
00171
00172 pub = Publisher(name, data_class, latch=True)
00173 impl = get_topic_manager().get_impl(Registration.PUB, rname)
00174
00175 self.assert_(impl == pub.impl)
00176 self.assertEquals(True, impl.is_latch)
00177 self.assertEquals(None, impl.latch)
00178 self.assertEquals(2, impl.ref_count)
00179
00180 co2 = ConnectionOverride('co2')
00181 self.failIf(impl.has_connection('co2'))
00182 impl.add_connection(co2)
00183 for n in ['co1', 'co2']:
00184 self.assert_(impl.has_connection(n))
00185 self.assert_(impl.has_connections())
00186 v = Val('hello world-2')
00187 impl.publish(v, connection_override=co2)
00188 self.assert_(v == impl.latch)
00189
00190 buff = cStringIO.StringIO()
00191 Val('hello world-2').serialize(buff)
00192
00193 self.assertEquals(co2.data[4:], buff.getvalue())
00194
00195
00196 co3 = ConnectionOverride('co3')
00197 self.failIf(impl.has_connection('co3'))
00198 impl.add_connection(co3)
00199 for n in ['co1', 'co2', 'co3']:
00200 self.assert_(impl.has_connection(n))
00201 self.assert_(impl.has_connections())
00202 self.assertEquals(co3.data[4:], buff.getvalue())
00203
00204
00205
00206 self.assert_(impl.has_connection('co1'))
00207 impl.remove_connection(co1)
00208 self.failIf(impl.has_connection('co1'))
00209 self.assert_(impl.has_connections())
00210
00211 self.assert_(impl.has_connection('co3'))
00212 impl.remove_connection(co3)
00213 self.failIf(impl.has_connection('co3'))
00214 self.assert_(impl.has_connections())
00215
00216 self.assert_(impl.has_connection('co2'))
00217 impl.remove_connection(co2)
00218 self.failIf(impl.has_connection('co2'))
00219 self.failIf(impl.has_connections())
00220
00221
00222 pub = Publisher('bar', data_class, latch=True)
00223 v = Val('no connection test')
00224 pub.impl.publish(v)
00225 self.assert_(v == pub.impl.latch)
00226
00227
00228 h = {'foo': 'bar', 'fuga': 'hoge'}
00229 pub = Publisher('header_test', data_class, headers=h)
00230 self.assertEquals(h, pub.impl.headers)
00231
00232 def test_Subscriber_unregister(self):
00233
00234
00235 import rospy
00236 from rospy.impl.registration import get_topic_manager, Registration
00237 from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00238
00239
00240
00241
00242 name = 'unregistertest'
00243 rname = rospy.resolve_name(name)
00244 data_class = test_rospy.msg.Val
00245
00246 sub = Subscriber(name, data_class)
00247 self.assertEquals(None, sub.callback)
00248
00249
00250
00251 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00252 self.assert_(impl == sub.impl)
00253 self.assertEquals(1, impl.ref_count)
00254 self.assertEquals([], impl.callbacks)
00255
00256
00257 sub.unregister()
00258 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00259
00260
00261 sub2 = Subscriber(name, data_class)
00262 sub3 = Subscriber(name, data_class)
00263
00264 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00265
00266 self.assert_(impl == sub2.impl)
00267 self.assert_(impl == sub3.impl)
00268
00269 self.assertEquals([], impl.callbacks)
00270 self.assertEquals(2, impl.ref_count)
00271 sub2.unregister()
00272 self.assertEquals(1, impl.ref_count)
00273
00274 sub2.unregister()
00275 self.assertEquals(1, impl.ref_count)
00276
00277 sub3.unregister()
00278 self.assertEquals(0, impl.ref_count)
00279 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00280
00281
00282 cb_args5 = 5
00283 cb_args6 = 6
00284 cb_args7 = 7
00285 sub4 = Subscriber(name, data_class, callback1)
00286
00287
00288 sub5 = Subscriber(name, data_class, callback2, cb_args5)
00289 sub6 = Subscriber(name, data_class, callback2, cb_args6)
00290 sub7 = Subscriber(name, data_class, callback2, cb_args7)
00291 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00292 self.assertEquals(4, impl.ref_count)
00293
00294 self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args6), (callback2, cb_args7)], impl.callbacks)
00295
00296 sub6.unregister()
00297 self.assertEquals([(callback1, None), (callback2, cb_args5), (callback2, cb_args7)], impl.callbacks)
00298 self.assertEquals(3, impl.ref_count)
00299 sub5.unregister()
00300 self.assertEquals([(callback1, None), (callback2, cb_args7)], impl.callbacks)
00301 self.assertEquals(2, impl.ref_count)
00302 sub4.unregister()
00303 self.assertEquals([(callback2, cb_args7)], impl.callbacks)
00304 self.assertEquals(1, impl.ref_count)
00305 sub7.unregister()
00306 self.assertEquals([], impl.callbacks)
00307 self.assertEquals(0, impl.ref_count)
00308 self.assertEquals(None, get_topic_manager().get_impl(Registration.SUB, rname))
00309
00310
00311 sub8 = Subscriber(name, data_class, callback1, 'hello')
00312 sub9 = Subscriber(name, data_class, callback1, 'hello')
00313 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00314 self.assertEquals([(callback1, 'hello'), (callback1, 'hello')], impl.callbacks)
00315 self.assertEquals(2, impl.ref_count)
00316 sub8.unregister()
00317 self.assertEquals([(callback1, 'hello')], impl.callbacks)
00318 self.assertEquals(1, impl.ref_count)
00319 sub9.unregister()
00320 self.assertEquals([], impl.callbacks)
00321 self.assertEquals(0, impl.ref_count)
00322
00323 def test_Subscriber(self):
00324
00325
00326
00327 import rospy
00328 from rospy.impl.registration import get_topic_manager, Registration
00329 from rospy.topics import Subscriber, DEFAULT_BUFF_SIZE
00330
00331
00332
00333
00334 name = 'foo'
00335 rname = rospy.resolve_name('foo')
00336 data_class = test_rospy.msg.Val
00337
00338
00339 for n in [None, '', 1]:
00340 try:
00341 Subscriber(n, data_class)
00342 self.fail("should not allow invalid name")
00343 except ValueError: pass
00344 for d in [None, 1, TestRospyTopics]:
00345 try:
00346 Subscriber(name, d)
00347 self.fail("should now allow invalid data_class")
00348 except ValueError: pass
00349 try:
00350 Subscriber(name, None)
00351 self.fail("None should not be allowed for data_class")
00352 except ValueError: pass
00353
00354 sub = Subscriber(name, data_class)
00355 self.assertEquals(rname, sub.resolved_name)
00356 self.assertEquals(data_class, sub.data_class)
00357 self.assertEquals('test_rospy/Val', sub.type)
00358 self.assertEquals(data_class._md5sum, sub.md5sum)
00359 self.assertEquals(Registration.SUB, sub.reg_type)
00360
00361
00362 impl = get_topic_manager().get_impl(Registration.SUB, rname)
00363 self.assert_(impl == sub.impl)
00364 self.assertEquals([], impl.callbacks)
00365 self.assertEquals(rname, impl.resolved_name)
00366 self.assertEquals(data_class, impl.data_class)
00367 self.assertEquals(None, impl.queue_size)
00368 self.assertEquals(DEFAULT_BUFF_SIZE, impl.buff_size)
00369 self.failIf(impl.tcp_nodelay)
00370 self.assertEquals(1, impl.ref_count)
00371 self.failIf(impl.closed)
00372
00373
00374 name = 'foo'
00375 data_class = test_rospy.msg.Val
00376 queue_size = 1
00377 buff_size = 1
00378 sub = Subscriber(name, data_class, callback=callback1,
00379 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=True)
00380 self.assertEquals(rname, sub.resolved_name)
00381
00382 self.assertEquals(rname, sub.name)
00383 self.assertEquals(data_class, sub.data_class)
00384
00385
00386 impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00387 self.assert_(impl == impl2)
00388 self.assertEquals([(callback1, None)], impl.callbacks)
00389 self.assertEquals(rname, impl.resolved_name)
00390 self.assertEquals(data_class, impl.data_class)
00391 self.assertEquals(queue_size, impl.queue_size)
00392 self.assertEquals(buff_size, impl.buff_size)
00393 self.assert_(impl.tcp_nodelay)
00394 self.assertEquals(2, impl.ref_count)
00395 self.failIf(impl.closed)
00396
00397
00398
00399
00400
00401 name = 'foo'
00402 data_class = test_rospy.msg.Val
00403 queue_size = 2
00404 buff_size = 2
00405 sub = Subscriber(name, data_class, callback=callback2,
00406 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=False)
00407
00408
00409 impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
00410 self.assert_(impl == impl2)
00411 self.assertEquals(set([(callback1, None), (callback2, None)]), set(impl.callbacks))
00412 self.assertEquals(queue_size, impl.queue_size)
00413 self.assertEquals(buff_size, impl.buff_size)
00414 self.assert_(impl.tcp_nodelay)
00415 self.assertEquals(3, impl.ref_count)
00416 self.failIf(impl.closed)
00417
00418 def test_Poller(self):
00419
00420 from rospy.topics import Poller
00421 p = Poller()
00422 p.add_fd(1)
00423 for x in p.error_iter():
00424 pass
00425 p.remove_fd(1)
00426 for x in p.error_iter():
00427 pass
00428