aggregator.cpp
Go to the documentation of this file.
1 /*********************************************************************
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2009, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of the Willow Garage nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *********************************************************************/
34 
38 
39 using namespace std;
40 using namespace diagnostic_aggregator;
41 
42 Aggregator::Aggregator() :
43  pub_rate_(1.0),
44  analyzer_group_(NULL),
45  other_analyzer_(NULL),
46  base_path_("")
47 {
49  nh.param(string("base_path"), base_path_, string(""));
50  if (base_path_.size() > 0 && base_path_.find("/") != 0)
51  base_path_ = "/" + base_path_;
52 
53  nh.param("pub_rate", pub_rate_, pub_rate_);
54 
55  bool other_as_errors = false;
56  nh.param("other_as_errors", other_as_errors, false);
57 
59 
60  if (!analyzer_group_->init(base_path_, nh))
61  {
62  ROS_ERROR("Analyzer group for diagnostic aggregator failed to initialize!");
63  }
64 
65  // Last analyzer handles remaining data
66  other_analyzer_ = new OtherAnalyzer(other_as_errors);
67  other_analyzer_->init(base_path_); // This always returns true
68  add_srv_ = n_.advertiseService("/diagnostics_agg/add_diagnostics", &Aggregator::addDiagnostics, this);
69  diag_sub_ = n_.subscribe("/diagnostics", 1000, &Aggregator::diagCallback, this);
70  agg_pub_ = n_.advertise<diagnostic_msgs::DiagnosticArray>("/diagnostics_agg", 1);
71  toplevel_state_pub_ = n_.advertise<diagnostic_msgs::DiagnosticStatus>("/diagnostics_toplevel_state", 1);
72 }
73 
74 void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
75 {
76  if (diag_msg->header.stamp.toSec() != 0)
77  return;
78 
79  string stamp_warn = "No timestamp set for diagnostic message. Message names: ";
80  vector<diagnostic_msgs::DiagnosticStatus>::const_iterator it;
81  for (it = diag_msg->status.begin(); it != diag_msg->status.end(); ++it)
82  {
83  if (it != diag_msg->status.begin())
84  stamp_warn += ", ";
85  stamp_warn += it->name;
86  }
87 
88  if (!ros_warnings_.count(stamp_warn))
89  {
90  ROS_WARN("%s", stamp_warn.c_str());
91  ros_warnings_.insert(stamp_warn);
92  }
93 }
94 
95 void Aggregator::diagCallback(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
96 {
97  checkTimestamp(diag_msg);
98 
99  bool analyzed = false;
100  { // lock the whole loop to ensure nothing in the analyzer group changes
101  // during it.
102  boost::mutex::scoped_lock lock(mutex_);
103  for (unsigned int j = 0; j < diag_msg->status.size(); ++j)
104  {
105  analyzed = false;
106  boost::shared_ptr<StatusItem> item(new StatusItem(&diag_msg->status[j]));
107 
108  if (analyzer_group_->match(item->getName()))
109  analyzed = analyzer_group_->analyze(item);
110 
111  if (!analyzed)
112  other_analyzer_->analyze(item);
113  }
114  }
115 }
116 
118 {
119  if (analyzer_group_) delete analyzer_group_;
120 
121  if (other_analyzer_) delete other_analyzer_;
122 }
123 
124 
126 {
127  boost::mutex::scoped_lock lock(mutex_); // Possibility of multiple bonds breaking at once
128  ROS_WARN("Bond for namespace %s was broken", bond_id.c_str());
129  std::vector<boost::shared_ptr<bond::Bond> >::iterator elem;
130  elem = std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(bond_id));
131  if (elem == bonds_.end()){
132  ROS_WARN("Broken bond tried to erase a bond which didn't exist.");
133  } else {
134  bonds_.erase(elem);
135  }
136  if (!analyzer_group_->removeAnalyzer(analyzer))
137  {
138  ROS_WARN("Broken bond tried to remove an analyzer which didn't exist.");
139  }
140 
142 }
143 
145  ROS_DEBUG("Bond formed");
146  boost::mutex::scoped_lock lock(mutex_);
149 }
150 
151 bool Aggregator::addDiagnostics(diagnostic_msgs::AddDiagnostics::Request &req,
152  diagnostic_msgs::AddDiagnostics::Response &res)
153 {
154  ROS_DEBUG("Got load request for namespace %s", req.load_namespace.c_str());
155  // Don't currently support relative or private namespace definitions
156  if (req.load_namespace[0] != '/')
157  {
158  res.message = "Requested load from non-global namespace. Private and relative namespaces are not supported.";
159  res.success = false;
160  return true;
161  }
162 
163  boost::shared_ptr<Analyzer> group = boost::make_shared<AnalyzerGroup>();
164  { // lock here ensures that bonds from the same namespace aren't added twice.
165  // Without it, possibility of two simultaneous calls adding two objects.
166  boost::mutex::scoped_lock lock(mutex_);
167  // rebuff attempts to add things from the same namespace twice
168  if (std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(req.load_namespace)) != bonds_.end())
169  {
170  res.message = "Requested load from namespace " + req.load_namespace + " which is already in use";
171  res.success = false;
172  return true;
173  }
174 
175  // Use a different topic for each bond to help control the message queue
176  // length. Bond has a fixed size subscriber queue, so we can easily miss
177  // bond heartbeats if there are too many bonds on the same topic.
178  boost::shared_ptr<bond::Bond> req_bond = boost::make_shared<bond::Bond>(
179  "/diagnostics_agg/bond" + req.load_namespace, req.load_namespace,
180  boost::function<void(void)>(boost::bind(&Aggregator::bondBroken, this, req.load_namespace, group)),
181  boost::function<void(void)>(boost::bind(&Aggregator::bondFormed, this, group))
182  );
183  req_bond->start();
184 
185  bonds_.push_back(req_bond); // bond formed, keep track of it
186  }
187 
188  if (group->init(base_path_, ros::NodeHandle(req.load_namespace)))
189  {
190  res.message = "Successfully initialised AnalyzerGroup. Waiting for bond to form.";
191  res.success = true;
192  return true;
193  }
194  else
195  {
196  res.message = "Failed to initialise AnalyzerGroup.";
197  res.success = false;
198  return true;
199  }
200 }
201 
203 {
204  diagnostic_msgs::DiagnosticArray diag_array;
205 
206  diagnostic_msgs::DiagnosticStatus diag_toplevel_state;
207  diag_toplevel_state.name = "toplevel_state";
208  diag_toplevel_state.level = -1;
209  int min_level = 255;
210 
211  vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed;
212  {
213  boost::mutex::scoped_lock lock(mutex_);
214  processed = analyzer_group_->report();
215  }
216  for (unsigned int i = 0; i < processed.size(); ++i)
217  {
218  diag_array.status.push_back(*processed[i]);
219 
220  if (processed[i]->level > diag_toplevel_state.level)
221  diag_toplevel_state.level = processed[i]->level;
222  if (processed[i]->level < min_level)
223  min_level = processed[i]->level;
224  }
225 
226  vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed_other = other_analyzer_->report();
227  for (unsigned int i = 0; i < processed_other.size(); ++i)
228  {
229  diag_array.status.push_back(*processed_other[i]);
230 
231  if (processed_other[i]->level > diag_toplevel_state.level)
232  diag_toplevel_state.level = processed_other[i]->level;
233  if (processed_other[i]->level < min_level)
234  min_level = processed_other[i]->level;
235  }
236 
237  diag_array.header.stamp = ros::Time::now();
238 
239  agg_pub_.publish(diag_array);
240 
241  // Top level is error if we have stale items, unless all stale
242  if (diag_toplevel_state.level > 2 && min_level <= 2)
243  diag_toplevel_state.level = 2;
244 
245  toplevel_state_pub_.publish(diag_toplevel_state);
246 }
virtual std::vector< boost::shared_ptr< diagnostic_msgs::DiagnosticStatus > > report()
The processed output is the combined output of the sub-analyzers, and the top level status...
void diagCallback(const diagnostic_msgs::DiagnosticArray::ConstPtr &diag_msg)
Callback for incoming "/diagnostics".
Definition: aggregator.cpp:95
virtual bool addAnalyzer(boost::shared_ptr< Analyzer > &analyzer)
Add an analyzer to this analyzerGroup.
void publish(const boost::shared_ptr< M > &message) const
virtual bool analyze(const boost::shared_ptr< StatusItem > item)
Update state with new StatusItem.
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
std::string base_path_
Prepended to all status names of aggregator.
Definition: aggregator.h:180
ServiceServer advertiseService(const std::string &service, bool(T::*srv_func)(MReq &, MRes &), T *obj)
#define ROS_WARN(...)
void bondFormed(boost::shared_ptr< Analyzer > group)
Definition: aggregator.cpp:144
std::vector< boost::shared_ptr< diagnostic_msgs::DiagnosticStatus > > report()
Reports current state, returns vector of formatted status messages.
void checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr &diag_msg)
Definition: aggregator.cpp:74
void bondBroken(std::string bond_id, boost::shared_ptr< Analyzer > analyzer)
Definition: aggregator.cpp:125
std::set< std::string > ros_warnings_
Records all ROS warnings. No warnings are repeated.
Definition: aggregator.h:182
bool param(const std::string &param_name, T &param_val, const T &default_val) const
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
bool addDiagnostics(diagnostic_msgs::AddDiagnostics::Request &req, diagnostic_msgs::AddDiagnostics::Response &res)
Service request callback for addition of diagnostics. Creates a bond between the calling node and the...
Definition: aggregator.cpp:151
virtual bool analyze(const boost::shared_ptr< StatusItem > item)
Analyze returns true if any sub-analyzers will analyze an item.
std::vector< boost::shared_ptr< bond::Bond > > bonds_
Contains all bonds for additional diagnostics.
Definition: aggregator.h:157
Allows analyzers to be grouped together, or used as sub-analyzers.
virtual bool init(const std::string base_path, const ros::NodeHandle &n)
Initialized with base path and namespace.
void resetMatches()
Clear match arrays. Used when analyzers are added or removed.
void publishData()
Processes, publishes data. Should be called at pub_rate.
Definition: aggregator.cpp:202
static Time now()
virtual bool removeAnalyzer(boost::shared_ptr< Analyzer > &analyzer)
Remove an analyzer from this analyzerGroup.
virtual bool match(const std::string name)
Match returns true if any sub-analyzers match an item.
Helper class to hold, store DiagnosticStatus messages.
Definition: status_item.h:158
#define ROS_ERROR(...)
#define ROS_DEBUG(...)


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Thu Oct 8 2020 03:17:16