rclpy.experimental package

Submodules

Module contents

class rclpy.experimental.AsyncClient(context: ~rclpy.context.Context, client_impl: object, srv_type: ~typing.Type[rosidl_pycommon.interface_base_classes.BaseService.(~SrvRequestT, ~SrvResponseT).(~SrvRequestT, ~SrvResponseT)], srv_name: str, qos_profile: rclpy.qos.QoSProfile, on_destroy: ~typing.Callable[[~rclpy.experimental.async_client.AsyncClient], None], tg: ~asyncio.taskgroups.TaskGroup | None = None)

Bases: BaseClient[SrvRequestT, SrvResponseT]

A client of a ROS service.

Experimental

This API is experimental.

Create a container for a ROS service client.

Warning

Users should not create a service client with this constructor, instead they should call Node.create_client() or AsyncNode.create_client().

async call(request: SrvRequestT) SrvResponseT

Send a service request and await the response.

async wait_for_service(*, check_interval: float = 0.1) None

Wait for a service server to become ready.

To apply a timeout, wrap the call with async with asyncio.timeout().

Parameters:

check_interval – Seconds between checks. Defaults to 0.1.

class rclpy.experimental.AsyncClock(*args: Any, **kwargs: Any)

Bases: BaseClock

A ROS clock with async sleep.

Experimental

This API is experimental.

async sleep(duration_sec: float) None

Sleep for a duration respecting sim time.

Cancelled on clock destruction. Raises TimeSourceChangedError if ROS time is activated or deactivated during the sleep.

class rclpy.experimental.AsyncNode(node_name: str, *, context: Context | None = None, cli_args: list[str] | None = None, namespace: str | None = None, use_global_arguments: bool = True, enable_rosout: bool = True, rosout_qos_profile: rclpy.qos.QoSProfile | int = rclpy.qos.qos_profile_rosout_default, start_parameter_services: bool = True, parameter_overrides: list[Parameter[Any]] | None = None, allow_undeclared_parameters: bool = False, automatically_declare_parameters_from_overrides: bool = False, enable_logger_service: bool = False)

Bases: BaseNode

A Node in the ROS graph that runs on an asyncio event loop.

Like Node, AsyncNode is the primary entrypoint for communication — it creates publishers, subscriptions, services, clients, and timers. Unlike Node, it does not require an rclpy executor: the node owns its own asyncio.TaskGroup, and DDS callbacks are dispatched directly onto the running event loop.

Experimental

This API is experimental.

Example:

async with AsyncNode('my_node') as node:
    node.create_subscription(String, '/topic', callback, 10)
    await my_async_task()

See run() for a shorthand when the node is the only foreground task.

Create an AsyncNode.

Parameters:
  • node_name – A name to give to this node. Validated by validate_node_name().

  • context – The context to be associated with, or None for the default global context.

  • cli_args – A list of strings of command line args to be used only by this node. These arguments are used to extract remappings used by the node and other ROS specific settings, as well as user defined non-ROS arguments.

  • namespace – The namespace to which relative topic and service names will be prefixed. Validated by validate_namespace().

  • use_global_argumentsFalse if the node should ignore process-wide command line args.

  • enable_rosoutFalse if the node should ignore rosout logging.

  • rosout_qos_profile – A QoSProfile or a history depth to apply to rosout publisher. In the case that a history depth is provided, the QoS history is set to KEEP_LAST, the QoS history depth is set to the value of the parameter, and all other QoS settings are set to their default value.

  • start_parameter_servicesFalse if the node should not create parameter services.

  • parameter_overrides – A list of overrides for initial values for parameters declared on the node.

  • allow_undeclared_parameters – True if undeclared parameters are allowed. This flag affects the behavior of parameter-related operations.

  • automatically_declare_parameters_from_overrides – If True, the “parameter overrides” will be used to implicitly declare parameters on the node during creation.

  • enable_logger_serviceTrue if ROS2 services are created to allow external nodes to get and set logger levels of this node. Otherwise, logger levels are only managed locally. That is, logger levels cannot be changed remotely.

create_client(srv_type: ~typing.Type[rosidl_pycommon.interface_base_classes.BaseService.(~SrvRequestT, ~SrvResponseT).(~SrvRequestT, ~SrvResponseT)], srv_name: str, *, qos_profile: rclpy.qos.QoSProfile = rclpy.qos.qos_profile_services_default) AsyncClient[SrvRequestT, SrvResponseT]

Create a new service client.

Parameters:
  • srv_type – The service type.

  • srv_name – The name of the service.

  • qos_profile – The quality of service profile to apply to the service client.

Returns:

The new client.

Raises:

RuntimeError – If the node has been destroyed.

create_publisher(msg_type: Type[MsgT], topic: str, qos_profile: rclpy.qos.QoSProfile | int) AsyncPublisher[MsgT]

Create a new publisher.

Parameters:
  • msg_type – The type of ROS messages the publisher will publish.

  • topic – The name of the topic the publisher will publish to.

  • qos_profile – A QoSProfile or a history depth to apply to the publisher. In the case that a history depth is provided, the QoS history is set to KEEP_LAST, the QoS history depth is set to the value of the parameter, and all other QoS settings are set to their default values.

Returns:

The new publisher.

Raises:

RuntimeError – If the node has been destroyed.

create_service(srv_type: ~typing.Type[rosidl_pycommon.interface_base_classes.BaseService.(~SrvRequestT, ~SrvResponseT).(~SrvRequestT, ~SrvResponseT)], srv_name: str, callback: ~typing.Callable[[~rclpy.type_support.SrvRequestT, ~rclpy.type_support.SrvResponseT], ~rclpy.type_support.SrvResponseT] | ~typing.Callable[[~rclpy.type_support.SrvRequestT, ~rclpy.type_support.SrvResponseT], ~typing.Awaitable[~rclpy.type_support.SrvResponseT]], *, qos_profile: rclpy.qos.QoSProfile = rclpy.qos.qos_profile_services_default, concurrent: bool = False) AsyncService[SrvRequestT, SrvResponseT]

Create a new service server.

Parameters:
  • srv_type – The service type.

  • srv_name – The name of the service.

  • callback – A user-defined callback invoked for each received request; its return value is sent as the response. May be a sync def or async def function.

  • qos_profile – The quality of service profile to apply to the service server.

  • concurrent – If True, each request is taken as soon as it arrives and its handler is dispatched as an independent task, so handlers may run concurrently. If False (default), the next request is not taken from DDS until the previous handler has finished — handler execution back-pressures the read loop.

Returns:

The new service.

Raises:

RuntimeError – If the node has been destroyed.

create_subscription(msg_type: Type[MsgT], topic: str, callback: Callable[[bytes], None] | Callable[[bytes, MessageInfo], None] | Callable[[bytes], Awaitable[None]] | Callable[[bytes, MessageInfo], Awaitable[None]], qos_profile: rclpy.qos.QoSProfile | int, *, raw: Literal[True], concurrent: bool = False, content_filter_options: ContentFilterOptions | None = None) AsyncSubscription[MsgT]
create_subscription(msg_type: Type[MsgT], topic: str, callback: Callable[[MsgT], None] | Callable[[MsgT, MessageInfo], None] | Callable[[MsgT], Awaitable[None]] | Callable[[MsgT, MessageInfo], Awaitable[None]], qos_profile: rclpy.qos.QoSProfile | int, *, raw: Literal[False], concurrent: bool = False, content_filter_options: ContentFilterOptions | None = None) AsyncSubscription[MsgT]
create_subscription(msg_type: Type[MsgT], topic: str, callback: Callable[[MsgT], None] | Callable[[MsgT, MessageInfo], None] | Callable[[MsgT], Awaitable[None]] | Callable[[MsgT, MessageInfo], Awaitable[None]] | Callable[[bytes], None] | Callable[[bytes, MessageInfo], None] | Callable[[bytes], Awaitable[None]] | Callable[[bytes, MessageInfo], Awaitable[None]], qos_profile: rclpy.qos.QoSProfile | int, *, raw: bool = False, concurrent: bool = False, content_filter_options: ContentFilterOptions | None = None) AsyncSubscription[MsgT]

Create a new subscription.

Parameters:
  • msg_type – The type of ROS messages the subscription will subscribe to.

  • topic – The name of the topic the subscription will subscribe to.

  • callback – A user-defined callback invoked for each received message. May be a sync def or async def function.

  • qos_profile – A QoSProfile or a history depth to apply to the subscription. In the case that a history depth is provided, the QoS history is set to KEEP_LAST, the QoS history depth is set to the value of the parameter, and all other QoS settings are set to their default values.

  • raw – If True, received messages will be delivered in raw binary representation.

  • concurrent – If True, each message is taken as soon as it arrives and its callback is dispatched as an independent task, so callbacks may run concurrently. If False (default), the next message is not taken from DDS until the previous callback has finished — callback execution back-pressures the read loop.

  • content_filter_options – The filter expression and parameters for content filtering.

Returns:

The new subscription.

Raises:

RuntimeError – If the node has been destroyed.

create_timer(timer_period_sec: float, callback: Callable[[], None] | Callable[[TimerInfo], None] | Callable[[], Awaitable[None]] | Callable[[TimerInfo], Awaitable[None]], autostart: bool = True) AsyncTimer

Create a new timer.

If autostart is True (the default), the timer will be started and every timer_period_sec number of seconds the provided callback will be invoked. If autostart is False, the timer will be created but not started; it can then be started by calling reset() on the timer object.

Parameters:
  • timer_period_sec – The period (in seconds) of the timer.

  • callback – A user-defined callback invoked when the timer expires. May be a sync def or async def function.

  • autostart – Whether to automatically start the timer after creation; defaults to True.

Returns:

The new timer.

Raises:

RuntimeError – If the node has been destroyed.

destroy_node() None

Destroy the node.

get_clock() AsyncClock

Get the async clock used by the node.

async run() None

Run the node until destroy_node() is called.

Mutually exclusive with async with.

Raises:

RuntimeError – If the node is already running under a context manager.

async wait_for_node(fully_qualified_node_name: str, *, check_interval: float = 0.1) None

Wait until node name is present in the system.

To apply a timeout, wrap the call with async with asyncio.timeout().

Parameters:
  • fully_qualified_node_name – Fully qualified name of the node to wait for.

  • check_interval – Seconds between checks. Defaults to 0.1.

class rclpy.experimental.AsyncPublisher(publisher_impl: object, msg_type: Type[MsgT], topic: str, qos_profile: rclpy.qos.QoSProfile, on_destroy: Callable[[AsyncPublisher], None])

Bases: BasePublisher[MsgT]

A publisher on a ROS topic.

Experimental

This API is experimental.

Create a container for a ROS publisher.

Warning

Users should not create a publisher with this constructor, instead they should call Node.create_publisher() or AsyncNode.create_publisher().

class rclpy.experimental.AsyncService(service_impl: object, srv_type: ~typing.Type[rosidl_pycommon.interface_base_classes.BaseService.(~SrvRequestT, ~SrvResponseT).(~SrvRequestT, ~SrvResponseT)], srv_name: str, callback: ~typing.Callable[[~rclpy.type_support.SrvRequestT, ~rclpy.type_support.SrvResponseT], ~rclpy.type_support.SrvResponseT] | ~typing.Callable[[~rclpy.type_support.SrvRequestT, ~rclpy.type_support.SrvResponseT], ~typing.Awaitable[~rclpy.type_support.SrvResponseT]], qos_profile: rclpy.qos.QoSProfile, on_destroy: ~typing.Callable[[~rclpy.experimental.async_service.AsyncService], None], concurrent: bool = False, tg: ~asyncio.taskgroups.TaskGroup | None = None)

Bases: BaseService[SrvRequestT, SrvResponseT]

A server for a ROS service.

Experimental

This API is experimental.

Create a container for a ROS service server.

Warning

Users should not create a service server with this constructor, instead they should call Node.create_service() or AsyncNode.create_service().

class rclpy.experimental.AsyncSubscription(subscription_impl: object, msg_type: Type[MsgT], topic: str, callback: Callable[[MsgT], None] | Callable[[MsgT, MessageInfo], None] | Callable[[MsgT], Awaitable[None]] | Callable[[MsgT, MessageInfo], Awaitable[None]] | Callable[[bytes], None] | Callable[[bytes, MessageInfo], None] | Callable[[bytes], Awaitable[None]] | Callable[[bytes, MessageInfo], Awaitable[None]], qos_profile: rclpy.qos.QoSProfile, on_destroy: Callable[[AsyncSubscription], None], raw: bool = False, concurrent: bool = False, tg: TaskGroup | None = None)

Bases: BaseSubscription[MsgT]

A subscription to a ROS topic.

Experimental

This API is experimental.

Create a container for a ROS subscription.

Warning

Users should not create a subscription with this constructor, instead they should call Node.create_subscription() or AsyncNode.create_subscription().

class rclpy.experimental.AsyncTimer(timer_period_ns: int, clock: AsyncClock, context: Context, callback: Callable[[], None] | Callable[[TimerInfo], None] | Callable[[], Awaitable[None]] | Callable[[TimerInfo], Awaitable[None]], autostart: bool, on_destroy: Callable[[AsyncTimer], None], tg: TaskGroup | None = None)

Bases: BaseTimer

A periodic timer.

Experimental

This API is experimental.

Create a timer.

Warning

Users should not create a timer with this constructor, instead they should call Node.create_timer() or AsyncNode.create_timer().

property callback: Callable[[], None] | Callable[[TimerInfo], None] | Callable[[], Awaitable[None]] | Callable[[TimerInfo], Awaitable[None]]
cancel() None
rclpy.experimental.EventsExecutor(*, context: Context | None = None) Executor