00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__
00036 #define __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__
00037
00038 #include <ros/ros.h>
00039 #include <queue>
00040 #include <utility>
00041 #include <boost/thread.hpp>
00042 #include <boost/thread/condition.hpp>
00043 #include <roslib/Header.h>
00044 #include <boost/thread/thread_time.hpp>
00045
00046 namespace timestamp_tools
00047 {
00048
00049 class TriggerMatcherBase
00050 {
00051 protected:
00052 boost::mutex mutex_;
00053 virtual void gotTrigger() = 0;
00054
00055 ros::Time getTimestampNoblockPrelocked(const ros::Time &data_time)
00056 {
00057 while (!trig_queue_.empty())
00058 {
00059
00060
00061 if (nonCausalHeads(data_time))
00062 {
00063 if (verbose_)
00064 ROS_WARN("TriggerMatcherBase: data arrived before trigger. Discarding data sample.");
00065 return DropData;
00066 }
00067
00068 ros::Time trig_stamp = trig_queue_.front();
00069
00070
00071
00072
00073
00074
00075
00076 if (!synchronized_ || !late_data_count_allowed_)
00077 {
00078
00079 while (true)
00080 {
00081 if (trig_queue_.size() < 2)
00082 return RetryLater;
00083
00084 trig_queue_.pop();
00085 if (nonCausalHeads(data_time))
00086 break;
00087 trig_stamp = trig_queue_.front();
00088 }
00089 synchronized_ = true;
00090 late_data_count_ = 0;
00091 }
00092 else
00093 {
00094
00095
00096
00097
00098
00099 if (last_data_stamp_ > trig_stamp)
00100 {
00101
00102 if (++late_data_count_ >= late_data_count_allowed_)
00103 {
00104 if (verbose_)
00105 ROS_WARN("TriggerMatcherBase: too many late data packets. Assuming missed trigger. Relocking...");
00106 synchronized_ = false;
00107 continue;
00108 }
00109 }
00110 else
00111 late_data_count_ = 0;
00112
00113 trig_queue_.pop();
00114 }
00115
00116 last_data_stamp_ = data_time;
00117 return trig_stamp;
00118 }
00119
00120 return RetryLater;
00121 }
00122
00123 virtual void baseReset()
00124 {
00125 synchronized_ = false;
00126 late_data_count_ = 0;
00127 trig_queue_ = std::queue<ros::Time>();
00128 }
00129
00130 private:
00131 std::queue<ros::Time> trig_queue_;
00132
00133 ros::Duration trig_delay_;
00134
00135 ros::Time last_data_stamp_;
00136
00137 unsigned int late_data_count_allowed_;
00138 unsigned int late_data_count_;
00139 unsigned int max_trig_queue_length_;
00140 bool synchronized_;
00141
00142 bool nonCausalHeads(const ros::Time &data_stamp)
00143 {
00144 return trig_queue_.front() > data_stamp;
00145 }
00146
00147 public:
00148 bool verbose_;
00149
00150 static const ros::Time DropData;
00151 static const ros::Time RetryLater;
00152
00153 bool hasTimestamp()
00154 {
00155 return !trig_queue_.empty();
00156 }
00157
00158 virtual ~TriggerMatcherBase()
00159 {
00160 }
00161
00162 void setLateDataCountAllowed(unsigned int v)
00163 {
00164 late_data_count_allowed_ = v;
00165 }
00166
00167 void setTrigDelay(double delay)
00168 {
00169 setTrigDelay(ros::Duration(delay));
00170 }
00171
00172 void setTrigDelay(const ros::Duration &delay)
00173 {
00174 trig_delay_ = delay;
00175 }
00176
00177 TriggerMatcherBase(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length) :
00178 trig_delay_(0),
00179 last_data_stamp_(ros::TIME_MIN),
00180 late_data_count_allowed_(late_data_count_allowed),
00181 late_data_count_(0),
00182 max_trig_queue_length_(max_trig_queue_length),
00183 synchronized_(false),
00184 verbose_(false)
00185 {
00186 }
00187
00188 void triggerCallback(double stamp)
00189 {
00190 triggerCallback(ros::Time(stamp));
00191 }
00192
00193 void triggerCallback(const roslib::HeaderConstPtr &msg)
00194 {
00195 triggerCallback(msg->stamp);
00196 }
00197
00198 void triggerCallback(const ros::Time &stamp)
00199 {
00200 boost::mutex::scoped_lock(mutex_);
00201
00202 trig_queue_.push(stamp + trig_delay_);
00203 if (trig_queue_.size() > max_trig_queue_length_)
00204 {
00205 if (verbose_)
00206 ROS_WARN("TriggerMatcherBase: trig_queue_ overflow dropping from front.");
00207 trig_queue_.pop();
00208 }
00209 gotTrigger();
00210 }
00211 };
00212
00213 const ros::Time TriggerMatcherBase::DropData = ros::TIME_MIN;
00214 const ros::Time TriggerMatcherBase::RetryLater = ros::TIME_MAX;
00215
00216 class TriggerMatcher : public TriggerMatcherBase
00217 {
00218 private:
00219 boost::mutex data_source_mutex_;
00220 boost::condition_variable got_trigger_condition_;
00221
00222 protected:
00223 virtual void gotTrigger()
00224 {
00225 got_trigger_condition_.notify_one();
00226 }
00227
00228 public:
00229 virtual ~TriggerMatcher()
00230 {}
00231
00232 TriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length) :
00233 TriggerMatcherBase(late_data_count_allowed, max_trig_queue_length)
00234 {}
00235
00236 void reset()
00237 {
00238 boost::mutex::scoped_lock lock(mutex_);
00239
00240 got_trigger_condition_.notify_all();
00241 baseReset();
00242 }
00243
00244 ros::Time getTimestampBlocking(const ros::Time &t)
00245 {
00246 boost::mutex::scoped_lock data_lock(data_source_mutex_);
00247 boost::mutex::scoped_lock lock(mutex_);
00248
00249 ros::Time stamp = getTimestampNoblockPrelocked(t);
00250
00251 if (stamp != RetryLater)
00252 return stamp;
00253
00254 got_trigger_condition_.wait(lock);
00255
00256 return getTimestampNoblockPrelocked(t);
00257 }
00258
00259 ros::Time getTimestampBlocking(const ros::Time &t, double timeout)
00260 {
00261 boost::mutex::scoped_lock data_lock(data_source_mutex_);
00262 boost::mutex::scoped_lock lock(mutex_);
00263
00264 ros::Time stamp = getTimestampNoblockPrelocked(t);
00265
00266 if (stamp != RetryLater)
00267 return stamp;
00268
00269 got_trigger_condition_.timed_wait(lock, boost::posix_time::microseconds(timeout * 1e6));
00270
00271 return getTimestampNoblockPrelocked(t);
00272 }
00273
00274 ros::Time getTimestampNoblock(const ros::Time &data_time)
00275 {
00276 boost::mutex::scoped_lock data_lock(data_source_mutex_);
00277 boost::mutex::scoped_lock lock(mutex_);
00278
00279 return getTimestampNoblockPrelocked(data_time);
00280 }
00281 };
00282
00283 template <class C>
00284 class QueuedTriggerMatcher : public TriggerMatcherBase
00285 {
00286 public:
00287 typedef std::pair<ros::Time, boost::shared_ptr<C const> > DataPair;
00288 typedef boost::function<void(const ros::Time &, boost::shared_ptr<C const> &)> MatchCallback;
00289
00290 private:
00291 MatchCallback match_callback_;
00292 std::queue<DataPair> data_queue_;
00293 unsigned int max_data_queue_length_;
00294
00295 static void DefaultCallback(const ros::Time &t, boost::shared_ptr<C const> &d)
00296 {
00297 ROS_ERROR("QueuedTriggerMatcher triggered with no callback set. This is a bug in the code that uses QueuedTriggerMatcher.");
00298 }
00299
00300 protected:
00301 virtual void gotTrigger()
00302 {
00303 while (!data_queue_.empty())
00304 {
00305 DataPair &data = data_queue_.front();
00306 ros::Time stamp = getTimestampNoblockPrelocked(data.first);
00307
00308 if (stamp == DropData)
00309 data_queue_.pop();
00310 else if (stamp == RetryLater)
00311 return;
00312 else
00313 {
00314 match_callback_(stamp, data.second);
00315 data_queue_.pop();
00316 }
00317 }
00318 }
00319
00320 public:
00321 virtual ~QueuedTriggerMatcher()
00322 {
00323 }
00324
00325 void setMatchCallback(MatchCallback &cb)
00326 {
00327 match_callback_ = cb;
00328 }
00329
00330 QueuedTriggerMatcher(unsigned int late_data_count_allowed, unsigned int max_trig_queue_length, unsigned int max_data_queue_length) :
00331 TriggerMatcherBase(late_data_count_allowed, max_trig_queue_length),
00332 max_data_queue_length_(max_data_queue_length)
00333 {
00334 match_callback_ = &QueuedTriggerMatcher::DefaultCallback;
00335 }
00336
00337 void dataCallback(double stamp, const C &data)
00338 {
00339 dataCallback(ros::Time(stamp), data);
00340 }
00341
00342 void dataCallback(const ros::Time &stamp, const C &data)
00343 {
00344 boost::shared_ptr<C const> ptr = boost::shared_ptr<C const>(new C(data));
00345 dataCallback(stamp, ptr);
00346 }
00347
00348 void dataCallback(double stamp, const boost::shared_ptr<C const> &data)
00349 {
00350 dataCallback(ros::Time(stamp), data);
00351 }
00352
00353 void dataCallback(const ros::Time &stamp, const boost::shared_ptr<C const> &data)
00354 {
00355 dataCallback(DataPair(stamp, data));
00356 }
00357
00358 void reset()
00359 {
00360 boost::mutex::scoped_lock lock(mutex_);
00361
00362 data_queue_ = std::queue<DataPair>();
00363 baseReset();
00364 }
00365
00366 void dataCallback(const DataPair &pair)
00367 {
00368 boost::mutex::scoped_lock lock(mutex_);
00369
00370 data_queue_.push(pair);
00371 if (data_queue_.size() > max_data_queue_length_)
00372 {
00373 if (verbose_)
00374 ROS_WARN("QueuedTriggerMatcher: trig_queue_ overflow dropping from front.");
00375 data_queue_.pop();
00376 }
00377 gotTrigger();
00378 }
00379 };
00380
00381 }
00382
00383 #endif // __TIMESTAMP_TOOLS__TRIGGER_MATCHER_H__