throttle_messages.cpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 // SPDX-FileCopyrightText: Czech Technical University in Prague
3 
12 #include <memory>
13 #include <mutex>
14 #include <stdexcept>
15 #include <string>
16 
17 #include <boost/bind.hpp>
18 #include <boost/bind/placeholders.hpp>
19 
20 #include <nodelet/nodelet.h>
22 #include <ros/message_event.h>
23 #include <ros/rate.h>
24 #include <ros/subscribe_options.h>
25 #include <topic_tools/shape_shifter.h>
26 
30 
33 
34 namespace cras
35 {
36 
38 {
39  auto nh = this->getMTPrivateNodeHandle();
40  std::string inTopic = "input";
41  std::string outTopic = "output";
42  ros::Rate defaultRate(1.0);
43 
44  // Mimic the behavior of topic_tools/throttle when called with CLI args
45  if (!this->getMyArgv().empty())
46  {
47  if (this->getMyArgv()[0] != "messages")
48  throw std::runtime_error("First CLI argument of throttle node has to be 'messages'.");
49 
50  if (this->getMyArgv().size() < 3)
51  throw std::runtime_error("Not enough arguments.\nUsage: throttle messages IN_TOPIC RATE [OUT_TOPIC].");
52 
53  nh = this->getMTNodeHandle();
54  inTopic = this->getMyArgv()[1];
55  outTopic = (this->getMyArgv().size() >= 4 ? this->getMyArgv()[3] : (inTopic + "_throttle"));
56  try
57  {
58  defaultRate = ros::Rate(cras::parseDouble(this->getMyArgv()[2]));
59  }
60  catch (const std::invalid_argument& e)
61  {
62  CRAS_WARN("Could not parse the given throttling rate: %s", e.what());
63  }
64  }
65 
66  auto params = this->privateParams();
67  const auto inQueueSize = params->getParam("in_queue_size", 10_sz, "messages");
68  const auto outQueueSize = params->getParam("out_queue_size", inQueueSize, "messages");
69  const auto rate = params->getParam("frequency", defaultRate, "Hz");
70  const auto lazy = params->getParam("lazy", false);
71  const auto tcpNoDelay = params->getParam("tcp_no_delay", false);
72 
73  const auto method = params->getParam("method", "TOKEN_BUCKET"); // TOKEN_BUCKET / THROTTLE
74  if (method == "THROTTLE")
75  {
76  this->limiter = std::make_unique<cras::ThrottleLimiter>(rate);
77  }
78  else // TOKEN_BUCKET and other
79  {
80  if (method != "TOKEN_BUCKET")
81  CRAS_WARN("Unknown rate-limitation method %s. Using TOKEN_BUCKET instead.", method.c_str());
82  const auto bucketCapacity = params->getParam("bucket_capacity", 2_sz, "tokens");
83  const auto initialTokens = params->getParam("initial_tokens", 1_sz, "tokens");
84  this->limiter = std::make_unique<cras::TokenBucketLimiter>(rate, bucketCapacity, initialTokens);
85  }
86 
88  opts.transport_hints.tcpNoDelay(tcpNoDelay);
89  this->pubSub = std::make_unique<cras::GenericLazyPubSub>(
90  nh, inTopic, outTopic, inQueueSize, outQueueSize, cras::bind_front(&ThrottleMessagesNodelet::processMessage, this),
91  opts, this->log);
92 
93  if (!lazy)
94  this->pubSub->setLazy(false);
95 
96  opts.transport_hints.tcpNoDelay(true);
97  auto cb = boost::bind(&ThrottleMessagesNodelet::onReset, this, boost::placeholders::_1);
99  this->resetSub = this->getMTPrivateNodeHandle().subscribe(opts);
100 
101  CRAS_INFO("Created%s throttle from %s to %s at rate %s Hz.",
102  (lazy ? " lazy" : ""), nh.resolveName(inTopic).c_str(), nh.resolveName(outTopic).c_str(),
103  cras::to_string(rate).c_str());
104 }
105 
107 {
108  this->reset();
109 }
110 
112 {
114  std::lock_guard<std::mutex> lock(this->limiterMutex);
115  this->limiter->reset();
116 }
117 
120 {
121  bool shouldPublish;
122  {
123  std::lock_guard<std::mutex> lock(this->limiterMutex);
124  shouldPublish = this->limiter->shouldPublish(ros::Time::now());
125  }
126 
127  if (shouldPublish)
128  pub.publish(event.getConstMessage());
129 }
130 
131 }
132 
cras::ThrottleMessagesNodelet
Nodelet for throttling messages on a topic.
Definition: throttle_messages.h:78
rate_limiter.h
ros::Publisher
cras
cras::ThrottleMessagesNodelet::pubSub
::std::unique_ptr<::cras::GenericLazyPubSub > pubSub
The lazy pair of subscriber and publisher.
Definition: throttle_messages.h:82
NodeletParamHelper< ::nodelet::Nodelet >::privateParams
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
cras::ThrottleMessagesNodelet::onInit
void onInit() override
Definition: throttle_messages.cpp:37
ros::SubscribeOptions::transport_hints
TransportHints transport_hints
cras::NodeletWithSharedTfBuffer::reset
void reset() override
nodelet::Nodelet::getMTPrivateNodeHandle
ros::NodeHandle & getMTPrivateNodeHandle() const
cras::ThrottleMessagesNodelet::onReset
virtual void onReset(const ::ros::MessageEvent< const ::topic_tools::ShapeShifter > &)
Called when the rate limiter should be reset. The incoming message can be of any type and should not ...
Definition: throttle_messages.cpp:106
CRAS_WARN
#define CRAS_WARN(...)
nodelet::Nodelet::getMyArgv
const V_string & getMyArgv() const
ros::MessageEvent::getConstMessage
const boost::shared_ptr< ConstMessage > & getConstMessage() const
ros::Publisher::publish
void publish(const boost::shared_ptr< M > &message) const
ros::TransportHints::tcpNoDelay
TransportHints & tcpNoDelay(bool nodelay=true)
functional.hpp
CRAS_INFO
#define CRAS_INFO(...)
class_list_macros.h
string_utils.hpp
generic_lazy_pubsub.hpp
Lazy subscriber that subscribes only when a paired publisher has subscribers. Version for unknown mes...
cras::ThrottleMessagesNodelet::resetSub
::ros::Subscriber resetSub
Subscriber to the reset topic.
Definition: throttle_messages.h:85
PLUGINLIB_EXPORT_CLASS
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
cras::ThrottleMessagesNodelet::limiterMutex
::std::mutex limiterMutex
Mutex for working with the limiter.
Definition: throttle_messages.h:112
throttle_messages.h
This is a simple implementation of a throttle nodelet. It also allows using the more precise token bu...
ros::SubscribeOptions::initByFullCallbackType
void initByFullCallbackType(const std::string &_topic, uint32_t _queue_size, const boost::function< void(P)> &_callback, const boost::function< boost::shared_ptr< typename ParameterAdapter< P >::Message >(void)> &factory_fn=DefaultMessageCreator< typename ParameterAdapter< P >::Message >())
cras::ThrottleMessagesNodelet::processMessage
void processMessage(const ::ros::MessageEvent< const ::topic_tools::ShapeShifter > &event, ::ros::Publisher &pub)
Publish the incoming message if the rate limiter allows.
Definition: throttle_messages.cpp:118
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
rate.h
cras::ThrottleMessagesNodelet::reset
void reset() override
Reset the rate limiter, e.g. after a time jump.
Definition: throttle_messages.cpp:111
cras::bind_front
auto bind_front(F &&f, Args &&... args)
nodelet::Nodelet
ros::SubscribeOptions
cras::ThrottleMessagesNodelet::limiter
::std::unique_ptr<::cras::RateLimiter > limiter
The rate limiter used for limiting output rate.
Definition: throttle_messages.h:109
NodeletParamHelper< ::nodelet::Nodelet >::params
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
nodelet.h
cras::HasLogger::log
::cras::LogHelperPtr log
ros::Rate
message_event.h
subscribe_options.h
ros::MessageEvent
cras::parseDouble
double parseDouble(const ::std::string &string)
nodelet::Nodelet::getMTNodeHandle
ros::NodeHandle & getMTNodeHandle() const
cras::to_string
inline ::std::string to_string(char *value)
ros::Time::now
static Time now()


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Wed Jan 8 2025 03:50:28