subscriber.h
Go to the documentation of this file.
1 // *****************************************************************************
2 //
3 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
4 // All rights reserved.
5 //
6 // Redistribution and use in source and binary forms, with or without
7 // modification, are permitted provided that the following conditions are met:
8 // * Redistributions of source code must retain the above copyright
9 // notice, this list of conditions and the following disclaimer.
10 // * Redistributions in binary form must reproduce the above copyright
11 // notice, this list of conditions and the following disclaimer in the
12 // documentation and/or other materials provided with the distribution.
13 // * Neither the name of Southwest Research Institute® (SwRI®) nor the
14 // names of its contributors may be used to endorse or promote products
15 // derived from this software without specific prior written permission.
16 //
17 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
21 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
22 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
23 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
24 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
26 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 //
28 // *****************************************************************************
29 #ifndef SWRI_ROSCPP_SUBSCRIBER_H_
30 #define SWRI_ROSCPP_SUBSCRIBER_H_
31 
32 
33 #include <ros/node_handle.h>
35 
36 #include <swri_roscpp/parameters.h>
38 
39 namespace swri
40 {
41 // This is an extended interface to the ros::Subscriber class.
42 //
43 // This is an extended interface to the ros::Subscriber class that
44 // provides a little more default functionality and instrumentation.
45 //
46 // - Prints information when the subscription is created (to ROS_INFO)
47 // about the unmapped and mapped topic names.
48 //
49 // - Maintains some statistics about the messages (counts, latency, period)
50 //
51 // - Implements timeout logic and timeout counting.
52 //
53 // This implementation provides two interfaces. It supports the
54 // traditional ROS callback interface (though it is deliberately
55 // limited to callbacks that take a *ConstPtr& type), and an interface
56 // where the message is simply assigned to *ConstPtr variable that is
57 // specified at creation. The second interface replaces the previous
58 // swri_roscpp::LatchedSubscriber and allows you to avoid writing
59 // trivial callback functions.
60 
62 {
63  private:
64  // The implementation is kept in a separate class to maintain clean
65  // copy/move semantics with ROS (since the ROS subscriber binds to a
66  // single address for the callback) and to allow us to hide the
67  // template arguments so the message type doesn't have to be
68  // specified when the subscriber is declared.
70 
71  public:
72  Subscriber();
73 
74  // Using class method callback.
75  template<class M , class T >
77  const std::string &topic,
78  uint32_t queue_size,
79  void(T::*fp)(const boost::shared_ptr< M const > &),
80  T *obj,
81  const ros::TransportHints &transport_hints=ros::TransportHints());
82 
83  // Using a boost function callback.
84  template<class M>
86  const std::string &topic,
87  uint32_t queue_size,
88  const boost::function<void(const boost::shared_ptr<M const> &)> &callback,
89  const ros::TransportHints &transport_hints=ros::TransportHints());
90 
91  // This is an alternate interface that stores a received message in
92  // a variable without calling a user-defined callback function.
93  // This is useful for cases where you just want to store a message
94  // for usage later and avoids having to write a trivial callback
95  // function.
96  template<class M>
98  const std::string &topic,
100  const ros::TransportHints &transport_hints=ros::TransportHints());
101 
102  Subscriber& operator=(const Subscriber &other);
103 
104  virtual ~Subscriber()
105  {
106  }
107 
108  // Reset all statistics, including message and timeout counts.
109  void resetStatistics();
110 
111  // Returns the unmapped topic name that was provided when the
112  // subscriber was created.
113  const std::string& unmappedTopic() const;
114  // Returns the fully mapped topic name that the subscriber is
115  // connected on.
116  const std::string& mappedTopic() const;
117  // Returns the number of publishers that are connected to this
118  // subscriber.
119  int numPublishers() const;
120 
121  // How many messages have been received.
122  int messageCount() const;
123 
124  // Age of the most recent message (difference between now and the
125  // header stamp (or time message was received for messages that
126  // don't have headers).
127  ros::Duration age(const ros::Time &now=ros::TIME_MIN) const;
128  double ageSeconds(const ros::Time &now=ros::TIME_MIN) const;
129  double ageMilliseconds(const ros::Time &now=ros::TIME_MIN) const;
130 
131  // Average latency (time difference between the time stamp and when
132  // the message is received). These will be useless for message types
133  // that do not have a header.
134  ros::Duration meanLatency() const;
135  ros::Duration minLatency() const;
136  ros::Duration maxLatency() const;
137  double meanLatencyMicroseconds() const;
138  double minLatencyMicroseconds() const;
139  double maxLatencyMicroseconds() const;
140 
141  // Frequency/Period in terms of when the message was received (not
142  // the header stamp).
143  double meanFrequencyHz() const;
144  ros::Duration meanPeriod() const;
145  ros::Duration minPeriod() const;
146  ros::Duration maxPeriod() const;
147  double meanPeriodMilliseconds() const;
148  double minPeriodMilliseconds() const;
149  double maxPeriodMilliseconds() const;
150 
151  // Provide a negative value to disable the timeout (default is -1).
152  void setTimeout(const ros::Duration &time_out);
153  void setTimeout(const double time_out);
154  // Read the timeout directly from the parameter server.
155  void timeoutParam(const ros::NodeHandle &nh,
156  const std::string &parameter_name,
157  const double default_value);
158 
159  // Block/unblock timeouts from occuring. This allows you to
160  // temporarily block timeouts (for example, if a message is not
161  // expected in a particular mode). Returns the current state
162  bool blockTimeouts(bool block);
163 
164  // Return true if the subscriber is currently blocking timeouts from
165  // occurring.
166  bool timeoutsBlocked() const;
167 
168  // Determine if the timeout is currently enabled.
169  bool timeoutEnabled() const;
170  // Read the current timeout setting.
171  ros::Duration timeout() const;
172  double timeoutMilliseconds() const;
173 
174  // Determine if the topic is in a timed out state.
175  bool inTimeout();
176  // How many times the topic has been in a timeout state.
177  int timeoutCount();
178 
179  // These flags determine which values are added to a diagnostic
180  // status by the appendDiagnostics method.
182  DIAG_CONNECTION = 1 << 0, // Include topic names, publisher counts
183  DIAG_MSG_COUNT = 1 << 1, // Include message count
184  DIAG_TIMEOUT = 1 << 2, // Include timeout counts if applicable
185  DIAG_LATENCY = 1 << 3, // Include latency information
186  DIAG_RATE = 1 << 4, // Include rate information
187 
188  DIAG_ALL = ~0, // Abbreviation to include all information
190  // Abbreviation to include everything except connection info.
191  };
192 
193  // This is a convenience method to add information about this
194  // subscription to a diagnostic status in a standard way.
195  //
196  // The status is merged with a warning if no messages have been
197  // received or if timeouts have occurred. The status is merged with
198  // an error if the subscription is in an active timeout status.
199  //
200  // The flags parameter determines which values are added to the
201  // status' key/value pairs. This should be a bitwise combination of
202  // the values defined in DIAGNOSTIC_FLAGS.
204  const std::string &name,
205  const int flags);
206 }; // class Subscriber
207 
208 inline
210 {
211  // Setup an empty implementation so that we can assume impl_ is
212  // non-null and avoid a lot of unnecessary NULL checks.
213  impl_ = boost::make_shared<SubscriberImpl>();
214 }
215 
216 template<class M , class T >
217 inline
219  const std::string &topic,
220  uint32_t queue_size,
221  void(T::*fp)(const boost::shared_ptr< M const > &),
222  T *obj,
223  const ros::TransportHints &transport_hints)
224 {
227  nh, topic, queue_size, fp, obj, transport_hints));
228 }
229 
230 template<class M>
231 inline
233  const std::string &topic,
234  uint32_t queue_size,
235  const boost::function<void(const boost::shared_ptr<M const> &)> &callback,
236  const ros::TransportHints &transport_hints)
237 {
240  nh, topic, queue_size, callback, transport_hints));
241 }
242 
243 template<class M>
244 inline
246  const std::string &topic,
248  const ros::TransportHints &transport_hints)
249 {
252  nh, topic, dest, transport_hints));
253 }
254 
255 inline
257 {
258  // If we have some non-default parameters and the other class
259  // doesn't, we preserve our parameters across assignment.
260  //
261  // This is to support the use case where we read ROS parameters, set
262  // them in the subscriber, and then create the subscriptions.
263  // Otherwise, the user has to read the timeouts, save them
264  // temporarily, and then set them after the subscription is set up.
265  //
266  // This could cause confusing behavior if you are moving subscribers
267  // around a lot, but I've never seen that kind of use case in any
268  // ROS code.
269 
270  ros::Duration new_timeout = other.impl_->timeout();
271  if (impl_->timeoutEnabled() && !other.impl_->timeoutEnabled()) {
272  new_timeout = impl_->timeout();
273  }
274 
275  impl_ = other.impl_;
276  impl_->setTimeout(new_timeout);
277 
278  return *this;
279 }
280 
281 inline
282 const std::string& Subscriber::unmappedTopic() const
283 {
284  return impl_->unmappedTopic();
285 }
286 
287 inline
288 const std::string& Subscriber::mappedTopic() const
289 {
290  return impl_->mappedTopic();
291 }
292 
293 inline
295 {
296  return impl_->numPublishers();
297 }
298 
299 inline
301 {
302  impl_->resetStatistics();
303 }
304 
305 inline
307 {
308  return impl_->messageCount();
309 }
310 
311 inline
313 {
314  if (now == ros::TIME_MIN) {
315  return impl_->age(ros::Time::now());
316  } else {
317  return impl_->age(now);
318  }
319 }
320 
321 inline
322 double Subscriber::ageSeconds(const ros::Time &now) const
323 {
324  return age(now).toSec();
325 }
326 
327 inline
328 double Subscriber::ageMilliseconds(const ros::Time &now) const
329 {
330  return age(now).toNSec() / 1000000.0;
331 }
332 
333 inline
335 {
336  return impl_->meanLatency();
337 }
338 
339 inline
341 {
342  return impl_->minLatency();
343 }
344 
345 inline
347 {
348  return impl_->maxLatency();
349 }
350 
351 inline
353 {
354  return meanLatency().toNSec() / 1000.0;
355 }
356 
357 inline
359 {
360  return minLatency().toNSec() / 1000.0;
361 }
362 
363 inline
365 {
366  return maxLatency().toNSec() / 1000.0;
367 }
368 
369 inline
371 {
372  return impl_->meanPeriod();
373 }
374 
375 inline
377 {
378  return impl_->meanFrequencyHz();
379 }
380 
381 inline
383 {
384  return impl_->minPeriod();
385 }
386 
387 inline
389 {
390  return impl_->maxPeriod();
391 }
392 
393 inline
395 {
396  return meanPeriod().toNSec() / 1000000.0;
397 }
398 
399 inline
401 {
402  return minPeriod().toNSec() / 1000000.0;
403 }
404 
405 inline
407 {
408  return maxPeriod().toNSec() / 1000000.0;
409 }
410 
411 inline
413 {
414  impl_->setTimeout(time_out);
415 }
416 
417 inline
418 void Subscriber::setTimeout(const double time_out)
419 {
420  setTimeout(ros::Duration(time_out));
421 }
422 
423 inline
425  const ros::NodeHandle &nh,
426  const std::string &parameter_name,
427  const double default_value)
428 {
429  double timeout;
430  swri::param(nh, parameter_name, timeout, default_value);
431  setTimeout(timeout);
432 }
433 
434 inline
436 {
437  return impl_->blockTimeouts(block);
438 }
439 
440 inline
442 {
443  return impl_->timeoutsBlocked();
444 }
445 
446 inline
448 {
449  return impl_->timeout();
450 }
451 
452 inline
454 {
455  return impl_->timeoutEnabled();
456 }
457 
458 inline
460 {
461  return impl_->timeout().toNSec() / 1.0e6;
462 }
463 
464 inline
466 {
467  return impl_->inTimeout();
468 }
469 
470 inline
472 {
473  return impl_->timeoutCount();
474 }
475 
476 inline
478  const std::string &name,
479  int flags)
480 {
481  // Alias a type for easier access to DiagnosticStatus enumerations.
482  typedef diagnostic_msgs::DiagnosticStatus DS;
483 
484  // We are considering no messages seen as a warning because this is
485  // a normal condition during start up, but should be resolved once
486  // the system is running. On the other hand, timeouts are typically
487  // an unexpected condition that occur when a publisher dies or
488  // communications is getting blocked. Active timeouts affect
489  // operation so are treated as an error condition. If a timeout is
490  // not active, but has occurred, we report this as a warning so that
491  // we know something has been wrong and needs to be investigated.
492 
493  if (!messageCount()) {
494  status.mergeSummaryf(DS::WARN, "Waiting for %s messages.", name.c_str());
495  } else if (inTimeout()) {
496  status.mergeSummaryf(DS::ERROR, "%s timeout.", name.c_str());
497  } else if (timeoutCount()) {
498  status.mergeSummaryf(DS::WARN, "%s timeouts have occurred.", name.c_str());
499  }
500 
501  if (flags & DIAG_CONNECTION) {
502  if (mappedTopic() == unmappedTopic()) {
503  status.addf(name + "topic name", "%s", mappedTopic().c_str());
504  } else {
505  status.addf(name + "topic name", "%s -> %s",
506  unmappedTopic().c_str(),
507  mappedTopic().c_str());
508  }
509  status.addf(name + " publishers", "%d", numPublishers());
510  }
511 
512  if (flags & DIAG_MSG_COUNT) {
513  status.add(name + " message count", messageCount());
514  }
515 
516  if ((flags & DIAG_TIMEOUT) && timeoutEnabled()) {
517  status.addf(name + " timeouts count",
518  "%d ( > %.2f ms)",
519  timeoutCount(),
521  }
522 
523  if (flags & DIAG_LATENCY) {
524  if (messageCount() < 1) {
525  status.add(name + " latency",
526  "min: N/A us, mean: N/A us, max: N/A us");
527  } else {
528  status.addf(name + " latency",
529  "min: %.2f us, mean: %.2f us, max: %.2f us",
533  }
534  }
535 
536  if (flags & DIAG_RATE) {
537  if (messageCount() < 2) {
538  status.add(
539  name + " rates",
540  "min period: N/A ms, mean frequency: N/A hz, max period: N/A ms");
541  } else {
542  status.addf(
543  name + " rates",
544  "min period: %.2f ms, mean frequency: %.2f hz, max period: %.2f ms",
546  meanFrequencyHz(),
548  }
549  }
550 }
551 } // namespace swri
552 #endif // SWRI_ROSCPP_SUBSCRIBER_H_
ros::Duration age(const ros::Time &now=ros::TIME_MIN) const
Definition: subscriber.h:312
double ageMilliseconds(const ros::Time &now=ros::TIME_MIN) const
Definition: subscriber.h:328
const std::string & mappedTopic() const
Definition: subscriber.h:288
ros::Duration maxPeriod() const
Definition: subscriber.h:388
bool blockTimeouts(bool block)
Definition: subscriber.h:435
void resetStatistics()
Definition: subscriber.h:300
const Time TIME_MIN(0, 1)
double maxPeriodMilliseconds() const
Definition: subscriber.h:406
Subscriber & operator=(const Subscriber &other)
Definition: subscriber.h:256
void appendDiagnostics(diagnostic_updater::DiagnosticStatusWrapper &status, const std::string &name, const int flags)
Definition: subscriber.h:477
double maxLatencyMicroseconds() const
Definition: subscriber.h:364
ros::Duration minPeriod() const
Definition: subscriber.h:382
def default_value(type_)
const std::string & unmappedTopic() const
Definition: subscriber.h:282
void addf(const std::string &key, const char *format,...)
ros::Duration timeout() const
Definition: subscriber.h:447
void timeoutParam(const ros::NodeHandle &nh, const std::string &parameter_name, const double default_value)
Definition: subscriber.h:424
double meanLatencyMicroseconds() const
Definition: subscriber.h:352
int numPublishers() const
Definition: subscriber.h:294
bool timeoutsBlocked() const
Definition: subscriber.h:441
ros::Duration meanPeriod() const
Definition: subscriber.h:370
ros::Duration maxLatency() const
Definition: subscriber.h:346
void param(swri::NodeHandle &nh, const std::string name, std::string &value, const std::string def, const std::string description)
Definition: node_handle.h:1159
virtual ~Subscriber()
Definition: subscriber.h:104
ros::Duration meanLatency() const
Definition: subscriber.h:334
void setTimeout(const ros::Duration &time_out)
Definition: subscriber.h:412
boost::shared_ptr< SubscriberImpl > impl_
Definition: subscriber.h:69
ros::Duration minLatency() const
Definition: subscriber.h:340
double ageSeconds(const ros::Time &now=ros::TIME_MIN) const
Definition: subscriber.h:322
double timeoutMilliseconds() const
Definition: subscriber.h:459
double minPeriodMilliseconds() const
Definition: subscriber.h:400
double minLatencyMicroseconds() const
Definition: subscriber.h:358
bool timeoutEnabled() const
Definition: subscriber.h:453
int64_t toNSec() const
static Time now()
int messageCount() const
Definition: subscriber.h:306
void mergeSummaryf(unsigned char lvl, const char *format,...)
void add(const std::string &key, const T &val)
double meanPeriodMilliseconds() const
Definition: subscriber.h:394
diagnostic_msgs::DiagnosticStatus DS
double meanFrequencyHz() const
Definition: subscriber.h:376


swri_roscpp
Author(s):
autogenerated on Sat Jan 21 2023 03:13:16