aggregator.cpp
Go to the documentation of this file.
00001 /*********************************************************************
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2009, Willow Garage, Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of the Willow Garage nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  *********************************************************************/
00034 
00037 #include <diagnostic_aggregator/aggregator.h>
00038 
00039 using namespace std;
00040 using namespace diagnostic_aggregator;
00041 
00042 Aggregator::Aggregator() :
00043   pub_rate_(1.0),
00044   analyzer_group_(NULL),
00045   other_analyzer_(NULL),
00046   base_path_("")
00047 {
00048   ros::NodeHandle nh = ros::NodeHandle("~");
00049   nh.param(string("base_path"), base_path_, string(""));
00050   if (base_path_.size() > 0 && base_path_.find("/") != 0)
00051     base_path_ = "/" + base_path_;
00052 
00053   nh.param("pub_rate", pub_rate_, pub_rate_);
00054 
00055   bool other_as_errors = false;
00056   nh.param("other_as_errors", other_as_errors, false);
00057 
00058   analyzer_group_ = new AnalyzerGroup();
00059 
00060   if (!analyzer_group_->init(base_path_, nh))
00061   {
00062     ROS_ERROR("Analyzer group for diagnostic aggregator failed to initialize!");
00063   }
00064 
00065   // Last analyzer handles remaining data
00066   other_analyzer_ = new OtherAnalyzer(other_as_errors);
00067   other_analyzer_->init(base_path_); // This always returns true
00068   add_srv_ = n_.advertiseService("/diagnostics_agg/add_diagnostics", &Aggregator::addDiagnostics, this);
00069   diag_sub_ = n_.subscribe("/diagnostics", 1000, &Aggregator::diagCallback, this);
00070   agg_pub_ = n_.advertise<diagnostic_msgs::DiagnosticArray>("/diagnostics_agg", 1);
00071   toplevel_state_pub_ = n_.advertise<diagnostic_msgs::DiagnosticStatus>("/diagnostics_toplevel_state", 1);
00072 }
00073 
00074 void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
00075 {
00076   if (diag_msg->header.stamp.toSec() != 0)
00077     return;
00078 
00079   string stamp_warn = "No timestamp set for diagnostic message. Message names: ";
00080   vector<diagnostic_msgs::DiagnosticStatus>::const_iterator it;
00081   for (it = diag_msg->status.begin(); it != diag_msg->status.end(); ++it)
00082   {
00083     if (it != diag_msg->status.begin())
00084       stamp_warn += ", ";
00085     stamp_warn += it->name;
00086   }
00087   
00088   if (!ros_warnings_.count(stamp_warn))
00089   {
00090     ROS_WARN("%s", stamp_warn.c_str());
00091     ros_warnings_.insert(stamp_warn);
00092   }
00093 }
00094 
00095 void Aggregator::diagCallback(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
00096 {
00097   checkTimestamp(diag_msg);
00098 
00099   bool analyzed = false;
00100   { // lock the whole loop to ensure nothing in the analyzer group changes
00101     // during it.
00102     boost::mutex::scoped_lock lock(mutex_);
00103     for (unsigned int j = 0; j < diag_msg->status.size(); ++j)
00104     {
00105       analyzed = false;
00106       boost::shared_ptr<StatusItem> item(new StatusItem(&diag_msg->status[j]));
00107 
00108       if (analyzer_group_->match(item->getName()))
00109         analyzed = analyzer_group_->analyze(item);
00110 
00111       if (!analyzed)
00112         other_analyzer_->analyze(item);
00113     }
00114   }
00115 }
00116 
00117 Aggregator::~Aggregator()
00118 {
00119   if (analyzer_group_) delete analyzer_group_;
00120 
00121   if (other_analyzer_) delete other_analyzer_;
00122 }
00123 
00124 
00125 void Aggregator::bondBroken(string bond_id, boost::shared_ptr<Analyzer> analyzer)
00126 {
00127   boost::mutex::scoped_lock lock(mutex_); // Possibility of multiple bonds breaking at once
00128   ROS_WARN("Bond for namespace %s was broken", bond_id.c_str());
00129   std::vector<boost::shared_ptr<bond::Bond> >::iterator elem;
00130   elem = std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(bond_id));
00131   if (elem == bonds_.end()){
00132     ROS_WARN("Broken bond tried to erase a bond which didn't exist.");
00133   } else {
00134     bonds_.erase(elem);
00135   }
00136   if (!analyzer_group_->removeAnalyzer(analyzer))
00137   {
00138     ROS_WARN("Broken bond tried to remove an analyzer which didn't exist.");
00139   }
00140 
00141   analyzer_group_->resetMatches();
00142 }
00143 
00144 void Aggregator::bondFormed(boost::shared_ptr<Analyzer> group){
00145   ROS_DEBUG("Bond formed");
00146   boost::mutex::scoped_lock lock(mutex_);
00147   analyzer_group_->addAnalyzer(group);
00148   analyzer_group_->resetMatches();
00149 }
00150 
00151 bool Aggregator::addDiagnostics(diagnostic_msgs::AddDiagnostics::Request &req,
00152                                 diagnostic_msgs::AddDiagnostics::Response &res)
00153 {
00154   ROS_DEBUG("Got load request for namespace %s", req.load_namespace.c_str());
00155   // Don't currently support relative or private namespace definitions
00156   if (req.load_namespace[0] != '/')
00157   {
00158     res.message = "Requested load from non-global namespace. Private and relative namespaces are not supported.";
00159     res.success = false;
00160     return true;
00161   }
00162 
00163   boost::shared_ptr<Analyzer> group = boost::make_shared<AnalyzerGroup>();
00164   { // lock here ensures that bonds from the same namespace aren't added twice.
00165     // Without it, possibility of two simultaneous calls adding two objects.
00166     boost::mutex::scoped_lock lock(mutex_);
00167     // rebuff attempts to add things from the same namespace twice
00168     if (std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(req.load_namespace)) != bonds_.end())
00169     {
00170       res.message = "Requested load from namespace " + req.load_namespace + " which is already in use";
00171       res.success = false;
00172       return true;
00173     }
00174 
00175     // Use a different topic for each bond to help control the message queue
00176     // length. Bond has a fixed size subscriber queue, so we can easily miss
00177     // bond heartbeats if there are too many bonds on the same topic.
00178     boost::shared_ptr<bond::Bond> req_bond = boost::make_shared<bond::Bond>(
00179       "/diagnostics_agg/bond" + req.load_namespace, req.load_namespace,
00180       boost::function<void(void)>(boost::bind(&Aggregator::bondBroken, this, req.load_namespace, group)),
00181       boost::function<void(void)>(boost::bind(&Aggregator::bondFormed, this, group))
00182                                                                             );
00183     req_bond->start();
00184 
00185     bonds_.push_back(req_bond); // bond formed, keep track of it
00186   }
00187 
00188   if (group->init(base_path_, ros::NodeHandle(req.load_namespace)))
00189   {
00190     res.message = "Successfully initialised AnalyzerGroup. Waiting for bond to form.";
00191     res.success = true;
00192     return true;
00193   }
00194   else
00195   {
00196     res.message = "Failed to initialise AnalyzerGroup.";
00197     res.success = false;
00198     return true;
00199   }
00200 }
00201 
00202 void Aggregator::publishData()
00203 {
00204   diagnostic_msgs::DiagnosticArray diag_array;
00205 
00206   diagnostic_msgs::DiagnosticStatus diag_toplevel_state;
00207   diag_toplevel_state.name = "toplevel_state";
00208   diag_toplevel_state.level = -1;
00209   int min_level = 255;
00210   
00211   vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed;
00212   {
00213     boost::mutex::scoped_lock lock(mutex_);
00214     processed = analyzer_group_->report();
00215   }
00216   for (unsigned int i = 0; i < processed.size(); ++i)
00217   {
00218     diag_array.status.push_back(*processed[i]);
00219 
00220     if (processed[i]->level > diag_toplevel_state.level)
00221       diag_toplevel_state.level = processed[i]->level;
00222     if (processed[i]->level < min_level)
00223       min_level = processed[i]->level;
00224   }
00225 
00226   vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed_other = other_analyzer_->report();
00227   for (unsigned int i = 0; i < processed_other.size(); ++i)
00228   {
00229     diag_array.status.push_back(*processed_other[i]);
00230 
00231     if (processed_other[i]->level > diag_toplevel_state.level)
00232       diag_toplevel_state.level = processed_other[i]->level;
00233     if (processed_other[i]->level < min_level)
00234       min_level = processed_other[i]->level;
00235   }
00236 
00237   diag_array.header.stamp = ros::Time::now();
00238 
00239   agg_pub_.publish(diag_array);
00240 
00241   // Top level is error if we have stale items, unless all stale
00242   if (diag_toplevel_state.level > 2 && min_level <= 2)
00243     diag_toplevel_state.level = 2;
00244 
00245   toplevel_state_pub_.publish(diag_toplevel_state);
00246 }


diagnostic_aggregator
Author(s): Kevin Watts, Brice Rebsamen
autogenerated on Mon Aug 14 2017 02:51:51