Go to the documentation of this file.00001 #ifndef TIMERQUEUE_H
00002 #define TIMERQUEUE_H
00003
00004 #include <mutex>
00005 #include <condition_variable>
00006 #include <thread>
00007 #include <queue>
00008 #include <chrono>
00009 #include <assert.h>
00010
00011 namespace BT
00012 {
00013
00014
00015 namespace details
00016 {
00017 class Semaphore
00018 {
00019 public:
00020 Semaphore(unsigned int count = 0) : m_count(count)
00021 {
00022 }
00023
00024 void notify()
00025 {
00026 std::unique_lock<std::mutex> lock(m_mtx);
00027 m_count++;
00028 m_cv.notify_one();
00029 }
00030
00031 void wait()
00032 {
00033 std::unique_lock<std::mutex> lock(m_mtx);
00034 m_cv.wait(lock, [this]() { return m_count > 0; });
00035 m_count--;
00036 }
00037
00038 template <class Clock, class Duration>
00039 bool waitUntil(const std::chrono::time_point<Clock, Duration>& point)
00040 {
00041 std::unique_lock<std::mutex> lock(m_mtx);
00042 if (!m_cv.wait_until(lock, point, [this]() { return m_count > 0; }))
00043 return false;
00044 m_count--;
00045 return true;
00046 }
00047
00048 private:
00049 std::mutex m_mtx;
00050 std::condition_variable m_cv;
00051 unsigned int m_count;
00052 };
00053 }
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065 class TimerQueue
00066 {
00067 public:
00068 TimerQueue()
00069 {
00070 m_th = std::thread([this] { run(); });
00071 }
00072
00073 ~TimerQueue()
00074 {
00075 cancelAll();
00076
00077 add(std::chrono::milliseconds(0), [this](bool) { m_finish = true; });
00078 m_th.join();
00079 }
00080
00082
00083
00084
00085 uint64_t add(std::chrono::milliseconds milliseconds, std::function<void(bool)> handler)
00086 {
00087 WorkItem item;
00088 item.end = Clock::now() + milliseconds;
00089 item.handler = std::move(handler);
00090
00091 std::unique_lock<std::mutex> lk(m_mtx);
00092 uint64_t id = ++m_idcounter;
00093 item.id = id;
00094 m_items.push(std::move(item));
00095 lk.unlock();
00096
00097
00098 m_checkWork.notify();
00099 return id;
00100 }
00101
00103
00104
00105
00106
00107 size_t cancel(uint64_t id)
00108 {
00109
00110
00111
00112
00113
00114 std::unique_lock<std::mutex> lk(m_mtx);
00115 for (auto&& item : m_items.getContainer())
00116 {
00117 if (item.id == id && item.handler)
00118 {
00119 WorkItem newItem;
00120
00121 newItem.end = Clock::time_point();
00122 newItem.id = 0;
00123
00124
00125
00126
00127
00128 newItem.handler = std::move(item.handler);
00129 item.handler = nullptr;
00130 m_items.push(std::move(newItem));
00131
00132 lk.unlock();
00133
00134 m_checkWork.notify();
00135 return 1;
00136 }
00137 }
00138 return 0;
00139 }
00140
00142
00143
00144 size_t cancelAll()
00145 {
00146
00147
00148 std::unique_lock<std::mutex> lk(m_mtx);
00149 for (auto&& item : m_items.getContainer())
00150 {
00151 if (item.id)
00152 {
00153 item.end = Clock::time_point();
00154 item.id = 0;
00155 }
00156 }
00157 auto ret = m_items.size();
00158
00159 lk.unlock();
00160 m_checkWork.notify();
00161 return ret;
00162 }
00163
00164 private:
00165 using Clock = std::chrono::steady_clock;
00166 TimerQueue(const TimerQueue&) = delete;
00167 TimerQueue& operator=(const TimerQueue&) = delete;
00168
00169 void run()
00170 {
00171 while (!m_finish)
00172 {
00173 auto end = calcWaitTime();
00174 if (end.first)
00175 {
00176
00177
00178 m_checkWork.waitUntil(end.second);
00179 }
00180 else
00181 {
00182
00183 m_checkWork.wait();
00184 }
00185
00186
00187
00188 checkWork();
00189 }
00190
00191
00192
00193 assert(m_items.size() == 0);
00194 }
00195
00196 std::pair<bool, Clock::time_point> calcWaitTime()
00197 {
00198 std::lock_guard<std::mutex> lk(m_mtx);
00199 while (m_items.size())
00200 {
00201 if (m_items.top().handler)
00202 {
00203
00204 return std::make_pair(true, m_items.top().end);
00205 }
00206 else
00207 {
00208
00209 m_items.pop();
00210 }
00211 }
00212
00213
00214
00215 return std::make_pair(false, Clock::time_point());
00216 }
00217
00218 void checkWork()
00219 {
00220 std::unique_lock<std::mutex> lk(m_mtx);
00221 while (m_items.size() && m_items.top().end <= Clock::now())
00222 {
00223 WorkItem item(std::move(m_items.top()));
00224 m_items.pop();
00225
00226 lk.unlock();
00227 if (item.handler)
00228 item.handler(item.id == 0);
00229 lk.lock();
00230 }
00231 }
00232
00233 details::Semaphore m_checkWork;
00234 std::thread m_th;
00235 bool m_finish = false;
00236 uint64_t m_idcounter = 0;
00237
00238 struct WorkItem
00239 {
00240 Clock::time_point end;
00241 uint64_t id;
00242 std::function<void(bool)> handler;
00243 bool operator>(const WorkItem& other) const
00244 {
00245 return end > other.end;
00246 }
00247 };
00248
00249 std::mutex m_mtx;
00250
00251 class Queue
00252 : public std::priority_queue<WorkItem, std::vector<WorkItem>, std::greater<WorkItem>>
00253 {
00254 public:
00255 std::vector<WorkItem>& getContainer()
00256 {
00257 return this->c;
00258 }
00259 } m_items;
00260 };
00261 }
00262
00263 #endif // TIMERQUEUE_H