linux/Thread.hh
Go to the documentation of this file.
1 
41 #ifndef CRL_MULTISENSE_THREAD_HH
42 #define CRL_MULTISENSE_THREAD_HH
43 
44 #include <unistd.h>
45 #include <stdint.h>
46 #include <pthread.h>
47 #include <sched.h>
48 #include <errno.h>
49 #include <string.h>
50 #include <linux/futex.h>
51 #include <sys/syscall.h>
52 
53 #include <vector>
54 #include <deque>
55 
57 
58 #include <utility/Exception.hh>
59 
60 namespace crl {
61 namespace multisense {
62 namespace details {
63 namespace utility {
64 
65 //
66 // Forward declarations.
67 
68 class ScopedLock;
69 
70 //
71 // A simple class to wrap pthread creation and joining
72 
73 class Thread {
74 public:
75 
76  static CRL_CONSTEXPR uint32_t FLAGS_DETACH = (1 << 0);
77 
78  Thread(void *(*functionP)(void *),
79  void *contextP=NULL,
80  uint32_t flags=0,
81  int32_t scheduler=-1,
82  int32_t priority=0) : m_flags(flags) {
83 
84  pthread_attr_t tattr;
85  pthread_attr_init(&tattr);
86 
87  //
88  // -1 means the user wants default scheduling behavior
89 
90  if (-1 != scheduler) {
91  struct sched_param sattr = {0};
92 
93  //
94  // Set our scheduling policy
95 
96  if (0 != pthread_attr_setschedpolicy(&tattr, scheduler))
97  CRL_EXCEPTION("pthread_attr_setschedpolicy(scheduler=%d) failed: %s",
98  scheduler, strerror(errno));
99  //
100  // Set our scheduling parameters (just priority)
101 
102  sattr.sched_priority = priority;
103  if (0 != pthread_attr_setschedparam(&tattr, &sattr))
104  CRL_EXCEPTION("pthread_attr_setschedparam(pri=%d) failed: %s",
105  priority, strerror(errno));
106  //
107  // We must set EXPLICIT_SCHED so the parent's scheduler is not
108  // automatically inherited
109 
110  if (0 != pthread_attr_setinheritsched(&tattr, PTHREAD_EXPLICIT_SCHED))
111  CRL_EXCEPTION("pthread_attr_setinheritsched(explicit) failed: %s",
112  strerror(errno));
113  }
114 
115  //
116  // Create detached, if asked to do so
117 
118  if (FLAGS_DETACH & m_flags &&
119  0 != pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED))
120  CRL_EXCEPTION("pthread_attr_setdetachstate() failed: %s", strerror(errno));
121 
122  //
123  // Finally, create the thread
124 
125  if (0 != pthread_create(&m_id, &tattr, functionP, contextP))
126  CRL_EXCEPTION("pthread_create() failed: %s", strerror(errno));
127  };
128 
130  if (!(m_flags & FLAGS_DETACH) &&
131  0 !=pthread_join(m_id, NULL))
132  CRL_DEBUG("pthread_join() failed: %s\n", strerror(errno));
133  };
134 
135 private:
136 
137  uint32_t m_flags;
138  pthread_t m_id;
139 };
140 
141 //
142 // A simple mutex class
143 
144 class Mutex {
145 public:
146  friend class ScopedLock;
147 
148  Mutex() : m_mutex() {
149  if (0 != pthread_mutex_init(&m_mutex, NULL))
150  CRL_EXCEPTION("pthread_mutex_init() failed: %s",
151  strerror(errno));
152  }
153 
154  ~Mutex() {
155  pthread_mutex_destroy(&m_mutex);
156  };
157 
158 private:
159  pthread_mutex_t m_mutex;
160 };
161 
162 //
163 // A simple scoped lock class
164 
166 {
167 public:
168 
169  ScopedLock(Mutex& mutex) {
170  this->lock(&mutex.m_mutex);
171  };
172 
173  ScopedLock(pthread_mutex_t *lockP) {
174  this->lock(lockP);
175  };
176 
177  ScopedLock(pthread_mutex_t& lock) {
178  this->lock(&lock);
179  };
180 
182  pthread_mutex_unlock(m_lockP);
183  };
184 
185 private:
186 
187  void lock(pthread_mutex_t *lockP) {
188  m_lockP = lockP;
189  pthread_mutex_lock(m_lockP);
190  };
191 
192  pthread_mutex_t *m_lockP;
193 };
194 
195 // A futex-based semaphore.
196 //
197 // This implementation does not work across processes.
198 
199 class Semaphore {
200 public:
201 
202  //
203  // Wait for a post (decrement). If thread contention,
204  // we may wake up, but be unable to snatch
205  // the bait.. hence the while loop.
206 
207  bool wait() {
208  do {
209  if (0 == wait_())
210  return true;
211  } while (1);
212  };
213 
214  //
215  // Wait for a post, retrying until timeout
216 
217  bool timedWait(const double& timeout) {
218 
219  if (timeout < 0.0)
220  CRL_EXCEPTION("invalid timeout: %f", timeout);
221 
222  struct timespec ts;
223  ts.tv_sec = timeout;
224  ts.tv_nsec = (timeout - ts.tv_sec) * 1e9;
225 
226  do {
227  int32_t ret = wait_(&ts);
228 
229  if (0 == ret)
230  return true;
231  else if (ETIMEDOUT == ret)
232  return false;
233 
234  } while (1);
235  };
236 
237  //
238  // Post to the semaphore (increment.) Here we
239  // signal the futex to wake up any waiters.
240 
241  bool post() {
242 
243  //
244  // Limit the posts, if asked to do so
245 
246  if (m_maximum > 0 && m_avail >= static_cast<int>(m_maximum))
247  return false;
248 
249  const int32_t nval = __sync_add_and_fetch(&m_avail, 1);
250  if (m_waiters > 0)
251  syscall(__NR_futex, &m_avail, FUTEX_WAKE, nval, NULL, 0, 0);
252 
253  return true;
254  };
255 
256  //
257  // Decrement the semaphore to zero in one-shot.. may
258  // fail with thread contention, returns true if
259  // successful
260 
261  bool clear() {
262  int32_t val = m_avail;
263  if (val > 0)
264  return __sync_bool_compare_and_swap(&m_avail, val, 0);
265  return true;
266  };
267 
268  int32_t count () { return m_avail; };
269  int32_t waiters () { return m_waiters; };
270  bool decrement() { return wait(); };
271  bool increment() { return post(); };
272 
273  Semaphore(std::size_t max=0) :
274  m_maximum(max),
275  m_avail(0),
276  m_waiters(0) {};
277 
279 
280 private:
281 
282  //
283  // This actually does the synchronized decrement if possible, and goes
284  // to sleep on the futex if not.
285 
286  inline int32_t wait_(const struct timespec *tsP=NULL) {
287 
288  //
289  // Can we decrement the requested amount? If so, return success
290 
291  const int32_t val = m_avail;
292  if (val >= 1 && __sync_bool_compare_and_swap(&m_avail, val, val - 1))
293  return 0;
294 
295  //
296  // We must go to sleep until someone increments. Also keep track of
297  // how many threads are waiting on this futex.
298 
299  __sync_fetch_and_add(&m_waiters, 1);
300  const int32_t ret = syscall(__NR_futex, &m_avail, FUTEX_WAIT, val, tsP, 0, 0);
301  __sync_fetch_and_sub(&m_waiters, 1);
302 
303  //
304  // If we just woke up on the futex, then return EAGAIN, so we
305  // can come back in here and attempt a synchronized decrement again.
306 
307  if (ETIMEDOUT == ret || -1 == ret) // hmmm.. timeouts are returning -1
308  return ETIMEDOUT;
309  else
310  return EAGAIN;
311  };
312 
313  typedef int32_t aligned_int32_t __attribute__((aligned (4))); // unnecessary ?
314 
315  const std::size_t m_maximum;
316  aligned_int32_t m_avail;
317  aligned_int32_t m_waiters;
318 };
319 
320 //
321 // A templatized variable signaler
322 
323 template<class T> class WaitVar {
324 public:
325 
326  void post(const T& data) {
327  {
328  ScopedLock lock(m_lock);
329  m_val = data;
330  }
331  m_sem.post();
332  };
333 
334  bool wait(T& data) {
335  m_sem.wait();
336  {
337  ScopedLock lock(m_lock);
338  data = m_val;
339  }
340  return true;
341  };
342 
343  bool timedWait(T& data,
344  const double& timeout) {
345 
346  if (false == m_sem.timedWait(timeout))
347  return false;
348  {
349  ScopedLock lock(m_lock);
350  data = m_val;
351  }
352  return true;
353  }
354 
355  //
356  // Use a semaphore with max value of 1. The WaitVar will
357  // either be in a signaled state, or not.
358 
359  WaitVar() : m_val(),
360  m_lock(),
361  m_sem(1) {};
362 
363 private:
364 
365  T m_val;
368 };
369 
370 //
371 // A templatized wait queue
372 
373 template <class T> class WaitQueue {
374 public:
375 
376  void post(const T& data) {
377  bool postSem=true;
378  {
379  ScopedLock lock(m_lock);
380 
381  //
382  // Limit deque size, if requested
383 
384  if (m_maximum > 0 &&
385  m_maximum == m_queue.size()) {
386 
387  //
388  // If at max entries, we will pop_front the oldest,
389  // push_back the newest, and leave the semaphore alone
390 
391  m_queue.pop_front();
392  postSem = false;
393  }
394 
395  m_queue.push_back(data);
396  }
397  if (postSem)
398  m_sem.post();
399  };
400 
401  void kick() {
402  m_sem.post();
403  };
404 
405  bool wait(T& data) {
406  m_sem.wait();
407  {
408  ScopedLock lock(m_lock);
409 
410  if (0 == m_queue.size())
411  return false;
412  else {
413  data = m_queue.front();
414  m_queue.pop_front();
415  return true;
416  }
417  }
418  }
419 
420  uint32_t waiters() {
421  return m_sem.waiters();
422  };
423 
424  uint32_t size() {
425  ScopedLock lock(m_lock);
426  return m_queue.size();
427  }
428 
429  void clear() {
430  ScopedLock lock(m_lock);
431  m_queue.clear();
432  while(false == m_sem.clear());
433  }
434 
435  WaitQueue(std::size_t max=0) :
436  m_maximum(max) {};
437 
438 private:
439 
440  const std::size_t m_maximum;
441  std::deque<T> m_queue;
444 };
445 
446 }}}} // namespaces
447 
448 #endif /* #ifndef CRL_MULTISENSE_THREAD_HH */
crl::multisense::details::utility::Semaphore::~Semaphore
~Semaphore()
Definition: linux/Thread.hh:278
crl::multisense::details::utility::WaitQueue::size
uint32_t size()
Definition: linux/Thread.hh:424
crl::multisense::details::utility::Mutex
Definition: linux/Thread.hh:144
crl::multisense::details::utility::Semaphore::decrement
bool decrement()
Definition: linux/Thread.hh:270
crl::multisense::details::utility::Semaphore::m_avail
aligned_int32_t m_avail
Definition: linux/Thread.hh:316
CRL_DEBUG
#define CRL_DEBUG(fmt,...)
Definition: Exception.hh:71
Exception.hh
crl::multisense::details::utility::WaitQueue::wait
bool wait(T &data)
Definition: linux/Thread.hh:405
crl::multisense::details::utility::Semaphore::__attribute__
int32_t aligned_int32_t __attribute__((aligned(4)))
Definition: linux/Thread.hh:313
crl::multisense::details::utility::WaitQueue::m_maximum
const std::size_t m_maximum
Definition: linux/Thread.hh:436
crl::multisense::details::utility::WaitVar
Definition: linux/Thread.hh:323
crl::multisense::details::utility::ScopedLock::ScopedLock
ScopedLock(pthread_mutex_t &lock)
Definition: linux/Thread.hh:177
crl::multisense::details::utility::WaitQueue::m_sem
Semaphore m_sem
Definition: linux/Thread.hh:443
CRL_CONSTEXPR
#define CRL_CONSTEXPR
Definition: Legacy/include/MultiSense/details/utility/Portability.hh:49
crl::multisense::details::utility::Semaphore::count
int32_t count()
Definition: linux/Thread.hh:268
crl::multisense::details::utility::Semaphore::wait
bool wait()
Definition: linux/Thread.hh:207
crl::multisense::details::utility::WaitQueue::kick
void kick()
Definition: linux/Thread.hh:401
crl::multisense::details::utility::Semaphore::clear
bool clear()
Definition: linux/Thread.hh:261
crl::multisense::details::utility::WaitQueue::m_lock
Mutex m_lock
Definition: linux/Thread.hh:442
crl::multisense::details::utility::Mutex::~Mutex
~Mutex()
Definition: linux/Thread.hh:154
crl::multisense::details::utility::WaitQueue
Definition: linux/Thread.hh:373
crl::multisense::details::utility::ScopedLock::ScopedLock
ScopedLock(pthread_mutex_t *lockP)
Definition: linux/Thread.hh:173
Portability.hh
crl::multisense::details::utility::Semaphore::wait_
int32_t wait_(const struct timespec *tsP=NULL)
Definition: linux/Thread.hh:286
crl::multisense::details::utility::WaitQueue::waiters
uint32_t waiters()
Definition: linux/Thread.hh:420
crl::multisense::details::utility::Thread::m_id
pthread_t m_id
Definition: linux/Thread.hh:138
CRL_EXCEPTION
#define CRL_EXCEPTION(fmt,...)
Definition: Exception.hh:85
crl
Definition: Legacy/details/channel.cc:61
crl::multisense::details::utility::Semaphore::Semaphore
Semaphore(std::size_t max=0)
Definition: linux/Thread.hh:273
crl::multisense::details::utility::WaitVar::wait
bool wait(T &data)
Definition: linux/Thread.hh:334
crl::multisense::details::utility::WaitVar::WaitVar
WaitVar()
Definition: linux/Thread.hh:359
crl::multisense::details::utility::Semaphore::timedWait
bool timedWait(const double &timeout)
Definition: linux/Thread.hh:217
crl::multisense::details::utility::WaitQueue::clear
void clear()
Definition: linux/Thread.hh:429
crl::multisense::details::utility::WaitVar::m_lock
Mutex m_lock
Definition: linux/Thread.hh:366
crl::multisense::details::utility::Thread::FLAGS_DETACH
static CRL_CONSTEXPR uint32_t FLAGS_DETACH
Definition: linux/Thread.hh:76
crl::multisense::details::utility::WaitQueue::WaitQueue
WaitQueue(std::size_t max=0)
Definition: linux/Thread.hh:435
crl::multisense::details::utility::Semaphore::waiters
int32_t waiters()
Definition: linux/Thread.hh:269
crl::multisense::details::utility::Semaphore::post
bool post()
Definition: linux/Thread.hh:241
crl::multisense::details::utility::ScopedLock::~ScopedLock
~ScopedLock()
Definition: linux/Thread.hh:181
crl::multisense::details::utility::Thread::m_flags
uint32_t m_flags
Definition: linux/Thread.hh:133
crl::multisense::details::utility::WaitVar::m_val
T m_val
Definition: linux/Thread.hh:361
crl::multisense::details::utility::Mutex::Mutex
Mutex()
Definition: linux/Thread.hh:148
multisense
Definition: factory.cc:39
crl::multisense::details::utility::ScopedLock::lock
void lock(pthread_mutex_t *lockP)
Definition: linux/Thread.hh:187
crl::multisense::details::utility::Semaphore::increment
bool increment()
Definition: linux/Thread.hh:271
crl::multisense::details::utility::WaitVar::m_sem
Semaphore m_sem
Definition: linux/Thread.hh:367
crl::multisense::details::utility::WaitVar::post
void post(const T &data)
Definition: linux/Thread.hh:326
crl::multisense::details::utility::Semaphore
Definition: linux/Thread.hh:199
crl::multisense::details::utility::Semaphore::m_maximum
const std::size_t m_maximum
Definition: linux/Thread.hh:315
crl::multisense::details::utility::Thread::~Thread
~Thread()
Definition: linux/Thread.hh:129
crl::multisense::details::utility::ScopedLock
Definition: linux/Thread.hh:165
crl::multisense::details::utility::ScopedLock::ScopedLock
ScopedLock(Mutex &mutex)
Definition: linux/Thread.hh:169
crl::multisense::details::utility::Thread
Definition: linux/Thread.hh:73
crl::multisense::details::utility::Thread::Thread
Thread(void *(*functionP)(void *), void *contextP=NULL, uint32_t flags=0, int32_t scheduler=-1, int32_t priority=0)
Definition: linux/Thread.hh:78
crl::multisense::details::utility::ScopedLock::m_lockP
pthread_mutex_t * m_lockP
Definition: linux/Thread.hh:190
crl::multisense::details::utility::Mutex::m_mutex
pthread_mutex_t m_mutex
Definition: linux/Thread.hh:156
crl::multisense::details::utility::WaitQueue::m_queue
std::deque< T > m_queue
Definition: linux/Thread.hh:441
crl::multisense::details::utility::Semaphore::m_waiters
aligned_int32_t m_waiters
Definition: linux/Thread.hh:317
crl::multisense::details::utility::WaitQueue::post
void post(const T &data)
Definition: linux/Thread.hh:376
crl::multisense::details::utility::WaitVar::timedWait
bool timedWait(T &data, const double &timeout)
Definition: linux/Thread.hh:343


multisense_lib
Author(s):
autogenerated on Thu Apr 17 2025 02:49:09