subscriber_impl.h
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // * Neither the name of Southwest Research Institute® (SwRI®) nor the
14 // names of its contributors may be used to endorse or promote products
15 // derived from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
21 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 //
28 // *****************************************************************************
29 #ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
30 #define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
31 
32 #include <std_msgs/Header.h>
33 #include <ros/subscriber.h>
34 
35 namespace swri
36 {
37 class Subscriber;
39 {
40  protected:
42  std::string unmapped_topic_;
43  std::string mapped_topic_;
44 
46 
49 
53 
57 
62 
63  void processHeader(const ros::Time &stamp)
64  {
65  ros::Time now = ros::Time::now();
66 
67  // Check for timeouts so that we can correctly increment the
68  // timeout count.
69  checkTimeout(now);
70 
71  // Do not use this message for statistics if it is arriving out of order or time
72  // synchronization makes it appear that the new timestamp is from the future
73  if ((stamp >= last_header_stamp_) && (now >= stamp)) {
75 
76  if (!stamp.isZero() && !now.isZero()) {
77  ros::Duration latency = now - stamp;
78  if (message_count_ == 1) {
79  min_latency_ = latency;
80  max_latency_ = latency;
81  total_latency_ = latency;
82  } else {
83  min_latency_ = std::min(min_latency_, latency);
84  max_latency_ = std::max(max_latency_, latency);
85  double latency_weight_ = 0.1;
86  total_latency_.fromSec(latency_weight_*latency.toSec() + (1-latency_weight_)*total_latency_.toSec());
87  }
88  }
89 
90  if (message_count_ > 1) {
91  ros::Duration period = now - last_receive_time_;
92  if (message_count_ == 2) {
93  min_period_ = period;
94  max_period_ = period;
95  total_periods_ = period;
96  } else if (message_count_ > 2) {
97  min_period_ = std::min(min_period_, period);
98  max_period_ = std::max(max_period_, period);
99  total_periods_ += period;
100  }
101  }
102  }
103  // Reset the timeout condition to false.
104  in_timeout_ = false;
105 
106  last_receive_time_ = now;
107  last_header_stamp_ = stamp;
108  }
109 
110  void checkTimeout(const ros::Time &now)
111  {
112  if (blocking_timeout_) {
113  return;
114  }
115 
116  if (in_timeout_ || timeout_ <= ros::Duration(0.0)) {
117  return;
118  }
119 
120  if (message_count_ == 0) {
121  return;
122  }
123 
124  if (age(now) > timeout_) {
125  in_timeout_ = true;
126  timeout_count_++;
127  }
128  }
129 
130 
131  public:
133  unmapped_topic_("N/A"),
134  mapped_topic_("N/A"),
135  message_count_(0),
136  timeout_(-1.0),
137  in_timeout_(false),
138  timeout_count_(0),
139  blocking_timeout_(false)
140  {
141  resetStatistics();
142  }
143 
144  virtual ~SubscriberImpl()
145  {
146  }
147 
148  const std::string& mappedTopic() const
149  {
150  return mapped_topic_;
151  }
152 
153  const std::string& unmappedTopic() const
154  {
155  return unmapped_topic_;
156  }
157 
158  int numPublishers() const
159  {
160  return sub_.getNumPublishers();
161  }
162 
164  {
165  message_count_ = 0;
166  in_timeout_ = false;
167  timeout_count_ = 0;
168  }
169 
170  int messageCount() const
171  {
172  return message_count_;
173  }
174 
175  ros::Duration age(const ros::Time &now) const
176  {
177  if (message_count_ < 1) {
178  return ros::DURATION_MAX;
179  } else if (last_header_stamp_.isValid()) {
180  return now - last_header_stamp_;
181  } else {
182  // If we've received messages but they don't have valid stamps, we can't
183  // actually determine the age, so just return an empty duration.
184  return ros::Duration(0.0);
185  }
186  }
187 
189  {
190  if (message_count_ < 1) {
191  return ros::DURATION_MAX;
192  } else {
194  }
195  }
196 
198  {
199  if (message_count_ < 1) {
200  return ros::DURATION_MAX;
201  } else {
202  return min_latency_;
203  }
204  }
205 
207  {
208  if (message_count_ < 1) {
209  return ros::DURATION_MAX;
210  } else {
211  return max_latency_;
212  }
213  }
214 
215  double meanFrequencyHz() const
216  {
217  if (message_count_ < 2) {
218  return 0.0;
219  } else {
220  return 1e9 / meanPeriod().toNSec();
221  }
222  }
223 
225  {
226  if (message_count_ < 2) {
227  return ros::DURATION_MAX;
228  } else {
230  }
231  }
232 
234  {
235  if (message_count_ < 2) {
236  return ros::DURATION_MAX;
237  } else {
238  return min_period_;
239  }
240  }
241 
243  {
244  if (message_count_ < 2) {
245  return ros::DURATION_MAX;
246  } else {
247  return max_period_;
248  }
249  }
250 
251  void setTimeout(const ros::Duration &time_out)
252  {
253  timeout_ = time_out;
254  in_timeout_ = false;
255  timeout_count_ = 0;
256  }
257 
258  bool blockTimeouts(bool block) {
259  if (block) {
260  in_timeout_ = false;
261  }
262 
263  bool old_block = blocking_timeout_;
264  blocking_timeout_ = block;
265  return old_block;
266  }
267 
268  bool timeoutsBlocked() const {
269  return blocking_timeout_;
270  }
271 
273  {
274  return timeout_;
275  }
276 
277  bool timeoutEnabled() const
278  {
279  return timeout_ > ros::Duration(0.0);
280  }
281 
282  bool inTimeout()
283  {
285  return in_timeout_;
286  }
287 
289  {
291  return timeout_count_;
292  }
293 }; // class SubscriberImpl
294 
295 struct TrueType
296 {
297  static const bool value = true;
298 };
299 
300 template<class M , class T>
302 {
303  T *obj_;
305 
306  public:
308  ros::NodeHandle &nh,
309  const std::string &topic,
310  uint32_t queue_size,
311  void(T::*fp)(const boost::shared_ptr< M const > &),
312  T *obj,
313  const ros::TransportHints &transport_hints)
314  {
315  unmapped_topic_ = topic;
316  mapped_topic_ = nh.resolveName(topic);
317 
319  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
320  } else {
321  ROS_INFO("Subscribing to '%s' at '%s'.",
322  unmapped_topic_.c_str(),
323  mapped_topic_.c_str());
324  }
325 
326  callback_ = fp;
327  obj_ = obj;
328 
329  sub_ = nh.subscribe(mapped_topic_, queue_size,
330  &TypedSubscriberImpl::handleMessage<M>,
331  this,
332  transport_hints);
333  }
334 
335  // Handler for messages with headers
336  template <class M2>
337  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
339  {
340  processHeader(msg->header.stamp);
341  (obj_->*callback_)(msg);
342  }
343 
344  // Handler for messages without headers
345  template <class M2>
346  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
348  {
350  (obj_->*callback_)(msg);
351  }
352 }; // class TypedSubscriberImpl
353 
354 template<class M>
356 {
357  boost::function<void(const boost::shared_ptr< M const> &)> callback_;
358 
359 
360  public:
362  ros::NodeHandle &nh,
363  const std::string &topic,
364  uint32_t queue_size,
365  const boost::function<void(const boost::shared_ptr< M const> &)> &callback,
366  const ros::TransportHints &transport_hints)
367  {
368  unmapped_topic_ = topic;
369  mapped_topic_ = nh.resolveName(topic);
370 
372  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
373  } else {
374  ROS_INFO("Subscribing to '%s' at '%s'.",
375  unmapped_topic_.c_str(),
376  mapped_topic_.c_str());
377  }
378 
379  callback_ = callback;
380 
381  sub_ = nh.subscribe(mapped_topic_, queue_size,
382  &BindSubscriberImpl::handleMessage<M>,
383  this,
384  transport_hints);
385  }
386 
387  // Handler for messages with headers
388  template <class M2>
389  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
391  {
392  processHeader(msg->header.stamp);
393  callback_(msg);
394  }
395 
396  // Handler for messages without headers
397  template <class M2>
398  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
400  {
402  callback_(msg);
403  }
404 }; // class BindSubscriberImpl
405 
406 template<class M>
408 {
410 
411  public:
413  ros::NodeHandle &nh,
414  const std::string &topic,
416  const ros::TransportHints &transport_hints)
417  {
418  unmapped_topic_ = topic;
419  mapped_topic_ = nh.resolveName(topic);
420 
422  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
423  } else {
424  ROS_INFO("Subscribing to '%s' at '%s'.",
425  unmapped_topic_.c_str(),
426  mapped_topic_.c_str());
427  }
428 
429  dest_ = dest;
430 
431  sub_ = nh.subscribe(mapped_topic_, 2,
432  &StorageSubscriberImpl::handleMessage<M>,
433  this,
434  transport_hints);
435  }
436 
437  // Handler for messages with headers
438  template <class M2>
439  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
441  {
442  processHeader(msg->header.stamp);
443  *dest_ = msg;
444  }
445 
446  // Handler for messages without headers
447  template <class M2>
448  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
450  {
452  *dest_ = msg;
453  }
454 }; // class StorageSubscriberImpl
455 } // namespace swri
456 #endif // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
swri::SubscriberImpl::message_count_
int message_count_
Definition: subscriber_impl.h:45
swri::SubscriberImpl::messageCount
int messageCount() const
Definition: subscriber_impl.h:170
msg
msg
DurationBase< Duration >::fromSec
Duration & fromSec(double t)
swri::BindSubscriberImpl
Definition: subscriber_impl.h:355
swri::SubscriberImpl::~SubscriberImpl
virtual ~SubscriberImpl()
Definition: subscriber_impl.h:144
swri::SubscriberImpl::last_header_stamp_
ros::Time last_header_stamp_
Definition: subscriber_impl.h:47
swri::SubscriberImpl::sub_
ros::Subscriber sub_
Definition: subscriber_impl.h:41
boost::shared_ptr< M const >
swri::SubscriberImpl::in_timeout_
bool in_timeout_
Definition: subscriber_impl.h:59
swri::SubscriberImpl::min_latency_
ros::Duration min_latency_
Definition: subscriber_impl.h:51
swri::SubscriberImpl::processHeader
void processHeader(const ros::Time &stamp)
Definition: subscriber_impl.h:63
swri::SubscriberImpl::max_period_
ros::Duration max_period_
Definition: subscriber_impl.h:56
swri::SubscriberImpl
Definition: subscriber_impl.h:38
swri::SubscriberImpl::mapped_topic_
std::string mapped_topic_
Definition: subscriber_impl.h:43
swri::TypedSubscriberImpl::obj_
T * obj_
Definition: subscriber_impl.h:303
swri::StorageSubscriberImpl::handleMessage
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
Definition: subscriber_impl.h:449
swri::SubscriberImpl::maxPeriod
ros::Duration maxPeriod() const
Definition: subscriber_impl.h:242
ros::TransportHints
swri::SubscriberImpl::blockTimeouts
bool blockTimeouts(bool block)
Definition: subscriber_impl.h:258
swri::SubscriberImpl::minLatency
ros::Duration minLatency() const
Definition: subscriber_impl.h:197
ros::DURATION_MAX
const ROSTIME_DECL Duration DURATION_MAX
swri::BindSubscriberImpl::BindSubscriberImpl
BindSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, const boost::function< void(const boost::shared_ptr< M const > &)> &callback, const ros::TransportHints &transport_hints)
Definition: subscriber_impl.h:361
swri::TypedSubscriberImpl::TypedSubscriberImpl
TypedSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, void(T::*fp)(const boost::shared_ptr< M const > &), T *obj, const ros::TransportHints &transport_hints)
Definition: subscriber_impl.h:307
swri::SubscriberImpl::inTimeout
bool inTimeout()
Definition: subscriber_impl.h:282
ros::Subscriber::getNumPublishers
uint32_t getNumPublishers() const
swri::SubscriberImpl::total_periods_
ros::Duration total_periods_
Definition: subscriber_impl.h:54
swri::SubscriberImpl::timeout
ros::Duration timeout() const
Definition: subscriber_impl.h:272
ros::NodeHandle::resolveName
std::string resolveName(const std::string &name, bool remap=true) const
ros::Time::isValid
static bool isValid()
swri::TypedSubscriberImpl::handleMessage
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
Definition: subscriber_impl.h:338
swri::SubscriberImpl::checkTimeout
void checkTimeout(const ros::Time &now)
Definition: subscriber_impl.h:110
swri::SubscriberImpl::age
ros::Duration age(const ros::Time &now) const
Definition: subscriber_impl.h:175
swri::SubscriberImpl::minPeriod
ros::Duration minPeriod() const
Definition: subscriber_impl.h:233
swri::SubscriberImpl::SubscriberImpl
SubscriberImpl()
Definition: subscriber_impl.h:132
swri::TrueType::value
static const bool value
Definition: subscriber_impl.h:297
swri::SubscriberImpl::timeoutCount
int timeoutCount()
Definition: subscriber_impl.h:288
swri::BindSubscriberImpl::callback_
boost::function< void(const boost::shared_ptr< M const > &)> callback_
Definition: subscriber_impl.h:357
TimeBase< Time, Duration >::isZero
bool isZero() const
subscriber.h
swri::SubscriberImpl::meanPeriod
ros::Duration meanPeriod() const
Definition: subscriber_impl.h:224
swri::SubscriberImpl::resetStatistics
void resetStatistics()
Definition: subscriber_impl.h:163
swri::SubscriberImpl::maxLatency
ros::Duration maxLatency() const
Definition: subscriber_impl.h:206
swri::SubscriberImpl::unmappedTopic
const std::string & unmappedTopic() const
Definition: subscriber_impl.h:153
swri::TrueType
Definition: subscriber_impl.h:295
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
swri::SubscriberImpl::total_latency_
ros::Duration total_latency_
Definition: subscriber_impl.h:50
swri::SubscriberImpl::timeout_
ros::Duration timeout_
Definition: subscriber_impl.h:58
swri::StorageSubscriberImpl::handleMessage
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
Definition: subscriber_impl.h:440
swri::SubscriberImpl::blocking_timeout_
bool blocking_timeout_
Definition: subscriber_impl.h:61
swri::SubscriberImpl::min_period_
ros::Duration min_period_
Definition: subscriber_impl.h:55
swri::TypedSubscriberImpl::handleMessage
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
Definition: subscriber_impl.h:347
swri::SubscriberImpl::setTimeout
void setTimeout(const ros::Duration &time_out)
Definition: subscriber_impl.h:251
ros::Time
swri::SubscriberImpl::timeout_count_
int timeout_count_
Definition: subscriber_impl.h:60
DurationBase< Duration >::toNSec
int64_t toNSec() const
swri::SubscriberImpl::max_latency_
ros::Duration max_latency_
Definition: subscriber_impl.h:52
swri::SubscriberImpl::timeoutEnabled
bool timeoutEnabled() const
Definition: subscriber_impl.h:277
swri
Definition: dynamic_parameters.h:51
swri::TypedSubscriberImpl
Definition: subscriber_impl.h:301
swri::SubscriberImpl::unmapped_topic_
std::string unmapped_topic_
Definition: subscriber_impl.h:42
swri::TypedSubscriberImpl::callback_
void(T::* callback_)(const boost::shared_ptr< M const > &)
Definition: subscriber_impl.h:304
swri::StorageSubscriberImpl
Definition: subscriber_impl.h:407
swri::SubscriberImpl::mappedTopic
const std::string & mappedTopic() const
Definition: subscriber_impl.h:148
swri::BindSubscriberImpl::handleMessage
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
Definition: subscriber_impl.h:390
DurationBase< Duration >::toSec
double toSec() const
swri::StorageSubscriberImpl::dest_
boost::shared_ptr< M const > * dest_
Definition: subscriber_impl.h:409
swri::SubscriberImpl::meanLatency
ros::Duration meanLatency() const
Definition: subscriber_impl.h:188
swri::SubscriberImpl::meanFrequencyHz
double meanFrequencyHz() const
Definition: subscriber_impl.h:215
swri::SubscriberImpl::numPublishers
int numPublishers() const
Definition: subscriber_impl.h:158
ROS_INFO
#define ROS_INFO(...)
ros::Duration
swri::BindSubscriberImpl::handleMessage
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
Definition: subscriber_impl.h:399
ros::NodeHandle
ros::Subscriber
swri::StorageSubscriberImpl::StorageSubscriberImpl
StorageSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, boost::shared_ptr< M const > *dest, const ros::TransportHints &transport_hints)
Definition: subscriber_impl.h:412
swri::SubscriberImpl::timeoutsBlocked
bool timeoutsBlocked() const
Definition: subscriber_impl.h:268
swri::SubscriberImpl::last_receive_time_
ros::Time last_receive_time_
Definition: subscriber_impl.h:48
ros::Time::now
static Time now()


swri_roscpp
Author(s): P. J. Reed
autogenerated on Fri Aug 2 2024 08:39:15