$search
00001 /********************************************************************* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2009, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of the Willow Garage nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 *********************************************************************/ 00034 00035 #ifndef MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H 00036 #define MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H 00037 00038 #include "message_filters/synchronizer.h" 00039 #include "message_filters/connection.h" 00040 #include "message_filters/null_types.h" 00041 #include "message_filters/signal9.h" 00042 00043 #include <boost/tuple/tuple.hpp> 00044 #include <boost/shared_ptr.hpp> 00045 #include <boost/function.hpp> 00046 #include <boost/thread/mutex.hpp> 00047 #include <boost/signals.hpp> 00048 #include <boost/bind.hpp> 00049 #include <boost/type_traits/is_same.hpp> 00050 #include <boost/noncopyable.hpp> 00051 #include <boost/mpl/or.hpp> 00052 #include <boost/mpl/at.hpp> 00053 #include <boost/mpl/vector.hpp> 00054 00055 #include <ros/message_traits.h> 00056 #include <ros/message_event.h> 00057 00058 #include <deque> 00059 #include <vector> 00060 #include <string> 00061 00062 namespace message_filters 00063 { 00064 namespace sync_policies 00065 { 00066 00067 namespace mpl = boost::mpl; 00068 00069 template<typename M0, typename M1, typename M2 = NullType, typename M3 = NullType, typename M4 = NullType, 00070 typename M5 = NullType, typename M6 = NullType, typename M7 = NullType, typename M8 = NullType> 00071 struct ApproximateTime : public PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8> 00072 { 00073 typedef Synchronizer<ApproximateTime> Sync; 00074 typedef PolicyBase<M0, M1, M2, M3, M4, M5, M6, M7, M8> Super; 00075 typedef typename Super::Messages Messages; 00076 typedef typename Super::Signal Signal; 00077 typedef typename Super::Events Events; 00078 typedef typename Super::RealTypeCount RealTypeCount; 00079 typedef typename Super::M0Event M0Event; 00080 typedef typename Super::M1Event M1Event; 00081 typedef typename Super::M2Event M2Event; 00082 typedef typename Super::M3Event M3Event; 00083 typedef typename Super::M4Event M4Event; 00084 typedef typename Super::M5Event M5Event; 00085 typedef typename Super::M6Event M6Event; 00086 typedef typename Super::M7Event M7Event; 00087 typedef typename Super::M8Event M8Event; 00088 typedef std::deque<M0Event> M0Deque; 00089 typedef std::deque<M1Event> M1Deque; 00090 typedef std::deque<M2Event> M2Deque; 00091 typedef std::deque<M3Event> M3Deque; 00092 typedef std::deque<M4Event> M4Deque; 00093 typedef std::deque<M5Event> M5Deque; 00094 typedef std::deque<M6Event> M6Deque; 00095 typedef std::deque<M7Event> M7Deque; 00096 typedef std::deque<M8Event> M8Deque; 00097 typedef std::vector<M0Event> M0Vector; 00098 typedef std::vector<M1Event> M1Vector; 00099 typedef std::vector<M2Event> M2Vector; 00100 typedef std::vector<M3Event> M3Vector; 00101 typedef std::vector<M4Event> M4Vector; 00102 typedef std::vector<M5Event> M5Vector; 00103 typedef std::vector<M6Event> M6Vector; 00104 typedef std::vector<M7Event> M7Vector; 00105 typedef std::vector<M8Event> M8Vector; 00106 typedef boost::tuple<M0Event, M1Event, M2Event, M3Event, M4Event, M5Event, M6Event, M7Event, M8Event> Tuple; 00107 typedef boost::tuple<M0Deque, M1Deque, M2Deque, M3Deque, M4Deque, M5Deque, M6Deque, M7Deque, M8Deque> DequeTuple; 00108 typedef boost::tuple<M0Vector, M1Vector, M2Vector, M3Vector, M4Vector, M5Vector, M6Vector, M7Vector, M8Vector> VectorTuple; 00109 00110 ApproximateTime(uint32_t queue_size) 00111 : parent_(0) 00112 , queue_size_(queue_size) 00113 , num_non_empty_deques_(0) 00114 , pivot_(NO_PIVOT) 00115 , max_interval_duration_(ros::DURATION_MAX) 00116 , age_penalty_(0.1) 00117 , has_dropped_messages_(9, false) 00118 , inter_message_lower_bounds_(9, ros::Duration(0)) 00119 , warned_about_incorrect_bound_(9, false) 00120 { 00121 ROS_ASSERT(queue_size_ > 0); // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended. 00122 } 00123 00124 ApproximateTime(const ApproximateTime& e) 00125 { 00126 *this = e; 00127 } 00128 00129 ApproximateTime& operator=(const ApproximateTime& rhs) 00130 { 00131 parent_ = rhs.parent_; 00132 queue_size_ = rhs.queue_size_; 00133 num_non_empty_deques_ = rhs.num_non_empty_deques_; 00134 pivot_time_ = rhs.pivot_time_; 00135 pivot_ = rhs.pivot_; 00136 max_interval_duration_ = rhs.max_interval_duration_; 00137 age_penalty_ = rhs.age_penalty_; 00138 candidate_start_ = rhs.candidate_start_; 00139 candidate_end_ = rhs.candidate_end_; 00140 deques_ = rhs.deques_; 00141 past_ = rhs.past_; 00142 has_dropped_messages_ = rhs.has_dropped_messages_; 00143 inter_message_lower_bounds_ = rhs.inter_message_lower_bounds_; 00144 warned_about_incorrect_bound_ = rhs.warned_about_incorrect_bound_; 00145 00146 return *this; 00147 } 00148 00149 void initParent(Sync* parent) 00150 { 00151 parent_ = parent; 00152 } 00153 00154 template<int i> 00155 void checkInterMessageBound() 00156 { 00157 namespace mt = ros::message_traits; 00158 if (warned_about_incorrect_bound_[i]) 00159 { 00160 return; 00161 } 00162 std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_); 00163 std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_); 00164 ROS_ASSERT(!deque.empty()); 00165 const typename mpl::at_c<Messages, i>::type &msg = *(deque.back()).getMessage(); 00166 ros::Time msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(msg); 00167 ros::Time previous_msg_time; 00168 if (deque.size() == (size_t) 1) 00169 { 00170 if (v.empty()) 00171 { 00172 // We have already published (or have never received) the previous message, we cannot check the bound 00173 return; 00174 } 00175 const typename mpl::at_c<Messages, i>::type &previous_msg = *(v.back()).getMessage(); 00176 previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg); 00177 } 00178 else 00179 { 00180 // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided. 00181 const typename mpl::at_c<Messages, i>::type &previous_msg = *(deque[deque.size()-2]).getMessage(); 00182 previous_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(previous_msg); 00183 } 00184 if (msg_time < previous_msg_time) 00185 { 00186 ROS_WARN_STREAM("Messages of type " << i << " arrived out of order (will print only once)"); 00187 warned_about_incorrect_bound_[i] = true; 00188 } 00189 else if ((msg_time - previous_msg_time) < inter_message_lower_bounds_[i]) 00190 { 00191 ROS_WARN_STREAM("Messages of type " << i << " arrived closer (" << (msg_time - previous_msg_time) 00192 << ") than the lower bound you provided (" << inter_message_lower_bounds_[i] 00193 << ") (will print only once)"); 00194 warned_about_incorrect_bound_[i] = true; 00195 } 00196 } 00197 00198 00199 template<int i> 00200 void add(const typename mpl::at_c<Events, i>::type& evt) 00201 { 00202 boost::mutex::scoped_lock lock(data_mutex_); 00203 00204 std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_); 00205 deque.push_back(evt); 00206 if (deque.size() == (size_t)1) { 00207 // We have just added the first message, so it was empty before 00208 ++num_non_empty_deques_; 00209 if (num_non_empty_deques_ == (uint32_t)RealTypeCount::value) 00210 { 00211 // All deques have messages 00212 process(); 00213 } 00214 } 00215 else 00216 { 00217 checkInterMessageBound<i>(); 00218 } 00219 // Check whether we have more messages than allowed in the queue. 00220 // Note that during the above call to process(), queue i may contain queue_size_+1 messages. 00221 std::vector<typename mpl::at_c<Events, i>::type>& past = boost::get<i>(past_); 00222 if (deque.size() + past.size() > queue_size_) 00223 { 00224 // Cancel ongoing candidate search, if any: 00225 num_non_empty_deques_ = 0; // We will recompute it from scratch 00226 recover<0>(); 00227 recover<1>(); 00228 recover<2>(); 00229 recover<3>(); 00230 recover<4>(); 00231 recover<5>(); 00232 recover<6>(); 00233 recover<7>(); 00234 recover<8>(); 00235 // Drop the oldest message in the offending topic 00236 ROS_ASSERT(!deque.empty()); 00237 deque.pop_front(); 00238 has_dropped_messages_[i] = true; 00239 if (pivot_ != NO_PIVOT) 00240 { 00241 // The candidate is no longer valid. Destroy it. 00242 candidate_ = Tuple(); 00243 pivot_ = NO_PIVOT; 00244 // There might still be enough messages to create a new candidate: 00245 process(); 00246 } 00247 } 00248 } 00249 00250 void setAgePenalty(double age_penalty) 00251 { 00252 // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake. 00253 ROS_ASSERT(age_penalty >= 0); 00254 age_penalty_ = age_penalty; 00255 } 00256 00257 void setInterMessageLowerBound(int i, ros::Duration lower_bound) { 00258 // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake. 00259 ROS_ASSERT(lower_bound >= ros::Duration(0,0)); 00260 inter_message_lower_bounds_[i] = lower_bound; 00261 } 00262 00263 void setMaxIntervalDuration(ros::Duration max_interval_duration) { 00264 // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake. 00265 ROS_ASSERT(max_interval_duration >= ros::Duration(0,0)); 00266 max_interval_duration_ = max_interval_duration; 00267 } 00268 00269 private: 00270 // Assumes that deque number <index> is non empty 00271 template<int i> 00272 void dequeDeleteFront() 00273 { 00274 std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_); 00275 ROS_ASSERT(!deque.empty()); 00276 deque.pop_front(); 00277 if (deque.empty()) 00278 { 00279 --num_non_empty_deques_; 00280 } 00281 } 00282 00283 // Assumes that deque number <index> is non empty 00284 void dequeDeleteFront(uint32_t index) 00285 { 00286 switch (index) 00287 { 00288 case 0: 00289 dequeDeleteFront<0>(); 00290 break; 00291 case 1: 00292 dequeDeleteFront<1>(); 00293 break; 00294 case 2: 00295 dequeDeleteFront<2>(); 00296 break; 00297 case 3: 00298 dequeDeleteFront<3>(); 00299 break; 00300 case 4: 00301 dequeDeleteFront<4>(); 00302 break; 00303 case 5: 00304 dequeDeleteFront<5>(); 00305 break; 00306 case 6: 00307 dequeDeleteFront<6>(); 00308 break; 00309 case 7: 00310 dequeDeleteFront<7>(); 00311 break; 00312 case 8: 00313 dequeDeleteFront<8>(); 00314 break; 00315 default: 00316 ROS_BREAK(); 00317 } 00318 } 00319 00320 // Assumes that deque number <index> is non empty 00321 template<int i> 00322 void dequeMoveFrontToPast() 00323 { 00324 std::deque<typename mpl::at_c<Events, i>::type>& deque = boost::get<i>(deques_); 00325 std::vector<typename mpl::at_c<Events, i>::type>& vector = boost::get<i>(past_); 00326 ROS_ASSERT(!deque.empty()); 00327 vector.push_back(deque.front()); 00328 deque.pop_front(); 00329 if (deque.empty()) 00330 { 00331 --num_non_empty_deques_; 00332 } 00333 } 00334 // Assumes that deque number <index> is non empty 00335 void dequeMoveFrontToPast(uint32_t index) 00336 { 00337 switch (index) 00338 { 00339 case 0: 00340 dequeMoveFrontToPast<0>(); 00341 break; 00342 case 1: 00343 dequeMoveFrontToPast<1>(); 00344 break; 00345 case 2: 00346 dequeMoveFrontToPast<2>(); 00347 break; 00348 case 3: 00349 dequeMoveFrontToPast<3>(); 00350 break; 00351 case 4: 00352 dequeMoveFrontToPast<4>(); 00353 break; 00354 case 5: 00355 dequeMoveFrontToPast<5>(); 00356 break; 00357 case 6: 00358 dequeMoveFrontToPast<6>(); 00359 break; 00360 case 7: 00361 dequeMoveFrontToPast<7>(); 00362 break; 00363 case 8: 00364 dequeMoveFrontToPast<8>(); 00365 break; 00366 default: 00367 ROS_BREAK(); 00368 } 00369 } 00370 00371 void makeCandidate() 00372 { 00373 //printf("Creating candidate\n"); 00374 // Create candidate tuple 00375 candidate_ = Tuple(); // Discards old one if any 00376 boost::get<0>(candidate_) = boost::get<0>(deques_).front(); 00377 boost::get<1>(candidate_) = boost::get<1>(deques_).front(); 00378 if (RealTypeCount::value > 2) 00379 { 00380 boost::get<2>(candidate_) = boost::get<2>(deques_).front(); 00381 if (RealTypeCount::value > 3) 00382 { 00383 boost::get<3>(candidate_) = boost::get<3>(deques_).front(); 00384 if (RealTypeCount::value > 4) 00385 { 00386 boost::get<4>(candidate_) = boost::get<4>(deques_).front(); 00387 if (RealTypeCount::value > 5) 00388 { 00389 boost::get<5>(candidate_) = boost::get<5>(deques_).front(); 00390 if (RealTypeCount::value > 6) 00391 { 00392 boost::get<6>(candidate_) = boost::get<6>(deques_).front(); 00393 if (RealTypeCount::value > 7) 00394 { 00395 boost::get<7>(candidate_) = boost::get<7>(deques_).front(); 00396 if (RealTypeCount::value > 8) 00397 { 00398 boost::get<8>(candidate_) = boost::get<8>(deques_).front(); 00399 } 00400 } 00401 } 00402 } 00403 } 00404 } 00405 } 00406 // Delete all past messages, since we have found a better candidate 00407 boost::get<0>(past_).clear(); 00408 boost::get<1>(past_).clear(); 00409 boost::get<2>(past_).clear(); 00410 boost::get<3>(past_).clear(); 00411 boost::get<4>(past_).clear(); 00412 boost::get<5>(past_).clear(); 00413 boost::get<6>(past_).clear(); 00414 boost::get<7>(past_).clear(); 00415 boost::get<8>(past_).clear(); 00416 //printf("Candidate created\n"); 00417 } 00418 00419 00420 // ASSUMES: num_messages <= past_[i].size() 00421 template<int i> 00422 void recover(size_t num_messages) 00423 { 00424 if (i >= RealTypeCount::value) 00425 { 00426 return; 00427 } 00428 00429 std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_); 00430 std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_); 00431 ROS_ASSERT(num_messages <= v.size()); 00432 while (num_messages > 0) 00433 { 00434 q.push_front(v.back()); 00435 v.pop_back(); 00436 num_messages--; 00437 } 00438 00439 if (!q.empty()) 00440 { 00441 ++num_non_empty_deques_; 00442 } 00443 } 00444 00445 00446 template<int i> 00447 void recover() 00448 { 00449 if (i >= RealTypeCount::value) 00450 { 00451 return; 00452 } 00453 00454 std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_); 00455 std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_); 00456 while (!v.empty()) 00457 { 00458 q.push_front(v.back()); 00459 v.pop_back(); 00460 } 00461 00462 if (!q.empty()) 00463 { 00464 ++num_non_empty_deques_; 00465 } 00466 } 00467 00468 00469 template<int i> 00470 void recoverAndDelete() 00471 { 00472 if (i >= RealTypeCount::value) 00473 { 00474 return; 00475 } 00476 00477 std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_); 00478 std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_); 00479 while (!v.empty()) 00480 { 00481 q.push_front(v.back()); 00482 v.pop_back(); 00483 } 00484 00485 ROS_ASSERT(!q.empty()); 00486 00487 q.pop_front(); 00488 if (!q.empty()) 00489 { 00490 ++num_non_empty_deques_; 00491 } 00492 } 00493 00494 // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value 00495 void publishCandidate() 00496 { 00497 //printf("Publishing candidate\n"); 00498 // Publish 00499 parent_->signal(boost::get<0>(candidate_), boost::get<1>(candidate_), boost::get<2>(candidate_), boost::get<3>(candidate_), 00500 boost::get<4>(candidate_), boost::get<5>(candidate_), boost::get<6>(candidate_), boost::get<7>(candidate_), 00501 boost::get<8>(candidate_)); 00502 // Delete this candidate 00503 candidate_ = Tuple(); 00504 pivot_ = NO_PIVOT; 00505 00506 // Recover hidden messages, and delete the ones corresponding to the candidate 00507 num_non_empty_deques_ = 0; // We will recompute it from scratch 00508 recoverAndDelete<0>(); 00509 recoverAndDelete<1>(); 00510 recoverAndDelete<2>(); 00511 recoverAndDelete<3>(); 00512 recoverAndDelete<4>(); 00513 recoverAndDelete<5>(); 00514 recoverAndDelete<6>(); 00515 recoverAndDelete<7>(); 00516 recoverAndDelete<8>(); 00517 } 00518 00519 // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value 00520 // Returns: the oldest message on the deques 00521 void getCandidateStart(uint32_t &start_index, ros::Time &start_time) 00522 { 00523 return getCandidateBoundary(start_index, start_time, false); 00524 } 00525 00526 // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value 00527 // Returns: the latest message among the heads of the deques, i.e. the minimum 00528 // time to end an interval started at getCandidateStart_index() 00529 void getCandidateEnd(uint32_t &end_index, ros::Time &end_time) 00530 { 00531 return getCandidateBoundary(end_index, end_time, true); 00532 } 00533 00534 // ASSUMES: all deques are non-empty 00535 // end = true: look for the latest head of deque 00536 // false: look for the earliest head of deque 00537 void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end) 00538 { 00539 namespace mt = ros::message_traits; 00540 00541 M0Event& m0 = boost::get<0>(deques_).front(); 00542 M1Event& m1 = boost::get<1>(deques_).front(); 00543 M2Event& m2 = boost::get<2>(deques_).front(); 00544 M3Event& m3 = boost::get<3>(deques_).front(); 00545 M4Event& m4 = boost::get<4>(deques_).front(); 00546 M5Event& m5 = boost::get<5>(deques_).front(); 00547 M6Event& m6 = boost::get<6>(deques_).front(); 00548 M7Event& m7 = boost::get<7>(deques_).front(); 00549 M8Event& m8 = boost::get<8>(deques_).front(); 00550 time = mt::TimeStamp<M0>::value(*m0.getMessage()); 00551 index = 0; 00552 if ((RealTypeCount::value > 1) && ((mt::TimeStamp<M1>::value(*m1.getMessage()) < time) ^ end)) 00553 { 00554 time = mt::TimeStamp<M1>::value(*m1.getMessage()); 00555 index = 1; 00556 } 00557 if ((RealTypeCount::value > 2) && ((mt::TimeStamp<M2>::value(*m2.getMessage()) < time) ^ end)) 00558 { 00559 time = mt::TimeStamp<M2>::value(*m2.getMessage()); 00560 index = 2; 00561 } 00562 if ((RealTypeCount::value > 3) && ((mt::TimeStamp<M3>::value(*m3.getMessage()) < time) ^ end)) 00563 { 00564 time = mt::TimeStamp<M3>::value(*m3.getMessage()); 00565 index = 3; 00566 } 00567 if ((RealTypeCount::value > 4) && ((mt::TimeStamp<M4>::value(*m4.getMessage()) < time) ^ end)) 00568 { 00569 time = mt::TimeStamp<M4>::value(*m4.getMessage()); 00570 index = 4; 00571 } 00572 if ((RealTypeCount::value > 5) && ((mt::TimeStamp<M5>::value(*m5.getMessage()) < time) ^ end)) 00573 { 00574 time = mt::TimeStamp<M5>::value(*m5.getMessage()); 00575 index = 5; 00576 } 00577 if ((RealTypeCount::value > 6) && ((mt::TimeStamp<M6>::value(*m6.getMessage()) < time) ^ end)) 00578 { 00579 time = mt::TimeStamp<M6>::value(*m6.getMessage()); 00580 index = 6; 00581 } 00582 if ((RealTypeCount::value > 7) && ((mt::TimeStamp<M7>::value(*m7.getMessage()) < time) ^ end)) 00583 { 00584 time = mt::TimeStamp<M7>::value(*m7.getMessage()); 00585 index = 7; 00586 } 00587 if ((RealTypeCount::value > 8) && ((mt::TimeStamp<M8>::value(*m8.getMessage()) < time) ^ end)) 00588 { 00589 time = mt::TimeStamp<M8>::value(*m8.getMessage()); 00590 index = 8; 00591 } 00592 } 00593 00594 00595 // ASSUMES: we have a pivot and candidate 00596 template<int i> 00597 ros::Time getVirtualTime() 00598 { 00599 namespace mt = ros::message_traits; 00600 00601 if (i >= RealTypeCount::value) 00602 { 00603 return ros::Time(0,0); // Dummy return value 00604 } 00605 ROS_ASSERT(pivot_ != NO_PIVOT); 00606 00607 std::vector<typename mpl::at_c<Events, i>::type>& v = boost::get<i>(past_); 00608 std::deque<typename mpl::at_c<Events, i>::type>& q = boost::get<i>(deques_); 00609 if (q.empty()) 00610 { 00611 ROS_ASSERT(!v.empty()); // Because we have a candidate 00612 ros::Time last_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(v.back()).getMessage()); 00613 ros::Time msg_time_lower_bound = last_msg_time + inter_message_lower_bounds_[i]; 00614 if (msg_time_lower_bound > pivot_time_) // Take the max 00615 { 00616 return msg_time_lower_bound; 00617 } 00618 return pivot_time_; 00619 } 00620 ros::Time current_msg_time = mt::TimeStamp<typename mpl::at_c<Messages, i>::type>::value(*(q.front()).getMessage()); 00621 return current_msg_time; 00622 } 00623 00624 00625 // ASSUMES: we have a pivot and candidate 00626 void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time) 00627 { 00628 return getVirtualCandidateBoundary(start_index, start_time, false); 00629 } 00630 00631 // ASSUMES: we have a pivot and candidate 00632 void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time) 00633 { 00634 return getVirtualCandidateBoundary(end_index, end_time, true); 00635 } 00636 00637 // ASSUMES: we have a pivot and candidate 00638 // end = true: look for the latest head of deque 00639 // false: look for the earliest head of deque 00640 void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end) 00641 { 00642 namespace mt = ros::message_traits; 00643 00644 std::vector<ros::Time> virtual_times(9); 00645 virtual_times[0] = getVirtualTime<0>(); 00646 virtual_times[1] = getVirtualTime<1>(); 00647 virtual_times[2] = getVirtualTime<2>(); 00648 virtual_times[3] = getVirtualTime<3>(); 00649 virtual_times[4] = getVirtualTime<4>(); 00650 virtual_times[5] = getVirtualTime<5>(); 00651 virtual_times[6] = getVirtualTime<6>(); 00652 virtual_times[7] = getVirtualTime<7>(); 00653 virtual_times[8] = getVirtualTime<8>(); 00654 00655 time = virtual_times[0]; 00656 index = 0; 00657 for (int i = 0; i < RealTypeCount::value; i++) 00658 { 00659 if ((virtual_times[i] < time) ^ end) 00660 { 00661 time = virtual_times[i]; 00662 index = i; 00663 } 00664 } 00665 } 00666 00667 00668 // assumes data_mutex_ is already locked 00669 void process() 00670 { 00671 // While no deque is empty 00672 while (num_non_empty_deques_ == (uint32_t)RealTypeCount::value) 00673 { 00674 // Find the start and end of the current interval 00675 //printf("Entering while loop in this state [\n"); 00676 //show_internal_state(); 00677 //printf("]\n"); 00678 ros::Time end_time, start_time; 00679 uint32_t end_index, start_index; 00680 getCandidateEnd(end_index, end_time); 00681 getCandidateStart(start_index, start_time); 00682 for (uint32_t i = 0; i < (uint32_t)RealTypeCount::value; i++) 00683 { 00684 if (i != end_index) 00685 { 00686 // No dropped message could have been better to use than the ones we have, 00687 // so it becomes ok to use this topic as pivot in the future 00688 has_dropped_messages_[i] = false; 00689 } 00690 } 00691 if (pivot_ == NO_PIVOT) 00692 { 00693 // We do not have a candidate 00694 // INVARIANT: the past_ vectors are empty 00695 // INVARIANT: (candidate_ has no filled members) 00696 if (end_time - start_time > max_interval_duration_) 00697 { 00698 // This interval is too big to be a valid candidate, move to the next 00699 dequeDeleteFront(start_index); 00700 continue; 00701 } 00702 if (has_dropped_messages_[end_index]) 00703 { 00704 // The topic that would become pivot has dropped messages, so it is not a good pivot 00705 dequeDeleteFront(start_index); 00706 continue; 00707 } 00708 // This is a valid candidate, and we don't have any, so take it 00709 makeCandidate(); 00710 candidate_start_ = start_time; 00711 candidate_end_ = end_time; 00712 pivot_ = end_index; 00713 pivot_time_ = end_time; 00714 dequeMoveFrontToPast(start_index); 00715 } 00716 else 00717 { 00718 // We already have a candidate 00719 // Is this one better than the current candidate? 00720 // INVARIANT: has_dropped_messages_ is all false 00721 if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_)) 00722 { 00723 // This is not a better candidate, move to the next 00724 dequeMoveFrontToPast(start_index); 00725 } 00726 else 00727 { 00728 // This is a better candidate 00729 makeCandidate(); 00730 candidate_start_ = start_time; 00731 candidate_end_ = end_time; 00732 dequeMoveFrontToPast(start_index); 00733 // Keep the same pivot (and pivot time) 00734 } 00735 } 00736 // INVARIANT: we have a candidate and pivot 00737 ROS_ASSERT(pivot_ != NO_PIVOT); 00738 //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_); 00739 if (start_index == pivot_) // TODO: replace with start_time == pivot_time_ 00740 { 00741 // We have exhausted all possible candidates for this pivot, we now can output the best one 00742 publishCandidate(); 00743 } 00744 else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_)) 00745 { 00746 // We have not exhausted all candidates, but this candidate is already provably optimal 00747 // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which 00748 // is already too big. 00749 // Note: this case is subsumed by the next, but it may save some unnecessary work and 00750 // it makes things (a little) easier to understand 00751 publishCandidate(); 00752 } 00753 else if (num_non_empty_deques_ < (uint32_t)RealTypeCount::value) 00754 { 00755 uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_; 00756 00757 // Before giving up, use the rate bounds, if provided, to further try to prove optimality 00758 std::vector<int> num_virtual_moves(9,0); 00759 while (1) 00760 { 00761 ros::Time end_time, start_time; 00762 uint32_t end_index, start_index; 00763 getVirtualCandidateEnd(end_index, end_time); 00764 getVirtualCandidateStart(start_index, start_time); 00765 if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_)) 00766 { 00767 // We have proved optimality 00768 // As above, any future candidate must contain the interval [pivot_time_ end_time], which 00769 // is already too big. 00770 publishCandidate(); // This cleans up the virtual moves as a byproduct 00771 break; // From the while(1) loop only 00772 } 00773 if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_)) 00774 { 00775 // We cannot prove optimality 00776 // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current 00777 // candidate 00778 // Cleanup the virtual search: 00779 num_non_empty_deques_ = 0; // We will recompute it from scratch 00780 recover<0>(num_virtual_moves[0]); 00781 recover<1>(num_virtual_moves[1]); 00782 recover<2>(num_virtual_moves[2]); 00783 recover<3>(num_virtual_moves[3]); 00784 recover<4>(num_virtual_moves[4]); 00785 recover<5>(num_virtual_moves[5]); 00786 recover<6>(num_virtual_moves[6]); 00787 recover<7>(num_virtual_moves[7]); 00788 recover<8>(num_virtual_moves[8]); 00789 (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper 00790 ROS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_); 00791 break; 00792 } 00793 // Note: we cannot reach this point with start_index == pivot_ since in that case we would 00794 // have start_time == pivot_time, in which case the two tests above are the negation 00795 // of each other, so that one must be true. Therefore the while loop always terminates. 00796 ROS_ASSERT(start_index != pivot_); 00797 ROS_ASSERT(start_time < pivot_time_); 00798 dequeMoveFrontToPast(start_index); 00799 num_virtual_moves[start_index]++; 00800 } // while(1) 00801 } 00802 } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value) 00803 } 00804 00805 Sync* parent_; 00806 uint32_t queue_size_; 00807 00808 static const uint32_t NO_PIVOT = 9; // Special value for the pivot indicating that no pivot has been selected 00809 00810 DequeTuple deques_; 00811 uint32_t num_non_empty_deques_; 00812 VectorTuple past_; 00813 Tuple candidate_; // NULL if there is no candidate, in which case there is no pivot. 00814 ros::Time candidate_start_; 00815 ros::Time candidate_end_; 00816 ros::Time pivot_time_; 00817 uint32_t pivot_; // Equal to NO_PIVOT if there is no candidate 00818 boost::mutex data_mutex_; // Protects all of the above 00819 00820 ros::Duration max_interval_duration_; // TODO: initialize with a parameter 00821 double age_penalty_; 00822 00823 std::vector<bool> has_dropped_messages_; 00824 std::vector<ros::Duration> inter_message_lower_bounds_; 00825 std::vector<bool> warned_about_incorrect_bound_; 00826 }; 00827 00828 } // namespace sync 00829 } // namespace message_filters 00830 00831 #endif // MESSAGE_FILTERS_SYNC_APPROXIMATE_TIME_H 00832