synchros2.action module

exception synchros2.action.ActionAborted[source]

Bases: ActionException

Exception raised when the action is aborted.

exception synchros2.action.ActionCancelled[source]

Bases: ActionException

Exception raised when the action is cancelled.

exception synchros2.action.ActionException[source]

Bases: Exception

Base action exception.

__init__(action: ActionFuture) None[source]
class synchros2.action.ActionFuture[source]

Bases: FutureConvertible[ActionResultT], Generic[ActionResultT, ActionFeedbackT]

A proxy to a ROS 2 action invocation.

Action futures are to actions what plain futures are to services, with a bit more functionality to cover action specific semantics such as feedback and cancellation.

Action futures are rarely instantiated explicitly, but implicitly through Actionable APIs.

__init__(goal_handle_future: rclpy.task.Future, feedback_tape: Tape[ActionFeedbackT] | None = None) None[source]

Initializes the action future.

Parameters:
  • goal_handle_future – action goal handle future, as returned by the

  • method. (rclpy.action.ActionClient.send_goal_async)

  • feedback_tape – optional feedback tape, assumed to be tracking action feedback.

property aborted: bool

Check if action was aborted.

May raise if the action result request triggered an exception.

property accepted: bool

Check if action was accepted.

May raise if the action goal submission triggered an exception.

property acknowledged: bool

Check if action was acknowledged (i.e. accepted or rejected)

property acknowledgement: FutureLike[bool]

Get future to wait for action acknowledgement (either acceptance or rejection).

add_done_callback(done_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT]], None]) None[source]

Add a callback to be called on action finalization.

Finalization includes action rejection, abortion, cancellation, and successful completion. Exceptions raised during action execution will also propagate through during action future introspection.

Parameters:

done_callback – callback to be called.

add_feedback_callback(feedback_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT], ActionFeedbackT], None], forward_only: bool = False) None[source]

Add a callback to be called on each action feedback.

Parameters:
  • feedback_callback – callback to be called on each action feedback.

  • forward_only – whether to ignore buffered action feedback or not.

as_future() FutureLike[ActionResultT][source]

Get action future as a plain future.

Useful to pass action futures to APIs that expect plain futures.

cancel() FutureLike[int][source]

Cancel action.

Returns:

a future for the cancellation status code. See action_msgs.srv.CancelGoal documentation for the list of possible status codes.

property cancelled: bool

Check if action was cancelled.

May raise if the action result request triggered an exception.

property feedback: List[ActionFeedbackT]

Get all buffered action feedback.

Action must have been accepted before any feedback can be fetched.

Raises:
  • RuntimeError – if action feedback tracking is not enabled,

  • action acknowledgement has not been received yet, or

  • action was not accepted.

feedback_stream(*, forward_only: bool = False, buffer_size: int | None = None, timeout_sec: float | None = None) Iterator[ActionFeedbackT][source]

Iterate over action feedback as it comes.

Iteration stops when the given timeout expires or when the action is done executing. Note that iterating over action feedback to come is a blocking operation.

Action must have been accepted before feedback streaming is allowed.

Parameters:
  • forward_only – whether to ignore buffered action feedback or not.

  • buffer_size – optional maximum size for the incoming feedback buffer.

  • provided (If none is)

  • unbounded. (the buffer will grow)

  • timeout_sec – optional timeout, in seconds, for new action feedback.

Returns:

a lazy iterator over action feedback.

Raises:
  • RuntimeError – if action feedback tracking is not enabled,

  • action acknowledgement has not been received yet, or action

  • was not accepted.

property finalization: FutureLike[bool]

Get future to wait for action finalization.

Action rejection also counts as finalization.

property finalized: bool

Check if action was finalized.

property goal_handle: rclpy.action.client.ClientGoalHandle

Get action goal handle.

Raises:

RuntimeError – if action has not been accepted or rejected yet.

property outcome: ActionOutcome[ActionResultT, ActionFeedbackT]

Get action outcome.

Raises:

RuntimeError – if internal errors occurred during action execution.

property result: ActionResultT

Get action result.

Raises:

RuntimeError – if action has not yet been acknowledged, if it was rejected, or if it is still executing.

property status: int | None

Get action status code.

See action_msgs.msg.GoalStatus documentation for status codes. Rejected actions are conventially assigned None for status.

Raises:

RuntimeError – if action has not been acknowledged or is still executing.

property succeeded: bool

Check if action was succeeded.

May raise if the action result request triggered an exception.

property tracks_feedback: bool

Check if action future tracks action feedback.

class synchros2.action.ActionOutcome[source]

Bases: Generic[ActionResultT, ActionFeedbackT]

A simple container for action outcome.

__init__(goal_handle: rclpy.action.client.ClientGoalHandle, result: Any | None, feedback: List[ActionFeedbackT] | None) None[source]

Initializes the action outcome.

Parameters:
  • goal_handle – action client goal handle.

  • result – action result response, may be None if action was rejected.

  • feedback – buffered action feedback, may be None if feedback

  • enabled. (tracking was not)

property aborted: bool

Check if action was aborted.

property accepted: bool

Check if action was accepted.

property cancelled: bool

Check if action was cancelled.

property feedback: List[ActionFeedbackT] | None

Get all buffered action feedback.

May be None if feedback tracking was not enabled.

property result: ActionResultT | None

Get action result.

May be None if the action was rejected.

property status: int | None

Get action status code.

See action_msgs.msg.GoalStatus documentation for status codes. Rejected actions will be assigned None for status.

property succeeded: bool

Check if action was succeeded.

exception synchros2.action.ActionRejected[source]

Bases: ActionException

Exception raised when the action is rejected.

exception synchros2.action.ActionTimeout[source]

Bases: ActionException

Exception raised on action timeout.

class synchros2.action.Actionable[source]

Bases: Generic[ActionGoalT, ActionResultT, ActionFeedbackT], ComposableCallable, VectorizingCallable

An ergonomic interface to call actions in ROS 2.

Actionables wrap rclpy.action.ActionClient instances to allow for synchronous and asynchronous action invocation, in a way that resembles remote procedure calls.

__init__(action_type: Type, action_name: str, node: rclpy.node.Node | None = None, **kwargs: Any) None[source]

Initializes the actionable.

Parameters:
  • action_type – Target action type class (as generated by rosidl).

  • action_name – Name of the target action in the ROS 2 graph.

  • node – optional node for the underlying action client, defaults to

  • node. (the current process)

  • kwargs – other keyword arguments are forwarded to the underlying action client.

  • reference. (See rclpy.action.ActionClient documentation for further)

property action_client: rclpy.action.client.ActionClient

Get the underlying action client.

property action_name: str

Get the target action name.

property action_type: Type

Get the target action type.

asynchronous(goal: ActionGoalT | None = None, *, done_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT]], None] | None = None, feedback_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT], ActionFeedbackT], None] | None = None, track_feedback: int | bool = False) ActionFuture[ActionResultT, ActionFeedbackT][source]

Invoke action asynchronously.

Parameters:
  • goal – goal to invoke action with.

  • done_callback – optional action finalization callback, early registered through

  • ActionFuture.add_done_callback().

  • feedback_callback – optional action feedback callback, early registered through

  • not (ActionFuture.add_feedback_callback(). Implies minimal feedback tracking if)

  • enabled. (already)

  • track_feedback – whether and how to track action feedback. Other

  • tracking (than a boolean to enable or disable)

  • integer (a positive)

  • size. (may be provided to cap feedback buffer)

Returns:

the future action outcome.

synchronous(goal: ActionGoalT | None = None, *, feedback_callback: Callable[[ActionFeedbackT], None] | None = None, timeout_sec: float | None = None) ActionResultT[source]
synchronous(goal: ActionGoalT | None = None, *, feedback_callback: Callable[[ActionFeedbackT], None] | None = None, timeout_sec: float | None = None, nothrow: bool = False) ActionResultT | None

Invoke action synchronously.

See Actionable.synchronous() overloads documentation.

wait_for_server(*args: Any, **kwargs: Any) bool[source]

Wait for action server to become available.

See rclpy.action.ActionClient.wait_for_server() documentation for further reference.

class synchros2.action.ActionableProtocol[source]

Bases: Protocol[ActionGoalT, ActionResultT, ActionFeedbackT]

Ergonomic protocol to call actions in ROS 2.

__init__(*args, **kwargs)
property action_client: rclpy.action.client.ActionClient

Get the underlying action client.

asynchronous(goal: ActionGoalT | None = Ellipsis, *, done_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT]], None] | None = Ellipsis, feedback_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT], ActionFeedbackT], None] | None = Ellipsis, track_feedback: int | bool = Ellipsis) ActionFuture[ActionResultT, ActionFeedbackT][source]

Invoke action asynchronously.

Parameters:
  • goal – target action goal, or a default initialized one if none is provided.

  • done_callback – optional action finalization callback, early registered through

  • ActionFuture.add_done_callback().

  • feedback_callback – optional action feedback callback, early registered through

  • not (ActionFuture.add_feedback_callback(). Implies minimal feedback tracking if)

  • enabled. (already)

  • track_feedback – whether and how to track action feedback. Other than a boolean to

  • tracking (enable or disable)

  • feedback (a non negative integer may be provided to cap)

  • size. (buffer)

Returns:

the future action outcome.

compose(func: Callable) ComposedCallable[source]

Compose this actionable with the given func.

Parameters:

func – callable to be composed, assumed synchronous.

Returns:

the composed generalized callable.

synchronous(goal: ActionGoalT | None = ..., *, feedback_callback: Callable[[ActionFeedbackT], None] | None = ..., timeout_sec: float | None = ...) ActionResultT
synchronous(goal: ActionGoalT | None = ..., *, feedback_callback: Callable[[ActionFeedbackT], None] | None = ..., timeout_sec: float | None = ..., nothrow: bool = ...) ActionResultT | None

Helper for @overload to raise when called.

property vectorized: VectorizedCallable

Get a vectorized version of this actionable.

wait_for_server(*args: Any, **kwargs: Any) bool[source]

Wait for action to become available.

synchros2.action.unwrap_outcome(action: ActionFuture[ActionResultT, ActionFeedbackT], timeout_sec: float | None = None) ActionOutcome[ActionResultT, ActionFeedbackT][source]

Fetch action outcome when finalized.

Parameters:
  • action – action future to wait for.

  • timeout_sec – optional timeout, in seconds.

Returns:

the action outcome.

Raises:

ValueError – on timeout.

synchros2.action.wait_and_return_outcome(action: ActionFuture[ActionResultT, ActionFeedbackT], timeout_sec: float | None = None) ActionOutcome[ActionResultT, ActionFeedbackT]

Fetch action outcome when finalized.

Alias for unwrap_outcome().

synchros2.action.wait_for_outcome(action: ActionFuture[ActionResultT, ActionFeedbackT], timeout_sec: float | None = None) bool[source]

Wait for action outcome.

Parameters:
  • action – action future to wait for.

  • timeout_sec – optional timeout, in seconds.

Returns:

whether the action finalized before the timeout expired.