00001
00041 #ifndef CRL_MULTISENSE_THREAD_HH
00042 #define CRL_MULTISENSE_THREAD_HH
00043
00044 #include <unistd.h>
00045 #include <stdint.h>
00046 #include <pthread.h>
00047 #include <sched.h>
00048 #include <errno.h>
00049 #include <string.h>
00050 #include <linux/futex.h>
00051 #include <unistd.h>
00052 #include <sys/syscall.h>
00053
00054 #include <vector>
00055 #include <deque>
00056
00057 #include "details/utility/Portability.hh"
00058
00059 #include "../Exception.hh"
00060
00061 namespace crl {
00062 namespace multisense {
00063 namespace details {
00064 namespace utility {
00065
00066
00067
00068
00069 class ScopedLock;
00070
00071
00072
00073
00074 class Thread {
00075 public:
00076
00077 static CRL_CONSTEXPR uint32_t FLAGS_DETACH = (1 << 0);
00078
00079 Thread(void *(*functionP)(void *),
00080 void *contextP=NULL,
00081 uint32_t flags=0,
00082 int32_t scheduler=-1,
00083 int32_t priority=0) : m_flags(flags) {
00084
00085 pthread_attr_t tattr;
00086 pthread_attr_init(&tattr);
00087
00088
00089
00090
00091 if (-1 != scheduler) {
00092 struct sched_param sattr = {0};
00093
00094
00095
00096
00097 if (0 != pthread_attr_setschedpolicy(&tattr, scheduler))
00098 CRL_EXCEPTION("pthread_attr_setschedpolicy(scheduler=%d) failed: %s",
00099 scheduler, strerror(errno));
00100
00101
00102
00103 sattr.sched_priority = priority;
00104 if (0 != pthread_attr_setschedparam(&tattr, &sattr))
00105 CRL_EXCEPTION("pthread_attr_setschedparam(pri=%d) failed: %s",
00106 priority, strerror(errno));
00107
00108
00109
00110
00111 if (0 != pthread_attr_setinheritsched(&tattr, PTHREAD_EXPLICIT_SCHED))
00112 CRL_EXCEPTION("pthread_attr_setinheritsched(explicit) failed: %s",
00113 strerror(errno));
00114 }
00115
00116
00117
00118
00119 if (FLAGS_DETACH & m_flags &&
00120 0 != pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED))
00121 CRL_EXCEPTION("pthread_attr_setdetachstate() failed: %s", strerror(errno));
00122
00123
00124
00125
00126 if (0 != pthread_create(&m_id, &tattr, functionP, contextP))
00127 CRL_EXCEPTION("pthread_create() failed: %s", strerror(errno));
00128 };
00129
00130 ~Thread() {
00131 if (!(m_flags & FLAGS_DETACH) &&
00132 0 != pthread_join(m_id, NULL))
00133 CRL_DEBUG("pthread_join() failed: %s\n", strerror(errno));
00134 };
00135
00136 private:
00137
00138 uint32_t m_flags;
00139 pthread_t m_id;
00140 };
00141
00142
00143
00144
00145 class Mutex {
00146 public:
00147 friend class ScopedLock;
00148
00149 Mutex() : m_mutex() {
00150 if (0 != pthread_mutex_init(&m_mutex, NULL))
00151 CRL_EXCEPTION("pthread_mutex_init() failed: %s",
00152 strerror(errno));
00153 }
00154
00155 ~Mutex() {
00156 pthread_mutex_destroy(&m_mutex);
00157 };
00158
00159 private:
00160 pthread_mutex_t m_mutex;
00161 };
00162
00163
00164
00165
00166 class ScopedLock
00167 {
00168 public:
00169
00170 ScopedLock(Mutex& mutex) {
00171 this->lock(&mutex.m_mutex);
00172 };
00173
00174 ScopedLock(pthread_mutex_t *lockP) {
00175 this->lock(lockP);
00176 };
00177
00178 ScopedLock(pthread_mutex_t& lock) {
00179 this->lock(&lock);
00180 };
00181
00182 ~ScopedLock() {
00183 pthread_mutex_unlock(m_lockP);
00184 };
00185
00186 private:
00187
00188 void lock(pthread_mutex_t *lockP) {
00189 m_lockP = lockP;
00190 pthread_mutex_lock(m_lockP);
00191 };
00192
00193 pthread_mutex_t *m_lockP;
00194 };
00195
00196
00197
00198
00199
00200 class Semaphore {
00201 public:
00202
00203
00204
00205
00206
00207
00208 bool wait() {
00209 do {
00210 if (0 == wait_())
00211 return true;
00212 } while (1);
00213 };
00214
00215
00216
00217
00218 bool timedWait(const double& timeout) {
00219
00220 if (timeout < 0.0)
00221 CRL_EXCEPTION("invalid timeout: %f", timeout);
00222
00223 struct timespec ts;
00224 ts.tv_sec = timeout;
00225 ts.tv_nsec = (timeout - ts.tv_sec) * 1e9;
00226
00227 do {
00228 int32_t ret = wait_(&ts);
00229
00230 if (0 == ret)
00231 return true;
00232 else if (ETIMEDOUT == ret)
00233 return false;
00234
00235 } while (1);
00236 };
00237
00238
00239
00240
00241
00242 bool post() {
00243
00244
00245
00246
00247 if (m_maximum > 0 && m_avail >= static_cast<int>(m_maximum))
00248 return false;
00249
00250 const int32_t nval = __sync_add_and_fetch(&m_avail, 1);
00251 if (m_waiters > 0)
00252 syscall(__NR_futex, &m_avail, FUTEX_WAKE, nval, NULL, 0, 0);
00253
00254 return true;
00255 };
00256
00257
00258
00259
00260
00261
00262 bool clear() {
00263 int32_t val = m_avail;
00264 if (val > 0)
00265 return __sync_bool_compare_and_swap(&m_avail, val, 0);
00266 return true;
00267 };
00268
00269 int32_t count () { return m_avail; };
00270 int32_t waiters () { return m_waiters; };
00271 bool decrement() { return wait(); };
00272 bool increment() { return post(); };
00273
00274 Semaphore(std::size_t max=0) :
00275 m_maximum(max),
00276 m_avail(0),
00277 m_waiters(0) {};
00278
00279 ~Semaphore() {};
00280
00281 private:
00282
00283
00284
00285
00286
00287 inline int32_t wait_(const struct timespec *tsP=NULL) {
00288
00289
00290
00291
00292 const int32_t val = m_avail;
00293 if (val >= 1 && __sync_bool_compare_and_swap(&m_avail, val, val - 1))
00294 return 0;
00295
00296
00297
00298
00299
00300 __sync_fetch_and_add(&m_waiters, 1);
00301 const int32_t ret = syscall(__NR_futex, &m_avail, FUTEX_WAIT, val, tsP, 0, 0);
00302 __sync_fetch_and_sub(&m_waiters, 1);
00303
00304
00305
00306
00307
00308 if (ETIMEDOUT == ret || -1 == ret)
00309 return ETIMEDOUT;
00310 else
00311 return EAGAIN;
00312 };
00313
00314 typedef int32_t aligned_int32_t __attribute__((aligned (4)));
00315
00316 const std::size_t m_maximum;
00317 aligned_int32_t m_avail;
00318 aligned_int32_t m_waiters;
00319 };
00320
00321
00322
00323
00324 template<class T> class WaitVar {
00325 public:
00326
00327 void post(const T& data) {
00328 {
00329 ScopedLock lock(m_lock);
00330 m_val = data;
00331 }
00332 m_sem.post();
00333 };
00334
00335 bool wait(T& data) {
00336 m_sem.wait();
00337 {
00338 ScopedLock lock(m_lock);
00339 data = m_val;
00340 }
00341 return true;
00342 };
00343
00344 bool timedWait(T& data,
00345 const double& timeout) {
00346
00347 if (false == m_sem.timedWait(timeout))
00348 return false;
00349 {
00350 ScopedLock lock(m_lock);
00351 data = m_val;
00352 }
00353 return true;
00354 }
00355
00356
00357
00358
00359
00360 WaitVar() : m_val(),
00361 m_lock(),
00362 m_sem(1) {};
00363
00364 private:
00365
00366 T m_val;
00367 Mutex m_lock;
00368 Semaphore m_sem;
00369 };
00370
00371
00372
00373
00374 template <class T> class WaitQueue {
00375 public:
00376
00377 void post(const T& data) {
00378 bool postSem=true;
00379 {
00380 ScopedLock lock(m_lock);
00381
00382
00383
00384
00385 if (m_maximum > 0 &&
00386 m_maximum == m_queue.size()) {
00387
00388
00389
00390
00391
00392 m_queue.pop_front();
00393 postSem = false;
00394 }
00395
00396 m_queue.push_back(data);
00397 }
00398 if (postSem)
00399 m_sem.post();
00400 };
00401
00402 void kick() {
00403 m_sem.post();
00404 };
00405
00406 bool wait(T& data) {
00407 m_sem.wait();
00408 {
00409 ScopedLock lock(m_lock);
00410
00411 if (0 == m_queue.size())
00412 return false;
00413 else {
00414 data = m_queue.front();
00415 m_queue.pop_front();
00416 return true;
00417 }
00418 }
00419 }
00420
00421 uint32_t waiters() {
00422 return m_sem.waiters();
00423 };
00424
00425 uint32_t size() {
00426 ScopedLock lock(m_lock);
00427 return m_queue.size();
00428 }
00429
00430 void clear() {
00431 ScopedLock lock(m_lock);
00432 m_queue.clear();
00433 while(false == m_sem.clear());
00434 }
00435
00436 WaitQueue(std::size_t max=0) :
00437 m_maximum(max) {};
00438
00439 private:
00440
00441 const std::size_t m_maximum;
00442 std::deque<T> m_queue;
00443 Mutex m_lock;
00444 Semaphore m_sem;
00445 };
00446
00447 }}}}
00448
00449 #endif