41 #ifndef CRL_MULTISENSE_THREAD_HH 42 #define CRL_MULTISENSE_THREAD_HH 50 #include <linux/futex.h> 52 #include <sys/syscall.h> 59 #include "../Exception.hh" 62 namespace multisense {
83 int32_t priority=0) :
m_flags(flags) {
86 pthread_attr_init(&tattr);
91 if (-1 != scheduler) {
92 struct sched_param sattr = {0};
97 if (0 != pthread_attr_setschedpolicy(&tattr, scheduler))
98 CRL_EXCEPTION(
"pthread_attr_setschedpolicy(scheduler=%d) failed: %s",
99 scheduler, strerror(errno));
103 sattr.sched_priority = priority;
104 if (0 != pthread_attr_setschedparam(&tattr, &sattr))
105 CRL_EXCEPTION(
"pthread_attr_setschedparam(pri=%d) failed: %s",
106 priority, strerror(errno));
111 if (0 != pthread_attr_setinheritsched(&tattr, PTHREAD_EXPLICIT_SCHED))
112 CRL_EXCEPTION(
"pthread_attr_setinheritsched(explicit) failed: %s",
120 0 != pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED))
121 CRL_EXCEPTION(
"pthread_attr_setdetachstate() failed: %s", strerror(errno));
126 if (0 != pthread_create(&
m_id, &tattr, functionP, contextP))
127 CRL_EXCEPTION(
"pthread_create() failed: %s", strerror(errno));
131 if (!(
m_flags & FLAGS_DETACH) &&
132 0 != pthread_join(
m_id, NULL))
133 CRL_DEBUG(
"pthread_join() failed: %s\n", strerror(errno));
150 if (0 != pthread_mutex_init(&m_mutex, NULL))
156 pthread_mutex_destroy(&m_mutex);
183 pthread_mutex_unlock(m_lockP);
188 void lock(pthread_mutex_t *lockP) {
190 pthread_mutex_lock(m_lockP);
225 ts.tv_nsec = (timeout - ts.tv_sec) * 1e9;
228 int32_t ret = wait_(&ts);
232 else if (ETIMEDOUT == ret)
247 if (m_maximum > 0 && m_avail >= static_cast<int>(m_maximum))
250 const int32_t nval = __sync_add_and_fetch(&m_avail, 1);
252 syscall(__NR_futex, &m_avail, FUTEX_WAKE, nval, NULL, 0, 0);
263 int32_t val = m_avail;
265 return __sync_bool_compare_and_swap(&m_avail, val, 0);
269 int32_t
count () {
return m_avail; };
287 inline int32_t
wait_(
const struct timespec *tsP=NULL) {
292 const int32_t val = m_avail;
293 if (val >= 1 && __sync_bool_compare_and_swap(&m_avail, val, val - 1))
300 __sync_fetch_and_add(&m_waiters, 1);
301 const int32_t ret = syscall(__NR_futex, &m_avail, FUTEX_WAIT, val, tsP, 0, 0);
302 __sync_fetch_and_sub(&m_waiters, 1);
308 if (ETIMEDOUT == ret || -1 == ret)
345 const double& timeout) {
347 if (
false == m_sem.timedWait(timeout))
386 m_maximum == m_queue.size()) {
396 m_queue.push_back(data);
411 if (0 == m_queue.size())
414 data = m_queue.front();
422 return m_sem.waiters();
427 return m_queue.size();
433 while(
false == m_sem.clear());
pthread_mutex_t * m_lockP
#define CRL_EXCEPTION(fmt,...)
bool timedWait(T &data, const double &timeout)
int32_t wait_(const struct timespec *tsP=NULL)
ScopedLock(pthread_mutex_t &lock)
bool timedWait(const double &timeout)
ScopedLock(pthread_mutex_t *lockP)
static CRL_CONSTEXPR uint32_t FLAGS_DETACH
const std::size_t m_maximum
WaitQueue(std::size_t max=0)
aligned_int32_t m_waiters
Thread(void *(*functionP)(void *), void *contextP=NULL, uint32_t flags=0, int32_t scheduler=-1, int32_t priority=0)
Semaphore(std::size_t max=0)
#define CRL_DEBUG(fmt,...)
void lock(pthread_mutex_t *lockP)
int32_t aligned_int32_t __attribute__((aligned(4)))
const std::size_t m_maximum