Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00037 #include <diagnostic_aggregator/aggregator.h>
00038
00039 using namespace std;
00040 using namespace diagnostic_aggregator;
00041
00042 Aggregator::Aggregator() :
00043 pub_rate_(1.0),
00044 analyzer_group_(NULL),
00045 other_analyzer_(NULL),
00046 base_path_("")
00047 {
00048 ros::NodeHandle nh = ros::NodeHandle("~");
00049 nh.param(string("base_path"), base_path_, string(""));
00050 if (base_path_.size() > 0 && base_path_.find("/") != 0)
00051 base_path_ = "/" + base_path_;
00052
00053 nh.param("pub_rate", pub_rate_, pub_rate_);
00054
00055 bool other_as_errors = false;
00056 nh.param("other_as_errors", other_as_errors, false);
00057
00058 analyzer_group_ = new AnalyzerGroup();
00059
00060 if (!analyzer_group_->init(base_path_, nh))
00061 {
00062 ROS_ERROR("Analyzer group for diagnostic aggregator failed to initialize!");
00063 }
00064
00065
00066 other_analyzer_ = new OtherAnalyzer(other_as_errors);
00067 other_analyzer_->init(base_path_);
00068 add_srv_ = n_.advertiseService("/diagnostics_agg/add_diagnostics", &Aggregator::addDiagnostics, this);
00069 diag_sub_ = n_.subscribe("/diagnostics", 1000, &Aggregator::diagCallback, this);
00070 agg_pub_ = n_.advertise<diagnostic_msgs::DiagnosticArray>("/diagnostics_agg", 1);
00071 toplevel_state_pub_ = n_.advertise<diagnostic_msgs::DiagnosticStatus>("/diagnostics_toplevel_state", 1);
00072 }
00073
00074 void Aggregator::checkTimestamp(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
00075 {
00076 if (diag_msg->header.stamp.toSec() != 0)
00077 return;
00078
00079 string stamp_warn = "No timestamp set for diagnostic message. Message names: ";
00080 vector<diagnostic_msgs::DiagnosticStatus>::const_iterator it;
00081 for (it = diag_msg->status.begin(); it != diag_msg->status.end(); ++it)
00082 {
00083 if (it != diag_msg->status.begin())
00084 stamp_warn += ", ";
00085 stamp_warn += it->name;
00086 }
00087
00088 if (!ros_warnings_.count(stamp_warn))
00089 {
00090 ROS_WARN("%s", stamp_warn.c_str());
00091 ros_warnings_.insert(stamp_warn);
00092 }
00093 }
00094
00095 void Aggregator::diagCallback(const diagnostic_msgs::DiagnosticArray::ConstPtr& diag_msg)
00096 {
00097 checkTimestamp(diag_msg);
00098
00099 bool analyzed = false;
00100 {
00101
00102 boost::mutex::scoped_lock lock(mutex_);
00103 for (unsigned int j = 0; j < diag_msg->status.size(); ++j)
00104 {
00105 analyzed = false;
00106 boost::shared_ptr<StatusItem> item(new StatusItem(&diag_msg->status[j]));
00107
00108 if (analyzer_group_->match(item->getName()))
00109 analyzed = analyzer_group_->analyze(item);
00110
00111 if (!analyzed)
00112 other_analyzer_->analyze(item);
00113 }
00114 }
00115 }
00116
00117 Aggregator::~Aggregator()
00118 {
00119 if (analyzer_group_) delete analyzer_group_;
00120
00121 if (other_analyzer_) delete other_analyzer_;
00122 }
00123
00124
00125 void Aggregator::bondBroken(string bond_id, boost::shared_ptr<Analyzer> analyzer)
00126 {
00127 boost::mutex::scoped_lock lock(mutex_);
00128 ROS_WARN("Bond for namespace %s was broken", bond_id.c_str());
00129 std::vector<boost::shared_ptr<bond::Bond> >::iterator elem;
00130 elem = std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(bond_id));
00131 if (elem == bonds_.end()){
00132 ROS_WARN("Broken bond tried to erase a bond which didn't exist.");
00133 } else {
00134 bonds_.erase(elem);
00135 }
00136 if (!analyzer_group_->removeAnalyzer(analyzer))
00137 {
00138 ROS_WARN("Broken bond tried to remove an analyzer which didn't exist.");
00139 }
00140
00141 analyzer_group_->resetMatches();
00142 }
00143
00144 void Aggregator::bondFormed(boost::shared_ptr<Analyzer> group){
00145 ROS_DEBUG("Bond formed");
00146 boost::mutex::scoped_lock lock(mutex_);
00147 analyzer_group_->addAnalyzer(group);
00148 analyzer_group_->resetMatches();
00149 }
00150
00151 bool Aggregator::addDiagnostics(diagnostic_msgs::AddDiagnostics::Request &req,
00152 diagnostic_msgs::AddDiagnostics::Response &res)
00153 {
00154 ROS_DEBUG("Got load request for namespace %s", req.load_namespace.c_str());
00155
00156 if (req.load_namespace[0] != '/')
00157 {
00158 res.message = "Requested load from non-global namespace. Private and relative namespaces are not supported.";
00159 res.success = false;
00160 return true;
00161 }
00162
00163 boost::shared_ptr<Analyzer> group = boost::make_shared<AnalyzerGroup>();
00164 {
00165
00166 boost::mutex::scoped_lock lock(mutex_);
00167
00168 if (std::find_if(bonds_.begin(), bonds_.end(), BondIDMatch(req.load_namespace)) != bonds_.end())
00169 {
00170 res.message = "Requested load from namespace " + req.load_namespace + " which is already in use";
00171 res.success = false;
00172 return true;
00173 }
00174
00175
00176
00177
00178 boost::shared_ptr<bond::Bond> req_bond = boost::make_shared<bond::Bond>(
00179 "/diagnostics_agg/bond" + req.load_namespace, req.load_namespace,
00180 boost::function<void(void)>(boost::bind(&Aggregator::bondBroken, this, req.load_namespace, group)),
00181 boost::function<void(void)>(boost::bind(&Aggregator::bondFormed, this, group))
00182 );
00183 req_bond->start();
00184
00185 bonds_.push_back(req_bond);
00186 }
00187
00188 if (group->init(base_path_, ros::NodeHandle(req.load_namespace)))
00189 {
00190 res.message = "Successfully initialised AnalyzerGroup. Waiting for bond to form.";
00191 res.success = true;
00192 return true;
00193 }
00194 else
00195 {
00196 res.message = "Failed to initialise AnalyzerGroup.";
00197 res.success = false;
00198 return true;
00199 }
00200 }
00201
00202 void Aggregator::publishData()
00203 {
00204 diagnostic_msgs::DiagnosticArray diag_array;
00205
00206 diagnostic_msgs::DiagnosticStatus diag_toplevel_state;
00207 diag_toplevel_state.name = "toplevel_state";
00208 diag_toplevel_state.level = -1;
00209 int min_level = 255;
00210
00211 vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed;
00212 {
00213 boost::mutex::scoped_lock lock(mutex_);
00214 processed = analyzer_group_->report();
00215 }
00216 for (unsigned int i = 0; i < processed.size(); ++i)
00217 {
00218 diag_array.status.push_back(*processed[i]);
00219
00220 if (processed[i]->level > diag_toplevel_state.level)
00221 diag_toplevel_state.level = processed[i]->level;
00222 if (processed[i]->level < min_level)
00223 min_level = processed[i]->level;
00224 }
00225
00226 vector<boost::shared_ptr<diagnostic_msgs::DiagnosticStatus> > processed_other = other_analyzer_->report();
00227 for (unsigned int i = 0; i < processed_other.size(); ++i)
00228 {
00229 diag_array.status.push_back(*processed_other[i]);
00230
00231 if (processed_other[i]->level > diag_toplevel_state.level)
00232 diag_toplevel_state.level = processed_other[i]->level;
00233 if (processed_other[i]->level < min_level)
00234 min_level = processed_other[i]->level;
00235 }
00236
00237 diag_array.header.stamp = ros::Time::now();
00238
00239 agg_pub_.publish(diag_array);
00240
00241
00242 if (diag_toplevel_state.level > 2 && min_level <= 2)
00243 diag_toplevel_state.level = 2;
00244
00245 toplevel_state_pub_.publish(diag_toplevel_state);
00246 }