Class SubscriptionBase

Inheritance Relationships

Base Type

  • public std::enable_shared_from_this< SubscriptionBase >

Derived Types

Class Documentation

class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>

Virtual base class for subscriptions. This pattern allows us to iterate over different template specializations of Subscription, among other things.

Subclassed by rclcpp::Subscription< rcl_interfaces::msg::ParameterEvent >, rclcpp::GenericSubscription, rclcpp::Subscription< MessageT, AllocatorT, SubscribedT, ROSMessageT, MessageMemoryStrategyT >

Public Types

using IntraProcessManagerWeakPtr = std::weak_ptr<rclcpp::experimental::IntraProcessManager>

Public Functions

SubscriptionBase(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rcl_subscription_options_t &subscription_options, bool is_serialized = false)

Constructor.

This accepts rcl_subscription_options_t instead of rclcpp::SubscriptionOptions because rclcpp::SubscriptionOptions::to_rcl_subscription_options depends on the message type.

Parameters:
  • node_base[in] NodeBaseInterface pointer used in parts of the setup.

  • type_support_handle[in] rosidl type support struct, for the Message type of the topic.

  • topic_name[in] Name of the topic to subscribe to.

  • subscription_options[in] Options for the subscription.

  • is_serialized[in] is true if the message will be delivered still serialized

virtual ~SubscriptionBase()

Destructor.

const char *get_topic_name() const

Get the topic that this subscription is subscribed on.

std::shared_ptr<rcl_subscription_t> get_subscription_handle()
std::shared_ptr<const rcl_subscription_t> get_subscription_handle() const
const std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> &get_event_handlers() const

Get all the QoS event handlers associated with this subscription.

Returns:

The map of QoS event handlers.

rclcpp::QoS get_actual_qos() const

Get the actual QoS settings, after the defaults have been determined.

The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT can only be resolved after the creation of the subscription, and it depends on the underlying rmw implementation. If the underlying setting in use can’t be represented in ROS terms, it will be set to RMW_QOS_POLICY_*_UNKNOWN. May throw runtime_error when an unexpected error occurs.

Throws:

std::runtime_error – if failed to get qos settings

Returns:

The actual qos settings.

bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)

Take the next inter-process message from the subscription as a type erased pointer.

The only difference is that it takes a type erased pointer rather than a reference to the exact message type.

See also

Subscription::take() for details on how this function works.

This type erased version facilitates using the subscriptions in a type agnostic way using SubscriptionBase::create_message() and SubscriptionBase::handle_message().

Parameters:
  • message_out[out] The type erased message pointer into which take will copy the data.

  • message_info_out[out] The message info for the taken message.

Throws:

any – rcl errors from rcl_take,

Returns:

true if data was taken and is valid, otherwise false

bool take_serialized(rclcpp::SerializedMessage &message_out, rclcpp::MessageInfo &message_info_out)

Take the next inter-process message, in its serialized form, from the subscription.

For now, if data is taken (written) into the message_out and message_info_out then true will be returned. Unlike Subscription::take(), taking data serialized is not possible via intra-process for the time being, so it will not need to de-duplicate data in any case.

Parameters:
  • message_out[out] The serialized message data structure used to store the taken message.

  • message_info_out[out] The message info for the taken message.

Throws:

any – rcl errors from rcl_take,

Returns:

true if data was taken and is valid, otherwise false

virtual std::shared_ptr<void> create_message() = 0

Borrow a new message.

Returns:

Shared pointer to the fresh message.

virtual std::shared_ptr<rclcpp::SerializedMessage> create_serialized_message() = 0

Borrow a new serialized message.

Returns:

Shared pointer to a rcl_message_serialized_t.

virtual void handle_message(std::shared_ptr<void> &message, const rclcpp::MessageInfo &message_info) = 0

Check if we need to handle the message, and execute the callback if we do.

Parameters:
  • message[in] Shared pointer to the message to handle.

  • message_info[in] Metadata associated with this message.

virtual void handle_serialized_message(const std::shared_ptr<rclcpp::SerializedMessage> &serialized_message, const rclcpp::MessageInfo &message_info) = 0
virtual void handle_loaned_message(void *loaned_message, const rclcpp::MessageInfo &message_info) = 0
virtual void return_message(std::shared_ptr<void> &message) = 0

Return the message borrowed in create_message.

Parameters:

message[in] Shared pointer to the returned message.

virtual void return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> &message) = 0

Return the message borrowed in create_serialized_message.

Parameters:

message[in] Shared pointer to the returned message.

const rosidl_message_type_support_t &get_message_type_support_handle() const
bool is_serialized() const

Return if the subscription is serialized.

Returns:

true if the subscription is serialized, false otherwise

size_t get_publisher_count() const

Get matching publisher count.

Returns:

The number of publishers on this topic.

bool can_loan_messages() const

Check if subscription instance can loan messages.

Depending on the middleware and the message type, this will return true if the middleware can allocate a ROS message instance.

Returns:

boolean flag indicating if middleware can loan messages.

void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)

Implemenation detail.

rclcpp::Waitable::SharedPtr get_intra_process_waitable() const

Return the waitable for intra-process.

Throws:

std::runtime_error – if the intra process manager is destroyed

Returns:

the waitable sharedpointer for intra-process, or nullptr if intra-process is not setup.

bool exchange_in_use_by_wait_set_state(void *pointer_to_subscription_part, bool in_use_state)

Exchange state of whether or not a part of the subscription is used by a wait set.

Used to ensure parts of the subscription are not used with multiple wait sets simultaneously.

Parameters:
  • pointer_to_subscription_part[in] address of a subscription part

  • in_use_state[in] the new state to exchange, true means “now in use”, and false means “no longer in use”.

Throws:
  • std::invalid_argument – If pointer_to_subscription_part is nullptr.

  • std::runtime_error – If the pointer given is not a pointer to one of the parts of the subscription which can be used with a wait set.

Returns:

the current “in use” state.

std::vector<rclcpp::NetworkFlowEndpoint> get_network_flow_endpoints() const

Get network flow endpoints.

Describes network flow endpoints that this subscription is receiving messages on

Returns:

vector of NetworkFlowEndpoint

inline void set_on_new_message_callback(std::function<void(size_t)> callback)

Set a callback to be called when each new message is received.

The callback receives a size_t which is the number of messages received since the last time this callback was called. Normally this is 1, but can be > 1 if messages were received before any callback was set.

Since this callback is called from the middleware, you should aim to make it fast and not blocking. If you need to do a lot of work or wait for some other event, you should spin it off to another thread, otherwise you risk blocking the middleware.

Calling it again will clear any previously set callback.

This function is thread-safe.

If you want more information available in the callback, like the subscription or other information, you may use a lambda with captures or std::bind.

See also

rmw_subscription_set_on_new_message_callback

See also

rcl_subscription_set_on_new_message_callback

Parameters:

callback[in] functor to be called when a new message is received

inline void clear_on_new_message_callback()

Unset the callback registered for new messages, if any.

inline void set_on_new_intra_process_message_callback(std::function<void(size_t)> callback)

Set a callback to be called when each new intra-process message is received.

The callback receives a size_t which is the number of messages received since the last time this callback was called. Normally this is 1, but can be > 1 if messages were received before any callback was set.

Calling it again will clear any previously set callback.

This function is thread-safe.

If you want more information available in the callback, like the subscription or other information, you may use a lambda with captures or std::bind.

See also

rclcpp::SubscriptionIntraProcessBase::set_on_ready_callback

Parameters:

callback[in] functor to be called when a new message is received

inline void clear_on_new_intra_process_message_callback()

Unset the callback registered for new intra-process messages, if any.

inline void set_on_new_qos_event_callback(std::function<void(size_t)> callback, rcl_subscription_event_type_t event_type)

Set a callback to be called when each new qos event instance occurs.

The callback receives a size_t which is the number of events that occurred since the last time this callback was called. Normally this is 1, but can be > 1 if events occurred before any callback was set.

Since this callback is called from the middleware, you should aim to make it fast and not blocking. If you need to do a lot of work or wait for some other event, you should spin it off to another thread, otherwise you risk blocking the middleware.

Calling it again will clear any previously set callback.

An exception will be thrown if the callback is not callable.

This function is thread-safe.

If you want more information available in the callback, like the qos event or other information, you may use a lambda with captures or std::bind.

Parameters:
  • callback[in] functor to be called when a new event occurs

  • event_type[in] identifier for the qos event we want to attach the callback to

inline void clear_on_new_qos_event_callback(rcl_subscription_event_type_t event_type)

Unset the callback registered for new qos events, if any.

bool is_cft_enabled() const

Check if content filtered topic feature of the subscription instance is enabled.

Returns:

boolean flag indicating if the content filtered topic of this subscription is enabled.

void set_content_filter(const std::string &filter_expression, const std::vector<std::string> &expression_parameters = {})

Set the filter expression and expression parameters for the subscription.

See also

ContentFilterOptions::filter_expression An empty string (“”) will clear the content filter setting of the subscription.

Parameters:
  • filter_expression[in] A filter expression to set.

  • expression_parameters[in] Array of expression parameters to set.

Throws:
  • RCLBadAlloc – if memory cannot be allocated

  • RCLError – if an unexpect error occurs

rclcpp::ContentFilterOptions get_content_filter() const

Get the filter expression and expression parameters for the subscription.

Throws:
  • RCLBadAlloc – if memory cannot be allocated

  • RCLError – if an unexpect error occurs

Returns:

rclcpp::ContentFilterOptions The content filter options to get.

Protected Functions

template<typename EventCallbackT>
inline void add_event_handler(const EventCallbackT &callback, const rcl_subscription_event_type_t event_type)
void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo &info) const
bool matches_any_intra_process_publishers(const rmw_gid_t *sender_gid) const
void set_on_new_message_callback(rcl_event_callback_t callback, const void *user_data)

Protected Attributes

rclcpp::node_interfaces::NodeBaseInterface *const node_base_
std::shared_ptr<rcl_node_t> node_handle_
std::shared_ptr<rcl_subscription_t> subscription_handle_
std::shared_ptr<rcl_subscription_t> intra_process_subscription_handle_
rclcpp::Logger node_logger_
std::unordered_map<rcl_subscription_event_type_t, std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_
bool use_intra_process_
IntraProcessManagerWeakPtr weak_ipm_
uint64_t intra_process_subscription_id_
std::shared_ptr<rclcpp::experimental::SubscriptionIntraProcessBase> subscription_intra_process_