24 from builtins
import object
27 from pal_statistics_msgs.msg
import Statistics, Statistic, StatisticsValues, StatisticsNames
32 A utility class acting as a handle to a registered function or variable; when it goes 33 out of scope, it unregisters the variable. 40 rospy.logdebug(
"Unregistering " + self.
name)
48 self.
full_pub = rospy.Publisher(topic +
"/full", Statistics, queue_size=1)
49 self.
names_pub = rospy.Publisher(topic +
"/names", StatisticsNames, queue_size=1, latch=
True)
50 self.
values_pub = rospy.Publisher(topic +
"/values", StatisticsValues, queue_size=1)
61 Registers a function that will be called to read the value to 62 be published when the publish() method is called 64 @param registration_list: If not None, will be extended to include a 65 Registration object for the registered function 67 The function takes no arguments and returns a value convertible to float 68 It can also be used to register a variable using lambda 70 registerFunction("my_function", self.my_function) 71 registerFunction("my_variable", (lambda: variable)) 74 if registration_list
is not None:
80 Unregisters a function or variable so it's no longer read 85 rospy.logerr(
"Error unregistering " + name + e.what())
89 if self.
full_pub.get_num_connections() > 0:
103 Publishes a one-time statistic 106 msg.header.stamp = rospy.Time.now()
110 msg.statistics.append(s)
115 Publishes a one-time statistics msg 121 Create and return a message after reading all registrations 124 msg.header.stamp = rospy.Time.now()
125 for name, func
in self.
functions.items():
129 msg.statistics.append(s)
134 Create and return a pair of messages (names, values) after reading all registrations 136 values_msg = StatisticsValues()
137 values_msg.header.stamp = rospy.Time.now()
139 names_msg = StatisticsNames()
140 names_msg.header.stamp = values_msg.header.stamp
147 for name, func
in self.
functions.items():
149 names_msg.names.append(name)
150 values_msg.values.append(func())
152 return names_msg, values_msg
def __init__(self, name, registry)
def registerFunction(self, name, func, registration_list=None)
def publishCustomStatistics(self, msg)
def __init__(self, topic)
def publishCustomStatistic(self, name, value)
def unregister(self, name)
def createOptimizedMsgs(self)