listeners.hh
Go to the documentation of this file.
00001 
00037 #ifndef LibMultiSense_impl_listeners
00038 #define LibMultiSense_impl_listeners
00039 
00040 #include "MultiSenseTypes.hh"
00041 
00042 #include "details/utility/Thread.hh"
00043 #include "details/utility/BufferStream.hh"
00044 
00045 namespace crl {
00046 namespace multisense {
00047 namespace details {
00048 
00049 //
00050 // For access to a buffer back-end in a dispatch thread
00051 
00052 extern CRL_THREAD_LOCAL utility::BufferStream *dispatchBufferReferenceTP;
00053 
00054 //
00055 // The dispatch mechanism. Each instance represents a bound
00056 // listener to a datum stream.
00057 
00058 template<class THeader, class TCallback>
00059 class Listener {
00060 public:
00061     
00062     Listener(TCallback   c,
00063              DataSource s,
00064              void      *d,
00065              uint32_t   m=0)
00066         : m_callback(c),
00067           m_sourceMask(s),
00068           m_userDataP(d),
00069           m_running(false),
00070           m_queue(m),
00071           m_dispatchThreadP(NULL) {
00072         
00073         m_running         = true;
00074         m_dispatchThreadP = new utility::Thread(dispatchThread, this);
00075     };
00076 
00077     Listener() :
00078         m_callback(NULL),
00079         m_sourceMask(0),
00080         m_userDataP(NULL),
00081         m_running(false),
00082         m_queue(),
00083         m_dispatchThreadP(NULL) {};
00084 
00085     ~Listener() {
00086         if (m_running) {
00087             m_running = false;
00088             m_queue.kick();
00089             delete m_dispatchThreadP;
00090         }
00091     };
00092 
00093     void dispatch(THeader& header) {
00094 
00095         if (header.inMask(m_sourceMask))
00096             m_queue.post(Dispatch(m_callback,
00097                                   header,
00098                                   m_userDataP));
00099     };
00100 
00101     void dispatch(utility::BufferStream& buffer,
00102                   THeader&                header) {
00103 
00104         if (header.inMask(m_sourceMask))
00105             m_queue.post(Dispatch(m_callback,
00106                                   buffer,
00107                                   header,
00108                                   m_userDataP));
00109     };
00110 
00111     TCallback callback() { return m_callback; };
00112 
00113 private:
00114 
00115     //
00116     // For thread-safe dispatching
00117 
00118     class Dispatch {
00119     public:
00120 
00121         Dispatch(TCallback  c,
00122                  THeader&   h,
00123                  void     *d) :
00124             m_callback(c),
00125             m_exposeBuffer(false),
00126             m_header(h),
00127             m_userDataP(d) {};
00128 
00129         Dispatch(TCallback               c,
00130                  utility::BufferStream& b,
00131                  THeader&                h,
00132                  void                  *d) :
00133             m_callback(c),
00134             m_buffer(b),
00135             m_exposeBuffer(true),
00136             m_header(h),
00137             m_userDataP(d) {};
00138 
00139         Dispatch() :
00140             m_callback(NULL),
00141             m_buffer(),
00142             m_exposeBuffer(false),
00143             m_header(),
00144             m_userDataP(NULL) {};
00145 
00146         void operator() (void) {
00147 
00148             if (m_callback) {
00149                 if (m_exposeBuffer)
00150                     dispatchBufferReferenceTP = &m_buffer;
00151                 m_callback(m_header, m_userDataP);
00152             }
00153         };
00154 
00155     private:
00156 
00157         TCallback              m_callback;
00158         utility::BufferStream m_buffer;
00159         bool                  m_exposeBuffer;
00160         THeader                m_header;
00161         void                 *m_userDataP;
00162     };
00163 
00164     //
00165     // The dispatch thread
00166     //
00167     // We are penalized with two memory copies of
00168     // HEADER by std::deque, but the image/lidar data
00169     // is zero-copy (reference-counted by BufferStream)
00170 
00171 #if WIN32
00172     static DWORD WINAPI dispatchThread(void *argumentP) {
00173 #else
00174     static void *dispatchThread(void *argumentP) {
00175 #endif
00176         
00177         Listener<THeader,TCallback> *selfP = reinterpret_cast< Listener<THeader,TCallback> * >(argumentP);
00178     
00179         while(selfP->m_running) {
00180             try {
00181                 Dispatch d;
00182                 if (false == selfP->m_queue.wait(d))
00183                     break;
00184                 d();
00185             } catch (const std::exception& e) {
00186                 CRL_DEBUG("exception invoking image callback: %s\n",
00187                           e.what());
00188             } catch ( ... ) {
00189                 CRL_DEBUG("unknown exception invoking image callback\n");
00190             }
00191         };
00192 
00193         return NULL;
00194     }
00195 
00196     //
00197     // Set by user
00198 
00199     TCallback   m_callback;
00200     DataSource m_sourceMask;
00201     void      *m_userDataP;
00202 
00203     //
00204     // Dispatch mechanism
00205     
00206     volatile bool                m_running;
00207     utility::WaitQueue<Dispatch> m_queue;
00208     utility::Thread             *m_dispatchThreadP;
00209 };
00210 
00211 typedef Listener<image::Header, image::Callback> ImageListener;
00212 typedef Listener<lidar::Header, lidar::Callback> LidarListener;
00213 typedef Listener<pps::Header,   pps::Callback>   PpsListener;
00214 typedef Listener<imu::Header,   imu::Callback>   ImuListener;
00215 
00216 }; // namespace details
00217 }; // namespace multisense
00218 }; // namespace crl
00219 
00220 
00221 #endif //  LibMultiSense_impl_listeners


multisense_lib
Author(s):
autogenerated on Mon Oct 9 2017 03:06:21