Source code for launch.descriptions.executable

# Copyright 2020 Southwest Research Institute, All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# DISTRIBUTION A. Approved for public release; distribution unlimited.
# OPSEC #4584.

"""Module for a description of an Executable."""

import os
import re
import shlex
import threading
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple

from ..action import Action
from ..launch_context import LaunchContext
from ..some_substitutions_type import SomeSubstitutionsType
from ..substitution import Substitution
from ..substitutions import LaunchConfiguration
from ..utilities import normalize_to_list_of_substitutions
from ..utilities import perform_substitutions

_executable_process_counter_lock = threading.Lock()
_executable_process_counter = 0  # in Python3, this number is unbounded (no rollover)


[docs] class Executable: """Describes an executable (usually a single process) which may be run by the launch system."""
[docs] def __init__( self, *, cmd: Iterable[SomeSubstitutionsType], prefix: Optional[SomeSubstitutionsType] = None, name: Optional[SomeSubstitutionsType] = None, cwd: Optional[SomeSubstitutionsType] = None, env: Optional[Dict[SomeSubstitutionsType, SomeSubstitutionsType]] = None, additional_env: Optional[Dict[SomeSubstitutionsType, SomeSubstitutionsType]] = None, arguments: Optional[Iterable[SomeSubstitutionsType]] = None, ) -> None: """ Initialize an Executable description. :param cmd: A list where the first item is the executable and the rest are arguments to the executable, each item may be a string or a list of strings and Substitutions to be resolved at runtime :param prefix: a set of commands/arguments to precede the cmd, used for things like gdb/valgrind and defaults to the LaunchConfiguration called 'launch-prefix'. Note that a non-default prefix provided in a launch file will override the prefix provided via the `launch-prefix` launch configuration regardless of whether the `launch-prefix-filter` launch configuration is provided. :param name: The label used to represent the process, as a string or a Substitution to be resolved at runtime, defaults to the basename of the executable :param cwd: The directory in which to run the executable :param env: Dictionary of environment variables to be used, starting from a clean environment. If None, the current environment of the launch context is used. :param additional_env: Dictionary of environment variables to be added. If env was None, they are added to the current environment. If not, env is updated with additional_env. :param arguments: list of extra arguments for the executable """ self.__cmd = [normalize_to_list_of_substitutions(x) for x in cmd] self.__cmd += ([] if arguments is None else [normalize_to_list_of_substitutions(x) for x in arguments]) self.__prefix = normalize_to_list_of_substitutions( LaunchConfiguration('launch-prefix', default='') if prefix is None else prefix ) self.__prefix_filter = normalize_to_list_of_substitutions( LaunchConfiguration('launch-prefix-filter', default='') ) if prefix is None else None self.__name = name if name is None else normalize_to_list_of_substitutions(name) self.__cwd = cwd if cwd is None else normalize_to_list_of_substitutions(cwd) self.__env = None # type: Optional[List[Tuple[List[Substitution], List[Substitution]]]] if env is not None: self.__env = [] for key, value in env.items(): self.__env.append(( normalize_to_list_of_substitutions(key), normalize_to_list_of_substitutions(value))) self.__additional_env: Optional[List[Tuple[List[Substitution], List[Substitution]]]] = None if additional_env is not None: self.__additional_env = [] for key, value in additional_env.items(): self.__additional_env.append(( normalize_to_list_of_substitutions(key), normalize_to_list_of_substitutions(value))) self.__arguments = arguments self.__final_cmd: Optional[List[str]] = None self.__final_cwd: Optional[str] = None self.__final_env: Optional[Dict[str, str]] = None self.__final_name: Optional[str] = None
@property def name(self): """Getter for name.""" return self.__name @property def prefix(self): """Getter for prefix.""" return self.__prefix @property def cmd(self): """Getter for cmd.""" return self.__cmd @property def cwd(self): """Getter for cwd.""" return self.__cwd @property def env(self): """Getter for env.""" return self.__env @property def additional_env(self): """Getter for additional_env.""" return self.__additional_env @property def arguments(self): """Getter for arguments.""" return self.__arguments @property def final_name(self): """Getter for final_name.""" return self.__final_name @property def final_cmd(self): """Getter for final_cmd.""" return self.__final_cmd @property def final_cwd(self): """Getter for cwd.""" return self.__final_cwd @property def final_env(self): """Getter for final_env.""" return self.__final_env
[docs] def prepare(self, context: LaunchContext, action: Action): """ Prepare an executable description for execution in a given environment. This does the following: - performs substitutions on various properties Note that 'action' is not used at this level; it is provided for use by subclasses which may override this method. """ # expand substitutions in arguments to async_execute_process() cmd = [perform_substitutions(context, x) for x in self.__cmd] # Perform filtering for prefix application should_apply_prefix = True # by default if self.__prefix_filter is not None: # no prefix given on construction prefix_filter = perform_substitutions(context, self.__prefix_filter) # Apply if filter regex matches (empty regex matches all strings) should_apply_prefix = re.match(prefix_filter, os.path.basename(cmd[0])) is not None if should_apply_prefix: cmd = shlex.split(perform_substitutions(context, self.__prefix)) + cmd self.__final_cmd = cmd name = os.path.basename(cmd[0]) if self.__name is None \ else perform_substitutions(context, self.__name) with _executable_process_counter_lock: global _executable_process_counter _executable_process_counter += 1 self.__final_name = f'{name}-{_executable_process_counter}' cwd = None if self.__cwd is not None: cwd = ''.join([context.perform_substitution(x) for x in self.__cwd]) self.__final_cwd = cwd env = {} if self.__env is not None: for key, value in self.__env: env[''.join([context.perform_substitution(x) for x in key])] = \ ''.join([context.perform_substitution(x) for x in value]) else: env = dict(context.environment) if self.__additional_env is not None: for key, value in self.__additional_env: env[''.join([context.perform_substitution(x) for x in key])] = \ ''.join([context.perform_substitution(x) for x in value]) self.__final_env = env