synchros2.service module

exception synchros2.service.ServiceError[source]

Bases: ServiceException

Exception raised on service error.

exception synchros2.service.ServiceException[source]

Bases: Exception

Base service exception.

__init__(service: rclpy.task.Future) → None[source]

exception synchros2.service.ServiceTimeout[source]

Bases: ServiceException

Exception raised on service timeout.
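Since ServiceTimeout and ServiceError both derive from ServiceException, a caller can treat timeouts differently from other failures, or catch the base class to handle both. The following is a minimal sketch of that pattern rather than code from synchros2 itself. The helper call_with_retry and its attempts parameter are hypothetical, serviced is assumed to be a Serviced-like object, and it is an assumption that synchronous() raises these exceptions on timeout and on failure. The fallback classes exist only so the snippet also runs where synchros2 is not installed.

```python
from typing import Any, Optional

try:
    from synchros2.service import ServiceError, ServiceException, ServiceTimeout
except ImportError:
    # Stand-ins that mirror the documented hierarchy. They are used only
    # when synchros2 is not installed and are not the library's classes.
    class ServiceException(Exception):
        def __init__(self, service: Any) -> None:
            super().__init__(service)

    class ServiceError(ServiceException):
        pass

    class ServiceTimeout(ServiceException):
        pass


def call_with_retry(
    serviced: Any,
    request: Any = None,
    *,
    timeout_sec: Optional[float] = None,
    attempts: int = 3,
) -> Any:
    """Retry on ServiceTimeout; let any other exception propagate at once."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_timeout: Optional[ServiceTimeout] = None
    for _ in range(attempts):
        try:
            # Assumption: this raises ServiceTimeout when timeout_sec expires.
            return serviced.synchronous(request, timeout_sec=timeout_sec)
        except ServiceTimeout as exc:
            last_timeout = exc
    assert last_timeout is not None
    raise last_timeout
```

Catching ServiceException instead of ServiceTimeout would retry on every documented service failure, which is rarely what is wanted for errors that are not transient.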

class synchros2.service.Serviced[source]

Bases: Generic[ServiceRequestT, ServiceResponseT], ComposableCallable, VectorizingCallable

An ergonomic interface to call services in ROS 2.

Serviced instances wrap rclpy.client.Client instances to allow for synchronous and asynchronous service invocations, in a way that resembles remote procedure calls.

__init__(service_type: Type, service_name: str, node: rclpy.node.Node | None = None, **kwargs: Any) → None[source]

asynchronous(request: ServiceRequestT | None = None) → FutureLike[ServiceResponseT][source]

Invoke service asynchronously.

Parameters:

request – request to be serviced. If none is provided, a default initialized request will be used instead.

Returns:

the future response.
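One way to consume the future response is to attach a completion callback instead of blocking. The sketch below illustrates this under stated assumptions: request_and_notify and StubServiced are hypothetical names, and the returned FutureLike is assumed to offer add_done_callback() and result() in the manner of rclpy.task.Future. The stub uses concurrent.futures.Future so the snippet runs without ROS 2.

```python
from concurrent.futures import Future
from typing import Any, Callable, List


def request_and_notify(
    serviced: Any, request: Any, on_response: Callable[[Any], None]
) -> Any:
    """Send a request and pass the response to a callback once it arrives."""
    future = serviced.asynchronous(request)
    # Assumption: the future exposes add_done_callback() and result().
    future.add_done_callback(lambda done: on_response(done.result()))
    return future


class StubServiced:
    """Hypothetical stand-in; a real Serviced instance would be used instead."""

    def __init__(self) -> None:
        self.pending: List[Future] = []

    def asynchronous(self, request: Any = None) -> Future:
        # Keep the future around so the "server side" can complete it later.
        future: Future = Future()
        self.pending.append(future)
        return future


stub = StubServiced()
received: List[Any] = []
future = request_and_notify(stub, "request", received.append)
stub.pending[0].set_result("response")  # simulate the service replying
```

With a real Serviced instance the callback would presumably run wherever the node's executor completes the future, so it should stay short and avoid blocking.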

property client: rclpy.client.Client

Get the underlying service client.

property service_name: str

Get the target service name.

property service_type: Type

Get the target service type.

synchronous(request: ServiceRequestT | None = None, *, timeout_sec: float | None = None) → ServiceResponseT[source]
synchronous(request: ServiceRequestT | None = None, *, timeout_sec: float | None = None, nothrow: bool = False) → ServiceResponseT | None

Invoke service synchronously.

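Two overloads are available. Without nothrow, the call returns the service response. With nothrow, the return type is ServiceResponseT | None, so the caller must check the result for None.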
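The overload that accepts nothrow may return None, which suits callers that prefer a fallback value over exception handling. The following sketch shows that form. The helper call_or_default and its default parameter are hypothetical, serviced is assumed to be a Serviced-like object, and it is an assumption that None is what comes back when the call times out or fails.

```python
from typing import Any, Optional


def call_or_default(
    serviced: Any,
    request: Any = None,
    *,
    timeout_sec: Optional[float] = None,
    default: Any = None,
) -> Any:
    """Call the service without raising and fall back to a default value."""
    # nothrow=True selects the overload typed as ServiceResponseT | None.
    response = serviced.synchronous(request, timeout_sec=timeout_sec, nothrow=True)
    return default if response is None else response
```

The trade-off is that the reason for a failure is lost: a None result does not say whether the service timed out or reported an error.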

wait_for_service(*args: Any, **kwargs: Any) → bool[source]

Wait for service to become available.

See rclpy.client.Client.wait_for_service() documentation for further reference.
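A common pattern is to wait for the service before the first call, so that a missing server is reported early instead of as a request timeout. The sketch below assumes that arguments are forwarded to rclpy.client.Client.wait_for_service(), which accepts a timeout_sec keyword. The helper call_when_available and its wait_sec parameter are hypothetical, and serviced is assumed to be a Serviced-like object.

```python
from typing import Any, Optional


def call_when_available(
    serviced: Any,
    request: Any = None,
    *,
    wait_sec: float = 5.0,
    timeout_sec: Optional[float] = None,
) -> Any:
    """Wait for the service to show up, then invoke it synchronously."""
    # Assumption: keyword arguments reach the underlying rclpy client.
    if not serviced.wait_for_service(timeout_sec=wait_sec):
        raise RuntimeError(
            f"service {serviced.service_name!r} not available after {wait_sec} s"
        )
    return serviced.synchronous(request, timeout_sec=timeout_sec)
```

Keeping wait_sec separate from timeout_sec lets discovery and the request itself have independent time budgets.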

class synchros2.service.ServicedProtocol[source]

Bases: Protocol[ServiceRequestT, ServiceResponseT]

Ergonomic protocol to call services in ROS 2.

__init__(*args, **kwargs)

asynchronous(request: ServiceRequestT | None = ...) → FutureLike[ServiceResponseT][source]

Invoke service asynchronously.

Parameters:

request – request to be serviced, or a default initialized one if none is provided.

Returns:

the future response.

property client: rclpy.client.Client

Get the underlying service client.

synchronous(request: ServiceRequestT | None = ..., *, timeout_sec: float | None = ...) → ServiceResponseT
synchronous(request: ServiceRequestT | None = ..., *, timeout_sec: float | None = ..., nothrow: bool = ...) → ServiceResponseT | None

Invoke service synchronously.

wait_for_service(*args: Any, **kwargs: Any) → bool[source]

Wait for service to become available.
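Because ServicedProtocol is a typing Protocol, conformance is structural: any object with matching members can stand in for a real service client, which is convenient in unit tests. The sketch below shows one such in-memory double. FakeServiced and its handler parameter are hypothetical and not part of synchros2, and using concurrent.futures.Future as the FutureLike is an assumption made so the snippet runs without ROS 2.

```python
from concurrent.futures import Future
from typing import Any, Callable, Optional


class FakeServiced:
    """In-memory test double that answers requests with a local handler."""

    def __init__(self, handler: Callable[[Any], Any]) -> None:
        self._handler = handler

    @property
    def client(self) -> Any:
        # No rclpy client backs this double, so there is nothing to return.
        raise NotImplementedError("FakeServiced has no underlying client")

    def wait_for_service(self, *args: Any, **kwargs: Any) -> bool:
        return True  # the in-memory handler is always available

    def asynchronous(self, request: Any = None) -> Future:
        future: Future = Future()
        try:
            future.set_result(self._handler(request))
        except Exception as exc:
            future.set_exception(exc)
        return future

    def synchronous(
        self,
        request: Any = None,
        *,
        timeout_sec: Optional[float] = None,
        nothrow: bool = False,
    ) -> Any:
        future = self.asynchronous(request)  # already completed at this point
        if nothrow and future.exception() is not None:
            return None
        return future.result()  # re-raises the handler's exception, if any
```

Code written against the protocol, for example a function annotated with ServicedProtocol, can then be exercised with FakeServiced(lambda request: ...) without starting a node.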