src
topic_subscriber.cpp
Go to the documentation of this file.
1
// Subcribes to input topics
2
// Author: Max Schwarz <max.schwarz@ais.uni-bonn.de>
3
4
#include "
topic_subscriber.h
"
5
#include "
message_queue.h
"
6
#include "
topic_manager.h
"
7
8
namespace
rosbag_fancy
9
{
10
11
TopicSubscriber::TopicSubscriber
(
rosbag_fancy::TopicManager
& topicManager,
rosbag_fancy::MessageQueue
& queue)
12
: m_topicManager{topicManager}
13
, m_queue{queue}
14
{
15
ros::NodeHandle
nh;
16
17
for
(
auto
& topic : topicManager.topics())
18
{
19
ros::TransportHints
transportHints;
20
21
if
(topic.flags &
static_cast<
int
>
(
Topic::Flag::UDP
))
22
transportHints = transportHints.
udp
();
23
24
boost::function<void(
const
ros::MessageEvent<topic_tools::ShapeShifter const>
&)> cb{
25
boost::bind(&
TopicSubscriber::handle
,
this
, boost::ref(topic), _1)
26
};
27
m_subscribers.push_back(nh.
subscribe
<
topic_tools::ShapeShifter
>(
28
topic.name, 10,
29
cb, {}, transportHints
30
));
31
}
32
33
m_timer = nh.
createSteadyTimer
(
ros::WallDuration
(1.0),
34
boost::bind(&
TopicSubscriber::updateStats
,
this
)
35
);
36
}
37
38
void
TopicSubscriber::handle
(
Topic
& topic,
const
ros::MessageEvent<topic_tools::ShapeShifter const>
& msg)
39
{
40
std::uint64_t bytes = msg.
getConstMessage
()->size();
41
42
if
(topic.
rateLimit
!= 0.0f)
43
{
44
ros::Time
now =
ros::Time::now
();
45
if
(topic.
lastMessageReceivedROSTime
==
ros::Time
(0))
46
topic.
lastMessageReceivedROSTime
= now;
47
else
48
{
49
// Basic token bucket algorithm for rate limiting
50
ros::Duration
elapsed = now - topic.
lastMessageReceivedROSTime
;
51
topic.
lastMessageReceivedROSTime
= now;
52
53
topic.
throttleAllowance
= std::min(2.0
f
,
54
topic.
throttleAllowance
+
static_cast<
float
>
(elapsed.
toSec
()) * topic.
rateLimit
55
);
56
57
if
(topic.
throttleAllowance
< 1.0f)
58
return
;
59
60
topic.
throttleAllowance
-= 1.0f;
61
}
62
}
63
64
topic.
notifyMessage
(bytes);
65
66
if
(!
m_queue
.
push
({topic.name, msg, &topic}))
67
topic.
dropCounter
++;
68
}
69
70
void
TopicSubscriber::updateStats
()
71
{
72
for
(
std::size_t
i = 0; i <
m_topicManager
.
topics
().size(); ++i)
73
{
74
auto
& topic =
m_topicManager
.
topics
()[i];
75
auto
& sub =
m_subscribers
[i];
76
77
topic.numPublishers = sub.getNumPublishers();
78
}
79
}
80
81
82
}
rosbag_fancy::MessageQueue
Definition:
message_queue.h:23
topic_subscriber.h
rosbag_fancy::MessageQueue::push
bool push(const Message &msg)
Definition:
message_queue.cpp:14
rosbag_fancy
Definition:
bag_reader.cpp:240
message_queue.h
rosbag_fancy::TopicManager::topics
std::vector< Topic > & topics()
Definition:
topic_manager.h:106
ros::TransportHints
rosbag_fancy::Topic::notifyMessage
void notifyMessage(std::uint64_t bytes)
Definition:
topic_manager.h:70
rosbag_fancy::TopicSubscriber::updateStats
void updateStats()
Definition:
topic_subscriber.cpp:70
rosbag_fancy::TopicSubscriber::TopicSubscriber
TopicSubscriber(TopicManager &topicManager, MessageQueue &queue)
Definition:
topic_subscriber.cpp:11
rosbag_fancy::TopicSubscriber::m_subscribers
std::vector< ros::Subscriber > m_subscribers
Definition:
topic_subscriber.h:31
ros::MessageEvent::getConstMessage
const boost::shared_ptr< ConstMessage > & getConstMessage() const
std::size_t
decltype(sizeof(void *)) typede size_t)
Definition:
doctest.h:501
f
f
topic_manager.h
rosbag_fancy::TopicSubscriber::m_topicManager
TopicManager & m_topicManager
Definition:
topic_subscriber.h:28
rosbag_fancy::Topic::Flag::UDP
@ UDP
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
rosbag_fancy::Topic::throttleAllowance
float throttleAllowance
Definition:
topic_manager.h:47
rosbag_fancy::Topic::dropCounter
std::uint64_t dropCounter
Definition:
topic_manager.h:52
ros::Time
rosbag_fancy::TopicSubscriber::m_queue
MessageQueue & m_queue
Definition:
topic_subscriber.h:29
ros::NodeHandle::createSteadyTimer
SteadyTimer createSteadyTimer(SteadyTimerOptions &ops) const
rosbag_fancy::TopicSubscriber::handle
void handle(Topic &topic, const ros::MessageEvent< topic_tools::ShapeShifter const > &msg)
Definition:
topic_subscriber.cpp:38
rosbag_fancy::Topic::rateLimit
float rateLimit
Definition:
topic_manager.h:35
rosbag_fancy::Topic::lastMessageReceivedROSTime
ros::Time lastMessageReceivedROSTime
Definition:
topic_manager.h:43
rosbag_fancy::Topic
Definition:
topic_manager.h:15
DurationBase< Duration >::toSec
double toSec() const
ros::WallDuration
topic_tools::ShapeShifter
ros::Duration
ros::TransportHints::udp
TransportHints & udp()
rosbag_fancy::TopicManager
Definition:
topic_manager.h:101
ros::NodeHandle
ros::MessageEvent
ros::Time::now
static Time now()
rosbag_fancy
Author(s):
autogenerated on Tue Feb 20 2024 03:20:59