Go to the documentation of this file.00001
00002
00003 import abc
00004 import argparse
00005 import sys
00006
00007 import rospy
00008
00009 from jsk_topic_tools.name_utils import unresolve_name
00010
00011
00012 __all__ = ('ConnectionBasedTransport',)
00013
00014
00015 SUBSCRIBED = 0
00016 NOT_SUBSCRIBED = 1
00017
00018
00019
00020
00021 class MetaConnectionBasedTransport(abc.ABCMeta):
00022 def __call__(cls, *args, **kwargs):
00023 """Called when you call ConnectionBasedTransport()"""
00024 obj = type.__call__(cls, *args, **kwargs)
00025
00026
00027 parser = argparse.ArgumentParser()
00028 parser.add_argument('--inout', action='store_true')
00029 args = parser.parse_args(rospy.myargv()[1:])
00030 if args.inout:
00031 obj.subscribe()
00032 tp_manager = rospy.topics.get_topic_manager()
00033 print('Publications:')
00034 for topic, topic_type in tp_manager.get_publications():
00035 if topic == '/rosout':
00036 continue
00037 topic = unresolve_name(rospy.get_name(), topic)
00038 print(' * {0} [{1}]'.format(topic, topic_type))
00039 print('Subscriptions:')
00040 for topic, topic_type in tp_manager.get_subscriptions():
00041 topic = unresolve_name(rospy.get_name(), topic)
00042 print(' * {0} [{1}]'.format(topic, topic_type))
00043 sys.exit(0)
00044
00045 obj._post_init()
00046 return obj
00047
00048
00049 class ConnectionBasedTransport(rospy.SubscribeListener):
00050
00051 __metaclass__ = MetaConnectionBasedTransport
00052
00053 def __init__(self):
00054 super(ConnectionBasedTransport, self).__init__()
00055 self.is_initialized = False
00056 self._publishers = []
00057 self._ever_subscribed = False
00058 self._connection_status = NOT_SUBSCRIBED
00059 rospy.Timer(rospy.Duration(5),
00060 self._warn_never_subscribed_cb, oneshot=True)
00061
00062 def _post_init(self):
00063 self.is_initialized = True
00064 if not self._publishers:
00065 raise RuntimeError(
00066 'No publishers registered.'
00067 ' Have you called ConnectionBasedTransport.advertise?')
00068 if rospy.get_param('~always_subscribe', False):
00069 self.subscribe()
00070 self._connection_status = SUBSCRIBED
00071 self._ever_subscribed = True
00072
00073 def _warn_never_subscribed_cb(self, timer_event):
00074 if not self._ever_subscribed:
00075 rospy.logwarn(
00076 '[{name}] subscribes topics only with'
00077 " child subscribers. Set '~always_subscribe' as True"
00078 ' to have it subscribe always.'.format(name=rospy.get_name()))
00079
00080 @abc.abstractmethod
00081 def subscribe(self):
00082 raise NotImplementedError
00083
00084 @abc.abstractmethod
00085 def unsubscribe(self):
00086 raise NotImplementedError
00087
00088 def is_subscribed(self):
00089 return self._connection_status == SUBSCRIBED
00090
00091 def peer_subscribe(self, *args, **kwargs):
00092 rospy.logdebug('[{topic}] is subscribed'.format(topic=args[0]))
00093 if self._connection_status == NOT_SUBSCRIBED:
00094 self.subscribe()
00095 self._connection_status = SUBSCRIBED
00096 if not self._ever_subscribed:
00097 self._ever_subscribed = True
00098
00099 def peer_unsubscribe(self, *args, **kwargs):
00100 rospy.logdebug('[{topic}] is unsubscribed'.format(topic=args[0]))
00101 if rospy.get_param('~always_subscribe', False):
00102 return
00103 if self._connection_status == NOT_SUBSCRIBED:
00104 return
00105 for pub in self._publishers:
00106 if pub.get_num_connections() > 0:
00107 break
00108 else:
00109 self.unsubscribe()
00110 self._connection_status = NOT_SUBSCRIBED
00111
00112 def advertise(self, *args, **kwargs):
00113
00114
00115 assert len(args) < 3 or args[2] is None
00116 assert kwargs.get('subscriber_listener') is None
00117 kwargs['subscriber_listener'] = self
00118
00119 pub = rospy.Publisher(*args, **kwargs)
00120 self._publishers.append(pub)
00121 return pub