proxy_publisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 import rospy
3 from threading import Timer
4 
5 from flexbe_core.logger import Logger
6 
7 
class ProxyPublisher(object):
    """
    A proxy for publishing topics.

    Publishers are kept in the class-level C{_topics} dictionary, so all
    instances of this proxy share the same set of publishers.
    """
    # Maps topic name -> publisher object; shared by every instance.
    _topics = {}

    def __init__(self, topics=None, _latch=False, _queue_size=100):
        """
        Initializes the proxy with optionally a given set of topics.

        @type topics: dictionary string - message class
        @param topics: A dictionary containing a collection of topic - message type pairs.
            May be omitted (or None) to create the proxy without adding publishers.

        @type _latch: bool
        @param _latch: Defines if messages on the given topics should be latched.

        @type _queue_size: int
        @param _queue_size: Defines the queue size of the new publishers.
        """
        # None is used as the default instead of a shared mutable {} literal.
        if topics is None:
            topics = {}
        for topic, msg_type in topics.items():
            self.createPublisher(topic, msg_type, _latch, _queue_size)

    def createPublisher(self, topic, msg_type, _latch=False, _queue_size=100):
        """
        Adds a new publisher to the proxy.

        If a publisher for the topic is already registered, it is kept and
        the arguments of this call are ignored.

        @type topic: string
        @param topic: The topic to publish on.

        @type msg_type: a message class
        @param msg_type: The type of messages of this topic.

        @type _latch: bool
        @param _latch: Defines if messages on the given topics should be latched.

        @type _queue_size: int
        @param _queue_size: Defines the queue size of the publisher.
        """
        if topic not in ProxyPublisher._topics:
            ProxyPublisher._topics[topic] = rospy.Publisher(
                topic, msg_type, latch=_latch, queue_size=_queue_size)

    def is_available(self, topic):
        """
        Checks if the publisher on the given topic is available.

        @type topic: string
        @param topic: The topic of interest.

        @rtype: bool
        @return: True if a publisher has been registered for the topic.
        """
        return topic in ProxyPublisher._topics

    def publish(self, topic, msg):
        """
        Publishes a message on the specified topic.

        Only a warning is logged (nothing is raised) if the topic is not
        registered or if publishing fails.

        @type topic: string
        @param topic: The topic to publish on.

        @type msg: message class (defined when created publisher)
        @param msg: The message to publish.
        """
        if topic not in ProxyPublisher._topics:
            Logger.logwarn('ProxyPublisher: topic %s not yet registered!' % topic)
            return
        try:
            ProxyPublisher._topics[topic].publish(msg)
        except Exception as e:
            # Best effort: a failed publish is reported but must not propagate.
            Logger.logwarn('Something went wrong when publishing to %s!\n%s' % (topic, str(e)))

    def wait_for_any(self, topic, timeout=5.0):
        """
        Blocks until there are any subscribers to the given topic.

        A warning is logged if no subscriber showed up within the first
        half second of waiting.

        @type topic: string
        @param topic: The topic to publish on.

        @type timeout: float
        @param timeout: How many seconds should be the maximum blocked time.

        @rtype: bool
        @return: True if a subscriber is connected, False if the topic is not
            registered or no subscriber was found in time.
        """
        pub = ProxyPublisher._topics.get(topic)
        if pub is None:
            Logger.logerr("Publisher %s not yet registered, need to add it first!" % topic)
            return False

        warn_timer = Timer(.5, self._print_wait_warning, [topic])
        warn_timer.start()
        try:
            available = self._wait_for_subscribers(pub, timeout)
        finally:
            # Timer.cancel() never raises, so it cannot tell us whether the
            # warning was already printed. A timer thread that is no longer
            # alive has run its callback, i.e. the warning has been sent.
            warning_sent = not warn_timer.is_alive()
            # Cancel in any case so no stale warning fires after an error.
            warn_timer.cancel()

        if not available:
            Logger.logerr("Waiting for subscribers on %s timed out!" % topic)
            return False
        if warning_sent:
            Logger.loginfo("Finally found subscriber on %s..." % (topic))
        return True

    def _print_wait_warning(self, topic):
        """
        Logs the warning that subscribers on the topic are still awaited.

        @type topic: string
        @param topic: The topic being waited on.
        """
        Logger.logwarn("Waiting for subscribers on %s..." % (topic))

    def _wait_for_subscribers(self, pub, timeout=5.0):
        """
        Polls the publisher until it reports at least one connection.

        @type pub: publisher object
        @param pub: The publisher whose connections are checked.

        @type timeout: float
        @param timeout: Maximum waiting time in seconds, measured in ROS time.

        @rtype: bool
        @return: True once a connection exists, False if the timeout elapsed
            or rospy was shut down first.
        """
        starting_time = rospy.get_rostime()
        rate = rospy.Rate(100)  # poll at 100 Hz
        while not rospy.is_shutdown():
            elapsed = rospy.get_rostime() - starting_time
            # The timeout is checked first, so timeout <= 0 returns False at once.
            if elapsed.to_sec() >= timeout:
                return False
            if pub.get_num_connections() >= 1:
                return True
            rate.sleep()
        return False
def __init__(self, topics={}, _latch=False, _queue_size=100)
def createPublisher(self, topic, msg_type, _latch=False, _queue_size=100)


flexbe_core
Author(s): Philipp Schillinger
autogenerated on Sun Dec 13 2020 04:01:39