00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00032 #ifndef TF_MESSAGE_NOTIFIER_H
00033 #define TF_MESSAGE_NOTIFIER_H
00034
00035 #include <ros/ros.h>
00036 #include <tf/tf.h>
00037 #include <tf/tfMessage.h>
00038 #include <tf/message_notifier_base.h>
00039
00040 #include <string>
00041 #include <list>
00042 #include <vector>
00043 #include <boost/function.hpp>
00044 #include <boost/bind.hpp>
00045 #include <boost/shared_ptr.hpp>
00046 #include <boost/thread.hpp>
00047
00048 #define NOTIFIER_DEBUG(fmt, ...) \
00049 ROS_DEBUG_NAMED("message_notifier", "MessageNotifier [topic=%s, target=%s]: "fmt, topic_.c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
00050
00051 #define NOTIFIER_WARN(fmt, ...) \
00052 ROS_WARN_NAMED("message_notifier", "MessageNotifier [topic=%s, target=%s]: "fmt, topic_.c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
00053
00054 namespace tf
00055 {
00056
00063 inline void* notifierAllocate(uint32_t size)
00064 {
00065 return malloc(size);
00066 }
00073 inline void notifierDeallocate(void* p)
00074 {
00075 free(p);
00076 }
00077
00078 class Transformer;
00079
00109 template<class MessageT>
00110 class MessageNotifier : public MessageNotifierBase
00111 {
00112 public:
00113 typedef boost::shared_ptr<MessageT> MessagePtr;
00114 typedef boost::function<void(const MessagePtr&)> Callback;
00115
00125 ROSCPP_DEPRECATED MessageNotifier(Transformer& tf, Callback callback,
00126 const std::string& topic, const std::string& target_frame,
00127 uint32_t queue_size)
00128 : tf_(tf)
00129 , callback_(callback)
00130 , queue_size_(queue_size)
00131 , message_count_(0)
00132 , destructing_(false)
00133 , new_messages_(false)
00134 , new_transforms_(false)
00135 , successful_transform_count_(0)
00136 , failed_transform_count_(0)
00137 , failed_out_the_back_count_(0)
00138 , transform_message_count_(0)
00139 , incoming_message_count_(0)
00140 , dropped_message_count_(0)
00141 , time_tolerance_(0.0)
00142 {
00143 target_frames_.resize(1);
00144 target_frames_[0] = target_frame;
00145 target_frames_string_ = target_frame;
00146
00147 setTopic(topic);
00148
00149 tf_subscriber_1_ = node_.subscribe<tfMessage>("/tf", 1,
00150 boost::bind(&MessageNotifier::incomingTFMessage, this, _1));
00151
00152 tf_subscriber_2_ = node_.subscribe<tfMessage>("/tf_message", 1,
00153 boost::bind(&MessageNotifier::incomingTFMessage, this, _1));
00154
00155 thread_handle_ = boost::thread(boost::bind(&MessageNotifier::workerThread, this));
00156 }
00157
00161 ~MessageNotifier()
00162 {
00163 NOTIFIER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00164 (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_,
00165 (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_,
00166 (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00167
00168 unsubscribeFromMessage();
00169
00170
00171
00172 destructing_ = true;
00173 new_data_.notify_all();
00174
00175
00176 thread_handle_.join();
00177
00178 clear();
00179 }
00180
00184 void setTargetFrame(const std::string& target_frame)
00185 {
00186 std::vector<std::string> frames;
00187 frames.push_back(target_frame);
00188 setTargetFrame(frames);
00189 }
00190
00194 void setTargetFrame(const std::vector<std::string>& target_frames)
00195 {
00196 boost::mutex::scoped_lock list_lock(list_mutex_);
00197 boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00198
00199 target_frames_ = target_frames;
00200 new_data_.notify_all();
00201
00202 std::stringstream ss;
00203 for (std::vector<std::string>::const_iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00204 {
00205 ss << tf::resolve(tf_.getTFPrefix(), *it) << " ";
00206 }
00207 target_frames_string_ = ss.str();
00208 }
00212 std::string getTargetFramesString()
00213 {
00214 boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00215 return target_frames_string_;
00216 };
00217
00221 void setTopic(const std::string& topic)
00222 {
00223 unsubscribeFromMessage();
00224
00225 topic_ = topic;
00226
00227 subscribeToMessage();
00228 }
00229
00233 void setTolerance(const ros::Duration& tolerance)
00234 {
00235 time_tolerance_ = tolerance;
00236 }
00237
00241 void clear()
00242 {
00243 boost::mutex::scoped_lock list_lock(list_mutex_);
00244 boost::mutex::scoped_lock queue_lock(queue_mutex_);
00245
00246 messages_.clear();
00247 new_message_queue_.clear();
00248 message_count_ = 0;
00249 }
00250
00251 void enqueueMessage(const MessagePtr& message)
00252 {
00253 {
00254 boost::mutex::scoped_lock lock(queue_mutex_);
00255
00256 new_message_queue_.push_back(message);
00257
00258 NOTIFIER_DEBUG("Added message to message queue, count now %d", (int)new_message_queue_.size());
00259
00260 new_messages_ = true;
00261
00262
00263 new_data_.notify_all();
00264 }
00265
00266 ++incoming_message_count_;
00267 }
00268
00272 void subscribeToMessage()
00273 {
00274 if (!topic_.empty())
00275 {
00276 subscriber_ = node_.subscribe<MessageT>(topic_, queue_size_,
00277 boost::bind(&MessageNotifier::incomingMessage, this, _1));
00278 }
00279 }
00280
00284 void unsubscribeFromMessage()
00285 {
00286 if (!topic_.empty())
00287 {
00288 subscriber_.shutdown();
00289 }
00290 }
00291
00292 private:
00293
00294 typedef std::vector<MessagePtr> V_Message;
00295 typedef std::list<MessagePtr> L_Message;
00296
00302 void gatherReadyMessages(V_Message& to_notify)
00303 {
00304 if (!messages_.empty() && getTargetFramesString() == " ")
00305 {
00306 ROS_WARN_NAMED("message_notifier", "MessageNotifier [topic=%s, target=%s]: empty target frame", topic_.c_str(), getTargetFramesString().c_str());
00307 }
00308
00309 to_notify.reserve(message_count_);
00310
00311 int i = 0;
00312
00313 typename L_Message::iterator it = messages_.begin();
00314 for (; it != messages_.end(); ++i)
00315 {
00316 MessagePtr& message = *it;
00317
00318 bool should_step_out = false;
00319
00321 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00322 {
00323 const std::string& target_frame = *target_it;
00324
00325 if (target_frame != message->header.frame_id)
00326 {
00327 ros::Time latest_transform_time ;
00328 std::string error_string ;
00329
00330 tf_.getLatestCommonTime(message->header.frame_id, target_frame, latest_transform_time, &error_string) ;
00331 if (message->header.stamp + tf_.getCacheLength() < latest_transform_time)
00332 {
00333 --message_count_;
00334 ++failed_out_the_back_count_;
00335 ++dropped_message_count_;
00336 NOTIFIER_DEBUG("Discarding Message %d , in frame %s, Out of the back of Cache Time(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f. Message Count now: %d", i, message->header.frame_id.c_str(), message->header.stamp.toSec(), tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00337
00338 last_out_the_back_stamp_ = message->header.stamp;
00339 last_out_the_back_frame_ = message->header.frame_id;
00340 it = messages_.erase(it);
00341 should_step_out = true;
00342 break;
00343 }
00344 }
00345
00346 }
00347 if (should_step_out)
00348 {
00349 should_step_out = false;
00350 continue;
00351 }
00352
00353 bool ready = !target_frames_.empty();
00354 for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00355 {
00356 std::string& target_frame = *target_it;
00357 if (time_tolerance_ != ros::Duration(0.0))
00358 {
00359 ready = ready && (tf_.canTransform(target_frame, message->header.frame_id, message->header.stamp) &&
00360 tf_.canTransform(target_frame, message->header.frame_id, message->header.stamp + time_tolerance_) );
00361 }
00362 else
00363 {
00364 ready = ready && tf_.canTransform(target_frame, message->header.frame_id, message->header.stamp);
00365 }
00366 }
00367
00368 if (ready)
00369 {
00370
00371 to_notify.push_back(message);
00372
00373 --message_count_;
00374
00375 NOTIFIER_DEBUG("Message %d ready in frame %s at time %.3f, count now %d", i, message->header.frame_id.c_str(), message->header.stamp.toSec(), message_count_);
00376
00377 it = messages_.erase(it);
00378
00379 ++successful_transform_count_;
00380 }
00381 else
00382 {
00383 ++it;
00384 ++failed_transform_count_;
00385 }
00386 }
00387 }
00388
00393 void notify(V_Message& to_notify)
00394 {
00395 typename V_Message::iterator it = to_notify.begin();
00396 typename V_Message::iterator end = to_notify.end();
00397 for (; it != end; ++it)
00398 {
00399 callback_(*it);
00400 }
00401 }
00402
00406 void processNewMessages(V_Message& messages)
00407 {
00408 typename V_Message::iterator it = messages.begin();
00409 typename V_Message::iterator end = messages.end();
00410 for (; it != end; ++it)
00411 {
00412 MessagePtr& message = *it;
00413
00414
00415 if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00416 {
00417 ++dropped_message_count_;
00418 NOTIFIER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, messages_.front()->header.frame_id.c_str(), messages_.front()->header.stamp.toSec());
00419 messages_.pop_front();
00420 --message_count_;
00421
00422
00423 }
00424
00425
00426 messages_.push_back(message);
00427 ++message_count_;
00428
00429 NOTIFIER_DEBUG("Added message in frame %s at time %.3f, count now %d", message->header.frame_id.c_str(), message->header.stamp.toSec(), message_count_);
00430 }
00431 }
00432
00436 void workerThread()
00437 {
00438 V_Message to_notify;
00439 while (!destructing_)
00440 {
00441 V_Message local_queue;
00442
00443 {
00444 boost::mutex::scoped_lock lock(queue_mutex_);
00445
00446
00447 while (!destructing_ && ((message_count_ == 0 && new_message_queue_.size() == 0) || (!new_transforms_ && !new_messages_)))
00448 {
00449 new_data_.wait(lock);
00450 }
00451
00452
00453 if (destructing_)
00454 {
00455 break;
00456 }
00457
00458 local_queue.swap(new_message_queue_);
00459
00460 new_messages_ = false;
00461 }
00462
00463 {
00464
00465
00466 boost::mutex::scoped_lock lock(list_mutex_);
00467 processNewMessages(local_queue);
00468
00469 local_queue.clear();
00470
00471 gatherReadyMessages(to_notify);
00472
00473 new_transforms_ = false;
00474
00475 notify(to_notify);
00476 to_notify.clear();
00477
00478 checkFailures();
00479 }
00480 }
00481 }
00482
00483
00488 class MessageDeleter
00489 {
00490 public:
00491 void operator()(MessageT* m)
00492 {
00493 m->~MessageT();
00494 notifierDeallocate(m);
00495 }
00496 };
00497
00501 void incomingMessage(typename MessageT::ConstPtr msg)
00502 {
00503
00504 MessageT* mem = (MessageT*) notifierAllocate(sizeof(MessageT));
00505 new (mem) MessageT();
00506
00507
00508 MessagePtr message(mem, MessageDeleter());
00509
00510 *message = *msg;
00511
00512 enqueueMessage(message);
00513 }
00514
00518 void incomingTFMessage(const tf::tfMessage::ConstPtr msg)
00519 {
00520
00521 new_data_.notify_all();
00522 new_transforms_ = true;
00523 ++transform_message_count_;
00524 }
00525
00526 void checkFailures()
00527 {
00528 if (next_failure_warning_.isZero())
00529 {
00530 next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00531 }
00532
00533 if (ros::Time::now() >= next_failure_warning_)
00534 {
00535 if (incoming_message_count_ - message_count_ == 0)
00536 {
00537 return;
00538 }
00539
00540 double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00541 if (dropped_pct > 0.95)
00542 {
00543 NOTIFIER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00544 next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00545
00546 if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00547 {
00548 NOTIFIER_WARN(" The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00549 }
00550 }
00551 }
00552 }
00553
00554 Transformer& tf_;
00555 ros::NodeHandle node_;
00556 ros::Subscriber subscriber_;
00557 ros::Subscriber tf_subscriber_1_, tf_subscriber_2_;
00558 Callback callback_;
00559 std::vector<std::string> target_frames_;
00560 std::string target_frames_string_;
00561 boost::mutex target_frames_string_mutex_;
00562 std::string topic_;
00563 uint32_t queue_size_;
00564
00565 L_Message messages_;
00566 uint32_t message_count_;
00567 boost::mutex list_mutex_;
00568
00569 bool destructing_;
00570 boost::thread thread_handle_;
00571 boost::condition_variable new_data_;
00572 bool new_messages_;
00573 volatile bool new_transforms_;
00574 V_Message new_message_queue_;
00575 boost::mutex queue_mutex_;
00576
00577 uint64_t successful_transform_count_;
00578 uint64_t failed_transform_count_;
00579 uint64_t failed_out_the_back_count_;
00580 uint64_t transform_message_count_;
00581 uint64_t incoming_message_count_;
00582 uint64_t dropped_message_count_;
00583
00584 ros::Time last_out_the_back_stamp_;
00585 std::string last_out_the_back_frame_;
00586
00587 ros::Time next_failure_warning_;
00588
00589 ros::Duration time_tolerance_;
00590 };
00591
00592 }
00593
00594 #endif