$search
00001 /* 00002 * Copyright (c) 2008, Willow Garage, Inc. 00003 * All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright 00009 * notice, this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of the Willow Garage, Inc. nor the names of its 00014 * contributors may be used to endorse or promote products derived from 00015 * this software without specific prior written permission. 00016 * 00017 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 * POSSIBILITY OF SUCH DAMAGE. 00028 */ 00029 00032 #ifndef TF_MESSAGE_FILTER_H 00033 #define TF_MESSAGE_FILTER_H 00034 00035 #include <ros/ros.h> 00036 #include <tf/transform_listener.h> 00037 #include <tf/tfMessage.h> 00038 00039 #include <string> 00040 #include <list> 00041 #include <vector> 00042 #include <boost/function.hpp> 00043 #include <boost/bind.hpp> 00044 #include <boost/shared_ptr.hpp> 00045 #include <boost/weak_ptr.hpp> 00046 #include <boost/thread.hpp> 00047 #include <boost/signals.hpp> 00048 00049 #include <ros/callback_queue.h> 00050 00051 #include <message_filters/connection.h> 00052 #include <message_filters/simple_filter.h> 00053 00054 #define TF_MESSAGEFILTER_DEBUG(fmt, ...) \ 00055 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__) 00056 00057 #define TF_MESSAGEFILTER_WARN(fmt, ...) \ 00058 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__) 00059 00060 namespace tf 00061 { 00062 00063 namespace filter_failure_reasons 00064 { 00065 enum FilterFailureReason 00066 { 00068 Unknown, 00070 OutTheBack, 00072 EmptyFrameID, 00073 }; 00074 } 00075 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason; 00076 00077 class MessageFilterBase 00078 { 00079 public: 00080 virtual ~MessageFilterBase(){} 00081 virtual void clear() = 0; 00082 virtual void setTargetFrame(const std::string& target_frame) = 0; 00083 virtual void setTargetFrames(const std::vector<std::string>& target_frames) = 0; 00084 virtual void setTolerance(const ros::Duration& tolerance) = 0; 00085 }; 00086 00103 template<class M> 00104 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M> 00105 { 00106 public: 00107 typedef boost::shared_ptr<M const> MConstPtr; 00108 typedef ros::MessageEvent<M const> MEvent; 00109 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback; 00110 typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal; 00111 00112 // If you hit this assert your message does not have a header, or does not have the HasHeader trait defined for it 00113 ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value); 00114 00124 MessageFilter(Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01)) 00125 : tf_(tf) 00126 , nh_(nh) 00127 , max_rate_(max_rate) 00128 , queue_size_(queue_size) 00129 { 00130 init(); 00131 00132 setTargetFrame(target_frame); 00133 } 00134 00145 template<class F> 00146 MessageFilter(F& f, Transformer& tf, const std::string& target_frame, uint32_t queue_size, ros::NodeHandle nh = ros::NodeHandle(), ros::Duration max_rate = ros::Duration(0.01)) 00147 : tf_(tf) 00148 , nh_(nh) 00149 , max_rate_(max_rate) 00150 , queue_size_(queue_size) 00151 { 00152 init(); 00153 00154 setTargetFrame(target_frame); 00155 00156 connectInput(f); 00157 } 00158 00162 template<class F> 00163 void connectInput(F& f) 00164 { 00165 message_connection_.disconnect(); 00166 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this); 00167 } 00168 00172 ~MessageFilter() 00173 { 00174 message_connection_.disconnect(); 00175 tf_.removeTransformsChangedListener(tf_connection_); 00176 00177 clear(); 00178 00179 TF_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu", 00180 (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_, 00181 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 00182 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_); 00183 00184 } 00185 00189 void setTargetFrame(const std::string& target_frame) 00190 { 00191 std::vector<std::string> frames; 00192 frames.push_back(target_frame); 00193 setTargetFrames(frames); 00194 } 00195 00199 void setTargetFrames(const std::vector<std::string>& target_frames) 00200 { 00201 boost::mutex::scoped_lock list_lock(messages_mutex_); 00202 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_); 00203 00204 target_frames_ = target_frames; 00205 00206 std::stringstream ss; 00207 for (std::vector<std::string>::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it) 00208 { 00209 *it = tf::resolve(tf_.getTFPrefix(), *it); 00210 ss << *it << " "; 00211 } 00212 target_frames_string_ = ss.str(); 00213 } 00217 std::string getTargetFramesString() 00218 { 00219 boost::mutex::scoped_lock lock(target_frames_string_mutex_); 00220 return target_frames_string_; 00221 }; 00222 00226 void setTolerance(const ros::Duration& tolerance) 00227 { 00228 time_tolerance_ = tolerance; 00229 } 00230 00234 void clear() 00235 { 00236 boost::mutex::scoped_lock lock(messages_mutex_); 00237 00238 TF_MESSAGEFILTER_DEBUG("%s", "Cleared"); 00239 00240 messages_.clear(); 00241 message_count_ = 0; 00242 00243 warned_about_unresolved_name_ = false; 00244 warned_about_empty_frame_id_ = false; 00245 } 00246 00247 void add(const MEvent& evt) 00248 { 00249 boost::mutex::scoped_lock lock(messages_mutex_); 00250 00251 testMessages(); 00252 00253 if (!testMessage(evt)) 00254 { 00255 // If this message is about to push us past our queue size, erase the oldest message 00256 if (queue_size_ != 0 && message_count_ + 1 > queue_size_) 00257 { 00258 ++dropped_message_count_; 00259 const MEvent& front = messages_.front(); 00260 TF_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, front.getMessage()->header.frame_id.c_str(), front.getMessage()->header.stamp.toSec()); 00261 signalFailure(messages_.front(), filter_failure_reasons::Unknown); 00262 00263 messages_.pop_front(); 00264 --message_count_; 00265 } 00266 00267 // Add the message to our list 00268 messages_.push_back(evt); 00269 ++message_count_; 00270 } 00271 00272 TF_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", evt.getMessage()->header.frame_id.c_str(), evt.getMessage()->header.stamp.toSec(), message_count_); 00273 00274 ++incoming_message_count_; 00275 } 00276 00282 void add(const MConstPtr& message) 00283 { 00284 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>); 00285 (*header)["callerid"] = "unknown"; 00286 add(MEvent(message, header, ros::Time::now())); 00287 } 00288 00293 message_filters::Connection registerFailureCallback(const FailureCallback& callback) 00294 { 00295 boost::mutex::scoped_lock lock(failure_signal_mutex_); 00296 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback)); 00297 } 00298 00299 private: 00300 00301 void init() 00302 { 00303 message_count_ = 0; 00304 new_transforms_ = false; 00305 successful_transform_count_ = 0; 00306 failed_transform_count_ = 0; 00307 failed_out_the_back_count_ = 0; 00308 transform_message_count_ = 0; 00309 incoming_message_count_ = 0; 00310 dropped_message_count_ = 0; 00311 time_tolerance_ = ros::Duration(0.0); 00312 warned_about_unresolved_name_ = false; 00313 warned_about_empty_frame_id_ = false; 00314 00315 tf_connection_ = tf_.addTransformsChangedListener(boost::bind(&MessageFilter::transformsChanged, this)); 00316 00317 max_rate_timer_ = nh_.createTimer(max_rate_, &MessageFilter::maxRateTimerCallback, this); 00318 } 00319 00320 typedef std::list<MEvent> L_Event; 00321 00322 bool testMessage(const MEvent& evt) 00323 { 00324 const MConstPtr& message = evt.getMessage(); 00325 std::string callerid = evt.getPublisherName();//message->__connection_header ? (*message->__connection_header)["callerid"] : "unknown"; 00326 std::string frame_id = ros::message_traits::FrameId<M>::value(*message); 00327 ros::Time stamp = ros::message_traits::TimeStamp<M>::value(*message); 00328 00329 //Throw out messages which have an empty frame_id 00330 if (frame_id.empty()) 00331 { 00332 if (!warned_about_empty_frame_id_) 00333 { 00334 warned_about_empty_frame_id_ = true; 00335 TF_MESSAGEFILTER_WARN("Discarding message from [%s] due to empty frame_id. This message will only print once.", callerid.c_str()); 00336 } 00337 signalFailure(evt, filter_failure_reasons::EmptyFrameID); 00338 return true; 00339 } 00340 00341 if (frame_id[0] != '/') 00342 { 00343 std::string unresolved = frame_id; 00344 frame_id = tf::resolve(tf_.getTFPrefix(), frame_id); 00345 00346 if (!warned_about_unresolved_name_) 00347 { 00348 warned_about_unresolved_name_ = true; 00349 ROS_WARN("Message from [%s] has a non-fully-qualified frame_id [%s]. Resolved locally to [%s]. This is will likely not work in multi-robot systems. This message will only print once.", callerid.c_str(), unresolved.c_str(), frame_id.c_str()); 00350 } 00351 } 00352 00353 //Throw out messages which are too old 00355 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it) 00356 { 00357 const std::string& target_frame = *target_it; 00358 00359 if (target_frame != frame_id && stamp != ros::Time(0)) 00360 { 00361 ros::Time latest_transform_time ; 00362 00363 tf_.getLatestCommonTime(frame_id, target_frame, latest_transform_time, 0) ; 00364 if (stamp + tf_.getCacheLength() < latest_transform_time) 00365 { 00366 ++failed_out_the_back_count_; 00367 ++dropped_message_count_; 00368 TF_MESSAGEFILTER_DEBUG("Discarding Message, in frame %s, Out of the back of Cache Time(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. Message Count now: %d", message->header.frame_id.c_str(), message->header.stamp.toSec(), tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_); 00369 00370 last_out_the_back_stamp_ = stamp; 00371 last_out_the_back_frame_ = frame_id; 00372 00373 signalFailure(evt, filter_failure_reasons::OutTheBack); 00374 return true; 00375 } 00376 } 00377 00378 } 00379 00380 bool ready = !target_frames_.empty(); 00381 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it) 00382 { 00383 std::string& target_frame = *target_it; 00384 if (time_tolerance_ != ros::Duration(0.0)) 00385 { 00386 ready = ready && (tf_.canTransform(target_frame, frame_id, stamp) && 00387 tf_.canTransform(target_frame, frame_id, stamp + time_tolerance_) ); 00388 } 00389 else 00390 { 00391 ready = ready && tf_.canTransform(target_frame, frame_id, stamp); 00392 } 00393 } 00394 00395 if (ready) 00396 { 00397 TF_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_); 00398 00399 ++successful_transform_count_; 00400 00401 signalMessage(evt); 00402 } 00403 else 00404 { 00405 ++failed_transform_count_; 00406 } 00407 00408 return ready; 00409 } 00410 00411 void testMessages() 00412 { 00413 if (!messages_.empty() && getTargetFramesString() == " ") 00414 { 00415 ROS_WARN_NAMED("message_notifier", "MessageFilter [target=%s]: empty target frame", getTargetFramesString().c_str()); 00416 } 00417 00418 int i = 0; 00419 00420 typename L_Event::iterator it = messages_.begin(); 00421 for (; it != messages_.end(); ++i) 00422 { 00423 MEvent& evt = *it; 00424 00425 if (testMessage(evt)) 00426 { 00427 --message_count_; 00428 it = messages_.erase(it); 00429 } 00430 else 00431 { 00432 ++it; 00433 } 00434 } 00435 } 00436 00437 void maxRateTimerCallback(const ros::TimerEvent&) 00438 { 00439 boost::mutex::scoped_lock list_lock(messages_mutex_); 00440 if (new_transforms_) 00441 { 00442 testMessages(); 00443 new_transforms_ = false; 00444 } 00445 00446 checkFailures(); 00447 } 00448 00452 void incomingMessage(const ros::MessageEvent<M const>& evt) 00453 { 00454 add(evt); 00455 } 00456 00457 void transformsChanged() 00458 { 00459 new_transforms_ = true; 00460 00461 ++transform_message_count_; 00462 } 00463 00464 void checkFailures() 00465 { 00466 if (next_failure_warning_.isZero()) 00467 { 00468 next_failure_warning_ = ros::Time::now() + ros::Duration(15); 00469 } 00470 00471 if (ros::Time::now() >= next_failure_warning_) 00472 { 00473 if (incoming_message_count_ - message_count_ == 0) 00474 { 00475 return; 00476 } 00477 00478 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_); 00479 if (dropped_pct > 0.95) 00480 { 00481 TF_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME); 00482 next_failure_warning_ = ros::Time::now() + ros::Duration(60); 00483 00484 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5) 00485 { 00486 TF_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str()); 00487 } 00488 } 00489 } 00490 } 00491 00492 void disconnectFailure(const message_filters::Connection& c) 00493 { 00494 boost::mutex::scoped_lock lock(failure_signal_mutex_); 00495 c.getBoostConnection().disconnect(); 00496 } 00497 00498 void signalFailure(const MEvent& evt, FilterFailureReason reason) 00499 { 00500 boost::mutex::scoped_lock lock(failure_signal_mutex_); 00501 failure_signal_(evt.getMessage(), reason); 00502 } 00503 00504 Transformer& tf_; 00505 ros::NodeHandle nh_; 00506 ros::Duration max_rate_; 00507 ros::Timer max_rate_timer_; 00508 std::vector<std::string> target_frames_; 00509 std::string target_frames_string_; 00510 boost::mutex target_frames_string_mutex_; 00511 uint32_t queue_size_; 00512 00513 L_Event messages_; 00514 uint32_t message_count_; 00515 boost::mutex messages_mutex_; 00516 00517 bool new_messages_; 00518 volatile bool new_transforms_; 00519 00520 bool warned_about_unresolved_name_; 00521 bool warned_about_empty_frame_id_; 00522 00523 uint64_t successful_transform_count_; 00524 uint64_t failed_transform_count_; 00525 uint64_t failed_out_the_back_count_; 00526 uint64_t transform_message_count_; 00527 uint64_t incoming_message_count_; 00528 uint64_t dropped_message_count_; 00529 00530 ros::Time last_out_the_back_stamp_; 00531 std::string last_out_the_back_frame_; 00532 00533 ros::Time next_failure_warning_; 00534 00535 ros::Duration time_tolerance_; 00536 00537 boost::signals::connection tf_connection_; 00538 message_filters::Connection message_connection_; 00539 00540 FailureSignal failure_signal_; 00541 boost::mutex failure_signal_mutex_; 00542 }; 00543 00544 } // namespace tf 00545 00546 #endif 00547