Go to the documentation of this file.
17 #include <boost/bind.hpp>
18 #include <boost/bind/placeholders.hpp>
25 #include <topic_tools/shape_shifter.h>
40 std::string inTopic =
"input";
41 std::string outTopic =
"output";
48 throw std::runtime_error(
"First CLI argument of throttle node has to be 'messages'.");
51 throw std::runtime_error(
"Not enough arguments.\nUsage: throttle messages IN_TOPIC RATE [OUT_TOPIC].");
55 outTopic = (this->
getMyArgv().size() >= 4 ? this->
getMyArgv()[3] : (inTopic +
"_throttle"));
60 catch (
const std::invalid_argument& e)
62 CRAS_WARN(
"Could not parse the given throttling rate: %s", e.what());
67 const auto inQueueSize =
params->getParam(
"in_queue_size", 10_sz,
"messages");
68 const auto outQueueSize =
params->getParam(
"out_queue_size", inQueueSize,
"messages");
69 const auto rate =
params->getParam(
"frequency", defaultRate,
"Hz");
70 const auto lazy =
params->getParam(
"lazy",
false);
71 const auto tcpNoDelay =
params->getParam(
"tcp_no_delay",
false);
73 const auto method =
params->getParam(
"method",
"TOKEN_BUCKET");
74 if (method ==
"THROTTLE")
76 this->
limiter = std::make_unique<cras::ThrottleLimiter>(rate);
80 if (method !=
"TOKEN_BUCKET")
81 CRAS_WARN(
"Unknown rate-limitation method %s. Using TOKEN_BUCKET instead.", method.c_str());
82 const auto bucketCapacity =
params->getParam(
"bucket_capacity", 2_sz,
"tokens");
83 const auto initialTokens =
params->getParam(
"initial_tokens", 1_sz,
"tokens");
84 this->
limiter = std::make_unique<cras::TokenBucketLimiter>(rate, bucketCapacity, initialTokens);
89 this->
pubSub = std::make_unique<cras::GenericLazyPubSub>(
94 this->
pubSub->setLazy(
false);
101 CRAS_INFO(
"Created%s throttle from %s to %s at rate %s Hz.",
102 (lazy ?
" lazy" :
""), nh.resolveName(inTopic).c_str(), nh.resolveName(outTopic).c_str(),
Nodelet for throttling messages on a topic.
::std::unique_ptr<::cras::GenericLazyPubSub > pubSub
The lazy pair of subscriber and publisher.
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
TransportHints transport_hints
ros::NodeHandle & getMTPrivateNodeHandle() const
virtual void onReset(const ::ros::MessageEvent< const ::topic_tools::ShapeShifter > &)
Called when the rate limiter should be reset. The incoming message can be of any type and should not ...
const V_string & getMyArgv() const
const boost::shared_ptr< ConstMessage > & getConstMessage() const
void publish(const boost::shared_ptr< M > &message) const
TransportHints & tcpNoDelay(bool nodelay=true)
Lazy subscriber that subscribes only when a paired publisher has subscribers. Version for unknown mes...
::ros::Subscriber resetSub
Subscriber to the reset topic.
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
::std::mutex limiterMutex
Mutex for working with the limiter.
This is a simple implementation of a throttle nodelet. It also allows using the more precise token bu...
void initByFullCallbackType(const std::string &_topic, uint32_t _queue_size, const boost::function< void(P)> &_callback, const boost::function< boost::shared_ptr< typename ParameterAdapter< P >::Message >(void)> &factory_fn=DefaultMessageCreator< typename ParameterAdapter< P >::Message >())
void processMessage(const ::ros::MessageEvent< const ::topic_tools::ShapeShifter > &event, ::ros::Publisher &pub)
Publish the incoming message if the rate limiter allows.
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
void reset() override
Reset the rate limiter, e.g. after a time jump.
auto bind_front(F &&f, Args &&... args)
::std::unique_ptr<::cras::RateLimiter > limiter
The rate limiter used for limiting output rate.
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
double parseDouble(const ::std::string &string)
ros::NodeHandle & getMTNodeHandle() const
inline ::std::string to_string(char *value)
cras_topic_tools
Author(s): Martin Pecka
autogenerated on Wed Jan 8 2025 03:50:28