topic_subscriber.cpp
Go to the documentation of this file.
1 // Subcribes to input topics
2 // Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3 
4 #include "topic_subscriber.h"
5 #include "message_queue.h"
6 #include "topic_manager.h"
7 
8 namespace rosbag_fancy
9 {
10 
12  : m_topicManager{topicManager}
13  , m_queue{queue}
14 {
15  ros::NodeHandle nh;
16 
17  for(auto& topic : topicManager.topics())
18  {
19  ros::TransportHints transportHints;
20 
21  if(topic.flags & static_cast<int>(Topic::Flag::UDP))
22  transportHints = transportHints.udp();
23 
24  boost::function<void(const ros::MessageEvent<topic_tools::ShapeShifter const>&)> cb{
25  boost::bind(&TopicSubscriber::handle, this, boost::ref(topic), _1)
26  };
27  m_subscribers.push_back(nh.subscribe<topic_tools::ShapeShifter>(
28  topic.name, 10,
29  cb, {}, transportHints
30  ));
31  }
32 
33  m_timer = nh.createSteadyTimer(ros::WallDuration(1.0),
34  boost::bind(&TopicSubscriber::updateStats, this)
35  );
36 }
37 
39 {
40  std::uint64_t bytes = msg.getConstMessage()->size();
41 
42  if(topic.rateLimit != 0.0f)
43  {
44  ros::Time now = ros::Time::now();
46  topic.lastMessageReceivedROSTime = now;
47  else
48  {
49  // Basic token bucket algorithm for rate limiting
50  ros::Duration elapsed = now - topic.lastMessageReceivedROSTime;
51  topic.lastMessageReceivedROSTime = now;
52 
53  topic.throttleAllowance = std::min(2.0f,
54  topic.throttleAllowance + static_cast<float>(elapsed.toSec()) * topic.rateLimit
55  );
56 
57  if(topic.throttleAllowance < 1.0f)
58  return;
59 
60  topic.throttleAllowance -= 1.0f;
61  }
62  }
63 
64  topic.notifyMessage(bytes);
65 
66  if(!m_queue.push({topic.name, msg, &topic}))
67  topic.dropCounter++;
68 }
69 
71 {
72  for(std::size_t i = 0; i < m_topicManager.topics().size(); ++i)
73  {
74  auto& topic = m_topicManager.topics()[i];
75  auto& sub = m_subscribers[i];
76 
77  topic.numPublishers = sub.getNumPublishers();
78  }
79 }
80 
81 
82 }
rosbag_fancy::MessageQueue
Definition: message_queue.h:23
topic_subscriber.h
rosbag_fancy::MessageQueue::push
bool push(const Message &msg)
Definition: message_queue.cpp:14
rosbag_fancy
Definition: bag_reader.cpp:240
message_queue.h
rosbag_fancy::TopicManager::topics
std::vector< Topic > & topics()
Definition: topic_manager.h:106
ros::TransportHints
rosbag_fancy::Topic::notifyMessage
void notifyMessage(std::uint64_t bytes)
Definition: topic_manager.h:70
rosbag_fancy::TopicSubscriber::updateStats
void updateStats()
Definition: topic_subscriber.cpp:70
rosbag_fancy::TopicSubscriber::TopicSubscriber
TopicSubscriber(TopicManager &topicManager, MessageQueue &queue)
Definition: topic_subscriber.cpp:11
rosbag_fancy::TopicSubscriber::m_subscribers
std::vector< ros::Subscriber > m_subscribers
Definition: topic_subscriber.h:31
ros::MessageEvent::getConstMessage
const boost::shared_ptr< ConstMessage > & getConstMessage() const
std::size_t
decltype(sizeof(void *)) typede size_t)
Definition: doctest.h:501
f
f
topic_manager.h
rosbag_fancy::TopicSubscriber::m_topicManager
TopicManager & m_topicManager
Definition: topic_subscriber.h:28
rosbag_fancy::Topic::Flag::UDP
@ UDP
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
rosbag_fancy::Topic::throttleAllowance
float throttleAllowance
Definition: topic_manager.h:47
rosbag_fancy::Topic::dropCounter
std::uint64_t dropCounter
Definition: topic_manager.h:52
ros::Time
rosbag_fancy::TopicSubscriber::m_queue
MessageQueue & m_queue
Definition: topic_subscriber.h:29
ros::NodeHandle::createSteadyTimer
SteadyTimer createSteadyTimer(SteadyTimerOptions &ops) const
rosbag_fancy::TopicSubscriber::handle
void handle(Topic &topic, const ros::MessageEvent< topic_tools::ShapeShifter const > &msg)
Definition: topic_subscriber.cpp:38
rosbag_fancy::Topic::rateLimit
float rateLimit
Definition: topic_manager.h:35
rosbag_fancy::Topic::lastMessageReceivedROSTime
ros::Time lastMessageReceivedROSTime
Definition: topic_manager.h:43
rosbag_fancy::Topic
Definition: topic_manager.h:15
DurationBase< Duration >::toSec
double toSec() const
ros::WallDuration
topic_tools::ShapeShifter
ros::Duration
ros::TransportHints::udp
TransportHints & udp()
rosbag_fancy::TopicManager
Definition: topic_manager.h:101
ros::NodeHandle
ros::MessageEvent
ros::Time::now
static Time now()


rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59