macos/Thread.hh
Go to the documentation of this file.
1 
37 #ifndef CRL_MULTISENSE_THREAD_HH
38 #define CRL_MULTISENSE_THREAD_HH
39 
40 #include <stdint.h>
41 #include <errno.h>
42 #include <string.h>
43 #include <pthread.h>
44 
45 #include <mach/mach_init.h>
46 #include <mach/task.h>
47 #include <mach/semaphore.h>
48 
49 #include <vector>
50 #include <deque>
51 
53 
54 #include "../Exception.hh"
55 
56 namespace crl {
57 namespace multisense {
58 namespace details {
59 namespace utility {
60 
61 // #define sem_t semaphore_t
62 // #define sem_init(SEM,UNUSED,VALUE) semaphore_create(current_task(), (SEM), SYNC_POLICY_FIFO, (VALUE))
63 // #define sem_destroy(SEM) semaphore_destroy(current_task(), *(SEM))
64 // #define sem_wait(SEM) semaphore_wait(*(SEM))
65 // #define sem_post(SEM) semaphore_signal(*(SEM))
66 
67 //
68 // Forward declarations.
69 
70 class ScopedLock;
71 
72 //
73 // A simple class to wrap pthread creation and joining
74 
75 class Thread {
76 public:
77 
78  static CRL_CONSTEXPR uint32_t FLAGS_DETACH = (1 << 0);
79 
80  Thread(void *(*functionP)(void *),
81  void *contextP=NULL,
82  uint32_t flags=0,
83  int32_t scheduler=-1,
84  int32_t priority=0) : m_flags(flags) {
85 
86  pthread_attr_t tattr;
87  pthread_attr_init(&tattr);
88 
89  //
90  // -1 means the user wants default scheduling behavior
91 
92  if (-1 != scheduler) {
93  struct sched_param sattr = {0};
94 
95  //
96  // Set our scheduling policy
97 
98  if (0 != pthread_attr_setschedpolicy(&tattr, scheduler))
99  CRL_EXCEPTION("pthread_attr_setschedpolicy(scheduler=%d) failed: %s",
100  scheduler, strerror(errno));
101  //
102  // Set our scheduling parameters (just priority)
103 
104  sattr.sched_priority = priority;
105  if (0 != pthread_attr_setschedparam(&tattr, &sattr))
106  CRL_EXCEPTION("pthread_attr_setschedparam(pri=%d) failed: %s",
107  priority, strerror(errno));
108  //
109  // We must set EXPLICIT_SCHED so the parent's scheduler is not
110  // automatically inherited
111 
112  if (0 != pthread_attr_setinheritsched(&tattr, PTHREAD_EXPLICIT_SCHED))
113  CRL_EXCEPTION("pthread_attr_setinheritsched(explicit) failed: %s",
114  strerror(errno));
115  }
116 
117  //
118  // Create detached, if asked to do so
119 
120  if (FLAGS_DETACH & m_flags &&
121  0 != pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED))
122  CRL_EXCEPTION("pthread_attr_setdetachstate() failed: %s", strerror(errno));
123 
124  //
125  // Finally, create the thread
126 
127  if (0 != pthread_create(&m_id, &tattr, functionP, contextP))
128  CRL_EXCEPTION("pthread_create() failed: %s", strerror(errno));
129  };
130 
132  if (!(m_flags & FLAGS_DETACH) &&
133  0 != pthread_join(m_id, NULL))
134  CRL_DEBUG("pthread_join() failed: %s\n", strerror(errno));
135  };
136 
137 private:
138 
139  uint32_t m_flags;
140  pthread_t m_id;
141 };
142 
143 //
144 // A simple mutex class
145 
146 class Mutex {
147 public:
148  friend class ScopedLock;
149 
150  Mutex() : m_mutex() {
151  if (0 != pthread_mutex_init(&m_mutex, NULL))
152  CRL_EXCEPTION("pthread_mutex_init() failed: %s",
153  strerror(errno));
154  }
155 
156  ~Mutex() {
157  pthread_mutex_destroy(&m_mutex);
158  };
159 
160 private:
161  pthread_mutex_t m_mutex;
162 };
163 
164 //
165 // A simple scoped lock class
166 
167 class ScopedLock
168 {
169 public:
170 
171  ScopedLock(Mutex& mutex) {
172  this->lock(&mutex.m_mutex);
173  };
174 
175  ScopedLock(pthread_mutex_t *lockP) {
176  this->lock(lockP);
177  };
178 
179  ScopedLock(pthread_mutex_t& lock) {
180  this->lock(&lock);
181  };
182 
184  pthread_mutex_unlock(m_lockP);
185  };
186 
187 private:
188 
189  void lock(pthread_mutex_t *lockP) {
190  m_lockP = lockP;
191  pthread_mutex_lock(m_lockP);
192  };
193 
194  pthread_mutex_t *m_lockP;
195 };
196 
197 // A counting semaphore that sleeps on a Mach semaphore (macOS port of the futex-based design).
198 //
199 // This implementation does not work across processes.
200 
//
// Counting semaphore. The count lives in m_avail and is updated with
// atomic builtins; threads that cannot decrement sleep on a Mach
// semaphore (m_sem) until a post wakes them.
//
// NOTE(review): unlike a futex wait, the re-check of m_avail and the
// sleep on m_sem are not one atomic step, so a post that lands between
// them looks like it could be missed -- confirm whether callers rely on
// wait() never missing a wakeup.

class Semaphore {
public:

    //
    // Wait for a post (decrement). If thread contention,
    // we may wake up, but be unable to snatch
    // the bait.. hence the while loop.

    bool wait() {
        do {
            if (0 == wait_())
                return true;
        } while (1);
    }

    //
    // Wait for a post, retrying until timeout. 'timeout' is in seconds
    // and must not be negative. Returns true if the semaphore was
    // decremented, false if a wait timed out first. Each retry sleeps
    // for up to the full timeout again.

    bool timedWait(const double& timeout) {

        if (timeout < 0.0) {
            CRL_EXCEPTION("invalid timeout: %f", timeout);
        }

        //
        // Split into whole seconds and the fractional remainder in ns

        struct timespec ts;
        ts.tv_sec  = static_cast<time_t>(timeout);
        ts.tv_nsec = static_cast<long>((timeout - ts.tv_sec) * 1e9);

        do {
            const int32_t ret = wait_(&ts);

            if (0 == ret)
                return true;
            else if (ETIMEDOUT == ret)
                return false;

        } while (1);
    }

    //
    // Post to the semaphore (increment.) Returns false, without
    // incrementing, when a maximum was requested and has been reached.

    bool post() {

        //
        // The limit check and the increment are done as one
        // compare-and-swap, so concurrent posters cannot push the count
        // past the maximum.

        const int32_t limit = static_cast<int>(m_maximum);

        for (;;) {
            const int32_t cur = m_avail;

            if (m_maximum > 0 && cur >= limit)
                return false;

            if (__sync_bool_compare_and_swap(&m_avail, cur, cur + 1))
                break;
        }

        //
        // Wake every sleeper; each one re-attempts the decrement

        if (m_waiters > 0) {
            semaphore_signal_all(m_sem);
        }

        return true;
    }

    //
    // Decrement the semaphore to zero in one-shot.. may
    // fail with thread contention, returns true if
    // successful

    bool clear() {
        const int32_t val = m_avail;
        if (val > 0)
            return __sync_bool_compare_and_swap(&m_avail, val, 0);
        return true;
    }

    int32_t count    () { return m_avail;   }
    int32_t waiters  () { return m_waiters; }
    bool    decrement() { return wait();    }
    bool    increment() { return post();    }

    //
    // 'max' caps the count that post() will build up; 0 means no cap.

    Semaphore(std::size_t max=0) :
        m_maximum(max),
        m_avail(0),
        m_waiters(0)
    {
        const kern_return_t ret = semaphore_create(mach_task_self(), &m_sem,
                                                   SYNC_POLICY_FIFO, 1);
        if (KERN_SUCCESS != ret) {
            CRL_EXCEPTION("semaphore_create() failed: %d", ret);
        }
    }

    ~Semaphore() {
        semaphore_destroy(mach_task_self(), m_sem);
    }

private:

    //
    // One decrement attempt.
    //
    // Returns 0 if the count was decremented, ETIMEDOUT if tsP was
    // given and the sleep timed out, and EAGAIN when the caller should
    // simply try again.

    inline int32_t wait_(const struct timespec *tsP=NULL) {

        //
        // Can we decrement? If so, return success. If another thread
        // beat us to the compare-and-swap the count may still be
        // positive, so retry right away instead of going to sleep.

        const int32_t val = m_avail;
        if (val >= 1) {
            if (__sync_bool_compare_and_swap(&m_avail, val, val - 1))
                return 0;
            return EAGAIN;
        }

        //
        // We must go to sleep until someone increments. Also keep track of
        // how many threads are sleeping, so post() knows when to signal.

        __sync_fetch_and_add(&m_waiters, 1);

        kern_return_t ret;
        if (NULL == tsP) {
            ret = semaphore_wait(m_sem);
        } else {
            //
            // semaphore_timedwait() takes a relative wait time

            mach_timespec_t relative;
            relative.tv_sec  = static_cast<unsigned int>(tsP->tv_sec);
            relative.tv_nsec = static_cast<clock_res_t>(tsP->tv_nsec);
            ret = semaphore_timedwait(m_sem, relative);
        }

        __sync_fetch_and_sub(&m_waiters, 1);

        //
        // Any wake-up other than a timeout means: come back in and
        // attempt a synchronized decrement again.

        if (KERN_OPERATION_TIMED_OUT == ret)
            return ETIMEDOUT;
        else
            return EAGAIN;
    }

    typedef int32_t aligned_int32_t __attribute__((aligned (4))); // unnecessary ?

    const std::size_t m_maximum;  // cap on m_avail, 0 for none
    aligned_int32_t   m_avail;    // current count
    aligned_int32_t   m_waiters;  // threads currently inside the sleep in wait_()
    semaphore_t       m_sem;      // Mach semaphore the waiters sleep on
};
333 
334 //
335 // A templatized variable signaler
336 
337 template<class T> class WaitVar {
338 public:
339 
340  void post(const T& data) {
341  {
342  ScopedLock lock(m_lock);
343  m_val = data;
344  }
345  m_sem.post();
346  };
347 
348  bool wait(T& data) {
349  m_sem.wait();
350  {
351  ScopedLock lock(m_lock);
352  data = m_val;
353  }
354  return true;
355  };
356 
357  bool timedWait(T& data,
358  const double& timeout) {
359 
360  if (false == m_sem.timedWait(timeout))
361  return false;
362  {
363  ScopedLock lock(m_lock);
364  data = m_val;
365  }
366  return true;
367  }
368 
369  //
370  // Use a semaphore with max value of 1. The WaitVar will
371  // either be in a signaled state, or not.
372 
373  WaitVar() : m_val(),
374  m_lock(),
375  m_sem(1) {};
376 
377 private:
378 
379  T m_val;
380  Mutex m_lock;
381  Semaphore m_sem;
382 };
383 
384 //
385 // A templatized wait queue
386 
387 template <class T> class WaitQueue {
388 public:
389 
390  void post(const T& data) {
391  bool postSem=true;
392  {
393  ScopedLock lock(m_lock);
394 
395  //
396  // Limit deque size, if requested
397 
398  if (m_maximum > 0 &&
399  m_maximum == m_queue.size()) {
400 
401  //
402  // If at max entries, we will pop_front the oldest,
403  // push_back the newest, and leave the semaphore alone
404 
405  m_queue.pop_front();
406  postSem = false;
407  }
408 
409  m_queue.push_back(data);
410  }
411  if (postSem)
412  m_sem.post();
413  };
414 
415  void kick() {
416  m_sem.post();
417  };
418 
419  bool wait(T& data) {
420  m_sem.wait();
421  {
422  ScopedLock lock(m_lock);
423 
424  if (0 == m_queue.size())
425  return false;
426  else {
427  data = m_queue.front();
428  m_queue.pop_front();
429  return true;
430  }
431  }
432  }
433 
434  uint32_t waiters() {
435  return m_sem.waiters();
436  };
437 
438  uint32_t size() {
439  ScopedLock lock(m_lock);
440  return m_queue.size();
441  }
442 
443  void clear() {
444  ScopedLock lock(m_lock);
445  m_queue.clear();
446  while(false == m_sem.clear());
447  }
448 
449  WaitQueue(std::size_t max=0) :
450  m_maximum(max) {};
451 
452 private:
453 
454  const std::size_t m_maximum;
455  std::deque<T> m_queue;
456  Mutex m_lock;
457  Semaphore m_sem;
458 };
459 
460 }}}} // namespaces
461 
462 #endif /* #ifndef CRL_MULTISENSE_THREAD_HH */
#define CRL_EXCEPTION(fmt,...)
Definition: Exception.hh:71
bool timedWait(T &data, const double &timeout)
int32_t wait_(const struct timespec *tsP=NULL)
static CRL_CONSTEXPR uint32_t FLAGS_DETACH
Definition: linux/Thread.hh:77
Definition: channel.cc:56
Thread(void *(*functionP)(void *), void *contextP=NULL, uint32_t flags=0, int32_t scheduler=-1, int32_t priority=0)
Definition: macos/Thread.hh:80
#define CRL_DEBUG(fmt,...)
Definition: Exception.hh:83
int32_t aligned_int32_t __attribute__((aligned(4)))
#define CRL_CONSTEXPR
Definition: Portability.hh:51


multisense_lib
Author(s):
autogenerated on Sun Mar 14 2021 02:34:50