subscriber_impl.h
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // * Neither the name of Southwest Research Institute® (SwRI®) nor the
14 // names of its contributors may be used to endorse or promote products
15 // derived from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
21 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 //
28 // *****************************************************************************
29 #ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
30 #define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
31 
32 #include <std_msgs/Header.h>
33 #include <ros/subscriber.h>
34 
35 namespace swri
36 {
37 class Subscriber;
39 {
40  protected:
42  std::string unmapped_topic_;
43  std::string mapped_topic_;
44 
46 
49 
53 
57 
62 
63  void processHeader(const ros::Time &stamp)
64  {
65  ros::Time now = ros::Time::now();
66 
67  // Check for timeouts so that we can correctly increment the
68  // timeout count.
69  checkTimeout(now);
70 
71  // Do not use this message for statistics if it is arriving out of order
72  if (stamp >= last_header_stamp_) {
73  message_count_++;
74 
75  if (!stamp.isZero() && !now.isZero()) {
76  ros::Duration latency = now - stamp;
77  if (message_count_ == 1) {
78  min_latency_ = latency;
79  max_latency_ = latency;
80  total_latency_ = latency;
81  } else {
82  min_latency_ = std::min(min_latency_, latency);
83  max_latency_ = std::max(max_latency_, latency);
84  double latency_weight_ = 0.1;
85  total_latency_.fromSec(latency_weight_*latency.toSec() + (1-latency_weight_)*total_latency_.toSec());
86  }
87  }
88 
89  if (message_count_ > 1) {
90  ros::Duration period = now - last_receive_time_;
91  if (message_count_ == 2) {
92  min_period_ = period;
93  max_period_ = period;
94  total_periods_ = period;
95  } else if (message_count_ > 2) {
96  min_period_ = std::min(min_period_, period);
97  max_period_ = std::max(max_period_, period);
98  total_periods_ += period;
99  }
100  }
101  }
102  // Reset the timeout condition to false.
103  in_timeout_ = false;
104 
105  last_receive_time_ = now;
106  last_header_stamp_ = stamp;
107  }
108 
109  void checkTimeout(const ros::Time &now)
110  {
111  if (blocking_timeout_) {
112  return;
113  }
114 
115  if (in_timeout_ || timeout_ <= ros::Duration(0.0)) {
116  return;
117  }
118 
119  if (message_count_ == 0) {
120  return;
121  }
122 
123  if (age(now) > timeout_) {
124  in_timeout_ = true;
125  timeout_count_++;
126  }
127  }
128 
129 
130  public:
132  unmapped_topic_("N/A"),
133  mapped_topic_("N/A"),
134  message_count_(0),
135  timeout_(-1.0),
136  in_timeout_(false),
137  timeout_count_(0),
138  blocking_timeout_(false)
139  {
140  resetStatistics();
141  }
142 
143  virtual ~SubscriberImpl()
144  {
145  }
146 
147  const std::string& mappedTopic() const
148  {
149  return mapped_topic_;
150  }
151 
152  const std::string& unmappedTopic() const
153  {
154  return unmapped_topic_;
155  }
156 
157  int numPublishers() const
158  {
159  return sub_.getNumPublishers();
160  }
161 
163  {
164  message_count_ = 0;
165  in_timeout_ = false;
166  timeout_count_ = 0;
167  }
168 
169  int messageCount() const
170  {
171  return message_count_;
172  }
173 
174  ros::Duration age(const ros::Time &now) const
175  {
176  if (message_count_ < 1) {
177  return ros::DURATION_MAX;
178  } else if (last_header_stamp_.isValid()) {
179  return now - last_header_stamp_;
180  } else {
181  // If we've received messages but they don't have valid stamps, we can't
182  // actually determine the age, so just return an empty duration.
183  return ros::Duration(0.0);
184  }
185  }
186 
188  {
189  if (message_count_ < 1) {
190  return ros::DURATION_MAX;
191  } else {
192  return ros::Duration(total_latency_.toSec() / message_count_);
193  }
194  }
195 
197  {
198  if (message_count_ < 1) {
199  return ros::DURATION_MAX;
200  } else {
201  return min_latency_;
202  }
203  }
204 
206  {
207  if (message_count_ < 1) {
208  return ros::DURATION_MAX;
209  } else {
210  return max_latency_;
211  }
212  }
213 
214  double meanFrequencyHz() const
215  {
216  if (message_count_ < 2) {
217  return 0.0;
218  } else {
219  return 1e9 / meanPeriod().toNSec();
220  }
221  }
222 
224  {
225  if (message_count_ < 2) {
226  return ros::DURATION_MAX;
227  } else {
228  return ros::Duration(total_periods_.toSec() / (message_count_ - 1));
229  }
230  }
231 
233  {
234  if (message_count_ < 2) {
235  return ros::DURATION_MAX;
236  } else {
237  return min_period_;
238  }
239  }
240 
242  {
243  if (message_count_ < 2) {
244  return ros::DURATION_MAX;
245  } else {
246  return max_period_;
247  }
248  }
249 
250  void setTimeout(const ros::Duration &time_out)
251  {
252  timeout_ = time_out;
253  in_timeout_ = false;
254  timeout_count_ = 0;
255  }
256 
257  bool blockTimeouts(bool block) {
258  if (block) {
259  in_timeout_ = false;
260  }
261 
262  bool old_block = blocking_timeout_;
263  blocking_timeout_ = block;
264  return old_block;
265  }
266 
267  bool timeoutsBlocked() const {
268  return blocking_timeout_;
269  }
270 
272  {
273  return timeout_;
274  }
275 
276  bool timeoutEnabled() const
277  {
278  return timeout_ > ros::Duration(0.0);
279  }
280 
281  bool inTimeout()
282  {
284  return in_timeout_;
285  }
286 
288  {
290  return timeout_count_;
291  }
292 }; // class SubscriberImpl
293 
294 struct TrueType
295 {
296  static const bool value = true;
297 };
298 
299 template<class M , class T>
301 {
302  T *obj_;
303  void (T::*callback_)(const boost::shared_ptr< M const > &);
304 
305  public:
307  ros::NodeHandle &nh,
308  const std::string &topic,
309  uint32_t queue_size,
310  void(T::*fp)(const boost::shared_ptr< M const > &),
311  T *obj,
312  const ros::TransportHints &transport_hints)
313  {
314  unmapped_topic_ = topic;
315  mapped_topic_ = nh.resolveName(topic);
316 
318  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
319  } else {
320  ROS_INFO("Subscribing to '%s' at '%s'.",
321  unmapped_topic_.c_str(),
322  mapped_topic_.c_str());
323  }
324 
325  callback_ = fp;
326  obj_ = obj;
327 
328  sub_ = nh.subscribe(mapped_topic_, queue_size,
329  &TypedSubscriberImpl::handleMessage<M>,
330  this,
331  transport_hints);
332  }
333 
334  // Handler for messages with headers
335  template <class M2>
336  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
338  {
339  processHeader(msg->header.stamp);
340  (obj_->*callback_)(msg);
341  }
342 
343  // Handler for messages without headers
344  template <class M2>
345  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
347  {
349  (obj_->*callback_)(msg);
350  }
351 }; // class TypedSubscriberImpl
352 
353 template<class M>
355 {
356  boost::function<void(const boost::shared_ptr< M const> &)> callback_;
357 
358 
359  public:
361  ros::NodeHandle &nh,
362  const std::string &topic,
363  uint32_t queue_size,
364  const boost::function<void(const boost::shared_ptr< M const> &)> &callback,
365  const ros::TransportHints &transport_hints)
366  {
367  unmapped_topic_ = topic;
368  mapped_topic_ = nh.resolveName(topic);
369 
371  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
372  } else {
373  ROS_INFO("Subscribing to '%s' at '%s'.",
374  unmapped_topic_.c_str(),
375  mapped_topic_.c_str());
376  }
377 
378  callback_ = callback;
379 
380  sub_ = nh.subscribe(mapped_topic_, queue_size,
381  &BindSubscriberImpl::handleMessage<M>,
382  this,
383  transport_hints);
384  }
385 
386  // Handler for messages with headers
387  template <class M2>
388  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
390  {
391  processHeader(msg->header.stamp);
392  callback_(msg);
393  }
394 
395  // Handler for messages without headers
396  template <class M2>
397  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
399  {
401  callback_(msg);
402  }
403 }; // class BindSubscriberImpl
404 
405 template<class M>
407 {
409 
410  public:
412  ros::NodeHandle &nh,
413  const std::string &topic,
415  const ros::TransportHints &transport_hints)
416  {
417  unmapped_topic_ = topic;
418  mapped_topic_ = nh.resolveName(topic);
419 
421  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
422  } else {
423  ROS_INFO("Subscribing to '%s' at '%s'.",
424  unmapped_topic_.c_str(),
425  mapped_topic_.c_str());
426  }
427 
428  dest_ = dest;
429 
430  sub_ = nh.subscribe(mapped_topic_, 2,
431  &StorageSubscriberImpl::handleMessage<M>,
432  this,
433  transport_hints);
434  }
435 
436  // Handler for messages with headers
437  template <class M2>
438  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
440  {
441  processHeader(msg->header.stamp);
442  *dest_ = msg;
443  }
444 
445  // Handler for messages without headers
446  template <class M2>
447  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
449  {
451  *dest_ = msg;
452  }
453 }; // class StorageSubscriberImpl
454 } // namespace swri
455 #endif // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
boost::shared_ptr< M const > * dest_
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
ros::Duration total_periods_
ros::Duration max_latency_
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
double meanFrequencyHz() const
ros::Duration age(const ros::Time &now) const
ros::Duration minPeriod() const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
ros::Duration meanLatency() const
ros::Duration meanPeriod() const
uint32_t getNumPublishers() const
ros::Duration min_period_
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
ros::Duration min_latency_
void processHeader(const ros::Time &stamp)
TypedSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, void(T::*fp)(const boost::shared_ptr< M const > &), T *obj, const ros::TransportHints &transport_hints)
void setTimeout(const ros::Duration &time_out)
Duration & fromSec(double t)
bool blockTimeouts(bool block)
ros::Duration minLatency() const
static bool isValid()
std::string mapped_topic_
#define ROS_INFO(...)
bool timeoutEnabled() const
std::string unmapped_topic_
ros::Duration maxPeriod() const
std::string resolveName(const std::string &name, bool remap=true) const
const std::string & mappedTopic() const
bool timeoutsBlocked() const
ros::Duration timeout() const
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
void checkTimeout(const ros::Time &now)
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
BindSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, const boost::function< void(const boost::shared_ptr< M const > &)> &callback, const ros::TransportHints &transport_hints)
ros::Time last_receive_time_
ros::Subscriber sub_
ros::Time last_header_stamp_
int64_t toNSec() const
static Time now()
ros::Duration maxLatency() const
ros::Duration max_period_
const std::string & unmappedTopic() const
ROSTIME_DECL const Duration DURATION_MAX
StorageSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, boost::shared_ptr< M const > *dest, const ros::TransportHints &transport_hints)
ros::Duration total_latency_
ros::Duration timeout_


swri_roscpp
Author(s):
autogenerated on Sat Jan 21 2023 03:13:16