subscriber.h
Go to the documentation of this file.
00001 // *****************************************************************************
00002 //
00003 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
00004 // All rights reserved.
00005 //
00006 // Redistribution and use in source and binary forms, with or without
00007 // modification, are permitted provided that the following conditions are met:
00008 //     * Redistributions of source code must retain the above copyright
00009 //       notice, this list of conditions and the following disclaimer.
00010 //     * Redistributions in binary form must reproduce the above copyright
00011 //       notice, this list of conditions and the following disclaimer in the
00012 //       documentation and/or other materials provided with the distribution.
00013 //     * Neither the name of Southwest Research Institute® (SwRI®) nor the
00014 //       names of its contributors may be used to endorse or promote products
00015 //       derived from this software without specific prior written permission.
00016 //
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027 //
00028 // *****************************************************************************
00029 #ifndef SWRI_ROSCPP_SUBSCRIBER_H_
00030 #define SWRI_ROSCPP_SUBSCRIBER_H_
00031 
00032 
00033 #include <ros/node_handle.h>
00034 #include <diagnostic_updater/DiagnosticStatusWrapper.h>
00035 
00036 #include <swri_roscpp/parameters.h>
00037 #include <swri_roscpp/subscriber_impl.h>
00038 
00039 namespace swri
00040 {
00041 // This is an extended interface to the ros::Subscriber class.
00042 //
00043 // This is an extended interface to the ros::Subscriber class that
00044 // provides a little more default functionality and instrumentation.
00045 //
00046 // - Prints information when the subscription is created (to ROS_INFO)
00047 //   about the unmapped and mapped topic names.
00048 //
00049 // - Maintains some statistics about the messages (counts, latency, period)
00050 //
00051 // - Implements timeout logic and timeout counting.
00052 //
00053 // This implementation provides two interfaces.  It supports the
00054 // traditional ROS callback interface (though it is deliberately
00055 // limited to callbacks that take a *ConstPtr& type), and an interface
00056 // where the message is simply assigned to *ConstPtr variable that is
00057 // specified at creation.  The second interface replaces the previous
00058 // swri_roscpp::LatchedSubscriber and allows you to avoid writing
00059 // trivial callback functions.
00060 
00061 class Subscriber
00062 {
00063  private:
00064   // The implementation is kept in a separate class to maintain clean
00065   // copy/move semantics with ROS (since the ROS subscriber binds to a
00066   // single address for the callback) and to allow us to hide the
00067   // template arguments so the message type doesn't have to be
00068   // specified when the subscriber is declared.
00069   boost::shared_ptr<SubscriberImpl> impl_;
00070 
00071  public:
00072   Subscriber();
00073 
00074   // Using class method callback.
00075   template<class M , class T >
00076   Subscriber(ros::NodeHandle &nh,
00077              const std::string &topic,
00078              uint32_t queue_size,
00079              void(T::*fp)(const boost::shared_ptr< M const > &),
00080              T *obj,
00081              const ros::TransportHints &transport_hints=ros::TransportHints());
00082 
00083   // Using a boost function callback.
00084   template<class M>
00085   Subscriber(ros::NodeHandle &nh,
00086              const std::string &topic,
00087              uint32_t queue_size,
00088              const boost::function<void(const boost::shared_ptr<M const> &)> &callback,
00089              const ros::TransportHints &transport_hints=ros::TransportHints());
00090 
00091   // This is an alternate interface that stores a received message in
00092   // a variable without calling a user-defined callback function.
00093   // This is useful for cases where you just want to store a message
00094   // for usage later and avoids having to write a trivial callback
00095   // function.
00096   template<class M>
00097   Subscriber(ros::NodeHandle &nh,
00098              const std::string &topic,
00099              boost::shared_ptr< M const > *dest,
00100              const ros::TransportHints &transport_hints=ros::TransportHints());
00101   
00102   Subscriber& operator=(const Subscriber &other);
00103 
00104   // Reset all statistics, including message and timeout counts.
00105   void resetStatistics();
00106 
00107   // Returns the unmapped topic name that was provided when the
00108   // subscriber was created.
00109   const std::string& unmappedTopic() const;
00110   // Returns the fully mapped topic name that the subscriber is
00111   // connected on.
00112   const std::string& mappedTopic() const;
00113   // Returns the number of publishers that are connected to this
00114   // subscriber.
00115   int numPublishers() const;
00116 
00117   // How many messages have been received.
00118   int messageCount() const;
00119 
00120   // Age of the most recent message (difference between now and the
00121   // header stamp (or time message was received for messages that
00122   // don't have headers).
00123   ros::Duration age(const ros::Time &now=ros::TIME_MIN) const;
00124   double ageSeconds(const ros::Time &now=ros::TIME_MIN) const;
00125   double ageMilliseconds(const ros::Time &now=ros::TIME_MIN) const;
00126 
00127   // Average latency (time difference between the time stamp and when
00128   // the message is received). These will be useless for message types
00129   // that do not have a header.
00130   ros::Duration meanLatency() const;
00131   ros::Duration minLatency() const;
00132   ros::Duration maxLatency() const;
00133   double meanLatencyMicroseconds() const;
00134   double minLatencyMicroseconds() const;
00135   double maxLatencyMicroseconds() const;
00136 
00137   // Frequency/Period in terms of when the message was received (not
00138   // the header stamp).
00139   double meanFrequencyHz() const;
00140   ros::Duration meanPeriod() const;
00141   ros::Duration minPeriod() const;
00142   ros::Duration maxPeriod() const;
00143   double meanPeriodMilliseconds() const;
00144   double minPeriodMilliseconds() const;
00145   double maxPeriodMilliseconds() const;
00146 
00147   // Provide a negative value to disable the timeout (default is -1).
00148   void setTimeout(const ros::Duration &time_out);
00149   void setTimeout(const double time_out);
00150   // Read the timeout directly from the parameter server.
00151   void timeoutParam(const ros::NodeHandle &nh,
00152                     const std::string &parameter_name,
00153                     const double default_value);
00154   
00155   // Block/unblock timeouts from occuring.  This allows you to
00156   // temporarily block timeouts (for example, if a message is not
00157   // expected in a particular mode).  Returns the current state
00158   bool blockTimeouts(bool block);
00159 
00160   // Return true if the subscriber is currently blocking timeouts from
00161   // occurring.
00162   bool timeoutsBlocked() const;
00163 
00164   // Determine if the timeout is currently enabled.
00165   bool timeoutEnabled() const;
00166   // Read the current timeout setting.
00167   ros::Duration timeout() const;
00168   double timeoutMilliseconds() const;
00169 
00170   // Determine if the topic is in a timed out state.
00171   bool inTimeout();
00172   // How many times the topic has been in a timeout state.
00173   int timeoutCount();
00174 
00175   // These flags determine which values are added to a diagnostic
00176   // status by the appendDiagnostics method.
00177   enum DIAGNOSTIC_FLAGS {
00178     DIAG_CONNECTION = 1 << 0,  // Include topic names, publisher counts
00179     DIAG_MSG_COUNT  = 1 << 1,  // Include message count
00180     DIAG_TIMEOUT    = 1 << 2,  // Include timeout counts if applicable
00181     DIAG_LATENCY    = 1 << 3,  // Include latency information
00182     DIAG_RATE       = 1 << 4,  // Include rate information
00183 
00184     DIAG_ALL        = ~0,       // Abbreviation to include all information
00185     DIAG_MOST       = DIAG_ALL ^ DIAG_CONNECTION
00186     // Abbreviation to include everything except connection info.
00187   };
00188 
00189   // This is a convenience method to add information about this
00190   // subscription to a diagnostic status in a standard way.
00191   //
00192   // The status is merged with a warning if no messages have been
00193   // received or if timeouts have occurred.  The status is merged with
00194   // an error if the subscription is in an active timeout status.
00195   //
00196   // The flags parameter determines which values are added to the
00197   // status' key/value pairs.  This should be a bitwise combination of
00198   // the values defined in DIAGNOSTIC_FLAGS.
00199   void appendDiagnostics(diagnostic_updater::DiagnosticStatusWrapper &status,
00200                          const std::string &name,
00201                          const int flags);
00202 };  // class Subscriber
00203 
00204 inline
00205 Subscriber::Subscriber()
00206 {
00207   // Setup an empty implementation so that we can assume impl_ is
00208   // non-null and avoid a lot of unnecessary NULL checks.
00209   impl_ = boost::make_shared<SubscriberImpl>();
00210 }
00211 
00212 template<class M , class T >
00213 inline
00214 Subscriber::Subscriber(ros::NodeHandle &nh,
00215                        const std::string &topic,
00216                        uint32_t queue_size,
00217                        void(T::*fp)(const boost::shared_ptr< M const > &),
00218                        T *obj,
00219                        const ros::TransportHints &transport_hints)
00220 {
00221   impl_ = boost::shared_ptr<SubscriberImpl>(
00222     new TypedSubscriberImpl<M,T>(
00223       nh, topic, queue_size, fp, obj, transport_hints));
00224 }
00225 
00226 template<class M>
00227 inline
00228 Subscriber::Subscriber(ros::NodeHandle &nh,
00229                        const std::string &topic,
00230                        uint32_t queue_size,
00231                        const boost::function<void(const boost::shared_ptr<M const> &)> &callback,
00232                        const ros::TransportHints &transport_hints)
00233 {
00234   impl_ = boost::shared_ptr<SubscriberImpl>(
00235     new BindSubscriberImpl<M>(
00236       nh, topic, queue_size, callback, transport_hints));
00237 }
00238 
00239 template<class M>
00240 inline
00241 Subscriber::Subscriber(ros::NodeHandle &nh,
00242                        const std::string &topic,
00243                        boost::shared_ptr< M const > *dest,
00244                        const ros::TransportHints &transport_hints)
00245 {
00246   impl_ = boost::shared_ptr<SubscriberImpl>(
00247     new StorageSubscriberImpl<M>(
00248       nh, topic, dest, transport_hints));
00249 }
00250 
00251 inline
00252 Subscriber& Subscriber::operator=(const Subscriber &other)
00253 {
00254   // If we have some non-default parameters and the other class
00255   // doesn't, we preserve our parameters across assignment.
00256   //
00257   // This is to support the use case where we read ROS parameters, set
00258   // them in the subscriber, and then create the subscriptions.
00259   // Otherwise, the user has to read the timeouts, save them
00260   // temporarily, and then set them after the subscription is set up.
00261   //
00262   // This could cause confusing behavior if you are moving subscribers
00263   // around a lot, but I've never seen that kind of use case in any
00264   // ROS code.
00265 
00266   ros::Duration new_timeout = other.impl_->timeout();
00267   if (impl_->timeoutEnabled() && !other.impl_->timeoutEnabled()) {
00268     new_timeout = impl_->timeout();
00269   }
00270 
00271   impl_ = other.impl_;
00272   impl_->setTimeout(new_timeout);
00273 
00274   return *this;
00275 }
00276 
00277 inline
00278 const std::string& Subscriber::unmappedTopic() const
00279 {
00280   return impl_->unmappedTopic();
00281 }
00282 
00283 inline
00284 const std::string& Subscriber::mappedTopic() const
00285 {
00286   return impl_->mappedTopic();
00287 }
00288 
00289 inline
00290 int Subscriber::numPublishers() const
00291 {
00292   return impl_->numPublishers();
00293 }
00294 
00295 inline
00296 void Subscriber::resetStatistics()
00297 {
00298   impl_->resetStatistics();
00299 }
00300 
00301 inline
00302 int Subscriber::messageCount() const
00303 {
00304   return impl_->messageCount();
00305 }
00306 
00307 inline
00308 ros::Duration Subscriber::age(const ros::Time &now) const
00309 {
00310   if (now == ros::TIME_MIN) {
00311     return impl_->age(ros::Time::now());
00312   } else {
00313     return impl_->age(now);
00314   }
00315 }
00316 
00317 inline
00318 double Subscriber::ageSeconds(const ros::Time &now) const
00319 {
00320   return age(now).toSec();
00321 }
00322 
00323 inline
00324 double Subscriber::ageMilliseconds(const ros::Time &now) const
00325 {
00326   return age(now).toNSec() / 1000000.0;
00327 }
00328 
00329 inline
00330 ros::Duration Subscriber::meanLatency() const
00331 {
00332   return impl_->meanLatency();
00333 }
00334 
00335 inline
00336 ros::Duration Subscriber::minLatency() const
00337 {
00338   return impl_->minLatency();
00339 }
00340 
00341 inline
00342 ros::Duration Subscriber::maxLatency() const
00343 {
00344   return impl_->maxLatency();
00345 }
00346 
00347 inline
00348 double Subscriber::meanLatencyMicroseconds() const
00349 {
00350   return meanLatency().toNSec() / 1000.0;
00351 }
00352 
00353 inline
00354 double Subscriber::minLatencyMicroseconds() const
00355 {
00356   return minLatency().toNSec() / 1000.0;
00357 }
00358 
00359 inline
00360 double Subscriber::maxLatencyMicroseconds() const
00361 {
00362   return maxLatency().toNSec() / 1000.0;
00363 }
00364 
00365 inline
00366 ros::Duration Subscriber::meanPeriod() const
00367 {
00368   return impl_->meanPeriod();
00369 }
00370 
00371 inline
00372 double Subscriber::meanFrequencyHz() const
00373 {
00374   return impl_->meanFrequencyHz();
00375 }
00376 
00377 inline
00378 ros::Duration Subscriber::minPeriod() const
00379 {
00380   return impl_->minPeriod();
00381 }
00382 
00383 inline
00384 ros::Duration Subscriber::maxPeriod() const
00385 {
00386   return impl_->maxPeriod();
00387 }
00388 
00389 inline
00390 double Subscriber::meanPeriodMilliseconds() const
00391 {
00392   return meanPeriod().toNSec() / 1000000.0;
00393 }
00394 
00395 inline
00396 double Subscriber::minPeriodMilliseconds() const
00397 {
00398   return minPeriod().toNSec() / 1000000.0;
00399 }
00400 
00401 inline
00402 double Subscriber::maxPeriodMilliseconds() const
00403 {
00404   return maxPeriod().toNSec() / 1000000.0;
00405 }
00406 
00407 inline
00408 void Subscriber::setTimeout(const ros::Duration &time_out)
00409 {
00410   impl_->setTimeout(time_out);
00411 }
00412 
00413 inline
00414 void Subscriber::setTimeout(const double time_out)
00415 {
00416   setTimeout(ros::Duration(time_out));
00417 }
00418 
00419 inline
00420 void Subscriber::timeoutParam(
00421   const ros::NodeHandle &nh,
00422   const std::string &parameter_name,
00423   const double default_value)
00424 {
00425   double timeout;
00426   swri::param(nh, parameter_name, timeout, default_value);
00427   setTimeout(timeout);
00428 }
00429 
00430 inline
00431 bool Subscriber::blockTimeouts(bool block)
00432 {
00433   return impl_->blockTimeouts(block);
00434 }
00435 
00436 inline
00437 bool Subscriber::timeoutsBlocked() const
00438 {
00439   return impl_->timeoutsBlocked();
00440 }
00441 
00442 inline
00443 ros::Duration Subscriber::timeout() const
00444 {
00445   return impl_->timeout();
00446 }
00447 
00448 inline
00449 bool Subscriber::timeoutEnabled() const
00450 {
00451   return impl_->timeoutEnabled();
00452 }
00453 
00454 inline
00455 double Subscriber::timeoutMilliseconds() const
00456 {
00457   return impl_->timeout().toNSec() / 1.0e6;
00458 }
00459 
00460 inline
00461 bool Subscriber::inTimeout()
00462 {
00463   return impl_->inTimeout();
00464 }
00465 
00466 inline
00467 int Subscriber::timeoutCount()
00468 {
00469   return impl_->timeoutCount();
00470 }
00471 
00472 inline
00473 void Subscriber::appendDiagnostics(diagnostic_updater::DiagnosticStatusWrapper &status,
00474                                    const std::string &name,
00475                                    int flags)
00476 {
00477   // Alias a type for easier access to DiagnosticStatus enumerations.
00478   typedef diagnostic_msgs::DiagnosticStatus DS;
00479 
00480   // We are considering no messages seen as a warning because this is
00481   // a normal condition during start up, but should be resolved once
00482   // the system is running.  On the other hand, timeouts are typically
00483   // an unexpected condition that occur when a publisher dies or
00484   // communications is getting blocked.  Active timeouts affect
00485   // operation so are treated as an error condition.  If a timeout is
00486   // not active, but has occurred, we report this as a warning so that
00487   // we know something has been wrong and needs to be investigated.
00488 
00489   if (!messageCount()) {
00490     status.mergeSummaryf(DS::WARN, "Waiting for %s messages.", name.c_str());
00491   } else if (inTimeout()) {
00492     status.mergeSummaryf(DS::ERROR, "%s timeout.", name.c_str());
00493   } else if (timeoutCount()) {
00494     status.mergeSummaryf(DS::WARN, "%s timeouts have occurred.", name.c_str());
00495   }
00496 
00497   if (flags & DIAG_CONNECTION) {
00498     if (mappedTopic() == unmappedTopic()) {
00499       status.addf(name + "topic name", "%s", mappedTopic().c_str());
00500     } else {
00501       status.addf(name + "topic name", "%s -> %s",
00502                   unmappedTopic().c_str(),
00503                   mappedTopic().c_str());
00504     }
00505     status.addf(name + " publishers", "%d", numPublishers());
00506   }
00507 
00508   if (flags & DIAG_MSG_COUNT) {
00509     status.add(name + " message count", messageCount());
00510   }
00511 
00512   if ((flags & DIAG_TIMEOUT) && timeoutEnabled()) {
00513     status.addf(name + " timeouts count",
00514                 "%d ( > %.2f ms)",
00515                 timeoutCount(),
00516                 timeoutMilliseconds());
00517   }
00518 
00519   if (flags & DIAG_LATENCY) {
00520     if (messageCount() < 1) {
00521       status.add(name + " latency",
00522                  "min: N/A us, mean: N/A us, max: N/A us");
00523     } else {
00524       status.addf(name + " latency",
00525                   "min: %.2f us, mean: %.2f us, max: %.2f us",
00526                   minLatencyMicroseconds(),
00527                   meanLatencyMicroseconds(),
00528                   maxLatencyMicroseconds());
00529     }
00530   }
00531 
00532   if (flags & DIAG_RATE) {
00533     if (messageCount() < 2) {
00534     status.add(
00535       name + " rates",
00536       "min period: N/A ms, mean frequency: N/A hz, max period: N/A ms");
00537     } else {
00538       status.addf(
00539         name + " rates",
00540         "min period: %.2f ms, mean frequency: %.2f hz, max period: %.2f ms",
00541         minPeriodMilliseconds(),
00542         meanFrequencyHz(),
00543         maxPeriodMilliseconds());
00544     }
00545   }
00546 }
00547 }  // namespace swri
00548 #endif  // SWRI_ROSCPP_SUBSCRIBER_H_


swri_roscpp
Author(s):
autogenerated on Tue Oct 3 2017 03:19:27