timer.h
Go to the documentation of this file.
1 /*
2 # Software License Agreement (MIT License)
3 #
4 # Copyright (c) 2018, UFACTORY, Inc.
5 # All rights reserved.
6 #
7 # Author: Vinman <vinman.wen@ufactory.cc> <vinman.cub@gmail.com>
8 */
9 #ifndef WRAPPER_COMMON_TIMER_H_
10 #define WRAPPER_COMMON_TIMER_H_
11 
12 #include <iostream>
13 #include <functional>
14 #include <chrono>
15 #include <thread>
16 #include <atomic>
17 #include <memory>
18 #include <mutex>
19 #include <condition_variable>
20 #include <vector>
21 #include <queue>
22 
/**
 * Small timer helper built on detached std::thread workers.
 *
 * - StartTimer() runs a task repeatedly, once per interval, until Expire() is called.
 * - SyncWait() / AsyncWait() run a callable once after a delay, on the calling
 *   thread or on a detached thread respectively.
 *
 * At most one periodic worker exists per Timer. The worker captures `this`, so the
 * Timer must outlive it; the destructor guarantees that by calling Expire().
 */
class Timer {
public:
  /** Creates an idle timer: no worker thread, nothing to expire. */
  Timer() : expired_(true), try_to_expire_(false) {
  }

  /**
   * Copy constructor. The copy always starts idle.
   *
   * A worker started on `t` captured `t`'s own `this` and can only ever signal
   * `t`'s condition variable. Copying `t`'s "running" flag would therefore make
   * the copy wait forever in its destructor, so the flags are not copied.
   */
  Timer(const Timer& t) : expired_(true), try_to_expire_(false) {
    (void)t;
  }

  /** Stops the periodic worker (if any) and waits for it to finish. */
  ~Timer() {
    Expire();
    // std::cout << "timer destructed!" << std::endl;
  }

  /**
   * Starts calling `task` every `interval` milliseconds on a detached thread.
   * The first call happens after one full interval. Does nothing if the timer
   * is already running.
   *
   * @param interval period in milliseconds
   * @param task     callable invoked on the worker thread once per period
   */
  void StartTimer(int interval, std::function<void()> task) {
    // exchange() folds "is it idle?" and "mark it running" into one atomic step,
    // so two concurrent callers cannot both start a worker.
    if (expired_.exchange(false) == false) {
      // std::cout << "timer is currently running, please expire it first..." << std::endl;
      return;
    }
    try {
      std::thread([this, interval, task]() {
        const std::chrono::milliseconds period(interval);
        for (;;) {
          {
            // Sleep on the condition variable instead of sleep_for() so that
            // Expire() can wake the worker immediately.
            std::unique_lock<std::mutex> locker(mutex_);
            if (expired_cond_.wait_for(locker, period, [this] { return try_to_expire_.load(); })) {
              break;  // expiry requested: leave without running the task again
            }
          }
          task();  // run outside the lock so the task cannot block Expire()'s request
        }
        // std::cout << "stop task..." << std::endl;
        {
          std::lock_guard<std::mutex> locker(mutex_);
          expired_ = true;
          expired_cond_.notify_all();
        }
      }).detach();
    }
    catch (...) {
      // The worker never started; return to idle so Expire()/~Timer() do not
      // wait for a thread that does not exist.
      expired_ = true;
      throw;
    }
  }

  /**
   * Stops the periodic worker and blocks until it has finished.
   *
   * Returns immediately if the timer is idle or if another caller is already
   * expiring it. A sleeping worker is woken at once; a task invocation that is
   * already in progress is allowed to complete first.
   *
   * Must not be called from inside the periodic task itself: the worker would
   * then wait for its own termination.
   */
  void Expire() {
    if (expired_) {
      return;
    }

    std::unique_lock<std::mutex> locker(mutex_);
    // Claim the expiry under the lock; the worker's wait predicate reads the
    // flag under the same lock, so the wake-up below cannot be lost.
    if (try_to_expire_.exchange(true)) {
      // std::cout << "timer is trying to expire, please wait..." << std::endl;
      return;
    }
    expired_cond_.notify_all();
    expired_cond_.wait(locker, [this] { return expired_.load(); });
    // std::cout << "timer expired!" << std::endl;
    try_to_expire_ = false;  // re-arm so StartTimer() can be used again
  }

  /**
   * Blocks the calling thread for `after` milliseconds, then invokes
   * `f(args...)` on that same thread. The result of `f` is discarded.
   * The arguments are bound (copied/moved) before the delay starts.
   */
  template<typename callable, class... arguments>
  void SyncWait(int after, callable&& f, arguments&&... args) {
    // std::function<void()> accepts any return type and avoids std::result_of,
    // which is deprecated in C++17 and removed in C++20.
    std::function<void()> task(std::bind(std::forward<callable>(f), std::forward<arguments>(args)...));
    std::this_thread::sleep_for(std::chrono::milliseconds(after));
    task();
  }

  /**
   * Invokes `f(args...)` on a detached thread after `after` milliseconds and
   * returns immediately. The result of `f` is discarded. The detached thread
   * owns copies of the bound callable and arguments and does not touch this
   * Timer object.
   */
  template<typename callable, class... arguments>
  void AsyncWait(int after, callable&& f, arguments&&... args) {
    std::function<void()> task(std::bind(std::forward<callable>(f), std::forward<arguments>(args)...));

    std::thread([after, task]() {
      std::this_thread::sleep_for(std::chrono::milliseconds(after));
      task();
    }).detach();
  }

private:
  std::atomic<bool> expired_;         // true while no periodic worker is running
  std::atomic<bool> try_to_expire_;   // true while an Expire() request is pending
  std::mutex mutex_;                  // guards the hand-shake on the two flags
  std::condition_variable expired_cond_;  // wakes the sleeping worker and the Expire() waiter
};
102 
/**
 * Lazily growing pool of worker threads that execute queued callbacks.
 *
 * dispatch() queues a task and starts a new worker when no worker is free and
 * the pool is below its limit. Workers stay alive until stop() is called
 * (the destructor calls it). A negative limit means "no limit".
 */
class ThreadPool {
public:
  /**
   * @param max_thread_count upper bound on worker threads; negative = unlimited
   */
  ThreadPool(int max_thread_count = 10)
    : max_thread_count_(max_thread_count),
      total_thread_count_(0),
      free_thread_count_(0),
      stoped_(false) {  // must be set explicitly: a default-constructed atomic holds no defined value before C++20
  }

  /** Stops and joins all workers. */
  ~ThreadPool() {
    stop();
  }

  /** Thread entry point; `arg` must point to the owning ThreadPool. */
  static void thread_handle(void *arg) {
    ThreadPool *pool = static_cast<ThreadPool *>(arg);
    pool->_thread_process();
  }

  /**
   * Worker loop. Registers the calling thread in the counters, then repeatedly
   * waits for a task and runs it with the mutex released. The loop ends once
   * stop() has been requested; tasks still queued at that point are not
   * guaranteed to run (each worker takes at most one more).
   */
  void _thread_process(void) {
    std::unique_lock<std::mutex> locker(mutex_);
    total_thread_count_ += 1;
    free_thread_count_ += 1;
    // int thread_inx = total_thread_count_;
    locker.unlock();

    // std::cout << "callback thread start, thread_index=" << thread_inx << ", thread_id=" << std::this_thread::get_id() << std::endl;
    while (!stoped_) {
      std::function<void()> task;
      locker.lock();
      task_cond_.wait(locker, [this] {
        return stoped_ || !task_que_.empty();
      });
      if (stoped_ && task_que_.empty()) {
        locker.unlock();
        break;
      }
      free_thread_count_ -= 1;  // busy from here until the task returns
      task = std::move(task_que_.front());
      task_que_.pop();
      locker.unlock();
      task();                   // user code runs without the pool lock held
      locker.lock();
      free_thread_count_ += 1;
      locker.unlock();
    }
    // Every exit path leaves the loop with this thread counted as free.
    locker.lock();
    total_thread_count_ -= 1;
    free_thread_count_ -= 1;
    locker.unlock();
    // std::cout << "callback thread finished, thread_index=" << thread_inx << ", thread_id=" << std::this_thread::get_id() << std::endl;
  }

  /**
   * Requests shutdown, wakes all workers and joins them. Safe to call more
   * than once. After stop(), dispatch() ignores new tasks.
   */
  void stop() {
    {
      // Set the flag under the mutex: a worker evaluates its wait predicate
      // under the same mutex, so it cannot miss the notification below and
      // leave join() hanging.
      std::lock_guard<std::mutex> lock{ mutex_ };
      stoped_.store(true);
    }
    task_cond_.notify_all();
    // pool_ is stable here: dispatch() no longer adds threads once stoped_ is set.
    for (std::thread& thread : pool_) {
      try {
        if (thread.joinable()) thread.join();
      }
      catch (...) {
        // join() can throw std::system_error (e.g. a worker reaching stop()
        // would be joining itself). Shutdown is best-effort, so keep joining
        // the remaining threads.
      }
    }
  }

  /** Changes the worker limit for future dispatch() calls; negative = unlimited. */
  void set_max_thread_count(int max_thread_count) {
    std::lock_guard<std::mutex> lock{ mutex_ };  // _thread_check() reads this under the same lock
    max_thread_count_ = max_thread_count;
  }

  /**
   * Queues `f(args...)` for execution on a worker thread. The callable and
   * arguments are bound (copied/moved) immediately; the result of `f` is
   * discarded. The task is dropped if the pool has already been stopped.
   */
  template<typename callable, class... arguments>
  void dispatch(callable&& f, arguments&&... args) {
    // Task is std::function<void()>, which accepts any return type; this avoids
    // std::result_of (deprecated in C++17, removed in C++20).
    Task task(std::bind(std::forward<callable>(f), std::forward<arguments>(args)...));
    {
      std::lock_guard<std::mutex> lock{ mutex_ };
      if (stoped_) {
        // No worker will run it, and starting a thread now would modify pool_
        // while stop() may be iterating over it.
        return;
      }
      task_que_.emplace(std::move(task));
      _thread_check();
      task_cond_.notify_one();
    }
  }

  /** Runs `f(args...)` synchronously on the calling thread; the result is discarded. */
  template<typename callable, class... arguments>
  void commit(callable&& f, arguments&&... args) {
    Task task(std::bind(std::forward<callable>(f), std::forward<arguments>(args)...));
    task();
  }

private:
  /**
   * Starts one more worker if none is free and the limit allows it.
   * Must be called with mutex_ held.
   */
  void _thread_check(void) {
    if (free_thread_count_ != 0) {
      return;
    }
    const bool unlimited = max_thread_count_ < 0;
    // total_thread_count_ only grows once a worker has actually started running,
    // so a burst of dispatch() calls could overshoot the limit. pool_.size() is
    // updated right here, under the lock, and closes that gap.
    const bool below_limit = total_thread_count_ < max_thread_count_
      && static_cast<int>(pool_.size()) < max_thread_count_;
    if (unlimited || below_limit) {
      pool_.emplace_back(thread_handle, this);
    }
  }

private:
  using Task = std::function<void()>;
  std::queue<Task> task_que_;          // pending tasks, guarded by mutex_
  std::vector<std::thread> pool_;      // every worker started by this pool
  std::mutex mutex_;
  std::condition_variable task_cond_;  // signalled on new task and on stop()

  int max_thread_count_;    // worker limit; negative = unlimited
  int total_thread_count_;  // workers currently inside _thread_process()
  int free_thread_count_;   // workers not currently running a task
  std::atomic<bool> stoped_;  // set once by stop()
};
204 
205 #endif // WRAPPER_COMMON_TIMER_H_
int free_thread_count_
Definition: timer.h:201
std::queue< Task > task_que_
Definition: timer.h:194
void stop()
Definition: timer.h:149
std::vector< std::thread > pool_
Definition: timer.h:195
f
int total_thread_count_
Definition: timer.h:200
~Timer()
Definition: timer.h:32
static void thread_handle(void *arg)
Definition: timer.h:110
std::atomic< bool > expired_
Definition: timer.h:97
std::condition_variable expired_cond_
Definition: timer.h:100
void StartTimer(int interval, std::function< void()> task)
Definition: timer.h:37
~ThreadPool()
Definition: timer.h:106
std::atomic< bool > try_to_expire_
Definition: timer.h:98
Timer(const Timer &t)
Definition: timer.h:28
ThreadPool(int max_thread_count=10)
Definition: timer.h:105
void _thread_process(void)
Definition: timer.h:115
void dispatch(callable &&f, arguments &&...args)
Definition: timer.h:165
void SyncWait(int after, callable &&f, arguments &&...args)
Definition: timer.h:78
int max_thread_count_
Definition: timer.h:199
Timer()
Definition: timer.h:25
std::function< void()> Task
Definition: timer.h:193
std::condition_variable task_cond_
Definition: timer.h:197
void Expire()
Definition: timer.h:57
void set_max_thread_count(int max_thread_count)
Definition: timer.h:160
void commit(callable &&f, arguments &&...args)
Definition: timer.h:179
std::mutex mutex_
Definition: timer.h:196
std::mutex mutex_
Definition: timer.h:99
Timer
Definition: timer.h:23
void _thread_check(void)
Definition: timer.h:186
std::atomic< bool > stoped_
Definition: timer.h:202
void AsyncWait(int after, callable &&f, arguments &&...args)
Definition: timer.h:86


xarm_api
Author(s):
autogenerated on Sat May 8 2021 02:51:23