message_queue.cpp
Go to the documentation of this file.
1 // In-Memory message buffer
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
#include "message_queue.h"

#include <utility>
5 
6 namespace rosbag_fancy
7 {
8 
9 MessageQueue::MessageQueue(uint64_t byteLimit)
10  : m_byteLimit{byteLimit}
11 {
12 }
13 
15 {
16  uint64_t len = msg.size();
17  if(m_bytesInQueue + len > m_byteLimit)
18  return false;
19 
20  {
21  std::unique_lock<std::mutex> lock(m_mutex);
22  m_queue.push(msg);
23  m_bytesInQueue += len;
24  m_msgsInQueue++;
25 
26  m_cond.notify_all();
27  }
28 
29  return true;
30 }
31 
32 boost::optional<MessageQueue::Message> MessageQueue::pop()
33 {
34  std::unique_lock<std::mutex> lock(m_mutex);
35 
36  if(m_shuttingDown)
37  return {};
38 
39  while(m_queue.empty())
40  {
41  m_cond.wait(lock);
42  if(m_shuttingDown)
43  return {};
44  }
45 
46  auto msg = m_queue.front();
47  uint64_t len = msg.size();
48 
49  m_bytesInQueue -= len;
50  m_msgsInQueue--;
51  m_queue.pop();
52 
53  return msg;
54 }
55 
57 {
58  m_shuttingDown = true;
59  m_cond.notify_all();
60 }
61 
62 }
boost::optional< Message > pop()
std::queue< Message > m_queue
Definition: message_queue.h:50
bool push(const Message &msg)
MessageQueue(uint64_t byteLimit)
std::atomic< std::uint64_t > m_bytesInQueue
Definition: message_queue.h:54
std::condition_variable m_cond
Definition: message_queue.h:52
std::atomic< std::uint64_t > m_msgsInQueue
Definition: message_queue.h:55


rosbag_fancy
Author(s):
autogenerated on Fri Dec 9 2022 04:00:09