Condition.h
Go to the documentation of this file.
00001 // -*- C++ -*-
00020 #ifndef COIL_CONDITION_H
00021 #define COIL_CONDITION_H
00022 
00023 #include <windows.h>
00024 #include <algorithm>
00025 #include <coil/Mutex.h>
00026 #include <iostream>
00027 
00028 namespace coil
00029 {
00030   typedef struct
00031   {
00032     // Number of waiting threads.
00033     int waiters_count_;
00034     
00035     // Serialize access to <waiters_count_>.
00036     coil::Mutex waiters_count_lock_;
00037 
00038     // Semaphore used to queue up threads waiting for the condition to
00039     // become signaled. 
00040     HANDLE sema_;
00041     
00042     // An auto-reset event used by the broadcast/signal thread to wait
00043     // for all the waiting thread(s) to wake up and be released from the
00044     // semaphore. 
00045     HANDLE waiters_done_;
00046     
00047     // Keeps track of whether we were broadcasting or signaling.  This
00048     // allows us to optimize the code if we're just signaling.
00049     size_t was_broadcast_;
00050 
00051   } pthread_cond_t;
00052 
00053   
00054   static int pthread_cond_init (pthread_cond_t *cv)
00055   {
00056     cv->waiters_count_ = 0;
00057     cv->was_broadcast_ = 0;
00058         cv->sema_ = ::CreateSemaphore (NULL,       // no security
00059                                   0,          // initially 0
00060                                   0x7fffffff, // max count
00061                                   NULL);      // unnamed 
00062         cv->waiters_done_ = ::CreateEvent (NULL,  // no security
00063                                      FALSE, // auto-reset
00064                                      FALSE, // non-signaled initially
00065                                      NULL); // unnamed
00066         return 0;
00067   }
00068 
00082   template <class M>
00083   class Condition
00084   {
00085   public:
00086 
00102     Condition(M& mutex)
00103       : m_mutex(mutex)
00104     {
00105       pthread_cond_init(&m_cond);
00106     }
00107 
00123     ~Condition()
00124     {
00125     }
00126 
00142     inline void signal()
00143     {
00144       pthread_cond_signal(&m_cond);
00145     }
00146 
00162     inline void broadcast()
00163     {
00164       pthread_cond_broadcast(&m_cond);
00165     }
00166 
00186     bool wait()
00187     {
00188           return 0 == pthread_cond_wait(&m_cond, &m_mutex, INFINITE);
00189         }
00190 
00216     bool wait(long second, long nano_second = 0)
00217     {
00218       DWORD milli_second = second * 1000 + nano_second / 1000000;
00219       return 0 == pthread_cond_wait(&m_cond, &m_mutex, milli_second);
00220     }
00221 
00222   private:
00223 
00243   int pthread_cond_wait (coil::pthread_cond_t *cv, coil::Mutex *external_mutex, DWORD aMilliSecond)
00244   {
00245     DWORD result;
00246 
00247     // Avoid race conditions.
00248     cv->waiters_count_lock_.lock();
00249     cv->waiters_count_++;
00250     cv->waiters_count_lock_.unlock();
00251     
00252     // This call atomically releases the mutex and waits on the
00253     // semaphore until <pthread_cond_signal> or <pthread_cond_broadcast>
00254     // are called by another thread.
00255 //    std::cout << "Before SignalObjectAndWait [wait with time(" << milliSecond << ")]" << std::endl << std::flush ; 
00256     result = SignalObjectAndWait (external_mutex->mutex_, cv->sema_, aMilliSecond, FALSE);
00257 
00258 //    char * p;
00259 //    switch (result) {
00260 //    case WAIT_ABANDONED :
00261 //        p = "Abandoned";
00262 //        break;
00263 //    case WAIT_OBJECT_0 :
00264 //        p = "Signaled";
00265 //        break;
00266 //    case WAIT_TIMEOUT :
00267 //        p = "Timeout";
00268 //        break;
00269 //    default :
00270 //        p = "Other !?";
00271 //        break;
00272 //    }
00273 //      std::cout << "After SignalObjectAndWait [wait with time(" << milliSecond << ")]" 
00274 //        << " result(" << result << ":" << p << ")"
00275 //        << std::endl << std::flush ; 
00276 
00277     // Reacquire lock to avoid race conditions.
00278     cv->waiters_count_lock_.lock();
00279     
00280     // We're no longer waiting...
00281     cv->waiters_count_--;
00282     
00283     // Check to see if we're the last waiter after <pthread_cond_broadcast>.
00284     int last_waiter = cv->was_broadcast_ && cv->waiters_count_ == 0;
00285 
00286     cv->waiters_count_lock_.unlock();
00287     
00288     // If we're the last waiter thread during this particular broadcast
00289     // then let all the other threads proceed.
00290     if (last_waiter) {
00291       // This call atomically signals the <waiters_done_> event and
00292       // waits until it can acquire the <external_mutex>.  This is
00293       // required to ensure fairness.
00294       DWORD result = SignalObjectAndWait (cv->waiters_done_, external_mutex->mutex_, INFINITE, FALSE);
00295 //      std::cout << "result " << result << std::endl;
00296     } else {
00297       // Always regain the external mutex since that's the guarantee we
00298       // give to our callers. 
00299       ::WaitForSingleObject (external_mutex->mutex_, 0);
00300     }
00301   return result;
00302   }
00303 
00323   int pthread_cond_signal (pthread_cond_t *cv)
00324   {
00325     cv->waiters_count_lock_.lock();
00326     int have_waiters = cv->waiters_count_ > 0;
00327     cv->waiters_count_lock_.unlock();
00328     
00329     // If there aren't any waiters, then this is a no-op.  
00330     if (have_waiters)
00331 //    std::cout << "Before ReleaseSemaphore(1)" << std::endl << std::flush ; 
00332       ReleaseSemaphore (cv->sema_, 1, 0);
00333 //    std::cout << "After ReleaseSemaphore(1)" << std::endl << std::flush ; 
00334         return 0;
00335   }
00336 
00356   int pthread_cond_broadcast (pthread_cond_t *cv)
00357   {
00358     // This is needed to ensure that <waiters_count_> and <was_broadcast_> are
00359     // consistent relative to each other.
00360     cv->waiters_count_lock_.lock();
00361     int have_waiters = 0;
00362     
00363     if (cv->waiters_count_ > 0) {
00364       // We are broadcasting, even if there is just one waiter...
00365       // Record that we are broadcasting, which helps optimize
00366       // <pthread_cond_wait> for the non-broadcast case.
00367       cv->was_broadcast_ = 1;
00368       have_waiters = 1;
00369     }
00370     
00371     if (have_waiters) {
00372       // Wake up all the waiters atomically.
00373 //    std::cout << "Before ReleaseSemaphore(" << cv->waiters_count_ << ")" << std::endl << std::flush ; 
00374       ReleaseSemaphore (cv->sema_, cv->waiters_count_, 0);
00375 //    std::cout << "After ReleaseSemaphore(" << cv->waiters_count_ << ")" << std::endl << std::flush ; 
00376       
00377     cv->waiters_count_lock_.unlock();
00378 
00379       // Wait for all the awakened threads to acquire the counting
00380       // semaphore. 
00381       WaitForSingleObject (cv->waiters_done_, INFINITE);
00382       // This assignment is okay, even without the <waiters_count_lock_> held 
00383       // because no other waiter threads can wake up to access it.
00384       cv->was_broadcast_ = 0;
00385     }
00386     else
00387     cv->waiters_count_lock_.unlock();
00388         return 0;
00389   }
00390 
00391     Condition(const Mutex&);
00392     Condition& operator=(const Mutex &);
00393     coil::pthread_cond_t m_cond;
00394     M& m_mutex;
00395   };  // class Condition
00396 
00397 };    // namespace coil
00398 #endif // COIL_CONDITION_H


openrtm_aist
Author(s): Noriaki Ando
autogenerated on Thu Aug 27 2015 14:16:37