ctpl.h — CTPL: a C++ thread pool library (boost::lockfree::queue variant) by Vitaliy Vitsentiy.
1 
2 /*********************************************************
3  *
4  * Copyright (C) 2014 by Vitaliy Vitsentiy
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  *********************************************************/
19 
20 
21 #ifndef __ctpl_thread_pool_H__
22 #define __ctpl_thread_pool_H__
23 
#include <atomic>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include <boost/lockfree/queue.hpp>
33 
34 
35 #ifndef _ctplThreadPoolLength_
36 #define _ctplThreadPoolLength_ 100
37 #endif
38 
39 
40 // thread pool to run user's functors with signature
41 // ret func(int id, other_params)
42 // where id is the index of the thread that runs the functor
43 // ret is some return type
44 
45 
46 namespace ctpl {
47 
48  class thread_pool {
49 
50  public:
51 
53  thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }
54 
55  // the destructor waits for all the functions in the queue to be finished
57  this->stop(true);
58  }
59 
60  // get the number of running threads in the pool
61  int size() { return static_cast<int>(this->threads.size()); }
62 
63  // number of idle threads
64  int n_idle() { return this->nWaiting; }
65  std::thread & get_thread(int i) { return *this->threads[i]; }
66 
67  // change the number of threads in the pool
68  // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
69  // nThreads must be >= 0
70  void resize(int nThreads) {
71  if (!this->isStop && !this->isDone) {
72  int oldNThreads = static_cast<int>(this->threads.size());
73  if (oldNThreads <= nThreads) { // if the number of threads is increased
74  this->threads.resize(nThreads);
75  this->flags.resize(nThreads);
76 
77  for (int i = oldNThreads; i < nThreads; ++i) {
78  this->flags[i] = std::make_shared<std::atomic<bool>>(false);
79  this->set_thread(i);
80  }
81  }
82  else { // the number of threads is decreased
83  for (int i = oldNThreads - 1; i >= nThreads; --i) {
84  *this->flags[i] = true; // this thread will finish
85  this->threads[i]->detach();
86  }
87  {
88  // stop the detached threads that were waiting
89  std::unique_lock<std::mutex> lock(this->mutex);
90  this->cv.notify_all();
91  }
92  this->threads.resize(nThreads); // safe to delete because the threads are detached
93  this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
94  }
95  }
96  }
97 
98  // empty the queue
99  void clear_queue() {
100  std::function<void(int id)> * _f;
101  while (this->q.pop(_f))
102  delete _f; // empty the queue
103  }
104 
105  // pops a functional wraper to the original function
106  std::function<void(int)> pop() {
107  std::function<void(int id)> * _f = nullptr;
108  this->q.pop(_f);
109  std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
110 
111  std::function<void(int)> f;
112  if (_f)
113  f = *_f;
114  return f;
115  }
116 
117 
118  // wait for all computing threads to finish and stop all threads
119  // may be called asyncronously to not pause the calling thread while waiting
120  // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
121  void stop(bool isWait = false) {
122  if (!isWait) {
123  if (this->isStop)
124  return;
125  this->isStop = true;
126  for (int i = 0, n = this->size(); i < n; ++i) {
127  *this->flags[i] = true; // command the threads to stop
128  }
129  this->clear_queue(); // empty the queue
130  }
131  else {
132  if (this->isDone || this->isStop)
133  return;
134  this->isDone = true; // give the waiting threads a command to finish
135  }
136  {
137  std::unique_lock<std::mutex> lock(this->mutex);
138  this->cv.notify_all(); // stop all waiting threads
139  }
140  for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
141  if (this->threads[i]->joinable())
142  this->threads[i]->join();
143  }
144  // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
145  // therefore delete them here
146  this->clear_queue();
147  this->threads.clear();
148  this->flags.clear();
149  }
150 
151  template<typename F, typename... Rest>
152  auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
153  auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
154  std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
155  );
156 
157  auto _f = new std::function<void(int id)>([pck](int id) {
158  (*pck)(id);
159  });
160  this->q.push(_f);
161 
162  std::unique_lock<std::mutex> lock(this->mutex);
163  this->cv.notify_one();
164 
165  return pck->get_future();
166  }
167 
168  // template<typename F, typename.. Rest>
169  // auto pushReturnId(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
170  // auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
171  // std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
172  // );
173 
174  // auto _f = new std::function<void(int id)>([pck](int id) {
175  // (*pck)(id);
176  // });
177  // this->q.push(_f);
178 
179  // std::unique_lock<std::mutex> lock(this->mutex);
180  // this->cv.notify_one();
181 
182  // return pck->get_future();
183  // }
184 
185  // run the user's function that excepts argument int - id of the running thread. returned value is templatized
186  // operator returns std::future, where the user can get the result and rethrow the catched exceptins
187  template<typename F>
188  auto push(F && f) ->std::future<decltype(f(0))> {
189  auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
190 
191  auto _f = new std::function<void(int id)>([pck](int id) {
192  (*pck)(id);
193  });
194  this->q.push(_f);
195 
196  std::unique_lock<std::mutex> lock(this->mutex);
197  this->cv.notify_one();
198 
199  return pck->get_future();
200  }
201 
202 
203  private:
204 
205  // deleted
206  thread_pool(const thread_pool &);// = delete;
207  thread_pool(thread_pool &&);// = delete;
208  thread_pool & operator=(const thread_pool &);// = delete;
209  thread_pool & operator=(thread_pool &&);// = delete;
210 
211  void set_thread(int i) {
212  std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
213  auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
214  std::atomic<bool> & _flag = *flag;
215  std::function<void(int id)> * _f;
216  bool isPop = this->q.pop(_f);
217  while (true) {
218  while (isPop) { // if there is anything in the queue
219  std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
220  (*_f)(i);
221 
222  if (_flag)
223  return; // the thread is wanted to stop, return even if the queue is not empty yet
224  else
225  isPop = this->q.pop(_f);
226  }
227 
228  // the queue is empty here, wait for the next command
229  std::unique_lock<std::mutex> lock(this->mutex);
230  ++this->nWaiting;
231  this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
232  --this->nWaiting;
233 
234  if (!isPop)
235  return; // if the queue is empty and this->isDone == true or *flag then return
236  }
237  };
238  this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
239  }
240 
241  void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
242 
243  std::vector<std::unique_ptr<std::thread>> threads;
244  std::vector<std::shared_ptr<std::atomic<bool>>> flags;
245  mutable boost::lockfree::queue<std::function<void(int id)> *> q;
246  std::atomic<bool> isDone;
247  std::atomic<bool> isStop;
248  std::atomic<int> nWaiting; // how many threads are waiting
249 
250  std::mutex mutex;
251  std::condition_variable cv;
252  };
253 
254 }
255 
256 #endif // __ctpl_thread_pool_H__
#define _ctplThreadPoolLength_
Definition: ctpl.h:36
int n_idle()
Definition: ctpl.h:64
std::condition_variable cv
Definition: ctpl.h:251
thread_pool & operator=(const thread_pool &)
boost::lockfree::queue< std::function< void(int id)> * > q
Definition: ctpl.h:245
Definition: ctpl.h:46
std::atomic< int > nWaiting
Definition: ctpl.h:248
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(0, rest...))>
Definition: ctpl.h:152
void set_thread(int i)
Definition: ctpl.h:211
std::vector< std::shared_ptr< std::atomic< bool > > > flags
Definition: ctpl.h:244
void resize(int nThreads)
Definition: ctpl.h:70
void clear_queue()
Definition: ctpl.h:99
std::atomic< bool > isDone
Definition: ctpl.h:246
std::vector< std::unique_ptr< std::thread > > threads
Definition: ctpl.h:243
thread_pool(int nThreads, int queueSize=_ctplThreadPoolLength_)
Definition: ctpl.h:53
std::mutex mutex
Definition: ctpl.h:250
void init()
Definition: ctpl.h:241
std::function< void(int)> pop()
Definition: ctpl.h:106
std::thread & get_thread(int i)
Definition: ctpl.h:65
void stop(bool isWait=false)
Definition: ctpl.h:121
int size()
Definition: ctpl.h:61
std::atomic< bool > isStop
Definition: ctpl.h:247
auto push(F &&f) -> std::future< decltype(f(0))>
Definition: ctpl.h:188


lvr2
Author(s): Thomas Wiemann , Sebastian Pütz , Alexander Mock , Lars Kiesow , Lukas Kalbertodt , Tristan Igelbrink , Johan M. von Behren , Dominik Feldschnieders , Alexander Löhr
autogenerated on Mon Feb 28 2022 22:46:06