.. _program_listing_file__tmp_ws_src_marti_common_swri_roscpp_include_swri_roscpp_subscriber.h: Program Listing for File subscriber.h ===================================== |exhale_lsh| :ref:`Return to documentation for file ` (``/tmp/ws/src/marti_common/swri_roscpp/include/swri_roscpp/subscriber.h``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp // ***************************************************************************** // // Copyright (c) 2015, Southwest Research Institute® (SwRI®) // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // * Neither the name of Southwest Research Institute® (SwRI®) nor the // names of its contributors may be used to endorse or promote products // derived from this software without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE // ARE DISCLAIMED. IN NO EVENT SHALL SOUTHWEST RESEARCH INSTITUTE BE LIABLE FOR ANY // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // // ***************************************************************************** #ifndef SWRI_ROSCPP_SUBSCRIBER_H_ #define SWRI_ROSCPP_SUBSCRIBER_H_ #include #include #include namespace swri { // This is an extended interface to the rclcpp::Subscriber class. // // This is an extended interface to the rclcpp::Subscriber class that // provides a little more default functionality and instrumentation. // // - Prints information when the subscription is created (to RCLCPP_INFO) // about the unmapped and mapped topic names. // // - Maintains some statistics about the messages (counts, latency, period) // // - Implements timeout logic and timeout counting. // // This implementation provides two interfaces. It supports the // traditional ROS callback interface (though it is deliberately // limited to callbacks that take a *ConstPtr& type), and an interface // where the message is simply assigned to *ConstPtr variable that is // specified at creation. The second interface replaces the previous // swri_roscpp::LatchedSubscriber and allows you to avoid writing // trivial callback functions. class Subscriber { private: // The implementation is kept in a separate class to maintain clean // copy/move semantics with ROS (since the ROS subscriber binds to a // single address for the callback) and to allow us to hide the // template arguments so the message type doesn't have to be // specified when the subscriber is declared. std::shared_ptr impl_; public: Subscriber(); // Using class method callback. template Subscriber(rclcpp::Node &nh, const std::string &topic, uint32_t queue_size, void(T::*fp)(const std::shared_ptr< M const > &), T *obj, const rclcpp::QoS& transport_hints = rclcpp::QoS(1)); // Using a boost function callback. template Subscriber(rclcpp::Node &nh, const std::string &topic, uint32_t queue_size, const std::function &)> &callback, const rclcpp::QoS& transport_hints = rclcpp::QoS(1)); // This is an alternate interface that stores a received message in // a variable without calling a user-defined callback function. // This is useful for cases where you just want to store a message // for usage later and avoids having to write a trivial callback // function. template Subscriber(rclcpp::Node &nh, const std::string &topic, std::shared_ptr< M const > *dest, const rclcpp::QoS& transport_hints = rclcpp::QoS(1)); Subscriber& operator=(const Subscriber &other); // Reset all statistics, including message and timeout counts. void resetStatistics(); // Returns the unmapped topic name that was provided when the // subscriber was created. const std::string& unmappedTopic() const; // Returns the number of publishers that are connected to this // subscriber. int numPublishers() const; // How many messages have been received. int messageCount() const; // Age of the most recent message (difference between now and the // header stamp (or time message was received for messages that // don't have headers). rclcpp::Duration age(const rclcpp::Time &now = rclcpp::Time(0,0,RCL_ROS_TIME)) const; double ageSeconds(const rclcpp::Time &now = rclcpp::Time(0,0,RCL_ROS_TIME)) const; double ageMilliseconds(const rclcpp::Time &now = rclcpp::Time(0,0,RCL_ROS_TIME)) const; // Average latency (time difference between the time stamp and when // the message is received). These will be useless for message types // that do not have a header. rclcpp::Duration meanLatency() const; rclcpp::Duration minLatency() const; rclcpp::Duration maxLatency() const; double meanLatencyMicroseconds() const; double minLatencyMicroseconds() const; double maxLatencyMicroseconds() const; // Frequency/Period in terms of when the message was received (not // the header stamp). double meanFrequencyHz() const; rclcpp::Duration meanPeriod() const; rclcpp::Duration minPeriod() const; rclcpp::Duration maxPeriod() const; double meanPeriodMilliseconds() const; double minPeriodMilliseconds() const; double maxPeriodMilliseconds() const; // Provide a negative value to disable the timeout (default is -1). void setTimeout(const rclcpp::Duration &time_out); void setTimeout(const double time_out); // Block/unblock timeouts from occuring. This allows you to // temporarily block timeouts (for example, if a message is not // expected in a particular mode). Returns the current state bool blockTimeouts(bool block); // Return true if the subscriber is currently blocking timeouts from // occurring. bool timeoutsBlocked() const; // Determine if the timeout is currently enabled. bool timeoutEnabled() const; // Read the current timeout setting. rclcpp::Duration timeout() const; double timeoutMilliseconds() const; // Determine if the topic is in a timed out state. bool inTimeout(); // How many times the topic has been in a timeout state. int timeoutCount(); // These flags determine which values are added to a diagnostic // status by the appendDiagnostics method. enum DIAGNOSTIC_FLAGS { DIAG_CONNECTION = 1 << 0, // Include topic names, publisher counts DIAG_MSG_COUNT = 1 << 1, // Include message count DIAG_TIMEOUT = 1 << 2, // Include timeout counts if applicable DIAG_LATENCY = 1 << 3, // Include latency information DIAG_RATE = 1 << 4, // Include rate information DIAG_ALL = ~0, // Abbreviation to include all information DIAG_MOST = DIAG_ALL ^ DIAG_CONNECTION // Abbreviation to include everything except connection info. }; // This is a convenience method to add information about this // subscription to a diagnostic status in a standard way. // // The status is merged with a warning if no messages have been // received or if timeouts have occurred. The status is merged with // an error if the subscription is in an active timeout status. // // The flags parameter determines which values are added to the // status' key/value pairs. This should be a bitwise combination of // the values defined in DIAGNOSTIC_FLAGS. void appendDiagnostics(diagnostic_updater::DiagnosticStatusWrapper &status, const std::string &name, const int flags); }; // class Subscriber inline Subscriber::Subscriber() { // Setup an empty implementation so that we can assume impl_ is // non-null and avoid a lot of unnecessary NULL checks. impl_ = std::make_shared(); } template inline Subscriber::Subscriber(rclcpp::Node &nh, const std::string &topic, uint32_t queue_size, void(T::*fp)(const std::shared_ptr< M const > &), T *obj, const rclcpp::QoS& transport_hints) { impl_ = std::shared_ptr( new TypedSubscriberImpl( nh, topic, queue_size, fp, obj, transport_hints)); } template inline Subscriber::Subscriber(rclcpp::Node &nh, const std::string &topic, uint32_t queue_size, const std::function &)> &callback, const rclcpp::QoS& transport_hints) { impl_ = std::shared_ptr( new BindSubscriberImpl( nh, topic, queue_size, callback, transport_hints)); } template inline Subscriber::Subscriber(rclcpp::Node &nh, const std::string &topic, std::shared_ptr< M const > *dest, const rclcpp::QoS& transport_hints) { impl_ = std::shared_ptr( new StorageSubscriberImpl( nh, topic, dest, transport_hints)); } inline Subscriber& Subscriber::operator=(const Subscriber &other) { // If we have some non-default parameters and the other class // doesn't, we preserve our parameters across assignment. // // This is to support the use case where we read ROS parameters, set // them in the subscriber, and then create the subscriptions. // Otherwise, the user has to read the timeouts, save them // temporarily, and then set them after the subscription is set up. // // This could cause confusing behavior if you are moving subscribers // around a lot, but I've never seen that kind of use case in any // ROS code. rclcpp::Duration new_timeout = other.impl_->timeout(); if (impl_->timeoutEnabled() && !other.impl_->timeoutEnabled()) { new_timeout = impl_->timeout(); } impl_ = other.impl_; impl_->setTimeout(new_timeout); return *this; } inline const std::string& Subscriber::unmappedTopic() const { return impl_->unmappedTopic(); } inline int Subscriber::numPublishers() const { return impl_->numPublishers(); } inline void Subscriber::resetStatistics() { impl_->resetStatistics(); } inline int Subscriber::messageCount() const { return impl_->messageCount(); } inline rclcpp::Duration Subscriber::age(const rclcpp::Time &now) const { return impl_->age(now); } inline double Subscriber::ageSeconds(const rclcpp::Time &now) const { return age(now).seconds(); } inline double Subscriber::ageMilliseconds(const rclcpp::Time &now) const { return age(now).nanoseconds() / 1000000.0; } inline rclcpp::Duration Subscriber::meanLatency() const { return impl_->meanLatency(); } inline rclcpp::Duration Subscriber::minLatency() const { return impl_->minLatency(); } inline rclcpp::Duration Subscriber::maxLatency() const { return impl_->maxLatency(); } inline double Subscriber::meanLatencyMicroseconds() const { return meanLatency().nanoseconds() / 1000.0; } inline double Subscriber::minLatencyMicroseconds() const { return minLatency().nanoseconds() / 1000.0; } inline double Subscriber::maxLatencyMicroseconds() const { return maxLatency().nanoseconds() / 1000.0; } inline rclcpp::Duration Subscriber::meanPeriod() const { return impl_->meanPeriod(); } inline double Subscriber::meanFrequencyHz() const { return impl_->meanFrequencyHz(); } inline rclcpp::Duration Subscriber::minPeriod() const { return impl_->minPeriod(); } inline rclcpp::Duration Subscriber::maxPeriod() const { return impl_->maxPeriod(); } inline double Subscriber::meanPeriodMilliseconds() const { return meanPeriod().nanoseconds() / 1000000.0; } inline double Subscriber::minPeriodMilliseconds() const { return minPeriod().nanoseconds() / 1000000.0; } inline double Subscriber::maxPeriodMilliseconds() const { return maxPeriod().nanoseconds() / 1000000.0; } inline void Subscriber::setTimeout(const rclcpp::Duration &time_out) { impl_->setTimeout(time_out); } inline void Subscriber::setTimeout(const double time_out) { setTimeout(std::chrono::duration(time_out)); } inline bool Subscriber::blockTimeouts(bool block) { return impl_->blockTimeouts(block); } inline bool Subscriber::timeoutsBlocked() const { return impl_->timeoutsBlocked(); } inline rclcpp::Duration Subscriber::timeout() const { return impl_->timeout(); } inline bool Subscriber::timeoutEnabled() const { return impl_->timeoutEnabled(); } inline double Subscriber::timeoutMilliseconds() const { return impl_->timeout().nanoseconds() / 1.0e6; } inline bool Subscriber::inTimeout() { return impl_->inTimeout(); } inline int Subscriber::timeoutCount() { return impl_->timeoutCount(); } inline void Subscriber::appendDiagnostics(diagnostic_updater::DiagnosticStatusWrapper &status, const std::string &name, int flags) { // Alias a type for easier access to DiagnosticStatus enumerations. typedef diagnostic_msgs::msg::DiagnosticStatus DS; // We are considering no messages seen as a warning because this is // a normal condition during start up, but should be resolved once // the system is running. On the other hand, timeouts are typically // an unexpected condition that occur when a publisher dies or // communications is getting blocked. Active timeouts affect // operation so are treated as an error condition. If a timeout is // not active, but has occurred, we report this as a warning so that // we know something has been wrong and needs to be investigated. if (!messageCount()) { status.mergeSummaryf(DS::WARN, "Waiting for %s messages.", name.c_str()); } else if (inTimeout()) { status.mergeSummaryf(DS::ERROR, "%s timeout.", name.c_str()); } else if (timeoutCount()) { status.mergeSummaryf(DS::WARN, "%s timeouts have occurred.", name.c_str()); } if (flags & DIAG_CONNECTION) { status.addf(name + "topic name", "%s", unmappedTopic().c_str()); status.addf(name + " publishers", "%d", numPublishers()); } if (flags & DIAG_MSG_COUNT) { status.add(name + " message count", messageCount()); } if ((flags & DIAG_TIMEOUT) && timeoutEnabled()) { status.addf(name + " timeouts count", "%d ( > %.2f ms)", timeoutCount(), timeoutMilliseconds()); } if (flags & DIAG_LATENCY) { if (messageCount() < 1) { status.add(name + " latency", "min: N/A us, mean: N/A us, max: N/A us"); } else { status.addf(name + " latency", "min: %.2f us, mean: %.2f us, max: %.2f us", minLatencyMicroseconds(), meanLatencyMicroseconds(), maxLatencyMicroseconds()); } } if (flags & DIAG_RATE) { if (messageCount() < 2) { status.add( name + " rates", "min period: N/A ms, mean frequency: N/A hz, max period: N/A ms"); } else { status.addf( name + " rates", "min period: %.2f ms, mean frequency: %.2f hz, max period: %.2f ms", minPeriodMilliseconds(), meanFrequencyHz(), maxPeriodMilliseconds()); } } } } // namespace swri #endif // SWRI_ROSCPP_SUBSCRIBER_H_