win32/coil/Condition.h
Go to the documentation of this file.
1 // -*- C++ -*-
20 #ifndef COIL_CONDITION_H
21 #define COIL_CONDITION_H
22 
23 #include <windows.h>
24 #include <algorithm>
25 #include <coil/Mutex.h>
26 #include <iostream>
27 
28 namespace coil
29 {
30  typedef struct
31  {
32  // Number of waiting threads.
34 
35  // Serialize access to <waiters_count_>.
37 
38  // Semaphore used to queue up threads waiting for the condition to
39  // become signaled.
40  HANDLE sema_;
41 
42  // An auto-reset event used by the broadcast/signal thread to wait
43  // for all the waiting thread(s) to wake up and be released from the
44  // semaphore.
45  HANDLE waiters_done_;
46 
47  // Keeps track of whether we were broadcasting or signaling. This
48  // allows us to optimize the code if we're just signaling.
50 
52 
53 
55  {
56  cv->waiters_count_ = 0;
57  cv->was_broadcast_ = 0;
58  cv->sema_ = ::CreateSemaphore (NULL, // no security
59  0, // initially 0
60  0x7fffffff, // max count
61  NULL); // unnamed
62  cv->waiters_done_ = ::CreateEvent (NULL, // no security
63  FALSE, // auto-reset
64  FALSE, // non-signaled initially
65  NULL); // unnamed
66  return 0;
67  }
68 
82  template <class M>
83  class Condition
84  {
85  public:
86 
102  Condition(M& mutex)
103  : m_mutex(mutex)
104  {
106  }
107 
124  {
125  }
126 
142  inline void signal()
143  {
145  }
146 
162  inline void broadcast()
163  {
165  }
166 
186  bool wait()
187  {
188  return 0 == pthread_cond_wait(&m_cond, &m_mutex, INFINITE);
189  }
190 
216  bool wait(long second, long nano_second = 0)
217  {
218  DWORD milli_second = second * 1000 + nano_second / 1000000;
219  return 0 == pthread_cond_wait(&m_cond, &m_mutex, milli_second);
220  }
221 
222  private:
223 
243  int pthread_cond_wait (coil::pthread_cond_t *cv, coil::Mutex *external_mutex, DWORD aMilliSecond)
244  {
245  DWORD result;
246 
247  // Avoid race conditions.
249  cv->waiters_count_++;
251 
252  // This call atomically releases the mutex and waits on the
253  // semaphore until <pthread_cond_signal> or <pthread_cond_broadcast>
254  // are called by another thread.
255 // std::cout << "Before SignalObjectAndWait [wait with time(" << milliSecond << ")]" << std::endl << std::flush ;
256  result = SignalObjectAndWait (external_mutex->mutex_, cv->sema_, aMilliSecond, FALSE);
257 
258 // char * p;
259 // switch (result) {
260 // case WAIT_ABANDONED :
261 // p = "Abandoned";
262 // break;
263 // case WAIT_OBJECT_0 :
264 // p = "Signaled";
265 // break;
266 // case WAIT_TIMEOUT :
267 // p = "Timeout";
268 // break;
269 // default :
270 // p = "Other !?";
271 // break;
272 // }
273 // std::cout << "After SignalObjectAndWait [wait with time(" << milliSecond << ")]"
274 // << " result(" << result << ":" << p << ")"
275 // << std::endl << std::flush ;
276 
277  // Reacquire lock to avoid race conditions.
279 
280  // We're no longer waiting...
281  cv->waiters_count_--;
282 
283  // Check to see if we're the last waiter after <pthread_cond_broadcast>.
284  int last_waiter = cv->was_broadcast_ && cv->waiters_count_ == 0;
285 
287 
288  // If we're the last waiter thread during this particular broadcast
289  // then let all the other threads proceed.
290  if (last_waiter) {
291  // This call atomically signals the <waiters_done_> event and
292  // waits until it can acquire the <external_mutex>. This is
293  // required to ensure fairness.
294  DWORD result = SignalObjectAndWait (cv->waiters_done_, external_mutex->mutex_, INFINITE, FALSE);
295 // std::cout << "result " << result << std::endl;
296  } else {
297  // Always regain the external mutex since that's the guarantee we
298  // give to our callers.
299  ::WaitForSingleObject (external_mutex->mutex_, 0);
300  }
301  return result;
302  }
303 
324  {
326  int have_waiters = cv->waiters_count_ > 0;
328 
329  // If there aren't any waiters, then this is a no-op.
330  if (have_waiters)
331 // std::cout << "Before ReleaseSemaphore(1)" << std::endl << std::flush ;
332  ReleaseSemaphore (cv->sema_, 1, 0);
333 // std::cout << "After ReleaseSemaphore(1)" << std::endl << std::flush ;
334  return 0;
335  }
336 
357  {
358  // This is needed to ensure that <waiters_count_> and <was_broadcast_> are
359  // consistent relative to each other.
361  int have_waiters = 0;
362 
363  if (cv->waiters_count_ > 0) {
364  // We are broadcasting, even if there is just one waiter...
365  // Record that we are broadcasting, which helps optimize
366  // <pthread_cond_wait> for the non-broadcast case.
367  cv->was_broadcast_ = 1;
368  have_waiters = 1;
369  }
370 
371  if (have_waiters) {
372  // Wake up all the waiters atomically.
373 // std::cout << "Before ReleaseSemaphore(" << cv->waiters_count_ << ")" << std::endl << std::flush ;
374  ReleaseSemaphore (cv->sema_, cv->waiters_count_, 0);
375 // std::cout << "After ReleaseSemaphore(" << cv->waiters_count_ << ")" << std::endl << std::flush ;
376 
378 
379  // Wait for all the awakened threads to acquire the counting
380  // semaphore.
381  WaitForSingleObject (cv->waiters_done_, INFINITE);
382  // This assignment is okay, even without the <waiters_count_lock_> held
383  // because no other waiter threads can wake up to access it.
384  cv->was_broadcast_ = 0;
385  }
386  else
388  return 0;
389  }
390 
391  Condition(const Mutex&);
392  Condition& operator=(const Mutex &);
394  M& m_mutex;
395  }; // class Condition
396 
397 }; // namespace Coil
398 #endif // COIL_CONDITION_H
int pthread_cond_broadcast(pthread_cond_t *cv)
Resume of all the thread practice.
int pthread_cond_wait(coil::pthread_cond_t *cv, coil::Mutex *external_mutex, DWORD aMilliSecond)
Wait of the thread practice.
ACE_thread_mutex_t mutex_
~Condition()
Destructor.
Mutex class.
Condition(M &mutex)
Constructor.
Condition & operator=(const Mutex &)
void broadcast()
Resume of all the thread practice.
Condition(Mutex &mutex)
int pthread_cond_signal(pthread_cond_t *cv)
Resume of the thread practice.
static int pthread_cond_init(pthread_cond_t *cv)
coil::pthread_cond_t m_cond
bool wait(long second, long nano_second=0)
Thread practice wait of set time.
void signal()
Resume of the thread practice.
Common Object Interface Layer.
bool wait()
Wait of the thread practice.


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Mon Feb 28 2022 23:00:42