topic_subscriber.cpp
Go to the documentation of this file.
1 // Subcribes to input topics
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include "topic_subscriber.h"
5 #include "message_queue.h"
6 #include "topic_manager.h"
7 
8 namespace rosbag_fancy
9 {
10 
12  : m_topicManager{topicManager}
13  , m_queue{queue}
14 {
15  ros::NodeHandle nh;
16 
17  for(auto& topic : topicManager.topics())
18  {
19  ros::TransportHints transportHints;
20 
21  if(topic.flags & static_cast<int>(Topic::Flag::UDP))
22  transportHints = transportHints.udp();
23 
24  boost::function<void(const ros::MessageEvent<topic_tools::ShapeShifter const>&)> cb{
25  boost::bind(&TopicSubscriber::handle, this, boost::ref(topic), _1)
26  };
28  topic.name, 10,
29  cb, {}, transportHints
30  ));
31  }
32 
34  boost::bind(&TopicSubscriber::updateStats, this)
35  );
36 }
37 
39 {
40  std::uint64_t bytes = msg.getConstMessage()->size();
41 
42  if(topic.rateLimit != 0.0f)
43  {
44  ros::Time now = ros::Time::now();
46  topic.lastMessageReceivedROSTime = now;
47  else
48  {
49  // Basic token bucket algorithm for rate limiting
50  ros::Duration elapsed = now - topic.lastMessageReceivedROSTime;
51  topic.lastMessageReceivedROSTime = now;
52 
53  topic.throttleAllowance = std::min(2.0f,
54  topic.throttleAllowance + static_cast<float>(elapsed.toSec()) * topic.rateLimit
55  );
56 
57  if(topic.throttleAllowance < 1.0f)
58  return;
59 
60  topic.throttleAllowance -= 1.0f;
61  }
62  }
63 
64  topic.notifyMessage(bytes);
65 
66  if(!m_queue.push({topic.name, msg, &topic}))
67  topic.dropCounter++;
68 }
69 
71 {
72  for(std::size_t i = 0; i < m_topicManager.topics().size(); ++i)
73  {
74  auto& topic = m_topicManager.topics()[i];
75  auto& sub = m_subscribers[i];
76 
77  topic.numPublishers = sub.getNumPublishers();
78  }
79 }
80 
81 
82 }
const boost::shared_ptr< ConstMessage > & getConstMessage() const
decltype(sizeof(void *)) typede size_t)
Definition: doctest.h:501
bool push(const Message &msg)
std::vector< Topic > & topics()
f
TransportHints & udp()
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
SteadyTimer createSteadyTimer(WallDuration period, void(T::*callback)(const SteadyTimerEvent &), T *obj, bool oneshot=false, bool autostart=true) const
void notifyMessage(std::uint64_t bytes)
Definition: topic_manager.h:70
std::vector< ros::Subscriber > m_subscribers
TopicSubscriber(TopicManager &topicManager, MessageQueue &queue)
std::uint64_t dropCounter
Definition: topic_manager.h:52
ros::Time lastMessageReceivedROSTime
Definition: topic_manager.h:43
static Time now()
void handle(Topic &topic, const ros::MessageEvent< topic_tools::ShapeShifter const > &msg)


rosbag_fancy
Author(s):
autogenerated on Fri Dec 9 2022 04:00:10