subscriber_impl.h
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // * Neither the name of Southwest Research Institute® (SwRI®) nor the
14 // names of its contributors may be used to endorse or promote products
15 // derived from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
21 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 //
28 // *****************************************************************************
29 #ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
30 #define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
31 
32 #include <std_msgs/Header.h>
33 #include <ros/subscriber.h>
34 
35 namespace swri
36 {
37 class Subscriber;
39 {
40  protected:
42  std::string unmapped_topic_;
43  std::string mapped_topic_;
44 
46 
49 
53 
57 
62 
63  void processHeader(const ros::Time &stamp)
64  {
65  ros::Time now = ros::Time::now();
66 
67  // Check for timeouts so that we can correctly increment the
68  // timeout count.
69  checkTimeout(now);
70 
71  // Do not use this message for statistics if it is arriving out of order
72  if (stamp >= last_header_stamp_) {
73  message_count_++;
74 
75  if (!stamp.isZero() && !now.isZero()) {
76  ros::Duration latency = now - stamp;
77  if (message_count_ == 1) {
78  min_latency_ = latency;
79  max_latency_ = latency;
80  total_latency_ = latency;
81  } else {
82  min_latency_ = std::min(min_latency_, latency);
83  max_latency_ = std::max(max_latency_, latency);
84  double latency_weight_ = 0.1;
85  total_latency_.fromSec(latency_weight_*latency.toSec() + (1-latency_weight_)*total_latency_.toSec());
86  }
87  }
88 
89  if (message_count_ > 1) {
90  ros::Duration period = now - last_receive_time_;
91  if (message_count_ == 2) {
92  min_period_ = period;
93  max_period_ = period;
94  total_periods_ = period;
95  } else if (message_count_ > 2) {
96  min_period_ = std::min(min_period_, period);
97  max_period_ = std::max(max_period_, period);
98  total_periods_ += period;
99  }
100  }
101  }
102  // Reset the timeout condition to false.
103  in_timeout_ = false;
104 
105  last_receive_time_ = now;
106  last_header_stamp_ = stamp;
107  }
108 
109  void checkTimeout(const ros::Time &now)
110  {
111  if (blocking_timeout_) {
112  return;
113  }
114 
115  if (in_timeout_ || timeout_ <= ros::Duration(0.0)) {
116  return;
117  }
118 
119  if (message_count_ == 0) {
120  return;
121  }
122 
123  if (age(now) > timeout_) {
124  in_timeout_ = true;
125  timeout_count_++;
126  }
127  }
128 
129 
130  public:
132  unmapped_topic_("N/A"),
133  mapped_topic_("N/A"),
134  message_count_(0),
135  timeout_(-1.0),
136  in_timeout_(false),
137  timeout_count_(0),
138  blocking_timeout_(false)
139  {
140  resetStatistics();
141  }
142 
143  const std::string& mappedTopic() const
144  {
145  return mapped_topic_;
146  }
147 
148  const std::string& unmappedTopic() const
149  {
150  return unmapped_topic_;
151  }
152 
153  int numPublishers() const
154  {
155  return sub_.getNumPublishers();
156  }
157 
159  {
160  message_count_ = 0;
161  in_timeout_ = false;
162  timeout_count_ = 0;
163  }
164 
165  int messageCount() const
166  {
167  return message_count_;
168  }
169 
170  ros::Duration age(const ros::Time &now) const
171  {
172  if (message_count_ < 1) {
173  return ros::DURATION_MAX;
174  } else if (last_header_stamp_.isValid()) {
175  return now - last_header_stamp_;
176  } else {
177  // If we've received messages but they don't have valid stamps, we can't
178  // actually determine the age, so just return an empty duration.
179  return ros::Duration(0.0);
180  }
181  }
182 
184  {
185  if (message_count_ < 1) {
186  return ros::DURATION_MAX;
187  } else {
188  return ros::Duration(total_latency_.toSec() / message_count_);
189  }
190  }
191 
193  {
194  if (message_count_ < 1) {
195  return ros::DURATION_MAX;
196  } else {
197  return min_latency_;
198  }
199  }
200 
202  {
203  if (message_count_ < 1) {
204  return ros::DURATION_MAX;
205  } else {
206  return max_latency_;
207  }
208  }
209 
210  double meanFrequencyHz() const
211  {
212  if (message_count_ < 2) {
213  return 0.0;
214  } else {
215  return 1e9 / meanPeriod().toNSec();
216  }
217  }
218 
220  {
221  if (message_count_ < 2) {
222  return ros::DURATION_MAX;
223  } else {
224  return ros::Duration(total_periods_.toSec() / (message_count_ - 1));
225  }
226  }
227 
229  {
230  if (message_count_ < 2) {
231  return ros::DURATION_MAX;
232  } else {
233  return min_period_;
234  }
235  }
236 
238  {
239  if (message_count_ < 2) {
240  return ros::DURATION_MAX;
241  } else {
242  return max_period_;
243  }
244  }
245 
246  void setTimeout(const ros::Duration &time_out)
247  {
248  timeout_ = time_out;
249  in_timeout_ = false;
250  timeout_count_ = 0;
251  }
252 
253  bool blockTimeouts(bool block) {
254  if (block) {
255  in_timeout_ = false;
256  }
257 
258  bool old_block = blocking_timeout_;
259  blocking_timeout_ = block;
260  return old_block;
261  }
262 
263  bool timeoutsBlocked() const {
264  return blocking_timeout_;
265  }
266 
268  {
269  return timeout_;
270  }
271 
272  bool timeoutEnabled() const
273  {
274  return timeout_ > ros::Duration(0.0);
275  }
276 
277  bool inTimeout()
278  {
280  return in_timeout_;
281  }
282 
284  {
286  return timeout_count_;
287  }
288 }; // class SubscriberImpl
289 
290 struct TrueType
291 {
292  static const bool value = true;
293 };
294 
295 template<class M , class T>
297 {
298  T *obj_;
299  void (T::*callback_)(const boost::shared_ptr< M const > &);
300 
301  public:
303  ros::NodeHandle &nh,
304  const std::string &topic,
305  uint32_t queue_size,
306  void(T::*fp)(const boost::shared_ptr< M const > &),
307  T *obj,
308  const ros::TransportHints &transport_hints)
309  {
310  unmapped_topic_ = topic;
311  mapped_topic_ = nh.resolveName(topic);
312 
314  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
315  } else {
316  ROS_INFO("Subscribing to '%s' at '%s'.",
317  unmapped_topic_.c_str(),
318  mapped_topic_.c_str());
319  }
320 
321  callback_ = fp;
322  obj_ = obj;
323 
324  sub_ = nh.subscribe(mapped_topic_, queue_size,
325  &TypedSubscriberImpl::handleMessage<M>,
326  this,
327  transport_hints);
328  }
329 
330  // Handler for messages with headers
331  template <class M2>
332  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
334  {
335  processHeader(msg->header.stamp);
336  (obj_->*callback_)(msg);
337  }
338 
339  // Handler for messages without headers
340  template <class M2>
341  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
343  {
345  (obj_->*callback_)(msg);
346  }
347 }; // class TypedSubscriberImpl
348 
349 template<class M>
351 {
352  boost::function<void(const boost::shared_ptr< M const> &)> callback_;
353 
354 
355  public:
357  ros::NodeHandle &nh,
358  const std::string &topic,
359  uint32_t queue_size,
360  const boost::function<void(const boost::shared_ptr< M const> &)> &callback,
361  const ros::TransportHints &transport_hints)
362  {
363  unmapped_topic_ = topic;
364  mapped_topic_ = nh.resolveName(topic);
365 
367  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
368  } else {
369  ROS_INFO("Subscribing to '%s' at '%s'.",
370  unmapped_topic_.c_str(),
371  mapped_topic_.c_str());
372  }
373 
374  callback_ = callback;
375 
376  sub_ = nh.subscribe(mapped_topic_, queue_size,
377  &BindSubscriberImpl::handleMessage<M>,
378  this,
379  transport_hints);
380  }
381 
382  // Handler for messages with headers
383  template <class M2>
384  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
386  {
387  processHeader(msg->header.stamp);
388  callback_(msg);
389  }
390 
391  // Handler for messages without headers
392  template <class M2>
393  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
395  {
397  callback_(msg);
398  }
399 }; // class BindSubscriberImpl
400 
401 template<class M>
403 {
405 
406  public:
408  ros::NodeHandle &nh,
409  const std::string &topic,
411  const ros::TransportHints &transport_hints)
412  {
413  unmapped_topic_ = topic;
414  mapped_topic_ = nh.resolveName(topic);
415 
417  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
418  } else {
419  ROS_INFO("Subscribing to '%s' at '%s'.",
420  unmapped_topic_.c_str(),
421  mapped_topic_.c_str());
422  }
423 
424  dest_ = dest;
425 
426  sub_ = nh.subscribe(mapped_topic_, 2,
427  &StorageSubscriberImpl::handleMessage<M>,
428  this,
429  transport_hints);
430  }
431 
432  // Handler for messages with headers
433  template <class M2>
434  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
436  {
437  processHeader(msg->header.stamp);
438  *dest_ = msg;
439  }
440 
441  // Handler for messages without headers
442  template <class M2>
443  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
445  {
447  *dest_ = msg;
448  }
449 }; // class StorageSubscriberImpl
450 } // namespace swri
451 #endif // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
boost::shared_ptr< M const > * dest_
int numPublishers() const
ros::Duration total_periods_
ros::Duration max_latency_
ros::Duration meanPeriod() const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
std::string resolveName(const std::string &name, bool remap=true) const
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
ros::Duration minPeriod() const
TypedSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, void(T::*fp)(const boost::shared_ptr< M const > &), T *obj, const ros::TransportHints &transport_hints)
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
double meanFrequencyHz() const
ros::Duration min_period_
StorageSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, boost::shared_ptr< M const > *dest, const ros::TransportHints &transport_hints)
bool timeoutEnabled() const
ros::Duration meanLatency() const
BindSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, const boost::function< void(const boost::shared_ptr< M const > &)> &callback, const ros::TransportHints &transport_hints)
const std::string & mappedTopic() const
ros::Duration min_latency_
void processHeader(const ros::Time &stamp)
void setTimeout(const ros::Duration &time_out)
ros::Duration maxLatency() const
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
ros::Duration age(const ros::Time &now) const
Duration & fromSec(double t)
bool blockTimeouts(bool block)
static bool isValid()
std::string mapped_topic_
#define ROS_INFO(...)
int64_t toNSec() const
std::string unmapped_topic_
uint32_t getNumPublishers() const
const std::string & unmappedTopic() const
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
void checkTimeout(const ros::Time &now)
ros::Duration minLatency() const
ros::Time last_receive_time_
ros::Subscriber sub_
ros::Time last_header_stamp_
int messageCount() const
static Time now()
ros::Duration timeout() const
bool timeoutsBlocked() const
ros::Duration max_period_
ROSTIME_DECL const Duration DURATION_MAX
ros::Duration maxPeriod() const
ros::Duration total_latency_
ros::Duration timeout_


swri_roscpp
Author(s):
autogenerated on Tue Apr 6 2021 02:50:35