rclpy.task module

class rclpy.task.Future(*, executor: Executor | None = None)

Bases: Generic[T]

Represent the outcome of a task in the future.

add_done_callback(callback: Callable[[Future[T]], None]) → None

Add a callback to be executed when the task is done.

Callbacks should not raise exceptions.

The callback may be called immediately by this method if the future is already done. If this happens and the callback raises, the exception will be raised by this method.

Parameters:

callback – a callback taking the future as an argument to be run when completed

cancel() → None

Request cancellation of the running task if it is not done already.

cancelled() → bool

Indicate if the task has been cancelled.

Returns:

True if the task was cancelled

done() → bool

Indicate if the task has finished executing.

Returns:

True if the task is finished or raised while it was executing

exception() → Exception | None

Get an exception raised by a done task.

Returns:

The exception raised by the task, or None if no exception was raised.

result() → T | None

Get the result of a done task.

Raises:

Exception if one was set during the task.

Returns:

The result set by the task, or None if no result was set.

set_exception(exception: Exception) → None

Set the exception raised by the task.

Parameters:

exception – The exception raised by the task.

set_result(result: T) → None

Set the result returned by a task.

Parameters:

result – The output of a long-running task.

class rclpy.task.Task(handler: Callable[[], T] | Coroutine[None, None, T] | None, args: List[object] | None = None, kwargs: Dict[str, object] | None = None, executor: Executor | None = None)

Bases: Future[T]

Execute a function or coroutine.

This executes either a normal function or a coroutine to completion. On completion it creates tasks for any ‘done’ callbacks.

This class should only be instantiated by rclpy.executors.Executor.

executing() → bool

Check if the task is currently being executed.

Returns:

True if the task is currently executing.