$search
00001 /* 00002 * Copyright (c) 2010, Willow Garage, Inc. 00003 * All rights reserved. 00004 * 00005 * Redistribution and use in source and binary forms, with or without 00006 * modification, are permitted provided that the following conditions are met: 00007 * 00008 * * Redistributions of source code must retain the above copyright 00009 * notice, this list of conditions and the following disclaimer. 00010 * * Redistributions in binary form must reproduce the above copyright 00011 * notice, this list of conditions and the following disclaimer in the 00012 * documentation and/or other materials provided with the distribution. 00013 * * Neither the name of the Willow Garage, Inc. nor the names of its 00014 * contributors may be used to endorse or promote products derived from 00015 * this software without specific prior written permission. 00016 * 00017 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00018 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00019 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 00020 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 00021 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 00022 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 00023 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 00024 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 00025 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 00026 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00027 * POSSIBILITY OF SUCH DAMAGE. 00028 */ 00029 00032 #ifndef TF2_ROS_MESSAGE_FILTER_H 00033 #define TF2_ROS_MESSAGE_FILTER_H 00034 00035 #include <tf2/buffer_core.h> 00036 00037 #include <string> 00038 #include <list> 00039 #include <vector> 00040 #include <boost/function.hpp> 00041 #include <boost/bind.hpp> 00042 #include <boost/shared_ptr.hpp> 00043 #include <boost/thread.hpp> 00044 #include <boost/signals.hpp> 00045 00046 #include <message_filters/connection.h> 00047 #include <message_filters/simple_filter.h> 00048 00049 #include <ros/node_handle.h> 00050 #include <ros/callback_queue_interface.h> 00051 #include <ros/init.h> 00052 00053 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \ 00054 ROS_DEBUG_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__) 00055 00056 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \ 00057 ROS_WARN_NAMED("message_filter", "MessageFilter [target=%s]: "fmt, getTargetFramesString().c_str(), __VA_ARGS__) 00058 00059 namespace tf2_ros 00060 { 00061 00062 namespace filter_failure_reasons 00063 { 00064 enum FilterFailureReason 00065 { 00067 Unknown, 00069 OutTheBack, 00071 EmptyFrameID, 00072 }; 00073 } 00074 typedef filter_failure_reasons::FilterFailureReason FilterFailureReason; 00075 00076 class MessageFilterBase 00077 { 00078 public: 00079 typedef std::vector<std::string> V_string; 00080 00081 virtual ~MessageFilterBase(){} 00082 virtual void clear() = 0; 00083 virtual void setTargetFrame(const std::string& target_frame) = 0; 00084 virtual void setTargetFrames(const V_string& target_frames) = 0; 00085 virtual void setTolerance(const ros::Duration& tolerance) = 0; 00086 }; 00087 00104 template<class M> 00105 class MessageFilter : public MessageFilterBase, public message_filters::SimpleFilter<M> 00106 { 00107 public: 00108 typedef boost::shared_ptr<M const> MConstPtr; 00109 typedef ros::MessageEvent<M const> MEvent; 00110 typedef boost::function<void(const MConstPtr&, FilterFailureReason)> FailureCallback; 00111 typedef boost::signal<void(const MConstPtr&, FilterFailureReason)> FailureSignal; 00112 00113 // If you hit this assert your message does not have a header, or does not have the HasHeader trait defined for it 00114 ROS_STATIC_ASSERT(ros::message_traits::HasHeader<M>::value); 00115 00124 MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh) 00125 : bc_(bc) 00126 , queue_size_(queue_size) 00127 , callback_queue_(nh.getCallbackQueue()) 00128 { 00129 init(); 00130 00131 setTargetFrame(target_frame); 00132 } 00133 00143 template<class F> 00144 MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, const ros::NodeHandle& nh) 00145 : bc_(bc) 00146 , queue_size_(queue_size) 00147 , callback_queue_(nh.getCallbackQueue()) 00148 { 00149 init(); 00150 00151 setTargetFrame(target_frame); 00152 00153 connectInput(f); 00154 } 00155 00166 MessageFilter(tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue) 00167 : bc_(bc) 00168 , queue_size_(queue_size) 00169 , callback_queue_(cbqueue) 00170 { 00171 init(); 00172 00173 setTargetFrame(target_frame); 00174 } 00175 00187 template<class F> 00188 MessageFilter(F& f, tf2::BufferCore& bc, const std::string& target_frame, uint32_t queue_size, ros::CallbackQueueInterface* cbqueue) 00189 : bc_(bc) 00190 , queue_size_(queue_size) 00191 , callback_queue_(cbqueue) 00192 { 00193 init(); 00194 00195 setTargetFrame(target_frame); 00196 00197 connectInput(f); 00198 } 00199 00203 template<class F> 00204 void connectInput(F& f) 00205 { 00206 message_connection_.disconnect(); 00207 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, this); 00208 } 00209 00213 ~MessageFilter() 00214 { 00215 message_connection_.disconnect(); 00216 00217 clear(); 00218 00219 TF2_ROS_MESSAGEFILTER_DEBUG("Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu", 00220 (long long unsigned int)successful_transform_count_, 00221 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 00222 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_); 00223 00224 } 00225 00229 void setTargetFrame(const std::string& target_frame) 00230 { 00231 V_string frames; 00232 frames.push_back(target_frame); 00233 setTargetFrames(frames); 00234 } 00235 00239 void setTargetFrames(const V_string& target_frames) 00240 { 00241 boost::mutex::scoped_lock list_lock(messages_mutex_); 00242 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_); 00243 00244 target_frames_ = target_frames; 00245 expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1); 00246 00247 std::stringstream ss; 00248 for (V_string::iterator it = target_frames_.begin(); it != target_frames_.end(); ++it) 00249 { 00250 ss << *it << " "; 00251 } 00252 target_frames_string_ = ss.str(); 00253 } 00257 std::string getTargetFramesString() 00258 { 00259 boost::mutex::scoped_lock lock(target_frames_string_mutex_); 00260 return target_frames_string_; 00261 }; 00262 00266 void setTolerance(const ros::Duration& tolerance) 00267 { 00268 boost::mutex::scoped_lock list_lock(messages_mutex_); 00269 time_tolerance_ = tolerance; 00270 expected_success_count_ = target_frames_.size() + (time_tolerance_.isZero() ? 0 : 1); 00271 } 00272 00276 void clear() 00277 { 00278 boost::mutex::scoped_lock lock(messages_mutex_); 00279 00280 TF2_ROS_MESSAGEFILTER_DEBUG("%s", "Cleared"); 00281 00282 bc_.removeTransformableCallback(callback_handle_); 00283 callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5)); 00284 00285 messages_.clear(); 00286 message_count_ = 0; 00287 00288 warned_about_empty_frame_id_ = false; 00289 } 00290 00291 void add(const MEvent& evt) 00292 { 00293 if (target_frames_.empty()) 00294 { 00295 return; 00296 } 00297 00298 namespace mt = ros::message_traits; 00299 const MConstPtr& message = evt.getMessage(); 00300 const std::string& frame_id = *mt::FrameId<M>::pointer(*message); 00301 ros::Time stamp = mt::TimeStamp<M>::value(*message); 00302 00303 if (frame_id.empty()) 00304 { 00305 messageDropped(evt, filter_failure_reasons::EmptyFrameID); 00306 return; 00307 } 00308 00309 boost::mutex::scoped_lock lock(messages_mutex_); 00310 // iterate through the target frames and add requests for each of them 00311 MessageInfo info; 00312 info.handles.reserve(expected_success_count_); 00313 { 00314 V_string::iterator it = target_frames_.begin(); 00315 V_string::iterator end = target_frames_.end(); 00316 for (; it != end; ++it) 00317 { 00318 const std::string& target_frame = *it; 00319 tf2::TransformableRequestHandle handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp); 00320 if (handle == 0xffffffffffffffffULL) // never transformable 00321 { 00322 messageDropped(evt, filter_failure_reasons::OutTheBack); 00323 return; 00324 } 00325 else if (handle == 0) 00326 { 00327 ++info.success_count; 00328 } 00329 else 00330 { 00331 info.handles.push_back(handle); 00332 } 00333 00334 if (!time_tolerance_.isZero()) 00335 { 00336 handle = bc_.addTransformableRequest(callback_handle_, target_frame, frame_id, stamp + time_tolerance_); 00337 if (handle == 0xffffffffffffffffULL) // never transformable 00338 { 00339 messageDropped(evt, filter_failure_reasons::OutTheBack); 00340 return; 00341 } 00342 else if (handle == 0) 00343 { 00344 ++info.success_count; 00345 } 00346 else 00347 { 00348 info.handles.push_back(handle); 00349 } 00350 } 00351 } 00352 } 00353 00354 // We can transform already 00355 if (info.success_count == expected_success_count_) 00356 { 00357 messageReady(evt); 00358 } 00359 else 00360 { 00361 // If this message is about to push us past our queue size, erase the oldest message 00362 if (queue_size_ != 0 && message_count_ + 1 > queue_size_) 00363 { 00364 ++dropped_message_count_; 00365 const MessageInfo& front = messages_.front(); 00366 TF2_ROS_MESSAGEFILTER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, 00367 (*mt::FrameId<M>::pointer(*front.event.getMessage())).c_str(), mt::TimeStamp<M>::value(*front.event.getMessage()).toSec()); 00368 00369 V_TransformableRequestHandle::const_iterator it = front.handles.begin(); 00370 V_TransformableRequestHandle::const_iterator end = front.handles.end(); 00371 for (; it != end; ++it) 00372 { 00373 bc_.cancelTransformableRequest(*it); 00374 } 00375 00376 messageDropped(front.event, filter_failure_reasons::Unknown); 00377 00378 messages_.pop_front(); 00379 --message_count_; 00380 } 00381 00382 // Add the message to our list 00383 info.event = evt; 00384 messages_.push_back(info); 00385 ++message_count_; 00386 } 00387 00388 TF2_ROS_MESSAGEFILTER_DEBUG("Added message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_); 00389 00390 ++incoming_message_count_; 00391 } 00392 00398 void add(const MConstPtr& message) 00399 { 00400 boost::shared_ptr<std::map<std::string, std::string> > header(new std::map<std::string, std::string>); 00401 (*header)["callerid"] = "unknown"; 00402 ros::WallTime n = ros::WallTime::now(); 00403 ros::Time t(n.sec, n.nsec); 00404 add(MEvent(message, header, t)); 00405 } 00406 00411 message_filters::Connection registerFailureCallback(const FailureCallback& callback) 00412 { 00413 boost::mutex::scoped_lock lock(failure_signal_mutex_); 00414 return message_filters::Connection(boost::bind(&MessageFilter::disconnectFailure, this, _1), failure_signal_.connect(callback)); 00415 } 00416 00417 private: 00418 00419 void init() 00420 { 00421 message_count_ = 0; 00422 successful_transform_count_ = 0; 00423 failed_out_the_back_count_ = 0; 00424 transform_message_count_ = 0; 00425 incoming_message_count_ = 0; 00426 dropped_message_count_ = 0; 00427 time_tolerance_ = ros::Duration(0.0); 00428 warned_about_empty_frame_id_ = false; 00429 expected_success_count_ = 1; 00430 00431 callback_handle_ = bc_.addTransformableCallback(boost::bind(&MessageFilter::transformable, this, _1, _2, _3, _4, _5)); 00432 } 00433 00434 void transformable(tf2::TransformableRequestHandle request_handle, const std::string& target_frame, const std::string& source_frame, 00435 ros::Time time, tf2::TransformableResult result) 00436 { 00437 namespace mt = ros::message_traits; 00438 00439 boost::mutex::scoped_lock lock(messages_mutex_); 00440 00441 // find the message this request is associated with 00442 typename L_MessageInfo::iterator msg_it = messages_.begin(); 00443 typename L_MessageInfo::iterator msg_end = messages_.end(); 00444 for (; msg_it != msg_end; ++msg_it) 00445 { 00446 MessageInfo& info = *msg_it; 00447 V_TransformableRequestHandle::const_iterator handle_it = std::find(info.handles.begin(), info.handles.end(), request_handle); 00448 if (handle_it != info.handles.end()) 00449 { 00450 // found msg_it 00451 ++info.success_count; 00452 break; 00453 } 00454 } 00455 00456 if (msg_it == msg_end) 00457 { 00458 return; 00459 } 00460 00461 const MessageInfo& info = *msg_it; 00462 if (info.success_count < expected_success_count_) 00463 { 00464 return; 00465 } 00466 00467 bool can_transform = true; 00468 const MConstPtr& message = info.event.getMessage(); 00469 const std::string& frame_id = *mt::FrameId<M>::pointer(*message); 00470 ros::Time stamp = mt::TimeStamp<M>::value(*message); 00471 00472 if (result == tf2::TransformAvailable) 00473 { 00474 // make sure we can still perform all the necessary transforms 00475 typename V_string::iterator it = target_frames_.begin(); 00476 typename V_string::iterator end = target_frames_.end(); 00477 for (; it != end; ++it) 00478 { 00479 const std::string& target = *it; 00480 if (!bc_.canTransform(target, frame_id, stamp)) 00481 { 00482 can_transform = false; 00483 break; 00484 } 00485 00486 if (!time_tolerance_.isZero()) 00487 { 00488 if (!bc_.canTransform(target, frame_id, stamp + time_tolerance_)) 00489 { 00490 can_transform = false; 00491 break; 00492 } 00493 } 00494 } 00495 } 00496 else 00497 { 00498 can_transform = false; 00499 } 00500 00501 if (can_transform) 00502 { 00503 TF2_ROS_MESSAGEFILTER_DEBUG("Message ready in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1); 00504 00505 ++successful_transform_count_; 00506 00507 messageReady(info.event); 00508 00509 } 00510 else 00511 { 00512 ++dropped_message_count_; 00513 00514 TF2_ROS_MESSAGEFILTER_DEBUG("Discarding message in frame %s at time %.3f, count now %d", frame_id.c_str(), stamp.toSec(), message_count_ - 1); 00515 messageDropped(info.event, filter_failure_reasons::Unknown); 00516 } 00517 00518 messages_.erase(msg_it); 00519 --message_count_; 00520 } 00521 00525 void incomingMessage(const ros::MessageEvent<M const>& evt) 00526 { 00527 add(evt); 00528 } 00529 00530 void checkFailures() 00531 { 00532 if (next_failure_warning_.isZero()) 00533 { 00534 next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(15); 00535 } 00536 00537 if (ros::WallTime::now() >= next_failure_warning_) 00538 { 00539 if (incoming_message_count_ - message_count_ == 0) 00540 { 00541 return; 00542 } 00543 00544 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_); 00545 if (dropped_pct > 0.95) 00546 { 00547 TF2_ROS_MESSAGEFILTER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME); 00548 next_failure_warning_ = ros::WallTime::now() + ros::WallDuration(60); 00549 00550 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5) 00551 { 00552 TF2_ROS_MESSAGEFILTER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str()); 00553 } 00554 } 00555 } 00556 } 00557 00558 struct CBQueueCallback : public ros::CallbackInterface 00559 { 00560 CBQueueCallback(MessageFilter* filter, const MEvent& event, bool success, FilterFailureReason reason) 00561 : event_(event) 00562 , filter_(filter) 00563 , reason_(reason) 00564 , success_(success) 00565 {} 00566 00567 00568 virtual CallResult call() 00569 { 00570 if (success_) 00571 { 00572 filter_->signalMessage(event_); 00573 } 00574 else 00575 { 00576 filter_->signalFailure(event_, reason_); 00577 } 00578 00579 return Success; 00580 } 00581 00582 private: 00583 MEvent event_; 00584 MessageFilter* filter_; 00585 FilterFailureReason reason_; 00586 bool success_; 00587 }; 00588 00589 void messageDropped(const MEvent& evt, FilterFailureReason reason) 00590 { 00591 if (callback_queue_) 00592 { 00593 ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, false, reason)); 00594 callback_queue_->addCallback(cb, (uint64_t)this); 00595 } 00596 else 00597 { 00598 signalFailure(evt, reason); 00599 } 00600 } 00601 00602 void messageReady(const MEvent& evt) 00603 { 00604 if (callback_queue_) 00605 { 00606 ros::CallbackInterfacePtr cb(new CBQueueCallback(this, evt, true, filter_failure_reasons::Unknown)); 00607 callback_queue_->addCallback(cb, (uint64_t)this); 00608 } 00609 else 00610 { 00611 signalMessage(evt); 00612 } 00613 } 00614 00615 void disconnectFailure(const message_filters::Connection& c) 00616 { 00617 boost::mutex::scoped_lock lock(failure_signal_mutex_); 00618 c.getBoostConnection().disconnect(); 00619 } 00620 00621 void signalFailure(const MEvent& evt, FilterFailureReason reason) 00622 { 00623 boost::mutex::scoped_lock lock(failure_signal_mutex_); 00624 failure_signal_(evt.getMessage(), reason); 00625 } 00626 00627 tf2::BufferCore& bc_; 00628 V_string target_frames_; 00629 std::string target_frames_string_; 00630 boost::mutex target_frames_string_mutex_; 00631 uint32_t queue_size_; 00632 tf2::TransformableCallbackHandle callback_handle_; 00633 00634 typedef std::vector<tf2::TransformableRequestHandle> V_TransformableRequestHandle; 00635 struct MessageInfo 00636 { 00637 MessageInfo() 00638 : success_count(0) 00639 {} 00640 00641 MEvent event; 00642 V_TransformableRequestHandle handles; 00643 uint32_t success_count; 00644 }; 00645 typedef std::list<MessageInfo> L_MessageInfo; 00646 L_MessageInfo messages_; 00647 uint32_t message_count_; 00648 boost::mutex messages_mutex_; 00649 uint32_t expected_success_count_; 00650 00651 bool warned_about_empty_frame_id_; 00652 00653 uint64_t successful_transform_count_; 00654 uint64_t failed_out_the_back_count_; 00655 uint64_t transform_message_count_; 00656 uint64_t incoming_message_count_; 00657 uint64_t dropped_message_count_; 00658 00659 ros::Time last_out_the_back_stamp_; 00660 std::string last_out_the_back_frame_; 00661 00662 ros::WallTime next_failure_warning_; 00663 00664 ros::Duration time_tolerance_; 00665 00666 message_filters::Connection message_connection_; 00667 00668 FailureSignal failure_signal_; 00669 boost::mutex failure_signal_mutex_; 00670 00671 ros::CallbackQueueInterface* callback_queue_; 00672 }; 00673 00674 } // namespace tf2 00675 00676 #endif 00677