callback_queue.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <atomic>
4 #include <condition_variable>
5 #include <deque>
6 #include <functional>
7 #include <mutex>
8 #include <thread>
9 #include <vector>
10 
11 #include "websocket_logging.hpp"
12 
13 namespace foxglove {
14 
16 public:
17  CallbackQueue(LogCallback logCallback, size_t numThreads = 1)
18  : _logCallback(logCallback)
19  , _quit(false) {
20  for (size_t i = 0; i < numThreads; ++i) {
21  _workerThreads.push_back(std::thread(&CallbackQueue::doWork, this));
22  }
23  }
24 
26  stop();
27  }
28 
29  void stop() {
30  _quit = true;
31  _cv.notify_all();
32  for (auto& thread : _workerThreads) {
33  thread.join();
34  }
35  }
36 
37  void addCallback(std::function<void(void)> cb) {
38  if (_quit) {
39  return;
40  }
41  std::unique_lock<std::mutex> lock(_mutex);
42  _callbackQueue.push_back(cb);
43  _cv.notify_one();
44  }
45 
46 private:
47  void doWork() {
48  while (!_quit) {
49  std::unique_lock<std::mutex> lock(_mutex);
50  _cv.wait(lock, [this] {
51  return (_quit || !_callbackQueue.empty());
52  });
53  if (_quit) {
54  break;
55  } else if (!_callbackQueue.empty()) {
56  std::function<void(void)> cb = _callbackQueue.front();
57  _callbackQueue.pop_front();
58  lock.unlock();
59  try {
60  cb();
61  } catch (const std::exception& ex) {
62  // Should never get here if we catch all exceptions in the callbacks.
63  const std::string msg =
64  std::string("Caught unhandled exception in calback_queue") + ex.what();
66  } catch (...) {
67  _logCallback(WebSocketLogLevel::Error, "Caught unhandled exception in calback_queue");
68  }
69  }
70  }
71  }
72 
74  std::atomic<bool> _quit;
75  std::mutex _mutex;
76  std::condition_variable _cv;
77  std::deque<std::function<void(void)>> _callbackQueue;
78  std::vector<std::thread> _workerThreads;
79 };
80 
81 } // namespace foxglove
CallbackQueue(LogCallback logCallback, size_t numThreads=1)
std::deque< std::function< void(void)> > _callbackQueue
std::condition_variable _cv
std::function< void(WebSocketLogLevel, char const *)> LogCallback
void addCallback(std::function< void(void)> cb)
std::atomic< bool > _quit
std::vector< std::thread > _workerThreads


foxglove_bridge
Author(s): Foxglove
autogenerated on Mon Jul 3 2023 02:12:22