46 import rospy.impl.transport
49 super(ConnectionOverride, self).
__init__(rospy.impl.transport.OUTBOUND, endpoint_id)
61 from rospy.topics
import _TopicImpl
63 t = _TopicImpl(
'/foo', String)
68 assert not t.has_connection(
'c1')
69 assert t.get_num_connections() == 0
71 assert t.get_num_connections() == 1
72 assert t.has_connection(
'c1')
74 assert t.get_num_connections() == 1
75 assert t.has_connection(
'c1')
77 t.remove_connection(c1)
78 assert t.has_connection(
'c1')
79 t.remove_connection(c1b)
80 assert not t.has_connection(
'c1')
83 assert t.get_num_connections() == 0
85 t = _TopicImpl(
'/foo', String)
89 for x
in [
'c1',
'c2',
'c3']:
90 assert t.has_connection(x)
91 assert t.get_num_connections() == 3
94 assert t.get_num_connections() == 0
98 from rospy.impl.registration
import get_topic_manager, Registration
99 from rospy.topics
import Publisher, DEFAULT_BUFF_SIZE
103 rname = rospy.resolve_name(
'foo')
104 data_class = test_rospy.msg.Val
107 for n
in [
None,
'', 1]:
109 Publisher(n, data_class)
110 self.fail(
"should not allow invalid name")
111 except ValueError:
pass 112 for d
in [
None, 1, TestRospyTopics]:
115 self.fail(
"should now allow invalid data_class")
116 except ValueError:
pass 118 Publisher(name,
None)
119 self.fail(
"None should not be allowed for data_class")
120 except ValueError:
pass 123 pub = Publisher(name, data_class)
124 self.assertEquals(rname, pub.resolved_name)
126 self.assertEquals(rname, pub.name)
127 self.assertEquals(data_class, pub.data_class)
128 self.assertEquals(
'test_rospy/Val', pub.type)
129 self.assertEquals(data_class._md5sum, pub.md5sum)
130 self.assertEquals(Registration.PUB, pub.reg_type)
133 impl = get_topic_manager().get_impl(Registration.PUB, rname)
134 self.assert_(impl == pub.impl)
135 self.assertEquals(rname, impl.resolved_name)
136 self.assertEquals(data_class, impl.data_class)
137 self.failIf(impl.is_latch)
138 self.assertEquals(
None, impl.latch)
139 self.assertEquals(0, impl.seq)
140 self.assertEquals(1, impl.ref_count)
141 self.assertEquals(b
'', impl.buff.getvalue())
142 self.failIf(impl.closed)
143 self.failIf(impl.has_connections())
145 from test_rospy.msg
import Val
146 impl.publish(Val(
'hello world-1'))
149 self.assertEquals(0, impl.message_data_sent)
158 self.failIf(impl.has_connection(
'co1'))
159 impl.add_connection(co1)
160 self.assert_(impl.has_connection(
'co1'))
161 self.assert_(impl.has_connections())
162 impl.publish(Val(
'hello world-1'), connection_override=co1)
165 from cStringIO
import StringIO
167 from io
import StringIO
169 Val(
'hello world-1').serialize(buff)
171 self.assertEquals(co1.data[4:], buff.getvalue())
172 self.assertEquals(
None, impl.latch)
175 pub = Publisher(name, data_class, latch=
True)
176 impl = get_topic_manager().get_impl(Registration.PUB, rname)
178 self.assert_(impl == pub.impl)
179 self.assertEquals(
True, impl.is_latch)
180 self.assertEquals(
None, impl.latch)
181 self.assertEquals(2, impl.ref_count)
184 self.failIf(impl.has_connection(
'co2'))
185 impl.add_connection(co2)
186 for n
in [
'co1',
'co2']:
187 self.assert_(impl.has_connection(n))
188 self.assert_(impl.has_connections())
189 v = Val(
'hello world-2')
190 impl.publish(v, connection_override=co2)
191 self.assert_(v == impl.latch)
194 Val(
'hello world-2').serialize(buff)
196 self.assertEquals(co2.data[4:], buff.getvalue())
200 self.failIf(impl.has_connection(
'co3'))
201 impl.add_connection(co3)
202 for n
in [
'co1',
'co2',
'co3']:
203 self.assert_(impl.has_connection(n))
204 self.assert_(impl.has_connections())
205 self.assertEquals(co3.data[4:], buff.getvalue())
209 self.assert_(impl.has_connection(
'co1'))
210 impl.remove_connection(co1)
211 self.failIf(impl.has_connection(
'co1'))
212 self.assert_(impl.has_connections())
214 self.assert_(impl.has_connection(
'co3'))
215 impl.remove_connection(co3)
216 self.failIf(impl.has_connection(
'co3'))
217 self.assert_(impl.has_connections())
219 self.assert_(impl.has_connection(
'co2'))
220 impl.remove_connection(co2)
221 self.failIf(impl.has_connection(
'co2'))
222 self.failIf(impl.has_connections())
225 pub = Publisher(
'bar', data_class, latch=
True)
226 v = Val(
'no connection test')
228 self.assert_(v == pub.impl.latch)
231 h = {
'foo':
'bar',
'fuga':
'hoge'}
232 pub = Publisher(
'header_test', data_class, headers=h)
233 self.assertEquals(h, pub.impl.headers)
239 from rospy.impl.registration
import get_topic_manager, Registration
240 from rospy.topics
import Subscriber, DEFAULT_BUFF_SIZE
245 name =
'unregistertest' 246 rname = rospy.resolve_name(name)
247 data_class = test_rospy.msg.Val
249 sub = Subscriber(name, data_class)
250 self.assertEquals(
None, sub.callback)
254 impl = get_topic_manager().get_impl(Registration.SUB, rname)
255 self.assert_(impl == sub.impl)
256 self.assertEquals(1, impl.ref_count)
257 self.assertEquals([], impl.callbacks)
261 self.assertEquals(
None, get_topic_manager().get_impl(Registration.SUB, rname))
264 sub2 = Subscriber(name, data_class)
265 sub3 = Subscriber(name, data_class)
267 impl = get_topic_manager().get_impl(Registration.SUB, rname)
269 self.assert_(impl == sub2.impl)
270 self.assert_(impl == sub3.impl)
272 self.assertEquals([], impl.callbacks)
273 self.assertEquals(2, impl.ref_count)
275 self.assertEquals(1, impl.ref_count)
278 self.assertEquals(1, impl.ref_count)
281 self.assertEquals(0, impl.ref_count)
282 self.assertEquals(
None, get_topic_manager().get_impl(Registration.SUB, rname))
288 sub4 = Subscriber(name, data_class, callback1)
291 sub5 = Subscriber(name, data_class, callback2, cb_args5)
292 sub6 = Subscriber(name, data_class, callback2, cb_args6)
293 sub7 = Subscriber(name, data_class, callback2, cb_args7)
294 impl = get_topic_manager().get_impl(Registration.SUB, rname)
295 self.assertEquals(4, impl.ref_count)
297 self.assertEquals([(callback1,
None), (callback2, cb_args5), (callback2, cb_args6), (callback2, cb_args7)], impl.callbacks)
300 self.assertEquals([(callback1,
None), (callback2, cb_args5), (callback2, cb_args7)], impl.callbacks)
301 self.assertEquals(3, impl.ref_count)
303 self.assertEquals([(callback1,
None), (callback2, cb_args7)], impl.callbacks)
304 self.assertEquals(2, impl.ref_count)
306 self.assertEquals([(callback2, cb_args7)], impl.callbacks)
307 self.assertEquals(1, impl.ref_count)
309 self.assertEquals([], impl.callbacks)
310 self.assertEquals(0, impl.ref_count)
311 self.assertEquals(
None, get_topic_manager().get_impl(Registration.SUB, rname))
314 sub8 = Subscriber(name, data_class, callback1,
'hello')
315 sub9 = Subscriber(name, data_class, callback1,
'hello')
316 impl = get_topic_manager().get_impl(Registration.SUB, rname)
317 self.assertEquals([(callback1,
'hello'), (callback1,
'hello')], impl.callbacks)
318 self.assertEquals(2, impl.ref_count)
320 self.assertEquals([(callback1,
'hello')], impl.callbacks)
321 self.assertEquals(1, impl.ref_count)
323 self.assertEquals([], impl.callbacks)
324 self.assertEquals(0, impl.ref_count)
331 from rospy.impl.registration
import get_topic_manager, Registration
332 from rospy.topics
import Subscriber, DEFAULT_BUFF_SIZE
338 rname = rospy.resolve_name(
'foo')
339 data_class = test_rospy.msg.Val
342 for n
in [
None,
'', 1]:
344 Subscriber(n, data_class)
345 self.fail(
"should not allow invalid name")
346 except ValueError:
pass 347 for d
in [
None, 1, TestRospyTopics]:
350 self.fail(
"should now allow invalid data_class")
351 except ValueError:
pass 353 Subscriber(name,
None)
354 self.fail(
"None should not be allowed for data_class")
355 except ValueError:
pass 357 sub = Subscriber(name, data_class)
358 self.assertEquals(rname, sub.resolved_name)
359 self.assertEquals(data_class, sub.data_class)
360 self.assertEquals(
'test_rospy/Val', sub.type)
361 self.assertEquals(data_class._md5sum, sub.md5sum)
362 self.assertEquals(Registration.SUB, sub.reg_type)
365 impl = get_topic_manager().get_impl(Registration.SUB, rname)
366 self.assert_(impl == sub.impl)
367 self.assertEquals([], impl.callbacks)
368 self.assertEquals(rname, impl.resolved_name)
369 self.assertEquals(data_class, impl.data_class)
370 self.assertEquals(
None, impl.queue_size)
371 self.assertEquals(DEFAULT_BUFF_SIZE, impl.buff_size)
372 self.failIf(impl.tcp_nodelay)
373 self.assertEquals(1, impl.ref_count)
374 self.failIf(impl.closed)
378 data_class = test_rospy.msg.Val
381 sub = Subscriber(name, data_class, callback=callback1,
382 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=
True)
383 self.assertEquals(rname, sub.resolved_name)
385 self.assertEquals(rname, sub.name)
386 self.assertEquals(data_class, sub.data_class)
389 impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
390 self.assert_(impl == impl2)
391 self.assertEquals([(callback1,
None)], impl.callbacks)
392 self.assertEquals(rname, impl.resolved_name)
393 self.assertEquals(data_class, impl.data_class)
394 self.assertEquals(queue_size, impl.queue_size)
395 self.assertEquals(buff_size, impl.buff_size)
396 self.assert_(impl.tcp_nodelay)
397 self.assertEquals(2, impl.ref_count)
398 self.failIf(impl.closed)
405 data_class = test_rospy.msg.Val
408 sub = Subscriber(name, data_class, callback=callback2,
409 queue_size=queue_size, buff_size=buff_size, tcp_nodelay=
False)
412 impl2 = get_topic_manager().get_impl(Registration.SUB, rname)
413 self.assert_(impl == impl2)
414 self.assertEquals(set([(callback1,
None), (callback2,
None)]), set(impl.callbacks))
415 self.assertEquals(queue_size, impl.queue_size)
416 self.assertEquals(buff_size, impl.buff_size)
417 self.assert_(impl.tcp_nodelay)
418 self.assertEquals(3, impl.ref_count)
419 self.failIf(impl.closed)
423 from rospy.topics
import Poller
426 for x
in p.error_iter():
429 for x
in p.error_iter():
def test_add_connection(self)
def set_cleanup_callback(self, cb)
def test_Subscriber(self)
def test_Subscriber_unregister(self)
def __init__(self, endpoint_id)
def write_data(self, data)