Go to the documentation of this file.00001
00022 #ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
00023 #define THREADPOOL_POOL_CORE_HPP_INCLUDED
00024
00025
00026
00027
00028 #include "locking_ptr.hpp"
00029 #include "worker_thread.hpp"
00030
00031 #include "../task_adaptors.hpp"
00032
00033 #include <boost/thread.hpp>
00034 #include <boost/thread/exceptions.hpp>
00035 #include <boost/thread/mutex.hpp>
00036 #include <boost/thread/condition.hpp>
00037 #include <boost/smart_ptr.hpp>
00038 #include <boost/bind.hpp>
00039 #include <boost/static_assert.hpp>
00040 #include <boost/type_traits.hpp>
00041
00042 #include <vector>
00043
00044
00046 namespace boost { namespace threadpool { namespace detail
00047 {
00048
00067 template <
00068 typename Task,
00069
00070 template <typename> class SchedulingPolicy,
00071 template <typename> class SizePolicy,
00072 template <typename> class SizePolicyController,
00073 template <typename> class ShutdownPolicy
00074 >
00075 class pool_core
00076 : public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
00077 , private noncopyable
00078 {
00079
00080 public:
00081 typedef Task task_type;
00082 typedef SchedulingPolicy<task_type> scheduler_type;
00083 typedef pool_core<Task,
00084 SchedulingPolicy,
00085 SizePolicy,
00086 SizePolicyController,
00087 ShutdownPolicy > pool_type;
00088 typedef SizePolicy<pool_type> size_policy_type;
00089
00090
00091 typedef SizePolicyController<pool_type> size_controller_type;
00092
00093
00094 typedef ShutdownPolicy<pool_type> shutdown_policy_type;
00095
00096 typedef worker_thread<pool_type> worker_type;
00097
00098
00099 BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
00100
00101
00102 BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
00103
00104
00105 private:
00106 friend class worker_thread<pool_type>;
00107
00108 #if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
00109 friend class SizePolicy;
00110 friend class ShutdownPolicy;
00111 #else
00112 friend class SizePolicy<pool_type>;
00113 friend class ShutdownPolicy<pool_type>;
00114 #endif
00115
00116 private:
00117 volatile size_t m_worker_count;
00118 volatile size_t m_target_worker_count;
00119 volatile size_t m_active_worker_count;
00120
00121
00122
00123 private:
00124 scheduler_type m_scheduler;
00125 scoped_ptr<size_policy_type> m_size_policy;
00126
00127 bool m_terminate_all_workers;
00128 std::vector<shared_ptr<worker_type> > m_terminated_workers;
00129
00130 private:
00131 mutable recursive_mutex m_monitor;
00132 mutable condition m_worker_idle_or_terminated_event;
00133 mutable condition m_task_or_terminate_workers_event;
00134
00135 public:
00137 pool_core()
00138 : m_worker_count(0)
00139 , m_target_worker_count(0)
00140 , m_active_worker_count(0)
00141 , m_terminate_all_workers(false)
00142 {
00143 pool_type volatile & self_ref = *this;
00144 m_size_policy.reset(new size_policy_type(self_ref));
00145
00146 m_scheduler.clear();
00147 }
00148
00149
00151 ~pool_core()
00152 {
00153 }
00154
00159 size_controller_type size_controller()
00160 {
00161 return size_controller_type(*m_size_policy, this->shared_from_this());
00162 }
00163
00167 size_t size() const volatile
00168 {
00169 return m_worker_count;
00170 }
00171
00172
00173 void shutdown()
00174 {
00175 ShutdownPolicy<pool_type>::shutdown(*this);
00176 }
00177
00182 bool schedule(task_type const & task) volatile
00183 {
00184 locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00185
00186 if(lockedThis->m_scheduler.push(task))
00187 {
00188 lockedThis->m_task_or_terminate_workers_event.notify_one();
00189 return true;
00190 }
00191 else
00192 {
00193 return false;
00194 }
00195 }
00196
00197
00201 size_t active() const volatile
00202 {
00203 return m_active_worker_count;
00204 }
00205
00206
00210 size_t pending() const volatile
00211 {
00212 locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00213 return lockedThis->m_scheduler.size();
00214 }
00215
00216
00219 void clear() volatile
00220 {
00221 locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00222 lockedThis->m_scheduler.clear();
00223 }
00224
00225
00230 bool empty() const volatile
00231 {
00232 locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00233 return lockedThis->m_scheduler.empty();
00234 }
00235
00236
00241 void wait(size_t const task_threshold = 0) const volatile
00242 {
00243 const pool_type* self = const_cast<const pool_type*>(this);
00244 recursive_mutex::scoped_lock lock(self->m_monitor);
00245
00246 if(0 == task_threshold)
00247 {
00248 while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
00249 {
00250 self->m_worker_idle_or_terminated_event.wait(lock);
00251 }
00252 }
00253 else
00254 {
00255 while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
00256 {
00257 self->m_worker_idle_or_terminated_event.wait(lock);
00258 }
00259 }
00260 }
00261
00269 bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
00270 {
00271 const pool_type* self = const_cast<const pool_type*>(this);
00272 recursive_mutex::scoped_lock lock(self->m_monitor);
00273
00274 if(0 == task_threshold)
00275 {
00276 while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
00277 {
00278 if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
00279 }
00280 }
00281 else
00282 {
00283 while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
00284 {
00285 if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
00286 }
00287 }
00288
00289 return true;
00290 }
00291
00292
00293 private:
00294
00295
00296 void terminate_all_workers(bool const wait) volatile
00297 {
00298 pool_type* self = const_cast<pool_type*>(this);
00299 recursive_mutex::scoped_lock lock(self->m_monitor);
00300
00301 self->m_terminate_all_workers = true;
00302
00303 m_target_worker_count = 0;
00304 self->m_task_or_terminate_workers_event.notify_all();
00305
00306 if(wait)
00307 {
00308 while(m_active_worker_count > 0)
00309 {
00310 self->m_worker_idle_or_terminated_event.wait(lock);
00311 }
00312
00313 for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
00314 it != self->m_terminated_workers.end();
00315 ++it)
00316 {
00317 (*it)->join();
00318 }
00319 self->m_terminated_workers.clear();
00320 }
00321 }
00322
00323
00329 bool resize(size_t const worker_count) volatile
00330 {
00331 locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00332
00333 if(!m_terminate_all_workers)
00334 {
00335 m_target_worker_count = worker_count;
00336 }
00337 else
00338 {
00339 return false;
00340 }
00341
00342
00343 if(m_worker_count <= m_target_worker_count)
00344 {
00345 while(m_worker_count < m_target_worker_count)
00346 {
00347 try
00348 {
00349 worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
00350 m_worker_count++;
00351 m_active_worker_count++;
00352 }
00353 catch(thread_resource_error)
00354 {
00355 return false;
00356 }
00357 }
00358 }
00359 else
00360 {
00361 lockedThis->m_task_or_terminate_workers_event.notify_all();
00362 }
00363
00364 return true;
00365 }
00366
00367
00368
00369 void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
00370 {
00371 locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00372
00373 m_worker_count--;
00374 m_active_worker_count--;
00375 lockedThis->m_worker_idle_or_terminated_event.notify_all();
00376
00377 if(m_terminate_all_workers)
00378 {
00379 lockedThis->m_terminated_workers.push_back(worker);
00380 }
00381 else
00382 {
00383 lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
00384 }
00385 }
00386
00387 void worker_destructed(shared_ptr<worker_type> worker) volatile
00388 {
00389 locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
00390 m_worker_count--;
00391 m_active_worker_count--;
00392 lockedThis->m_worker_idle_or_terminated_event.notify_all();
00393
00394 if(m_terminate_all_workers)
00395 {
00396 lockedThis->m_terminated_workers.push_back(worker);
00397 }
00398 }
00399
00400
00401 bool execute_task() volatile
00402 {
00403 function0<void> task;
00404
00405 {
00406 pool_type* lockedThis = const_cast<pool_type*>(this);
00407 recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
00408
00409
00410 if(m_worker_count > m_target_worker_count)
00411 {
00412 return false;
00413 }
00414
00415
00416
00417 while(lockedThis->m_scheduler.empty())
00418 {
00419
00420 if(m_worker_count > m_target_worker_count)
00421 {
00422 return false;
00423 }
00424 else
00425 {
00426 m_active_worker_count--;
00427 lockedThis->m_worker_idle_or_terminated_event.notify_all();
00428 lockedThis->m_task_or_terminate_workers_event.wait(lock);
00429 m_active_worker_count++;
00430 }
00431 }
00432
00433 task = lockedThis->m_scheduler.top();
00434 lockedThis->m_scheduler.pop();
00435 }
00436
00437
00438 if(task)
00439 {
00440 task();
00441 }
00442
00443
00444 return true;
00445 }
00446 };
00447
00448
00449
00450
00451 } } }
00452
00453 #endif // THREADPOOL_POOL_CORE_HPP_INCLUDED