ApproximateTime.h
Go to the documentation of this file.
00001 /*********************************************************************
00002  * Software License Agreement (BSD License)
00003  *
00004  *  Copyright (c) 2015-2018, Dataspeed Inc.
00005  *  All rights reserved.
00006  *
00007  *  Redistribution and use in source and binary forms, with or without
00008  *  modification, are permitted provided that the following conditions
00009  *  are met:
00010  *
00011  *   * Redistributions of source code must retain the above copyright
00012  *     notice, this list of conditions and the following disclaimer.
00013  *   * Redistributions in binary form must reproduce the above
00014  *     copyright notice, this list of conditions and the following
00015  *     disclaimer in the documentation and/or other materials provided
00016  *     with the distribution.
00017  *   * Neither the name of Dataspeed Inc. nor the names of its
00018  *     contributors may be used to endorse or promote products derived
00019  *     from this software without specific prior written permission.
00020  *
00021  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024  *  FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025  *  COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026  *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027  *  BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028  *  LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030  *  LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031  *  ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032  *  POSSIBILITY OF SUCH DAMAGE.
00033  *********************************************************************/
00034 
00035 #ifndef _CAN_APPROXIMATE_TIME_H_
00036 #define _CAN_APPROXIMATE_TIME_H_
00037 
00038 #include <deque>
00039 #include <ros/ros.h>
00040 #include <can_msgs/Frame.h>
00041 
00042 namespace dataspeed_can_msg_filters
00043 {
00044 class ApproximateTime
00045 {
00046 public:
00047   typedef can_msgs::Frame::ConstPtr Type;
00048   typedef boost::function<void(const std::vector<Type> &vec)> Callback;
00049 
00050   static bool ValidId(uint32_t id)
00051   {
00052     if (id & 0x80000000) {
00053       if (!(id & ~0x9FFFFFFF)) {
00054         return true; // Extended ID
00055       }
00056     } else {
00057       if (!(id & ~0x7FF)) {
00058         return true; // Standard ID
00059       }
00060     }
00061     return false;
00062   }
00063   static bool ValidId(uint32_t id, bool extended)
00064   {
00065     return extended ? !(id & ~0x1FFFFFFF) : !(id & ~0x7FF);
00066   }
00067   static bool ValidId(const Type &msg) { return ValidId(msg->id, msg->is_extended); }
00068   static uint32_t BuildId(uint32_t id, bool extended)
00069   {
00070     return extended ? ((id & 0x1FFFFFFF) | 0x80000000) : (id & 0x7FF);
00071   }
00072   static uint32_t BuildId(const Type &msg) { return BuildId(msg->id, msg->is_extended); }
00073 
00074   ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2)
00075 : queue_size_(queue_size)
00076 , callback_(callback)
00077 , num_non_empty_deques_(0)
00078 , pivot_(NO_PIVOT)
00079 , max_interval_duration_(ros::DURATION_MAX)
00080 , age_penalty_(0.1)
00081   {
00082     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00083     ROS_ASSERT(ValidId(id1));
00084     ROS_ASSERT(ValidId(id2));
00085 
00086     std::vector<uint32_t> ids(2);
00087     ids[0] = id1;
00088     ids[1] = id2;
00089 
00090     vector_.resize(ids.size());
00091     for (size_t i = 0; i < ids.size(); i++) {
00092       vector_[i].id = ids[i];
00093       vector_[i].has_dropped_messages = false;
00094       vector_[i].inter_message_lower_bounds = ros::Duration(0);
00095       vector_[i].warned_about_incorrect_bound = false;
00096     }
00097   }
00098   ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3)
00099 : queue_size_(queue_size)
00100 , callback_(callback)
00101 , num_non_empty_deques_(0)
00102 , pivot_(NO_PIVOT)
00103 , max_interval_duration_(ros::DURATION_MAX)
00104 , age_penalty_(0.1)
00105   {
00106     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00107     ROS_ASSERT(ValidId(id1));
00108     ROS_ASSERT(ValidId(id2));
00109     ROS_ASSERT(ValidId(id3));
00110 
00111     std::vector<uint32_t> ids(3);
00112     ids[0] = id1;
00113     ids[1] = id2;
00114     ids[2] = id3;
00115 
00116     vector_.resize(ids.size());
00117     for (size_t i = 0; i < ids.size(); i++) {
00118       vector_[i].id = ids[i];
00119       vector_[i].has_dropped_messages = false;
00120       vector_[i].inter_message_lower_bounds = ros::Duration(0);
00121       vector_[i].warned_about_incorrect_bound = false;
00122     }
00123   }
00124   ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4)
00125 : queue_size_(queue_size)
00126 , callback_(callback)
00127 , num_non_empty_deques_(0)
00128 , pivot_(NO_PIVOT)
00129 , max_interval_duration_(ros::DURATION_MAX)
00130 , age_penalty_(0.1)
00131   {
00132     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00133     ROS_ASSERT(ValidId(id1));
00134     ROS_ASSERT(ValidId(id2));
00135     ROS_ASSERT(ValidId(id3));
00136     ROS_ASSERT(ValidId(id4));
00137 
00138     std::vector<uint32_t> ids(4);
00139     ids[0] = id1;
00140     ids[1] = id2;
00141     ids[2] = id3;
00142     ids[3] = id4;
00143 
00144     vector_.resize(ids.size());
00145     for (size_t i = 0; i < ids.size(); i++) {
00146       vector_[i].id = ids[i];
00147       vector_[i].has_dropped_messages = false;
00148       vector_[i].inter_message_lower_bounds = ros::Duration(0);
00149       vector_[i].warned_about_incorrect_bound = false;
00150     }
00151   }
00152   ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5)
00153 : queue_size_(queue_size)
00154 , callback_(callback)
00155 , num_non_empty_deques_(0)
00156 , pivot_(NO_PIVOT)
00157 , max_interval_duration_(ros::DURATION_MAX)
00158 , age_penalty_(0.1)
00159   {
00160     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00161     ROS_ASSERT(ValidId(id1));
00162     ROS_ASSERT(ValidId(id2));
00163     ROS_ASSERT(ValidId(id3));
00164     ROS_ASSERT(ValidId(id4));
00165     ROS_ASSERT(ValidId(id5));
00166 
00167     std::vector<uint32_t> ids(5);
00168     ids[0] = id1;
00169     ids[1] = id2;
00170     ids[2] = id3;
00171     ids[3] = id4;
00172     ids[4] = id5;
00173 
00174     vector_.resize(ids.size());
00175     for (size_t i = 0; i < ids.size(); i++) {
00176       vector_[i].id = ids[i];
00177       vector_[i].has_dropped_messages = false;
00178       vector_[i].inter_message_lower_bounds = ros::Duration(0);
00179       vector_[i].warned_about_incorrect_bound = false;
00180     }
00181   }
00182   ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6)
00183 : queue_size_(queue_size)
00184 , callback_(callback)
00185 , num_non_empty_deques_(0)
00186 , pivot_(NO_PIVOT)
00187 , max_interval_duration_(ros::DURATION_MAX)
00188 , age_penalty_(0.1)
00189   {
00190     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00191     ROS_ASSERT(ValidId(id1));
00192     ROS_ASSERT(ValidId(id2));
00193     ROS_ASSERT(ValidId(id3));
00194     ROS_ASSERT(ValidId(id4));
00195     ROS_ASSERT(ValidId(id5));
00196     ROS_ASSERT(ValidId(id6));
00197 
00198     std::vector<uint32_t> ids(6);
00199     ids[0] = id1;
00200     ids[1] = id2;
00201     ids[2] = id3;
00202     ids[3] = id4;
00203     ids[4] = id5;
00204     ids[5] = id6;
00205 
00206     vector_.resize(ids.size());
00207     for (size_t i = 0; i < ids.size(); i++) {
00208       vector_[i].id = ids[i];
00209       vector_[i].has_dropped_messages = false;
00210       vector_[i].inter_message_lower_bounds = ros::Duration(0);
00211       vector_[i].warned_about_incorrect_bound = false;
00212     }
00213   }
00214   ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7)
00215 : queue_size_(queue_size)
00216 , callback_(callback)
00217 , num_non_empty_deques_(0)
00218 , pivot_(NO_PIVOT)
00219 , max_interval_duration_(ros::DURATION_MAX)
00220 , age_penalty_(0.1)
00221   {
00222     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00223     ROS_ASSERT(ValidId(id1));
00224     ROS_ASSERT(ValidId(id2));
00225     ROS_ASSERT(ValidId(id3));
00226     ROS_ASSERT(ValidId(id4));
00227     ROS_ASSERT(ValidId(id5));
00228     ROS_ASSERT(ValidId(id6));
00229     ROS_ASSERT(ValidId(id7));
00230 
00231     std::vector<uint32_t> ids(7);
00232     ids[0] = id1;
00233     ids[1] = id2;
00234     ids[2] = id3;
00235     ids[3] = id4;
00236     ids[4] = id5;
00237     ids[5] = id6;
00238     ids[6] = id7;
00239 
00240     vector_.resize(ids.size());
00241     for (size_t i = 0; i < ids.size(); i++) {
00242       vector_[i].id = ids[i];
00243       vector_[i].has_dropped_messages = false;
00244       vector_[i].inter_message_lower_bounds = ros::Duration(0);
00245       vector_[i].warned_about_incorrect_bound = false;
00246     }
00247   }
00248   ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7, uint32_t id8)
00249 : queue_size_(queue_size)
00250 , callback_(callback)
00251 , num_non_empty_deques_(0)
00252 , pivot_(NO_PIVOT)
00253 , max_interval_duration_(ros::DURATION_MAX)
00254 , age_penalty_(0.1)
00255   {
00256     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00257     ROS_ASSERT(ValidId(id1));
00258     ROS_ASSERT(ValidId(id2));
00259     ROS_ASSERT(ValidId(id3));
00260     ROS_ASSERT(ValidId(id4));
00261     ROS_ASSERT(ValidId(id5));
00262     ROS_ASSERT(ValidId(id6));
00263     ROS_ASSERT(ValidId(id7));
00264     ROS_ASSERT(ValidId(id8));
00265 
00266     std::vector<uint32_t> ids(8);
00267     ids[0] = id1;
00268     ids[1] = id2;
00269     ids[2] = id3;
00270     ids[3] = id4;
00271     ids[4] = id5;
00272     ids[5] = id6;
00273     ids[6] = id7;
00274     ids[7] = id8;
00275 
00276     vector_.resize(ids.size());
00277     for (size_t i = 0; i < ids.size(); i++) {
00278       vector_[i].id = ids[i];
00279       vector_[i].has_dropped_messages = false;
00280       vector_[i].inter_message_lower_bounds = ros::Duration(0);
00281       vector_[i].warned_about_incorrect_bound = false;
00282     }
00283   }
00284   ApproximateTime(uint32_t queue_size, Callback callback, uint32_t id1, uint32_t id2, uint32_t id3, uint32_t id4, uint32_t id5, uint32_t id6, uint32_t id7, uint32_t id8, uint32_t id9)
00285 : queue_size_(queue_size)
00286 , callback_(callback)
00287 , num_non_empty_deques_(0)
00288 , pivot_(NO_PIVOT)
00289 , max_interval_duration_(ros::DURATION_MAX)
00290 , age_penalty_(0.1)
00291   {
00292     ROS_ASSERT(queue_size_ > 0);  // The synchronizer will tend to drop many messages with a queue size of 1. At least 2 is recommended.
00293     ROS_ASSERT(ValidId(id1));
00294     ROS_ASSERT(ValidId(id2));
00295     ROS_ASSERT(ValidId(id3));
00296     ROS_ASSERT(ValidId(id4));
00297     ROS_ASSERT(ValidId(id5));
00298     ROS_ASSERT(ValidId(id6));
00299     ROS_ASSERT(ValidId(id7));
00300     ROS_ASSERT(ValidId(id8));
00301     ROS_ASSERT(ValidId(id9));
00302 
00303     std::vector<uint32_t> ids(9);
00304     ids[0] = id1;
00305     ids[1] = id2;
00306     ids[2] = id3;
00307     ids[3] = id4;
00308     ids[4] = id5;
00309     ids[5] = id6;
00310     ids[6] = id7;
00311     ids[7] = id8;
00312     ids[8] = id9;
00313 
00314     vector_.resize(ids.size());
00315     for (size_t i = 0; i < ids.size(); i++) {
00316       vector_[i].id = ids[i];
00317       vector_[i].has_dropped_messages = false;
00318       vector_[i].inter_message_lower_bounds = ros::Duration(0);
00319       vector_[i].warned_about_incorrect_bound = false;
00320     }
00321   }
00322   ~ApproximateTime() {}
00323 
00324   void processMsg(const Type &msg)
00325   {
00326     if (msg->is_rtr || msg->is_error) return;
00327     ROS_WARN_COND(!ValidId(msg), "Processed CAN message with invalid id: 0x%X (%s)", msg->id, msg->is_extended ? "extended" : "standard");
00328     for (size_t i = 0; i < vector_.size(); i++) {
00329       if (BuildId(msg) == vector_[i].id) {
00330 #if 0
00331         ROS_INFO("Id 0x%X: %u.%u", BuildId(msg), msg->header.stamp.sec, msg->header.stamp.nsec);
00332 #endif
00333         std::deque<Type>& deque = vector_[i].deque;
00334         deque.push_back(msg);
00335         if (deque.size() == (size_t)1) {
00336           // We have just added the first message, so it was empty before
00337           ++num_non_empty_deques_;
00338           if (num_non_empty_deques_ == (uint32_t)vector_.size()) {
00339             // All deques have messages
00340             process();
00341           }
00342         } else {
00343           checkInterMessageBound(i);
00344         }
00345 
00346         // Check whether we have more messages than allowed in the queue.
00347         // Note that during the above call to process(), queue i may contain queue_size_+1 messages.
00348         std::vector<Type>& past = vector_[i].past;
00349         if (deque.size() + past.size() > queue_size_) {
00350           // Cancel ongoing candidate search, if any:
00351           num_non_empty_deques_ = 0; // We will recompute it from scratch
00352           {for (size_t i = 0; i < vector_.size(); i++) {
00353             recover(i);
00354           }}
00355           // Drop the oldest message in the offending topic
00356           ROS_ASSERT(!deque.empty());
00357           deque.pop_front();
00358           vector_[i].has_dropped_messages = true;
00359           if (pivot_ != NO_PIVOT) {
00360             // The candidate is no longer valid. Destroy it.
00361             for (size_t i = 0; i < vector_.size(); i++) {
00362               vector_[i].candidate.reset();
00363             }
00364             pivot_ = NO_PIVOT;
00365             // There might still be enough messages to create a new candidate:
00366             process();
00367           }
00368         }
00369         return;
00370       }
00371     }
00372   }
00373 
00374 
00375   void setAgePenalty(double age_penalty) {
00376     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00377     ROS_ASSERT(age_penalty >= 0);
00378     age_penalty_ = age_penalty;
00379   }
00380 
00381   void setInterMessageLowerBound(size_t i, ros::Duration lower_bound) {
00382     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00383     ROS_ASSERT(lower_bound >= ros::Duration(0,0));
00384     ROS_ASSERT(i < vector_.size());
00385     vector_[i].inter_message_lower_bounds = lower_bound;
00386   }
00387 
00388   void setMaxIntervalDuration(ros::Duration max_interval_duration) {
00389     // For correctness we only need age_penalty > -1.0, but most likely a negative age_penalty is a mistake.
00390     ROS_ASSERT(max_interval_duration >= ros::Duration(0,0));
00391     max_interval_duration_ = max_interval_duration;
00392   }
00393 
00394 
00395 private:
00396   void checkInterMessageBound(size_t i)
00397   {
00398     namespace mt = ros::message_traits;
00399     if (vector_[i].warned_about_incorrect_bound) {
00400       return;
00401     }
00402     std::deque<Type>& deque = vector_[i].deque;
00403     std::vector<Type>& v = vector_[i].past;
00404     ROS_ASSERT(!deque.empty());
00405     ros::Time msg_time = deque.back()->header.stamp;
00406     ros::Time previous_msg_time;
00407     if (deque.size() == (size_t)1) {
00408       if (v.empty()) {
00409         // We have already published (or have never received) the previous message, we cannot check the bound
00410         return;
00411       }
00412       previous_msg_time = v.back()->header.stamp;
00413     } else {
00414       // There are at least 2 elements in the deque. Check that the gap respects the bound if it was provided.
00415       previous_msg_time =  deque[deque.size()-2]->header.stamp;
00416     }
00417     if (msg_time < previous_msg_time) {
00418       ROS_WARN_STREAM("Messages of type " << i << " arrived out of order (will print only once)");
00419       vector_[i].warned_about_incorrect_bound = true;
00420     } else if ((msg_time - previous_msg_time) < vector_[i].inter_message_lower_bounds) {
00421       ROS_WARN_STREAM("Messages of type " << i << " arrived closer (" << (msg_time - previous_msg_time)
00422           << ") than the lower bound you provided (" << vector_[i].inter_message_lower_bounds
00423           << ") (will print only once)");
00424       vector_[i].warned_about_incorrect_bound = true;
00425     }
00426   }
00427 
00428   // Assumes that deque number <index> is non empty
00429   void dequeDeleteFront(size_t i)
00430   {
00431     std::deque<Type>& deque = vector_[i].deque;
00432     ROS_ASSERT(!deque.empty());
00433     deque.pop_front();
00434     if (deque.empty()) {
00435       --num_non_empty_deques_;
00436     }
00437   }
00438 
00439   // Assumes that deque number <index> is non empty
00440   void dequeMoveFrontToPast(size_t i)
00441   {
00442     std::deque<Type>& deque = vector_[i].deque;
00443     std::vector<Type>& vector = vector_[i].past;
00444     ROS_ASSERT(!deque.empty());
00445     vector.push_back(deque.front());
00446     deque.pop_front();
00447     if (deque.empty()) {
00448       --num_non_empty_deques_;
00449     }
00450   }
00451 
00452   void makeCandidate()
00453   {
00454     //printf("Creating candidate\n");
00455     for (size_t i = 0; i < vector_.size(); i++) {
00456       vector_[i].candidate = vector_[i].deque.front(); // Create candidate tuple
00457       vector_[i].past.clear(); // Delete all past messages, since we have found a better candidate
00458     }
00459     //printf("Candidate created\n");
00460   }
00461 
00462    void recover(size_t i, size_t num_messages)
00463    {
00464      std::vector<Type>& v = vector_[i].past;
00465      std::deque<Type>& q = vector_[i].deque;
00466      ROS_ASSERT(num_messages <= v.size());
00467      while (num_messages > 0) {
00468        q.push_front(v.back());
00469        v.pop_back();
00470        num_messages--;
00471      }
00472 
00473      if (!q.empty()) {
00474        ++num_non_empty_deques_;
00475      }
00476    }
00477 
00478 
00479    void recover(size_t i)
00480    {
00481      std::vector<Type>& v = vector_[i].past;
00482      std::deque<Type>& q = vector_[i].deque;
00483      while (!v.empty()) {
00484        q.push_front(v.back());
00485        v.pop_back();
00486      }
00487 
00488      if (!q.empty()) {
00489        ++num_non_empty_deques_;
00490      }
00491    }
00492 
00493    void recoverAndDelete(size_t i)
00494    {
00495      std::vector<Type>& v = vector_[i].past;
00496      std::deque<Type>& q = vector_[i].deque;
00497      while (!v.empty()) {
00498        q.push_front(v.back());
00499        v.pop_back();
00500      }
00501 
00502      ROS_ASSERT(!q.empty());
00503 
00504      q.pop_front();
00505      if (!q.empty()) {
00506        ++num_non_empty_deques_;
00507      }
00508    }
00509 
00510    // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00511    void publishCandidate()
00512    {
00513      //printf("Publishing candidate\n");
00514      // Publish
00515      std::vector<Type> candidate;
00516      for (size_t i = 0; i < vector_.size(); i++) {
00517        candidate.push_back(vector_[i].candidate);
00518      }
00519      callback_(candidate);
00520 
00521      // Delete this candidate
00522      for (size_t i = 0; i < vector_.size(); i++) {
00523        vector_[i].candidate.reset();
00524      }
00525      pivot_ = NO_PIVOT;
00526 
00527      // Recover hidden messages, and delete the ones corresponding to the candidate
00528      num_non_empty_deques_ = 0; // We will recompute it from scratch
00529      for (size_t i = 0; i < vector_.size(); i++) {
00530        recoverAndDelete(i);
00531      }
00532    }
00533 
00534    // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00535    // Returns: the oldest message on the deques
00536    void getCandidateStart(uint32_t &start_index, ros::Time &start_time)
00537    {
00538      return getCandidateBoundary(start_index, start_time, false);
00539    }
00540 
00541    // Assumes: all deques are non empty, i.e. num_non_empty_deques_ == RealTypeCount::value
00542    // Returns: the latest message among the heads of the deques, i.e. the minimum
00543    //          time to end an interval started at getCandidateStart_index()
00544    void getCandidateEnd(uint32_t &end_index, ros::Time &end_time)
00545    {
00546      return getCandidateBoundary(end_index, end_time, true);
00547    }
00548 
00549    // ASSUMES: all deques are non-empty
00550    // end = true: look for the latest head of deque
00551    //       false: look for the earliest head of deque
00552    void getCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
00553    {
00554      time = vector_[0].deque.front()->header.stamp;
00555      index = 0;
00556      for (size_t i = 1; i < vector_.size(); i++) {
00557        const ros::Time &t = vector_[i].deque.front()->header.stamp;
00558        if ((t < time) ^ end) {
00559          time = t;
00560          index = i;
00561        }
00562      }
00563    }
00564 
00565    // ASSUMES: we have a pivot and candidate
00566    ros::Time getVirtualTime(size_t i)
00567    {
00568      namespace mt = ros::message_traits;
00569 
00570      if (i >= vector_.size()) {
00571        return ros::Time(0,0);  // Dummy return value
00572      }
00573      ROS_ASSERT(pivot_ != NO_PIVOT);
00574 
00575      std::vector<Type>& v = vector_[i].past;
00576      std::deque<Type>& q = vector_[i].deque;
00577      if (q.empty()) {
00578        ROS_ASSERT(!v.empty());  // Because we have a candidate
00579        ros::Time last_msg_time = v.back()->header.stamp;
00580        ros::Time msg_time_lower_bound = last_msg_time + vector_[i].inter_message_lower_bounds;
00581        if (msg_time_lower_bound > pivot_time_) { // Take the max
00582          return msg_time_lower_bound;
00583        }
00584        return pivot_time_;
00585      }
00586      ros::Time current_msg_time = q.front()->header.stamp;
00587      return current_msg_time;
00588    }
00589 
00590 
00591    // ASSUMES: we have a pivot and candidate
00592    void getVirtualCandidateStart(uint32_t &start_index, ros::Time &start_time)
00593    {
00594      return getVirtualCandidateBoundary(start_index, start_time, false);
00595    }
00596 
00597    // ASSUMES: we have a pivot and candidate
00598    void getVirtualCandidateEnd(uint32_t &end_index, ros::Time &end_time)
00599    {
00600      return getVirtualCandidateBoundary(end_index, end_time, true);
00601    }
00602 
00603    // ASSUMES: we have a pivot and candidate
00604    // end = true: look for the latest head of deque
00605    //       false: look for the earliest head of deque
00606    void getVirtualCandidateBoundary(uint32_t &index, ros::Time &time, bool end)
00607    {
00608      namespace mt = ros::message_traits;
00609 
00610      std::vector<ros::Time> virtual_times(vector_.size());
00611      for (size_t i = 0; i < vector_.size(); i++) {
00612        virtual_times[i] = getVirtualTime(i);
00613      }
00614 
00615      time = virtual_times[0];
00616      index = 0;
00617      for (size_t i = 0; i < vector_.size(); i++) {
00618        if ((virtual_times[i] < time) ^ end) {
00619          time = virtual_times[i];
00620          index = i;
00621        }
00622      }
00623    }
00624 
00625 
00626    // assumes data_mutex_ is already locked
00627    void process()
00628    {
00629      // While no deque is empty
00630      while (num_non_empty_deques_ == (uint32_t)vector_.size()) {
00631        // Find the start and end of the current interval
00632        //printf("Entering while loop in this state [\n");
00633        //show_internal_state();
00634        //printf("]\n");
00635        ros::Time end_time, start_time;
00636        uint32_t end_index, start_index;
00637        getCandidateEnd(end_index, end_time);
00638        getCandidateStart(start_index, start_time);
00639        for (uint32_t i = 0; i < (uint32_t)vector_.size(); i++) {
00640          if (i != end_index) {
00641            // No dropped message could have been better to use than the ones we have,
00642            // so it becomes ok to use this topic as pivot in the future
00643            vector_[i].has_dropped_messages = false;
00644          }
00645        }
00646        if (pivot_ == NO_PIVOT) {
00647          // We do not have a candidate
00648          // INVARIANT: the past_ vectors are empty
00649          // INVARIANT: (candidate_ has no filled members)
00650          if (end_time - start_time > max_interval_duration_) {
00651            // This interval is too big to be a valid candidate, move to the next
00652            dequeDeleteFront(start_index);
00653            continue;
00654          }
00655          if (vector_[end_index].has_dropped_messages) {
00656            // The topic that would become pivot has dropped messages, so it is not a good pivot
00657            dequeDeleteFront(start_index);
00658            continue;
00659          }
00660          // This is a valid candidate, and we don't have any, so take it
00661          makeCandidate();
00662          candidate_start_ = start_time;
00663          candidate_end_ = end_time;
00664          pivot_ = end_index;
00665          pivot_time_ = end_time;
00666          dequeMoveFrontToPast(start_index);
00667        } else {
00668          // We already have a candidate
00669          // Is this one better than the current candidate?
00670          // INVARIANT: has_dropped_messages_ is all false
00671          if ((end_time - candidate_end_) * (1 + age_penalty_) >= (start_time - candidate_start_)) {
00672            // This is not a better candidate, move to the next
00673            dequeMoveFrontToPast(start_index);
00674          } else {
00675            // This is a better candidate
00676            makeCandidate();
00677            candidate_start_ = start_time;
00678            candidate_end_ = end_time;
00679            dequeMoveFrontToPast(start_index);
00680            // Keep the same pivot (and pivot time)
00681          }
00682        }
00683        // INVARIANT: we have a candidate and pivot
00684        ROS_ASSERT(pivot_ != NO_PIVOT);
00685        //printf("start_index == %d, pivot_ == %d\n", start_index, pivot_);
00686        if (start_index == pivot_) { // TODO: replace with start_time == pivot_time_
00687          // We have exhausted all possible candidates for this pivot, we now can output the best one
00688          publishCandidate();
00689        } else if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_)) {
00690          // We have not exhausted all candidates, but this candidate is already provably optimal
00691          // Indeed, any future candidate must contain the interval [pivot_time_ end_time], which
00692          // is already too big.
00693          // Note: this case is subsumed by the next, but it may save some unnecessary work and
00694          //       it makes things (a little) easier to understand
00695          publishCandidate();
00696        } else if (num_non_empty_deques_ < (uint32_t)vector_.size()) {
00697          uint32_t num_non_empty_deques_before_virtual_search = num_non_empty_deques_;
00698 
00699          // Before giving up, use the rate bounds, if provided, to further try to prove optimality
00700          std::vector<int> num_virtual_moves(9,0);
00701          while (1) {
00702            ros::Time end_time, start_time;
00703            uint32_t end_index, start_index;
00704            getVirtualCandidateEnd(end_index, end_time);
00705            getVirtualCandidateStart(start_index, start_time);
00706            if ((end_time - candidate_end_) * (1 + age_penalty_) >= (pivot_time_ - candidate_start_)) {
00707              // We have proved optimality
00708              // As above, any future candidate must contain the interval [pivot_time_ end_time], which
00709              // is already too big.
00710              publishCandidate();  // This cleans up the virtual moves as a byproduct
00711              break;  // From the while(1) loop only
00712            }
00713            if ((end_time - candidate_end_) * (1 + age_penalty_) < (start_time - candidate_start_)) {
00714              // We cannot prove optimality
00715              // Indeed, we have a virtual (i.e. optimistic) candidate that is better than the current
00716              // candidate
00717              // Cleanup the virtual search:
00718              num_non_empty_deques_ = 0; // We will recompute it from scratch
00719              for (size_t i = 0; i < vector_.size(); i++) {
00720                recover(i, num_virtual_moves[i]);
00721              }
00722              (void)num_non_empty_deques_before_virtual_search; // unused variable warning stopper
00723              ROS_ASSERT(num_non_empty_deques_before_virtual_search == num_non_empty_deques_);
00724              break;
00725            }
00726            // Note: we cannot reach this point with start_index == pivot_ since in that case we would
00727            //       have start_time == pivot_time, in which case the two tests above are the negation
00728            //       of each other, so that one must be true. Therefore the while loop always terminates.
00729            ROS_ASSERT(start_index != pivot_);
00730            ROS_ASSERT(start_time < pivot_time_);
00731            dequeMoveFrontToPast(start_index);
00732            num_virtual_moves[start_index]++;
00733          } // while(1)
00734        }
00735      } // while(num_non_empty_deques_ == (uint32_t)RealTypeCount::value)
00736    }
00737 
00738 
00739   uint32_t queue_size_;
00740   Callback callback_;
00741 
00742   static const uint32_t NO_PIVOT = 9;  // Special value for the pivot indicating that no pivot has been selected
00743 
00744   typedef struct {
00745     uint32_t id;
00746     std::deque<Type> deque;
00747     std::vector<Type> past;
00748     Type candidate;  // NULL if there is no candidate, in which case there is no pivot.
00749     bool has_dropped_messages;
00750     ros::Duration inter_message_lower_bounds;
00751     bool warned_about_incorrect_bound;
00752   } VectorData;
00753   std::vector<VectorData> vector_;
00754   uint32_t num_non_empty_deques_;
00755   ros::Time candidate_start_;
00756   ros::Time candidate_end_;
00757   ros::Time pivot_time_;
00758   uint32_t pivot_;  // Equal to NO_PIVOT if there is no candidate
00759 
00760   ros::Duration max_interval_duration_; // TODO: initialize with a parameter
00761   double age_penalty_;
00762 };
00763 
00764 } // namespace dataspeed_can_msg_filters
00765 
00766 #endif // _CAN_APPROXIMATE_TIME_H_
00767 


dataspeed_can_msg_filters
Author(s): Kevin Hallenbeck
autogenerated on Thu Jun 6 2019 21:16:37