rclpy.experimental package
Submodules
- rclpy.experimental.async_client module
- rclpy.experimental.async_clock module
- rclpy.experimental.async_node module
- rclpy.experimental.async_publisher module
- rclpy.experimental.async_service module
- rclpy.experimental.async_subscription module
- rclpy.experimental.async_timer module
- rclpy.experimental.events_executor module
Module contents
- class rclpy.experimental.AsyncClient(context: ~rclpy.context.Context, client_impl: object, srv_type: ~typing.Type[rosidl_pycommon.interface_base_classes.BaseService.(~SrvRequestT, ~SrvResponseT).(~SrvRequestT, ~SrvResponseT)], srv_name: str, qos_profile: rclpy.qos.QoSProfile, on_destroy: ~typing.Callable[[~rclpy.experimental.async_client.AsyncClient], None], tg: ~asyncio.taskgroups.TaskGroup | None = None)
Bases:
BaseClient[SrvRequestT,SrvResponseT]A client of a ROS service.
Experimental
This API is experimental.
Create a container for a ROS service client.
Warning
Users should not create a service client with this constructor, instead they should call
Node.create_client()orAsyncNode.create_client().- async call(request: SrvRequestT) SrvResponseT
Send a service request and await the response.
- async wait_for_service(*, check_interval: float = 0.1) None
Wait for a service server to become ready.
To apply a timeout, wrap the call with
async with asyncio.timeout().- Parameters:
check_interval – Seconds between checks. Defaults to 0.1.
- class rclpy.experimental.AsyncClock(*args: Any, **kwargs: Any)
Bases:
BaseClockA ROS clock with async
sleep.Experimental
This API is experimental.
- async sleep(duration_sec: float) None
Sleep for a duration respecting sim time.
Cancelled on clock destruction. Raises TimeSourceChangedError if ROS time is activated or deactivated during the sleep.
- class rclpy.experimental.AsyncNode(node_name: str, *, context: Context | None = None, cli_args: list[str] | None = None, namespace: str | None = None, use_global_arguments: bool = True, enable_rosout: bool = True, rosout_qos_profile: rclpy.qos.QoSProfile | int = rclpy.qos.qos_profile_rosout_default, start_parameter_services: bool = True, parameter_overrides: list[Parameter[Any]] | None = None, allow_undeclared_parameters: bool = False, automatically_declare_parameters_from_overrides: bool = False, enable_logger_service: bool = False)
Bases:
BaseNodeA Node in the ROS graph that runs on an asyncio event loop.
Like
Node,AsyncNodeis the primary entrypoint for communication — it creates publishers, subscriptions, services, clients, and timers. UnlikeNode, it does not require an rclpy executor: the node owns its ownasyncio.TaskGroup, and DDS callbacks are dispatched directly onto the running event loop.Experimental
This API is experimental.
Example:
async with AsyncNode('my_node') as node: node.create_subscription(String, '/topic', callback, 10) await my_async_task()
See
run()for a shorthand when the node is the only foreground task.Create an AsyncNode.
- Parameters:
node_name – A name to give to this node. Validated by
validate_node_name().context – The context to be associated with, or
Nonefor the default global context.cli_args – A list of strings of command line args to be used only by this node. These arguments are used to extract remappings used by the node and other ROS specific settings, as well as user defined non-ROS arguments.
namespace – The namespace to which relative topic and service names will be prefixed. Validated by
validate_namespace().use_global_arguments –
Falseif the node should ignore process-wide command line args.enable_rosout –
Falseif the node should ignore rosout logging.rosout_qos_profile – A QoSProfile or a history depth to apply to rosout publisher. In the case that a history depth is provided, the QoS history is set to KEEP_LAST, the QoS history depth is set to the value of the parameter, and all other QoS settings are set to their default value.
start_parameter_services –
Falseif the node should not create parameter services.parameter_overrides – A list of overrides for initial values for parameters declared on the node.
allow_undeclared_parameters – True if undeclared parameters are allowed. This flag affects the behavior of parameter-related operations.
automatically_declare_parameters_from_overrides – If True, the “parameter overrides” will be used to implicitly declare parameters on the node during creation.
enable_logger_service –
Trueif ROS2 services are created to allow external nodes to get and set logger levels of this node. Otherwise, logger levels are only managed locally. That is, logger levels cannot be changed remotely.
- create_client(srv_type: ~typing.Type[rosidl_pycommon.interface_base_classes.BaseService.(~SrvRequestT, ~SrvResponseT).(~SrvRequestT, ~SrvResponseT)], srv_name: str, *, qos_profile: rclpy.qos.QoSProfile = rclpy.qos.qos_profile_services_default) AsyncClient[SrvRequestT, SrvResponseT]
Create a new service client.
- Parameters:
srv_type – The service type.
srv_name – The name of the service.
qos_profile – The quality of service profile to apply to the service client.
- Returns:
The new client.
- Raises:
RuntimeError – If the node has been destroyed.
- create_publisher(msg_type: Type[MsgT], topic: str, qos_profile: rclpy.qos.QoSProfile | int) AsyncPublisher[MsgT]
Create a new publisher.
- Parameters:
msg_type – The type of ROS messages the publisher will publish.
topic – The name of the topic the publisher will publish to.
qos_profile – A QoSProfile or a history depth to apply to the publisher. In the case that a history depth is provided, the QoS history is set to KEEP_LAST, the QoS history depth is set to the value of the parameter, and all other QoS settings are set to their default values.
- Returns:
The new publisher.
- Raises:
RuntimeError – If the node has been destroyed.
- create_service(srv_type: ~typing.Type[rosidl_pycommon.interface_base_classes.BaseService.(~SrvRequestT, ~SrvResponseT).(~SrvRequestT, ~SrvResponseT)], srv_name: str, callback: ~typing.Callable[[~rclpy.type_support.SrvRequestT, ~rclpy.type_support.SrvResponseT], ~rclpy.type_support.SrvResponseT] | ~typing.Callable[[~rclpy.type_support.SrvRequestT, ~rclpy.type_support.SrvResponseT], ~typing.Awaitable[~rclpy.type_support.SrvResponseT]], *, qos_profile: rclpy.qos.QoSProfile = rclpy.qos.qos_profile_services_default, concurrent: bool = False) AsyncService[SrvRequestT, SrvResponseT]
Create a new service server.
- Parameters:
srv_type – The service type.
srv_name – The name of the service.
callback – A user-defined callback invoked for each received request; its return value is sent as the response. May be a sync
deforasync deffunction.qos_profile – The quality of service profile to apply to the service server.
concurrent – If
True, each request is taken as soon as it arrives and its handler is dispatched as an independent task, so handlers may run concurrently. IfFalse(default), the next request is not taken from DDS until the previous handler has finished — handler execution back-pressures the read loop.
- Returns:
The new service.
- Raises:
RuntimeError – If the node has been destroyed.
- create_subscription(msg_type: Type[MsgT], topic: str, callback: Callable[[bytes], None] | Callable[[bytes, MessageInfo], None] | Callable[[bytes], Awaitable[None]] | Callable[[bytes, MessageInfo], Awaitable[None]], qos_profile: rclpy.qos.QoSProfile | int, *, raw: Literal[True], concurrent: bool = False, content_filter_options: ContentFilterOptions | None = None) AsyncSubscription[MsgT]
- create_subscription(msg_type: Type[MsgT], topic: str, callback: Callable[[MsgT], None] | Callable[[MsgT, MessageInfo], None] | Callable[[MsgT], Awaitable[None]] | Callable[[MsgT, MessageInfo], Awaitable[None]], qos_profile: rclpy.qos.QoSProfile | int, *, raw: Literal[False], concurrent: bool = False, content_filter_options: ContentFilterOptions | None = None) AsyncSubscription[MsgT]
- create_subscription(msg_type: Type[MsgT], topic: str, callback: Callable[[MsgT], None] | Callable[[MsgT, MessageInfo], None] | Callable[[MsgT], Awaitable[None]] | Callable[[MsgT, MessageInfo], Awaitable[None]] | Callable[[bytes], None] | Callable[[bytes, MessageInfo], None] | Callable[[bytes], Awaitable[None]] | Callable[[bytes, MessageInfo], Awaitable[None]], qos_profile: rclpy.qos.QoSProfile | int, *, raw: bool = False, concurrent: bool = False, content_filter_options: ContentFilterOptions | None = None) AsyncSubscription[MsgT]
Create a new subscription.
- Parameters:
msg_type – The type of ROS messages the subscription will subscribe to.
topic – The name of the topic the subscription will subscribe to.
callback – A user-defined callback invoked for each received message. May be a sync
deforasync deffunction.qos_profile – A QoSProfile or a history depth to apply to the subscription. In the case that a history depth is provided, the QoS history is set to KEEP_LAST, the QoS history depth is set to the value of the parameter, and all other QoS settings are set to their default values.
raw – If
True, received messages will be delivered in raw binary representation.concurrent – If
True, each message is taken as soon as it arrives and its callback is dispatched as an independent task, so callbacks may run concurrently. IfFalse(default), the next message is not taken from DDS until the previous callback has finished — callback execution back-pressures the read loop.content_filter_options – The filter expression and parameters for content filtering.
- Returns:
The new subscription.
- Raises:
RuntimeError – If the node has been destroyed.
- create_timer(timer_period_sec: float, callback: Callable[[], None] | Callable[[TimerInfo], None] | Callable[[], Awaitable[None]] | Callable[[TimerInfo], Awaitable[None]], autostart: bool = True) AsyncTimer
Create a new timer.
If autostart is
True(the default), the timer will be started and everytimer_period_secnumber of seconds the provided callback will be invoked. If autostart isFalse, the timer will be created but not started; it can then be started by callingreset()on the timer object.- Parameters:
timer_period_sec – The period (in seconds) of the timer.
callback – A user-defined callback invoked when the timer expires. May be a sync
deforasync deffunction.autostart – Whether to automatically start the timer after creation; defaults to
True.
- Returns:
The new timer.
- Raises:
RuntimeError – If the node has been destroyed.
- destroy_node() None
Destroy the node.
- get_clock() AsyncClock
Get the async clock used by the node.
- async run() None
Run the node until destroy_node() is called.
Mutually exclusive with
async with.- Raises:
RuntimeError – If the node is already running under a context manager.
- async wait_for_node(fully_qualified_node_name: str, *, check_interval: float = 0.1) None
Wait until node name is present in the system.
To apply a timeout, wrap the call with
async with asyncio.timeout().- Parameters:
fully_qualified_node_name – Fully qualified name of the node to wait for.
check_interval – Seconds between checks. Defaults to 0.1.
- class rclpy.experimental.AsyncPublisher(publisher_impl: object, msg_type: Type[MsgT], topic: str, qos_profile: rclpy.qos.QoSProfile, on_destroy: Callable[[AsyncPublisher], None])
Bases:
BasePublisher[MsgT]A publisher on a ROS topic.
Experimental
This API is experimental.
Create a container for a ROS publisher.
Warning
Users should not create a publisher with this constructor, instead they should call
Node.create_publisher()orAsyncNode.create_publisher().
- class rclpy.experimental.AsyncService(service_impl: object, srv_type: ~typing.Type[rosidl_pycommon.interface_base_classes.BaseService.(~SrvRequestT, ~SrvResponseT).(~SrvRequestT, ~SrvResponseT)], srv_name: str, callback: ~typing.Callable[[~rclpy.type_support.SrvRequestT, ~rclpy.type_support.SrvResponseT], ~rclpy.type_support.SrvResponseT] | ~typing.Callable[[~rclpy.type_support.SrvRequestT, ~rclpy.type_support.SrvResponseT], ~typing.Awaitable[~rclpy.type_support.SrvResponseT]], qos_profile: rclpy.qos.QoSProfile, on_destroy: ~typing.Callable[[~rclpy.experimental.async_service.AsyncService], None], concurrent: bool = False, tg: ~asyncio.taskgroups.TaskGroup | None = None)
Bases:
BaseService[SrvRequestT,SrvResponseT]A server for a ROS service.
Experimental
This API is experimental.
Create a container for a ROS service server.
Warning
Users should not create a service server with this constructor, instead they should call
Node.create_service()orAsyncNode.create_service().
- class rclpy.experimental.AsyncSubscription(subscription_impl: object, msg_type: Type[MsgT], topic: str, callback: Callable[[MsgT], None] | Callable[[MsgT, MessageInfo], None] | Callable[[MsgT], Awaitable[None]] | Callable[[MsgT, MessageInfo], Awaitable[None]] | Callable[[bytes], None] | Callable[[bytes, MessageInfo], None] | Callable[[bytes], Awaitable[None]] | Callable[[bytes, MessageInfo], Awaitable[None]], qos_profile: rclpy.qos.QoSProfile, on_destroy: Callable[[AsyncSubscription], None], raw: bool = False, concurrent: bool = False, tg: TaskGroup | None = None)
Bases:
BaseSubscription[MsgT]A subscription to a ROS topic.
Experimental
This API is experimental.
Create a container for a ROS subscription.
Warning
Users should not create a subscription with this constructor, instead they should call
Node.create_subscription()orAsyncNode.create_subscription().
- class rclpy.experimental.AsyncTimer(timer_period_ns: int, clock: AsyncClock, context: Context, callback: Callable[[], None] | Callable[[TimerInfo], None] | Callable[[], Awaitable[None]] | Callable[[TimerInfo], Awaitable[None]], autostart: bool, on_destroy: Callable[[AsyncTimer], None], tg: TaskGroup | None = None)
Bases:
BaseTimerA periodic timer.
Experimental
This API is experimental.
Create a timer.
Warning
Users should not create a timer with this constructor, instead they should call
Node.create_timer()orAsyncNode.create_timer().- property callback: Callable[[], None] | Callable[[TimerInfo], None] | Callable[[], Awaitable[None]] | Callable[[TimerInfo], Awaitable[None]]
- cancel() None