20 #ifndef __ctpl_stl_thread_pool_H__ 21 #define __ctpl_stl_thread_pool_H__ 47 bool push(T
const & value) {
48 std::unique_lock<std::mutex> lock(this->
mutex);
54 std::unique_lock<std::mutex> lock(this->
mutex);
62 std::unique_lock<std::mutex> lock(this->
mutex);
63 return this->
q.empty();
76 thread_pool(
int nThreads) { this->init(); this->resize(nThreads); }
84 int size() {
return static_cast<int>(this->threads.size()); }
87 int n_idle() {
return this->nWaiting; }
88 std::thread &
get_thread(
int i) {
return *this->threads[i]; }
94 if (!this->isStop && !this->isDone) {
95 int oldNThreads =
static_cast<int>(this->threads.size());
96 if (oldNThreads <= nThreads) {
97 this->threads.resize(nThreads);
98 this->flags.resize(nThreads);
100 for (
int i = oldNThreads; i < nThreads; ++i) {
101 this->flags[i] = std::make_shared<std::atomic<bool>>(
false);
106 for (
int i = oldNThreads - 1; i >= nThreads; --i) {
107 *this->flags[i] =
true;
108 this->threads[i]->detach();
112 std::unique_lock<std::mutex> lock(this->
mutex);
113 this->cv.notify_all();
115 this->threads.resize(nThreads);
116 this->flags.resize(nThreads);
123 std::function<void(int id)> * _f;
124 while (this->
q.pop(_f))
129 std::function<void(int)>
pop() {
130 std::function<void(int id)> * _f =
nullptr;
132 std::unique_ptr<std::function<void(int id)>> func(_f);
133 std::function<void(int)> f;
142 void stop(
bool isWait =
false) {
147 for (
int i = 0, n = this->size(); i < n; ++i) {
148 *this->flags[i] =
true;
153 if (this->isDone || this->isStop)
158 std::unique_lock<std::mutex> lock(this->
mutex);
159 this->cv.notify_all();
161 for (
int i = 0; i < static_cast<int>(this->threads.size()); ++i) {
162 if (this->threads[i]->joinable())
163 this->threads[i]->join();
168 this->threads.clear();
172 template<
typename F,
typename... Rest>
173 auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
174 auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
175 std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
177 auto _f =
new std::function<void(int id)>([pck](
int id) {
181 std::unique_lock<std::mutex> lock(this->
mutex);
182 this->cv.notify_one();
183 return pck->get_future();
189 auto push(F && f) ->std::future<decltype(f(0))> {
190 auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
191 auto _f =
new std::function<void(int id)>([pck](
int id) {
195 std::unique_lock<std::mutex> lock(this->
mutex);
196 this->cv.notify_one();
197 return pck->get_future();
210 std::shared_ptr<std::atomic<bool>> flag(this->flags[i]);
211 auto f = [
this, i, flag]() {
212 std::atomic<bool> & _flag = *flag;
213 std::function<void(int id)> * _f;
214 bool isPop = this->
q.pop(_f);
217 std::unique_ptr<std::function<void(int id)>> func(_f);
222 isPop = this->
q.pop(_f);
225 std::unique_lock<std::mutex> lock(this->
mutex);
227 this->cv.wait(lock, [
this, &_f, &isPop, &_flag](){ isPop = this->
q.pop(_f);
return isPop || this->isDone || _flag; });
233 this->threads[i].reset(
new std::thread(f));
236 void init() { this->nWaiting = 0; this->isStop =
false; this->isDone =
false; }
238 std::vector<std::unique_ptr<std::thread>> threads;
239 std::vector<std::shared_ptr<std::atomic<bool>>> flags;
241 std::atomic<bool> isDone;
242 std::atomic<bool> isStop;
243 std::atomic<int> nWaiting;
246 std::condition_variable cv;
251 #endif // __ctpl_stl_thread_pool_H__
bool push(T const &value)
thread_pool(int nThreads)
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
void resize(int nThreads)
detail::Queue< std::function< void(int id)> * > q
std::function< void(int)> pop()
std::thread & get_thread(int i)
void stop(bool isWait=false)
auto push(F &&f) -> std::future< decltype(f(0))>