launch.utilities package

Submodules

Module contents

Package for utilities.

class launch.utilities.AsyncSafeSignalManager[source]

Bases: object

A context manager class for asynchronous handling of signals.

Similar in purpose to asyncio.loop.add_signal_handler() but not limited to Unix platforms.

Signal handlers can be registered at any time with a given manager. These will become active for the extent of said manager context. Unlike regular signal handlers, asynchronous signals handlers can safely interact with their event loop.

The same manager can be used multiple consecutive times and even be nested with other managers, as these are independent from each other i.e. managers do not override each other’s handlers.

If used outside of the main thread, a ValueError is raised.

The underlying mechanism is built around signal.set_wakeup_fd() so as to not interfere with regular handlers installed via signal.signal(). All signals received are forwarded to the previously setup file descriptor, if any.

..warning::

Within (potentially nested) contexts, signal.set_wakeup_fd() calls are intercepted such that the given file descriptor overrides the previously setup file descriptor for the outermost manager. This ensures the manager’s chain of signal wakeup file descriptors is not broken by third-party code or by asyncio itself in some platforms.

__init__(loop: AbstractEventLoop)[source]

Instantiate manager.

Parameters:

loop – event loop that will handle the signals.

handle(signum: Signals | int, handler: Callable[[int], None] | None) Callable[[int], None] | None[source]

Register a callback for asynchronous handling of a given signal.

Parameters:
  • signum – number of the signal to be handled

  • handler – callback taking a signal number as its sole argument, or None

Returns:

previous handler if any, otherwise None

launch.utilities.ensure_argument_type(argument: Any, types: type | Iterable[type], argument_name: str, caller: str | None = None) None[source]

Ensure that the given argument is an instance of or subclass of one of the given types.

If the argument does not match one of the types, a TypeError is raised. The caller is included in the error message if given and not None.

launch.utilities.is_a(obj, entity_type)[source]

Return True if obj is an instance of the entity_type class.

launch.utilities.is_a_subclass(obj, entity_type)[source]

Return True if obj is an instance of the entity_type class or one of its subclass types.

launch.utilities.isclassinstance(obj)[source]

Return True if obj is an instance of a class.

launch.utilities.normalize_to_list_of_entities(entities: Iterable[LaunchDescriptionEntity | Iterable[LaunchDescriptionEntity]]) List[LaunchDescriptionEntity][source]

Return a list of Substitutions given a variety of starting inputs.

launch.utilities.normalize_to_list_of_substitutions(subs: str | Substitution | Iterable[str | Substitution]) List[Substitution][source]

Return a list of Substitutions given a variety of starting inputs.

launch.utilities.perform_substitutions(context: LaunchContext, subs: List[Substitution]) str[source]

Resolve a list of Substitutions with a context into a single string.

launch.utilities.visit_all_entities_and_collect_futures(entity: LaunchDescriptionEntity, context: LaunchContext) List[Tuple[LaunchDescriptionEntity, Future]][source]

Visit given entity, as well as all sub-entities, and collect any futures.

Sub-entities are visited recursively and depth-first. The future is collected from each entity (unless it returns None) before continuing on to more sub-entities.

This function may call itself to traverse the sub-entities recursively.