repeat.cpp
Go to the documentation of this file.
1 // SPDX-License-Identifier: BSD-3-Clause
2 // SPDX-FileCopyrightText: Czech Technical University in Prague
3 
10 #include <memory>
11 #include <mutex>
12 #include <stdexcept>
13 #include <string>
14 
15 #include <boost/bind.hpp>
16 #include <boost/bind/placeholders.hpp>
17 
18 #include <nodelet/nodelet.h>
20 #include <ros/message_event.h>
21 #include <ros/publisher.h>
22 #include <ros/rate.h>
23 #include <ros/subscribe_options.h>
24 #include <ros/time.h>
25 #include <topic_tools/shape_shifter.h>
26 
30 
34 
35 namespace cras
36 {
37 
39 {
40  auto nh = this->getMTPrivateNodeHandle();
41  std::string inTopic = "input";
42  std::string outTopic = "output";
43  ros::Rate defaultRate(1.0);
44 
45  // Allow basic configuration via command line args.
46  if (!this->getMyArgv().empty())
47  {
48  if (this->getMyArgv().size() == 1)
49  throw std::runtime_error("Not enough arguments.\nUsage: repeat IN_TOPIC RATE [OUT_TOPIC].");
50 
51  nh = this->getMTNodeHandle();
52  inTopic = this->getMyArgv()[0];
53  outTopic = (this->getMyArgv().size() >= 3 ? this->getMyArgv()[2] : (inTopic + "_repeat"));
54  try
55  {
56  defaultRate = ros::Rate(cras::parseDouble(this->getMyArgv()[1]));
57  }
58  catch (const std::invalid_argument& e)
59  {
60  CRAS_WARN("Could not parse the given repeat rate: %s", e.what());
61  }
62  }
63 
64  auto params = this->privateParams();
65  const auto inQueueSize = params->getParam("in_queue_size", 10_sz, "messages");
66  const auto outQueueSize = params->getParam("out_queue_size", inQueueSize, "messages");
67  const auto lazy = params->getParam("lazy", false);
68  const auto tcpNoDelay = params->getParam("tcp_no_delay", false);
69 
70  const auto rate = params->getParam("rate", defaultRate, "Hz");
71  this->rate = std::make_unique<ros::Rate>(rate);
72  if (params->hasParam("max_age"))
73  this->maxAge = params->getParam("max_age", ros::Duration(1), "s");
74  if (params->hasParam("max_repeats"))
75  this->maxRepeats = params->getParam("max_repeats", 10_sz);
76  this->discardOlderMessages = params->getParam("discard_older_messages", false);
77  this->resetOnMsg = params->getParam("reset_on_msg", true);
78  this->publishOnlyOnTimer = params->getParam("publish_only_on_timer", false);
79 
81  opts.transport_hints.tcpNoDelay(tcpNoDelay);
82  this->pubSub = std::make_unique<cras::GenericLazyPubSub>(
83  nh, inTopic, outTopic, inQueueSize, outQueueSize, cras::bind_front(&RepeatMessagesNodelet::processMessage, this),
84  opts, this->log);
85 
86  this->timer = nh.createTimer(this->rate->expectedCycleTime(), &RepeatMessagesNodelet::everyPeriod, this);
87 
88  if (!lazy)
89  this->pubSub->setLazy(false);
90 
91  opts.transport_hints.tcpNoDelay(true);
92  auto cb = boost::bind(&RepeatMessagesNodelet::onReset, this, boost::placeholders::_1);
94  this->resetSub = this->getMTPrivateNodeHandle().subscribe(opts);
95 
96  CRAS_INFO("Created%s repeater from %s to %s at rate %s Hz.",
97  (lazy ? " lazy" : ""), nh.resolveName(inTopic).c_str(), nh.resolveName(outTopic).c_str(),
98  cras::to_string(rate).c_str());
99 }
100 
102 {
103  this->reset();
104 }
105 
107 {
109 
110  this->timer.setPeriod(this->rate->expectedCycleTime(), true);
111 
112  std::lock_guard<std::mutex> lock(this->msgMutex);
113  this->msg.reset();
114  this->numRepeats = 0;
115  this->lastMsgStamp.reset();
116 }
117 
120 {
121  if (!this->pub)
122  this->pub = pub;
123 
124  const auto& msg = event.getConstMessage();
125 
126  if (!this->hasHeader.has_value() && this->inspectStamps())
127  {
128  this->hasHeader = cras::hasHeader(*msg);
129 
130  if (!this->hasHeader.value())
131  {
132  CRAS_ERROR("Running repeat with timestamp conditions on message type %s which does not have a header! "
133  "Ignoring all messages.", event.getConnectionHeader()["type"].c_str());
134  }
135  }
136 
137  if (this->inspectStamps() && !this->hasHeader.value())
138  return;
139 
140  cras::optional<ros::Time> stamp;
141  // If inspecting time stamps is required, deserialize the Header and validate the time stamp.
142  if (this->inspectStamps())
143  {
144  // This is potentially unsafe if the subscribed message actually does not have a Header field at the beginning.
145  const auto header = cras::getHeader(*msg);
146  if (header.has_value())
147  {
148  stamp = header->stamp;
149  if (this->maxAge.has_value() && (stamp.value() + this->maxAge.value()) < ros::Time::now())
150  {
151  CRAS_INFO_THROTTLE(5.0, "Received message too old (%.3g s > %.3g s) will be discarded.",
152  (ros::Time::now() - stamp.value()).toSec(), this->maxAge.value().toSec());
153  return;
154  }
155  if (this->discardOlderMessages && this->lastMsgStamp.has_value()
156  && stamp.value() < this->lastMsgStamp.value())
157  {
159  5.0, "Received message is %.3g s older than current message, it will be discarded.",
160  (this->lastMsgStamp.value() - stamp.value()).toSec());
161  return;
162  }
163  }
164  }
165 
166  // Record the incoming message.
167  {
168  std::lock_guard<std::mutex> lock(this->msgMutex);
169  this->msg = msg;
170  this->numRepeats = 0;
171  this->lastMsgStamp = stamp;
172  }
173 
174  // Republish the message right away if the configuration says so.
175  if (!this->publishOnlyOnTimer)
176  this->maybePublish();
177 
178  // If resetOnMsg, we reset the publication timer to start counting from zero.
179  if (this->resetOnMsg)
180  this->timer.setPeriod(this->rate->expectedCycleTime(), true);
181 }
182 
184 {
185  if (!this->pub || this->msg == nullptr)
186  return;
187 
188  std::lock_guard<std::mutex> lock(this->msgMutex);
189 
190  if (this->maxRepeats.has_value() && this->numRepeats > this->maxRepeats.value())
191  {
192  CRAS_WARN_THROTTLE(5.0, "Message already republished %zu times.", this->numRepeats);
193  return;
194  }
195 
196  if (this->inspectStamps() && this->maxAge.has_value() && this->lastMsgStamp.has_value() &&
197  (this->lastMsgStamp.value() + this->maxAge.value()) < ros::Time::now())
198  {
199  CRAS_WARN_THROTTLE(5.0, "Message too old (%.3g s > %.3g s) will not be republished.",
200  (ros::Time::now() - this->lastMsgStamp.value()).toSec(), this->maxAge.value().toSec());
201  return;
202  }
203 
204  this->numRepeats += 1;
205  this->pub.template publish(this->msg);
206 }
207 
209 {
210  return this->maxAge.has_value() || this->discardOlderMessages;
211 }
212 
214 {
215  this->maybePublish();
216 }
217 
218 }
219 
optional.hpp
cras::RepeatMessagesNodelet::resetSub
::ros::Subscriber resetSub
Subscriber to the reset topic.
Definition: repeat.h:101
ros::Publisher
cras::RepeatMessagesNodelet::onInit
void onInit() override
Definition: repeat.cpp:38
cras
cras::hasHeader
bool hasHeader(const ::topic_tools::ShapeShifter &msg)
Tell whether the given message has header field.
cras::RepeatMessagesNodelet::everyPeriod
void everyPeriod(const ::ros::TimerEvent &)
Timer callback. Publish the last stored message if it is eligible.
Definition: repeat.cpp:213
shape_shifter.h
Tools for more convenient working with ShapeShifter objects.
cras::RepeatMessagesNodelet::maxAge
::cras::optional<::ros::Duration > maxAge
Maximum age of a message to allow publishing it (the message has to have a Header).
Definition: repeat.h:147
NodeletParamHelper< ::nodelet::Nodelet >::privateParams
::cras::BoundParamHelperPtr privateParams(const ::std::string &ns="") const
cras::RepeatMessagesNodelet::timer
::ros::Timer timer
The timer periodically republishing the last stored message.
Definition: repeat.h:173
time.h
cras::RepeatMessagesNodelet::rate
::std::unique_ptr<::ros::Rate > rate
The desired output rate.
Definition: repeat.h:141
ros::SubscribeOptions::transport_hints
TransportHints transport_hints
cras::NodeletWithSharedTfBuffer::reset
void reset() override
nodelet::Nodelet::getMTPrivateNodeHandle
ros::NodeHandle & getMTPrivateNodeHandle() const
cras::RepeatMessagesNodelet::inspectStamps
virtual bool inspectStamps() const
Whether Header should be extracted from the message and its time stamp should undergo inspection.
Definition: repeat.cpp:208
cras::RepeatMessagesNodelet::discardOlderMessages
bool discardOlderMessages
Whether to discard an incoming message if its stamp is older than the previously accepted message.
Definition: repeat.h:150
CRAS_WARN
#define CRAS_WARN(...)
nodelet::Nodelet::getMyArgv
const V_string & getMyArgv() const
CRAS_ERROR
#define CRAS_ERROR(...)
publisher.h
cras::RepeatMessagesNodelet::maybePublish
virtual void maybePublish()
Publish the last stored message (if it is eligible).
Definition: repeat.cpp:183
ros::TransportHints::tcpNoDelay
TransportHints & tcpNoDelay(bool nodelay=true)
functional.hpp
CRAS_INFO
#define CRAS_INFO(...)
class_list_macros.h
string_utils.hpp
generic_lazy_pubsub.hpp
Lazy subscriber that subscribes only when a paired publisher has subscribers. Version for unknown mes...
cras::RepeatMessagesNodelet::publishOnlyOnTimer
bool publishOnlyOnTimer
Whether to publish only on timer events, or also when an incoming message is received.
Definition: repeat.h:156
PLUGINLIB_EXPORT_CLASS
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
cras::RepeatMessagesNodelet::pub
::ros::Publisher pub
The publisher of the repeated messages.
Definition: repeat.h:176
ros::Timer::setPeriod
void setPeriod(const Duration &period, bool reset=true)
cras::RepeatMessagesNodelet::reset
void reset() override
Reset the repeater, e.g. after a time jump.
Definition: repeat.cpp:106
ros::SubscribeOptions::initByFullCallbackType
void initByFullCallbackType(const std::string &_topic, uint32_t _queue_size, const boost::function< void(P)> &_callback, const boost::function< boost::shared_ptr< typename ParameterAdapter< P >::Message >(void)> &factory_fn=DefaultMessageCreator< typename ParameterAdapter< P >::Message >())
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
rate.h
ros::TimerEvent
cras::RepeatMessagesNodelet::onReset
virtual void onReset(const ::ros::MessageEvent< const ::topic_tools::ShapeShifter > &)
Called when the repeater should be reset. The incoming message can be of any type and should not be e...
Definition: repeat.cpp:101
cras::RepeatMessagesNodelet::pubSub
::std::unique_ptr<::cras::GenericLazyPubSub > pubSub
The lazy pair of subscriber and publisher.
Definition: repeat.h:98
cras::RepeatMessagesNodelet::numRepeats
size_t numRepeats
Number of times the last stored message has been repeated.
Definition: repeat.h:168
cras::bind_front
auto bind_front(F &&f, Args &&... args)
CRAS_INFO_THROTTLE
#define CRAS_INFO_THROTTLE(period,...)
nodelet::Nodelet
ros::SubscribeOptions
CRAS_WARN_THROTTLE
#define CRAS_WARN_THROTTLE(period,...)
cras::RepeatMessagesNodelet::processMessage
void processMessage(const ::ros::MessageEvent< const ::topic_tools::ShapeShifter > &event, ::ros::Publisher &pub)
Record the incoming message if it passes validations, and publish it if publishOnlyOnTimer is false.
Definition: repeat.cpp:118
cras::RepeatMessagesNodelet
Nodelet for repeating messages coming at a slower rate (or even just a single message).
Definition: repeat.h:94
NodeletParamHelper< ::nodelet::Nodelet >::params
::cras::BoundParamHelperPtr params(const ::ros::NodeHandle &node, const ::std::string &ns="") const
nodelet.h
cras::HasLogger::log
::cras::LogHelperPtr log
ros::Rate
repeat.h
This is a simple implementation of a repeater nodelet.
cras::RepeatMessagesNodelet::resetOnMsg
bool resetOnMsg
Whether to reset the publication timer when a new message arrives.
Definition: repeat.h:153
cras::RepeatMessagesNodelet::hasHeader
::cras::optional< bool > hasHeader
Definition: repeat.h:170
ros::MessageEvent::getConnectionHeader
M_string & getConnectionHeader() const
header
const std::string header
cras::RepeatMessagesNodelet::lastMsgStamp
::cras::optional<::ros::Time > lastMsgStamp
Time stamp of the last stored message (only filled if inspectStamps() returns true.
Definition: repeat.h:165
ros::Duration
message_event.h
subscribe_options.h
cras::getHeader
::cras::optional<::std_msgs::Header > getHeader(const ::topic_tools::ShapeShifter &msg)
Get the header field of the given message, if it has any.
cras::RepeatMessagesNodelet::maxRepeats
::cras::optional< size_t > maxRepeats
Maximum number of repetitions of a single message (only limited when set).
Definition: repeat.h:144
cras::RepeatMessagesNodelet::msgMutex
::std::mutex msgMutex
Mutex protecting msg, lastMsgStamp and numRepeats.
Definition: repeat.h:159
ros::MessageEvent
cras::parseDouble
double parseDouble(const ::std::string &string)
nodelet::Nodelet::getMTNodeHandle
ros::NodeHandle & getMTNodeHandle() const
cras::to_string
inline ::std::string to_string(char *value)
cras::RepeatMessagesNodelet::msg
::topic_tools::ShapeShifter::ConstPtr msg
The last stored message (null if no message has been received yet).
Definition: repeat.h:162
ros::Time::now
static Time now()


cras_topic_tools
Author(s): Martin Pecka
autogenerated on Wed Jan 8 2025 03:50:28