Thread.hh
Go to the documentation of this file.
00001 
00041 #ifndef CRL_MULTISENSE_THREAD_HH
00042 #define CRL_MULTISENSE_THREAD_HH
00043 
00044 #include <unistd.h>
00045 #include <stdint.h>
00046 #include <pthread.h>
00047 #include <sched.h>
00048 #include <errno.h>
00049 #include <string.h>
00050 #include <linux/futex.h>
00051 #include <unistd.h>
00052 #include <sys/syscall.h>
00053 
00054 #include <vector>
00055 #include <deque>
00056 
00057 #include "details/utility/Portability.hh"
00058 
00059 #include "../Exception.hh"
00060 
00061 namespace crl {
00062 namespace multisense {
00063 namespace details {
00064 namespace utility {
00065 
00066 //
00067 // Forward declarations.
00068 
00069 class ScopedLock;
00070 
00071 //
00072 // A simple class to wrap pthread creation and joining
00073 
00074 class Thread {
00075 public:
00076 
00077     static CRL_CONSTEXPR uint32_t FLAGS_DETACH = (1 << 0);
00078 
00079     Thread(void    *(*functionP)(void *),
00080            void    *contextP=NULL,
00081            uint32_t flags=0,    
00082            int32_t  scheduler=-1,
00083            int32_t  priority=0) : m_flags(flags) {
00084 
00085         pthread_attr_t tattr;        
00086         pthread_attr_init(&tattr);
00087 
00088         //
00089         // -1 means the user wants default scheduling behavior
00090 
00091         if (-1 != scheduler) {
00092             struct sched_param sattr = {0};
00093 
00094             //
00095             // Set our scheduling policy
00096 
00097             if (0 != pthread_attr_setschedpolicy(&tattr, scheduler))
00098                 CRL_EXCEPTION("pthread_attr_setschedpolicy(scheduler=%d) failed: %s",
00099                                    scheduler, strerror(errno));
00100             //
00101             // Set our scheduling parameters (just priority)
00102 
00103             sattr.sched_priority = priority;
00104             if (0 != pthread_attr_setschedparam(&tattr, &sattr))
00105                 CRL_EXCEPTION("pthread_attr_setschedparam(pri=%d) failed: %s", 
00106                                    priority, strerror(errno));
00107             //
00108             // We must set EXPLICIT_SCHED so the parent's scheduler is not 
00109             // automatically inherited
00110 
00111             if (0 != pthread_attr_setinheritsched(&tattr, PTHREAD_EXPLICIT_SCHED))
00112                 CRL_EXCEPTION("pthread_attr_setinheritsched(explicit) failed: %s", 
00113                                    strerror(errno));
00114         }
00115 
00116         //
00117         // Create detached, if asked to do so
00118 
00119         if (FLAGS_DETACH & m_flags && 
00120             0 != pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED))
00121             CRL_EXCEPTION("pthread_attr_setdetachstate() failed: %s", strerror(errno));
00122         
00123         //
00124         // Finally, create the thread
00125 
00126         if (0 != pthread_create(&m_id, &tattr, functionP, contextP))
00127             CRL_EXCEPTION("pthread_create() failed: %s", strerror(errno));
00128     };
00129 
00130     ~Thread() {
00131         if (!(m_flags & FLAGS_DETACH) &&
00132             0 != pthread_join(m_id, NULL))
00133             CRL_DEBUG("pthread_join() failed: %s\n", strerror(errno));
00134     };          
00135     
00136 private:
00137 
00138     uint32_t  m_flags;
00139     pthread_t m_id;
00140 };
00141 
00142 //
00143 // A simple mutex class
00144 
00145 class Mutex {
00146 public:
00147     friend class ScopedLock;
00148     
00149     Mutex() : m_mutex() {
00150         if (0 != pthread_mutex_init(&m_mutex, NULL))
00151             CRL_EXCEPTION("pthread_mutex_init() failed: %s",
00152                           strerror(errno));
00153     }
00154 
00155     ~Mutex() {
00156         pthread_mutex_destroy(&m_mutex);
00157     };
00158 
00159 private:
00160     pthread_mutex_t m_mutex;
00161 };
00162     
00163 //
00164 // A simple scoped lock class
00165 
00166 class ScopedLock
00167 {
00168 public:
00169 
00170     ScopedLock(Mutex& mutex) {
00171         this->lock(&mutex.m_mutex);
00172     };
00173 
00174     ScopedLock(pthread_mutex_t *lockP) {
00175         this->lock(lockP);
00176     };
00177 
00178     ScopedLock(pthread_mutex_t& lock) {
00179         this->lock(&lock);
00180     };
00181         
00182     ~ScopedLock() {
00183         pthread_mutex_unlock(m_lockP);
00184     };
00185 
00186 private:
00187 
00188     void lock(pthread_mutex_t *lockP) {
00189         m_lockP = lockP;
00190         pthread_mutex_lock(m_lockP);
00191     };
00192         
00193     pthread_mutex_t *m_lockP;
00194 };
00195 
00196 // A futex-based semaphore.
00197 //
00198 // This implementation does not work across processes.
00199 
00200 class Semaphore {
00201 public:
00202 
00203     //
00204     // Wait for a post (decrement). If thread contention,
00205     // we may wake up, but be unable to snatch
00206     // the bait.. hence the while loop.
00207 
00208     bool wait() {        
00209         do {
00210             if (0 == wait_())
00211                 return true;
00212         } while (1);
00213     };
00214 
00215     //
00216     // Wait for a post, retrying until timeout
00217 
00218     bool timedWait(const double& timeout) {
00219 
00220         if (timeout < 0.0)
00221             CRL_EXCEPTION("invalid timeout: %f", timeout);
00222 
00223         struct timespec ts;
00224         ts.tv_sec  = timeout;
00225         ts.tv_nsec = (timeout - ts.tv_sec) * 1e9;
00226         
00227         do {
00228             int32_t ret = wait_(&ts);
00229 
00230             if (0 == ret)
00231                 return true;
00232             else if (ETIMEDOUT == ret)
00233                 return false;
00234 
00235         } while (1);
00236     };
00237 
00238     //
00239     // Post to the semaphore (increment.) Here we
00240     // signal the futex to wake up any waiters.
00241     
00242     bool post() {
00243 
00244         //
00245         // Limit the posts, if asked to do so
00246 
00247         if (m_maximum > 0 && m_avail >= static_cast<int>(m_maximum))
00248             return false;
00249         
00250         const int32_t nval = __sync_add_and_fetch(&m_avail, 1);
00251         if (m_waiters > 0)
00252             syscall(__NR_futex, &m_avail, FUTEX_WAKE, nval, NULL, 0, 0);
00253 
00254         return true;
00255     };
00256 
00257     //
00258     // Decrement the semaphore to zero in one-shot.. may
00259     // fail with thread contention, returns true if
00260     // successful
00261     
00262     bool clear() {
00263         int32_t val = m_avail;
00264         if (val > 0)
00265             return __sync_bool_compare_and_swap(&m_avail, val, 0);
00266         return true;
00267     };
00268 
00269     int32_t count    () { return m_avail;   };
00270     int32_t waiters  () { return m_waiters; };
00271     bool    decrement() { return wait();    };
00272     bool    increment() { return post();    };
00273 
00274     Semaphore(std::size_t max=0) :
00275         m_maximum(max),
00276         m_avail(0),
00277         m_waiters(0) {};
00278     
00279     ~Semaphore() {};
00280     
00281 private:
00282 
00283     //
00284     // This actually does the synchronized decrement if possible, and goes
00285     // to sleep on the futex if not.
00286 
00287     inline int32_t wait_(const struct timespec *tsP=NULL) {
00288         
00289         //
00290         // Can we decrement the requested amount? If so, return success
00291         
00292         const int32_t val = m_avail;
00293         if (val >= 1 && __sync_bool_compare_and_swap(&m_avail, val, val - 1))
00294             return 0;
00295 
00296         //
00297         // We must go to sleep until someone increments. Also keep track of
00298         // how many threads are waiting on this futex.
00299 
00300         __sync_fetch_and_add(&m_waiters, 1);
00301         const int32_t ret = syscall(__NR_futex, &m_avail, FUTEX_WAIT, val, tsP, 0, 0);
00302         __sync_fetch_and_sub(&m_waiters, 1);
00303 
00304         //
00305         // If we just woke up on the futex, then return EAGAIN, so we
00306         // can come back in here and attempt a synchronized decrement again.
00307 
00308         if (ETIMEDOUT == ret || -1 == ret) // hmmm.. timeouts are returning -1
00309             return ETIMEDOUT;
00310         else
00311             return EAGAIN;
00312     };
00313     
00314     typedef int32_t aligned_int32_t __attribute__((aligned (4))); // unnecessary ?
00315     
00316     const std::size_t m_maximum;
00317     aligned_int32_t   m_avail;
00318     aligned_int32_t   m_waiters;
00319 };
00320 
00321 //
00322 // A templatized variable signaler
00323 
00324 template<class T> class WaitVar {
00325 public:
00326 
00327     void post(const T& data) {
00328         {
00329             ScopedLock lock(m_lock);
00330             m_val = data;
00331         }
00332         m_sem.post();
00333     };
00334 
00335     bool wait(T& data) {
00336         m_sem.wait();
00337         {
00338             ScopedLock lock(m_lock);
00339             data = m_val;
00340         }
00341         return true;
00342     };
00343 
00344     bool timedWait(T& data,
00345                    const double& timeout) {
00346 
00347         if (false == m_sem.timedWait(timeout))
00348             return false;
00349         {
00350             ScopedLock lock(m_lock);
00351             data = m_val;
00352         }
00353         return true;
00354     }
00355     
00356     //
00357     // Use a semaphore with max value of 1. The WaitVar will 
00358     // either be in a signaled state, or not.
00359 
00360     WaitVar() : m_val(),
00361                 m_lock(),
00362                 m_sem(1) {};
00363 
00364 private:
00365 
00366     T                 m_val;
00367     Mutex     m_lock;
00368     Semaphore m_sem;
00369 };
00370 
00371 //
00372 // A templatized wait queue
00373 
00374 template <class T> class WaitQueue {
00375 public:
00376 
00377     void post(const T& data) {
00378         bool postSem=true;
00379         {
00380             ScopedLock lock(m_lock);
00381 
00382             //
00383             // Limit deque size, if requested
00384 
00385             if (m_maximum > 0 && 
00386                 m_maximum == m_queue.size()) {
00387 
00388                 //
00389                 // If at max entries, we will pop_front the oldest,
00390                 // push_back the newest, and leave the semaphore alone
00391 
00392                 m_queue.pop_front();
00393                 postSem = false;
00394             }
00395 
00396             m_queue.push_back(data);
00397         }
00398         if (postSem) 
00399             m_sem.post();
00400     };
00401 
00402     void kick() {
00403         m_sem.post();
00404     };
00405 
00406     bool wait(T& data) {
00407         m_sem.wait();
00408         {
00409             ScopedLock lock(m_lock);
00410 
00411             if (0 == m_queue.size())
00412                 return false;
00413             else {
00414                 data = m_queue.front();
00415                 m_queue.pop_front();
00416                 return true;
00417             }
00418         }
00419     }
00420 
00421     uint32_t waiters() { 
00422         return m_sem.waiters();
00423     };
00424 
00425     uint32_t size() {
00426         ScopedLock lock(m_lock);
00427         return m_queue.size();
00428     }
00429         
00430     void clear() {
00431         ScopedLock lock(m_lock);
00432         m_queue.clear();
00433         while(false == m_sem.clear());
00434     }       
00435 
00436     WaitQueue(std::size_t max=0) : 
00437         m_maximum(max) {};
00438 
00439 private:
00440 
00441     const std::size_t m_maximum;
00442     std::deque<T>     m_queue;
00443     Mutex             m_lock;
00444     Semaphore         m_sem;
00445 };
00446 
00447 }}}} // namespaces
00448 
00449 #endif /* #ifndef CRL_MULTISENSE_THREAD_HH */


multisense_lib
Author(s):
autogenerated on Mon Oct 9 2017 03:06:21