transport.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import abc
4 import argparse
5 import sys
6 
7 import rospy
8 
9 from jsk_topic_tools.name_utils import unresolve_name
10 
11 
12 __all__ = ('ConnectionBasedTransport',)
13 
14 
15 SUBSCRIBED = 0
16 NOT_SUBSCRIBED = 1
17 
18 
19 # define a new metaclass which overrides the '__call__' function
20 # See: http://martyalchin.com/2008/jan/10/simple-plugin-framework/
21 class MetaConnectionBasedTransport(abc.ABCMeta):
22  def __call__(cls, *args, **kwargs):
23  """Called when you call ConnectionBasedTransport()"""
24  obj = type.__call__(cls, *args, **kwargs)
25 
26  # display node input/output topics
27  parser = argparse.ArgumentParser()
28  parser.add_argument('--inout', action='store_true')
29  args = parser.parse_args(rospy.myargv()[1:])
30  if args.inout:
31  obj.subscribe()
32  tp_manager = rospy.topics.get_topic_manager()
33  print('Publications:')
34  for topic, topic_type in tp_manager.get_publications():
35  if topic == '/rosout':
36  continue
37  topic = unresolve_name(rospy.get_name(), topic)
38  print(' * {0} [{1}]'.format(topic, topic_type))
39  print('Subscriptions:')
40  for topic, topic_type in tp_manager.get_subscriptions():
41  topic = unresolve_name(rospy.get_name(), topic)
42  print(' * {0} [{1}]'.format(topic, topic_type))
43  sys.exit(0)
44 
45  obj._post_init()
46  return obj
47 
48 
49 class ConnectionBasedTransport(rospy.SubscribeListener):
50 
51  __metaclass__ = MetaConnectionBasedTransport
52 
53  def __init__(self):
54  super(ConnectionBasedTransport, self).__init__()
55  self.is_initialized = False
56  self._publishers = []
57  self._ever_subscribed = False
58  self._connection_status = NOT_SUBSCRIBED
59  rospy.Timer(rospy.Duration(5),
60  self._warn_never_subscribed_cb, oneshot=True)
61 
62  def _post_init(self):
63  self.is_initialized = True
64  if not self._publishers:
65  raise RuntimeError(
66  'No publishers registered.'
67  ' Have you called ConnectionBasedTransport.advertise?')
68  if rospy.get_param('~always_subscribe', False):
69  self.subscribe()
70  self._connection_status = SUBSCRIBED
71  self._ever_subscribed = True
72 
73  def _warn_never_subscribed_cb(self, timer_event):
74  if not self._ever_subscribed:
75  rospy.logwarn(
76  '[{name}] subscribes topics only with'
77  " child subscribers. Set '~always_subscribe' as True"
78  ' to have it subscribe always.'.format(name=rospy.get_name()))
79 
80  @abc.abstractmethod
81  def subscribe(self):
82  raise NotImplementedError
83 
84  @abc.abstractmethod
85  def unsubscribe(self):
86  raise NotImplementedError
87 
88  def is_subscribed(self):
89  return self._connection_status == SUBSCRIBED
90 
91  def peer_subscribe(self, *args, **kwargs):
92  rospy.logdebug('[{topic}] is subscribed'.format(topic=args[0]))
93  if self._connection_status == NOT_SUBSCRIBED:
94  self.subscribe()
95  self._connection_status = SUBSCRIBED
96  if not self._ever_subscribed:
97  self._ever_subscribed = True
98 
99  def peer_unsubscribe(self, *args, **kwargs):
100  rospy.logdebug('[{topic}] is unsubscribed'.format(topic=args[0]))
101  if rospy.get_param('~always_subscribe', False):
102  return # do not unsubscribe
103  if self._connection_status == NOT_SUBSCRIBED:
104  return # no need to unsubscribe
105  for pub in self._publishers:
106  if pub.get_num_connections() > 0:
107  break
108  else:
109  self.unsubscribe()
110  self._connection_status = NOT_SUBSCRIBED
111 
112  def advertise(self, *args, **kwargs):
113  # subscriber_listener should be 'self'
114  # to detect connection and disconnection of the publishing topics
115  assert len(args) < 3 or args[2] is None
116  assert kwargs.get('subscriber_listener') is None
117  kwargs['subscriber_listener'] = self
118 
119  pub = rospy.Publisher(*args, **kwargs)
120  self._publishers.append(pub)
121  return pub
def unresolve_name(node_name, name)
Definition: name_utils.py:5


jsk_topic_tools
Author(s): Kei Okada , Yusuke Furuta
autogenerated on Tue Feb 6 2018 03:45:19