src
message_queue.cpp
Go to the documentation of this file.
1
// In-Memory message buffer
2
// Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3
4
#include "
message_queue.h
"
5
6
namespace
rosbag_fancy
7
{
8
9
MessageQueue::MessageQueue
(uint64_t byteLimit)
10
: m_byteLimit{byteLimit}
11
{
12
}
13
14
// Append a copy of msg to the queue.
//
// Returns false (and leaves the queue untouched) if adding msg.size() bytes
// would push the accounted queue size above m_byteLimit, true otherwise.
// On success all threads waiting in pop() are notified.
bool MessageQueue::push(const rosbag_fancy::MessageQueue::Message& msg)
{
    const uint64_t len = msg.size();

    std::unique_lock<std::mutex> lock(m_mutex);

    // The limit check has to happen under the same lock as the insertion.
    // Otherwise two producers can both pass the check against the old byte
    // count and then both enqueue, exceeding m_byteLimit.
    if(m_bytesInQueue + len > m_byteLimit)
        return false;

    m_queue.push(msg);
    m_bytesInQueue += len;
    m_msgsInQueue++;

    m_cond.notify_all();

    return true;
}
31
32
// Remove and return the oldest message in the queue.
//
// Blocks on m_cond while the queue is empty. Returns an empty optional
// if the shutdown flag is set on entry or observed after a wakeup.
boost::optional<MessageQueue::Message> MessageQueue::pop()
{
    std::unique_lock<std::mutex> guard(m_mutex);

    // Sleep until there is either something to hand out or we are asked
    // to stop. The predicate is evaluated with the mutex held.
    m_cond.wait(guard, [this]{
        return m_shuttingDown || !m_queue.empty();
    });

    // Shutdown takes precedence over any messages still queued.
    if(m_shuttingDown)
        return {};

    // Copy the head element out before removing it from the queue.
    Message next = m_queue.front();
    m_queue.pop();

    // Keep the size/count bookkeeping in sync with the queue contents.
    const uint64_t bytes = next.size();
    m_bytesInQueue -= bytes;
    m_msgsInQueue--;

    return next;
}
55
56
void
MessageQueue::shutdown
()
57
{
58
m_shuttingDown
=
true
;
59
m_cond
.notify_all();
60
}
61
62
}
rosbag_fancy::MessageQueue::shutdown
void shutdown()
Definition:
message_queue.cpp:56
rosbag_fancy::MessageQueue::push
bool push(const Message &msg)
Definition:
message_queue.cpp:14
rosbag_fancy::MessageQueue::pop
boost::optional< Message > pop()
Definition:
message_queue.cpp:32
rosbag_fancy
Definition:
bag_reader.cpp:240
message_queue.h
rosbag_fancy::MessageQueue::MessageQueue
MessageQueue(uint64_t byteLimit)
Definition:
message_queue.cpp:9
rosbag_fancy::MessageQueue::Message
Definition:
message_queue.h:26
rosbag_fancy::MessageQueue::m_byteLimit
uint64_t m_byteLimit
Definition:
message_queue.h:56
rosbag_fancy::MessageQueue::m_mutex
std::mutex m_mutex
Definition:
message_queue.h:51
rosbag_fancy::MessageQueue::m_bytesInQueue
std::atomic< std::uint64_t > m_bytesInQueue
Definition:
message_queue.h:54
rosbag_fancy::MessageQueue::m_shuttingDown
bool m_shuttingDown
Definition:
message_queue.h:58
rosbag_fancy::MessageQueue::m_cond
std::condition_variable m_cond
Definition:
message_queue.h:52
rosbag_fancy::MessageQueue::Message::size
uint64_t size() const
Definition:
message_queue.h:31
rosbag_fancy::MessageQueue::m_msgsInQueue
std::atomic< std::uint64_t > m_msgsInQueue
Definition:
message_queue.h:55
rosbag_fancy::MessageQueue::m_queue
std::queue< Message > m_queue
Definition:
message_queue.h:50
rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59