example.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Software License Agreement (BSD License)
4 #
5 # Copyright (c) 2012, Willow Garage, Inc.
6 # All rights reserved.
7 #
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
10 # are met:
11 #
12 # * Redistributions of source code must retain the above copyright
13 # notice, this list of conditions and the following disclaimer.
14 # * Redistributions in binary form must reproduce the above
15 # copyright notice, this list of conditions and the following
16 # disclaimer in the documentation and/or other materials provided
17 # with the distribution.
18 # * Neither the name of Willow Garage, Inc. nor the names of its
19 # contributors may be used to endorse or promote products derived
20 # from this software without specific prior written permission.
21 #
22 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
30 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
32 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
33 # POSSIBILITY OF SUCH DAMAGE.
34 
35 # -*- coding: utf-8 -*-
36 
37 """
38 @author Brice Rebsamen <brice [dot] rebsamen [gmail]>
39 """
40 
41 import roslib
42 roslib.load_manifest('diagnostic_updater')
43 import rospy
44 import diagnostic_updater
45 import diagnostic_msgs
46 import std_msgs
47 
48 
49 time_to_launch = 0
50 
51 '''Used as a tutorial for loading and using diagnostic updater.
52 
53 DummyClass and dummy_diagnostics show how to use a diagnostic_updater
54 class.
55 '''
56 
57 def dummy_diagnostic(stat):
58  # stat is supposed to be of type diagnostic_updater.DiagnosticStatusWrapper
59  # DiagnosticStatusWrapper is a derived class of
60  # diagnostic_msgs.msg.DiagnosticStatus that provides a set of convenience
61  # methods.
62 
63  # summary sets the level and message.
64  # As opposed to the C++ API, there is no summaryf function: use python's
65  # string formatting instead
66  if time_to_launch < 10:
67  # summary for formatted text.
68  stat.summary(diagnostic_msgs.msg.DiagnosticStatus.ERROR,
69  "Buckle your seat belt. Launch in %f seconds!" % time_to_launch)
70  else:
71  # summary for unformatted text. It's just the same ;)
72  stat.summary(diagnostic_msgs.msg.DiagnosticStatus.OK,
73  "Launch is in a long time. Have a soda.")
74 
75  # add is used to append key-value pairs.
76  # Again, as oppose to the C++ API, there is no addf function. The second
77  # argument is always converted to a str using the str() function.
78  stat.add("Diagnostic Name", "dummy")
79  # add transparently handles conversion to string (using str()).
80  stat.add("Time to Launch", time_to_launch)
81  # add allows arbitrary printf style formatting.
82  stat.add("Geeky thing to say", "The square of the time to launch %f is %f" % \
83  (time_to_launch, time_to_launch * time_to_launch) )
84 
85  # As opposed to the C++ diagnostic function which modifies its argument,
86  # the python version must return the modified message.
87  return stat
88 
89 
90 class DummyClass:
91  def produce_diagnostics(self, stat):
92  stat.summary(diagnostic_msgs.msg.DiagnosticStatus.WARN, "This is a silly updater.")
93  stat.add("Stupidicity of this updater", 1000.)
94  return stat
95 
96 
98  def __init__(self):
99  diagnostic_updater.DiagnosticTask.__init__(self,
100  "Updater Derived from DiagnosticTask")
101 
102  def run(self, stat):
103  stat.summary(diagnostic_msgs.msg.DiagnosticStatus.WARN,
104  "This is another silly updater.")
105  stat.add("Stupidicity of this updater", 2000.)
106  return stat
107 
108 
110  if time_to_launch > 5:
111  stat.summary(diagnostic_msgs.msg.DiagnosticStatus.OK, "Lower-bound OK")
112  else:
113  stat.summary(diagnostic_msgs.msg.DiagnosticStatus.ERROR, "Too low")
114  stat.add("Low-Side Margin", time_to_launch - 5)
115  return stat
116 
117 
119  if time_to_launch < 10:
120  stat.summary(diagnostic_msgs.msg.DiagnosticStatus.OK, "Upper-bound OK")
121  else:
122  stat.summary(diagnostic_msgs.msg.DiagnosticStatus.WARN, "Too high")
123  stat.add("Top-Side Margin", 10 - time_to_launch)
124  return stat
125 
126 
127 if __name__=='__main__':
128  rospy.init_node("diagnostic_updater_example")
129 
130  # The Updater class advertises to /diagnostics, and has a
131  # ~diagnostic_period parameter that says how often the diagnostics
132  # should be published.
134 
135  # The diagnostic_updater.Updater class will fill out the hardware_id
136  # field of the diagnostic_msgs.msg.DiagnosticStatus message. You need to
137  # use the setHardwareID() method to set the hardware ID.
138  #
139  # The hardware ID should be able to identify the specific device you are
140  # working with. If it is not appropriate to fill out a hardware ID in
141  # your case, you should call setHardwareID("none") to avoid warnings.
142  # (A warning will be generated as soon as your node updates with no
143  # non-OK statuses.)
144  updater.setHardwareID("none")
145  # Or...
146  updater.setHardwareID("Device-%i-%i" % (27, 46) )
147 
148  # Diagnostic tasks are added to the Updater. They will later be run when
149  # the updater decides to update.
150  # As opposed to the C++ API, there is only one add function. It can take
151  # several types of arguments:
152  # - add(task): where task is a DiagnosticTask
153  # - add(name, fn): add a DiagnosticTask embodied by a name and function
154  updater.add("Function updater", dummy_diagnostic)
155  dc = DummyClass()
156  updater.add("Method updater", dc.produce_diagnostics)
157 
158  # Internally, updater.add converts its arguments into a DiagnosticTask.
159  # Sometimes it can be useful to work directly with DiagnosticTasks. Look
160  # at FrequencyStatus and TimestampStatus in update_functions for a
161  # real-life example of how to make a DiagnosticTask by deriving from
162  # DiagnosticTask.
163 
164  # Alternatively, a FunctionDiagnosticTask is a derived class from
165  # DiagnosticTask that can be used to create a DiagnosticTask from
166  # a function. This will be useful when combining multiple diagnostic
167  # tasks using a CompositeDiagnosticTask.
168  lower = diagnostic_updater.FunctionDiagnosticTask("Lower-bound check",
169  check_lower_bound)
170  upper = diagnostic_updater.FunctionDiagnosticTask("Upper-bound check",
171  check_upper_bound)
172 
173  # If you want to merge the outputs of two diagnostic tasks together, you
174  # can create a CompositeDiagnosticTask, also a derived class from
175  # DiagnosticTask. For example, we could combine the upper and lower
176  # bounds check into a single DiagnosticTask.
178  bounds.addTask(lower)
179  bounds.addTask(upper)
180 
181  # We can then add the CompositeDiagnosticTask to our Updater. When it is
182  # run, the overall name will be the name of the composite task, i.e.,
183  # "Bound check". The summary will be a combination of the summary of the
184  # lower and upper tasks (see DiagnosticStatusWrapper.mergeSummary for
185  # details on how the merging is done). The lists of key-value pairs will be
186  # concatenated.
187  updater.add(bounds)
188 
189  # You can broadcast a message in all the DiagnosticStatus if your node
190  # is in a special state.
191  updater.broadcast(0, "Doing important initialization stuff.")
192 
193  pub1 = rospy.Publisher("topic1", std_msgs.msg.Bool, queue_size=10)
194  pub2_temp = rospy.Publisher("topic2", std_msgs.msg.Bool, queue_size=10)
195  rospy.sleep(2) # It isn't important if it doesn't take time.
196 
197  # Some diagnostic tasks are very common, such as checking the rate
198  # at which a topic is publishing, or checking that timestamps are
199  # sufficiently recent. FrequencyStatus and TimestampStatus can do these
200  # checks for you.
201  #
202  # Usually you would instantiate them via a HeaderlessTopicDiagnostic
203  # (FrequencyStatus only, for topics that do not contain a header) or a
204  # TopicDiagnostic (FrequencyStatus and TimestampStatus, for topics that
205  # do contain a header).
206  #
207  # Some values are passed to the constructor as pointers. If these values
208  # are changed, the FrequencyStatus/TimestampStatus will start operating
209  # with the new values.
210  #
211  # Refer to diagnostic_updater.FrequencyStatusParam and
212  # diagnostic_updater.TimestampStatusParam documentation for details on
213  # what the parameters mean:
214  freq_bounds = {'min':0.5, 'max':2} # If you update these values, the
215  # HeaderlessTopicDiagnostic will use the new values.
216  pub1_freq = diagnostic_updater.HeaderlessTopicDiagnostic("topic1", updater,
217  diagnostic_updater.FrequencyStatusParam(freq_bounds, 0.1, 10))
218 
219  # Note that TopicDiagnostic, HeaderlessDiagnosedPublisher,
220  # HeaderlessDiagnosedPublisher and DiagnosedPublisher all descend from
221  # CompositeDiagnosticTask, so you can add your own fields to them using
222  # the addTask method.
223  #
224  # Each time pub1_freq is updated, lower will also get updated and its
225  # output will be merged with the output from pub1_freq.
226  pub1_freq.addTask(lower) # (This wouldn't work if lower was stateful).
227 
228  # If we know that the state of the node just changed, we can force an
229  # immediate update.
230  updater.force_update()
231 
232  # We can remove a task by refering to its name.
233  if not updater.removeByName("Bound check"):
234  rospy.logerr("The Bound check task was not found when trying to remove it.")
235 
236  while not rospy.is_shutdown():
237  msg = std_msgs.msg.Bool()
238  rospy.sleep(0.1)
239 
240  # Calls to pub1 have to be accompanied by calls to pub1_freq to keep
241  # the statistics up to date.
242  msg.data = False
243  pub1.publish(msg)
244  pub1_freq.tick()
245 
246  # We can call updater.update whenever is convenient. It will take care
247  # of rate-limiting the updates.
248  updater.update()
def dummy_diagnostic(stat)
Definition: example.py:57
def run(self, stat)
Definition: example.py:102
A structure that holds the constructor parameters for the FrequencyStatus class.
def check_lower_bound(stat)
Definition: example.py:109
def produce_diagnostics(self, stat)
Definition: example.py:91
def __init__(self)
Definition: example.py:98
Manages a list of diagnostic tasks, and calls them in a rate-limited manner.
A class to facilitate making diagnostics for a topic using a FrequencyStatus.
Definition: publisher.h:55
DiagnosticTask is an abstract base class for collecting diagnostic data.
def check_upper_bound(stat)
Definition: example.py:118
Merges CompositeDiagnosticTask into a single DiagnosticTask.


diagnostic_updater
Author(s): Kevin Watts, Brice Rebsamen , Jeremy Leibs, Blaise Gassend
autogenerated on Thu Oct 8 2020 03:19:34