subscriber_impl.h
Go to the documentation of this file.
00001 // *****************************************************************************
00002 //
00003 // Copyright (c) 2015, Southwest Research Institute® (SwRI®)
00004 // All rights reserved.
00005 //
00006 // Redistribution and use in source and binary forms, with or without
00007 // modification, are permitted provided that the following conditions are met:
00008 //     * Redistributions of source code must retain the above copyright
00009 //       notice, this list of conditions and the following disclaimer.
00010 //     * Redistributions in binary form must reproduce the above copyright
00011 //       notice, this list of conditions and the following disclaimer in the
00012 //       documentation and/or other materials provided with the distribution.
00013 //     * Neither the name of Southwest Research Institute® (SwRI®) nor the
00014 //       names of its contributors may be used to endorse or promote products
00015 //       derived from this software without specific prior written permission.
00016 //
00017 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020 // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
00021 // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00022 // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00023 // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00024 // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00025 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00026 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00027 //
00028 // *****************************************************************************
00029 #ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
00030 #define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
00031 
00032 #include <std_msgs/Header.h>
00033 #include <ros/subscriber.h>
00034 
00035 namespace swri
00036 {
00037 class Subscriber;
00038 class SubscriberImpl
00039 {
00040  protected:
00041   ros::Subscriber sub_;
00042   std::string unmapped_topic_;
00043   std::string mapped_topic_;
00044 
00045   int message_count_;
00046 
00047   ros::Time last_header_stamp_;
00048   ros::Time last_receive_time_;
00049 
00050   ros::Duration total_latency_;
00051   ros::Duration min_latency_;
00052   ros::Duration max_latency_;
00053 
00054   ros::Duration total_periods_;
00055   ros::Duration min_period_;
00056   ros::Duration max_period_;
00057 
00058   ros::Duration timeout_;
00059   bool in_timeout_;
00060   int timeout_count_;
00061   bool blocking_timeout_;
00062 
00063   void processHeader(const ros::Time &stamp)
00064   {
00065     ros::Time now = ros::Time::now();
00066 
00067     // Check for timeouts so that we can correctly increment the
00068     // timeout count.
00069     checkTimeout(now);
00070 
00071     message_count_++;
00072 
00073     ros::Duration latency = now - stamp;
00074     if (message_count_ == 1) {
00075       min_latency_ = latency;
00076       max_latency_ = latency;
00077       total_latency_ = latency;
00078     } else {
00079       min_latency_ = std::min(min_latency_, latency);
00080       max_latency_ = std::max(max_latency_, latency);
00081       total_latency_ += latency;
00082     }
00083 
00084     if (message_count_ > 1) {
00085       ros::Duration period = now - last_receive_time_;
00086       if (message_count_ == 2) {
00087         min_period_ = period;
00088         max_period_ = period;
00089         total_periods_ = period;
00090       } else if (message_count_ > 2) {
00091         min_period_ = std::min(min_period_, period);
00092         max_period_ = std::max(max_period_, period);
00093         total_periods_ += period;
00094       }
00095     }
00096 
00097     // Reset the timeout condition to false.
00098     in_timeout_ = false;
00099 
00100     last_receive_time_ = now;
00101     last_header_stamp_ = stamp;
00102   }
00103 
00104   void checkTimeout(const ros::Time &now)
00105   {
00106     if (blocking_timeout_) {
00107       return;
00108     }
00109 
00110     if (in_timeout_ || timeout_ <= ros::Duration(0.0)) {
00111       return;
00112     }
00113 
00114     if (message_count_ == 0) {
00115       return;
00116     }
00117 
00118     if (age(now) > timeout_) {
00119       in_timeout_ = true;
00120       timeout_count_++;
00121     }
00122   }
00123 
00124 
00125  public:
00126   SubscriberImpl()
00127   {
00128     unmapped_topic_ = "N/A";
00129     mapped_topic_ = "N/A";
00130     timeout_ = ros::Duration(-1.0);
00131     blocking_timeout_ = false;
00132     resetStatistics();
00133   }
00134 
00135   const std::string& mappedTopic() const
00136   {
00137     return mapped_topic_;
00138   }
00139 
00140   const std::string& unmappedTopic() const
00141   {
00142     return unmapped_topic_;
00143   }
00144 
00145   int numPublishers() const
00146   {
00147     return sub_.getNumPublishers();
00148   }
00149 
00150   void resetStatistics()
00151   {
00152     message_count_ = 0;
00153     in_timeout_ = false;
00154     timeout_count_ = 0;
00155   }
00156 
00157   int messageCount() const
00158   {
00159     return message_count_;
00160   }
00161 
00162   ros::Duration age(const ros::Time &now) const
00163   {
00164     if (message_count_ < 1) {
00165       return ros::DURATION_MAX;
00166     } else {
00167       return now - last_header_stamp_;
00168     }
00169   }
00170 
00171   ros::Duration meanLatency() const
00172   {
00173     if (message_count_ < 1) {
00174       return ros::DURATION_MAX;
00175     } else {
00176       return ros::Duration(total_latency_.toSec() / message_count_);
00177     }
00178   }
00179 
00180   ros::Duration minLatency() const
00181   {
00182     if (message_count_ < 1) {
00183       return ros::DURATION_MAX;
00184     } else {
00185       return min_latency_;
00186     }
00187   }
00188 
00189   ros::Duration maxLatency() const
00190   {
00191     if (message_count_ < 1) {
00192       return ros::DURATION_MAX;
00193     } else {
00194       return max_latency_;
00195     }
00196   }
00197 
00198   double meanFrequencyHz() const
00199   {
00200     if (message_count_ < 2) {
00201       return 0.0;
00202     } else {
00203       return 1e9 / meanPeriod().toNSec();
00204     }
00205   }
00206 
00207   ros::Duration meanPeriod() const
00208   {
00209     if (message_count_ < 2) {
00210       return ros::DURATION_MAX;
00211     } else {
00212       return ros::Duration(total_periods_.toSec() / (message_count_ - 1));
00213     }
00214   }
00215 
00216   ros::Duration minPeriod() const
00217   {
00218     if (message_count_ < 2) {
00219       return ros::DURATION_MAX;
00220     } else {
00221       return min_period_;
00222     }
00223   }
00224 
00225   ros::Duration maxPeriod() const
00226   {
00227     if (message_count_ < 2) {
00228       return ros::DURATION_MAX;
00229     } else {
00230       return max_period_;
00231     }
00232   }
00233 
00234   void setTimeout(const ros::Duration &time_out)
00235   {
00236     timeout_ = time_out;
00237     in_timeout_ = false;
00238     timeout_count_ = 0;
00239   }
00240 
00241   bool blockTimeouts(bool block) {
00242     if (block) {
00243       in_timeout_ = false;
00244     }
00245 
00246     bool old_block = blocking_timeout_;
00247     blocking_timeout_ = block;
00248     return old_block;
00249   }
00250 
00251   bool timeoutsBlocked() const {
00252     return blocking_timeout_;
00253   }
00254 
00255   ros::Duration timeout() const
00256   {
00257     return timeout_;
00258   }
00259 
00260   bool timeoutEnabled() const
00261   {
00262     return timeout_ > ros::Duration(0.0);
00263   }
00264 
00265   bool inTimeout()
00266   {
00267     checkTimeout(ros::Time::now());
00268     return in_timeout_;
00269   }
00270 
00271   int timeoutCount()
00272   {
00273     checkTimeout(ros::Time::now());
00274     return timeout_count_;
00275   }
00276 };  // class SubscriberImpl
00277 
00278 struct TrueType
00279 {
00280   static const bool value = true;
00281 };
00282 
00283 template<class M , class T>
00284 class TypedSubscriberImpl : public SubscriberImpl
00285 {
00286   T *obj_;
00287   void (T::*callback_)(const boost::shared_ptr< M const > &);
00288 
00289  public:
00290   TypedSubscriberImpl(
00291     ros::NodeHandle &nh,
00292     const std::string &topic,
00293     uint32_t queue_size,
00294     void(T::*fp)(const boost::shared_ptr< M const > &),
00295     T *obj,
00296     const ros::TransportHints &transport_hints)
00297   {
00298     unmapped_topic_ = topic;
00299     mapped_topic_ = nh.resolveName(topic);
00300 
00301     if (unmapped_topic_ == mapped_topic_) {
00302       ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00303     } else {
00304       ROS_INFO("Subscribing to '%s' at '%s'.",
00305                unmapped_topic_.c_str(),
00306                mapped_topic_.c_str());
00307     }
00308 
00309     callback_ = fp;
00310     obj_ = obj;
00311 
00312     sub_ = nh.subscribe(mapped_topic_, queue_size,
00313                         &TypedSubscriberImpl::handleMessage<M>,
00314                         this,
00315                         transport_hints);
00316   }
00317 
00318   // Handler for messages with headers
00319   template <class M2>
00320   typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00321   handleMessage(const boost::shared_ptr< M const> &msg)
00322   {
00323     processHeader(msg->header.stamp);
00324     (obj_->*callback_)(msg);
00325   }
00326 
00327   // Handler for messages without headers
00328   template <class M2>
00329   typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00330   handleMessage(const boost::shared_ptr< M const> &msg)
00331   {
00332     processHeader(ros::Time::now());
00333     (obj_->*callback_)(msg);
00334   }
00335 };  // class TypedSubscriberImpl
00336 
00337 template<class M>
00338 class BindSubscriberImpl : public SubscriberImpl
00339 {
00340   boost::function<void(const boost::shared_ptr< M const> &)> callback_;
00341   
00342 
00343  public:
00344   BindSubscriberImpl(
00345     ros::NodeHandle &nh,
00346     const std::string &topic,
00347     uint32_t queue_size,
00348     const boost::function<void(const boost::shared_ptr< M const> &)> &callback,
00349     const ros::TransportHints &transport_hints)
00350   {
00351     unmapped_topic_ = topic;
00352     mapped_topic_ = nh.resolveName(topic);
00353 
00354     if (unmapped_topic_ == mapped_topic_) {
00355       ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00356     } else {
00357       ROS_INFO("Subscribing to '%s' at '%s'.",
00358                unmapped_topic_.c_str(),
00359                mapped_topic_.c_str());
00360     }
00361 
00362     callback_ = callback;
00363 
00364     sub_ = nh.subscribe(mapped_topic_, queue_size,
00365                         &BindSubscriberImpl::handleMessage<M>,
00366                         this,
00367                         transport_hints);
00368   }
00369 
00370   // Handler for messages with headers
00371   template <class M2>
00372   typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00373   handleMessage(const boost::shared_ptr< M const> &msg)
00374   {
00375     processHeader(msg->header.stamp);
00376     callback_(msg);
00377   }
00378 
00379   // Handler for messages without headers
00380   template <class M2>
00381   typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00382   handleMessage(const boost::shared_ptr< M const> &msg)
00383   {
00384     processHeader(ros::Time::now());
00385     callback_(msg);
00386   }
00387 };  // class BindSubscriberImpl
00388 
00389 template<class M>
00390 class StorageSubscriberImpl : public SubscriberImpl
00391 {
00392   boost::shared_ptr< M const > *dest_;
00393 
00394  public:
00395   StorageSubscriberImpl(
00396     ros::NodeHandle &nh,
00397     const std::string &topic,
00398     boost::shared_ptr< M const > *dest,
00399     const ros::TransportHints &transport_hints)
00400   {
00401     unmapped_topic_ = topic;
00402     mapped_topic_ = nh.resolveName(topic);
00403 
00404     if (unmapped_topic_ == mapped_topic_) {
00405       ROS_INFO("Subscribing to '%s'.", mapped_topic_.c_str());
00406     } else {
00407       ROS_INFO("Subscribing to '%s' at '%s'.",
00408                unmapped_topic_.c_str(),
00409                mapped_topic_.c_str());
00410     }
00411 
00412     dest_ = dest;
00413 
00414     sub_ = nh.subscribe(mapped_topic_, 2,
00415                         &StorageSubscriberImpl::handleMessage<M>,
00416                         this,
00417                         transport_hints);
00418   }
00419 
00420   // Handler for messages with headers
00421   template <class M2>
00422   typename boost::enable_if< ros::message_traits::HasHeader<M2>, void>::type
00423   handleMessage(const boost::shared_ptr< M const> &msg)
00424   {
00425     processHeader(msg->header.stamp);
00426     *dest_ = msg;
00427   }
00428 
00429   // Handler for messages without headers
00430   template <class M2>
00431   typename boost::disable_if< ros::message_traits::HasHeader<M2>, void>::type
00432   handleMessage(const boost::shared_ptr< M const> &msg)
00433   {
00434     processHeader(ros::Time::now());
00435     *dest_ = msg;
00436   }
00437 };  // class StorageSubscriberImpl
00438 }  // namespace swri
00439 #endif  // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_


swri_roscpp
Author(s):
autogenerated on Tue Oct 3 2017 03:19:27