statistics_registry.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # pal_statistics: pal_statistics.py
5 #
6 # Copyright (c) 2018 PAL Robotics SL. All Rights Reserved
7 #
8 # Permission to use, copy, modify, and/or distribute this software for
9 # any purpose with or without fee is hereby granted, provided that the
10 # above copyright notice and this permission notice appear in all
11 # copies.
12 #
13 # THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
14 # REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY
16 # SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
17 # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
18 # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
19 # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 #
21 # Authors:
22 # * Victor Lopez, Jordan Palacios
23 
24 from builtins import object
25 
26 import rospy
27 from pal_statistics_msgs.msg import Statistics, Statistic, StatisticsValues, StatisticsNames
28 
29 
class Registration(object):
    """
    Handle for a function or variable registered in a statistics registry.

    When the handle is destroyed (for instance because it went out of
    scope) it unregisters the associated name from the registry.
    """

    def __init__(self, name, registry):
        """
        @param name: Name under which the function or variable was registered
        @param registry: Object whose unregister(name) is called on destruction
        """
        # Keep both pieces needed to undo the registration later on.
        self.registry = registry
        self.name = name

    def __del__(self):
        """
        Logs the removal at debug level and unregisters the name.
        """
        registered_name = self.name
        rospy.logdebug("Unregistering " + registered_name)
        self.registry.unregister(registered_name)
42 
class StatisticsRegistry(object):
    """
    Keeps a set of named callables and publishes the values they return.

    Three publishers are created from the base topic:
      - <topic>/full:   Statistics message holding name/value pairs
      - <topic>/names:  StatisticsNames message (latched) with the names and
                        a names_version number
      - <topic>/values: StatisticsValues message with the values and the
                        names_version they correspond to
    """

    def __init__(self, topic):
        """
        @param topic: Base topic name, "/full", "/names" and "/values" are
                      appended to it to build the published topics
        """
        self.topic = topic
        # Maps each registered name to the callable that returns its value
        self.functions = {}
        self.full_pub = rospy.Publisher(topic + "/full", Statistics, queue_size=1)
        self.names_pub = rospy.Publisher(topic + "/names", StatisticsNames,
                                         queue_size=1, latch=True)
        self.values_pub = rospy.Publisher(topic + "/values", StatisticsValues,
                                          queue_size=1)
        # True when names were registered/unregistered since the last time
        # the names message was created
        self.names_changed = True
        # createOptimizedMsgs() increments this before using it, so it must
        # exist beforehand.
        # NOTE(review): the original initial value is not visible in this
        # listing; 0 is assumed (first published version is 1) - confirm.
        self.last_names_version = 0

    # Python will copy the value of variables of simple types
    # such as numbers, we cannot store them as such
    # def registerVariable(self, name, variable):
    #     self.variables[name] = variable

    def registerFunction(self, name, func, registration_list=None):
        """
        Registers a function that will be called to read the value to
        be published when the publish() method is called

        @param name: Name under which the value is published
        @param func: Callable taking no arguments
        @param registration_list: If not None, will be extended to include a
        Registration object for the registered function

        The function takes no arguments and returns a value convertible to float
        It can also be used to register a variable using lambda

        registerFunction("my_function", self.my_function)
        registerFunction("my_variable", (lambda: variable))
        """
        self.functions[name] = func
        if registration_list is not None:
            registration_list.append(Registration(name, self))
        self.names_changed = True

    def unregister(self, name):
        """
        Unregisters a function or variable so it's no longer read

        If the name is not registered an error is logged and no exception
        is propagated.
        """
        try:
            self.functions.pop(name)
        except KeyError as e:
            # Python exceptions have no what() method; format the exception
            # itself so the handler logs instead of raising AttributeError
            rospy.logerr("Error unregistering {}: {}".format(name, e))
        self.names_changed = True

    def publish(self):
        """
        Reads the registered functions and publishes their values.

        The full message is only built when the full topic has connections.
        The names/values messages are built when the names changed or when
        the values topic has connections.
        """
        if self.full_pub.get_num_connections() > 0:
            self.full_pub.publish(self.createFullMsg())

        # When name changes, we need to publish to keep the latched topic
        # in the latest version
        if self.names_changed or self.values_pub.get_num_connections() > 0:
            names_msg, values_msg = self.createOptimizedMsgs()
            # names_msg is None when the names did not change
            if names_msg is not None:
                self.names_pub.publish(names_msg)
            if values_msg is not None:
                self.values_pub.publish(values_msg)

    def publishCustomStatistic(self, name, value):
        """
        Publishes a one-time statistic

        @param name: Name of the statistic
        @param value: Value of the statistic
        """
        msg = Statistics()
        msg.header.stamp = rospy.Time.now()
        s = Statistic()
        s.name = name
        s.value = value
        msg.statistics.append(s)
        self.full_pub.publish(msg)

    def publishCustomStatistics(self, msg):
        """
        Publishes a one-time statistics msg

        @param msg: Message published as-is on the full topic
        """
        self.full_pub.publish(msg)

    def createFullMsg(self):
        """
        Create and return a message after reading all registrations

        @return: Statistics message stamped with the current time, holding
                 one Statistic per registered function
        """
        msg = Statistics()
        msg.header.stamp = rospy.Time.now()
        for name, func in self.functions.items():
            s = Statistic()
            s.name = name
            s.value = func()
            msg.statistics.append(s)
        return msg

    def createOptimizedMsgs(self):
        """
        Create and return a names, values message after reading all registrations

        @return: Tuple (names_msg, values_msg). names_msg is None when the
                 names did not change since the previous call
        """
        values_msg = StatisticsValues()
        values_msg.header.stamp = rospy.Time.now()
        if self.names_changed:
            names_msg = StatisticsNames()
            names_msg.header.stamp = values_msg.header.stamp
            # A new version number identifies the new list of names
            self.last_names_version += 1
            names_msg.names_version = self.last_names_version
        else:
            names_msg = None
        values_msg.names_version = self.last_names_version

        for name, func in self.functions.items():
            # Names and values are appended in the same iteration so both
            # lists share the same order
            if names_msg is not None:
                names_msg.names.append(name)
            values_msg.values.append(func())
        # Only cleared once every function has been read successfully
        self.names_changed = False
        return names_msg, values_msg
def registerFunction(self, name, func, registration_list=None)


pal_statistics
Author(s):
autogenerated on Mon Feb 28 2022 23:04:05