21 #ifndef __ctpl_thread_pool_H__ 
   22 #define __ctpl_thread_pool_H__ 
   32 #include <boost/lockfree/queue.hpp> 
   35 #ifndef _ctplThreadPoolLength_ 
   36 #define _ctplThreadPoolLength_  100 
   72                 int oldNThreads = 
static_cast<int>(this->
threads.size());
 
   73                 if (oldNThreads <= nThreads) {  
 
   75                     this->
flags.resize(nThreads);
 
   77                     for (
int i = oldNThreads; i < nThreads; ++i) {
 
   78                         this->
flags[i] = std::make_shared<std::atomic<bool>>(
false);
 
   83                     for (
int i = oldNThreads - 1; i >= nThreads; --i) {
 
   84                         *this->
flags[i] = 
true;  
 
   89                         std::unique_lock<std::mutex> lock(this->
mutex);
 
   90                         this->
cv.notify_all();
 
   93                     this->
flags.resize(nThreads);  
 
  100             std::function<void(
int id)> * _f;
 
  101             while (this->
q.pop(_f))
 
  106         std::function<void(
int)> 
pop() {
 
  107             std::function<void(
int id)> * _f = 
nullptr;
 
  109             std::unique_ptr<std::function<void(
int id)>> func(_f);  
 
  111             std::function<void(
int)> f;
 
  121         void stop(
bool isWait = 
false) {
 
  126                 for (
int i = 0, n = this->
size(); i < n; ++i) {
 
  127                     *this->
flags[i] = 
true;  
 
  137                 std::unique_lock<std::mutex> lock(this->
mutex);
 
  138                 this->
cv.notify_all();  
 
  140             for (
int i = 0; i < static_cast<int>(this->
threads.size()); ++i) {  
 
  141                 if (this->
threads[i]->joinable())
 
  151         template<
typename F, 
typename... Rest>
 
  152         auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
 
  153             auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
 
  154                 std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
 
  157             auto _f = 
new std::function<void(int id)>([pck](
int id) {
 
  162             std::unique_lock<std::mutex> lock(this->
mutex);
 
  163             this->
cv.notify_one();
 
  165             return pck->get_future();
 
  188         auto push(F && f) ->std::future<decltype(f(0))> {
 
  189             auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
 
  191             auto _f = 
new std::function<void(int id)>([pck](
int id) {
 
  196             std::unique_lock<std::mutex> lock(this->
mutex);
 
  197             this->
cv.notify_one();
 
  199             return pck->get_future();
 
  212             std::shared_ptr<std::atomic<bool>> flag(this->
flags[i]);  
 
  213             auto f = [
this, i, flag]() {
 
  214                 std::atomic<bool> & _flag = *flag;
 
  215                 std::function<void(
int id)> * _f;
 
  216                 bool isPop = this->
q.pop(_f);
 
  219                         std::unique_ptr<std::function<void(
int id)>> func(_f);  
 
  225                             isPop = this->
q.pop(_f);
 
  229                     std::unique_lock<std::mutex> lock(this->
mutex);
 
  231                     this->
cv.wait(lock, [
this, &_f, &isPop, &_flag](){ isPop = this->
q.pop(_f); 
return isPop || this->
isDone || _flag; });
 
  238             this->
threads[i].reset(
new std::thread(f));  
 
  243         std::vector<std::unique_ptr<std::thread>> 
threads;
 
  244         std::vector<std::shared_ptr<std::atomic<bool>>> 
flags;
 
  245         mutable boost::lockfree::queue<std::function<void(
int id)> *> 
q;
 
  251         std::condition_variable 
cv;
 
  256 #endif // __ctpl_thread_pool_H__