27 import _thread
as thread
39 def __init__(self, topic, msg_type, latch=False):
41 self.
_publisher = rospy.Publisher(topic, msg_type, latch=
True)
46 def pub(self, message, rate=None):
48 self.
_period = (1. / rate)
if rate
else None 61 Gives _RatePublisher a chance to publish a stored message. 63 This method returns the remaining time until the next scheduled 68 elapsed = (rospy.Time.now() - self.
_last_pub).to_sec()
77 A class for managing several ROS publishers repeating messages 80 The main purpose of this class is for unit testing. 88 Adds a topic for future publication. 90 This creates a rospy.Publisher internally. Note that the 91 publisher will latch the topic; if that wasn't the case, 92 clients might need to sleep before publishing something 93 for the first time to give subscribers enough time to 101 def pub(self, topic, message, rate=None):
103 Publishes `message' on the given topic. 105 If `rate' is not None, the message will be repeated at the 106 given rate (expected to be in Hz) until pub() or stop() 109 Note that `rate' may also be a function, in which case 110 it'll be invoked for each publication to obtain the message. 116 Stops repeating any message on the given topic. 122 Publishes any scheduled messages and returns the amount of 123 time until it should be called again. 129 next_timeout = sys.maxsize
130 except AttributeError:
131 next_timeout = sys.maxint
133 next_timeout = min(topic.spin_once(), next_timeout)
148 while not rospy.core.is_shutdown()
and not self.
_shutdown:
153 except Exception
as e:
157 rospy.loginfo(
"Spawning thread for TopicTestManager...")
158 thread.start_new_thread(self.
spin, ())
def __init__(self, topic, msg_type, latch=False)
def pub(self, message, rate=None)
def add_topic(self, topic, msg_type)
def pub(self, topic, message, rate=None)