synchros2.subscription module

class synchros2.subscription.Subscription[source]

Bases: MessageFeed[MessageT]

An ergonomic interface for topic subscriptions in ROS 2.

Subscription instances are MessageFeed instances wrapping message_filters.Subscriber instances and thus allow synchronous and asynchronous iteration and fetching of published data.

__init__(message_type: Type[MessageT], topic_name: str, qos_profile: rclpy.qos.QoSProfile | int | None = None, history_length: int | None = None, node: rclpy.node.Node | None = None, autostart: bool = True, trace: bool = False, **kwargs: Any) None[source]

Initializes the subscription.

Parameters:
  • message_type – Target message type class (as generated by rosidl).

  • topic_name – Name of the target topic in the ROS 2 graph.

  • qos_profile – an optional quality-of-service profile or simply a history depth

  • subscription (to use for the underlying native)

  • 1. (defaults to a history depth of)

  • history_length – optional historic data size, defaults to 1

  • node – optional node for the underlying native subscription, defaults to

  • node. (the current process)

  • autostart – whether to start feeding messages immediately or not.

  • trace – Whether to log when messages are received by the message feed

  • kwargs – other keyword arguments are used to create the underlying native subscription.

  • reference. (See rclpy.node.Node.create_subscription documentation for further)

cancel() None

Stop the message feed.

property matched_publishers: int

Gets the number publishers matched and linked to.

Note that in ROS 2 Humble and earlier distributions, this property relies on the number of known publishers for the topic subscribed as subscription matching status info is missing.

property message_type: Type[MessageT]

Gets the type of the message subscribed.

publisher_matches(num_publishers: int) rclpy.task.Future[source]

Gets a future to next publisher matching status update.

Note that in ROS 2 Humble and earlier distributions, this method relies on polling the number of known publishers for the topic subscribed, as subscription matching events are missing.

Parameters:

num_publishers – lower bound on the number of publishers to match.

Returns:

a future, done if the current number of publishers already matches the specified lower bound.

property subscriber: Subscriber

Gets the underlying subscriber.

Type-casted alias of Subscription.link.

property topic_name: str

Gets the name of the topic subscribed.

unsubscribe() None

Stop the message feed.

synchros2.subscription.wait_for_message(msg_type: MessageT, topic_name: str, timeout_sec: float | None = None, *, node: rclpy.node.Node | None = None, **kwargs: Any) MessageT | None[source]

Wait for message on a given topic synchronously.

Parameters:
  • msg_type – type of message to wait for.

  • topic_name – name of the topic to wait on.

  • timeout_sec – optional timeout, in seconds, for the wait.

  • node – An optional Node to provide. If none is provided, the one from the scope is used.

  • kwargs – keyword argument to pass to wait_for_message_async

See wait_for_message_async documentation for a reference on additional keyword arguments.

Returns:

The message received, or None on timeout.

synchros2.subscription.wait_for_message_async(msg_type: MessageT, topic_name: str, *, qos_profile: rclpy.qos.QoSProfile | int = 1, node: rclpy.node.Node | None = None) FutureLike[MessageT][source]

Wait for message on a given topic asynchronously.

Parameters:
  • msg_type – type of message to wait for.

  • topic_name – name of the topic to wait on.

  • qos_profile – optional QoS profile for temporary topic subscription.

  • node (the current process-wide) – optional node for temporary topic subscription, defaults to

  • node

Returns:

A future for the incoming message.

Raises:

RuntimeError – if no node is available.

synchros2.subscription.wait_for_messages(topic_names: Sequence[str], message_types: Sequence[Type], *, timeout_sec: float | None = None, node: rclpy.node.Node | None = None, **kwargs: Any) Sequence[Any] | None[source]

Waits for messages to arrive at multiple topics within a given time window.

Uses message_filters.ApproximateTimeSynchronizer. This function blocks until receiving the messages or when a given timeout expires. Assumes the given node is spinning by some external executor.

Requires the user to pass in a node, since message_filters.Subscriber requires a node upon construction.

Parameters:
  • topic_names – list of topic names

  • message_types – list of message types, one for each topic.

  • timeout_sec – optional time in seconds to wait. None if forever.

  • node – optional node for temporary topic subscription

  • kwargs – additional arguments passed into wait_for_messages_async.

See wait_for_messages_async documentation for a reference on additional keyword arguments.

synchros2.subscription.wait_for_messages_async(topic_names: Sequence[str], message_types: Sequence[Type], *, queue_size: int = 10, delay: float = 0.2, allow_headerless: bool = False, node: rclpy.node.Node | None = None, qos_profiles: Sequence[rclpy.qos.QoSProfile | None] | None = None, callback_group: rclpy.callback_groups.CallbackGroup | None = None) FutureLike[Sequence[Any]][source]

Asynchronous version of wait_for_messages

Parameters:
  • topic_names – list of topic names

  • message_types – List of message types, one for each topic.

  • queue_size – synchronizer message queue size

  • delay – the delay in seconds for which the messages could be

  • synchronized (i.e. the time window)

  • allow_headerless – whether it’s ok for there to be no header in the messages.

  • qos_profiles – optional list of QoS profiles, one for each topic.

  • topic (If no QoS profile is specified for a given)

  • with (the default profile)

  • used. (a history depth of 10 will be)

  • node (process-wide) – optional node for temporary topic subscription, defaults to the current

  • node

  • callback_group – optional callback group for the message filter subscribers.