approximate_time.h
Go to the documentation of this file.
00001 /*********************************************************************
00002 * Software License Agreement (BSD License)
00003 *
00004 *  Copyright (c) 2009, Willow Garage, Inc.
00005 *  All rights reserved.
00006 *
00007 *  Redistribution and use in source and binary forms, with or without
00008 *  modification, are permitted provided that the following conditions
00009 *  are met:
00010 *
00011 *   * Redistributions of source code must retain the above copyright
00012 *     notice, this list of conditions and the following disclaimer.
00013 *   * Redistributions in binary form must reproduce the above
00014 *     copyright notice, this list of conditions and the following
00015 *     disclaimer in the documentation and/or other materials provided
00016 *     with the distribution.
00017 *   * Neither the name of the Willow Garage nor the names of its
00018 *     contributors may be used to endorse or promote products derived
00019 *     from this software without specific prior written permission.
00020 *
00021 *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 *  POSSIBILITY OF SUCH DAMAGE.
00033 *********************************************************************/
00034 
00035 #ifndef MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
00036 #define MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
00037 
00038 #include "message_filters/synchronizer.h"
00039 #include "message_filters/connection.h"
00040 #include "message_filters/null_types.h"
00041 #include "message_filters/signal9.h"
00042 
00043 #include <boost/tuple/tuple.hpp>
00044 #include <boost/shared_ptr.hpp>
00045 #include <boost/function.hpp>
00046 #include <boost/thread/mutex.hpp>
00047 #include <boost/signals.hpp>
00048 #include <boost/bind.hpp>
00049 #include <boost/type_traits/is_same.hpp>
00050 #include <boost/noncopyable.hpp>
00051 #include <boost/mpl/or.hpp>
00052 #include <boost/mpl/at.hpp>
00053 #include <boost/mpl/vector.hpp>
00054 
00055 #include <ros/message_traits.h>
00056 #include <ros/message_event.h>
00057 
00058 #include <deque>
00059 #include <vector>
00060 #include <string>
00061 
00062 namespace message_filters
00063 {
00064 namespace sync_policies
00065 {
00066 
00067 namespace mpl = boost::mpl;
00068 
00069 template<typename M0, typename M1, typename M2 = NullType, typename M3 = NullType, typename M4 = NullType,
00070          typename M5 = NullType, typename M6 = NullType, typename M7 = NullType, typename M8 = NullType>
00071 struct ApproximateTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8>
00072 {
00073   typedef Synchronizer<ApproximateTime> Sync;
00074   typedef PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8> Super;
00075   typedef typename Super::Messages Messages;
00076   typedef typename Super::Signal Signal;
00077   typedef typename Super::Events Events;
00078   typedef typename Super::RealTypeCount RealTypeCount;
00079   typedef typename Super::M0Event M0Event;
00080   typedef typename Super::M1Event M1Event;
00081   typedef typename Super::M2Event M2Event;
00082   typedef typename Super::M3Event M3Event;
00083   typedef typename Super::M4Event M4Event;
00084   typedef typename Super::M5Event M5Event;
00085   typedef typename Super::M6Event M6Event;
00086   typedef typename Super::M7Event M7Event;
00087   typedef typename Super::M8Event M8Event;
00088   typedef std::deque<M0Event> M0Deque;
00089   typedef std::deque<M1Event> M1Deque;
00090   typedef std::deque<M2Event> M2Deque;
00091   typedef std::deque<M3Event> M3Deque;
00092   typedef std::deque<M4Event> M4Deque;
00093   typedef std::deque<M5Event> M5Deque;
00094   typedef std::deque<M6Event> M6Deque;
00095   typedef std::deque<M7Event> M7Deque;
00096   typedef std::deque<M8Event> M8Deque;
00097   typedef std::vector<M0Event> M0Vector;
00098   typedef std::vector<M1Event> M1Vector;
00099   typedef std::vector<M2Event> M2Vector;
00100   typedef std::vector<M3Event> M3Vector;
00101   typedef std::vector<M4Event> M4Vector;
00102   typedef std::vector<M5Event> M5Vector;
00103   typedef std::vector<M6Event> M6Vector;
00104   typedef std::vector<M7Event> M7Vector;
00105   typedef std::vector<M8Event> M8Vector;
00106   typedef boost::tuple<M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event> Tuple;
00107   typedef boost::tuple<M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque> DequeTuple;
00108   typedef boost::tuple<M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector> VectorTuple;
00109 
00110   ApproximateTime(uint32_t queue_size)
00111   : parent_(0)
00112   , queue_size_(queue_size)
00113   , num_non_empty_deques_(0)
00114   , pivot_(NO_PIVOT)
00115   , max_interval_duration_(ros::DURATION_MAX)
00116   , age_penalty_(0.1)
00117   , has_dropped_messages_(9, false)
00118   , inter_message_lower_bounds_(9, ros::Duration(0))
00119   , warned_about_incorrect_bound_(9, false)
00120   {
00121     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00122   }
00123 
00124   ApproximateTime(const ApproximateTime& e)
00125   {
00126     *this = e;
00127   }
00128 
00129   ApproximateTime& operator=(const ApproximateTime& rhs)
00130   {
00131     parent_ = rhs.parent_;
00132     queue_size_ = rhs.queue_size_;
00133     num_non_empty_deques_ = rhs.num_non_empty_deques_;
00134     pivot_time_ = rhs.pivot_time_;
00135     pivot_ = rhs.pivot_;
00136     max_interval_duration_ = rhs.max_interval_duration_;
00137     age_penalty_ = rhs.age_penalty_;
00138     candidate_start_ = rhs.candidate_start_;
00139     candidate_end_ = rhs.candidate_end_;
00140     deques_ = rhs.deques_;
00141     past_ = rhs.past_;
00142     has_dropped_messages_ = rhs.has_dropped_messages_;
00143     inter_message_lower_bounds_ = rhs.inter_message_lower_bounds_;
00144     warned_about_incorrect_bound_ = rhs.warned_about_incorrect_bound_;
00145 
00146     return *this;
00147   }
00148 
00149   void initParent(Sync* parent)
00150   {
00151     parent_ = parent;
00152   }
00153 
00154   template<int i>
00155   void checkInterMessageBound()
00156   {
00157     namespace mt = ros::message_traits;
00158     if (warned_about_incorrect_bound_[i])
00159     {
00160       return;
00161     }
00162     std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
00163     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00164     ROS_ASSERT(!deque.empty());
00165     const typename mpl::at_c<Messages, i>::type &msg = *(deque.back()).getMessage();
00166     ros::Time msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(msg);
00167     ros::Time previous_msg_time;
00168     if (deque.size() == (size_t) 1)
00169     {
00170       if (v.empty())
00171       {
00172         // We have already published (or have never received) the previous message, we cannot check the bound
00173         return;
00174       }
00175       const typename mpl::at_c<Messages, i>::type &previous_msg = *(v.back()).getMessage();
00176       previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
00177     }
00178     else
00179     {
00180       // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
00181       const typename mpl::at_c<Messages, i>::type &previous_msg = *(deque[deque.size()-2]).getMessage();
00182       previous_msg_time =  mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg);
00183     }
00184     if (msg_time < previous_msg_time)
00185     {
00186       ROS_WARN_STREAM("Messages of type " << i << " arrived out of order (will print only once)");
00187       warned_about_incorrect_bound_[i] = true;
00188     }
00189     else if ((msg_time - previous_msg_time) < inter_message_lower_bounds_[i])
00190     {
00191       ROS_WARN_STREAM("Messages of type " << i << " arrived closer (" << (msg_time - previous_msg_time)
00192                       << ") than the lower bound you provided (" << inter_message_lower_bounds_[i]
00193                       << ") (will print only once)");
00194       warned_about_incorrect_bound_[i] = true;
00195     }
00196   }
00197 
00198 
00199   template<int i>
00200   void add(const typename mpl::at_c<Events, i>::type& evt)
00201   {
00202     boost::mutex::scoped_lock lock(data_mutex_);
00203 
00204     std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
00205     deque.push_back(evt);
00206     if (deque.size() == (size_t)1) {
00207       // We have just added the first message, so it was empty before
00208       ++num_non_empty_deques_;
00209       if (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
00210       {
00211         // All deques have messages
00212         process();
00213       }
00214     }
00215     else
00216     {
00217       checkInterMessageBound<i>();
00218     }
00219     // Check whether we have more messages than allowed in the queue.
00220     // Note that during the above call to process(), queue i may contain queue_size_+1 messages.
00221     std::vector<typename mpl::at_c<Events, i>::type>& past = boost::get<i>(past_);
00222     if (deque.size() + past.size() > queue_size_)
00223     {
00224       // Cancel ongoing candidate search, if any:
00225       num_non_empty_deques_ = 0; // We will recompute it from scratch
00226       recover<0>();
00227       recover<1>();
00228       recover<2>();
00229       recover<3>();
00230       recover<4>();
00231       recover<5>();
00232       recover<6>();
00233       recover<7>();
00234       recover<8>();
00235       // Drop the oldest message in the offending topic
00236       ROS_ASSERT(!deque.empty());
00237       deque.pop_front();
00238       has_dropped_messages_[i] = true;
00239       if (pivot_ != NO_PIVOT)
00240       {
00241         // The candidate is no longer valid. Destroy it.
00242         candidate_ = Tuple();
00243         pivot_ = NO_PIVOT;
00244         // There might still be enough messages to create a new candidate:
00245         process();
00246       }
00247     }
00248   }
00249 
00250   void setAgePenalty(double age_penalty)
00251   {
00252     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00253     ROS_ASSERT(age_penalty >= 0);
00254     age_penalty_ = age_penalty;
00255   }
00256 
00257   void setInterMessageLowerBound(int i, ros::Duration lower_bound) {
00258     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00259     ROS_ASSERT(lower_bound >= ros::Duration(0,0));
00260     inter_message_lower_bounds_[i] = lower_bound;
00261   }
00262 
00263   void setMaxIntervalDuration(ros::Duration max_interval_duration) {
00264     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00265     ROS_ASSERT(max_interval_duration >= ros::Duration(0,0));
00266     max_interval_duration_ = max_interval_duration;
00267   }
00268 
00269 private:
00270   // Assumes that deque number <index> is non empty
00271   template<int i>
00272   void dequeDeleteFront()
00273   {
00274     std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
00275     ROS_ASSERT(!deque.empty());
00276     deque.pop_front();
00277     if (deque.empty())
00278     {
00279       --num_non_empty_deques_;
00280     }
00281   }
00282 
00283   // Assumes that deque number <index> is non empty
00284   void dequeDeleteFront(uint32_t index)
00285   {
00286     switch (index)
00287     {
00288     case 0:
00289       dequeDeleteFront<0>();
00290       break;
00291     case 1:
00292       dequeDeleteFront<1>();
00293       break;
00294     case 2:
00295       dequeDeleteFront<2>();
00296       break;
00297     case 3:
00298       dequeDeleteFront<3>();
00299       break;
00300     case 4:
00301       dequeDeleteFront<4>();
00302       break;
00303     case 5:
00304       dequeDeleteFront<5>();
00305       break;
00306     case 6:
00307       dequeDeleteFront<6>();
00308       break;
00309     case 7:
00310       dequeDeleteFront<7>();
00311       break;
00312     case 8:
00313       dequeDeleteFront<8>();
00314       break;
00315     default:
00316       ROS_BREAK();
00317     }
00318   }
00319 
00320   // Assumes that deque number <index> is non empty
00321   template<int i>
00322   void dequeMoveFrontToPast()
00323   {
00324     std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_);
00325     std::vector<typename mpl::at_c<Events, i>::type>& vector = boost::get<i>(past_);
00326     ROS_ASSERT(!deque.empty());
00327     vector.push_back(deque.front());
00328     deque.pop_front();
00329     if (deque.empty())
00330     {
00331       --num_non_empty_deques_;
00332     }
00333   }
00334   // Assumes that deque number <index> is non empty
00335   void dequeMoveFrontToPast(uint32_t index)
00336   {
00337     switch (index)
00338     {
00339     case 0:
00340       dequeMoveFrontToPast<0>();
00341       break;
00342     case 1:
00343       dequeMoveFrontToPast<1>();
00344       break;
00345     case 2:
00346       dequeMoveFrontToPast<2>();
00347       break;
00348     case 3:
00349       dequeMoveFrontToPast<3>();
00350       break;
00351     case 4:
00352       dequeMoveFrontToPast<4>();
00353       break;
00354     case 5:
00355       dequeMoveFrontToPast<5>();
00356       break;
00357     case 6:
00358       dequeMoveFrontToPast<6>();
00359       break;
00360     case 7:
00361       dequeMoveFrontToPast<7>();
00362       break;
00363     case 8:
00364       dequeMoveFrontToPast<8>();
00365       break;
00366     default:
00367       ROS_BREAK();
00368     }
00369   }
00370 
00371   void makeCandidate()
00372   {
00373     //printf("Creating candidate\n");
00374     // Create candidate tuple
00375     candidate_ = Tuple(); // Discards old one if any
00376     boost::get<0>(candidate_) = boost::get<0>(deques_).front();
00377     boost::get<1>(candidate_) = boost::get<1>(deques_).front();
00378     if (RealTypeCount::value > 2)
00379     {
00380       boost::get<2>(candidate_) = boost::get<2>(deques_).front();
00381       if (RealTypeCount::value > 3)
00382       {
00383         boost::get<3>(candidate_) = boost::get<3>(deques_).front();
00384         if (RealTypeCount::value > 4)
00385         {
00386           boost::get<4>(candidate_) = boost::get<4>(deques_).front();
00387           if (RealTypeCount::value > 5)
00388           {
00389             boost::get<5>(candidate_) = boost::get<5>(deques_).front();
00390             if (RealTypeCount::value > 6)
00391             {
00392               boost::get<6>(candidate_) = boost::get<6>(deques_).front();
00393               if (RealTypeCount::value > 7)
00394               {
00395                 boost::get<7>(candidate_) = boost::get<7>(deques_).front();
00396                 if (RealTypeCount::value > 8)
00397                 {
00398                   boost::get<8>(candidate_) = boost::get<8>(deques_).front();
00399                 }
00400               }
00401             }
00402           }
00403         }
00404       }
00405     }
00406     // Delete all past messages, since we have found a better candidate
00407     boost::get<0>(past_).clear();
00408     boost::get<1>(past_).clear();
00409     boost::get<2>(past_).clear();
00410     boost::get<3>(past_).clear();
00411     boost::get<4>(past_).clear();
00412     boost::get<5>(past_).clear();
00413     boost::get<6>(past_).clear();
00414     boost::get<7>(past_).clear();
00415     boost::get<8>(past_).clear();
00416     //printf("Candidate created\n");
00417   }
00418 
00419 
00420   // ASSUMES: num_messages <= past_[i].size()
00421   template<int i>
00422   void recover(size_t num_messages)
00423   {
00424     if (i >= RealTypeCount::value)
00425     {
00426       return;
00427     }
00428 
00429     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00430     std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
00431     ROS_ASSERT(num_messages <= v.size());
00432     while (num_messages > 0)
00433     {
00434       q.push_front(v.back());
00435       v.pop_back();
00436       num_messages--;
00437     }
00438 
00439     if (!q.empty())
00440     {
00441       ++num_non_empty_deques_;
00442     }
00443   }
00444 
00445 
00446   template<int i>
00447   void recover()
00448   {
00449     if (i >= RealTypeCount::value)
00450     {
00451       return;
00452     }
00453 
00454     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00455     std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
00456     while (!v.empty())
00457     {
00458       q.push_front(v.back());
00459       v.pop_back();
00460     }
00461 
00462     if (!q.empty())
00463     {
00464       ++num_non_empty_deques_;
00465     }
00466   }
00467 
00468 
00469   template<int i>
00470   void recoverAndDelete()
00471   {
00472     if (i >= RealTypeCount::value)
00473     {
00474       return;
00475     }
00476 
00477     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00478     std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
00479     while (!v.empty())
00480     {
00481       q.push_front(v.back());
00482       v.pop_back();
00483     }
00484 
00485     ROS_ASSERT(!q.empty());
00486 
00487     q.pop_front();
00488     if (!q.empty())
00489     {
00490       ++num_non_empty_deques_;
00491     }
00492   }
00493 
00494   // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00495   void publishCandidate()
00496   {
00497     //printf("Publishing candidate\n");
00498     // Publish
00499     parent_->signal(boost::get<0>(candidate_), boost::get<1>(candidate_), boost::get<2>(candidate_), boost::get<3>(candidate_),
00500                     boost::get<4>(candidate_), boost::get<5>(candidate_), boost::get<6>(candidate_), boost::get<7>(candidate_),
00501                     boost::get<8>(candidate_));
00502     // Delete this candidate
00503     candidate_ = Tuple();
00504     pivot_ = NO_PIVOT;
00505 
00506     // Recover hidden messages, and delete the ones corresponding to the candidate
00507     num_non_empty_deques_ = 0; // We will recompute it from scratch
00508     recoverAndDelete<0>();
00509     recoverAndDelete<1>();
00510     recoverAndDelete<2>();
00511     recoverAndDelete<3>();
00512     recoverAndDelete<4>();
00513     recoverAndDelete<5>();
00514     recoverAndDelete<6>();
00515     recoverAndDelete<7>();
00516     recoverAndDelete<8>();
00517   }
00518 
00519   // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00520   // Returns: the oldest message on the deques
00521   void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
00522   {
00523     return getCandidateBoundary(start_index, start_time, false);
00524   }
00525 
00526   // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00527   // Returns: the latest message among the heads of the deques, i.e. the minimum
00528   //          time to end an interval started at getCandidateStart_index()
00529   void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
00530   {
00531     return getCandidateBoundary(end_index, end_time, true);
00532   }
00533 
00534   // ASSUMES: all deques are non-empty
00535   // end = true: look for the latest head of deque
00536   //       false: look for the earliest head of deque
00537   void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
00538   {
00539     namespace mt = ros::message_traits;
00540 
00541     M0Event& m0 = boost::get<0>(deques_).front();
00542     M1Event& m1 = boost::get<1>(deques_).front();
00543     M2Event& m2 = boost::get<2>(deques_).front();
00544     M3Event& m3 = boost::get<3>(deques_).front();
00545     M4Event& m4 = boost::get<4>(deques_).front();
00546     M5Event& m5 = boost::get<5>(deques_).front();
00547     M6Event& m6 = boost::get<6>(deques_).front();
00548     M7Event& m7 = boost::get<7>(deques_).front();
00549     M8Event& m8 = boost::get<8>(deques_).front();
00550     time = mt::TimeStamp<M0>::value(*m0.getMessage());
00551     index = 0;
00552     if ((RealTypeCount::value > 1) && ((mt::TimeStamp<M1>::value(*m1.getMessage()) < time) ^ end))
00553     {
00554       time = mt::TimeStamp<M1>::value(*m1.getMessage());
00555       index = 1;
00556     }
00557     if ((RealTypeCount::value > 2) && ((mt::TimeStamp<M2>::value(*m2.getMessage()) < time) ^ end))
00558     {
00559       time = mt::TimeStamp<M2>::value(*m2.getMessage());
00560       index = 2;
00561     }
00562     if ((RealTypeCount::value > 3) && ((mt::TimeStamp<M3>::value(*m3.getMessage()) < time) ^ end))
00563     {
00564       time = mt::TimeStamp<M3>::value(*m3.getMessage());
00565       index = 3;
00566     }
00567     if ((RealTypeCount::value > 4) && ((mt::TimeStamp<M4>::value(*m4.getMessage()) < time) ^ end))
00568     {
00569       time = mt::TimeStamp<M4>::value(*m4.getMessage());
00570       index = 4;
00571     }
00572     if ((RealTypeCount::value > 5) && ((mt::TimeStamp<M5>::value(*m5.getMessage()) < time) ^ end))
00573     {
00574       time = mt::TimeStamp<M5>::value(*m5.getMessage());
00575       index = 5;
00576     }
00577     if ((RealTypeCount::value > 6) && ((mt::TimeStamp<M6>::value(*m6.getMessage()) < time) ^ end))
00578     {
00579       time = mt::TimeStamp<M6>::value(*m6.getMessage());
00580       index = 6;
00581     }
00582     if ((RealTypeCount::value > 7) && ((mt::TimeStamp<M7>::value(*m7.getMessage()) < time) ^ end))
00583     {
00584       time = mt::TimeStamp<M7>::value(*m7.getMessage());
00585       index = 7;
00586     }
00587     if ((RealTypeCount::value > 8) && ((mt::TimeStamp<M8>::value(*m8.getMessage()) < time) ^ end))
00588     {
00589       time = mt::TimeStamp<M8>::value(*m8.getMessage());
00590       index = 8;
00591     }
00592   }
00593 
00594 
00595   // ASSUMES: we have a pivot and candidate
00596   template<int i>
00597   ros::Time getVirtualTime()
00598   {
00599     namespace mt = ros::message_traits;
00600 
00601     if (i >= RealTypeCount::value)
00602     {
00603       return ros::Time(0,0);  // Dummy return value
00604     }
00605     ROS_ASSERT(pivot_ != NO_PIVOT);
00606 
00607     std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_);
00608     std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_);
00609     if (q.empty())
00610     {
00611       ROS_ASSERT(!v.empty());  // Because we have a candidate
00612       ros::Time last_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(v.back()).getMessage());
00613       ros::Time msg_time_lower_bound = last_msg_time + inter_message_lower_bounds_[i];
00614       if (msg_time_lower_bound > pivot_time_)  // Take the max
00615       {
00616         return msg_time_lower_bound;
00617       }
00618       return pivot_time_;
00619     }
00620     ros::Time current_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(q.front()).getMessage());
00621     return current_msg_time;
00622   }
00623 
00624 
00625   // ASSUMES: we have a pivot and candidate
00626   void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
00627   {
00628     return getVirtualCandidateBoundary(start_index, start_time, false);
00629   }
00630 
00631   // ASSUMES: we have a pivot and candidate
00632   void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
00633   {
00634     return getVirtualCandidateBoundary(end_index, end_time, true);
00635   }
00636 
00637   // ASSUMES: we have a pivot and candidate
00638   // end = true: look for the latest head of deque
00639   //       false: look for the earliest head of deque
00640   void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
00641   {
00642     namespace mt = ros::message_traits;
00643 
00644     std::vector<ros::Time> virtual_times(9);
00645     virtual_times[0] = getVirtualTime<0>();
00646     virtual_times[1] = getVirtualTime<1>();
00647     virtual_times[2] = getVirtualTime<2>();
00648     virtual_times[3] = getVirtualTime<3>();
00649     virtual_times[4] = getVirtualTime<4>();
00650     virtual_times[5] = getVirtualTime<5>();
00651     virtual_times[6] = getVirtualTime<6>();
00652     virtual_times[7] = getVirtualTime<7>();
00653     virtual_times[8] = getVirtualTime<8>();
00654  
00655     time = virtual_times[0];
00656     index = 0;
00657     for (int i = 0; i < RealTypeCount::value; i++)
00658     {
00659       if ((virtual_times[i] < time) ^ end)
00660       {
00661         time = virtual_times[i];
00662         index = i;
00663       }
00664     }
00665   }
00666 
00667 
00668   // assumes data_mutex_ is already locked
00669   void process()
00670   {
00671     // While no deque is empty
00672     while (num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
00673     {
00674       // Find the start and end of the current interval
00675       //printf("Entering while loop in this state [\n");
00676       //show_internal_state();
00677       //printf("]\n");
00678       ros::Time end_time, start_time;
00679       uint32_t end_index, start_index;
00680       getCandidateEnd(end_index, end_time);
00681       getCandidateStart(start_index, start_time);
00682       for (uint32_t i = 0; i < (uint32_t)RealTypeCount::value; i++)
00683       {
00684         if (i != end_index)
00685         {
00686           // No dropped message could have been better to use than the ones we have,
00687           // so it becomes ok to use this topic as pivot in the future
00688           has_dropped_messages_[i] = false;
00689         }
00690       }
00691       if (pivot_ == NO_PIVOT)
00692       {
00693         // We do not have a candidate
00694         // INVARIANT: the past_ vectors are empty
00695         // INVARIANT: (candidate_ has no filled members)
00696         if (end_time - start_time > max_interval_duration_)
00697         {
00698           // This interval is too big to be a valid candidate, move to the next
00699           dequeDeleteFront(start_index);
00700           continue;
00701         }
00702         if (has_dropped_messages_[end_index])
00703         {
00704           // The topic that would become pivot has dropped messages, so it is not a good pivot
00705           dequeDeleteFront(start_index);
00706           continue;
00707         }
00708         // This is a valid candidate, and we don't have any, so take it
00709         makeCandidate();
00710         candidate_start_ = start_time;
00711         candidate_end_ = end_time;
00712         pivot_ = end_index;
00713         pivot_time_ = end_time;
00714         dequeMoveFrontToPast(start_index);
00715       }
00716       else
00717       {
00718         // We already have a candidate
00719         // Is this one better than the current candidate?
00720         // INVARIANT: has_dropped_messages_ is all false
00721         if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_))
00722         {
00723           // This is not a better candidate, move to the next
00724           dequeMoveFrontToPast(start_index);
00725         }
00726         else
00727         {
00728           // This is a better candidate
00729           makeCandidate();
00730           candidate_start_ = start_time;
00731           candidate_end_ = end_time;
00732           dequeMoveFrontToPast(start_index);
00733           // Keep the same pivot (and pivot time)
00734         }
00735       }
00736       // INVARIANT: we have a candidate and pivot
00737       ROS_ASSERT(pivot_ != NO_PIVOT);
00738       //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
00739       if (start_index == pivot_)  // TODO: replace with start_time == pivot_time_
00740       {
00741         // We have exhausted all possible candidates for this pivot, we now can output the best one
00742         publishCandidate();
00743       }
00744       else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
00745       {
00746         // We have not exhausted all candidates, but this candidate is already provably optimal
00747         // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
00748         // is already too big.
00749         // Note: this case is subsumed by the next, but it may save some unnecessary work and
00750         //       it makes things (a little) easier to understand
00751         publishCandidate();
00752       }
00753       else if (num_non_empty_deques_ < (uint32_t)RealTypeCount::value)
00754       {
00755         uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;
00756 
00757         // Before giving up, use the rate bounds, if provided, to further try to prove optimality
00758         std::vector<int> num_virtual_moves(9,0);
00759         while (1)
00760         {
00761           ros::Time end_time, start_time;
00762           uint32_t end_index, start_index;
00763           getVirtualCandidateEnd(end_index, end_time);
00764           getVirtualCandidateStart(start_index, start_time);
00765           if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_))
00766           {
00767             // We have proved optimality
00768             // As above, any future candidate must contain the interval [pivot_time_ end_time], which
00769             // is already too big.
00770             publishCandidate();  // This cleans up the virtual moves as a byproduct
00771             break;  // From the while(1) loop only
00772           }
00773           if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_))
00774           {
00775             // We cannot prove optimality
00776             // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
00777             // candidate
00778             // Cleanup the virtual search:
00779             num_non_empty_deques_ = 0; // We will recompute it from scratch
00780             recover<0>(num_virtual_moves[0]);
00781             recover<1>(num_virtual_moves[1]);
00782             recover<2>(num_virtual_moves[2]);
00783             recover<3>(num_virtual_moves[3]);
00784             recover<4>(num_virtual_moves[4]);
00785             recover<5>(num_virtual_moves[5]);
00786             recover<6>(num_virtual_moves[6]);
00787             recover<7>(num_virtual_moves[7]);
00788             recover<8>(num_virtual_moves[8]);
00789             (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
00790             ROS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
00791             break;
00792           }
00793           // Note: we cannot reach this point with start_index == pivot_ since in that case we would
00794           //       have start_time == pivot_time, in which case the two tests above are the negation
00795           //       of each other, so that one must be true. Therefore the while loop always terminates.
00796           ROS_ASSERT(start_index != pivot_);
00797           ROS_ASSERT(start_time < pivot_time_);
00798           dequeMoveFrontToPast(start_index);
00799           num_virtual_moves[start_index]++;
00800         } // while(1)
00801       }
00802     } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
00803   }
00804 
00805   Sync* parent_;
00806   uint32_t queue_size_;
00807 
00808   static const uint32_t NO_PIVOT = 9;  // Special value for the pivot indicating that no pivot has been selected
00809 
00810   DequeTuple deques_;
00811   uint32_t num_non_empty_deques_;
00812   VectorTuple past_;
00813   Tuple candidate_;  // NULL if there is no candidate, in which case there is no pivot.
00814   ros::Time candidate_start_;
00815   ros::Time candidate_end_;
00816   ros::Time pivot_time_;
00817   uint32_t pivot_;  // Equal to NO_PIVOT if there is no candidate
00818   boost::mutex data_mutex_;  // Protects all of the above
00819 
00820   ros::Duration max_interval_duration_; // TODO: initialize with a parameter
00821   double age_penalty_;
00822 
00823   std::vector<bool> has_dropped_messages_;
00824   std::vector<ros::Duration> inter_message_lower_bounds_;
00825   std::vector<bool> warned_about_incorrect_bound_;
00826 };
00827 
00828 } // namespace sync
00829 } // namespace message_filters
00830 
00831 #endif // MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H
00832 


message_filters
Author(s): Josh Faust, Vijay Pradeep
autogenerated on Mon Oct 6 2014 11:47:35