$search
00001 /********************************************************************* 00002 * Software License Agreement (BSD License) 00003 * 00004 * Copyright (c) 2009-2010, Willow Garage, Inc. 00005 * All rights reserved. 00006 * 00007 * Redistribution and use in source and binary forms, with or without 00008 * modification, are permitted provided that the following conditions 00009 * are met: 00010 * 00011 * * Redistributions of source code must retain the above copyright 00012 * notice, this list of conditions and the following disclaimer. 00013 * * Redistributions in binary form must reproduce the above 00014 * copyright notice, this list of conditions and the following 00015 * disclaimer in the documentation and/or other materials provided 00016 * with the distribution. 00017 * * Neither the name of the Willow Garage nor the names of its 00018 * contributors may be used to endorse or promote products derived 00019 * from this software without specific prior written permission. 00020 * 00021 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 * POSSIBILITY OF SUCH DAMAGE. 00033 *********************************************************************/ 00034 00035 #ifndef __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__ 00036 #define __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__ 00037 00038 #include <ros/ros.h> 00039 #include <queue> 00040 #include <utility> 00041 #include <boost/thread.hpp> 00042 #include <boost/thread/condition.hpp> 00043 #include <std_msgs/Header.h> 00044 #include <boost/thread/thread_time.hpp> 00045 00046 namespace timestamp_tools 00047 { 00048 00049 class TriggerMatcherBase 00050 { 00051 protected: 00052 boost::mutex mutex_; 00053 virtual void gotTrigger() = 0; 00054 00055 ros::Time getTimestampNoblockPrelocked(const ros::Time &data_time) 00056 { 00057 while (!trig_queue_.empty()) 00058 { 00059 // If the head trigger is after the head data then we don't have a 00060 // timestamp for the data and drop it. 00061 if (nonCausalHeads(data_time)) 00062 { 00063 if (verbose_) 00064 ROS_WARN("TriggerMatcherBase: data arrived before trigger. Discarding data sample."); 00065 return DropData; 00066 } 00067 00068 ros::Time trig_stamp = trig_queue_.front(); 00069 00070 // If we are not synchronized, then we need to check that the next trigger 00071 // time stamp would not be satisfactory. (If no late counts are 00072 // allowed then we are always unsynchronized.) 00073 00074 //ROS_DEBUG("getTimestampNoblock t=%f, qt=%f, ql=%i, sync = %i, last = %f", t.toSec(), trig_stamp.toSec(), trig_queue_.size(), synchronized_, last_data_stamp_.toSec()); 00075 00076 if (!synchronized_ || !late_data_count_allowed_) 00077 { 00078 //ROS_DEBUG("Not synchronized..."); 00079 while (true) 00080 { 00081 if (trig_queue_.size() < 2) 00082 return RetryLater; // Can't do the check now. 00083 00084 trig_queue_.pop(); 00085 if (nonCausalHeads(data_time)) 00086 break; // Head is non-causal, we have arrived. 00087 trig_stamp = trig_queue_.front(); // Skip that trigger event. 00088 } 00089 synchronized_ = true; 00090 late_data_count_ = 0; 00091 } 00092 else 00093 { 00094 // If the data is later than the next trigger timestamp, we might 00095 // have missed a timestamp, or the data may just have been delayed a 00096 // lot. We count it and if it happens too often we assume that a 00097 // timestamp was missed and redo the locking process. 00098 //ROS_DEBUG("Packet check %f >? %f", last_data_stamp_.toSec(), trig_stamp.toSec()); 00099 if (last_data_stamp_ > trig_stamp) 00100 { 00101 //ROS_DEBUG("Late packet..."); 00102 if (++late_data_count_ >= late_data_count_allowed_) 00103 { 00104 if (verbose_) 00105 ROS_WARN("TriggerMatcherBase: too many late data packets. Assuming missed trigger. Relocking..."); 00106 synchronized_ = false; 00107 continue; 00108 } 00109 } 00110 else 00111 late_data_count_ = 0; 00112 00113 trig_queue_.pop(); 00114 } 00115 00116 last_data_stamp_ = data_time; 00117 return trig_stamp; 00118 } 00119 00120 return RetryLater; 00121 } 00122 00123 virtual void baseReset() 00124 { 00125 synchronized_ = false; 00126 late_data_count_ = 0; 00127 trig_queue_ = std::queue<ros::Time>(); 00128 } 00129 00130 private: 00131 std::queue<ros::Time> trig_queue_; 00132 00133 ros::Duration trig_delay_; 00134 00135 ros::Time last_data_stamp_; 00136 00137 unsigned int late_data_count_allowed_; 00138 unsigned int late_data_count_; 00139 unsigned int max_trig_queue_length_; 00140 bool synchronized_; 00141 00142 bool nonCausalHeads(const ros::Time &data_stamp) 00143 { // Are the heads non-causal? (Assumes the heads are present.) 00144 return trig_queue_.front() > data_stamp; 00145 } 00146 00147 public: 00148 bool verbose_; 00149 00150 static const ros::Time DropData; 00151 static const ros::Time RetryLater; 00152 00153 bool hasTimestamp() 00154 { 00155 return !trig_queue_.empty(); 00156 } 00157 00158 virtual ~TriggerMatcherBase() 00159 { 00160 } 00161 00162 void setLateDataCountAllowed(unsigned int v) 00163 { 00164 late_data_count_allowed_ = v; 00165 } 00166 00167 void setTrigDelay(double delay) 00168 { 00169 setTrigDelay(ros::Duration(delay)); 00170 } 00171 00172 void setTrigDelay(const ros::Duration &delay) 00173 { 00174 trig_delay_ = delay; 00175 } 00176 00177 TriggerMatcherBase(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length) : 00178 trig_delay_(0), 00179 last_data_stamp_(ros::TIME_MIN), 00180 late_data_count_allowed_(late_data_count_allowed), 00181 late_data_count_(0), 00182 max_trig_queue_length_(max_trig_queue_length), 00183 synchronized_(false), 00184 verbose_(false) 00185 { 00186 } 00187 00188 void triggerCallback(double stamp) 00189 { 00190 triggerCallback(ros::Time(stamp)); 00191 } 00192 00193 void triggerCallback(const std_msgs::HeaderConstPtr &msg) 00194 { 00195 triggerCallback(msg->stamp); 00196 } 00197 00198 void triggerCallback(const ros::Time &stamp) 00199 { 00200 boost::mutex::scoped_lock(mutex_); 00201 00202 trig_queue_.push(stamp + trig_delay_); 00203 if (trig_queue_.size() > max_trig_queue_length_) 00204 { 00205 if (verbose_) 00206 ROS_WARN("TriggerMatcherBase: trig_queue_ overflow dropping from front."); 00207 trig_queue_.pop(); 00208 } 00209 gotTrigger(); 00210 } 00211 }; 00212 00213 const ros::Time TriggerMatcherBase::DropData = ros::TIME_MIN; 00214 const ros::Time TriggerMatcherBase::RetryLater = ros::TIME_MAX; 00215 00216 class TriggerMatcher : public TriggerMatcherBase 00217 { 00218 private: 00219 boost::mutex data_source_mutex_; 00220 boost::condition_variable got_trigger_condition_; 00221 00222 protected: 00223 virtual void gotTrigger() 00224 { 00225 got_trigger_condition_.notify_one(); 00226 } 00227 00228 public: 00229 virtual ~TriggerMatcher() 00230 {} 00231 00232 TriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length) : 00233 TriggerMatcherBase(late_data_count_allowed, max_trig_queue_length) 00234 {} 00235 00236 void reset() 00237 { 00238 boost::mutex::scoped_lock lock(mutex_); 00239 00240 got_trigger_condition_.notify_all(); 00241 baseReset(); 00242 } 00243 00244 ros::Time getTimestampBlocking(const ros::Time &t) 00245 { 00246 boost::mutex::scoped_lock data_lock(data_source_mutex_); 00247 boost::mutex::scoped_lock lock(mutex_); 00248 00249 ros::Time stamp = getTimestampNoblockPrelocked(t); 00250 00251 if (stamp != RetryLater) 00252 return stamp; 00253 00254 got_trigger_condition_.wait(lock); 00255 00256 return getTimestampNoblockPrelocked(t); 00257 } 00258 00259 ros::Time getTimestampBlocking(const ros::Time &t, double timeout) 00260 { 00261 boost::mutex::scoped_lock data_lock(data_source_mutex_); 00262 boost::mutex::scoped_lock lock(mutex_); 00263 00264 ros::Time stamp = getTimestampNoblockPrelocked(t); 00265 00266 if (stamp != RetryLater) 00267 return stamp; 00268 00269 got_trigger_condition_.timed_wait(lock, boost::posix_time::microseconds(timeout * 1e6)); 00270 00271 return getTimestampNoblockPrelocked(t); 00272 } 00273 00274 ros::Time getTimestampNoblock(const ros::Time &data_time) 00275 { 00276 boost::mutex::scoped_lock data_lock(data_source_mutex_); 00277 boost::mutex::scoped_lock lock(mutex_); 00278 00279 return getTimestampNoblockPrelocked(data_time); 00280 } 00281 }; 00282 00283 template <class C> 00284 class QueuedTriggerMatcher : public TriggerMatcherBase 00285 { 00286 public: 00287 typedef std::pair<ros::Time, boost::shared_ptr<C const> > DataPair; 00288 typedef boost::function<void(const ros::Time &, boost::shared_ptr<C const> &)> MatchCallback; 00289 00290 private: 00291 MatchCallback match_callback_; 00292 std::queue<DataPair> data_queue_; 00293 unsigned int max_data_queue_length_; 00294 00295 static void DefaultCallback(const ros::Time &t, boost::shared_ptr<C const> &d) 00296 { 00297 ROS_ERROR("QueuedTriggerMatcher triggered with no callback set. This is a bug in the code that uses QueuedTriggerMatcher."); 00298 } 00299 00300 protected: 00301 virtual void gotTrigger() 00302 { 00303 while (!data_queue_.empty()) 00304 { 00305 DataPair &data = data_queue_.front(); 00306 ros::Time stamp = getTimestampNoblockPrelocked(data.first); 00307 00308 if (stamp == DropData) 00309 data_queue_.pop(); 00310 else if (stamp == RetryLater) 00311 return; 00312 else 00313 { 00314 match_callback_(stamp, data.second); 00315 data_queue_.pop(); 00316 } 00317 } 00318 } 00319 00320 public: 00321 virtual ~QueuedTriggerMatcher() 00322 { 00323 } 00324 00325 void setMatchCallback(MatchCallback &cb) 00326 { 00327 match_callback_ = cb; 00328 } 00329 00330 QueuedTriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length, unsigned int max_data_queue_length) : 00331 TriggerMatcherBase(late_data_count_allowed, max_trig_queue_length), 00332 max_data_queue_length_(max_data_queue_length) 00333 { 00334 match_callback_ = &QueuedTriggerMatcher::DefaultCallback; 00335 } 00336 00337 void dataCallback(double stamp, const C &data) 00338 { 00339 dataCallback(ros::Time(stamp), data); 00340 } 00341 00342 void dataCallback(const ros::Time &stamp, const C &data) 00343 { 00344 boost::shared_ptr<C const> ptr = boost::shared_ptr<C const>(new C(data)); 00345 dataCallback(stamp, ptr); 00346 } 00347 00348 void dataCallback(double stamp, const boost::shared_ptr<C const> &data) 00349 { 00350 dataCallback(ros::Time(stamp), data); 00351 } 00352 00353 void dataCallback(const ros::Time &stamp, const boost::shared_ptr<C const> &data) 00354 { 00355 dataCallback(DataPair(stamp, data)); 00356 } 00357 00358 void reset() 00359 { 00360 boost::mutex::scoped_lock lock(mutex_); 00361 00362 data_queue_ = std::queue<DataPair>(); 00363 baseReset(); 00364 } 00365 00366 void dataCallback(const DataPair &pair) 00367 { 00368 boost::mutex::scoped_lock lock(mutex_); 00369 00370 data_queue_.push(pair); 00371 if (data_queue_.size() > max_data_queue_length_) 00372 { 00373 if (verbose_) 00374 ROS_WARN("QueuedTriggerMatcher: trig_queue_ overflow dropping from front."); 00375 data_queue_.pop(); 00376 } 00377 gotTrigger(); 00378 } 00379 }; 00380 00381 } 00382 00383 #endif // __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__