Source code for rosdep2.installers

# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the Willow Garage, Inc. nor the names of its
#       contributors may be used to endorse or promote products derived from
#       this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# Author Tully Foote/tfoote@willowgarage.com, Ken Conley/kwc@willowgarage.com

from __future__ import print_function

import os
import subprocess
import traceback

from rospkg.os_detect import OsDetect

from .core import rd_debug, RosdepInternalError, InstallFailed, print_bold, InvalidData

# kwc: InstallerContext is basically just a bunch of dictionaries with
# defined lookup methods.  It really encompasses two facets of a
# rosdep configuration: the pluggable nature of installers and
# platforms, as well as the resolution of the operating system for a
# specific machine.  It is possible to decouple those two notions,
# though there are some touch points over how this interfaces with the
# rospkg.os_detect library, i.e. how platforms can tweak these
# detectors and how the higher-level APIs can override them.


class InstallerContext(object):
    """
    :class:`InstallerContext` manages the context of execution for
    rosdep as it relates to the installers, OS detectors, and other
    extensible APIs.

    Internally this is a set of dictionaries: installer key ->
    installer instance, OS key -> list of installer keys, OS key ->
    callable selecting the default installer key, and OS name ->
    callable selecting the OS version value.
    """

    def __init__(self, os_detect=None):
        """
        :param os_detect: (optional)
          :class:`rospkg.os_detect.OsDetect` instance to use for
          detecting platforms.  If `None`, default instance will be
          used.
        """
        # platform configuration
        self.installers = {}
        self.os_installers = {}
        self.default_os_installer = {}
        # stores configuration of which value to use for the OS version
        # key (version number or codename)
        self.os_version_type = {}

        # OS detection and override
        if os_detect is None:
            os_detect = OsDetect()
        self.os_detect = os_detect
        self.os_override = None

        self.verbose = False

    def set_verbose(self, verbose):
        """
        Enable or disable the informational ``print`` output of this
        context.

        :param verbose: new verbosity flag, ``bool``
        """
        self.verbose = verbose

    def set_os_override(self, os_name, os_version):
        """
        Override the OS detector with *os_name* and *os_version*.  See
        :meth:`InstallerContext.get_os_name_and_version`.

        :param os_name: OS name value to use, ``str``
        :param os_version: OS version value to use, ``str``
        """
        if self.verbose:
            print('overriding OS to [%s:%s]' % (os_name, os_version))
        self.os_override = os_name, os_version

    def get_os_version_type(self, os_name):
        """
        :param os_name: OS name to look up, ``str``
        :returns: callable registered via :meth:`set_os_version_type`
          for *os_name*, or ``OsDetect.get_version`` if none was
          registered.  The callable is invoked with the OS detector.
        """
        return self.os_version_type.get(os_name, OsDetect.get_version)

    def set_os_version_type(self, os_name, version_type):
        """
        Register the callable used to compute the OS version value for
        *os_name*.

        :param os_name: OS name, ``str``
        :param version_type: callable taking the OS detector and
          returning the version value
        :raises: :exc:`ValueError` if *version_type* is not callable
        """
        if not callable(version_type):
            raise ValueError('version type should be a method')
        self.os_version_type[os_name] = version_type

    def get_os_name_and_version(self):
        """
        Get the OS name and version key to use for resolution and
        installation.  This will be the detected OS name/version unless
        :meth:`InstallerContext.set_os_override()` has been called.

        :returns: (os_name, os_version), ``(str, str)``
        """
        if self.os_override:
            return self.os_override
        os_name = self.os_detect.get_name()
        os_key = self.get_os_version_type(os_name)
        return os_name, os_key(self.os_detect)

    def get_os_detect(self):
        """
        :returns os_detect: :class:`OsDetect` instance used for
          detecting platforms.
        """
        return self.os_detect

    def set_installer(self, installer_key, installer):
        """
        Set the installer to use for *installer_key*.  This will
        replace any existing installer associated with the key.
        *installer_key* should be the same key used for the
        ``rosdep.yaml`` package manager key.  If *installer* is
        ``None``, this will delete any existing associated installer
        from this context (it is not an error if there is none).

        :param installer_key: key/name to associate with installer, ``str``
        :param installer: :class:`Installer` instance, or ``None`` to
          remove the association.
        :raises: :exc:`TypeError` if *installer* is not an instance of
          :class:`Installer`
        """
        if installer is None:
            # pop() with a default so that removing an unregistered key
            # is a no-op rather than a KeyError
            self.installers.pop(installer_key, None)
            return
        if not isinstance(installer, Installer):
            raise TypeError('installer must be a instance of Installer')
        if self.verbose:
            print('registering installer [%s]' % (installer_key))
        self.installers[installer_key] = installer

    def get_installer(self, installer_key):
        """
        :returns: :class:`Installer` instance associated with
          *installer_key*.
        :raises: :exc:`KeyError` If no installer is associated with
          *installer_key*
        """
        return self.installers[installer_key]

    def get_installer_keys(self):
        """
        :returns: list of registered installer keys (a snapshot, safe
          to keep while the context is modified)
        """
        return list(self.installers)

    def get_os_keys(self):
        """
        :returns: list of OS keys that have registered with this
          context, ``[str]`` (a snapshot, safe to keep while the
          context is modified)
        """
        return list(self.os_installers)

    def add_os_installer_key(self, os_key, installer_key):
        """
        Register an installer for the specified OS.  This will fail
        with a :exc:`KeyError` if no :class:`Installer` can be found
        with the associated *installer_key*.

        :param os_key: Key for OS
        :param installer_key: Key for installer to add to OS
        :raises: :exc:`KeyError`: if installer for *installer_key*
          is not set.
        """
        # validate, will throw KeyError
        self.get_installer(installer_key)
        if self.verbose:
            print('add installer [%s] to OS [%s]' % (installer_key, os_key))
        self.os_installers.setdefault(os_key, []).append(installer_key)

    def get_os_installer_keys(self, os_key):
        """
        Get list of installer keys registered for the specified OS.
        These keys can be resolved by calling
        :meth:`InstallerContext.get_installer`.

        :param os_key: Key for OS
        :returns: copy of the list of installer keys for *os_key*
        :raises: :exc:`KeyError`: if no information for OS *os_key*
          is registered.
        """
        if os_key not in self.os_installers:
            raise KeyError(os_key)
        # return a copy so callers cannot mutate the registration
        return self.os_installers[os_key][:]

    def set_default_os_installer_key(self, os_key, installer_key):
        """
        Set the default OS installer to use for OS.
        :meth:`InstallerContext.add_os_installer_key` must have
        previously been called for the same OS and for the installer
        key that *installer_key* evaluates to.

        :param os_key: Key for OS
        :param installer_key: callable that takes the OS detector and
          returns the key of the default installer.  The callable
          itself is stored and re-evaluated on every call to
          :meth:`get_default_os_installer_key`.
        :raises: :exc:`KeyError`: if the resolved installer key is not
          associated with *os_key*, or if OS for *os_key* has no
          associated installers.
        :raises: :exc:`ValueError`: if *installer_key* is not callable.
        """
        if os_key not in self.os_installers:
            raise KeyError('unknown OS: %s' % (os_key))
        if not callable(installer_key):
            raise ValueError('installer key should be a method')
        # evaluate once; used for validation and for reporting only
        resolved_key = installer_key(self.os_detect)
        if resolved_key not in self.os_installers[os_key]:
            raise KeyError('installer [%s] is not associated with OS [%s]. call add_os_installer_key() first' % (resolved_key, os_key))
        if self.verbose:
            print('set default installer [%s] for OS [%s]' % (resolved_key, os_key,))
        self.default_os_installer[os_key] = installer_key

    def get_default_os_installer_key(self, os_key):
        """
        Get the default OS installer key to use for OS, or ``None`` if
        there is no default.

        :param os_key: Key for OS
        :returns: installer key, or ``None`` if no default is set, the
          default is not associated with *os_key*, or no installer is
          registered under that key.
        :raises: :exc:`KeyError`: if no information for OS *os_key*
          is registered.
        """
        if os_key not in self.os_installers:
            raise KeyError('unknown OS: %s' % (os_key))
        try:
            installer_key = self.default_os_installer[os_key](self.os_detect)
            if installer_key not in self.os_installers[os_key]:
                return None
            # validate, will throw KeyError
            self.get_installer(installer_key)
        except KeyError:
            # no default registered, or default points at an installer
            # that is no longer registered
            return None
        return installer_key
class Installer(object):
    """
    The :class:`Installer` API is designed around opaque *resolved*
    parameters. These parameters can be any type of sequence object,
    but they must obey set arithmetic.  They should also implement
    ``__str__()`` methods so they can be pretty printed.

    This base class only defines the interface: every method except
    :meth:`get_depends` raises :exc:`NotImplementedError`.
    """

    def is_installed(self, resolved_item):
        """
        :param resolved_item: resolved installation item. NOTE: this is
          a single item, not a list of items like the other APIs,
          ``opaque``.
        :returns: ``True`` if *resolved_item* is installed on the local
          system
        """
        raise NotImplementedError('is_installed', resolved_item)

    def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
        """
        :param resolved: list of resolved installation items, ``[opaque]``
        :param interactive: If `False`, disable interactive prompts,
          e.g. Pass through ``-y`` or equivalant to package manager.
        :param reinstall: If `True`, install everything even if already installed
        :param quiet: quiet flag forwarded by the caller; presumably
          asks the package manager to reduce its output
        """
        # label matches the method name so the error points at the
        # method a subclass actually has to override
        raise NotImplementedError('get_install_command', resolved, interactive, reinstall, quiet)

    def get_depends(self, rosdep_args):
        """
        :returns: list of dependencies on other rosdep keys.  Only
          necessary if the package manager doesn't handle
          dependencies.
        """
        return []  # Default return empty list

    def resolve(self, rosdep_args_dict):
        """
        :param rosdep_args_dict: argument dictionary to the rosdep rule for this package manager
        :returns: [resolutions].  resolved objects should be printable to a user, but are otherwise opaque.
        """
        raise NotImplementedError('Base class resolve', rosdep_args_dict)

    def unique(self, *resolved_rules):
        """
        Combine the resolved rules into a unique list.  This
        is meant to combine the results of multiple calls to
        :meth:`PackageManagerInstaller.resolve`.

        Example::

            resolved1 = installer.resolve(args1)
            resolved2 = installer.resolve(args2)
            resolved = installer.unique(resolved1, resolved2)

        :param resolved_rules: resolved arguments.  Resolved
          arguments must all be from this :class:`Installer` instance.
        """
        raise NotImplementedError('Base class unique', resolved_rules)
class PackageManagerInstaller(Installer):
    """
    General form of a package manager :class:`Installer`
    implementation that assumes:

     - installer rosdep args spec is a list of package names stored with the key "packages"
     - a detect function exists that can return a list of packages that are installed

    Also, if *supports_depends* is set to ``True``:

     - installer rosdep args spec can also include dependency specification with the key "depends"
    """

    def __init__(self, detect_fn, supports_depends=False):
        """
        :param supports_depends: package manager supports dependency key
        :param detect_fn: function that for a given list of packages determines
          the list of installed packages.
        """
        self.detect_fn = detect_fn
        self.supports_depends = supports_depends
        self.as_root = True
        # only prefix with sudo when not already running as root;
        # platforms without os.geteuid get no prefix at all
        self.sudo_command = 'sudo -H' if hasattr(os, 'geteuid') and os.geteuid() != 0 else ''

    def elevate_priv(self, cmd):
        """
        Prepend *self.sudo_command* to the command if *self.as_root* is ``True``.

        :param list cmd: list of strings comprising the command
        :returns: a new list: the words of *self.sudo_command* (if any)
          followed by the elements of *cmd*
        """
        prefix = self.sudo_command.split() if self.as_root else []
        return prefix + cmd

    def resolve(self, rosdep_args):
        """
        See :meth:`Installer.resolve()`

        :param rosdep_args: one of: a dict with an optional
          ``packages`` entry (list, or whitespace-separated string), a
          whitespace-separated string of package names, or a list of
          package names.
        :returns: list of package names
        :raises: :exc:`InvalidData` if *rosdep_args* is none of the
          accepted types
        """
        if isinstance(rosdep_args, dict):
            packages = rosdep_args.get('packages', [])
            if isinstance(packages, str):
                packages = packages.split()
            return packages
        if isinstance(rosdep_args, str):
            # split on any whitespace run, same as the 'packages' string
            # above, so repeated spaces never yield empty package names
            return rosdep_args.split()
        if isinstance(rosdep_args, list):
            return rosdep_args
        # one-element tuple so that tuple arguments format correctly
        raise InvalidData('Invalid rosdep args: %s' % (rosdep_args,))

    def unique(self, *resolved_rules):
        """
        See :meth:`Installer.unique()`

        :returns: sorted list of the distinct items across all
          *resolved_rules*
        """
        s = set()
        for resolved in resolved_rules:
            s.update(resolved)
        return sorted(s)

    def get_packages_to_install(self, resolved, reinstall=False):
        """
        Return a list of packages (out of *resolved*) that still need to get
        installed.

        :param resolved: list of resolved package names
        :param reinstall: If ``True``, return *resolved* unchanged
          without consulting the detect function
        """
        if reinstall:
            return resolved
        if not resolved:
            # avoid invoking the detect function with nothing to check
            return []
        detected = self.detect_fn(resolved)
        return [x for x in resolved if x not in detected]

    def is_installed(self, resolved_item):
        """
        Check if a given package was installed.

        :param resolved_item: single resolved package
        :returns: ``True`` if the detect function reports
          *resolved_item* as installed
        """
        return not self.get_packages_to_install([resolved_item])

    def get_version_strings(self):
        """
        Return a list of version information strings.

        Where each string is of the form "<installer> <version string>".
        For example, ["apt-get x.y.z"] or ["pip x.y.z", "setuptools x.y.z"].
        """
        raise NotImplementedError('subclasses must implement get_version_strings method')

    def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False):
        """
        See :meth:`Installer.get_install_command()`.  Not implemented
        here; concrete package managers must override it.
        """
        raise NotImplementedError('subclasses must implement', resolved, interactive, reinstall, quiet)

    def get_depends(self, rosdep_args):
        """
        :returns: list of dependencies on other rosdep keys.  Only
          necessary if the package manager doesn't handle
          dependencies.  Always empty unless *supports_depends* is
          set and *rosdep_args* is a dict, in which case its
          ``depends`` entry is returned.
        """
        if self.supports_depends and isinstance(rosdep_args, dict):
            return rosdep_args.get('depends', [])
        return []  # Default return empty list
def normalize_uninstalled_to_list(uninstalled):
    """
    Flatten the *uninstalled* value into a plain list of strings.

    :param uninstalled: iterable of ``(installer_key, value)`` pairs
      where *value* is either a list of packages or a single package
    :returns: list of ``str(package)`` for every package, in order,
      ``[str]``
    """
    uninstalled_dependencies = []
    for _installer_key, pkg_or_list in uninstalled:
        if isinstance(pkg_or_list, list):
            uninstalled_dependencies.extend(str(pkg) for pkg in pkg_or_list)
        else:
            # single package: stringify the value itself
            uninstalled_dependencies.append(str(pkg_or_list))
    return uninstalled_dependencies


class RosdepInstaller(object):
    """
    Drives installation: asks the lookup to resolve resources, asks
    the registered installers what is missing, and runs the install
    commands they produce.
    """

    def __init__(self, installer_context, lookup):
        """
        :param installer_context: context used to look up installers
          by key (``get_installer``)
        :param lookup: object providing ``resolve_all``
        """
        self.installer_context = installer_context
        self.lookup = lookup

    def get_uninstalled(self, resources, implicit=False, verbose=False):
        """
        Get list of system dependencies that have not been installed
        as well as a list of errors from performing the resolution.
        This is a bulk API in order to provide performance
        optimizations in checking install state.

        :param resources: List of resource names (e.g. ROS package names), ``[str]``
        :param implicit: Install implicit (recursive) dependencies of
            resources.  Default ``False``.
        :param verbose: If ``True``, print progress to screen.

        :returns: (uninstalled, errors).  *uninstalled* is a list of
          ``(installer_key, packages_to_install)`` tuples, one per
          resolution that still has something to install; *errors* is
          passed through unchanged from the lookup.
        :raises: :exc:`RosdepInternalError`
        """
        installer_context = self.installer_context

        # resolutions have been unique()d
        if verbose:
            print('resolving for resources [%s]' % (', '.join(resources)))
        resolutions, errors = self.lookup.resolve_all(resources, installer_context, implicit=implicit)

        # for each installer, figure out what is left to install
        uninstalled = []
        if resolutions == []:
            return uninstalled, errors
        for installer_key, resolved in resolutions:  # py3k
            if verbose:
                print('resolution: %s [%s]' % (installer_key, ', '.join([str(r) for r in resolved])))
            try:
                installer = installer_context.get_installer(installer_key)
            except KeyError as e:  # lookup has to be buggy to cause this
                raise RosdepInternalError(e)
            try:
                packages_to_install = installer.get_packages_to_install(resolved)
            except Exception as e:
                # any installer failure is reported as an internal error;
                # the traceback goes to the debug log
                rd_debug(traceback.format_exc())
                raise RosdepInternalError(e, message='Bad installer [%s]: %s' % (installer_key, e))

            # only create key if there is something to do
            if packages_to_install:
                uninstalled.append((installer_key, packages_to_install))
            if verbose:
                print('uninstalled: [%s]' % (', '.join([str(p) for p in packages_to_install])))

        return uninstalled, errors

    def install(self, uninstalled, interactive=True, simulate=False,
                continue_on_error=False, reinstall=False, verbose=False, quiet=False):
        """
        Install the uninstalled rosdeps.  This API is for the bulk
        workflow of rosdep (see example below).  For a more targeted
        install API, see :meth:`RosdepInstaller.install_resolved`.

        :param uninstalled: uninstalled value from
          :meth:`RosdepInstaller.get_uninstalled`: a list of
          ``(installer_key, resolved)`` pairs.
        :param interactive: If ``False``, suppress
          interactive prompts (e.g. by passing '-y' to ``apt``).
        :param simulate: If ``True``, simulate installation
          without actually executing.
        :param continue_on_error: If ``True``, continue installation
          even if an install fails.  Otherwise, stop after first
          installation failure.
        :param reinstall: If ``True``, install dependencies if even
          already installed (default ``False``).
        :param verbose: If ``True``, print verbose output to screen.
        :param quiet: forwarded to :meth:`install_resolved`.

        :raises: :exc:`InstallFailed` if any rosdeps fail to install
          and *continue_on_error* is ``False``, or after all installs
          were attempted if any failed.
        :raises: :exc:`KeyError` If *uninstalled* value has invalid
          installer keys

        Example::

            uninstalled, errors = installer.get_uninstalled(packages)
            installer.install(uninstalled)
        """
        if verbose:
            print(
                'install options: reinstall[%s] simulate[%s] interactive[%s]' %
                (reinstall, simulate, interactive)
            )
            uninstalled_list = normalize_uninstalled_to_list(uninstalled)
            print('install: uninstalled keys are %s' % ', '.join(uninstalled_list))

        # Squash uninstalled again, in case some dependencies were already installed.
        # Only *consecutive* entries with the same installer key are merged,
        # so the relative order of installers is preserved.
        squashed_uninstalled = []
        previous_installer_key = None
        for installer_key, resolved in uninstalled:
            if previous_installer_key != installer_key:
                squashed_uninstalled.append((installer_key, []))
                previous_installer_key = installer_key
            squashed_uninstalled[-1][1].extend(resolved)

        failures = []
        for installer_key, resolved in squashed_uninstalled:
            try:
                self.install_resolved(installer_key, resolved, simulate=simulate,
                                      interactive=interactive, reinstall=reinstall,
                                      continue_on_error=continue_on_error,
                                      verbose=verbose, quiet=quiet)
            except InstallFailed as e:
                if not continue_on_error:
                    raise
                # accumulate errors
                failures.extend(e.failures)
        if failures:
            raise InstallFailed(failures=failures)

    def install_resolved(self, installer_key, resolved, simulate=False, interactive=True,
                         reinstall=False, continue_on_error=False, verbose=False, quiet=False):
        """
        Lower-level API for installing a rosdep dependency.  The
        rosdep keys have already been resolved to *installer_key* and
        *resolved* via :exc:`RosdepLookup` or other means.

        :param installer_key: Key for installer to apply to *resolved*, ``str``
        :param resolved: Opaque resolution list from :class:`RosdepLookup`.
        :param interactive: If ``True``, allow interactive prompts (default ``True``)
        :param simulate: If ``True``, don't execute installation commands, just print to screen.
        :param reinstall: If ``True``, install dependencies if even
          already installed (default ``False``).
        :param continue_on_error: If ``False``, raise as soon as one
          command fails; if ``True``, keep running the remaining
          commands and report all failures at the end.
        :param verbose: If ``True``, print verbose output to screen (default ``False``)
        :param quiet: If ``True``, supress output except for errors (default ``False``)

        :raises: :exc:`InstallFailed` if any of *resolved* fail to install.
        """
        installer_context = self.installer_context
        installer = installer_context.get_installer(installer_key)
        command = installer.get_install_command(resolved, interactive=interactive, reinstall=reinstall, quiet=quiet)
        if not command:
            if verbose:
                print('#No packages to install')
            return

        if simulate:
            print('#[%s] Installation commands:' % (installer_key))
            for sub_command in command:
                # a sub-command whose first element is a list is a list
                # of alternative commands
                if isinstance(sub_command[0], list):
                    sub_cmd_len = len(sub_command)
                    for i, cmd in enumerate(sub_command):
                        print(" '%s' (alternative %d/%d)" % (' '.join(cmd), i + 1, sub_cmd_len))
                else:
                    print(' ' + ' '.join(sub_command))
            # nothing left to do for simulation
            return

        def run_command(command, installer_key, failures, verbose):
            # always echo commands to screen
            print_bold('executing command [%s]' % ' '.join(command))
            result = subprocess.call(command)
            if verbose:
                print('command return code [%s]: %s' % (' '.join(command), result))
            if result != 0:
                failures.append((installer_key, 'command [%s] failed' % (' '.join(command))))
            return result

        # run each install command set and collect errors
        failures = []
        for sub_command in command:
            if isinstance(sub_command[0], list):  # list of alternatives
                alt_failures = []
                for alt_command in sub_command:
                    result = run_command(alt_command, installer_key, alt_failures, verbose)
                    if result == 0:  # one successful command is sufficient
                        alt_failures = []  # clear failures from other alternatives
                        break
                failures.extend(alt_failures)
            else:
                result = run_command(sub_command, installer_key, failures, verbose)
            if result != 0:
                if not continue_on_error:
                    raise InstallFailed(failures=failures)

        # test installation of each
        for r in resolved:
            if not installer.is_installed(r):
                failures.append((installer_key, 'Failed to detect successful installation of [%s]' % (r)))
        # finalize result
        if failures:
            raise InstallFailed(failures=failures)
        elif verbose:
            print('#successfully installed')