12 __all__ = (
'ConnectionBasedTransport',)
23 """Called when you call ConnectionBasedTransport()""" 24 obj = type.__call__(cls, *args, **kwargs)
27 parser = argparse.ArgumentParser()
28 parser.add_argument(
'--inout', action=
'store_true')
29 args = parser.parse_args(rospy.myargv()[1:])
32 tp_manager = rospy.topics.get_topic_manager()
33 print(
'Publications:')
34 for topic, topic_type
in tp_manager.get_publications():
35 if topic ==
'/rosout':
38 print(
' * {0} [{1}]'.format(topic, topic_type))
39 print(
'Subscriptions:')
40 for topic, topic_type
in tp_manager.get_subscriptions():
42 print(
' * {0} [{1}]'.format(topic, topic_type))
51 __metaclass__ = MetaConnectionBasedTransport
54 super(ConnectionBasedTransport, self).
__init__()
59 rospy.Timer(rospy.Duration(5),
66 'No publishers registered.' 67 ' Have you called ConnectionBasedTransport.advertise?')
68 if rospy.get_param(
'~always_subscribe',
False):
76 '[{name}] subscribes topics only with' 77 " child subscribers. Set '~always_subscribe' as True" 78 ' to have it subscribe always.'.format(name=rospy.get_name()))
82 raise NotImplementedError
86 raise NotImplementedError
92 rospy.logdebug(
'[{topic}] is subscribed'.format(topic=args[0]))
100 rospy.logdebug(
'[{topic}] is unsubscribed'.format(topic=args[0]))
101 if rospy.get_param(
'~always_subscribe',
False):
106 if pub.get_num_connections() > 0:
115 assert len(args) < 3
or args[2]
is None 116 assert kwargs.get(
'subscriber_listener')
is None 117 kwargs[
'subscriber_listener'] = self
119 pub = rospy.Publisher(*args, **kwargs)
120 self._publishers.append(pub)