27 import _thread
as thread
39 def __init__(self, topic, msg_type, latch=False):
41 self.
_publisher = rospy.Publisher(topic, msg_type, latch=
True)
46 def pub(self, message, rate=None):
48 self.
_period = (1. / rate)
if rate
else None
61 Gives _RatePublisher a chance to publish a stored message.
63 This method returns the remaining time until the next scheduled
68 elapsed = (rospy.Time.now() - self.
_last_pub).to_sec()
77 A class for managing several ROS publishers repeating messages
80 The main purpose of this class is for unit testing.
88 Adds a topic for future publication.
90 This creates a rospy.Publisher internally. Note that the
91 publisher will latch the topic; if that wasn't the case,
92 clients might need to sleep before publishing something
93 for the first time to give subscribers enough time to
101 def pub(self, topic, message, rate=None):
103 Publishes `message' on the given topic.
105 If `rate' is not None, the message will be repeated at the
106 given rate (expected to be in Hz) until pub() or stop()
109 Note that `message' may also be a function, in which case
110 it'll be invoked for each publication to obtain the message.
116 Stops repeating any message on the given topic.
122 Publishes any scheduled messages and returns the amount of
123 time until it should be called again.
129 next_timeout = sys.maxsize
130 except AttributeError:
131 next_timeout = sys.maxint
133 next_timeout = min(topic.spin_once(), next_timeout)
148 while not rospy.core.is_shutdown()
and not self.
_shutdown:
153 except Exception
as e:
157 rospy.loginfo(
"Spawning thread for TopicTestManager...")
158 thread.start_new_thread(self.
spin, ())