message_notifier.h
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2008, Willow Garage, Inc.
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *     * Redistributions of source code must retain the above copyright
00009  *       notice, this list of conditions and the following disclaimer.
00010  *     * Redistributions in binary form must reproduce the above copyright
00011  *       notice, this list of conditions and the following disclaimer in the
00012  *       documentation and/or other materials provided with the distribution.
00013  *     * Neither the name of the Willow Garage, Inc. nor the names of its
00014  *       contributors may be used to endorse or promote products derived from
00015  *       this software without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00027  * POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00032 #ifndef TF_MESSAGE_NOTIFIER_H
00033 #define TF_MESSAGE_NOTIFIER_H
00034 
00035 #include <ros/ros.h>
00036 #include <tf/tf.h>
00037 #include <tf/tfMessage.h>
00038 #include <tf/message_notifier_base.h>
00039 
00040 #include <string>
00041 #include <list>
00042 #include <vector>
00043 #include <boost/function.hpp>
00044 #include <boost/bind.hpp>
00045 #include <boost/shared_ptr.hpp>
00046 #include <boost/thread.hpp>
00047 
00048 #define NOTIFIER_DEBUG(fmt, ...) \
00049   ROS_DEBUG_NAMED("message_notifier", "MessageNotifier [topic=%s, target=%s]: "fmt, topic_.c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
00050 
00051 #define NOTIFIER_WARN(fmt, ...) \
00052   ROS_WARN_NAMED("message_notifier", "MessageNotifier [topic=%s, target=%s]: "fmt, topic_.c_str(), getTargetFramesString().c_str(), __VA_ARGS__)
00053 
00054 namespace tf
00055 {
00056 
00063 inline void* notifierAllocate(uint32_t size)
00064 {
00065         return malloc(size);
00066 }
00073 inline void notifierDeallocate(void* p)
00074 {
00075         free(p);
00076 }
00077 
00078 class Transformer;
00079 
00109 template<class MessageT>
00110 class MessageNotifier : public MessageNotifierBase
00111 {
00112 public:
00113   typedef boost::shared_ptr<MessageT> MessagePtr;
00114   typedef boost::function<void(const MessagePtr&)> Callback;
00115 
00125   ROSCPP_DEPRECATED MessageNotifier(Transformer& tf, Callback callback,
00126       const std::string& topic, const std::string& target_frame,
00127       uint32_t queue_size)
00128   : tf_(tf)
00129   , callback_(callback)
00130   , queue_size_(queue_size)
00131   , message_count_(0)
00132   , destructing_(false)
00133   , new_messages_(false)
00134   , new_transforms_(false)
00135   , successful_transform_count_(0)
00136   , failed_transform_count_(0)
00137   , failed_out_the_back_count_(0)
00138   , transform_message_count_(0)
00139   , incoming_message_count_(0)
00140   , dropped_message_count_(0)
00141   , time_tolerance_(0.0)
00142   {
00143     target_frames_.resize(1);
00144     target_frames_[0] = target_frame;
00145     target_frames_string_ = target_frame;
00146 
00147     setTopic(topic);
00148 
00149     tf_subscriber_1_ = node_.subscribe<tfMessage>("/tf", 1,
00150                                                   boost::bind(&MessageNotifier::incomingTFMessage, this, _1));
00151 
00152     tf_subscriber_2_ = node_.subscribe<tfMessage>("/tf_message", 1,
00153                                                   boost::bind(&MessageNotifier::incomingTFMessage, this, _1));
00154 
00155     thread_handle_ = boost::thread(boost::bind(&MessageNotifier::workerThread, this));
00156   }
00157 
00161   ~MessageNotifier()
00162   {
00163     NOTIFIER_DEBUG("Successful Transforms: %llu, Failed Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
00164                    (long long unsigned int)successful_transform_count_, (long long unsigned int)failed_transform_count_, 
00165                    (long long unsigned int)failed_out_the_back_count_, (long long unsigned int)transform_message_count_, 
00166                    (long long unsigned int)incoming_message_count_, (long long unsigned int)dropped_message_count_);
00167 
00168     unsubscribeFromMessage();
00169 
00170 
00171     // Tell the worker thread that we're destructing
00172     destructing_ = true;
00173     new_data_.notify_all();
00174 
00175     // Wait for the worker thread to exit
00176     thread_handle_.join();
00177 
00178     clear();
00179   }
00180 
00184   void setTargetFrame(const std::string& target_frame)
00185   {
00186     std::vector<std::string> frames;
00187     frames.push_back(target_frame);
00188     setTargetFrame(frames);
00189   }
00190 
00194   void setTargetFrame(const std::vector<std::string>& target_frames)
00195   {
00196     boost::mutex::scoped_lock list_lock(list_mutex_);
00197     boost::mutex::scoped_lock string_lock(target_frames_string_mutex_);
00198 
00199     target_frames_ = target_frames;
00200     new_data_.notify_all();
00201 
00202     std::stringstream ss;
00203     for (std::vector<std::string>::const_iterator it = target_frames_.begin(); it != target_frames_.end(); ++it)
00204     {
00205       ss << tf::resolve(tf_.getTFPrefix(), *it) << " ";
00206     }
00207     target_frames_string_ = ss.str();
00208   }
00212   std::string getTargetFramesString()
00213   {
00214     boost::mutex::scoped_lock lock(target_frames_string_mutex_);
00215     return target_frames_string_;
00216   };
00217 
00221   void setTopic(const std::string& topic)
00222   {
00223     unsubscribeFromMessage();
00224 
00225     topic_ = topic;
00226 
00227     subscribeToMessage();
00228   }
00229 
00233   void setTolerance(const ros::Duration& tolerance)
00234   {
00235     time_tolerance_ = tolerance;
00236   }
00237 
00241   void clear()
00242   {
00243     boost::mutex::scoped_lock list_lock(list_mutex_);
00244     boost::mutex::scoped_lock queue_lock(queue_mutex_);
00245 
00246     messages_.clear();
00247     new_message_queue_.clear();
00248     message_count_ = 0;
00249   }
00250 
00251   void enqueueMessage(const MessagePtr& message)
00252   {
00253     {
00254       boost::mutex::scoped_lock lock(queue_mutex_);
00255 
00256       new_message_queue_.push_back(message);
00257 
00258       NOTIFIER_DEBUG("Added message to message queue, count now %d", (int)new_message_queue_.size());
00259 
00260       new_messages_ = true;
00261 
00262       // Notify the worker thread that there is new data available
00263       new_data_.notify_all();
00264     }
00265 
00266     ++incoming_message_count_;
00267   }
00268 
00272   void subscribeToMessage()
00273   {
00274     if (!topic_.empty())
00275     {
00276       subscriber_ = node_.subscribe<MessageT>(topic_, queue_size_,
00277                                               boost::bind(&MessageNotifier::incomingMessage,  this, _1));
00278     }
00279   }
00280 
00284   void unsubscribeFromMessage()
00285   {
00286     if (!topic_.empty())
00287     {
00288       subscriber_.shutdown();
00289     }
00290   }
00291 
00292 private:
00293 
00294   typedef std::vector<MessagePtr> V_Message;
00295   typedef std::list<MessagePtr> L_Message;
00296 
00302   void gatherReadyMessages(V_Message& to_notify)
00303   {
00304     if (!messages_.empty() && getTargetFramesString() == " ")
00305     {
00306       ROS_WARN_NAMED("message_notifier", "MessageNotifier [topic=%s, target=%s]: empty target frame", topic_.c_str(), getTargetFramesString().c_str());
00307     }
00308 
00309     to_notify.reserve(message_count_);
00310 
00311     int i = 0;
00312 
00313     typename L_Message::iterator it = messages_.begin();
00314     for (; it != messages_.end(); ++i)
00315     {
00316       MessagePtr& message = *it;
00317 
00318       bool should_step_out = false;
00319       //Throw out messages which are too old
00321       for (std::vector<std::string>::iterator target_it = target_frames_.begin(); target_it != target_frames_.end(); ++target_it)
00322       {
00323         const std::string& target_frame = *target_it;
00324 
00325         if (target_frame != message->header.frame_id)
00326         {
00327           ros::Time latest_transform_time ;
00328           std::string error_string ;
00329 
00330           tf_.getLatestCommonTime(message->header.frame_id, target_frame, latest_transform_time, &error_string) ;
00331           if (message->header.stamp + tf_.getCacheLength() < latest_transform_time)
00332           {
00333             --message_count_;
00334             ++failed_out_the_back_count_;
00335             ++dropped_message_count_;
00336             NOTIFIER_DEBUG("Discarding Message %d , in frame %s, Out of the back of Cache Time(stamp: %.3f + cache_length: %.3f < latest_transform_time %.3f.  Message Count now: %d", i, message->header.frame_id.c_str(), message->header.stamp.toSec(),  tf_.getCacheLength().toSec(), latest_transform_time.toSec(), message_count_);
00337 
00338             last_out_the_back_stamp_ = message->header.stamp;
00339             last_out_the_back_frame_ = message->header.frame_id;
00340             it = messages_.erase(it);
00341             should_step_out = true;
00342             break;
00343           }
00344         }
00345 
00346       }
00347       if (should_step_out) //If we just deleted a message don't try to use it
00348       {
00349         should_step_out = false;
00350         continue;
00351       }
00352 
00353       bool ready = !target_frames_.empty();
00354       for (std::vector<std::string>::iterator target_it = target_frames_.begin(); ready && target_it != target_frames_.end(); ++target_it)
00355       {
00356         std::string& target_frame = *target_it;
00357         if (time_tolerance_ != ros::Duration(0.0))
00358         {
00359           ready = ready && (tf_.canTransform(target_frame, message->header.frame_id, message->header.stamp) &&
00360                             tf_.canTransform(target_frame, message->header.frame_id, message->header.stamp + time_tolerance_) );
00361         }
00362         else
00363         {
00364           ready = ready && tf_.canTransform(target_frame, message->header.frame_id, message->header.stamp);
00365         }
00366       }
00367 
00368       if (ready)
00369       {
00370         // If we get here the transform succeeded, so push the message onto the notify list, and erase it from or message list
00371         to_notify.push_back(message);
00372 
00373         --message_count_;
00374 
00375         NOTIFIER_DEBUG("Message %d ready in frame %s at time %.3f, count now %d", i, message->header.frame_id.c_str(), message->header.stamp.toSec(), message_count_);
00376 
00377         it = messages_.erase(it);
00378 
00379         ++successful_transform_count_;
00380       }
00381       else
00382       {
00383         ++it;
00384         ++failed_transform_count_;
00385       }
00386     }
00387   }
00388 
00393   void notify(V_Message& to_notify)
00394   {
00395     typename V_Message::iterator it = to_notify.begin();
00396     typename V_Message::iterator end = to_notify.end();
00397     for (; it != end; ++it)
00398     {
00399       callback_(*it);
00400     }
00401   }
00402 
00406   void processNewMessages(V_Message& messages)
00407   {
00408     typename V_Message::iterator it = messages.begin();
00409     typename V_Message::iterator end = messages.end();
00410     for (; it != end; ++it)
00411     {
00412       MessagePtr& message = *it;
00413 
00414       // If this message is about to push us past our queue size, erase the oldest message
00415       if (queue_size_ != 0 && message_count_ + 1 > queue_size_)
00416       {
00417         ++dropped_message_count_;
00418         NOTIFIER_DEBUG("Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)", message_count_, messages_.front()->header.frame_id.c_str(), messages_.front()->header.stamp.toSec());
00419         messages_.pop_front();
00420         --message_count_;
00421 
00422 
00423       }
00424 
00425       // Add the message to our list
00426       messages_.push_back(message);
00427       ++message_count_;
00428 
00429       NOTIFIER_DEBUG("Added message in frame %s at time %.3f, count now %d", message->header.frame_id.c_str(), message->header.stamp.toSec(), message_count_);
00430     }
00431   }
00432 
00436   void workerThread()
00437   {
00438     V_Message to_notify;
00439     while (!destructing_)
00440     {
00441       V_Message local_queue;
00442 
00443       {
00444         boost::mutex::scoped_lock lock(queue_mutex_);
00445 
00446         // Wait for new data to be available
00447         while (!destructing_ && ((message_count_ == 0 && new_message_queue_.size() == 0) || (!new_transforms_ && !new_messages_)))
00448         {
00449           new_data_.wait(lock);
00450         }
00451 
00452         // If we're destructing, break out of the loop
00453         if (destructing_)
00454         {
00455           break;
00456         }
00457 
00458         local_queue.swap(new_message_queue_);
00459 
00460         new_messages_ = false;
00461       }
00462 
00463       {
00464         // Outside the queue lock, gather and notify that the messages are ready
00465         // Need to lock the list mutex because clear() can modify the message list
00466         boost::mutex::scoped_lock lock(list_mutex_);
00467         processNewMessages(local_queue);
00468 
00469         local_queue.clear();
00470 
00471         gatherReadyMessages(to_notify);
00472 
00473         new_transforms_ = false;
00474 
00475         notify(to_notify);
00476         to_notify.clear();
00477 
00478         checkFailures();
00479       }
00480     }
00481   }
00482 
00483 
00488   class MessageDeleter
00489   {
00490   public:
00491     void operator()(MessageT* m)
00492     {
00493       m->~MessageT();
00494       notifierDeallocate(m);
00495     }
00496   };
00497 
00501   void incomingMessage(typename MessageT::ConstPtr msg)
00502   {
00503     // Allocate our new message and placement-new it
00504     MessageT* mem = (MessageT*) notifierAllocate(sizeof(MessageT));
00505     new (mem) MessageT();
00506 
00507     // Create a boost::shared_ptr from the message, with our custom deleter
00508     MessagePtr message(mem, MessageDeleter());
00509     // Copy the message
00510     *message = *msg;
00511 
00512     enqueueMessage(message);
00513   }
00514 
00518   void incomingTFMessage(const tf::tfMessage::ConstPtr msg)
00519   {
00520     // Notify the worker thread that there is new data available
00521     new_data_.notify_all();
00522     new_transforms_ = true;
00523     ++transform_message_count_;
00524   }
00525 
00526   void checkFailures()
00527   {
00528     if (next_failure_warning_.isZero())
00529     {
00530       next_failure_warning_ = ros::Time::now() + ros::Duration(15);
00531     }
00532 
00533     if (ros::Time::now() >= next_failure_warning_)
00534     {
00535       if (incoming_message_count_ - message_count_ == 0)
00536       {
00537         return;
00538       }
00539 
00540       double dropped_pct = (double)dropped_message_count_ / (double)(incoming_message_count_ - message_count_);
00541       if (dropped_pct > 0.95)
00542       {
00543         NOTIFIER_WARN("Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct*100, ROSCONSOLE_DEFAULT_NAME);
00544         next_failure_warning_ = ros::Time::now() + ros::Duration(60);
00545 
00546         if ((double)failed_out_the_back_count_ / (double)dropped_message_count_ > 0.5)
00547         {
00548           NOTIFIER_WARN("  The majority of dropped messages were due to messages growing older than the TF cache time.  The last message's timestamp was: %f, and the last frame_id was: %s", last_out_the_back_stamp_.toSec(), last_out_the_back_frame_.c_str());
00549         }
00550       }
00551     }
00552   }
00553 
00554   Transformer& tf_; 
00555   ros::NodeHandle node_; 
00556   ros::Subscriber subscriber_;
00557   ros::Subscriber tf_subscriber_1_, tf_subscriber_2_;
00558   Callback callback_; 
00559   std::vector<std::string> target_frames_; 
00560   std::string target_frames_string_;
00561   boost::mutex target_frames_string_mutex_;
00562   std::string topic_; 
00563   uint32_t queue_size_; 
00564 
00565   L_Message messages_; 
00566   uint32_t message_count_; 
00567   boost::mutex list_mutex_; 
00568 
00569   bool destructing_; 
00570   boost::thread thread_handle_; 
00571   boost::condition_variable new_data_; 
00572   bool new_messages_; 
00573   volatile bool new_transforms_; 
00574   V_Message new_message_queue_; 
00575   boost::mutex queue_mutex_; 
00576 
00577   uint64_t successful_transform_count_;
00578   uint64_t failed_transform_count_;
00579   uint64_t failed_out_the_back_count_;
00580   uint64_t transform_message_count_;
00581   uint64_t incoming_message_count_;
00582   uint64_t dropped_message_count_;
00583 
00584   ros::Time last_out_the_back_stamp_;
00585   std::string last_out_the_back_frame_;
00586 
00587   ros::Time next_failure_warning_;
00588 
00589   ros::Duration time_tolerance_; 
00590 };
00591 
00592 } // namespace tf
00593 
00594 #endif


door_handle_detector
Author(s): Radu Bogdan Rusu, Marius
autogenerated on Wed Dec 11 2013 14:17:01