Program Listing for File subscriber.h

Return to documentation for file (/tmp/ws/src/marti_common/swri_roscpp/include/swri_roscpp/subscriber.h)

// *****************************************************************************
//
// Copyright (c) 2015, Southwest Research Institute® (SwRI®)
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//     * Redistributions of source code must retain the above copyright
//       notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//     * Neither the name of Southwest Research Institute® (SwRI®) nor the
//       names of its contributors may be used to endorse or promote products
//       derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
// *****************************************************************************
#ifndef SWRI_ROSCPP_SUBSCRIBER_H_
#define SWRI_ROSCPP_SUBSCRIBER_H_


#include <rclcpp/rclcpp.hpp>
#include <diagnostic_updater/diagnostic_status_wrapper.hpp>

#include <swri_roscpp/subscriber_impl.h>

namespace swri
{
// This is an extended interface to the rclcpp::Subscriber class.
//
// This is an extended interface to the rclcpp::Subscriber class that
// provides a little more default functionality and instrumentation.
//
// - Prints information when the subscription is created (to RCLCPP_INFO)
//   about the unmapped and mapped topic names.
//
// - Maintains some statistics about the messages (counts, latency, period)
//
// - Implements timeout logic and timeout counting.
//
// This implementation provides two interfaces.  It supports the
// traditional ROS callback interface (though it is deliberately
// limited to callbacks that take a *ConstPtr& type), and an interface
// where the message is simply assigned to *ConstPtr variable that is
// specified at creation.  The second interface replaces the previous
// swri_roscpp::LatchedSubscriber and allows you to avoid writing
// trivial callback functions.

class Subscriber
{
 private:
  // The implementation is kept in a separate class to maintain clean
  // copy/move semantics with ROS (since the ROS subscriber binds to a
  // single address for the callback) and to allow us to hide the
  // template arguments so the message type doesn't have to be
  // specified when the subscriber is declared.
  std::shared_ptr<SubscriberImpl> impl_;

 public:
  Subscriber();

  // Using class method callback.
  template<class M , class T >
  Subscriber(rclcpp::Node &nh,
             const std::string &topic,
             uint32_t queue_size,
             void(T::*fp)(const std::shared_ptr< M const > &),
             T *obj,
             const rclcpp::QoS& transport_hints = rclcpp::QoS(1));

  // Using a boost function callback.
  template<class M>
  Subscriber(rclcpp::Node &nh,
             const std::string &topic,
             uint32_t queue_size,
             const std::function<void(const std::shared_ptr<M const> &)> &callback,
             const rclcpp::QoS& transport_hints = rclcpp::QoS(1));

  // This is an alternate interface that stores a received message in
  // a variable without calling a user-defined callback function.
  // This is useful for cases where you just want to store a message
  // for usage later and avoids having to write a trivial callback
  // function.
  template<class M>
  Subscriber(rclcpp::Node &nh,
             const std::string &topic,
             std::shared_ptr< M const > *dest,
             const rclcpp::QoS& transport_hints = rclcpp::QoS(1));

  Subscriber& operator=(const Subscriber &other);

  // Reset all statistics, including message and timeout counts.
  void resetStatistics();

  // Returns the unmapped topic name that was provided when the
  // subscriber was created.
  const std::string& unmappedTopic() const;
  // Returns the number of publishers that are connected to this
  // subscriber.
  int numPublishers() const;

  // How many messages have been received.
  int messageCount() const;

  // Age of the most recent message (difference between now and the
  // header stamp (or time message was received for messages that
  // don't have headers).
  rclcpp::Duration age(const rclcpp::Time &now = rclcpp::Time(0,0,RCL_ROS_TIME)) const;
  double ageSeconds(const rclcpp::Time &now = rclcpp::Time(0,0,RCL_ROS_TIME)) const;
  double ageMilliseconds(const rclcpp::Time &now = rclcpp::Time(0,0,RCL_ROS_TIME)) const;

  // Average latency (time difference between the time stamp and when
  // the message is received). These will be useless for message types
  // that do not have a header.
  rclcpp::Duration meanLatency() const;
  rclcpp::Duration minLatency() const;
  rclcpp::Duration maxLatency() const;
  double meanLatencyMicroseconds() const;
  double minLatencyMicroseconds() const;
  double maxLatencyMicroseconds() const;

  // Frequency/Period in terms of when the message was received (not
  // the header stamp).
  double meanFrequencyHz() const;
  rclcpp::Duration meanPeriod() const;
  rclcpp::Duration minPeriod() const;
  rclcpp::Duration maxPeriod() const;
  double meanPeriodMilliseconds() const;
  double minPeriodMilliseconds() const;
  double maxPeriodMilliseconds() const;

  // Provide a negative value to disable the timeout (default is -1).
  void setTimeout(const rclcpp::Duration &time_out);
  void setTimeout(const double time_out);

  // Block/unblock timeouts from occuring.  This allows you to
  // temporarily block timeouts (for example, if a message is not
  // expected in a particular mode).  Returns the current state
  bool blockTimeouts(bool block);

  // Return true if the subscriber is currently blocking timeouts from
  // occurring.
  bool timeoutsBlocked() const;

  // Determine if the timeout is currently enabled.
  bool timeoutEnabled() const;
  // Read the current timeout setting.
  rclcpp::Duration timeout() const;
  double timeoutMilliseconds() const;

  // Determine if the topic is in a timed out state.
  bool inTimeout();
  // How many times the topic has been in a timeout state.
  int timeoutCount();

  // These flags determine which values are added to a diagnostic
  // status by the appendDiagnostics method.
  enum DIAGNOSTIC_FLAGS {
    DIAG_CONNECTION = 1 << 0,  // Include topic names, publisher counts
    DIAG_MSG_COUNT  = 1 << 1,  // Include message count
    DIAG_TIMEOUT    = 1 << 2,  // Include timeout counts if applicable
    DIAG_LATENCY    = 1 << 3,  // Include latency information
    DIAG_RATE       = 1 << 4,  // Include rate information

    DIAG_ALL        = ~0,       // Abbreviation to include all information
    DIAG_MOST       = DIAG_ALL ^ DIAG_CONNECTION
    // Abbreviation to include everything except connection info.
  };

  // This is a convenience method to add information about this
  // subscription to a diagnostic status in a standard way.
  //
  // The status is merged with a warning if no messages have been
  // received or if timeouts have occurred.  The status is merged with
  // an error if the subscription is in an active timeout status.
  //
  // The flags parameter determines which values are added to the
  // status' key/value pairs.  This should be a bitwise combination of
  // the values defined in DIAGNOSTIC_FLAGS.
  void appendDiagnostics(diagnostic_updater::DiagnosticStatusWrapper &status,
                         const std::string &name,
                         const int flags);
};  // class Subscriber

inline
Subscriber::Subscriber()
{
  // Setup an empty implementation so that we can assume impl_ is
  // non-null and avoid a lot of unnecessary NULL checks.
  impl_ = std::make_shared<SubscriberImpl>();
}

template<class M , class T >
inline
Subscriber::Subscriber(rclcpp::Node &nh,
                       const std::string &topic,
                       uint32_t queue_size,
                       void(T::*fp)(const std::shared_ptr< M const > &),
                       T *obj,
                       const rclcpp::QoS& transport_hints)
{
  impl_ = std::shared_ptr<SubscriberImpl>(
    new TypedSubscriberImpl<M,T>(
      nh, topic, queue_size, fp, obj, transport_hints));
}

template<class M>
inline
Subscriber::Subscriber(rclcpp::Node &nh,
                       const std::string &topic,
                       uint32_t queue_size,
                       const std::function<void(const std::shared_ptr<M const> &)> &callback,
                       const rclcpp::QoS& transport_hints)
{
  impl_ = std::shared_ptr<SubscriberImpl>(
    new BindSubscriberImpl<M>(
      nh, topic, queue_size, callback, transport_hints));
}

template<class M>
inline
Subscriber::Subscriber(rclcpp::Node &nh,
                       const std::string &topic,
                       std::shared_ptr< M const > *dest,
                       const rclcpp::QoS& transport_hints)
{
  impl_ = std::shared_ptr<SubscriberImpl>(
    new StorageSubscriberImpl<M>(
      nh, topic, dest, transport_hints));
}

inline
Subscriber& Subscriber::operator=(const Subscriber &other)
{
  // If we have some non-default parameters and the other class
  // doesn't, we preserve our parameters across assignment.
  //
  // This is to support the use case where we read ROS parameters, set
  // them in the subscriber, and then create the subscriptions.
  // Otherwise, the user has to read the timeouts, save them
  // temporarily, and then set them after the subscription is set up.
  //
  // This could cause confusing behavior if you are moving subscribers
  // around a lot, but I've never seen that kind of use case in any
  // ROS code.

  rclcpp::Duration new_timeout = other.impl_->timeout();
  if (impl_->timeoutEnabled() && !other.impl_->timeoutEnabled()) {
    new_timeout = impl_->timeout();
  }

  impl_ = other.impl_;
  impl_->setTimeout(new_timeout);

  return *this;
}

inline
const std::string& Subscriber::unmappedTopic() const
{
  return impl_->unmappedTopic();
}

inline
int Subscriber::numPublishers() const
{
  return impl_->numPublishers();
}

inline
void Subscriber::resetStatistics()
{
  impl_->resetStatistics();
}

inline
int Subscriber::messageCount() const
{
  return impl_->messageCount();
}

inline
rclcpp::Duration Subscriber::age(const rclcpp::Time &now) const
{
  return impl_->age(now);
}

inline
double Subscriber::ageSeconds(const rclcpp::Time &now) const
{
  return age(now).seconds();
}

inline
double Subscriber::ageMilliseconds(const rclcpp::Time &now) const
{
  return age(now).nanoseconds() / 1000000.0;
}

inline
rclcpp::Duration Subscriber::meanLatency() const
{
  return impl_->meanLatency();
}

inline
rclcpp::Duration Subscriber::minLatency() const
{
  return impl_->minLatency();
}

inline
rclcpp::Duration Subscriber::maxLatency() const
{
  return impl_->maxLatency();
}

inline
double Subscriber::meanLatencyMicroseconds() const
{
  return meanLatency().nanoseconds() / 1000.0;
}

inline
double Subscriber::minLatencyMicroseconds() const
{
  return minLatency().nanoseconds() / 1000.0;
}

inline
double Subscriber::maxLatencyMicroseconds() const
{
  return maxLatency().nanoseconds() / 1000.0;
}

inline
rclcpp::Duration Subscriber::meanPeriod() const
{
  return impl_->meanPeriod();
}

inline
double Subscriber::meanFrequencyHz() const
{
  return impl_->meanFrequencyHz();
}

inline
rclcpp::Duration Subscriber::minPeriod() const
{
  return impl_->minPeriod();
}

inline
rclcpp::Duration Subscriber::maxPeriod() const
{
  return impl_->maxPeriod();
}

inline
double Subscriber::meanPeriodMilliseconds() const
{
  return meanPeriod().nanoseconds() / 1000000.0;
}

inline
double Subscriber::minPeriodMilliseconds() const
{
  return minPeriod().nanoseconds() / 1000000.0;
}

inline
double Subscriber::maxPeriodMilliseconds() const
{
  return maxPeriod().nanoseconds() / 1000000.0;
}

inline
void Subscriber::setTimeout(const rclcpp::Duration &time_out)
{
  impl_->setTimeout(time_out);
}

inline
void Subscriber::setTimeout(const double time_out)
{
  setTimeout(std::chrono::duration<double>(time_out));
}

inline
bool Subscriber::blockTimeouts(bool block)
{
  return impl_->blockTimeouts(block);
}

inline
bool Subscriber::timeoutsBlocked() const
{
  return impl_->timeoutsBlocked();
}

inline
rclcpp::Duration Subscriber::timeout() const
{
  return impl_->timeout();
}

inline
bool Subscriber::timeoutEnabled() const
{
  return impl_->timeoutEnabled();
}

inline
double Subscriber::timeoutMilliseconds() const
{
  return impl_->timeout().nanoseconds() / 1.0e6;
}

inline
bool Subscriber::inTimeout()
{
  return impl_->inTimeout();
}

inline
int Subscriber::timeoutCount()
{
  return impl_->timeoutCount();
}

inline
void Subscriber::appendDiagnostics(diagnostic_updater::DiagnosticStatusWrapper &status,
                                   const std::string &name,
                                   int flags)
{
  // Alias a type for easier access to DiagnosticStatus enumerations.
  typedef diagnostic_msgs::msg::DiagnosticStatus DS;

  // We are considering no messages seen as a warning because this is
  // a normal condition during start up, but should be resolved once
  // the system is running.  On the other hand, timeouts are typically
  // an unexpected condition that occur when a publisher dies or
  // communications is getting blocked.  Active timeouts affect
  // operation so are treated as an error condition.  If a timeout is
  // not active, but has occurred, we report this as a warning so that
  // we know something has been wrong and needs to be investigated.

  if (!messageCount()) {
    status.mergeSummaryf(DS::WARN, "Waiting for %s messages.", name.c_str());
  } else if (inTimeout()) {
    status.mergeSummaryf(DS::ERROR, "%s timeout.", name.c_str());
  } else if (timeoutCount()) {
    status.mergeSummaryf(DS::WARN, "%s timeouts have occurred.", name.c_str());
  }

  if (flags & DIAG_CONNECTION) {
    status.addf(name + "topic name", "%s", unmappedTopic().c_str());
    status.addf(name + " publishers", "%d", numPublishers());
  }

  if (flags & DIAG_MSG_COUNT) {
    status.add(name + " message count", messageCount());
  }

  if ((flags & DIAG_TIMEOUT) && timeoutEnabled()) {
    status.addf(name + " timeouts count",
                "%d ( > %.2f ms)",
                timeoutCount(),
                timeoutMilliseconds());
  }

  if (flags & DIAG_LATENCY) {
    if (messageCount() < 1) {
      status.add(name + " latency",
                 "min: N/A us, mean: N/A us, max: N/A us");
    } else {
      status.addf(name + " latency",
                  "min: %.2f us, mean: %.2f us, max: %.2f us",
                  minLatencyMicroseconds(),
                  meanLatencyMicroseconds(),
                  maxLatencyMicroseconds());
    }
  }

  if (flags & DIAG_RATE) {
    if (messageCount() < 2) {
    status.add(
      name + " rates",
      "min period: N/A ms, mean frequency: N/A hz, max period: N/A ms");
    } else {
      status.addf(
        name + " rates",
        "min period: %.2f ms, mean frequency: %.2f hz, max period: %.2f ms",
        minPeriodMilliseconds(),
        meanFrequencyHz(),
        maxPeriodMilliseconds());
    }
  }
}
}  // namespace swri
#endif  // SWRI_ROSCPP_SUBSCRIBER_H_