Program Listing for File subscriber_impl.h

Return to documentation for file (/tmp/ws/src/marti_common/swri_roscpp/include/swri_roscpp/subscriber_impl.h)

// *****************************************************************************
//
// Copyright (c) 2015, Southwest Research Institute® (SwRI®)
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//     * Redistributions of source code must retain the above copyright
//       notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//     * Neither the name of Southwest Research Institute® (SwRI®) nor the
//       names of its contributors may be used to endorse or promote products
//       derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// *****************************************************************************
#ifndef SWRI_ROSCPP_SUBSCRIBER_IMPL_H_
#define SWRI_ROSCPP_SUBSCRIBER_IMPL_H_

#include <std_msgs/msg/header.hpp>
#include <rclcpp/rclcpp.hpp>

#include <boost/utility/enable_if.hpp>

namespace swri
{

  class Subscriber;
  class SubscriberImpl
  {
  public:
    rclcpp::Node* nh_;

  protected:
    rclcpp::SubscriptionBase::SharedPtr sub_;
    std::string unmapped_topic_;

    int message_count_;

    rclcpp::Time last_header_stamp_ = rclcpp::Time(0, 0, RCL_ROS_TIME);
    rclcpp::Time last_receive_time_ = rclcpp::Time(0, 0, RCL_ROS_TIME);

    rclcpp::Duration total_latency_ = rclcpp::Duration(0);
    rclcpp::Duration min_latency_ = rclcpp::Duration::max();
    rclcpp::Duration max_latency_ = rclcpp::Duration(0);

    rclcpp::Duration total_periods_ = rclcpp::Duration::max();
    rclcpp::Duration min_period_ = rclcpp::Duration::max();
    rclcpp::Duration max_period_ = rclcpp::Duration(0);

    rclcpp::Duration timeout_ = rclcpp::Duration(0, 0);
    bool in_timeout_;
    int timeout_count_;
    bool blocking_timeout_;

    void processHeader(const rclcpp::Time &stamp)
    {
      rclcpp::Time now = nh_->now();

      // Check for timeouts so that we can correctly increment the
      // timeout count.
      checkTimeout(now);

      message_count_++;

      if (stamp.nanoseconds() != 0) {
        rclcpp::Duration latency = now - stamp;
        if (message_count_ == 1) {
          min_latency_ = latency;
          max_latency_ = latency;
          total_latency_ = latency;
        } else {
          min_latency_ = std::min(min_latency_, latency);
          max_latency_ = std::max(max_latency_, latency);
          total_latency_ = total_latency_ + latency;
        }
      }

      if (message_count_ > 1) {
        rclcpp::Duration period = now - last_receive_time_;
        if (message_count_ == 2) {
          min_period_ = period;
          max_period_ = period;
          total_periods_ = period;
        } else if (message_count_ > 2) {
          min_period_ = std::min(min_period_, period);
          max_period_ = std::max(max_period_, period);
          total_periods_ = total_periods_ + period;
        }
      }

      // Reset the timeout condition to false.
      in_timeout_ = false;

      last_receive_time_ = now;
      last_header_stamp_ = stamp;
    }

    void checkTimeout(const rclcpp::Time &now)
    {
      if (blocking_timeout_) {
        return;
      }

      if (in_timeout_ || timeout_ <= rclcpp::Duration(0, 0)) {
        return;
      }

      if (message_count_ == 0) {
        return;
      }

      if (age(now) > timeout_) {
        in_timeout_ = true;
        timeout_count_++;
      }
    }


  public:
    SubscriberImpl()
    {
      unmapped_topic_ = "N/A";
      timeout_ = rclcpp::Duration(-1,0);
      blocking_timeout_ = false;
      resetStatistics();
    }

    /*const std::string& mappedTopic() const
    {
      return mapped_topic_;
    }*/

    const std::string& unmappedTopic() const
    {
      return unmapped_topic_;
    }

    int numPublishers() const
    {
      return 0;//sub_.getNumPublishers();
    }

    void resetStatistics()
    {
      message_count_ = 0;
      in_timeout_ = false;
      timeout_count_ = 0;
    }

    int messageCount() const
    {
      return message_count_;
    }

    rclcpp::Duration age(const rclcpp::Time &now) const
    {
      if (message_count_ < 1) {
        return rclcpp::Duration::max();
      } else if (now == rclcpp::Time(0,0,RCL_ROS_TIME)) {
        return nh_->now() - last_header_stamp_;
      } else {
        return now - last_header_stamp_;
      }
    }

    rclcpp::Duration meanLatency() const
    {
      if (message_count_ < 1) {
        return rclcpp::Duration::max();
      } else {
        return rclcpp::Duration(total_latency_.seconds() / message_count_);
      }
    }

    rclcpp::Duration minLatency() const
    {
      if (message_count_ < 1) {
        return rclcpp::Duration::max();
      } else {
        return min_latency_;
      }
    }

    rclcpp::Duration maxLatency() const
    {
      if (message_count_ < 1) {
        return rclcpp::Duration::max();
      } else {
        return max_latency_;
      }
    }

    double meanFrequencyHz() const
    {
      if (message_count_ < 2) {
        return 0.0;
      } else {
        return 1e9 / meanPeriod().nanoseconds();
      }
    }

    rclcpp::Duration meanPeriod() const
    {
      if (message_count_ < 2) {
        return rclcpp::Duration::max();
      } else {
        return rclcpp::Duration(total_periods_.seconds() / (message_count_ -1));
      }
    }

    rclcpp::Duration minPeriod() const
    {
      if (message_count_ < 2) {
        return rclcpp::Duration::max();
      } else {
        return min_period_;
      }
    }

    rclcpp::Duration maxPeriod() const
    {
      if (message_count_ < 2) {
        return rclcpp::Duration::max();
      } else {
        return max_period_;
      }
    }

    void setTimeout(const rclcpp::Duration &time_out)
    {
      timeout_ = time_out;
      in_timeout_ = false;
      timeout_count_ = 0;
    }

    bool blockTimeouts(bool block) {
      if (block) {
        in_timeout_ = false;
      }

      bool old_block = blocking_timeout_;
      blocking_timeout_ = block;
      return old_block;
    }

    bool timeoutsBlocked() const {
      return blocking_timeout_;
    }

    rclcpp::Duration timeout() const
    {
      return timeout_;
    }

    bool timeoutEnabled() const
    {
      return timeout_ > rclcpp::Duration(0, 0);
    }

    bool inTimeout()
    {
      checkTimeout(nh_->now());
      return in_timeout_;
    }

    int timeoutCount()
    {
      checkTimeout(nh_->now());
      return timeout_count_;
    }
  };  // class SubscriberImpl

#include <type_traits>

// This comes from cppreference
  template<typename... Ts> struct make_void { typedef void type;};
  template<typename... Ts> using void_t = typename make_void<Ts...>::type;

// primary template handles types that have no ::aMember
  template< class T, class = void_t<> >
  struct has_header : std::false_type { };

// specialization recognizes types that do have a ::aMember
  template< class T >
  struct has_header<T, void_t<decltype( T::header )>> : std::true_type { };

  template<class M , class T>
  class TypedSubscriberImpl : public SubscriberImpl
  {
    T *obj_;
    void (T::*callback_)(const std::shared_ptr< const M > &);

  public:
    TypedSubscriberImpl(
        rclcpp::Node& nh,
        const std::string &topic,
        uint32_t queue_size,
        void(T::*fp)(const std::shared_ptr< M const > &),
        T *obj,
        const rclcpp::QoS& transport_hints)
    {
      unmapped_topic_ = topic;
      // mapped_topic_ = nh->ResolveName(topic);
      // nh_ = nh->nh_;
      nh_ = &nh;

      RCLCPP_INFO(nh_->get_logger(), "Subscribing to '%s'.", unmapped_topic_.c_str());

      callback_ = fp;
      obj_ = obj;
      //transport_hints.depth = queue_size;

      rclcpp::QoS hints = transport_hints;
      hints.keep_last(queue_size);

      sub_ = nh_->create_subscription<M>(unmapped_topic_,
                                         hints,
                                         std::bind(&TypedSubscriberImpl::handleMessage<M>,
                                                   this, std::placeholders::_1)
                                         );
    }

    // Handler for messages with headers
    template <class M2 = M>
    typename std::enable_if<(bool)has_header<M2>(), void>::type
    handleMessage(const std::shared_ptr< M > msg)
    {
      processHeader(msg->header.stamp);
      (obj_->*callback_)(msg);
    }

    // Handler for messages without headers
    template <class M2 = M>
    typename std::enable_if< !(bool)has_header<M2>(), void>::type
    handleMessage(const std::shared_ptr< M > msg)
    {
      processHeader(nh_->now());
      (obj_->*callback_)(msg);
    }
  };  // class TypedSubscriberImpl

  template<class M>
  class BindSubscriberImpl : public SubscriberImpl
  {
    std::function<void(const std::shared_ptr< const M > )> callback_;


  public:
    BindSubscriberImpl(
        rclcpp::Node& nh,
        const std::string &topic,
        uint32_t queue_size,
        const std::function<void(const std::shared_ptr< const M > )> &callback,
        const rclcpp::QoS& transport_hints)
    {
      unmapped_topic_ = topic;
      //mapped_topic_ = nh->ResolveName(topic);
      nh_ = &nh;

      RCLCPP_INFO(nh_->get_logger(), "Subscribing to '%s'.", unmapped_topic_.c_str());

      callback_ = callback;

      rclcpp::QoS hints = transport_hints;
      hints.keep_last(queue_size);
      sub_ = nh_->create_subscription<M>(unmapped_topic_,
                                         hints,
                                         std::bind(&BindSubscriberImpl::handleMessage<M>,
                                                   this, std::placeholders::_1)
                                        );
    }

    // Handler for messages with headers
    template <class M2 = M>
    typename std::enable_if< (bool)has_header<M2>(), void>::type
    handleMessage(const std::shared_ptr< const M > msg)
    {
      processHeader(msg->header.stamp);
      callback_(msg);
    }

    // Handler for messages without headers
    template <class M2 = M>
    typename std::enable_if< !(bool)has_header<M2>(), void>::type
    handleMessage(const std::shared_ptr< const M > msg)
    {
      processHeader(nh_->now());
      callback_(msg);
    }
  };  // class BindSubscriberImpl

  template<class M>
  class StorageSubscriberImpl : public SubscriberImpl
  {
    std::shared_ptr< const M > *dest_;

  public:
    StorageSubscriberImpl(
        rclcpp::Node& nh,
        const std::string &topic,
        std::shared_ptr< const M > *dest,
        const rclcpp::QoS& transport_hints)
    {
      unmapped_topic_ = topic;
      //mapped_topic_ = nh->ResolveName(topic);
      nh_ = &nh;

      RCLCPP_INFO(nh_->get_logger(), "Subscribing to '%s'.", unmapped_topic_.c_str());

      dest_ = dest;
      sub_ = nh_->create_subscription<M>(unmapped_topic_,
                                         transport_hints,
                                         std::bind(&StorageSubscriberImpl::handleMessage<M>,
                                                   this, std::placeholders::_1)
                                         );
    }

    // Handler for messages with headers
    template <class M2 = M>
    typename std::enable_if< (bool)has_header<M2>(), void>::type
    handleMessage(const std::shared_ptr< const M > msg)
    {
      processHeader(msg->header.stamp);
      *dest_ = msg;
    }

    // Handler for messages without headers
    template <class M2 = M>
    typename std::enable_if< !(bool)has_header<M2>(), void>::type
    handleMessage(const std::shared_ptr< const M > msg)
    {
      processHeader(nh_->now());
      *dest_ = msg;
    }
  };  // class StorageSubscriberImpl
}  // namespace swri
#endif  // SWRI_ROSCPP_SUBSCRIBER_IMPL_H_