15 #include <boost/bind.hpp> 16 #include <boost/bind/placeholders.hpp> 25 #include <topic_tools/shape_shifter.h> 41 std::string inTopic =
"input";
42 std::string outTopic =
"output";
49 throw std::runtime_error(
"Not enough arguments.\nUsage: repeat IN_TOPIC RATE [OUT_TOPIC].");
53 outTopic = (this->
getMyArgv().size() >= 3 ? this->
getMyArgv()[2] : (inTopic +
"_repeat"));
58 catch (
const std::invalid_argument& e)
60 CRAS_WARN(
"Could not parse the given repeat rate: %s", e.what());
65 const auto inQueueSize = params->getParam(
"in_queue_size", 10_sz,
"messages");
66 const auto outQueueSize = params->getParam(
"out_queue_size", inQueueSize,
"messages");
67 const auto lazy = params->getParam(
"lazy",
false);
68 const auto tcpNoDelay = params->getParam(
"tcp_no_delay",
false);
70 const auto rate = params->getParam(
"rate", defaultRate,
"Hz");
71 this->
rate = std::make_unique<ros::Rate>(
rate);
72 if (params->hasParam(
"max_age"))
74 if (params->hasParam(
"max_repeats"))
75 this->
maxRepeats = params->getParam(
"max_repeats", 10_sz);
77 this->
resetOnMsg = params->getParam(
"reset_on_msg",
true);
82 this->
pubSub = std::make_unique<cras::GenericLazyPubSub>(
89 this->
pubSub->setLazy(
false);
91 opts.transport_hints.tcpNoDelay(
true);
96 CRAS_INFO(
"Created%s repeater from %s to %s at rate %s Hz.",
97 (lazy ?
" lazy" :
""), nh.resolveName(inTopic).c_str(), nh.resolveName(outTopic).c_str(),
110 std::lock_guard<std::mutex> lock(this->
msgMutex);
122 const auto&
msg =
event.getConstMessage();
130 CRAS_ERROR(
"Running repeat with timestamp conditions on message type %s which does not have a header! " 138 cras::optional<ros::Time> stamp;
149 CRAS_INFO_THROTTLE(5.0,
"Received message too old (%.3g s > %.3g s) will be discarded.",
157 5.0,
"Received message is %.3g s older than current message, it will be discarded.",
166 std::lock_guard<std::mutex> lock(this->
msgMutex);
183 if (!this->
pub || this->
msg ==
nullptr)
186 std::lock_guard<std::mutex> lock(this->
msgMutex);
203 this->
pub.template publish(this->
msg);
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
::cras::optional<::std_msgs::Header > getHeader(const ::topic_tools::ShapeShifter &msg)
Get the header field of the given message, if it has any.
void setPeriod(const Duration &period, bool reset=true)
This is a simple implementation of a repeater nodelet.
inline ::std::string to_string(const ::XmlRpc::XmlRpcValue &value)
bool resetOnMsg
Whether to reset the publication timer when a new message arrives.
::topic_tools::ShapeShifter::ConstPtr msg
The last stored message (null if no message has been received yet).
virtual void maybePublish()
Publish the last stored message (if it is eligible).
void everyPeriod(const ::ros::TimerEvent &)
Timer callback. Publish the last stored message if it is eligible.
Nodelet for repeating messages coming at a slower rate (or even just a single message).
bool discardOlderMessages
Whether to discard an incoming message if its stamp is older than the previously accepted message...
TransportHints & tcpNoDelay(bool nodelay=true)
ros::NodeHandle & getMTPrivateNodeHandle() const
::cras::optional<::ros::Time > lastMsgStamp
Time stamp of the last stored message (only filled if inspectStamps() returns true.
::ros::Timer timer
The timer periodically republishing the last stored message.
virtual bool inspectStamps() const
Whether Header should be extracted from the message and its time stamp should undergo inspection...
Tools for more convenient working with ShapeShifter objects.
auto bind_front(F &&f, Args &&... args)
::cras::optional< size_t > maxRepeats
Maximum number of repetitions of a single message (only limited when set).
M_string & getConnectionHeader() const
Lazy subscriber that subscribes only when a paired publisher has subscribers. Version for unknown mes...
::cras::optional< bool > hasHeader
void reset()
Reset the repeater, e.g. after a time jump.
double parseDouble(const ::std::string &string)
TransportHints transport_hints
virtual void onReset(const ::ros::MessageEvent< const ::topic_tools::ShapeShifter > &)
Called when the repeater should be reset. The incoming message can be of any type and should not be e...
ros::NodeHandle & getMTNodeHandle() const
::std::mutex msgMutex
Mutex protecting msg, lastMsgStamp and numRepeats.
const V_string & getMyArgv() const
::std::unique_ptr<::ros::Rate > rate
The desired output rate.
size_t numRepeats
Number of times the last stored message has been repeated.
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
bool hasHeader(const ::topic_tools::ShapeShifter &msg)
Tell whether the given message has header field.
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
bool publishOnlyOnTimer
Whether to publish only on timer events, or also when an incoming message is received.
::std::unique_ptr<::cras::GenericLazyPubSub > pubSub
The lazy pair of subscriber and publisher.
void processMessage(const ::ros::MessageEvent< const ::topic_tools::ShapeShifter > &event, ::ros::Publisher &pub)
Record the incoming message if it passes validations, and publish it if publishOnlyOnTimer is false...
::ros::Subscriber resetSub
Subscriber to the reset topic.
::cras::optional<::ros::Duration > maxAge
Maximum age of a message to allow publishing it (the message has to have a Header).
#define CRAS_INFO_THROTTLE(period,...)
::ros::Publisher pub
The publisher of the repeated messages.
#define CRAS_WARN_THROTTLE(period,...)