rate_publishers.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # twist_mux: rate_publishers.py
5 #
6 # Copyright (c) 2014 PAL Robotics SL.
7 # All Rights Reserved
8 #
9 # Permission to use, copy, modify, and/or distribute this software for
10 # any purpose with or without fee is hereby granted, provided that the
11 # above copyright notice and this permission notice appear in all
12 # copies.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
15 # REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
16 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY
17 # SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
18 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
19 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
20 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
21 #
22 # Authors:
23 # * Siegfried-A. Gevatter
24 
25 import sys
26 try:
27  import _thread as thread # Python3
28 except:
29  import thread # Python2
30 
31 import rospy
32 
33 class _RatePublisher(object):
34 
35  # How many seconds before the expected time a message may
36  # be published.
37  _tolerance = 0.01
38 
39  def __init__(self, topic, msg_type, latch=False):
40  self._topic = topic
41  self._publisher = rospy.Publisher(topic, msg_type, latch=True)
42  self._message = None
43  self._period = None # 1 / freq
44  self._last_pub = 0
45 
46  def pub(self, message, rate=None):
47  self._message = message
48  self._period = (1. / rate) if rate else None
49  self.publish_once()
50 
51  def stop(self):
52  self._period = None
53 
54  def publish_once(self):
55  msg = self._message() if callable(self._message) else self._message
56  self._publisher.publish(msg)
57  self._last_pub = rospy.Time.now()
58 
59  def spin_once(self):
60  """
61  Gives _RatePublisher a chance to publish a stored message.
62 
63  This method returns the remaining time until the next scheduled
64  publication (or -1).
65  """
66  if not self._period:
67  return -1
68  elapsed = (rospy.Time.now() - self._last_pub).to_sec()
69  if elapsed >= (self._period - self._tolerance):
70  self.publish_once()
71  return self._period
72  return self._period - elapsed
73 
74 
75 class RatePublishers(object):
76  """
77  A class for managing several ROS publishers repeating messages
78  with different rates.
79 
80  The main purpose of this class is for unit testing.
81  """
82 
83  def __init__(self):
84  self._publishers = {}
85 
86  def add_topic(self, topic, msg_type):
87  """
88  Adds a topic for future publication.
89 
90  This creates a rospy.Publisher internally. Note that the
91  publisher will latch the topic; if that wasn't the case,
92  clients might need to sleep before publishing something
93  for the first time to give subscribers enough time to
94  connect.
95  """
96  assert topic not in self._publishers
97  rate_publisher = _RatePublisher(topic, msg_type, latch=True)
98  self._publishers[topic] = rate_publisher
99  return rate_publisher
100 
101  def pub(self, topic, message, rate=None):
102  """
103  Publishes `message' on the given topic.
104 
105  If `rate' is not None, the message will be repeated at the
106  given rate (expected to be in Hz) until pub() or stop()
107  are invoked again.
108 
109  Note that `rate' may also be a function, in which case
110  it'll be invoked for each publication to obtain the message.
111  """
112  self._publishers[topic].pub(message, rate)
113 
114  def stop(self, topic):
115  """
116  Stops repeating any message on the given topic.
117  """
118  self._publishers[topic].stop()
119 
120  def spin_once(self):
121  """
122  Publishes any scheduled messages and returns the amount of
123  time until it should be called again.
124  """
125  # TODO: Create a class that spawns a global thread and provides
126  # createTimer and createWallTimer, just like NodeHandle
127  # does in rospy?
128  try:
129  next_timeout = sys.maxsize # Python3
130  except AttributeError:
131  next_timeout = sys.maxint # Python2
132  for topic in self._publishers.values():
133  next_timeout = min(topic.spin_once(), next_timeout)
134  return next_timeout
135 
136 
137 # TODO: Make this class more generic (keeping track of timeouts itself)?
138 class TimeoutManager(object):
139 
140  def __init__(self):
141  self._shutdown = False
142  self._members = []
143 
144  def add(self, m):
145  self._members.append(m)
146 
147  def spin(self):
148  while not rospy.core.is_shutdown() and not self._shutdown:
149  try:
150  for m in self._members:
151  m.spin_once()
152  rospy.sleep(0.01) # FIXME
153  except Exception as e:
154  rospy.logfatal(e)
155 
156  def spin_thread(self):
157  rospy.loginfo("Spawning thread for TopicTestManager...")
158  thread.start_new_thread(self.spin, ())
159 
160  def shutdown(self):
161  self._shutdown = True
def __init__(self, topic, msg_type, latch=False)
def pub(self, message, rate=None)
def add_topic(self, topic, msg_type)
def pub(self, topic, message, rate=None)


twist_mux
Author(s): Enrique Fernandez , Siegfried-A. Gevatter Pujals
autogenerated on Wed Oct 26 2022 02:14:17