approximate_time.h
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2009, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of the Willow Garage nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 *********************************************************************/
00034 
00035 #ifndef MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
00036 #define MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
00037 
00038 #include "message_filters/synchronizer.h"
00039 #include "message_filters/connection.h"
00040 #include "message_filters/null_types.h"
00041 #include "message_filters/signal9.h"
00042 
00043 #include <boost/tuple/tuple.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/function.hpp>
00046 #include <boost/thread/mutex.hpp>
00047 
00048 #include <boost/bind.hpp>
00049 #include <boost/type_traits/is_same.hpp>
00050 #include <boost/noncopyable.hpp>
00051 #include <boost/mpl/or.hpp>
00052 #include <boost/mpl/at.hpp>
00053 #include <boost/mpl/vector.hpp>
00054 
00055 #include <ros/assert.h>
00056 #include <ros/message_traits.h>
00057 #include <ros/message_event.h>
00058 
00059 #include <deque>
00060 #include <vector>
00061 #include <string>
00062 
00063 namespace message_filters
00064 {
00065 namespace sync_policies
00066 {
00067 
00068 namespace mpl = boost::mpl;
00069 
00070 template<typename M0, typename M1, typename M2 = NullType, typename M3 = NullType, typename M4 = NullType,
00071          typename M5 = NullType, typename M6 = NullType, typename M7 = NullType, typename M8 = NullType>
00072 struct ApproximateTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8>
00073 {
00074   typedef Synchronizer<ApproximateTime> Sync;
00075   typedef PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8> Super;
00076   typedef typename Super::Messages Messages;
00077   typedef typename Super::Signal Signal;
00078   typedef typename Super::Events Events;
00079   typedef typename Super::RealTypeCount RealTypeCount;
00080   typedef typename Super::M0Event M0Event;
00081   typedef typename Super::M1Event M1Event;
00082   typedef typename Super::M2Event M2Event;
00083   typedef typename Super::M3Event M3Event;
00084   typedef typename Super::M4Event M4Event;
00085   typedef typename Super::M5Event M5Event;
00086   typedef typename Super::M6Event M6Event;
00087   typedef typename Super::M7Event M7Event;
00088   typedef typename Super::M8Event M8Event;
00089   typedef std::deque<M0Event> M0Deque;
00090   typedef std::deque<M1Event> M1Deque;
00091   typedef std::deque<M2Event> M2Deque;
00092   typedef std::deque<M3Event> M3Deque;
00093   typedef std::deque<M4Event> M4Deque;
00094   typedef std::deque<M5Event> M5Deque;
00095   typedef std::deque<M6Event> M6Deque;
00096   typedef std::deque<M7Event> M7Deque;
00097   typedef std::deque<M8Event> M8Deque;
00098   typedef std::vector<M0Event> M0Vector;
00099   typedef std::vector<M1Event> M1Vector;
00100   typedef std::vector<M2Event> M2Vector;
00101   typedef std::vector<M3Event> M3Vector;
00102   typedef std::vector<M4Event> M4Vector;
00103   typedef std::vector<M5Event> M5Vector;
00104   typedef std::vector<M6Event> M6Vector;
00105   typedef std::vector<M7Event> M7Vector;
00106   typedef std::vector<M8Event> M8Vector;
00107   typedef boost::tuple<M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event> Tuple;
00108   typedef boost::tuple<M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque> DequeTuple;
00109   typedef boost::tuple<M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector> VectorTuple;
00110 
00111   ApproximateTime(uint32_t queue_size)
00112   : parent_(0)
00113   , queue_size_(queue_size)
00114   , num_non_empty_deques_(0)
00115   , pivot_(NO_PIVOT)
00116   , max_interval_duration_(ros::DURATION_MAX)
00117   , age_penalty_(0.1)
00118   , has_dropped_messages_(9, false)
00119   , inter_message_lower_bounds_(9, ros::Duration(0))
00120   , warned_about_incorrect_bound_(9, false)
00121   {
00122     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00123   }
00124 
00125   ApproximateTime(const ApproximateTime& e)
00126   {
00127     *this = e;
00128   }
00129 
00130   ApproximateTime& operator=(const ApproximateTime& rhs)
00131   {
00132     parent_ = rhs.parent_;
00133     queue_size_ = rhs.queue_size_;
00134     num_non_empty_deques_ = rhs.num_non_empty_deques_;
00135     pivot_time_ = rhs.pivot_time_;
00136     pivot_ = rhs.pivot_;
00137     max_interval_duration_ = rhs.max_interval_duration_;
00138     age_penalty_ = rhs.age_penalty_;
00139     candidate_start_ = rhs.candidate_start_;
00140     candidate_end_ = rhs.candidate_end_;
00141     deques_ = rhs.deques_;
00142     past_ = rhs.past_;
00143     has_dropped_messages_ = rhs.has_dropped_messages_;
00144     inter_message_lower_bounds_ = rhs.inter_message_lower_bounds_;
00145     warned_about_incorrect_bound_ = rhs.warned_about_incorrect_bound_;
00146 
00147     return *this;
00148   }
00149 
00150   void initParent(Sync* parent)
00151   {
00152     parent_ = parent;
00153   }
00154 
00155   template<int i>
00156   void checkInterMessageBound()
00157   {
00158     namespace mt = ros::message_traits;
00159     if (warned_about_incorrect_bound_[i])
00160     {
00161       return;
00162     }
00163     std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
00164     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00165     ROS_ASSERT(!deque.empty());
00166     const typename mpl::at_c<Messages, i>::type &msg = *(deque.back()).getMessage();
00167     ros::Time msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(msg);
00168     ros::Time previous_msg_time;
00169     if (deque.size() == (size_t) 1)
00170     {
00171       if (v.empty())
00172       {
00173         // We have already published (or have never received) the previous message, we cannot check the bound
00174         return;
00175       }
00176       const typename mpl::at_c<Messages, i>::type &previous_msg = *(v.back()).getMessage();
00177       previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
00178     }
00179     else
00180     {
00181       // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
00182       const typename mpl::at_c<Messages, i>::type &previous_msg = *(deque[deque.size()-2]).getMessage();
00183       previous_msg_time =  mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
00184     }
00185     if (msg_time < previous_msg_time)
00186     {
00187       ROS_WARN_STREAM("Messages of type " << i << " arrived out of order (will print only once)");
00188       warned_about_incorrect_bound_[i] = true;
00189     }
00190     else if ((msg_time - previous_msg_time) < inter_message_lower_bounds_[i])
00191     {
00192       ROS_WARN_STREAM("Messages of type " << i << " arrived closer (" << (msg_time - previous_msg_time)
00193                       << ") than the lower bound you provided (" << inter_message_lower_bounds_[i]
00194                       << ") (will print only once)");
00195       warned_about_incorrect_bound_[i] = true;
00196     }
00197   }
00198 
00199 
00200   template<int i>
00201   void add(const typename mpl::at_c<Events, i>::type& evt)
00202   {
00203     boost::mutex::scoped_lock lock(data_mutex_);
00204 
00205     std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
00206     deque.push_back(evt);
00207     if (deque.size() == (size_t)1) {
00208       // We have just added the first message, so it was empty before
00209       ++num_non_empty_deques_;
00210       if (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
00211       {
00212         // All deques have messages
00213         process();
00214       }
00215     }
00216     else
00217     {
00218       checkInterMessageBound<i>();
00219     }
00220     // Check whether we have more messages than allowed in the queue.
00221     // Note that during the above call to process(), queue i may contain queue_size_+1 messages.
00222     std::vector<typename mpl::at_c<Events, i>::type>& past = boost::get<i>(past_);
00223     if (deque.size() + past.size() > queue_size_)
00224     {
00225       // Cancel ongoing candidate search, if any:
00226       num_non_empty_deques_ = 0; // We will recompute it from scratch
00227       recover<0>();
00228       recover<1>();
00229       recover<2>();
00230       recover<3>();
00231       recover<4>();
00232       recover<5>();
00233       recover<6>();
00234       recover<7>();
00235       recover<8>();
00236       // Drop the oldest message in the offending topic
00237       ROS_ASSERT(!deque.empty());
00238       deque.pop_front();
00239       has_dropped_messages_[i] = true;
00240       if (pivot_ != NO_PIVOT)
00241       {
00242         // The candidate is no longer valid. Destroy it.
00243         candidate_ = Tuple();
00244         pivot_ = NO_PIVOT;
00245         // There might still be enough messages to create a new candidate:
00246         process();
00247       }
00248     }
00249   }
00250 
00251   void setAgePenalty(double age_penalty)
00252   {
00253     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00254     ROS_ASSERT(age_penalty >= 0);
00255     age_penalty_ = age_penalty;
00256   }
00257 
00258   void setInterMessageLowerBound(int i, ros::Duration lower_bound) {
00259     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00260     ROS_ASSERT(lower_bound >= ros::Duration(0,0));
00261     inter_message_lower_bounds_[i] = lower_bound;
00262   }
00263 
00264   void setMaxIntervalDuration(ros::Duration max_interval_duration) {
00265     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00266     ROS_ASSERT(max_interval_duration >= ros::Duration(0,0));
00267     max_interval_duration_ = max_interval_duration;
00268   }
00269 
00270 private:
00271   // Assumes that deque number <index> is non empty
00272   template<int i>
00273   void dequeDeleteFront()
00274   {
00275     std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
00276     ROS_ASSERT(!deque.empty());
00277     deque.pop_front();
00278     if (deque.empty())
00279     {
00280       --num_non_empty_deques_;
00281     }
00282   }
00283 
00284   // Assumes that deque number <index> is non empty
00285   void dequeDeleteFront(uint32_t index)
00286   {
00287     switch (index)
00288     {
00289     case 0:
00290       dequeDeleteFront<0>();
00291       break;
00292     case 1:
00293       dequeDeleteFront<1>();
00294       break;
00295     case 2:
00296       dequeDeleteFront<2>();
00297       break;
00298     case 3:
00299       dequeDeleteFront<3>();
00300       break;
00301     case 4:
00302       dequeDeleteFront<4>();
00303       break;
00304     case 5:
00305       dequeDeleteFront<5>();
00306       break;
00307     case 6:
00308       dequeDeleteFront<6>();
00309       break;
00310     case 7:
00311       dequeDeleteFront<7>();
00312       break;
00313     case 8:
00314       dequeDeleteFront<8>();
00315       break;
00316     default:
00317       ROS_BREAK();
00318     }
00319   }
00320 
00321   // Assumes that deque number <index> is non empty
00322   template<int i>
00323   void dequeMoveFrontToPast()
00324   {
00325     std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
00326     std::vector<typename mpl::at_c<Events, i>::type>& vector = boost::get<i>(past_);
00327     ROS_ASSERT(!deque.empty());
00328     vector.push_back(deque.front());
00329     deque.pop_front();
00330     if (deque.empty())
00331     {
00332       --num_non_empty_deques_;
00333     }
00334   }
00335   // Assumes that deque number <index> is non empty
00336   void dequeMoveFrontToPast(uint32_t index)
00337   {
00338     switch (index)
00339     {
00340     case 0:
00341       dequeMoveFrontToPast<0>();
00342       break;
00343     case 1:
00344       dequeMoveFrontToPast<1>();
00345       break;
00346     case 2:
00347       dequeMoveFrontToPast<2>();
00348       break;
00349     case 3:
00350       dequeMoveFrontToPast<3>();
00351       break;
00352     case 4:
00353       dequeMoveFrontToPast<4>();
00354       break;
00355     case 5:
00356       dequeMoveFrontToPast<5>();
00357       break;
00358     case 6:
00359       dequeMoveFrontToPast<6>();
00360       break;
00361     case 7:
00362       dequeMoveFrontToPast<7>();
00363       break;
00364     case 8:
00365       dequeMoveFrontToPast<8>();
00366       break;
00367     default:
00368       ROS_BREAK();
00369     }
00370   }
00371 
00372   void makeCandidate()
00373   {
00374     //printf("Creating candidate\n");
00375     // Create candidate tuple
00376     candidate_ = Tuple(); // Discards old one if any
00377     boost::get<0>(candidate_) = boost::get<0>(deques_).front();
00378     boost::get<1>(candidate_) = boost::get<1>(deques_).front();
00379     if (RealTypeCount::value > 2)
00380     {
00381       boost::get<2>(candidate_) = boost::get<2>(deques_).front();
00382       if (RealTypeCount::value > 3)
00383       {
00384         boost::get<3>(candidate_) = boost::get<3>(deques_).front();
00385         if (RealTypeCount::value > 4)
00386         {
00387           boost::get<4>(candidate_) = boost::get<4>(deques_).front();
00388           if (RealTypeCount::value > 5)
00389           {
00390             boost::get<5>(candidate_) = boost::get<5>(deques_).front();
00391             if (RealTypeCount::value > 6)
00392             {
00393               boost::get<6>(candidate_) = boost::get<6>(deques_).front();
00394               if (RealTypeCount::value > 7)
00395               {
00396                 boost::get<7>(candidate_) = boost::get<7>(deques_).front();
00397                 if (RealTypeCount::value > 8)
00398                 {
00399                   boost::get<8>(candidate_) = boost::get<8>(deques_).front();
00400                 }
00401               }
00402             }
00403           }
00404         }
00405       }
00406     }
00407     // Delete all past messages, since we have found a better candidate
00408     boost::get<0>(past_).clear();
00409     boost::get<1>(past_).clear();
00410     boost::get<2>(past_).clear();
00411     boost::get<3>(past_).clear();
00412     boost::get<4>(past_).clear();
00413     boost::get<5>(past_).clear();
00414     boost::get<6>(past_).clear();
00415     boost::get<7>(past_).clear();
00416     boost::get<8>(past_).clear();
00417     //printf("Candidate created\n");
00418   }
00419 
00420 
00421   // ASSUMES: num_messages <= past_[i].size()
00422   template<int i>
00423   void recover(size_t num_messages)
00424   {
00425     if (i >= RealTypeCount::value)
00426     {
00427       return;
00428     }
00429 
00430     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00431     std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
00432     ROS_ASSERT(num_messages <= v.size());
00433     while (num_messages > 0)
00434     {
00435       q.push_front(v.back());
00436       v.pop_back();
00437       num_messages--;
00438     }
00439 
00440     if (!q.empty())
00441     {
00442       ++num_non_empty_deques_;
00443     }
00444   }
00445 
00446 
00447   template<int i>
00448   void recover()
00449   {
00450     if (i >= RealTypeCount::value)
00451     {
00452       return;
00453     }
00454 
00455     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00456     std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
00457     while (!v.empty())
00458     {
00459       q.push_front(v.back());
00460       v.pop_back();
00461     }
00462 
00463     if (!q.empty())
00464     {
00465       ++num_non_empty_deques_;
00466     }
00467   }
00468 
00469 
00470   template<int i>
00471   void recoverAndDelete()
00472   {
00473     if (i >= RealTypeCount::value)
00474     {
00475       return;
00476     }
00477 
00478     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00479     std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
00480     while (!v.empty())
00481     {
00482       q.push_front(v.back());
00483       v.pop_back();
00484     }
00485 
00486     ROS_ASSERT(!q.empty());
00487 
00488     q.pop_front();
00489     if (!q.empty())
00490     {
00491       ++num_non_empty_deques_;
00492     }
00493   }
00494 
00495   // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00496   void publishCandidate()
00497   {
00498     //printf("Publishing candidate\n");
00499     // Publish
00500     parent_->signal(boost::get<0>(candidate_), boost::get<1>(candidate_), boost::get<2>(candidate_), boost::get<3>(candidate_),
00501                     boost::get<4>(candidate_), boost::get<5>(candidate_), boost::get<6>(candidate_), boost::get<7>(candidate_),
00502                     boost::get<8>(candidate_));
00503     // Delete this candidate
00504     candidate_ = Tuple();
00505     pivot_ = NO_PIVOT;
00506 
00507     // Recover hidden messages, and delete the ones corresponding to the candidate
00508     num_non_empty_deques_ = 0; // We will recompute it from scratch
00509     recoverAndDelete<0>();
00510     recoverAndDelete<1>();
00511     recoverAndDelete<2>();
00512     recoverAndDelete<3>();
00513     recoverAndDelete<4>();
00514     recoverAndDelete<5>();
00515     recoverAndDelete<6>();
00516     recoverAndDelete<7>();
00517     recoverAndDelete<8>();
00518   }
00519 
00520   // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00521   // Returns: the oldest message on the deques
00522   void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
00523   {
00524     return getCandidateBoundary(start_index, start_time, false);
00525   }
00526 
00527   // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00528   // Returns: the latest message among the heads of the deques, i.e. the minimum
00529   //          time to end an interval started at getCandidateStart_index()
00530   void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
00531   {
00532     return getCandidateBoundary(end_index, end_time, true);
00533   }
00534 
00535   // ASSUMES: all deques are non-empty
00536   // end = true: look for the latest head of deque
00537   //       false: look for the earliest head of deque
00538   void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
00539   {
00540     namespace mt = ros::message_traits;
00541 
00542     M0Event& m0 = boost::get<0>(deques_).front();
00543     time = mt::TimeStamp<M0>::value(*m0.getMessage());
00544     index = 0;
00545     if (RealTypeCount::value > 1)
00546     {
00547       M1Event& m1 = boost::get<1>(deques_).front();
00548       if ((mt::TimeStamp<M1>::value(*m1.getMessage()) < time) ^ end)
00549       {
00550         time = mt::TimeStamp<M1>::value(*m1.getMessage());
00551         index = 1;
00552       }
00553     }
00554     if (RealTypeCount::value > 2)
00555     {
00556       M2Event& m2 = boost::get<2>(deques_).front();
00557       if ((mt::TimeStamp<M2>::value(*m2.getMessage()) < time) ^ end)
00558       {
00559         time = mt::TimeStamp<M2>::value(*m2.getMessage());
00560         index = 2;
00561       }
00562     }
00563     if (RealTypeCount::value > 3)
00564     {
00565       M3Event& m3 = boost::get<3>(deques_).front();
00566       if ((mt::TimeStamp<M3>::value(*m3.getMessage()) < time) ^ end)
00567       {
00568         time = mt::TimeStamp<M3>::value(*m3.getMessage());
00569         index = 3;
00570       }
00571     }
00572     if (RealTypeCount::value > 4)
00573     {
00574       M4Event& m4 = boost::get<4>(deques_).front();
00575       if ((mt::TimeStamp<M4>::value(*m4.getMessage()) < time) ^ end)
00576       {
00577         time = mt::TimeStamp<M4>::value(*m4.getMessage());
00578         index = 4;
00579       }
00580     }
00581     if (RealTypeCount::value > 5)
00582     {
00583       M5Event& m5 = boost::get<5>(deques_).front();
00584       if ((mt::TimeStamp<M5>::value(*m5.getMessage()) < time) ^ end)
00585       {
00586         time = mt::TimeStamp<M5>::value(*m5.getMessage());
00587         index = 5;
00588       }
00589     }
00590     if (RealTypeCount::value > 6)
00591     {
00592       M6Event& m6 = boost::get<6>(deques_).front();
00593       if ((mt::TimeStamp<M6>::value(*m6.getMessage()) < time) ^ end)
00594       {
00595         time = mt::TimeStamp<M6>::value(*m6.getMessage());
00596         index = 6;
00597       }
00598     }
00599     if (RealTypeCount::value > 7)
00600     {
00601       M7Event& m7 = boost::get<7>(deques_).front();
00602       if ((mt::TimeStamp<M7>::value(*m7.getMessage()) < time) ^ end)
00603       {
00604         time = mt::TimeStamp<M7>::value(*m7.getMessage());
00605         index = 7;
00606       }
00607     }
00608     if (RealTypeCount::value > 8)
00609     {
00610       M8Event& m8 = boost::get<8>(deques_).front();
00611       if ((mt::TimeStamp<M8>::value(*m8.getMessage()) < time) ^ end)
00612       {
00613         time = mt::TimeStamp<M8>::value(*m8.getMessage());
00614         index = 8;
00615       }
00616     }
00617   }
00618 
00619 
00620   // ASSUMES: we have a pivot and candidate
00621   template<int i>
00622   ros::Time getVirtualTime()
00623   {
00624     namespace mt = ros::message_traits;
00625 
00626     if (i >= RealTypeCount::value)
00627     {
00628       return ros::Time(0,0);  // Dummy return value
00629     }
00630     ROS_ASSERT(pivot_ != NO_PIVOT);
00631 
00632     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00633     std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
00634     if (q.empty())
00635     {
00636       ROS_ASSERT(!v.empty());  // Because we have a candidate
00637       ros::Time last_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(v.back()).getMessage());
00638       ros::Time msg_time_lower_bound = last_msg_time + inter_message_lower_bounds_[i];
00639       if (msg_time_lower_bound > pivot_time_)  // Take the max
00640       {
00641         return msg_time_lower_bound;
00642       }
00643       return pivot_time_;
00644     }
00645     ros::Time current_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(q.front()).getMessage());
00646     return current_msg_time;
00647   }
00648 
00649 
00650   // ASSUMES: we have a pivot and candidate
00651   void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
00652   {
00653     return getVirtualCandidateBoundary(start_index, start_time, false);
00654   }
00655 
00656   // ASSUMES: we have a pivot and candidate
00657   void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
00658   {
00659     return getVirtualCandidateBoundary(end_index, end_time, true);
00660   }
00661 
00662   // ASSUMES: we have a pivot and candidate
00663   // end = true: look for the latest head of deque
00664   //       false: look for the earliest head of deque
00665   void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
00666   {
00667     namespace mt = ros::message_traits;
00668 
00669     std::vector<ros::Time> virtual_times(9);
00670     virtual_times[0] = getVirtualTime<0>();
00671     virtual_times[1] = getVirtualTime<1>();
00672     virtual_times[2] = getVirtualTime<2>();
00673     virtual_times[3] = getVirtualTime<3>();
00674     virtual_times[4] = getVirtualTime<4>();
00675     virtual_times[5] = getVirtualTime<5>();
00676     virtual_times[6] = getVirtualTime<6>();
00677     virtual_times[7] = getVirtualTime<7>();
00678     virtual_times[8] = getVirtualTime<8>();
00679  
00680     time = virtual_times[0];
00681     index = 0;
00682     for (int i = 0; i < RealTypeCount::value; i++)
00683     {
00684       if ((virtual_times[i] < time) ^ end)
00685       {
00686         time = virtual_times[i];
00687         index = i;
00688       }
00689     }
00690   }
00691 
00692 
00693   // assumes data_mutex_ is already locked
00694   void process()
00695   {
00696     // While no deque is empty
00697     while (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
00698     {
00699       // Find the start and end of the current interval
00700       //printf("Entering while loop in this state [\n");
00701       //show_internal_state();
00702       //printf("]\n");
00703       ros::Time end_time, start_time;
00704       uint32_t end_index, start_index;
00705       getCandidateEnd(end_index, end_time);
00706       getCandidateStart(start_index, start_time);
00707       for (uint32_t i = 0; i < (uint32_t)RealTypeCount::value; i++)
00708       {
00709         if (i != end_index)
00710         {
00711           // No dropped message could have been better to use than the ones we have,
00712           // so it becomes ok to use this topic as pivot in the future
00713           has_dropped_messages_[i] = false;
00714         }
00715       }
00716       if (pivot_ == NO_PIVOT)
00717       {
00718         // We do not have a candidate
00719         // INVARIANT: the past_ vectors are empty
00720         // INVARIANT: (candidate_ has no filled members)
00721         if (end_time - start_time > max_interval_duration_)
00722         {
00723           // This interval is too big to be a valid candidate, move to the next
00724           dequeDeleteFront(start_index);
00725           continue;
00726         }
00727         if (has_dropped_messages_[end_index])
00728         {
00729           // The topic that would become pivot has dropped messages, so it is not a good pivot
00730           dequeDeleteFront(start_index);
00731           continue;
00732         }
00733         // This is a valid candidate, and we don't have any, so take it
00734         makeCandidate();
00735         candidate_start_ = start_time;
00736         candidate_end_ = end_time;
00737         pivot_ = end_index;
00738         pivot_time_ = end_time;
00739         dequeMoveFrontToPast(start_index);
00740       }
00741       else
00742       {
00743         // We already have a candidate
00744         // Is this one better than the current candidate?
00745         // INVARIANT: has_dropped_messages_ is all false
00746         if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_))
00747         {
00748           // This is not a better candidate, move to the next
00749           dequeMoveFrontToPast(start_index);
00750         }
00751         else
00752         {
00753           // This is a better candidate
00754           makeCandidate();
00755           candidate_start_ = start_time;
00756           candidate_end_ = end_time;
00757           dequeMoveFrontToPast(start_index);
00758           // Keep the same pivot (and pivot time)
00759         }
00760       }
00761       // INVARIANT: we have a candidate and pivot
00762       ROS_ASSERT(pivot_ != NO_PIVOT);
00763       //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
00764       if (start_index == pivot_)  // TODO: replace with start_time == pivot_time_
00765       {
00766         // We have exhausted all possible candidates for this pivot, we now can output the best one
00767         publishCandidate();
00768       }
00769       else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
00770       {
00771         // We have not exhausted all candidates, but this candidate is already provably optimal
00772         // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
00773         // is already too big.
00774         // Note: this case is subsumed by the next, but it may save some unnecessary work and
00775         //       it makes things (a little) easier to understand
00776         publishCandidate();
00777       }
00778       else if (num_non_empty_deques_ < (uint32_t)RealTypeCount::value)
00779       {
00780         uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;
00781 
00782         // Before giving up, use the rate bounds, if provided, to further try to prove optimality
00783         std::vector<int> num_virtual_moves(9,0);
00784         while (1)
00785         {
00786           ros::Time end_time, start_time;
00787           uint32_t end_index, start_index;
00788           getVirtualCandidateEnd(end_index, end_time);
00789           getVirtualCandidateStart(start_index, start_time);
00790           if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
00791           {
00792             // We have proved optimality
00793             // As above, any future candidate must contain the interval [pivot_time_ end_time], which
00794             // is already too big.
00795             publishCandidate();  // This cleans up the virtual moves as a byproduct
00796             break;  // From the while(1) loop only
00797           }
00798           if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_))
00799           {
00800             // We cannot prove optimality
00801             // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
00802             // candidate
00803             // Cleanup the virtual search:
00804             num_non_empty_deques_ = 0; // We will recompute it from scratch
00805             recover<0>(num_virtual_moves[0]);
00806             recover<1>(num_virtual_moves[1]);
00807             recover<2>(num_virtual_moves[2]);
00808             recover<3>(num_virtual_moves[3]);
00809             recover<4>(num_virtual_moves[4]);
00810             recover<5>(num_virtual_moves[5]);
00811             recover<6>(num_virtual_moves[6]);
00812             recover<7>(num_virtual_moves[7]);
00813             recover<8>(num_virtual_moves[8]);
00814             (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
00815             ROS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
00816             break;
00817           }
00818           // Note: we cannot reach this point with start_index == pivot_ since in that case we would
00819           //       have start_time == pivot_time, in which case the two tests above are the negation
00820           //       of each other, so that one must be true. Therefore the while loop always terminates.
00821           ROS_ASSERT(start_index != pivot_);
00822           ROS_ASSERT(start_time < pivot_time_);
00823           dequeMoveFrontToPast(start_index);
00824           num_virtual_moves[start_index]++;
00825         } // while(1)
00826       }
00827     } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
00828   }
00829 
00830   Sync* parent_;
00831   uint32_t queue_size_;
00832 
00833   static const uint32_t NO_PIVOT = 9;  // Special value for the pivot indicating that no pivot has been selected
00834 
00835   DequeTuple deques_;
00836   uint32_t num_non_empty_deques_;
00837   VectorTuple past_;
00838   Tuple candidate_;  // NULL if there is no candidate, in which case there is no pivot.
00839   ros::Time candidate_start_;
00840   ros::Time candidate_end_;
00841   ros::Time pivot_time_;
00842   uint32_t pivot_;  // Equal to NO_PIVOT if there is no candidate
00843   boost::mutex data_mutex_;  // Protects all of the above
00844 
00845   ros::Duration max_interval_duration_; // TODO: initialize with a parameter
00846   double age_penalty_;
00847 
00848   std::vector<bool> has_dropped_messages_;
00849   std::vector<ros::Duration> inter_message_lower_bounds_;
00850   std::vector<bool> warned_about_incorrect_bound_;
00851 };
00852 
00853 } // namespace sync
00854 } // namespace message_filters
00855 
00856 #endif // MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
00857 


message_filters
Author(s): Josh Faust, Vijay Pradeep
autogenerated on Tue Mar 7 2017 03:45:14