trigger_matcher.h
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2009-2010, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of the Willow Garage nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 *********************************************************************/
00034 
00035 #ifndef __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__
00036 #define __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__
00037 
00038 #include <ros/ros.h>
00039 #include <queue>
00040 #include <utility>
00041 #include <boost/thread.hpp>
00042 #include <boost/thread/condition.hpp>
00043 #include <std_msgs/Header.h>
00044 #include <boost/thread/thread_time.hpp>
00045 
00046 namespace timestamp_tools
00047 {
00048 
00049 class TriggerMatcherBase
00050 {
00051 protected:
00052   boost::mutex mutex_;
00053   virtual void gotTrigger() = 0;
00054 
00055   ros::Time getTimestampNoblockPrelocked(const ros::Time &data_time)
00056   {
00057     while (!trig_queue_.empty())
00058     {
00059       // If the head trigger is after the head data then we don't have a
00060       // timestamp for the data and drop it.
00061       if (nonCausalHeads(data_time))
00062       {
00063         if (verbose_)
00064           ROS_WARN("TriggerMatcherBase: data arrived before trigger. Discarding data sample.");
00065         return DropData;
00066       }
00067       
00068       ros::Time trig_stamp = trig_queue_.front();
00069       
00070       // If we are not synchronized, then we need to check that the next trigger
00071       // time stamp would not be satisfactory. (If no late counts are
00072       // allowed then we are always unsynchronized.)
00073       
00074       //ROS_DEBUG("getTimestampNoblock t=%f, qt=%f, ql=%i, sync = %i, last = %f", t.toSec(), trig_stamp.toSec(), trig_queue_.size(), synchronized_, last_data_stamp_.toSec());
00075 
00076       if (!synchronized_ || !late_data_count_allowed_)
00077       {
00078         //ROS_DEBUG("Not synchronized...");
00079         while (true)
00080         {
00081           if (trig_queue_.size() < 2)
00082             return RetryLater; // Can't do the check now.
00083 
00084           trig_queue_.pop();
00085           if (nonCausalHeads(data_time)) 
00086             break; // Head is non-causal, we have arrived.
00087           trig_stamp = trig_queue_.front(); // Skip that trigger event.
00088         }
00089         synchronized_ = true;
00090         late_data_count_ = 0;
00091       }
00092       else 
00093       {
00094         // If the data is later than the next trigger timestamp, we might
00095         // have missed a timestamp, or the data may just have been delayed a
00096         // lot. We count it and if it happens too often we assume that a
00097         // timestamp was missed and redo the locking process.
00098         //ROS_DEBUG("Packet check %f >? %f", last_data_stamp_.toSec(), trig_stamp.toSec());
00099         if (last_data_stamp_ > trig_stamp)
00100         {
00101           //ROS_DEBUG("Late packet...");
00102           if (++late_data_count_ >= late_data_count_allowed_)
00103           {
00104             if (verbose_)
00105               ROS_WARN("TriggerMatcherBase: too many late data packets. Assuming missed trigger. Relocking...");
00106             synchronized_ = false;
00107             continue;
00108           }
00109         }
00110         else
00111           late_data_count_ = 0;
00112 
00113         trig_queue_.pop();
00114       }
00115       
00116       last_data_stamp_ = data_time;
00117       return trig_stamp;
00118     }
00119 
00120     return RetryLater;
00121   }
00122   
00123   virtual void baseReset()
00124   {
00125     synchronized_ = false;
00126     late_data_count_ = 0;
00127     trig_queue_ = std::queue<ros::Time>();
00128   }
00129 
00130 private:
00131   std::queue<ros::Time> trig_queue_;
00132   
00133   ros::Duration trig_delay_;
00134 
00135   ros::Time last_data_stamp_;
00136 
00137   unsigned int late_data_count_allowed_;
00138   unsigned int late_data_count_;
00139   unsigned int max_trig_queue_length_;
00140   bool synchronized_;
00141 
00142   bool nonCausalHeads(const ros::Time &data_stamp)
00143   { // Are the heads non-causal? (Assumes the heads are present.)
00144     return trig_queue_.front() > data_stamp;
00145   }
00146   
00147 public:
00148   bool verbose_;
00149   
00150   static const ros::Time DropData;
00151   static const ros::Time RetryLater;
00152 
00153   bool hasTimestamp()
00154   {
00155     return !trig_queue_.empty();
00156   }
00157   
00158   virtual ~TriggerMatcherBase()
00159   {
00160   }
00161 
00162   void setLateDataCountAllowed(unsigned int v)
00163   {
00164     late_data_count_allowed_ = v;
00165   }
00166 
00167   void setTrigDelay(double delay)
00168   {
00169     setTrigDelay(ros::Duration(delay));
00170   }
00171 
00172   void setTrigDelay(const ros::Duration &delay)
00173   {
00174     trig_delay_ = delay;
00175   }
00176 
00177   TriggerMatcherBase(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length) : 
00178     trig_delay_(0), 
00179     last_data_stamp_(ros::TIME_MIN),
00180     late_data_count_allowed_(late_data_count_allowed),
00181     late_data_count_(0),
00182     max_trig_queue_length_(max_trig_queue_length),
00183     synchronized_(false),
00184     verbose_(false)
00185   {
00186   }
00187 
00188   void triggerCallback(double stamp)
00189   {
00190     triggerCallback(ros::Time(stamp));
00191   }
00192 
00193   void triggerCallback(const std_msgs::HeaderConstPtr &msg)
00194   {
00195     triggerCallback(msg->stamp);
00196   }
00197 
00198   void triggerCallback(const ros::Time &stamp)
00199   {
00200     boost::mutex::scoped_lock(mutex_);
00201     
00202     trig_queue_.push(stamp + trig_delay_);
00203     if (trig_queue_.size() > max_trig_queue_length_)
00204     {
00205       if (verbose_)
00206         ROS_WARN("TriggerMatcherBase: trig_queue_ overflow dropping from front.");
00207       trig_queue_.pop();
00208     }
00209     gotTrigger();
00210   }
00211 };
00212   
00213 const ros::Time TriggerMatcherBase::DropData = ros::TIME_MIN;
00214 const ros::Time TriggerMatcherBase::RetryLater = ros::TIME_MAX;
00215  
00216 class TriggerMatcher : public TriggerMatcherBase
00217 {
00218 private:
00219   boost::mutex data_source_mutex_;
00220   boost::condition_variable got_trigger_condition_;
00221 
00222 protected:
00223   virtual void gotTrigger()
00224   {
00225     got_trigger_condition_.notify_one();
00226   }
00227 
00228 public:
00229   virtual ~TriggerMatcher()
00230   {}
00231 
00232   TriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length) :
00233     TriggerMatcherBase(late_data_count_allowed, max_trig_queue_length)
00234   {}
00235 
00236   void reset()
00237   {
00238     boost::mutex::scoped_lock lock(mutex_);
00239 
00240     got_trigger_condition_.notify_all();
00241     baseReset();
00242   }
00243 
00244   ros::Time getTimestampBlocking(const ros::Time &t)
00245   {
00246     boost::mutex::scoped_lock data_lock(data_source_mutex_);
00247     boost::mutex::scoped_lock lock(mutex_);
00248 
00249     ros::Time stamp = getTimestampNoblockPrelocked(t);
00250 
00251     if (stamp != RetryLater)
00252       return stamp;
00253     
00254     got_trigger_condition_.wait(lock);
00255     
00256     return getTimestampNoblockPrelocked(t);
00257   }
00258 
00259   ros::Time getTimestampBlocking(const ros::Time &t, double timeout)
00260   {
00261     boost::mutex::scoped_lock data_lock(data_source_mutex_);
00262     boost::mutex::scoped_lock lock(mutex_);
00263 
00264     ros::Time stamp = getTimestampNoblockPrelocked(t);
00265 
00266     if (stamp != RetryLater)
00267       return stamp;
00268 
00269     got_trigger_condition_.timed_wait(lock, boost::posix_time::microseconds(timeout * 1e6));
00270     
00271     return getTimestampNoblockPrelocked(t);
00272   }
00273 
00274   ros::Time getTimestampNoblock(const ros::Time &data_time)
00275   {
00276     boost::mutex::scoped_lock data_lock(data_source_mutex_);
00277     boost::mutex::scoped_lock lock(mutex_);
00278 
00279     return getTimestampNoblockPrelocked(data_time);
00280   }
00281 };
00282 
00283 template <class C>
00284 class QueuedTriggerMatcher : public TriggerMatcherBase
00285 {
00286 public:
00287   typedef std::pair<ros::Time, boost::shared_ptr<C const> > DataPair;
00288   typedef boost::function<void(const ros::Time &, boost::shared_ptr<C const> &)> MatchCallback;
00289 
00290 private:
00291   MatchCallback match_callback_;
00292   std::queue<DataPair> data_queue_;
00293   unsigned int max_data_queue_length_;
00294 
00295   static void DefaultCallback(const ros::Time &t, boost::shared_ptr<C const> &d)
00296   {
00297     ROS_ERROR("QueuedTriggerMatcher triggered with no callback set. This is a bug in the code that uses QueuedTriggerMatcher.");
00298   }
00299 
00300 protected:
00301   virtual void gotTrigger()
00302   {
00303     while (!data_queue_.empty())
00304     {
00305       DataPair &data = data_queue_.front();
00306       ros::Time stamp = getTimestampNoblockPrelocked(data.first);
00307 
00308       if (stamp == DropData)
00309         data_queue_.pop();
00310       else if (stamp == RetryLater)
00311         return;
00312       else
00313       {
00314         match_callback_(stamp, data.second);
00315         data_queue_.pop();
00316       }
00317     }
00318   }
00319 
00320 public:
00321   virtual ~QueuedTriggerMatcher()
00322   {
00323   }
00324   
00325   void setMatchCallback(MatchCallback &cb)
00326   {
00327     match_callback_ = cb;
00328   }
00329   
00330   QueuedTriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length, unsigned int max_data_queue_length) :
00331     TriggerMatcherBase(late_data_count_allowed, max_trig_queue_length),
00332     max_data_queue_length_(max_data_queue_length)
00333   {
00334     match_callback_ = &QueuedTriggerMatcher::DefaultCallback;
00335   }
00336   
00337   void dataCallback(double stamp, const C &data)
00338   {
00339     dataCallback(ros::Time(stamp), data);
00340   }
00341 
00342   void dataCallback(const ros::Time &stamp, const C &data)
00343   {
00344     boost::shared_ptr<C const> ptr = boost::shared_ptr<C const>(new C(data));
00345     dataCallback(stamp, ptr);
00346   }
00347 
00348   void dataCallback(double stamp, const boost::shared_ptr<C const> &data)
00349   {
00350     dataCallback(ros::Time(stamp), data);
00351   }
00352 
00353   void dataCallback(const ros::Time &stamp, const boost::shared_ptr<C const> &data)
00354   {
00355     dataCallback(DataPair(stamp, data));
00356   }
00357 
00358   void reset()
00359   {
00360     boost::mutex::scoped_lock lock(mutex_);
00361 
00362     data_queue_ = std::queue<DataPair>();
00363     baseReset();
00364   }
00365 
00366   void dataCallback(const DataPair &pair)
00367   {
00368     boost::mutex::scoped_lock lock(mutex_);
00369 
00370     data_queue_.push(pair);
00371     if (data_queue_.size() > max_data_queue_length_)
00372     {
00373       if (verbose_)
00374         ROS_WARN("QueuedTriggerMatcher: trig_queue_ overflow dropping from front.");
00375       data_queue_.pop();
00376     }
00377     gotTrigger();
00378   }
00379 };
00380 
00381 }
00382 
00383 #endif // __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__


timestamp_tools
Author(s): Blaise Gassend
autogenerated on Fri Jan 3 2014 11:19:54