message_queue.cpp
Go to the documentation of this file.
1 // In-Memory message buffer
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include "message_queue.h"
5 
6 namespace rosbag_fancy
7 {
8 
9 MessageQueue::MessageQueue(uint64_t byteLimit)
10  : m_byteLimit{byteLimit}
11 {
12 }
13 
15 {
16  uint64_t len = msg.size();
17  if(m_bytesInQueue + len > m_byteLimit)
18  return false;
19 
20  {
21  std::unique_lock<std::mutex> lock(m_mutex);
22  m_queue.push(msg);
23  m_bytesInQueue += len;
24  m_msgsInQueue++;
25 
26  m_cond.notify_all();
27  }
28 
29  return true;
30 }
31 
32 boost::optional<MessageQueue::Message> MessageQueue::pop()
33 {
34  std::unique_lock<std::mutex> lock(m_mutex);
35 
36  if(m_shuttingDown)
37  return {};
38 
39  while(m_queue.empty())
40  {
41  m_cond.wait(lock);
42  if(m_shuttingDown)
43  return {};
44  }
45 
46  auto msg = m_queue.front();
47  uint64_t len = msg.size();
48 
49  m_bytesInQueue -= len;
50  m_msgsInQueue--;
51  m_queue.pop();
52 
53  return msg;
54 }
55 
57 {
58  m_shuttingDown = true;
59  m_cond.notify_all();
60 }
61 
62 }
rosbag_fancy::MessageQueue::shutdown
void shutdown()
Definition: message_queue.cpp:56
rosbag_fancy::MessageQueue::push
bool push(const Message &msg)
Definition: message_queue.cpp:14
rosbag_fancy::MessageQueue::pop
boost::optional< Message > pop()
Definition: message_queue.cpp:32
rosbag_fancy
Definition: bag_reader.cpp:240
message_queue.h
rosbag_fancy::MessageQueue::MessageQueue
MessageQueue(uint64_t byteLimit)
Definition: message_queue.cpp:9
rosbag_fancy::MessageQueue::Message
Definition: message_queue.h:26
rosbag_fancy::MessageQueue::m_byteLimit
uint64_t m_byteLimit
Definition: message_queue.h:56
rosbag_fancy::MessageQueue::m_mutex
std::mutex m_mutex
Definition: message_queue.h:51
rosbag_fancy::MessageQueue::m_bytesInQueue
std::atomic< std::uint64_t > m_bytesInQueue
Definition: message_queue.h:54
rosbag_fancy::MessageQueue::m_shuttingDown
bool m_shuttingDown
Definition: message_queue.h:58
rosbag_fancy::MessageQueue::m_cond
std::condition_variable m_cond
Definition: message_queue.h:52
rosbag_fancy::MessageQueue::Message::size
uint64_t size() const
Definition: message_queue.h:31
rosbag_fancy::MessageQueue::m_msgsInQueue
std::atomic< std::uint64_t > m_msgsInQueue
Definition: message_queue.h:55
rosbag_fancy::MessageQueue::m_queue
std::queue< Message > m_queue
Definition: message_queue.h:50


rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59