Thread.hh
Go to the documentation of this file.
00001 
00041 #ifndef CRL_MULTISENSE_THREAD_HH
00042 #define CRL_MULTISENSE_THREAD_HH
00043 
00044 #ifndef WIN32_LEAN_AND_MEAN
00045 #define WIN32_LEAN_AND_MEAN 1
00046 #endif
00047 #include <windows.h>
00048 
00049 #include <stdint.h>
00050 #include <errno.h>
00051 #include <string.h>
00052 
00053 #include <vector>
00054 #include <deque>
00055 
00056 #include "details/utility/Portability.hh"
00057 
00058 #include "../Exception.hh"
00059 
00060 namespace crl {
00061 namespace multisense {
00062 namespace details {
00063 namespace utility {
00064 
00065 //
00066 // Forward declarations.
00067 
00068 class ScopedLock;
00069 
00070 //
00071 // A simple class to wrap pthread creation and joining
00072 
00073 class Thread {
00074 public:
00075 
00076     static CRL_CONSTEXPR uint32_t FLAGS_DETACH = (1 << 0);
00077 
00078     Thread(LPTHREAD_START_ROUTINE functionP,
00079            void    *contextP=NULL,
00080            uint32_t flags=0,    
00081            int32_t  scheduler=-1,
00082            int32_t  priority=0) : m_flags(flags) {
00083 
00084         //
00085         // -1 means the user wants default scheduling behavior
00086 
00087         if (-1 != scheduler) {
00088 
00089             //
00090             // Set our scheduling policy
00091 
00092             if (scheduler != 0 /* SCHED_OTHER */)
00093                 CRL_EXCEPTION("This platform only supports SCHED_OTHER");
00094 
00095             if (priority != 0)
00096                 CRL_EXCEPTION("Priority can not be set at this time.");
00097         }
00098 
00099         //
00100         // Finally, create the thread
00101 
00102         m_threadHandle = CreateThread (NULL, 0, functionP, contextP, 0, &m_threadId);
00103         if (m_threadHandle == NULL)
00104             CRL_EXCEPTION("CreateThread() failed: %d", GetLastError());
00105 
00106         //
00107         // Automatically detach, if asked to do so
00108 
00109         if (FLAGS_DETACH & m_flags)
00110         {
00111             CloseHandle (m_threadHandle);
00112         }
00113     };
00114 
00115     ~Thread() {
00116         if (!(m_flags & FLAGS_DETACH) &&
00117             0 != WaitForSingleObject(m_threadHandle, INFINITE))
00118             CRL_DEBUG("WaitForSingleObject() failed: %d\n", GetLastError());
00119     };          
00120     
00121 private:
00122 
00123     uint32_t  m_flags;
00124     DWORD m_threadId;
00125     HANDLE m_threadHandle;
00126 };
00127 
00128 //
00129 // A simple mutex class
00130 
00131 class Mutex {
00132 public:
00133     friend class ScopedLock;
00134     
00135     Mutex() {
00136         InitializeCriticalSection(&m_mutex);
00137     }
00138 
00139     ~Mutex() {
00140         DeleteCriticalSection(&m_mutex);
00141     };
00142 
00143 private:
00144     CRITICAL_SECTION m_mutex;
00145 };
00146     
00147 //
00148 // A simple scoped lock class
00149 
00150 class ScopedLock
00151 {
00152 public:
00153 
00154     ScopedLock(Mutex& mutex) {
00155         this->lock(&mutex.m_mutex);
00156     };
00157 
00158     ScopedLock(CRITICAL_SECTION *lockP) {
00159         this->lock(lockP);
00160     };
00161 
00162     ScopedLock(CRITICAL_SECTION& lock) {
00163         this->lock(&lock);
00164     };
00165         
00166     ~ScopedLock() {
00167         LeaveCriticalSection(m_lockP);
00168     };
00169 
00170 private:
00171 
00172     void lock(CRITICAL_SECTION *lockP) {
00173         m_lockP = lockP;
00174         EnterCriticalSection(m_lockP);
00175     };
00176         
00177     CRITICAL_SECTION *m_lockP;
00178 };
00179 
00180 // A futex-based semaphore.
00181 //
00182 // This implementation does not work across processes.
00183 
00184 class Semaphore {
00185 public:
00186 
00187     //
00188     // Wait for a post (decrement). If thread contention,
00189     // we may wake up, but be unable to snatch
00190     // the bait.. hence the while loop.
00191 
00192     bool wait() {        
00193         do {
00194             if (0 == wait_(INFINITE))
00195                 return true;
00196         } while (1);
00197     };
00198 
00199     //
00200     // Wait for a post, retrying until timeout
00201 
00202     bool timedWait(const double& timeout) {
00203 
00204         if (timeout < 0.0)
00205             CRL_EXCEPTION("invalid timeout: %f", timeout);
00206 
00207         do {
00208             int32_t ret = wait_((DWORD)(timeout * 1000));
00209 
00210             if (0 == ret)
00211                 return true;
00212             else if (ETIMEDOUT == ret)
00213                 return false;
00214 
00215         } while (1);
00216     };
00217 
00218     //
00219     // Post to the semaphore (increment.) Here we
00220     // signal the futex to wake up any waiters.
00221     
00222     bool post() {
00223 
00224         return ReleaseSemaphore(m_handle, 1, NULL) != FALSE;
00225 
00226     };
00227 
00228     //
00229     // Decrement the semaphore to zero in one-shot.. may
00230     // fail with thread contention, returns true if
00231     // successful
00232     
00233     bool clear() {
00234         while(WaitForSingleObject (m_handle, 0) == WAIT_OBJECT_0)
00235         {
00236         }
00237         return true;
00238     };
00239 
00240     int32_t waiters  () { return m_waiters; };
00241     bool    decrement() { return wait();    };
00242     bool    increment() { return post();    };
00243 
00244     Semaphore(std::size_t max=0) :
00245         m_waiters(0)
00246     {
00247         m_handle = CreateSemaphore (NULL, 0, (max == 0 || max > LONG_MAX) ? LONG_MAX : max, NULL);
00248         if (m_handle == NULL)
00249             CRL_EXCEPTION ("CreateSemaphore() failed: %d\n", GetLastError());
00250     }
00251     
00252     ~Semaphore()
00253     {
00254         if (m_handle != NULL)
00255             CloseHandle (m_handle);
00256     }
00257     
00258 private:
00259 
00260     //
00261     // This actually does the synchronized decrement if possible, and goes
00262     // to sleep on the futex if not.
00263 
00264     inline int32_t wait_(DWORD ts=INFINITE) {
00265         InterlockedIncrement (&m_waiters);
00266         const int32_t ret = WaitForSingleObject (m_handle, ts);
00267         InterlockedDecrement (&m_waiters);
00268 
00269         if (ret == WAIT_OBJECT_0)
00270             return 0;
00271         else if (ret == WAIT_TIMEOUT)
00272             return ETIMEDOUT;
00273         else
00274             return EAGAIN;
00275     };
00276     
00277     HANDLE m_handle;
00278     LONG   m_waiters;
00279 };
00280 
00281 //
00282 // A templatized variable signaler
00283 
00284 template<class T> class WaitVar {
00285 public:
00286 
00287     void post(const T& data) {
00288         {
00289             ScopedLock lock(m_lock);
00290             m_val = data;
00291         }
00292         m_sem.post();
00293     };
00294 
00295     bool wait(T& data) {
00296         m_sem.wait();
00297         {
00298             ScopedLock lock(m_lock);
00299             data = m_val;
00300         }
00301         return true;
00302     };
00303 
00304     bool timedWait(T& data,
00305                    const double& timeout) {
00306 
00307         if (false == m_sem.timedWait(timeout))
00308             return false;
00309         {
00310             ScopedLock lock(m_lock);
00311             data = m_val;
00312         }
00313         return true;
00314     }
00315     
00316     //
00317     // Use a semaphore with max value of 1. The WaitVar will 
00318     // either be in a signaled state, or not.
00319 
00320     WaitVar() : m_val(),
00321                 m_lock(),
00322                 m_sem(1) {};
00323 
00324 private:
00325 
00326     T                 m_val;
00327     Mutex     m_lock;
00328     Semaphore m_sem;
00329 };
00330 
00331 //
00332 // A templatized wait queue
00333 
00334 template <class T> class WaitQueue {
00335 public:
00336 
00337     void post(const T& data) {
00338         bool postSem=true;
00339         {
00340             ScopedLock lock(m_lock);
00341 
00342             //
00343             // Limit deque size, if requested
00344 
00345             if (m_maximum > 0 && 
00346                 m_maximum == m_queue.size()) {
00347 
00348                 //
00349                 // If at max entries, we will pop_front the oldest,
00350                 // push_back the newest, and leave the semaphore alone
00351 
00352                 m_queue.pop_front();
00353                 postSem = false;
00354             }
00355 
00356             m_queue.push_back(data);
00357         }
00358         if (postSem) 
00359             m_sem.post();
00360     };
00361 
00362     void kick() {
00363         m_sem.post();
00364     };
00365 
00366     bool wait(T& data) {
00367         m_sem.wait();
00368         {
00369             ScopedLock lock(m_lock);
00370 
00371             if (0 == m_queue.size())
00372                 return false;
00373             else {
00374                 data = m_queue.front();
00375                 m_queue.pop_front();
00376                 return true;
00377             }
00378         }
00379     }
00380 
00381     uint32_t waiters() { 
00382         return m_sem.waiters();
00383     };
00384 
00385     uint32_t size() {
00386         ScopedLock lock(m_lock);
00387         return m_queue.size();
00388     }
00389         
00390     void clear() {
00391         ScopedLock lock(m_lock);
00392         m_queue.clear();
00393         while(false == m_sem.clear());
00394     }       
00395 
00396     WaitQueue(std::size_t max=0) : 
00397         m_maximum(max) {};
00398 
00399 private:
00400 
00401     const std::size_t m_maximum;
00402     std::deque<T>     m_queue;
00403     Mutex             m_lock;
00404     Semaphore         m_sem;
00405 };
00406 
00407 }}}} // namespaces
00408 
00409 #endif /* #ifndef CRL_MULTISENSE_THREAD_HH */


multisense_lib
Author(s):
autogenerated on Thu Aug 27 2015 14:01:11