pool_core.hpp
Go to the documentation of this file.
00001 
00022 #ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
00023 #define THREADPOOL_POOL_CORE_HPP_INCLUDED
00024 
00025 
00026 
00027 
00028 #include "locking_ptr.hpp"
00029 #include "worker_thread.hpp"
00030 
00031 #include "../task_adaptors.hpp"
00032 
00033 #include <boost/thread.hpp>
00034 #include <boost/thread/exceptions.hpp>
00035 #include <boost/thread/mutex.hpp>
00036 #include <boost/thread/condition.hpp>
00037 #include <boost/smart_ptr.hpp>
00038 #include <boost/bind.hpp>
00039 #include <boost/static_assert.hpp>
00040 #include <boost/type_traits.hpp>
00041 
00042 #include <vector>
00043 
00044 
00046 namespace boost { namespace threadpool { namespace detail 
00047 {
00048 
00067   template <
00068     typename Task, 
00069 
00070     template <typename> class SchedulingPolicy,
00071     template <typename> class SizePolicy,
00072     template <typename> class SizePolicyController,
00073     template <typename> class ShutdownPolicy
00074   > 
00075   class pool_core
00076   : public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > > 
00077   , private noncopyable
00078   {
00079 
00080   public: // Type definitions
00081     typedef Task task_type;                                 
00082     typedef SchedulingPolicy<task_type> scheduler_type;     
00083     typedef pool_core<Task, 
00084                       SchedulingPolicy, 
00085                       SizePolicy,
00086                       SizePolicyController,
00087                       ShutdownPolicy > pool_type;           
00088     typedef SizePolicy<pool_type> size_policy_type;         
00089     //typedef typename size_policy_type::size_controller size_controller_type;
00090 
00091     typedef SizePolicyController<pool_type> size_controller_type;
00092 
00093 //    typedef SizePolicy<pool_type>::size_controller size_controller_type;
00094     typedef ShutdownPolicy<pool_type> shutdown_policy_type;
00095 
00096     typedef worker_thread<pool_type> worker_type;
00097 
00098     // The task is required to be a nullary function.
00099     BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
00100 
00101     // The task function's result type is required to be void.
00102     BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
00103 
00104 
00105   private:  // Friends 
00106     friend class worker_thread<pool_type>;
00107 
00108 #if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580)  // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
00109    friend class SizePolicy;
00110    friend class ShutdownPolicy;
00111 #else
00112    friend class SizePolicy<pool_type>;
00113    friend class ShutdownPolicy<pool_type>;
00114 #endif
00115 
00116   private: // The following members may be accessed by _multiple_ threads at the same time:
00117     volatile size_t m_worker_count;     
00118     volatile size_t m_target_worker_count;      
00119     volatile size_t m_active_worker_count;
00120       
00121 
00122 
00123   private: // The following members are accessed only by _one_ thread at the same time:
00124     scheduler_type  m_scheduler;
00125     scoped_ptr<size_policy_type> m_size_policy; // is never null
00126     
00127     bool  m_terminate_all_workers;                                                              // Indicates if termination of all workers was triggered.
00128     std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
00129     
00130   private: // The following members are implemented thread-safe:
00131     mutable recursive_mutex  m_monitor;
00132     mutable condition m_worker_idle_or_terminated_event;        // A worker is idle or was terminated.
00133     mutable condition m_task_or_terminate_workers_event;  // Task is available OR total worker count should be reduced.
00134 
00135   public:
00137     pool_core()
00138       : m_worker_count(0) 
00139       , m_target_worker_count(0)
00140       , m_active_worker_count(0)
00141       , m_terminate_all_workers(false)
00142     {
00143       pool_type volatile & self_ref = *this;
00144       m_size_policy.reset(new size_policy_type(self_ref));
00145 
00146       m_scheduler.clear();
00147     }
00148 
00149 
00151     ~pool_core()
00152     {
00153     }
00154 
00159     size_controller_type size_controller()
00160     {
00161       return size_controller_type(*m_size_policy, this->shared_from_this());
00162     }
00163 
00167     size_t size()       const volatile
00168     {
00169       return m_worker_count;
00170     }
00171 
00172 // TODO is only called once
00173     void shutdown()
00174     {
00175       ShutdownPolicy<pool_type>::shutdown(*this);
00176     }
00177 
00182     bool schedule(task_type const & task) volatile
00183     {   
00184       locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor); 
00185       
00186       if(lockedThis->m_scheduler.push(task))
00187       {
00188         lockedThis->m_task_or_terminate_workers_event.notify_one();
00189         return true;
00190       }
00191       else
00192       {
00193         return false;
00194       }
00195     }   
00196 
00197 
00201     size_t active() const volatile
00202     {
00203       return m_active_worker_count;
00204     }
00205 
00206 
00210     size_t pending() const volatile
00211     {
00212       locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00213       return lockedThis->m_scheduler.size();
00214     }
00215 
00216 
00219     void clear() volatile
00220     { 
00221       locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00222       lockedThis->m_scheduler.clear();
00223     }    
00224 
00225 
00230     bool empty() const volatile
00231     {
00232       locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00233       return lockedThis->m_scheduler.empty();
00234     }   
00235 
00236 
00241     void wait(size_t const task_threshold = 0) const volatile
00242     {
00243       const pool_type* self = const_cast<const pool_type*>(this);
00244       recursive_mutex::scoped_lock lock(self->m_monitor);
00245 
00246       if(0 == task_threshold)
00247       {
00248         while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
00249         { 
00250           self->m_worker_idle_or_terminated_event.wait(lock);
00251         }
00252       }
00253       else
00254       {
00255         while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
00256         { 
00257           self->m_worker_idle_or_terminated_event.wait(lock);
00258         }
00259       }
00260     }   
00261 
00269     bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
00270     {
00271       const pool_type* self = const_cast<const pool_type*>(this);
00272       recursive_mutex::scoped_lock lock(self->m_monitor);
00273 
00274       if(0 == task_threshold)
00275       {
00276         while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
00277         { 
00278           if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
00279         }
00280       }
00281       else
00282       {
00283         while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
00284         { 
00285           if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
00286         }
00287       }
00288 
00289       return true;
00290     }
00291 
00292 
00293   private:      
00294 
00295 
00296     void terminate_all_workers(bool const wait) volatile
00297     {
00298       pool_type* self = const_cast<pool_type*>(this);
00299       recursive_mutex::scoped_lock lock(self->m_monitor);
00300 
00301       self->m_terminate_all_workers = true;
00302 
00303       m_target_worker_count = 0;
00304       self->m_task_or_terminate_workers_event.notify_all();
00305 
00306       if(wait)
00307       {
00308         while(m_active_worker_count > 0)
00309         {
00310           self->m_worker_idle_or_terminated_event.wait(lock);
00311         }
00312 
00313         for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
00314           it != self->m_terminated_workers.end();
00315           ++it)
00316         {
00317           (*it)->join();
00318         }
00319         self->m_terminated_workers.clear();
00320       }
00321     }
00322 
00323 
00329     bool resize(size_t const worker_count) volatile
00330     {
00331       locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor); 
00332 
00333       if(!m_terminate_all_workers)
00334       {
00335         m_target_worker_count = worker_count;
00336       }
00337       else
00338       { 
00339         return false;
00340       }
00341 
00342 
00343       if(m_worker_count <= m_target_worker_count)
00344       { // increase worker count
00345         while(m_worker_count < m_target_worker_count)
00346         {
00347           try
00348           {
00349             worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
00350             m_worker_count++;
00351             m_active_worker_count++;    
00352           }
00353           catch(thread_resource_error)
00354           {
00355             return false;
00356           }
00357         }
00358       }
00359       else
00360       { // decrease worker count
00361         lockedThis->m_task_or_terminate_workers_event.notify_all();   // TODO: Optimize number of notified workers
00362       }
00363 
00364       return true;
00365     }
00366 
00367 
00368     // worker died with unhandled exception
00369     void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
00370     {
00371       locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00372 
00373       m_worker_count--;
00374       m_active_worker_count--;
00375       lockedThis->m_worker_idle_or_terminated_event.notify_all();       
00376 
00377       if(m_terminate_all_workers)
00378       {
00379         lockedThis->m_terminated_workers.push_back(worker);
00380       }
00381       else
00382       {
00383         lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
00384       }
00385     }
00386 
00387     void worker_destructed(shared_ptr<worker_type> worker) volatile
00388     {
00389       locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00390       m_worker_count--;
00391       m_active_worker_count--;
00392       lockedThis->m_worker_idle_or_terminated_event.notify_all();       
00393 
00394       if(m_terminate_all_workers)
00395       {
00396         lockedThis->m_terminated_workers.push_back(worker);
00397       }
00398     }
00399 
00400 
00401     bool execute_task() volatile
00402     {
00403       function0<void> task;
00404 
00405       { // fetch task
00406         pool_type* lockedThis = const_cast<pool_type*>(this);
00407         recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
00408 
00409         // decrease number of threads if necessary
00410         if(m_worker_count > m_target_worker_count)
00411         {       
00412           return false; // terminate worker
00413         }
00414 
00415 
00416         // wait for tasks
00417         while(lockedThis->m_scheduler.empty())
00418         {       
00419           // decrease number of workers if necessary
00420           if(m_worker_count > m_target_worker_count)
00421           {     
00422             return false;       // terminate worker
00423           }
00424           else
00425           {
00426             m_active_worker_count--;
00427             lockedThis->m_worker_idle_or_terminated_event.notify_all(); 
00428             lockedThis->m_task_or_terminate_workers_event.wait(lock);
00429             m_active_worker_count++;
00430           }
00431         }
00432 
00433         task = lockedThis->m_scheduler.top();
00434         lockedThis->m_scheduler.pop();
00435       }
00436 
00437       // call task function
00438       if(task)
00439       {
00440         task();
00441       }
00442  
00443       //guard->disable();
00444       return true;
00445     }
00446   };
00447 
00448 
00449 
00450 
00451 } } } // namespace boost::threadpool::detail
00452 
00453 #endif // THREADPOOL_POOL_CORE_HPP_INCLUDED


or_libs
Author(s): Viktor Seib
autogenerated on Tue Jan 7 2014 11:24:03