subscriber_impl.h
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // * Neither the name of Southwest Research Institute® (SwRI®) nor the
14 // names of its contributors may be used to endorse or promote products
15 // derived from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
21 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 //
28 // *****************************************************************************
29 #ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
30 #define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
31 
32 #include <std_msgs/Header.h>
33 #include <ros/subscriber.h>
34 
35 namespace swri
36 {
37 class Subscriber;
39 {
40  protected:
42  std::string unmapped_topic_;
43  std::string mapped_topic_;
44 
46 
49 
53 
57 
62 
63  void processHeader(const ros::Time &stamp)
64  {
65  ros::Time now = ros::Time::now();
66 
67  // Check for timeouts so that we can correctly increment the
68  // timeout count.
69  checkTimeout(now);
70 
71  // Do not use this message for statistics if it is arriving out of order
72  if (stamp >= last_header_stamp_) {
73  message_count_++;
74 
75  if (!stamp.isZero()) {
76  ros::Duration latency = now - stamp;
77  if (message_count_ == 1) {
78  min_latency_ = latency;
79  max_latency_ = latency;
80  total_latency_ = latency;
81  } else {
82  min_latency_ = std::min(min_latency_, latency);
83  max_latency_ = std::max(max_latency_, latency);
84  total_latency_ += latency;
85  }
86  }
87 
88  if (message_count_ > 1) {
89  ros::Duration period = now - last_receive_time_;
90  if (message_count_ == 2) {
91  min_period_ = period;
92  max_period_ = period;
93  total_periods_ = period;
94  } else if (message_count_ > 2) {
95  min_period_ = std::min(min_period_, period);
96  max_period_ = std::max(max_period_, period);
97  total_periods_ += period;
98  }
99  }
100  }
101  // Reset the timeout condition to false.
102  in_timeout_ = false;
103 
104  last_receive_time_ = now;
105  last_header_stamp_ = stamp;
106  }
107 
108  void checkTimeout(const ros::Time &now)
109  {
110  if (blocking_timeout_) {
111  return;
112  }
113 
114  if (in_timeout_ || timeout_ <= ros::Duration(0.0)) {
115  return;
116  }
117 
118  if (message_count_ == 0) {
119  return;
120  }
121 
122  if (age(now) > timeout_) {
123  in_timeout_ = true;
124  timeout_count_++;
125  }
126  }
127 
128 
129  public:
131  unmapped_topic_("N/A"),
132  mapped_topic_("N/A"),
133  message_count_(0),
134  timeout_(-1.0),
135  in_timeout_(false),
136  timeout_count_(0),
137  blocking_timeout_(false)
138  {
139  resetStatistics();
140  }
141 
142  const std::string& mappedTopic() const
143  {
144  return mapped_topic_;
145  }
146 
147  const std::string& unmappedTopic() const
148  {
149  return unmapped_topic_;
150  }
151 
152  int numPublishers() const
153  {
154  return sub_.getNumPublishers();
155  }
156 
158  {
159  message_count_ = 0;
160  in_timeout_ = false;
161  timeout_count_ = 0;
162  }
163 
164  int messageCount() const
165  {
166  return message_count_;
167  }
168 
169  ros::Duration age(const ros::Time &now) const
170  {
171  if (message_count_ < 1) {
172  return ros::DURATION_MAX;
173  } else if (last_header_stamp_.isValid()) {
174  return now - last_header_stamp_;
175  } else {
176  // If we've received messages but they don't have valid stamps, we can't
177  // actually determine the age, so just return an empty duration.
178  return ros::Duration(0.0);
179  }
180  }
181 
183  {
184  if (message_count_ < 1) {
185  return ros::DURATION_MAX;
186  } else {
187  return ros::Duration(total_latency_.toSec() / message_count_);
188  }
189  }
190 
192  {
193  if (message_count_ < 1) {
194  return ros::DURATION_MAX;
195  } else {
196  return min_latency_;
197  }
198  }
199 
201  {
202  if (message_count_ < 1) {
203  return ros::DURATION_MAX;
204  } else {
205  return max_latency_;
206  }
207  }
208 
209  double meanFrequencyHz() const
210  {
211  if (message_count_ < 2) {
212  return 0.0;
213  } else {
214  return 1e9 / meanPeriod().toNSec();
215  }
216  }
217 
219  {
220  if (message_count_ < 2) {
221  return ros::DURATION_MAX;
222  } else {
223  return ros::Duration(total_periods_.toSec() / (message_count_ - 1));
224  }
225  }
226 
228  {
229  if (message_count_ < 2) {
230  return ros::DURATION_MAX;
231  } else {
232  return min_period_;
233  }
234  }
235 
237  {
238  if (message_count_ < 2) {
239  return ros::DURATION_MAX;
240  } else {
241  return max_period_;
242  }
243  }
244 
245  void setTimeout(const ros::Duration &time_out)
246  {
247  timeout_ = time_out;
248  in_timeout_ = false;
249  timeout_count_ = 0;
250  }
251 
252  bool blockTimeouts(bool block) {
253  if (block) {
254  in_timeout_ = false;
255  }
256 
257  bool old_block = blocking_timeout_;
258  blocking_timeout_ = block;
259  return old_block;
260  }
261 
262  bool timeoutsBlocked() const {
263  return blocking_timeout_;
264  }
265 
267  {
268  return timeout_;
269  }
270 
271  bool timeoutEnabled() const
272  {
273  return timeout_ > ros::Duration(0.0);
274  }
275 
276  bool inTimeout()
277  {
279  return in_timeout_;
280  }
281 
283  {
285  return timeout_count_;
286  }
287 }; // class SubscriberImpl
288 
289 struct TrueType
290 {
291  static const bool value = true;
292 };
293 
294 template<class M , class T>
296 {
297  T *obj_;
298  void (T::*callback_)(const boost::shared_ptr< M const > &);
299 
300  public:
302  ros::NodeHandle &nh,
303  const std::string &topic,
304  uint32_t queue_size,
305  void(T::*fp)(const boost::shared_ptr< M const > &),
306  T *obj,
307  const ros::TransportHints &transport_hints)
308  {
309  unmapped_topic_ = topic;
310  mapped_topic_ = nh.resolveName(topic);
311 
313  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
314  } else {
315  ROS_INFO("Subscribing to '%s' at '%s'.",
316  unmapped_topic_.c_str(),
317  mapped_topic_.c_str());
318  }
319 
320  callback_ = fp;
321  obj_ = obj;
322 
323  sub_ = nh.subscribe(mapped_topic_, queue_size,
324  &TypedSubscriberImpl::handleMessage<M>,
325  this,
326  transport_hints);
327  }
328 
329  // Handler for messages with headers
330  template <class M2>
331  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
333  {
334  processHeader(msg->header.stamp);
335  (obj_->*callback_)(msg);
336  }
337 
338  // Handler for messages without headers
339  template <class M2>
340  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
342  {
344  (obj_->*callback_)(msg);
345  }
346 }; // class TypedSubscriberImpl
347 
348 template<class M>
350 {
351  boost::function<void(const boost::shared_ptr< M const> &)> callback_;
352 
353 
354  public:
356  ros::NodeHandle &nh,
357  const std::string &topic,
358  uint32_t queue_size,
359  const boost::function<void(const boost::shared_ptr< M const> &)> &callback,
360  const ros::TransportHints &transport_hints)
361  {
362  unmapped_topic_ = topic;
363  mapped_topic_ = nh.resolveName(topic);
364 
366  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
367  } else {
368  ROS_INFO("Subscribing to '%s' at '%s'.",
369  unmapped_topic_.c_str(),
370  mapped_topic_.c_str());
371  }
372 
373  callback_ = callback;
374 
375  sub_ = nh.subscribe(mapped_topic_, queue_size,
376  &BindSubscriberImpl::handleMessage<M>,
377  this,
378  transport_hints);
379  }
380 
381  // Handler for messages with headers
382  template <class M2>
383  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
385  {
386  processHeader(msg->header.stamp);
387  callback_(msg);
388  }
389 
390  // Handler for messages without headers
391  template <class M2>
392  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
394  {
396  callback_(msg);
397  }
398 }; // class BindSubscriberImpl
399 
400 template<class M>
402 {
404 
405  public:
407  ros::NodeHandle &nh,
408  const std::string &topic,
410  const ros::TransportHints &transport_hints)
411  {
412  unmapped_topic_ = topic;
413  mapped_topic_ = nh.resolveName(topic);
414 
416  ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
417  } else {
418  ROS_INFO("Subscribing to '%s' at '%s'.",
419  unmapped_topic_.c_str(),
420  mapped_topic_.c_str());
421  }
422 
423  dest_ = dest;
424 
425  sub_ = nh.subscribe(mapped_topic_, 2,
426  &StorageSubscriberImpl::handleMessage<M>,
427  this,
428  transport_hints);
429  }
430 
431  // Handler for messages with headers
432  template <class M2>
433  typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
435  {
436  processHeader(msg->header.stamp);
437  *dest_ = msg;
438  }
439 
440  // Handler for messages without headers
441  template <class M2>
442  typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
444  {
446  *dest_ = msg;
447  }
448 }; // class StorageSubscriberImpl
449 } // namespace swri
450 #endif // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
boost::shared_ptr< M const > * dest_
int numPublishers() const
ros::Duration total_periods_
ros::Duration max_latency_
ros::Duration meanPeriod() const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
std::string resolveName(const std::string &name, bool remap=true) const
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
ros::Duration minPeriod() const
TypedSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, void(T::*fp)(const boost::shared_ptr< M const > &), T *obj, const ros::TransportHints &transport_hints)
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
double meanFrequencyHz() const
ros::Duration min_period_
StorageSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, boost::shared_ptr< M const > *dest, const ros::TransportHints &transport_hints)
bool timeoutEnabled() const
ros::Duration meanLatency() const
BindSubscriberImpl(ros::NodeHandle &nh, const std::string &topic, uint32_t queue_size, const boost::function< void(const boost::shared_ptr< M const > &)> &callback, const ros::TransportHints &transport_hints)
const std::string & mappedTopic() const
ros::Duration min_latency_
void processHeader(const ros::Time &stamp)
void setTimeout(const ros::Duration &time_out)
ros::Duration maxLatency() const
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
boost::enable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
ros::Duration age(const ros::Time &now) const
bool blockTimeouts(bool block)
static bool isValid()
std::string mapped_topic_
#define ROS_INFO(...)
int64_t toNSec() const
std::string unmapped_topic_
uint32_t getNumPublishers() const
const std::string & unmappedTopic() const
boost::disable_if< ros::message_traits::HasHeader< M2 >, void >::type handleMessage(const boost::shared_ptr< M const > &msg)
void checkTimeout(const ros::Time &now)
ros::Duration minLatency() const
ros::Time last_receive_time_
ros::Subscriber sub_
ros::Time last_header_stamp_
int messageCount() const
static Time now()
ros::Duration timeout() const
bool timeoutsBlocked() const
ros::Duration max_period_
ROSTIME_DECL const Duration DURATION_MAX
ros::Duration maxPeriod() const
ros::Duration total_latency_
ros::Duration timeout_


swri_roscpp
Author(s):
autogenerated on Fri Jun 7 2019 22:05:50