foxglove_bridge_base
include
foxglove_bridge
callback_queue.hpp
Go to the documentation of this file.
1
#pragma once
2
3
#include <atomic>
4
#include <condition_variable>
5
#include <deque>
6
#include <functional>
7
#include <mutex>
8
#include <thread>
9
#include <vector>
10
11
#include "
websocket_logging.hpp
"
12
13
namespace
foxglove
{
14
15
class
CallbackQueue
{
16
public
:
17
CallbackQueue
(
LogCallback
logCallback,
size_t
numThreads = 1)
18
:
_logCallback
(logCallback)
19
,
_quit
(false) {
20
for
(
size_t
i = 0; i < numThreads; ++i) {
21
_workerThreads
.push_back(std::thread(&
CallbackQueue::doWork
,
this
));
22
}
23
}
24
25
~CallbackQueue
() {
26
stop
();
27
}
28
29
void
stop
() {
30
_quit
=
true
;
31
_cv
.notify_all();
32
for
(
auto
& thread :
_workerThreads
) {
33
thread.join();
34
}
35
}
36
37
void
addCallback
(std::function<
void
(
void
)> cb) {
38
if
(
_quit
) {
39
return
;
40
}
41
std::unique_lock<std::mutex> lock(
_mutex
);
42
_callbackQueue
.push_back(cb);
43
_cv
.notify_one();
44
}
45
46
private
:
47
void
doWork
() {
48
while
(!
_quit
) {
49
std::unique_lock<std::mutex> lock(
_mutex
);
50
_cv
.wait(lock, [
this
] {
51
return
(
_quit
|| !
_callbackQueue
.empty());
52
});
53
if
(
_quit
) {
54
break
;
55
}
else
if
(!
_callbackQueue
.empty()) {
56
std::function<void(
void
)> cb =
_callbackQueue
.front();
57
_callbackQueue
.pop_front();
58
lock.unlock();
59
try
{
60
cb();
61
}
catch
(
const
std::exception& ex) {
62
// Should never get here if we catch all exceptions in the callbacks.
63
const
std::string msg =
64
std::string(
"Caught unhandled exception in calback_queue"
) + ex.what();
65
_logCallback
(
WebSocketLogLevel::Error
, msg.c_str());
66
}
catch
(...) {
67
_logCallback
(
WebSocketLogLevel::Error
,
"Caught unhandled exception in calback_queue"
);
68
}
69
}
70
}
71
}
72
73
LogCallback
_logCallback
;
74
std::atomic<bool>
_quit
;
75
std::mutex
_mutex
;
76
std::condition_variable
_cv
;
77
std::deque<std::function<void(
void
)>>
_callbackQueue
;
78
std::vector<std::thread>
_workerThreads
;
79
};
80
81
}
// namespace foxglove
foxglove::CallbackQueue::stop
void stop()
Definition:
callback_queue.hpp:29
foxglove::CallbackQueue::doWork
void doWork()
Definition:
callback_queue.hpp:47
foxglove
Definition:
base64.hpp:8
foxglove::CallbackQueue::_quit
std::atomic< bool > _quit
Definition:
callback_queue.hpp:74
foxglove::CallbackQueue::_workerThreads
std::vector< std::thread > _workerThreads
Definition:
callback_queue.hpp:78
foxglove::WebSocketLogLevel::Error
@ Error
foxglove::LogCallback
std::function< void(WebSocketLogLevel, char const *)> LogCallback
Definition:
websocket_logging.hpp:12
foxglove::CallbackQueue::~CallbackQueue
~CallbackQueue()
Definition:
callback_queue.hpp:25
foxglove::CallbackQueue::_callbackQueue
std::deque< std::function< void(void)> > _callbackQueue
Definition:
callback_queue.hpp:77
foxglove::CallbackQueue::_logCallback
LogCallback _logCallback
Definition:
callback_queue.hpp:73
foxglove::CallbackQueue::CallbackQueue
CallbackQueue(LogCallback logCallback, size_t numThreads=1)
Definition:
callback_queue.hpp:17
foxglove::CallbackQueue::_cv
std::condition_variable _cv
Definition:
callback_queue.hpp:76
foxglove::CallbackQueue::_mutex
std::mutex _mutex
Definition:
callback_queue.hpp:75
foxglove::CallbackQueue
Definition:
callback_queue.hpp:15
foxglove::CallbackQueue::addCallback
void addCallback(std::function< void(void)> cb)
Definition:
callback_queue.hpp:37
websocket_logging.hpp
foxglove_bridge
Author(s): Foxglove
autogenerated on Tue May 20 2025 02:34:26