aggregator.cpp
Go to the documentation of this file.
1 /*********************************************************************
2  * Software License Agreement (BSD License)
3  *
4  * Copyright (c) 2009, Willow Garage, Inc.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above
14  * copyright notice, this list of conditions and the following
15  * disclaimer in the documentation and/or other materials provided
16  * with the distribution.
17  * * Neither the name of the Willow Garage nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25  * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32  * POSSIBILITY OF SUCH DAMAGE.
33  *********************************************************************/
34 
38 
39 using namespace std;
40 using namespace diagnostic_aggregator;
41 
42 Aggregator::Aggregator() :
43  pub_rate_(1.0),
44  analyzer_group_(NULL),
45  other_analyzer_(NULL),
46  base_path_("")
47 {
49  nh.param(string("base_path"), base_path_, string(""));
50  if (base_path_.size() > 0 && base_path_.find("/") != 0)
51  base_path_ = "/" + base_path_;
52 
53  nh.param("pub_rate", pub_rate_, pub_rate_);
54 
55  bool other_as_errors = false;
56  nh.param("other_as_errors", other_as_errors, false);
57 
59 
60  if (!analyzer_group_->init(base_path_, nh))
61  {
62  ROS_ERROR("Analyzer group for diagnostic aggregator failed to initialize!");
63  }
64 
65  // Last analyzer handles remaining data
66  other_analyzer_ = new OtherAnalyzer(other_as_errors);
67  other_analyzer_->init(base_path_); // This always returns true
68  add_srv_ = n_.advertiseService("/diagnostics_agg/add_diagnostics", &Aggregator::addDiagnostics, this);
69  diag_sub_ = n_.subscribe("/diagnostics", 1000, &Aggregator::diagCallback, this);
70  agg_pub_ = n_.advertise<diagnostic_msgs::DiagnosticArray>("/diagnostics_agg", 1);
71  toplevel_state_pub_ = n_.advertise<diagnostic_msgs::DiagnosticStatus>("/diagnostics_toplevel_state", 1);
72 }
73 
74 void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
75 {
76  if (diag_msg->header.stamp.toSec() != 0)
77  return;
78 
79  string stamp_warn = "No timestamp set for diagnostic message. Message names: ";
80  vector<diagnostic_msgs::DiagnosticStatus>::const_iterator it;
81  for (it = diag_msg->status.begin(); it != diag_msg->status.end(); ++it)
82  {
83  if (it != diag_msg->status.begin())
84  stamp_warn += ", ";
85  stamp_warn += it->name;
86  }
87 
88  auto result = ros_warnings_.insert(stamp_warn);
89  ROS_WARN_COND(result.second, "%s", stamp_warn.c_str());
90 }
91 
92 void Aggregator::diagCallback(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
93 {
94  checkTimestamp(diag_msg);
95 
96  bool analyzed = false;
97  { // lock the whole loop to ensure nothing in the analyzer group changes
98  // during it.
99  boost::mutex::scoped_lock lock(mutex_);
100  for (unsigned int j = 0; j < diag_msg->status.size(); ++j)
101  {
102  analyzed = false;
103  boost::shared_ptr<StatusItem> item(new StatusItem(&diag_msg->status[j]));
104 
105  if (analyzer_group_->match(item->getName()))
106  analyzed = analyzer_group_->analyze(item);
107 
108  if (!analyzed)
109  other_analyzer_->analyze(item);
110  }
111  }
112 }
113 
115 {
116  if (analyzer_group_) delete analyzer_group_;
117 
118  if (other_analyzer_) delete other_analyzer_;
119 }
120 
121 
123 {
124  boost::mutex::scoped_lock lock(mutex_); // Possibility of multiple bonds breaking at once
125  ROS_WARN("Bond for namespace %s was broken", bond_id.c_str());
126  std::vector<boost::shared_ptr<bond::Bond> >::iterator elem;
127  elem = std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(bond_id));
128  if (elem == bonds_.end()){
129  ROS_WARN("Broken bond tried to erase a bond which didn't exist.");
130  } else {
131  bonds_.erase(elem);
132  }
133  if (!analyzer_group_->removeAnalyzer(analyzer))
134  {
135  ROS_WARN("Broken bond tried to remove an analyzer which didn't exist.");
136  }
137 
139 }
140 
142  ROS_DEBUG("Bond formed");
143  boost::mutex::scoped_lock lock(mutex_);
146 }
147 
148 bool Aggregator::addDiagnostics(diagnostic_msgs::AddDiagnostics::Request &req,
149  diagnostic_msgs::AddDiagnostics::Response &res)
150 {
151  ROS_DEBUG("Got load request for namespace %s", req.load_namespace.c_str());
152  // Don't currently support relative or private namespace definitions
153  if (req.load_namespace[0] != '/')
154  {
155  res.message = "Requested load from non-global namespace. Private and relative namespaces are not supported.";
156  res.success = false;
157  return true;
158  }
159 
160  boost::shared_ptr<Analyzer> group = boost::make_shared<AnalyzerGroup>();
161  { // lock here ensures that bonds from the same namespace aren't added twice.
162  // Without it, possibility of two simultaneous calls adding two objects.
163  boost::mutex::scoped_lock lock(mutex_);
164  // rebuff attempts to add things from the same namespace twice
165  if (std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(req.load_namespace)) != bonds_.end())
166  {
167  res.message = "Requested load from namespace " + req.load_namespace + " which is already in use";
168  res.success = false;
169  return true;
170  }
171 
172  // Use a different topic for each bond to help control the message queue
173  // length. Bond has a fixed size subscriber queue, so we can easily miss
174  // bond heartbeats if there are too many bonds on the same topic.
175  boost::shared_ptr<bond::Bond> req_bond = boost::make_shared<bond::Bond>(
176  "/diagnostics_agg/bond" + req.load_namespace, req.load_namespace,
177  boost::function<void(void)>(boost::bind(&Aggregator::bondBroken, this, req.load_namespace, group)),
178  boost::function<void(void)>(boost::bind(&Aggregator::bondFormed, this, group))
179  );
180  req_bond->start();
181 
182  bonds_.push_back(req_bond); // bond formed, keep track of it
183  }
184 
185  if (group->init(base_path_, ros::NodeHandle(req.load_namespace)))
186  {
187  res.message = "Successfully initialised AnalyzerGroup. Waiting for bond to form.";
188  res.success = true;
189  return true;
190  }
191  else
192  {
193  res.message = "Failed to initialise AnalyzerGroup.";
194  res.success = false;
195  return true;
196  }
197 }
198 
200 {
201  diagnostic_msgs::DiagnosticArray diag_array;
202 
203  diagnostic_msgs::DiagnosticStatus diag_toplevel_state;
204  diag_toplevel_state.name = "toplevel_state";
205  diag_toplevel_state.level = -1;
206  int min_level = 255;
207 
208  vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed;
209  {
210  boost::mutex::scoped_lock lock(mutex_);
211  processed = analyzer_group_->report();
212  }
213  for (unsigned int i = 0; i < processed.size(); ++i)
214  {
215  diag_array.status.push_back(*processed[i]);
216 
217  if (processed[i]->level > diag_toplevel_state.level)
218  diag_toplevel_state.level = processed[i]->level;
219  if (processed[i]->level < min_level)
220  min_level = processed[i]->level;
221  }
222 
223  vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed_other;
224  {
225  boost::mutex::scoped_lock lock(mutex_);
226  processed_other = other_analyzer_->report();
227  }
228  for (unsigned int i = 0; i < processed_other.size(); ++i)
229  {
230  diag_array.status.push_back(*processed_other[i]);
231 
232  if (processed_other[i]->level > diag_toplevel_state.level)
233  diag_toplevel_state.level = processed_other[i]->level;
234  if (processed_other[i]->level < min_level)
235  min_level = processed_other[i]->level;
236  }
237 
238  diag_array.header.stamp = ros::Time::now();
239 
240  agg_pub_.publish(diag_array);
241 
242  // Top level is error if we have stale items, unless all stale
243  if (diag_toplevel_state.level > int(DiagnosticLevel::Level_Error) && min_level <= int(DiagnosticLevel::Level_Error))
244  diag_toplevel_state.level = DiagnosticLevel::Level_Error;
245 
246  toplevel_state_pub_.publish(diag_toplevel_state);
247 }
diagnostic_aggregator::Aggregator::toplevel_state_pub_
ros::Publisher toplevel_state_pub_
Definition: aggregator.h:199
diagnostic_aggregator
Definition: aggregator.h:61
diagnostic_aggregator::Aggregator::bondBroken
void bondBroken(std::string bond_id, boost::shared_ptr< Analyzer > analyzer)
Definition: aggregator.cpp:122
diagnostic_aggregator::Aggregator::ros_warnings_
std::set< std::string > ros_warnings_
Records all ROS warnings. No warnings are repeated.
Definition: aggregator.h:246
diagnostic_aggregator::AnalyzerGroup
Allows analyzers to be grouped together, or used as sub-analyzers.
Definition: analyzer_group.h:142
diagnostic_aggregator::OtherAnalyzer::init
bool init(std::string path)
Definition: other_analyzer.h:141
boost::shared_ptr
diagnostic_aggregator::Aggregator::base_path_
std::string base_path_
Prepended to all status names of aggregator.
Definition: aggregator.h:244
diagnostic_aggregator::Aggregator::addDiagnostics
bool addDiagnostics(diagnostic_msgs::AddDiagnostics::Request &req, diagnostic_msgs::AddDiagnostics::Response &res)
Service request callback for addition of diagnostics. Creates a bond between the calling node and the...
Definition: aggregator.cpp:148
diagnostic_aggregator::Level_Error
@ Level_Error
Definition: status_item.h:108
diagnostic_aggregator::Aggregator::agg_pub_
ros::Publisher agg_pub_
Definition: aggregator.h:198
diagnostic_aggregator::Aggregator::bonds_
std::vector< boost::shared_ptr< bond::Bond > > bonds_
Contains all bonds for additional diagnostics.
Definition: aggregator.h:221
diagnostic_aggregator::GenericAnalyzerBase::analyze
virtual bool analyze(const boost::shared_ptr< StatusItem > item)
Update state with new StatusItem.
Definition: generic_analyzer_base.h:174
ros::NodeHandle::advertiseService
ServiceServer advertiseService(AdvertiseServiceOptions &ops)
diagnostic_aggregator::Aggregator::analyzer_group_
AnalyzerGroup * analyzer_group_
Definition: aggregator.h:217
diagnostic_aggregator::Aggregator::checkTimestamp
void checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr &diag_msg)
Definition: aggregator.cpp:74
diagnostic_aggregator::Aggregator::n_
ros::NodeHandle n_
Definition: aggregator.h:195
diagnostic_aggregator::Aggregator::other_analyzer_
OtherAnalyzer * other_analyzer_
Definition: aggregator.h:219
diagnostic_aggregator::OtherAnalyzer::report
std::vector< boost::shared_ptr< diagnostic_msgs::DiagnosticStatus > > report()
Reports current state, returns vector of formatted status messages.
Definition: other_analyzer.h:168
diagnostic_aggregator::BondIDMatch
Definition: aggregator.h:226
diagnostic_aggregator::Aggregator::diag_sub_
ros::Subscriber diag_sub_
Definition: aggregator.h:197
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::NodeHandle::advertise
Publisher advertise(AdvertiseOptions &ops)
ROS_WARN_COND
#define ROS_WARN_COND(cond,...)
diagnostic_aggregator::AnalyzerGroup::removeAnalyzer
virtual bool removeAnalyzer(boost::shared_ptr< Analyzer > &analyzer)
Remove an analyzer from this analyzerGroup.
Definition: analyzer_group.cpp:175
diagnostic_aggregator::Aggregator::publishData
void publishData()
Processes, publishes data. Should be called at pub_rate.
Definition: aggregator.cpp:199
diagnostic_aggregator::OtherAnalyzer
Definition: other_analyzer.h:92
aggregator.h
diagnostic_aggregator::AnalyzerGroup::report
virtual std::vector< boost::shared_ptr< diagnostic_msgs::DiagnosticStatus > > report()
The processed output is the combined output of the sub-analyzers, and the top level status.
Definition: analyzer_group.cpp:235
ROS_DEBUG
#define ROS_DEBUG(...)
diagnostic_aggregator::Aggregator::diagCallback
void diagCallback(const diagnostic_msgs::DiagnosticArray::ConstPtr &diag_msg)
Callback for incoming "/diagnostics".
Definition: aggregator.cpp:92
diagnostic_aggregator::AnalyzerGroup::addAnalyzer
virtual bool addAnalyzer(boost::shared_ptr< Analyzer > &analyzer)
Add an analyzer to this analyzerGroup.
Definition: analyzer_group.cpp:169
ROS_WARN
#define ROS_WARN(...)
diagnostic_aggregator::StatusItem
Helper class to hold, store DiagnosticStatus messages.
Definition: status_item.h:190
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
diagnostic_aggregator::Aggregator::add_srv_
ros::ServiceServer add_srv_
Definition: aggregator.h:196
diagnostic_aggregator::AnalyzerGroup::analyze
virtual bool analyze(const boost::shared_ptr< StatusItem > item)
Analyze returns true if any sub-analyzers will analyze an item.
Definition: analyzer_group.cpp:220
diagnostic_aggregator::AnalyzerGroup::init
virtual bool init(const std::string base_path, const ros::NodeHandle &n)
Initialized with base path and namespace.
Definition: analyzer_group.cpp:51
std
ROS_ERROR
#define ROS_ERROR(...)
diagnostic_aggregator::AnalyzerGroup::resetMatches
void resetMatches()
Clear match arrays. Used when analyzers are added or removed.
Definition: analyzer_group.cpp:214
ros::NodeHandle::param
T param(const std::string &param_name, const T &default_val) const
diagnostic_aggregator::Aggregator::pub_rate_
double pub_rate_
Definition: aggregator.h:201
diagnostic_aggregator::AnalyzerGroup::match
virtual bool match(const std::string name)
Match returns true if any sub-analyzers match an item.
Definition: analyzer_group.cpp:186
diagnostic_aggregator::Aggregator::~Aggregator
~Aggregator()
Definition: aggregator.cpp:114
diagnostic_aggregator::Aggregator::mutex_
boost::mutex mutex_
Definition: aggregator.h:200
diagnostic_aggregator::Aggregator::bondFormed
void bondFormed(boost::shared_ptr< Analyzer > group)
Definition: aggregator.cpp:141
ros::NodeHandle
ros::Time::now
static Time now()


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Tue May 6 2025 02:17:30