Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #ifndef __PCL_SYNCHRONIZER__
00036 #define __PCL_SYNCHRONIZER__
00037
00038 namespace pcl
00039 {
00048 template <typename T1, typename T2>
00049 class Synchronizer
00050 {
00051 typedef std::pair<unsigned long, T1> T1Stamped;
00052 typedef std::pair<unsigned long, T2> T2Stamped;
00053 boost::mutex mutex1_;
00054 boost::mutex mutex2_;
00055 boost::mutex publish_mutex_;
00056 std::deque<T1Stamped> queueT1;
00057 std::deque<T2Stamped> queueT2;
00058
00059 typedef boost::function<void(T1, T2, unsigned long, unsigned long) > CallbackFunction;
00060
00061 std::map<int, CallbackFunction> cb_;
00062 int callback_counter;
00063 public:
00064
00065 Synchronizer () : mutex1_ (), mutex2_ (), publish_mutex_ (), queueT1 (), queueT2 (), cb_ (), callback_counter (0) { };
00066
00067 int
00068 addCallback (const CallbackFunction& callback)
00069 {
00070 boost::unique_lock<boost::mutex> publish_lock (publish_mutex_);
00071 cb_[callback_counter] = callback;
00072 return callback_counter++;
00073 }
00074
00075 void
00076 removeCallback (int i)
00077 {
00078 boost::unique_lock<boost::mutex> publish_lock (publish_mutex_);
00079 cb_.erase (i);
00080 }
00081
00082 void
00083 add0 (const T1& t, unsigned long time)
00084 {
00085 mutex1_.lock ();
00086 queueT1.push_back (T1Stamped (time, t));
00087 mutex1_.unlock ();
00088 publish ();
00089 }
00090
00091 void
00092 add1 (const T2& t, unsigned long time)
00093 {
00094 mutex2_.lock ();
00095 queueT2.push_back (T2Stamped (time, t));
00096 mutex2_.unlock ();
00097 publish ();
00098 }
00099
00100 private:
00101
00102 void
00103 publishData ()
00104 {
00105 boost::unique_lock<boost::mutex> lock1 (mutex1_);
00106 boost::unique_lock<boost::mutex> lock2 (mutex2_);
00107
00108 for (typename std::map<int, CallbackFunction>::iterator cb = cb_.begin (); cb != cb_.end (); ++cb)
00109 {
00110 if (!cb->second.empty ())
00111 {
00112 cb->second.operator()(queueT1.front ().second, queueT2.front ().second, queueT1.front ().first, queueT2.front ().first);
00113 }
00114 }
00115
00116 queueT1.pop_front ();
00117 queueT2.pop_front ();
00118 }
00119
00120 void
00121 publish ()
00122 {
00123
00124 boost::unique_lock<boost::mutex> publish_lock (publish_mutex_);
00125
00126 boost::unique_lock<boost::mutex> lock1 (mutex1_);
00127 if (queueT1.empty ())
00128 return;
00129 T1Stamped t1 = queueT1.front ();
00130 lock1.unlock ();
00131
00132 boost::unique_lock<boost::mutex> lock2 (mutex2_);
00133 if (queueT2.empty ())
00134 return;
00135 T2Stamped t2 = queueT2.front ();
00136 lock2.unlock ();
00137
00138 bool do_publish = false;
00139
00140 if (t1.first <= t2.first)
00141 {
00142 lock1.lock ();
00143 while (queueT1.size () > 1 && queueT1[1].first <= t2.first)
00144 queueT1.pop_front ();
00145
00146 if (queueT1.size () > 1)
00147 {
00148 if ( (t2.first << 1) > (queueT1[0].first + queueT1[1].first) )
00149 queueT1.pop_front ();
00150
00151 do_publish = true;
00152 }
00153 lock1.unlock ();
00154 }
00155 else
00156 {
00157 lock2.lock ();
00158 while (queueT2.size () > 1 && (queueT2[1].first <= t1.first) )
00159 queueT2.pop_front ();
00160
00161 if (queueT2.size () > 1)
00162 {
00163 if ( (t1.first << 1) > queueT2[0].first + queueT2[1].first )
00164 queueT2.pop_front ();
00165
00166 do_publish = true;
00167 }
00168 lock2.unlock ();
00169 }
00170
00171 if (do_publish)
00172 publishData ();
00173 }
00174 } ;
00175 }
00176
00177 #endif // __PCL_SYNCHRONIZER__
00178