subscriber.h
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // * Neither the name of Southwest Research Institute® (SwRI®) nor the
14 // names of its contributors may be used to endorse or promote products
15 // derived from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
21 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 //
28 // *****************************************************************************
29 #ifndef SWRI_ROSCPP_SUBSCRIBER_H_
30 #define SWRI_ROSCPP_SUBSCRIBER_H_
31 
32 
33 #include <ros/node_handle.h>
35 
36 #include <swri_roscpp/parameters.h>
38 
39 namespace swri
40 {
41 // This is an extended interface to the ros::Subscriber class.
42 //
43 // This is an extended interface to the ros::Subscriber class that
44 // provides a little more default functionality and instrumentation.
45 //
46 // - Prints information when the subscription is created (to ROS_INFO)
47 // about the unmapped and mapped topic names.
48 //
49 // - Maintains some statistics about the messages (counts, latency, period)
50 //
51 // - Implements timeout logic and timeout counting.
52 //
53 // This implementation provides two interfaces. It supports the
54 // traditional ROS callback interface (though it is deliberately
55 // limited to callbacks that take a *ConstPtr& type), and an interface
56 // where the message is simply assigned to *ConstPtr variable that is
57 // specified at creation. The second interface replaces the previous
58 // swri_roscpp::LatchedSubscriber and allows you to avoid writing
59 // trivial callback functions.
60 
62 {
63  private:
64  // The implementation is kept in a separate class to maintain clean
65  // copy/move semantics with ROS (since the ROS subscriber binds to a
66  // single address for the callback) and to allow us to hide the
67  // template arguments so the message type doesn't have to be
68  // specified when the subscriber is declared.
70 
71  public:
72  Subscriber();
73 
74  // Using class method callback.
75  template<class M , class T >
77  const std::string &topic,
78  uint32_t queue_size,
79  void(T::*fp)(const boost::shared_ptr< M const > &),
80  T *obj,
81  const ros::TransportHints &transport_hints=ros::TransportHints());
82 
83  // Using a boost function callback.
84  template<class M>
86  const std::string &topic,
87  uint32_t queue_size,
88  const boost::function<void(const boost::shared_ptr<M const> &)> &callback,
89  const ros::TransportHints &transport_hints=ros::TransportHints());
90 
91  // This is an alternate interface that stores a received message in
92  // a variable without calling a user-defined callback function.
93  // This is useful for cases where you just want to store a message
94  // for usage later and avoids having to write a trivial callback
95  // function.
96  template<class M>
98  const std::string &topic,
100  const ros::TransportHints &transport_hints=ros::TransportHints());
101 
102  Subscriber& operator=(const Subscriber &other);
103 
104  // Reset all statistics, including message and timeout counts.
105  void resetStatistics();
106 
107  // Returns the unmapped topic name that was provided when the
108  // subscriber was created.
109  const std::string& unmappedTopic() const;
110  // Returns the fully mapped topic name that the subscriber is
111  // connected on.
112  const std::string& mappedTopic() const;
113  // Returns the number of publishers that are connected to this
114  // subscriber.
115  int numPublishers() const;
116 
117  // How many messages have been received.
118  int messageCount() const;
119 
120  // Age of the most recent message (difference between now and the
121  // header stamp (or time message was received for messages that
122  // don't have headers).
123  ros::Duration age(const ros::Time &now=ros::TIME_MIN) const;
124  double ageSeconds(const ros::Time &now=ros::TIME_MIN) const;
125  double ageMilliseconds(const ros::Time &now=ros::TIME_MIN) const;
126 
127  // Average latency (time difference between the time stamp and when
128  // the message is received). These will be useless for message types
129  // that do not have a header.
130  ros::Duration meanLatency() const;
131  ros::Duration minLatency() const;
132  ros::Duration maxLatency() const;
133  double meanLatencyMicroseconds() const;
134  double minLatencyMicroseconds() const;
135  double maxLatencyMicroseconds() const;
136 
137  // Frequency/Period in terms of when the message was received (not
138  // the header stamp).
139  double meanFrequencyHz() const;
140  ros::Duration meanPeriod() const;
141  ros::Duration minPeriod() const;
142  ros::Duration maxPeriod() const;
143  double meanPeriodMilliseconds() const;
144  double minPeriodMilliseconds() const;
145  double maxPeriodMilliseconds() const;
146 
147  // Provide a negative value to disable the timeout (default is -1).
148  void setTimeout(const ros::Duration &time_out);
149  void setTimeout(const double time_out);
150  // Read the timeout directly from the parameter server.
151  void timeoutParam(const ros::NodeHandle &nh,
152  const std::string &parameter_name,
153  const double default_value);
154 
155  // Block/unblock timeouts from occuring. This allows you to
156  // temporarily block timeouts (for example, if a message is not
157  // expected in a particular mode). Returns the current state
158  bool blockTimeouts(bool block);
159 
160  // Return true if the subscriber is currently blocking timeouts from
161  // occurring.
162  bool timeoutsBlocked() const;
163 
164  // Determine if the timeout is currently enabled.
165  bool timeoutEnabled() const;
166  // Read the current timeout setting.
167  ros::Duration timeout() const;
168  double timeoutMilliseconds() const;
169 
170  // Determine if the topic is in a timed out state.
171  bool inTimeout();
172  // How many times the topic has been in a timeout state.
173  int timeoutCount();
174 
175  // These flags determine which values are added to a diagnostic
176  // status by the appendDiagnostics method.
178  DIAG_CONNECTION = 1 << 0, // Include topic names, publisher counts
179  DIAG_MSG_COUNT = 1 << 1, // Include message count
180  DIAG_TIMEOUT = 1 << 2, // Include timeout counts if applicable
181  DIAG_LATENCY = 1 << 3, // Include latency information
182  DIAG_RATE = 1 << 4, // Include rate information
183 
184  DIAG_ALL = ~0, // Abbreviation to include all information
186  // Abbreviation to include everything except connection info.
187  };
188 
189  // This is a convenience method to add information about this
190  // subscription to a diagnostic status in a standard way.
191  //
192  // The status is merged with a warning if no messages have been
193  // received or if timeouts have occurred. The status is merged with
194  // an error if the subscription is in an active timeout status.
195  //
196  // The flags parameter determines which values are added to the
197  // status' key/value pairs. This should be a bitwise combination of
198  // the values defined in DIAGNOSTIC_FLAGS.
200  const std::string &name,
201  const int flags);
202 }; // class Subscriber
203 
204 inline
206 {
207  // Setup an empty implementation so that we can assume impl_ is
208  // non-null and avoid a lot of unnecessary NULL checks.
209  impl_ = boost::make_shared<SubscriberImpl>();
210 }
211 
212 template<class M , class T >
213 inline
215  const std::string &topic,
216  uint32_t queue_size,
217  void(T::*fp)(const boost::shared_ptr< M const > &),
218  T *obj,
219  const ros::TransportHints &transport_hints)
220 {
223  nh, topic, queue_size, fp, obj, transport_hints));
224 }
225 
226 template<class M>
227 inline
229  const std::string &topic,
230  uint32_t queue_size,
231  const boost::function<void(const boost::shared_ptr<M const> &)> &callback,
232  const ros::TransportHints &transport_hints)
233 {
236  nh, topic, queue_size, callback, transport_hints));
237 }
238 
239 template<class M>
240 inline
242  const std::string &topic,
244  const ros::TransportHints &transport_hints)
245 {
248  nh, topic, dest, transport_hints));
249 }
250 
251 inline
253 {
254  // If we have some non-default parameters and the other class
255  // doesn't, we preserve our parameters across assignment.
256  //
257  // This is to support the use case where we read ROS parameters, set
258  // them in the subscriber, and then create the subscriptions.
259  // Otherwise, the user has to read the timeouts, save them
260  // temporarily, and then set them after the subscription is set up.
261  //
262  // This could cause confusing behavior if you are moving subscribers
263  // around a lot, but I've never seen that kind of use case in any
264  // ROS code.
265 
266  ros::Duration new_timeout = other.impl_->timeout();
267  if (impl_->timeoutEnabled() && !other.impl_->timeoutEnabled()) {
268  new_timeout = impl_->timeout();
269  }
270 
271  impl_ = other.impl_;
272  impl_->setTimeout(new_timeout);
273 
274  return *this;
275 }
276 
277 inline
278 const std::string& Subscriber::unmappedTopic() const
279 {
280  return impl_->unmappedTopic();
281 }
282 
283 inline
284 const std::string& Subscriber::mappedTopic() const
285 {
286  return impl_->mappedTopic();
287 }
288 
289 inline
291 {
292  return impl_->numPublishers();
293 }
294 
295 inline
297 {
298  impl_->resetStatistics();
299 }
300 
301 inline
303 {
304  return impl_->messageCount();
305 }
306 
307 inline
309 {
310  if (now == ros::TIME_MIN) {
311  return impl_->age(ros::Time::now());
312  } else {
313  return impl_->age(now);
314  }
315 }
316 
317 inline
318 double Subscriber::ageSeconds(const ros::Time &now) const
319 {
320  return age(now).toSec();
321 }
322 
323 inline
324 double Subscriber::ageMilliseconds(const ros::Time &now) const
325 {
326  return age(now).toNSec() / 1000000.0;
327 }
328 
329 inline
331 {
332  return impl_->meanLatency();
333 }
334 
335 inline
337 {
338  return impl_->minLatency();
339 }
340 
341 inline
343 {
344  return impl_->maxLatency();
345 }
346 
347 inline
349 {
350  return meanLatency().toNSec() / 1000.0;
351 }
352 
353 inline
355 {
356  return minLatency().toNSec() / 1000.0;
357 }
358 
359 inline
361 {
362  return maxLatency().toNSec() / 1000.0;
363 }
364 
365 inline
367 {
368  return impl_->meanPeriod();
369 }
370 
371 inline
373 {
374  return impl_->meanFrequencyHz();
375 }
376 
377 inline
379 {
380  return impl_->minPeriod();
381 }
382 
383 inline
385 {
386  return impl_->maxPeriod();
387 }
388 
389 inline
391 {
392  return meanPeriod().toNSec() / 1000000.0;
393 }
394 
395 inline
397 {
398  return minPeriod().toNSec() / 1000000.0;
399 }
400 
401 inline
403 {
404  return maxPeriod().toNSec() / 1000000.0;
405 }
406 
407 inline
409 {
410  impl_->setTimeout(time_out);
411 }
412 
413 inline
414 void Subscriber::setTimeout(const double time_out)
415 {
416  setTimeout(ros::Duration(time_out));
417 }
418 
419 inline
421  const ros::NodeHandle &nh,
422  const std::string &parameter_name,
423  const double default_value)
424 {
425  double timeout;
426  swri::param(nh, parameter_name, timeout, default_value);
427  setTimeout(timeout);
428 }
429 
430 inline
432 {
433  return impl_->blockTimeouts(block);
434 }
435 
436 inline
438 {
439  return impl_->timeoutsBlocked();
440 }
441 
442 inline
444 {
445  return impl_->timeout();
446 }
447 
448 inline
450 {
451  return impl_->timeoutEnabled();
452 }
453 
454 inline
456 {
457  return impl_->timeout().toNSec() / 1.0e6;
458 }
459 
460 inline
462 {
463  return impl_->inTimeout();
464 }
465 
466 inline
468 {
469  return impl_->timeoutCount();
470 }
471 
472 inline
474  const std::string &name,
475  int flags)
476 {
477  // Alias a type for easier access to DiagnosticStatus enumerations.
478  typedef diagnostic_msgs::DiagnosticStatus DS;
479 
480  // We are considering no messages seen as a warning because this is
481  // a normal condition during start up, but should be resolved once
482  // the system is running. On the other hand, timeouts are typically
483  // an unexpected condition that occur when a publisher dies or
484  // communications is getting blocked. Active timeouts affect
485  // operation so are treated as an error condition. If a timeout is
486  // not active, but has occurred, we report this as a warning so that
487  // we know something has been wrong and needs to be investigated.
488 
489  if (!messageCount()) {
490  status.mergeSummaryf(DS::WARN, "Waiting for %s messages.", name.c_str());
491  } else if (inTimeout()) {
492  status.mergeSummaryf(DS::ERROR, "%s timeout.", name.c_str());
493  } else if (timeoutCount()) {
494  status.mergeSummaryf(DS::WARN, "%s timeouts have occurred.", name.c_str());
495  }
496 
497  if (flags & DIAG_CONNECTION) {
498  if (mappedTopic() == unmappedTopic()) {
499  status.addf(name + "topic name", "%s", mappedTopic().c_str());
500  } else {
501  status.addf(name + "topic name", "%s -> %s",
502  unmappedTopic().c_str(),
503  mappedTopic().c_str());
504  }
505  status.addf(name + " publishers", "%d", numPublishers());
506  }
507 
508  if (flags & DIAG_MSG_COUNT) {
509  status.add(name + " message count", messageCount());
510  }
511 
512  if ((flags & DIAG_TIMEOUT) && timeoutEnabled()) {
513  status.addf(name + " timeouts count",
514  "%d ( > %.2f ms)",
515  timeoutCount(),
517  }
518 
519  if (flags & DIAG_LATENCY) {
520  if (messageCount() < 1) {
521  status.add(name + " latency",
522  "min: N/A us, mean: N/A us, max: N/A us");
523  } else {
524  status.addf(name + " latency",
525  "min: %.2f us, mean: %.2f us, max: %.2f us",
529  }
530  }
531 
532  if (flags & DIAG_RATE) {
533  if (messageCount() < 2) {
534  status.add(
535  name + " rates",
536  "min period: N/A ms, mean frequency: N/A hz, max period: N/A ms");
537  } else {
538  status.addf(
539  name + " rates",
540  "min period: %.2f ms, mean frequency: %.2f hz, max period: %.2f ms",
542  meanFrequencyHz(),
544  }
545  }
546 }
547 } // namespace swri
548 #endif // SWRI_ROSCPP_SUBSCRIBER_H_
ros::Duration meanPeriod() const
Definition: subscriber.h:366
ros::Duration minPeriod() const
Definition: subscriber.h:378
bool blockTimeouts(bool block)
Definition: subscriber.h:431
double ageMilliseconds(const ros::Time &now=ros::TIME_MIN) const
Definition: subscriber.h:324
void resetStatistics()
Definition: subscriber.h:296
double meanFrequencyHz() const
Definition: subscriber.h:372
const Time TIME_MIN(0, 1)
bool timeoutsBlocked() const
Definition: subscriber.h:437
ros::Duration maxPeriod() const
Definition: subscriber.h:384
Subscriber & operator=(const Subscriber &other)
Definition: subscriber.h:252
void appendDiagnostics(diagnostic_updater::DiagnosticStatusWrapper &status, const std::string &name, const int flags)
Definition: subscriber.h:473
double maxLatencyMicroseconds() const
Definition: subscriber.h:360
double minPeriodMilliseconds() const
Definition: subscriber.h:396
int messageCount() const
Definition: subscriber.h:302
void addf(const std::string &key, const char *format,...)
const std::string & unmappedTopic() const
Definition: subscriber.h:278
int numPublishers() const
Definition: subscriber.h:290
void timeoutParam(const ros::NodeHandle &nh, const std::string &parameter_name, const double default_value)
Definition: subscriber.h:420
const std::string & mappedTopic() const
Definition: subscriber.h:284
ros::Duration minLatency() const
Definition: subscriber.h:336
static void param(const ros::NodeHandle &nh, const std::string &name, int &variable, const int default_value)
Definition: parameters.h:102
bool timeoutEnabled() const
Definition: subscriber.h:449
int64_t toNSec() const
ros::Duration maxLatency() const
Definition: subscriber.h:342
void setTimeout(const ros::Duration &time_out)
Definition: subscriber.h:408
boost::shared_ptr< SubscriberImpl > impl_
Definition: subscriber.h:69
double meanPeriodMilliseconds() const
Definition: subscriber.h:390
double timeoutMilliseconds() const
Definition: subscriber.h:455
double meanLatencyMicroseconds() const
Definition: subscriber.h:348
ros::Duration meanLatency() const
Definition: subscriber.h:330
ros::Duration timeout() const
Definition: subscriber.h:443
static Time now()
void mergeSummaryf(unsigned char lvl, const char *format,...)
void add(const std::string &key, const T &val)
ros::Duration age(const ros::Time &now=ros::TIME_MIN) const
Definition: subscriber.h:308
double ageSeconds(const ros::Time &now=ros::TIME_MIN) const
Definition: subscriber.h:318
double minLatencyMicroseconds() const
Definition: subscriber.h:354
double maxPeriodMilliseconds() const
Definition: subscriber.h:402
diagnostic_msgs::DiagnosticStatus DS


swri_roscpp
Author(s):
autogenerated on Tue Apr 6 2021 02:50:35