linux/Thread.hh
Go to the documentation of this file.
1 
41 #ifndef CRL_MULTISENSE_THREAD_HH
42 #define CRL_MULTISENSE_THREAD_HH
43 
44 #include <unistd.h>
45 #include <stdint.h>
46 #include <pthread.h>
47 #include <sched.h>
48 #include <errno.h>
49 #include <string.h>
50 #include <linux/futex.h>
51 #include <unistd.h>
52 #include <sys/syscall.h>
53 
54 #include <vector>
55 #include <deque>
56 
58 
59 #include "../Exception.hh"
60 
61 namespace crl {
62 namespace multisense {
63 namespace details {
64 namespace utility {
65 
66 //
67 // Forward declarations.
68 
69 class ScopedLock;
70 
71 //
72 // A simple class to wrap pthread creation and joining
73 
74 class Thread {
75 public:
76 
77  static CRL_CONSTEXPR uint32_t FLAGS_DETACH = (1 << 0);
78 
79  Thread(void *(*functionP)(void *),
80  void *contextP=NULL,
81  uint32_t flags=0,
82  int32_t scheduler=-1,
83  int32_t priority=0) : m_flags(flags) {
84 
85  pthread_attr_t tattr;
86  pthread_attr_init(&tattr);
87 
88  //
89  // -1 means the user wants default scheduling behavior
90 
91  if (-1 != scheduler) {
92  struct sched_param sattr = {0};
93 
94  //
95  // Set our scheduling policy
96 
97  if (0 != pthread_attr_setschedpolicy(&tattr, scheduler))
98  CRL_EXCEPTION("pthread_attr_setschedpolicy(scheduler=%d) failed: %s",
99  scheduler, strerror(errno));
100  //
101  // Set our scheduling parameters (just priority)
102 
103  sattr.sched_priority = priority;
104  if (0 != pthread_attr_setschedparam(&tattr, &sattr))
105  CRL_EXCEPTION("pthread_attr_setschedparam(pri=%d) failed: %s",
106  priority, strerror(errno));
107  //
108  // We must set EXPLICIT_SCHED so the parent's scheduler is not
109  // automatically inherited
110 
111  if (0 != pthread_attr_setinheritsched(&tattr, PTHREAD_EXPLICIT_SCHED))
112  CRL_EXCEPTION("pthread_attr_setinheritsched(explicit) failed: %s",
113  strerror(errno));
114  }
115 
116  //
117  // Create detached, if asked to do so
118 
119  if (FLAGS_DETACH & m_flags &&
120  0 != pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED))
121  CRL_EXCEPTION("pthread_attr_setdetachstate() failed: %s", strerror(errno));
122 
123  //
124  // Finally, create the thread
125 
126  if (0 != pthread_create(&m_id, &tattr, functionP, contextP))
127  CRL_EXCEPTION("pthread_create() failed: %s", strerror(errno));
128  };
129 
131  if (!(m_flags & FLAGS_DETACH) &&
132  0 != pthread_join(m_id, NULL))
133  CRL_DEBUG("pthread_join() failed: %s\n", strerror(errno));
134  };
135 
136 private:
137 
138  uint32_t m_flags;
139  pthread_t m_id;
140 };
141 
142 //
143 // A simple mutex class
144 
145 class Mutex {
146 public:
147  friend class ScopedLock;
148 
149  Mutex() : m_mutex() {
150  if (0 != pthread_mutex_init(&m_mutex, NULL))
151  CRL_EXCEPTION("pthread_mutex_init() failed: %s",
152  strerror(errno));
153  }
154 
155  ~Mutex() {
156  pthread_mutex_destroy(&m_mutex);
157  };
158 
159 private:
160  pthread_mutex_t m_mutex;
161 };
162 
163 //
164 // A simple scoped lock class
165 
167 {
168 public:
169 
170  ScopedLock(Mutex& mutex) {
171  this->lock(&mutex.m_mutex);
172  };
173 
174  ScopedLock(pthread_mutex_t *lockP) {
175  this->lock(lockP);
176  };
177 
178  ScopedLock(pthread_mutex_t& lock) {
179  this->lock(&lock);
180  };
181 
183  pthread_mutex_unlock(m_lockP);
184  };
185 
186 private:
187 
188  void lock(pthread_mutex_t *lockP) {
189  m_lockP = lockP;
190  pthread_mutex_lock(m_lockP);
191  };
192 
193  pthread_mutex_t *m_lockP;
194 };
195 
196 // A futex-based semaphore.
197 //
198 // This implementation does not work across processes.
199 
200 class Semaphore {
201 public:
202 
203  //
204  // Wait for a post (decrement). If thread contention,
205  // we may wake up, but be unable to snatch
206  // the bait.. hence the while loop.
207 
208  bool wait() {
209  do {
210  if (0 == wait_())
211  return true;
212  } while (1);
213  };
214 
215  //
216  // Wait for a post, retrying until timeout
217 
218  bool timedWait(const double& timeout) {
219 
220  if (timeout < 0.0)
221  CRL_EXCEPTION("invalid timeout: %f", timeout);
222 
223  struct timespec ts;
224  ts.tv_sec = timeout;
225  ts.tv_nsec = (timeout - ts.tv_sec) * 1e9;
226 
227  do {
228  int32_t ret = wait_(&ts);
229 
230  if (0 == ret)
231  return true;
232  else if (ETIMEDOUT == ret)
233  return false;
234 
235  } while (1);
236  };
237 
238  //
239  // Post to the semaphore (increment.) Here we
240  // signal the futex to wake up any waiters.
241 
242  bool post() {
243 
244  //
245  // Limit the posts, if asked to do so
246 
247  if (m_maximum > 0 && m_avail >= static_cast<int>(m_maximum))
248  return false;
249 
250  const int32_t nval = __sync_add_and_fetch(&m_avail, 1);
251  if (m_waiters > 0)
252  syscall(__NR_futex, &m_avail, FUTEX_WAKE, nval, NULL, 0, 0);
253 
254  return true;
255  };
256 
257  //
258  // Decrement the semaphore to zero in one-shot.. may
259  // fail with thread contention, returns true if
260  // successful
261 
262  bool clear() {
263  int32_t val = m_avail;
264  if (val > 0)
265  return __sync_bool_compare_and_swap(&m_avail, val, 0);
266  return true;
267  };
268 
269  int32_t count () { return m_avail; };
270  int32_t waiters () { return m_waiters; };
271  bool decrement() { return wait(); };
272  bool increment() { return post(); };
273 
274  Semaphore(std::size_t max=0) :
275  m_maximum(max),
276  m_avail(0),
277  m_waiters(0) {};
278 
280 
281 private:
282 
283  //
284  // This actually does the synchronized decrement if possible, and goes
285  // to sleep on the futex if not.
286 
287  inline int32_t wait_(const struct timespec *tsP=NULL) {
288 
289  //
290  // Can we decrement the requested amount? If so, return success
291 
292  const int32_t val = m_avail;
293  if (val >= 1 && __sync_bool_compare_and_swap(&m_avail, val, val - 1))
294  return 0;
295 
296  //
297  // We must go to sleep until someone increments. Also keep track of
298  // how many threads are waiting on this futex.
299 
300  __sync_fetch_and_add(&m_waiters, 1);
301  const int32_t ret = syscall(__NR_futex, &m_avail, FUTEX_WAIT, val, tsP, 0, 0);
302  __sync_fetch_and_sub(&m_waiters, 1);
303 
304  //
305  // If we just woke up on the futex, then return EAGAIN, so we
306  // can come back in here and attempt a synchronized decrement again.
307 
308  if (ETIMEDOUT == ret || -1 == ret) // hmmm.. timeouts are returning -1
309  return ETIMEDOUT;
310  else
311  return EAGAIN;
312  };
313 
314  typedef int32_t aligned_int32_t __attribute__((aligned (4))); // unnecessary ?
315 
316  const std::size_t m_maximum;
317  aligned_int32_t m_avail;
318  aligned_int32_t m_waiters;
319 };
320 
321 //
322 // A templatized variable signaler
323 
324 template<class T> class WaitVar {
325 public:
326 
327  void post(const T& data) {
328  {
329  ScopedLock lock(m_lock);
330  m_val = data;
331  }
332  m_sem.post();
333  };
334 
335  bool wait(T& data) {
336  m_sem.wait();
337  {
338  ScopedLock lock(m_lock);
339  data = m_val;
340  }
341  return true;
342  };
343 
344  bool timedWait(T& data,
345  const double& timeout) {
346 
347  if (false == m_sem.timedWait(timeout))
348  return false;
349  {
350  ScopedLock lock(m_lock);
351  data = m_val;
352  }
353  return true;
354  }
355 
356  //
357  // Use a semaphore with max value of 1. The WaitVar will
358  // either be in a signaled state, or not.
359 
360  WaitVar() : m_val(),
361  m_lock(),
362  m_sem(1) {};
363 
364 private:
365 
366  T m_val;
369 };
370 
371 //
372 // A templatized wait queue
373 
374 template <class T> class WaitQueue {
375 public:
376 
377  void post(const T& data) {
378  bool postSem=true;
379  {
380  ScopedLock lock(m_lock);
381 
382  //
383  // Limit deque size, if requested
384 
385  if (m_maximum > 0 &&
386  m_maximum == m_queue.size()) {
387 
388  //
389  // If at max entries, we will pop_front the oldest,
390  // push_back the newest, and leave the semaphore alone
391 
392  m_queue.pop_front();
393  postSem = false;
394  }
395 
396  m_queue.push_back(data);
397  }
398  if (postSem)
399  m_sem.post();
400  };
401 
402  void kick() {
403  m_sem.post();
404  };
405 
406  bool wait(T& data) {
407  m_sem.wait();
408  {
409  ScopedLock lock(m_lock);
410 
411  if (0 == m_queue.size())
412  return false;
413  else {
414  data = m_queue.front();
415  m_queue.pop_front();
416  return true;
417  }
418  }
419  }
420 
421  uint32_t waiters() {
422  return m_sem.waiters();
423  };
424 
425  uint32_t size() {
426  ScopedLock lock(m_lock);
427  return m_queue.size();
428  }
429 
430  void clear() {
431  ScopedLock lock(m_lock);
432  m_queue.clear();
433  while(false == m_sem.clear());
434  }
435 
436  WaitQueue(std::size_t max=0) :
437  m_maximum(max) {};
438 
439 private:
440 
441  const std::size_t m_maximum;
442  std::deque<T> m_queue;
445 };
446 
447 }}}} // namespaces
448 
449 #endif /* #ifndef CRL_MULTISENSE_THREAD_HH */
#define CRL_EXCEPTION(fmt,...)
Definition: Exception.hh:71
bool timedWait(T &data, const double &timeout)
int32_t wait_(const struct timespec *tsP=NULL)
static CRL_CONSTEXPR uint32_t FLAGS_DETACH
Definition: linux/Thread.hh:77
Definition: channel.cc:56
Thread(void *(*functionP)(void *), void *contextP=NULL, uint32_t flags=0, int32_t scheduler=-1, int32_t priority=0)
Definition: linux/Thread.hh:79
#define CRL_DEBUG(fmt,...)
Definition: Exception.hh:83
int32_t aligned_int32_t __attribute__((aligned(4)))
#define CRL_CONSTEXPR
Definition: Portability.hh:51


multisense_lib
Author(s):
autogenerated on Sun Mar 14 2021 02:34:50