subscriber_impl.h
Go to the documentation of this file.
00001 // *****************************************************************************
00002 //
00003 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
00004 // All rights reserved.
00005 //
00006 // Redistribution and use in source and binary forms, with or without
00007 // modification, are permitted provided that the following conditions are met:
00008 //     * Redistributions of source code must retain the above copyright
00009 //       notice, this list of conditions and the following disclaimer.
00010 //     * Redistributions in binary form must reproduce the above copyright
00011 //       notice, this list of conditions and the following disclaimer in the
00012 //       documentation and/or other materials provided with the distribution.
00013 //     * Neither the name of Southwest Research Institute® (SwRI®) nor the
00014 //       names of its contributors may be used to endorse or promote products
00015 //       derived from this software without specific prior written permission.
00016 //
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027 //
00028 // *****************************************************************************
00029 #ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
00030 #define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
00031 
00032 #include <std_msgs/Header.h>
00033 #include <ros/subscriber.h>
00034 
00035 namespace swri
00036 {
00037 class Subscriber;
00038 class SubscriberImpl
00039 {
00040  protected:
00041   ros::Subscriber sub_;
00042   std::string unmapped_topic_;
00043   std::string mapped_topic_;
00044 
00045   int message_count_;
00046 
00047   ros::Time last_header_stamp_;
00048   ros::Time last_receive_time_;
00049 
00050   ros::Duration total_latency_;
00051   ros::Duration min_latency_;
00052   ros::Duration max_latency_;
00053 
00054   ros::Duration total_periods_;
00055   ros::Duration min_period_;
00056   ros::Duration max_period_;
00057 
00058   ros::Duration timeout_;
00059   bool in_timeout_;
00060   int timeout_count_;
00061   bool blocking_timeout_;
00062 
00063   void processHeader(const ros::Time &stamp)
00064   {
00065     ros::Time now = ros::Time::now();
00066 
00067     // Check for timeouts so that we can correctly increment the
00068     // timeout count.
00069     checkTimeout(now);
00070 
00071     // Do not use this message for statistics if it is arriving out of order
00072     if (stamp >= last_header_stamp_) {
00073       message_count_++;
00074 
00075       if (!stamp.isZero()) {
00076         ros::Duration latency = now - stamp;
00077         if (message_count_ == 1) {
00078           min_latency_ = latency;
00079           max_latency_ = latency;
00080           total_latency_ = latency;
00081         } else {
00082           min_latency_ = std::min(min_latency_, latency);
00083           max_latency_ = std::max(max_latency_, latency);
00084           total_latency_ += latency;
00085         }
00086       }
00087 
00088       if (message_count_ > 1) {
00089         ros::Duration period = now - last_receive_time_;
00090         if (message_count_ == 2) {
00091           min_period_ = period;
00092           max_period_ = period;
00093           total_periods_ = period;
00094         } else if (message_count_ > 2) {
00095           min_period_ = std::min(min_period_, period);
00096           max_period_ = std::max(max_period_, period);
00097           total_periods_ += period;
00098         }
00099       }
00100     }
00101     // Reset the timeout condition to false.
00102     in_timeout_ = false;
00103 
00104     last_receive_time_ = now;
00105     last_header_stamp_ = stamp;
00106   }
00107 
00108   void checkTimeout(const ros::Time &now)
00109   {
00110     if (blocking_timeout_) {
00111       return;
00112     }
00113 
00114     if (in_timeout_ || timeout_ <= ros::Duration(0.0)) {
00115       return;
00116     }
00117 
00118     if (message_count_ == 0) {
00119       return;
00120     }
00121 
00122     if (age(now) > timeout_) {
00123       in_timeout_ = true;
00124       timeout_count_++;
00125     }
00126   }
00127 
00128 
00129  public:
00130   SubscriberImpl() :
00131     unmapped_topic_("N/A"),
00132     mapped_topic_("N/A"),
00133     message_count_(0),
00134     timeout_(-1.0),
00135     in_timeout_(false),
00136     timeout_count_(0),
00137     blocking_timeout_(false)
00138   {
00139     resetStatistics();
00140   }
00141 
00142   const std::string& mappedTopic() const
00143   {
00144     return mapped_topic_;
00145   }
00146 
00147   const std::string& unmappedTopic() const
00148   {
00149     return unmapped_topic_;
00150   }
00151 
00152   int numPublishers() const
00153   {
00154     return sub_.getNumPublishers();
00155   }
00156 
00157   void resetStatistics()
00158   {
00159     message_count_ = 0;
00160     in_timeout_ = false;
00161     timeout_count_ = 0;
00162   }
00163 
00164   int messageCount() const
00165   {
00166     return message_count_;
00167   }
00168 
00169   ros::Duration age(const ros::Time &now) const
00170   {
00171     if (message_count_ < 1) {
00172       return ros::DURATION_MAX;
00173     } else if (last_header_stamp_.isValid()) {
00174       return now - last_header_stamp_;
00175     } else {
00176       // If we've received messages but they don't have valid stamps, we can't
00177       // actually determine the age, so just return an empty duration.
00178       return ros::Duration(0.0);
00179     }
00180   }
00181 
00182   ros::Duration meanLatency() const
00183   {
00184     if (message_count_ < 1) {
00185       return ros::DURATION_MAX;
00186     } else {
00187       return ros::Duration(total_latency_.toSec() / message_count_);
00188     }
00189   }
00190 
00191   ros::Duration minLatency() const
00192   {
00193     if (message_count_ < 1) {
00194       return ros::DURATION_MAX;
00195     } else {
00196       return min_latency_;
00197     }
00198   }
00199 
00200   ros::Duration maxLatency() const
00201   {
00202     if (message_count_ < 1) {
00203       return ros::DURATION_MAX;
00204     } else {
00205       return max_latency_;
00206     }
00207   }
00208 
00209   double meanFrequencyHz() const
00210   {
00211     if (message_count_ < 2) {
00212       return 0.0;
00213     } else {
00214       return 1e9 / meanPeriod().toNSec();
00215     }
00216   }
00217 
00218   ros::Duration meanPeriod() const
00219   {
00220     if (message_count_ < 2) {
00221       return ros::DURATION_MAX;
00222     } else {
00223       return ros::Duration(total_periods_.toSec() / (message_count_ - 1));
00224     }
00225   }
00226 
00227   ros::Duration minPeriod() const
00228   {
00229     if (message_count_ < 2) {
00230       return ros::DURATION_MAX;
00231     } else {
00232       return min_period_;
00233     }
00234   }
00235 
00236   ros::Duration maxPeriod() const
00237   {
00238     if (message_count_ < 2) {
00239       return ros::DURATION_MAX;
00240     } else {
00241       return max_period_;
00242     }
00243   }
00244 
00245   void setTimeout(const ros::Duration &time_out)
00246   {
00247     timeout_ = time_out;
00248     in_timeout_ = false;
00249     timeout_count_ = 0;
00250   }
00251 
00252   bool blockTimeouts(bool block) {
00253     if (block) {
00254       in_timeout_ = false;
00255     }
00256 
00257     bool old_block = blocking_timeout_;
00258     blocking_timeout_ = block;
00259     return old_block;
00260   }
00261 
00262   bool timeoutsBlocked() const {
00263     return blocking_timeout_;
00264   }
00265 
00266   ros::Duration timeout() const
00267   {
00268     return timeout_;
00269   }
00270 
00271   bool timeoutEnabled() const
00272   {
00273     return timeout_ > ros::Duration(0.0);
00274   }
00275 
00276   bool inTimeout()
00277   {
00278     checkTimeout(ros::Time::now());
00279     return in_timeout_;
00280   }
00281 
00282   int timeoutCount()
00283   {
00284     checkTimeout(ros::Time::now());
00285     return timeout_count_;
00286   }
00287 };  // class SubscriberImpl
00288 
00289 struct TrueType
00290 {
00291   static const bool value = true;
00292 };
00293 
00294 template<class M , class T>
00295 class TypedSubscriberImpl : public SubscriberImpl
00296 {
00297   T *obj_;
00298   void (T::*callback_)(const boost::shared_ptr< M const > &);
00299 
00300  public:
00301   TypedSubscriberImpl(
00302     ros::NodeHandle &nh,
00303     const std::string &topic,
00304     uint32_t queue_size,
00305     void(T::*fp)(const boost::shared_ptr< M const > &),
00306     T *obj,
00307     const ros::TransportHints &transport_hints)
00308   {
00309     unmapped_topic_ = topic;
00310     mapped_topic_ = nh.resolveName(topic);
00311 
00312     if (unmapped_topic_ == mapped_topic_) {
00313       ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00314     } else {
00315       ROS_INFO("Subscribing to '%s' at '%s'.",
00316                unmapped_topic_.c_str(),
00317                mapped_topic_.c_str());
00318     }
00319 
00320     callback_ = fp;
00321     obj_ = obj;
00322 
00323     sub_ = nh.subscribe(mapped_topic_, queue_size,
00324                         &TypedSubscriberImpl::handleMessage<M>,
00325                         this,
00326                         transport_hints);
00327   }
00328 
00329   // Handler for messages with headers
00330   template <class M2>
00331   typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00332   handleMessage(const boost::shared_ptr< M const> &msg)
00333   {
00334     processHeader(msg->header.stamp);
00335     (obj_->*callback_)(msg);
00336   }
00337 
00338   // Handler for messages without headers
00339   template <class M2>
00340   typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00341   handleMessage(const boost::shared_ptr< M const> &msg)
00342   {
00343     processHeader(ros::Time::now());
00344     (obj_->*callback_)(msg);
00345   }
00346 };  // class TypedSubscriberImpl
00347 
00348 template<class M>
00349 class BindSubscriberImpl : public SubscriberImpl
00350 {
00351   boost::function<void(const boost::shared_ptr< M const> &)> callback_;
00352   
00353 
00354  public:
00355   BindSubscriberImpl(
00356     ros::NodeHandle &nh,
00357     const std::string &topic,
00358     uint32_t queue_size,
00359     const boost::function<void(const boost::shared_ptr< M const> &)> &callback,
00360     const ros::TransportHints &transport_hints)
00361   {
00362     unmapped_topic_ = topic;
00363     mapped_topic_ = nh.resolveName(topic);
00364 
00365     if (unmapped_topic_ == mapped_topic_) {
00366       ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00367     } else {
00368       ROS_INFO("Subscribing to '%s' at '%s'.",
00369                unmapped_topic_.c_str(),
00370                mapped_topic_.c_str());
00371     }
00372 
00373     callback_ = callback;
00374 
00375     sub_ = nh.subscribe(mapped_topic_, queue_size,
00376                         &BindSubscriberImpl::handleMessage<M>,
00377                         this,
00378                         transport_hints);
00379   }
00380 
00381   // Handler for messages with headers
00382   template <class M2>
00383   typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00384   handleMessage(const boost::shared_ptr< M const> &msg)
00385   {
00386     processHeader(msg->header.stamp);
00387     callback_(msg);
00388   }
00389 
00390   // Handler for messages without headers
00391   template <class M2>
00392   typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00393   handleMessage(const boost::shared_ptr< M const> &msg)
00394   {
00395     processHeader(ros::Time::now());
00396     callback_(msg);
00397   }
00398 };  // class BindSubscriberImpl
00399 
00400 template<class M>
00401 class StorageSubscriberImpl : public SubscriberImpl
00402 {
00403   boost::shared_ptr< M const > *dest_;
00404 
00405  public:
00406   StorageSubscriberImpl(
00407     ros::NodeHandle &nh,
00408     const std::string &topic,
00409     boost::shared_ptr< M const > *dest,
00410     const ros::TransportHints &transport_hints)
00411   {
00412     unmapped_topic_ = topic;
00413     mapped_topic_ = nh.resolveName(topic);
00414 
00415     if (unmapped_topic_ == mapped_topic_) {
00416       ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00417     } else {
00418       ROS_INFO("Subscribing to '%s' at '%s'.",
00419                unmapped_topic_.c_str(),
00420                mapped_topic_.c_str());
00421     }
00422 
00423     dest_ = dest;
00424 
00425     sub_ = nh.subscribe(mapped_topic_, 2,
00426                         &StorageSubscriberImpl::handleMessage<M>,
00427                         this,
00428                         transport_hints);
00429   }
00430 
00431   // Handler for messages with headers
00432   template <class M2>
00433   typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00434   handleMessage(const boost::shared_ptr< M const> &msg)
00435   {
00436     processHeader(msg->header.stamp);
00437     *dest_ = msg;
00438   }
00439 
00440   // Handler for messages without headers
00441   template <class M2>
00442   typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00443   handleMessage(const boost::shared_ptr< M const> &msg)
00444   {
00445     processHeader(ros::Time::now());
00446     *dest_ = msg;
00447   }
00448 };  // class StorageSubscriberImpl
00449 }  // namespace swri
00450 #endif  // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_


swri_roscpp
Author(s):
autogenerated on Thu Jun 6 2019 20:34:47