Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 import sys
00026 import thread
00027
00028 import rospy
00029
00030 class _RatePublisher(object):
00031
00032
00033
00034 _tolerance = 0.01
00035
00036 def __init__(self, topic, msg_type, latch=False):
00037 self._topic = topic
00038 self._publisher = rospy.Publisher(topic, msg_type, latch=True)
00039 self._message = None
00040 self._period = None
00041 self._last_pub = 0
00042
00043 def pub(self, message, rate=None):
00044 self._message = message
00045 self._period = (1. / rate) if rate else None
00046 self.publish_once()
00047
00048 def stop(self):
00049 self._period = None
00050
00051 def publish_once(self):
00052 msg = self._message() if callable(self._message) else self._message
00053 self._publisher.publish(msg)
00054 self._last_pub = rospy.Time.now()
00055
00056 def spin_once(self):
00057 """
00058 Gives _RatePublisher a chance to publish a stored message.
00059
00060 This method returns the remaining time until the next scheduled
00061 publication (or None).
00062 """
00063 if not self._period:
00064 return None
00065 elapsed = (rospy.Time.now() - self._last_pub).to_sec()
00066 if elapsed >= (self._period - self._tolerance):
00067 self.publish_once()
00068 return self._period
00069 return self._period - elapsed
00070
00071
00072 class RatePublishers(object):
00073 """
00074 A class for managing several ROS publishers repeating messages
00075 with different rates.
00076
00077 The main purpose of this class is for unit testing.
00078 """
00079
00080 def __init__(self):
00081 self._publishers = {}
00082
00083 def add_topic(self, topic, msg_type):
00084 """
00085 Adds a topic for future publication.
00086
00087 This creates a rospy.Publisher internally. Note that the
00088 publisher will latch the topic; if that wasn't the case,
00089 clients might need to sleep before publishing something
00090 for the first time to give subscribers enough time to
00091 connect.
00092 """
00093 assert topic not in self._publishers
00094 rate_publisher = _RatePublisher(topic, msg_type, latch=True)
00095 self._publishers[topic] = rate_publisher
00096 return rate_publisher
00097
00098 def pub(self, topic, message, rate=None):
00099 """
00100 Publishes `message' on the given topic.
00101
00102 If `rate' is not None, the message will be repeated at the
00103 given rate (expected to be in Hz) until pub() or stop()
00104 are invoked again.
00105
00106 Note that `rate' may also be a function, in which case
00107 it'll be invoked for each publication to obtain the message.
00108 """
00109 self._publishers[topic].pub(message, rate)
00110
00111 def stop(self, topic):
00112 """
00113 Stops repeating any message on the given topic.
00114 """
00115 self._publishers[topic].stop()
00116
00117 def spin_once(self):
00118 """
00119 Publishes any scheduled messages and returns the amount of
00120 time until it should be called again.
00121 """
00122
00123
00124
00125 next_timeout = sys.maxint
00126 for topic in self._publishers.itervalues():
00127 next_timeout = min(topic.spin_once(), next_timeout)
00128 return next_timeout
00129
00130
00131
00132 class TimeoutManager(object):
00133
00134 def __init__(self):
00135 self._shutdown = False
00136 self._members = []
00137
00138 def add(self, m):
00139 self._members.append(m)
00140
00141 def spin(self):
00142 while not rospy.core.is_shutdown() and not self._shutdown:
00143 try:
00144 for m in self._members:
00145 m.spin_once()
00146 rospy.sleep(0.01)
00147 except Exception, e:
00148 rospy.logfatal(e)
00149
00150 def spin_thread(self):
00151 rospy.loginfo("Spawning thread for TopicTestManager...")
00152 thread.start_new_thread(self.spin, ())
00153
00154 def shutdown(self):
00155 self._shutdown = True