35 """ diagnostic_updater for Python. 36 @author Brice Rebsamen <brice [dot] rebsamen [gmail]> 41 from diagnostic_msgs.msg
import DiagnosticArray
46 import http.client
as httplib
48 from ._diagnostic_status_wrapper
import *
51 """DiagnosticTask is an abstract base class for collecting diagnostic data. 53 Subclasses are provided for generating common diagnostic information. 54 A DiagnosticTask has a name, and a function that is called to cleate a 55 DiagnosticStatusWrapper. 59 """Constructs a DiagnosticTask setting its name in the process.""" 63 """Returns the name of the DiagnosticTask.""" 67 """Fills out this Task's DiagnosticStatusWrapper. 68 @param stat: the DiagnosticStatusWrapper to fill 69 @return the filled DiagnosticStatusWrapper 76 """A DiagnosticTask based on a function. 78 The FunctionDiagnosticTask calls the function when it updates. The 79 function updates the DiagnosticStatusWrapper and collects data. 81 This is useful for gathering information about a device or driver, like temperature, 86 """Constructs a GenericFunctionDiagnosticTask based on the given name and function. 87 @param name Name of the function. 88 @param fn Function to be called when run is called. 90 DiagnosticTask.__init__(self, name)
99 """Merges CompositeDiagnosticTask into a single DiagnosticTask. 101 The CompositeDiagnosticTask allows multiple DiagnosticTask instances to 102 be combined into a single task that produces a single single 103 DiagnosticStatusWrapped. The output of the combination has the max of 104 the status levels, and a concatenation of the non-zero-level messages. 106 For instance, this could be used to combine the calibration and offset data 111 """Constructs a CompositeDiagnosticTask with the given name.""" 112 DiagnosticTask.__init__(self, name)
116 """Runs each child and merges their outputs.""" 120 original_summary.summary(stat)
122 for task
in self.
tasks:
124 stat.summary(original_summary)
126 stat = task.run(stat)
128 combined_summary.mergeSummary(stat)
132 stat.summary(combined_summary)
136 """Adds a child CompositeDiagnosticTask. 138 This CompositeDiagnosticTask will be called each time this 139 CompositeDiagnosticTask is run. 146 """Internal use only. 148 Base class for diagnostic_updater::Updater and self_test::Dispatcher. 149 The class manages a collection of diagnostic updaters. It contains the 150 common functionality used for producing diagnostic updates and for 155 """Class used to represent a diagnostic task internally in 156 DiagnosticTaskVector. 164 stat.name = self.
name 173 """Allows an action to be taken when a task is added. The Updater class 174 uses this to immediately publish a diagnostic that says that the node 180 """Add a task to the DiagnosticTaskVector. 183 add(task): where task is a DiagnosticTask 184 add(name, fn): add a DiagnosticTask embodied by a name and function 187 task = DiagnosticTaskVector.DiagnosticTaskInternal(args[0].getName(), args[0].run)
189 task = DiagnosticTaskVector.DiagnosticTaskInternal(args[0], args[1])
192 self.tasks.append(task)
193 self.addedTaskCallback(task)
196 """Removes a task based on its name. 198 Removes the first task that matches the specified name. (New in 201 @param name Name of the task to remove. 202 @return Returns true if a task matched and was removed. 206 for i
in range(len(self.
tasks)):
207 if self.
tasks[i].name == name:
217 """Manages a list of diagnostic tasks, and calls them in a rate-limited manner. 219 This class manages a list of diagnostic tasks. Its update function 220 should be called frequently. At some predetermined rate, the update 221 function will cause all the diagnostic tasks to run, and will collate 222 and publish the resulting diagnostics. The publication rate is 223 determined by the "~diagnostic_period" ros parameter. 225 The class also allows an update to be forced when something significant 226 has happened, and allows a single message to be broadcast on all the 227 diagnostics if normal operation of the node is suspended for some 232 """Constructs an updater class.""" 233 DiagnosticTaskVector.__init__(self)
234 self.
publisher = rospy.Publisher(
"/diagnostics", DiagnosticArray, queue_size=10)
245 """Causes the diagnostics to update if the inter-update interval 253 """Forces the diagnostics to update. 255 Useful if the node has undergone a drastic state change that should be 256 published immediately. 260 warn_nohwid = len(self.
hwid)==0
265 for task
in self.
tasks:
267 status.name = task.name
269 status.message =
"No message was set" 270 status.hardware_id = self.
hwid 272 stat = task.run(status)
274 status_vec.append(status)
279 if self.
verbose and status.level:
280 rospy.logwarn(
"Non-zero diagnostic status. Name: '%s', status %i: '%s'" %
281 (status.name, status.level, status.message))
284 rospy.logwarn(
"diagnostic_updater: No HW_ID was set. This is probably a bug. Please report it. For devices that do not have a HW_ID, set this value to 'none'. This warning only occurs once all diagnostics are OK so it is okay to wait until the device is open before calling setHardwareID.");
290 """Outputs a message on all the known DiagnosticStatus. 292 Useful if something drastic is happening such as shutdown or a self-test. 294 @param lvl Level of the diagnostic being output. 295 @param msg Status message to output. 300 for task
in self.
tasks:
302 status.name = task.name
303 status.summary(lvl, msg)
304 status_vec.append(status)
312 """Recheck the diagnostic_period on the parameter server.""" 320 now = rospy.Time.now()
323 self.
period = rospy.get_param_cached(
"~diagnostic_period", 1)
325 except (httplib.CannotSendRequest, httplib.ResponseNotReady):
329 """Publishes a single diagnostic status or a vector of diagnostic statuses.""" 330 if not type(msg)
is list:
334 stat.name = rospy.get_name()[1:]+
": " + stat.name
336 da = DiagnosticArray()
338 da.header.stamp = rospy.Time.now()
339 self.publisher.publish(da)
343 stat.name = task.name
344 stat.summary(0,
"Node starting up")
def addedTaskCallback(self, task)
def removeByName(self, name)
def __init__(self, name, fn)
def broadcast(self, lvl, msg)
def setHardwareID(self, hwid)
def addedTaskCallback(self, task)
def _check_diagnostic_period(self)
def __init__(self, name, fn)