21 #ifndef __ctpl_thread_pool_H__ 22 #define __ctpl_thread_pool_H__ 32 #include <boost/lockfree/queue.hpp> 35 #ifndef _ctplThreadPoolLength_ 36 #define _ctplThreadPoolLength_ 100 72 int oldNThreads =
static_cast<int>(this->
threads.size());
73 if (oldNThreads <= nThreads) {
75 this->
flags.resize(nThreads);
77 for (
int i = oldNThreads; i < nThreads; ++i) {
78 this->
flags[i] = std::make_shared<std::atomic<bool>>(
false);
83 for (
int i = oldNThreads - 1; i >= nThreads; --i) {
84 *this->
flags[i] =
true;
89 std::unique_lock<std::mutex> lock(this->
mutex);
90 this->
cv.notify_all();
93 this->
flags.resize(nThreads);
100 std::function<void(int id)> * _f;
101 while (this->
q.pop(_f))
106 std::function<void(int)>
pop() {
107 std::function<void(int id)> * _f =
nullptr;
109 std::unique_ptr<std::function<void(int id)>> func(_f);
111 std::function<void(int)> f;
121 void stop(
bool isWait =
false) {
126 for (
int i = 0, n = this->
size(); i < n; ++i) {
127 *this->
flags[i] =
true;
137 std::unique_lock<std::mutex> lock(this->
mutex);
138 this->
cv.notify_all();
140 for (
int i = 0; i < static_cast<int>(this->
threads.size()); ++i) {
141 if (this->
threads[i]->joinable())
151 template<
typename F,
typename... Rest>
152 auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
153 auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
154 std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
157 auto _f =
new std::function<void(int id)>([pck](
int id) {
162 std::unique_lock<std::mutex> lock(this->
mutex);
163 this->
cv.notify_one();
165 return pck->get_future();
188 auto push(F && f) ->std::future<decltype(f(0))> {
189 auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
191 auto _f =
new std::function<void(int id)>([pck](
int id) {
196 std::unique_lock<std::mutex> lock(this->
mutex);
197 this->
cv.notify_one();
199 return pck->get_future();
212 std::shared_ptr<std::atomic<bool>> flag(this->
flags[i]);
213 auto f = [
this, i, flag]() {
214 std::atomic<bool> & _flag = *flag;
215 std::function<void(int id)> * _f;
216 bool isPop = this->
q.pop(_f);
219 std::unique_ptr<std::function<void(int id)>> func(_f);
225 isPop = this->
q.pop(_f);
229 std::unique_lock<std::mutex> lock(this->
mutex);
231 this->
cv.wait(lock, [
this, &_f, &isPop, &_flag](){ isPop = this->
q.pop(_f);
return isPop || this->
isDone || _flag; });
238 this->
threads[i].reset(
new std::thread(f));
243 std::vector<std::unique_ptr<std::thread>>
threads;
244 std::vector<std::shared_ptr<std::atomic<bool>>>
flags;
245 mutable boost::lockfree::queue<std::function<void(int id)> *>
q;
251 std::condition_variable
cv;
256 #endif // __ctpl_thread_pool_H__ #define _ctplThreadPoolLength_
std::condition_variable cv
thread_pool & operator=(const thread_pool &)
boost::lockfree::queue< std::function< void(int id)> * > q
std::atomic< int > nWaiting
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
std::vector< std::shared_ptr< std::atomic< bool > > > flags
void resize(int nThreads)
std::atomic< bool > isDone
std::vector< std::unique_ptr< std::thread > > threads
thread_pool(int nThreads, int queueSize=_ctplThreadPoolLength_)
std::function< void(int)> pop()
std::thread & get_thread(int i)
void stop(bool isWait=false)
std::atomic< bool > isStop
auto push(F &&f) -> std::future< decltype(f(0))>