Go to the documentation of this file.00001
00041 #ifndef CRL_MULTISENSE_THREAD_HH
00042 #define CRL_MULTISENSE_THREAD_HH
00043
00044 #ifndef WIN32_LEAN_AND_MEAN
00045 #define WIN32_LEAN_AND_MEAN 1
00046 #endif
00047 #include <windows.h>
00048
00049 #include <stdint.h>
00050 #include <errno.h>
00051 #include <string.h>
00052
00053 #include <vector>
00054 #include <deque>
00055
00056 #include "details/utility/Portability.hh"
00057
00058 #include "../Exception.hh"
00059
00060 namespace crl {
00061 namespace multisense {
00062 namespace details {
00063 namespace utility {
00064
00065
00066
00067
00068 class ScopedLock;
00069
00070
00071
00072
00073 class Thread {
00074 public:
00075
00076 static CRL_CONSTEXPR uint32_t FLAGS_DETACH = (1 << 0);
00077
00078 Thread(LPTHREAD_START_ROUTINE functionP,
00079 void *contextP=NULL,
00080 uint32_t flags=0,
00081 int32_t scheduler=-1,
00082 int32_t priority=0) : m_flags(flags) {
00083
00084
00085
00086
00087 if (-1 != scheduler) {
00088
00089
00090
00091
00092 if (scheduler != 0 )
00093 CRL_EXCEPTION("This platform only supports SCHED_OTHER");
00094
00095 if (priority != 0)
00096 CRL_EXCEPTION("Priority can not be set at this time.");
00097 }
00098
00099
00100
00101
00102 m_threadHandle = CreateThread (NULL, 0, functionP, contextP, 0, &m_threadId);
00103 if (m_threadHandle == NULL)
00104 CRL_EXCEPTION("CreateThread() failed: %d", GetLastError());
00105
00106
00107
00108
00109 if (FLAGS_DETACH & m_flags)
00110 {
00111 CloseHandle (m_threadHandle);
00112 }
00113 };
00114
00115 ~Thread() {
00116 if (!(m_flags & FLAGS_DETACH) &&
00117 0 != WaitForSingleObject(m_threadHandle, INFINITE))
00118 CRL_DEBUG("WaitForSingleObject() failed: %d\n", GetLastError());
00119 };
00120
00121 private:
00122
00123 uint32_t m_flags;
00124 DWORD m_threadId;
00125 HANDLE m_threadHandle;
00126 };
00127
00128
00129
00130
00131 class Mutex {
00132 public:
00133 friend class ScopedLock;
00134
00135 Mutex() {
00136 InitializeCriticalSection(&m_mutex);
00137 }
00138
00139 ~Mutex() {
00140 DeleteCriticalSection(&m_mutex);
00141 };
00142
00143 private:
00144 CRITICAL_SECTION m_mutex;
00145 };
00146
00147
00148
00149
00150 class ScopedLock
00151 {
00152 public:
00153
00154 ScopedLock(Mutex& mutex) {
00155 this->lock(&mutex.m_mutex);
00156 };
00157
00158 ScopedLock(CRITICAL_SECTION *lockP) {
00159 this->lock(lockP);
00160 };
00161
00162 ScopedLock(CRITICAL_SECTION& lock) {
00163 this->lock(&lock);
00164 };
00165
00166 ~ScopedLock() {
00167 LeaveCriticalSection(m_lockP);
00168 };
00169
00170 private:
00171
00172 void lock(CRITICAL_SECTION *lockP) {
00173 m_lockP = lockP;
00174 EnterCriticalSection(m_lockP);
00175 };
00176
00177 CRITICAL_SECTION *m_lockP;
00178 };
00179
00180
00181
00182
00183
00184 class Semaphore {
00185 public:
00186
00187
00188
00189
00190
00191
00192 bool wait() {
00193 do {
00194 if (0 == wait_(INFINITE))
00195 return true;
00196 } while (1);
00197 };
00198
00199
00200
00201
00202 bool timedWait(const double& timeout) {
00203
00204 if (timeout < 0.0)
00205 CRL_EXCEPTION("invalid timeout: %f", timeout);
00206
00207 do {
00208 int32_t ret = wait_((DWORD)(timeout * 1000));
00209
00210 if (0 == ret)
00211 return true;
00212 else if (ETIMEDOUT == ret)
00213 return false;
00214
00215 } while (1);
00216 };
00217
00218
00219
00220
00221
00222 bool post() {
00223
00224 return ReleaseSemaphore(m_handle, 1, NULL) != FALSE;
00225
00226 };
00227
00228
00229
00230
00231
00232
00233 bool clear() {
00234 while(WaitForSingleObject (m_handle, 0) == WAIT_OBJECT_0)
00235 {
00236 }
00237 return true;
00238 };
00239
00240 int32_t waiters () { return m_waiters; };
00241 bool decrement() { return wait(); };
00242 bool increment() { return post(); };
00243
00244 Semaphore(std::size_t max=0) :
00245 m_waiters(0)
00246 {
00247 m_handle = CreateSemaphore (NULL, 0, (max == 0 || max > LONG_MAX) ? LONG_MAX : max, NULL);
00248 if (m_handle == NULL)
00249 CRL_EXCEPTION ("CreateSemaphore() failed: %d\n", GetLastError());
00250 }
00251
00252 ~Semaphore()
00253 {
00254 if (m_handle != NULL)
00255 CloseHandle (m_handle);
00256 }
00257
00258 private:
00259
00260
00261
00262
00263
00264 inline int32_t wait_(DWORD ts=INFINITE) {
00265 InterlockedIncrement (&m_waiters);
00266 const int32_t ret = WaitForSingleObject (m_handle, ts);
00267 InterlockedDecrement (&m_waiters);
00268
00269 if (ret == WAIT_OBJECT_0)
00270 return 0;
00271 else if (ret == WAIT_TIMEOUT)
00272 return ETIMEDOUT;
00273 else
00274 return EAGAIN;
00275 };
00276
00277 HANDLE m_handle;
00278 LONG m_waiters;
00279 };
00280
00281
00282
00283
00284 template<class T> class WaitVar {
00285 public:
00286
00287 void post(const T& data) {
00288 {
00289 ScopedLock lock(m_lock);
00290 m_val = data;
00291 }
00292 m_sem.post();
00293 };
00294
00295 bool wait(T& data) {
00296 m_sem.wait();
00297 {
00298 ScopedLock lock(m_lock);
00299 data = m_val;
00300 }
00301 return true;
00302 };
00303
00304 bool timedWait(T& data,
00305 const double& timeout) {
00306
00307 if (false == m_sem.timedWait(timeout))
00308 return false;
00309 {
00310 ScopedLock lock(m_lock);
00311 data = m_val;
00312 }
00313 return true;
00314 }
00315
00316
00317
00318
00319
00320 WaitVar() : m_val(),
00321 m_lock(),
00322 m_sem(1) {};
00323
00324 private:
00325
00326 T m_val;
00327 Mutex m_lock;
00328 Semaphore m_sem;
00329 };
00330
00331
00332
00333
00334 template <class T> class WaitQueue {
00335 public:
00336
00337 void post(const T& data) {
00338 bool postSem=true;
00339 {
00340 ScopedLock lock(m_lock);
00341
00342
00343
00344
00345 if (m_maximum > 0 &&
00346 m_maximum == m_queue.size()) {
00347
00348
00349
00350
00351
00352 m_queue.pop_front();
00353 postSem = false;
00354 }
00355
00356 m_queue.push_back(data);
00357 }
00358 if (postSem)
00359 m_sem.post();
00360 };
00361
00362 void kick() {
00363 m_sem.post();
00364 };
00365
00366 bool wait(T& data) {
00367 m_sem.wait();
00368 {
00369 ScopedLock lock(m_lock);
00370
00371 if (0 == m_queue.size())
00372 return false;
00373 else {
00374 data = m_queue.front();
00375 m_queue.pop_front();
00376 return true;
00377 }
00378 }
00379 }
00380
00381 uint32_t waiters() {
00382 return m_sem.waiters();
00383 };
00384
00385 uint32_t size() {
00386 ScopedLock lock(m_lock);
00387 return m_queue.size();
00388 }
00389
00390 void clear() {
00391 ScopedLock lock(m_lock);
00392 m_queue.clear();
00393 while(false == m_sem.clear());
00394 }
00395
00396 WaitQueue(std::size_t max=0) :
00397 m_maximum(max) {};
00398
00399 private:
00400
00401 const std::size_t m_maximum;
00402 std::deque<T> m_queue;
00403 Mutex m_lock;
00404 Semaphore m_sem;
00405 };
00406
00407 }}}}
00408
00409 #endif