win32/Thread.hh
Go to the documentation of this file.
1 
41 #ifndef CRL_MULTISENSE_THREAD_HH
42 #define CRL_MULTISENSE_THREAD_HH
43 
44 #ifndef WIN32_LEAN_AND_MEAN
45 #define WIN32_LEAN_AND_MEAN 1
46 #endif
47 #include <windows.h>
48 
49 #include <stdint.h>
50 #include <errno.h>
51 #include <string.h>
52 
53 #include <vector>
54 #include <deque>
55 
57 
58 #include "../Exception.hh"
59 
60 namespace crl {
61 namespace multisense {
62 namespace details {
63 namespace utility {
64 
65 //
66 // Forward declarations.
67 
68 class ScopedLock;
69 
70 //
71 // A simple class to wrap Win32 thread creation and joining
72 
73 class Thread {
74 public:
75 
76  static CRL_CONSTEXPR uint32_t FLAGS_DETACH = (1 << 0);
77 
78  Thread(LPTHREAD_START_ROUTINE functionP,
79  void *contextP=NULL,
80  uint32_t flags=0,
81  int32_t scheduler=-1,
82  int32_t priority=0) : m_flags(flags) {
83 
84  //
85  // -1 means the user wants default scheduling behavior
86 
87  if (-1 != scheduler) {
88 
89  //
90  // Set our scheduling policy
91 
92  if (scheduler != 0 /* SCHED_OTHER */)
93  CRL_EXCEPTION("This platform only supports SCHED_OTHER");
94 
95  if (priority != 0)
96  CRL_EXCEPTION("Priority can not be set at this time.");
97  }
98 
99  //
100  // Finally, create the thread
101 
102  m_threadHandle = CreateThread (NULL, 0, functionP, contextP, 0, &m_threadId);
103  if (m_threadHandle == NULL)
104  CRL_EXCEPTION("CreateThread() failed: %d", GetLastError());
105 
106  //
107  // Automatically detach, if asked to do so
108 
109  if (FLAGS_DETACH & m_flags)
110  {
111  CloseHandle (m_threadHandle);
112  }
113  };
114 
116  if (!(m_flags & FLAGS_DETACH) &&
117  0 != WaitForSingleObject(m_threadHandle, INFINITE))
118  CRL_DEBUG("WaitForSingleObject() failed: %d\n", GetLastError());
119  };
120 
121 private:
122 
123  uint32_t m_flags;
124  DWORD m_threadId;
126 };
127 
128 //
129 // A simple mutex class
130 
131 class Mutex {
132 public:
133  friend class ScopedLock;
134 
135  Mutex() {
136  InitializeCriticalSection(&m_mutex);
137  }
138 
139  ~Mutex() {
140  DeleteCriticalSection(&m_mutex);
141  };
142 
143 private:
144  CRITICAL_SECTION m_mutex;
145 };
146 
147 //
148 // A simple scoped lock class
149 
150 class ScopedLock
151 {
152 public:
153 
154  ScopedLock(Mutex& mutex) {
155  this->lock(&mutex.m_mutex);
156  };
157 
158  ScopedLock(CRITICAL_SECTION *lockP) {
159  this->lock(lockP);
160  };
161 
162  ScopedLock(CRITICAL_SECTION& lock) {
163  this->lock(&lock);
164  };
165 
167  LeaveCriticalSection(m_lockP);
168  };
169 
170 private:
171 
172  void lock(CRITICAL_SECTION *lockP) {
173  m_lockP = lockP;
174  EnterCriticalSection(m_lockP);
175  };
176 
177  CRITICAL_SECTION *m_lockP;
178 };
179 
180 // A semaphore backed by a Win32 semaphore object.
181 //
182 // This implementation does not work across processes.
183 
184 class Semaphore {
185 public:
186 
187  //
188  // Wait for a post (decrement). If thread contention,
189  // we may wake up, but be unable to snatch
190  // the bait.. hence the while loop.
191 
192  bool wait() {
193  do {
194  if (0 == wait_(INFINITE))
195  return true;
196  } while (1);
197  };
198 
199  //
200  // Wait for a post, retrying until timeout
201 
202  bool timedWait(const double& timeout) {
203 
204  if (timeout < 0.0)
205  CRL_EXCEPTION("invalid timeout: %f", timeout);
206 
207  do {
208  int32_t ret = wait_((DWORD)(timeout * 1000));
209 
210  if (0 == ret)
211  return true;
212  else if (ETIMEDOUT == ret)
213  return false;
214 
215  } while (1);
216  };
217 
218  //
219  // Post to the semaphore (increment.) Here we
220  // signal the futex to wake up any waiters.
221 
222  bool post() {
223 
224  return ReleaseSemaphore(m_handle, 1, NULL) != FALSE;
225 
226  };
227 
228  //
229  // Decrement the semaphore to zero in one-shot.. may
230  // fail with thread contention, returns true if
231  // successful
232 
233  bool clear() {
234  while(WaitForSingleObject (m_handle, 0) == WAIT_OBJECT_0)
235  {
236  }
237  return true;
238  };
239 
240  int32_t waiters () { return m_waiters; };
241  bool decrement() { return wait(); };
242  bool increment() { return post(); };
243 
244  Semaphore(std::size_t max=0) :
245  m_waiters(0)
246  {
247  m_handle = CreateSemaphore (NULL, 0, (max == 0 || max > LONG_MAX) ? LONG_MAX : max, NULL);
248  if (m_handle == NULL)
249  CRL_EXCEPTION ("CreateSemaphore() failed: %d\n", GetLastError());
250  }
251 
253  {
254  if (m_handle != NULL)
255  CloseHandle (m_handle);
256  }
257 
258 private:
259 
260  //
261  // This actually does the synchronized decrement if possible, and goes
262  // to sleep on the futex if not.
263 
264  inline int32_t wait_(DWORD ts=INFINITE) {
265  InterlockedIncrement (&m_waiters);
266  const int32_t ret = WaitForSingleObject (m_handle, ts);
267  InterlockedDecrement (&m_waiters);
268 
269  if (ret == WAIT_OBJECT_0)
270  return 0;
271  else if (ret == WAIT_TIMEOUT)
272  return ETIMEDOUT;
273  else
274  return EAGAIN;
275  };
276 
277  HANDLE m_handle;
278  LONG m_waiters;
279 };
280 
281 //
282 // A templatized variable signaler
283 
284 template<class T> class WaitVar {
285 public:
286 
287  void post(const T& data) {
288  {
289  ScopedLock lock(m_lock);
290  m_val = data;
291  }
292  m_sem.post();
293  };
294 
295  bool wait(T& data) {
296  m_sem.wait();
297  {
298  ScopedLock lock(m_lock);
299  data = m_val;
300  }
301  return true;
302  };
303 
304  bool timedWait(T& data,
305  const double& timeout) {
306 
307  if (false == m_sem.timedWait(timeout))
308  return false;
309  {
310  ScopedLock lock(m_lock);
311  data = m_val;
312  }
313  return true;
314  }
315 
316  //
317  // Use a semaphore with max value of 1. The WaitVar will
318  // either be in a signaled state, or not.
319 
320  WaitVar() : m_val(),
321  m_lock(),
322  m_sem(1) {};
323 
324 private:
325 
326  T m_val;
327  Mutex m_lock;
328  Semaphore m_sem;
329 };
330 
331 //
332 // A templatized wait queue
333 
334 template <class T> class WaitQueue {
335 public:
336 
337  void post(const T& data) {
338  bool postSem=true;
339  {
340  ScopedLock lock(m_lock);
341 
342  //
343  // Limit deque size, if requested
344 
345  if (m_maximum > 0 &&
346  m_maximum == m_queue.size()) {
347 
348  //
349  // If at max entries, we will pop_front the oldest,
350  // push_back the newest, and leave the semaphore alone
351 
352  m_queue.pop_front();
353  postSem = false;
354  }
355 
356  m_queue.push_back(data);
357  }
358  if (postSem)
359  m_sem.post();
360  };
361 
362  void kick() {
363  m_sem.post();
364  };
365 
366  bool wait(T& data) {
367  m_sem.wait();
368  {
369  ScopedLock lock(m_lock);
370 
371  if (0 == m_queue.size())
372  return false;
373  else {
374  data = m_queue.front();
375  m_queue.pop_front();
376  return true;
377  }
378  }
379  }
380 
381  uint32_t waiters() {
382  return m_sem.waiters();
383  };
384 
385  uint32_t size() {
386  ScopedLock lock(m_lock);
387  return m_queue.size();
388  }
389 
390  void clear() {
391  ScopedLock lock(m_lock);
392  m_queue.clear();
393  while(false == m_sem.clear());
394  }
395 
396  WaitQueue(std::size_t max=0) :
397  m_maximum(max) {};
398 
399 private:
400 
401  const std::size_t m_maximum;
402  std::deque<T> m_queue;
403  Mutex m_lock;
404  Semaphore m_sem;
405 };
406 
407 }}}} // namespaces
408 
409 #endif /* #ifndef CRL_MULTISENSE_THREAD_HH */
#define CRL_EXCEPTION(fmt,...)
Definition: Exception.hh:71
bool timedWait(T &data, const double &timeout)
Thread(LPTHREAD_START_ROUTINE functionP, void *contextP=NULL, uint32_t flags=0, int32_t scheduler=-1, int32_t priority=0)
Definition: win32/Thread.hh:78
static CRL_CONSTEXPR uint32_t FLAGS_DETACH
Definition: linux/Thread.hh:77
Definition: channel.cc:56
#define CRL_DEBUG(fmt,...)
Definition: Exception.hh:77
#define CRL_CONSTEXPR
Definition: Portability.hh:38


multisense_lib
Author(s):
autogenerated on Sat Apr 6 2019 02:16:46