synchros2.action module
- exception synchros2.action.ActionAborted[source]
Bases:
ActionExceptionException raised when the action is aborted.
- exception synchros2.action.ActionCancelled[source]
Bases:
ActionExceptionException raised when the action is cancelled.
- exception synchros2.action.ActionException[source]
Bases:
ExceptionBase action exception.
- __init__(action: ActionFuture) None[source]
- class synchros2.action.ActionFuture[source]
Bases:
FutureConvertible[ActionResultT],Generic[ActionResultT,ActionFeedbackT]A proxy to a ROS 2 action invocation.
Action futures are to actions what plain futures are to services, with a bit more functionality to cover action specific semantics such as feedback and cancellation.
Action futures are rarely instantiated explicitly, but implicitly through Actionable APIs.
- __init__(goal_handle_future: rclpy.task.Future, feedback_tape: Tape[ActionFeedbackT] | None = None) None[source]
Initializes the action future.
- Parameters:
goal_handle_future – action goal handle future, as returned by the
method. (rclpy.action.ActionClient.send_goal_async)
feedback_tape – optional feedback tape, assumed to be tracking action feedback.
- property aborted: bool
Check if action was aborted.
May raise if the action result request triggered an exception.
- property accepted: bool
Check if action was accepted.
May raise if the action goal submission triggered an exception.
- property acknowledgement: FutureLike[bool]
Get future to wait for action acknowledgement (either acceptance or rejection).
- add_done_callback(done_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT]], None]) None[source]
Add a callback to be called on action finalization.
Finalization includes action rejection, abortion, cancellation, and successful completion. Exceptions raised during action execution will also propagate through during action future introspection.
- Parameters:
done_callback – callback to be called.
- add_feedback_callback(feedback_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT], ActionFeedbackT], None], forward_only: bool = False) None[source]
Add a callback to be called on each action feedback.
- Parameters:
feedback_callback – callback to be called on each action feedback.
forward_only – whether to ignore buffered action feedback or not.
- as_future() FutureLike[ActionResultT][source]
Get action future as a plain future.
Useful to pass action futures to APIs that expect plain futures.
- cancel() FutureLike[int][source]
Cancel action.
- Returns:
a future for the cancellation status code. See action_msgs.srv.CancelGoal documentation for the list of possible status codes.
- property cancelled: bool
Check if action was cancelled.
May raise if the action result request triggered an exception.
- property feedback: List[ActionFeedbackT]
Get all buffered action feedback.
Action must have been accepted before any feedback can be fetched.
- Raises:
RuntimeError – if action feedback tracking is not enabled,
action acknowledgement has not been received yet, or –
action was not accepted. –
- feedback_stream(*, forward_only: bool = False, buffer_size: int | None = None, timeout_sec: float | None = None) Iterator[ActionFeedbackT][source]
Iterate over action feedback as it comes.
Iteration stops when the given timeout expires or when the action is done executing. Note that iterating over action feedback to come is a blocking operation.
Action must have been accepted before feedback streaming is allowed.
- Parameters:
forward_only – whether to ignore buffered action feedback or not.
buffer_size – optional maximum size for the incoming feedback buffer.
provided (If none is)
unbounded. (the buffer will grow)
timeout_sec – optional timeout, in seconds, for new action feedback.
- Returns:
a lazy iterator over action feedback.
- Raises:
RuntimeError – if action feedback tracking is not enabled,
action acknowledgement has not been received yet, or action –
was not accepted. –
- property finalization: FutureLike[bool]
Get future to wait for action finalization.
Action rejection also counts as finalization.
- property goal_handle: rclpy.action.client.ClientGoalHandle
Get action goal handle.
- Raises:
RuntimeError – if action has not been accepted or rejected yet.
- property outcome: ActionOutcome[ActionResultT, ActionFeedbackT]
Get action outcome.
- Raises:
RuntimeError – if internal errors occurred during action execution.
- property result: ActionResultT
Get action result.
- Raises:
RuntimeError – if action has not yet been acknowledged, if it was rejected, or if it is still executing.
- property status: int | None
Get action status code.
See action_msgs.msg.GoalStatus documentation for status codes. Rejected actions are conventially assigned None for status.
- Raises:
RuntimeError – if action has not been acknowledged or is still executing.
- class synchros2.action.ActionOutcome[source]
Bases:
Generic[ActionResultT,ActionFeedbackT]A simple container for action outcome.
- __init__(goal_handle: rclpy.action.client.ClientGoalHandle, result: Any | None, feedback: List[ActionFeedbackT] | None) None[source]
Initializes the action outcome.
- Parameters:
goal_handle – action client goal handle.
result – action result response, may be None if action was rejected.
feedback – buffered action feedback, may be None if feedback
enabled. (tracking was not)
- property feedback: List[ActionFeedbackT] | None
Get all buffered action feedback.
May be None if feedback tracking was not enabled.
- exception synchros2.action.ActionRejected[source]
Bases:
ActionExceptionException raised when the action is rejected.
- exception synchros2.action.ActionTimeout[source]
Bases:
ActionExceptionException raised on action timeout.
- class synchros2.action.Actionable[source]
Bases:
Generic[ActionGoalT,ActionResultT,ActionFeedbackT],ComposableCallable,VectorizingCallableAn ergonomic interface to call actions in ROS 2.
Actionables wrap rclpy.action.ActionClient instances to allow for synchronous and asynchronous action invocation, in a way that resembles remote procedure calls.
- __init__(action_type: Type, action_name: str, node: rclpy.node.Node | None = None, **kwargs: Any) None[source]
Initializes the actionable.
- Parameters:
action_type – Target action type class (as generated by
rosidl).action_name – Name of the target action in the ROS 2 graph.
node – optional node for the underlying action client, defaults to
node. (the current process)
kwargs – other keyword arguments are forwarded to the underlying action client.
reference. (See rclpy.action.ActionClient documentation for further)
- property action_client: rclpy.action.client.ActionClient
Get the underlying action client.
- asynchronous(goal: ActionGoalT | None = None, *, done_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT]], None] | None = None, feedback_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT], ActionFeedbackT], None] | None = None, track_feedback: int | bool = False) ActionFuture[ActionResultT, ActionFeedbackT][source]
Invoke action asynchronously.
- Parameters:
goal – goal to invoke action with.
done_callback – optional action finalization callback, early registered through
ActionFuture.add_done_callback().
feedback_callback – optional action feedback callback, early registered through
not (ActionFuture.add_feedback_callback(). Implies minimal feedback tracking if)
enabled. (already)
track_feedback – whether and how to track action feedback. Other
tracking (than a boolean to enable or disable)
integer (a positive)
size. (may be provided to cap feedback buffer)
- Returns:
the future action outcome.
- synchronous(goal: ActionGoalT | None = None, *, feedback_callback: Callable[[ActionFeedbackT], None] | None = None, timeout_sec: float | None = None) ActionResultT[source]
- synchronous(goal: ActionGoalT | None = None, *, feedback_callback: Callable[[ActionFeedbackT], None] | None = None, timeout_sec: float | None = None, nothrow: bool = False) ActionResultT | None
Invoke action synchronously.
See Actionable.synchronous() overloads documentation.
- class synchros2.action.ActionableProtocol[source]
Bases:
Protocol[ActionGoalT,ActionResultT,ActionFeedbackT]Ergonomic protocol to call actions in ROS 2.
- __init__(*args, **kwargs)
- property action_client: rclpy.action.client.ActionClient
Get the underlying action client.
- asynchronous(goal: ActionGoalT | None = Ellipsis, *, done_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT]], None] | None = Ellipsis, feedback_callback: Callable[[ActionFuture[ActionResultT, ActionFeedbackT], ActionFeedbackT], None] | None = Ellipsis, track_feedback: int | bool = Ellipsis) ActionFuture[ActionResultT, ActionFeedbackT][source]
Invoke action asynchronously.
- Parameters:
goal – target action goal, or a default initialized one if none is provided.
done_callback – optional action finalization callback, early registered through
ActionFuture.add_done_callback().
feedback_callback – optional action feedback callback, early registered through
not (ActionFuture.add_feedback_callback(). Implies minimal feedback tracking if)
enabled. (already)
track_feedback – whether and how to track action feedback. Other than a boolean to
tracking (enable or disable)
feedback (a non negative integer may be provided to cap)
size. (buffer)
- Returns:
the future action outcome.
- compose(func: Callable) ComposedCallable[source]
Compose this actionable with the given func.
- Parameters:
func – callable to be composed, assumed synchronous.
- Returns:
the composed generalized callable.
- synchronous(goal: ActionGoalT | None = ..., *, feedback_callback: Callable[[ActionFeedbackT], None] | None = ..., timeout_sec: float | None = ...) ActionResultT
- synchronous(goal: ActionGoalT | None = ..., *, feedback_callback: Callable[[ActionFeedbackT], None] | None = ..., timeout_sec: float | None = ..., nothrow: bool = ...) ActionResultT | None
Helper for @overload to raise when called.
- property vectorized: VectorizedCallable
Get a vectorized version of this actionable.
- synchros2.action.unwrap_outcome(action: ActionFuture[ActionResultT, ActionFeedbackT], timeout_sec: float | None = None) ActionOutcome[ActionResultT, ActionFeedbackT][source]
Fetch action outcome when finalized.
- Parameters:
action – action future to wait for.
timeout_sec – optional timeout, in seconds.
- Returns:
the action outcome.
- Raises:
ValueError – on timeout.
- synchros2.action.wait_and_return_outcome(action: ActionFuture[ActionResultT, ActionFeedbackT], timeout_sec: float | None = None) ActionOutcome[ActionResultT, ActionFeedbackT]
Fetch action outcome when finalized.
Alias for unwrap_outcome().
- synchros2.action.wait_for_outcome(action: ActionFuture[ActionResultT, ActionFeedbackT], timeout_sec: float | None = None) bool[source]
Wait for action outcome.
- Parameters:
action – action future to wait for.
timeout_sec – optional timeout, in seconds.
- Returns:
whether the action finalized before the timeout expired.