Source code for rosdistro.dependency_walker

# Software License Agreement (BSD License)
#
# Copyright (c) 2013, Open Source Robotics Foundation, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Open Source Robotics Foundation, Inc. nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from catkin_pkg.package import InvalidPackage, parse_package_string


[docs]class DependencyWalker(object): def __init__(self, distribution_instance, evaluate_condition_context=None): self._distribution_instance = distribution_instance self._packages = {} self._evaluate_condition_context = evaluate_condition_context def _get_package(self, pkg_name): if pkg_name not in self._packages: repo = self._distribution_instance.repositories[self._distribution_instance.release_packages[pkg_name].repository_name].release_repository assert repo is not None and repo.version is not None, "Package '%s' in repository '%s' has no version set" % (pkg_name, repo.name) assert 'release' in repo.tags, "Package '%s' in repository '%s' has no 'release' tag set" % (pkg_name, repo.name) pkg_xml = self._distribution_instance.get_release_package_xml(pkg_name) try: pkg = parse_package_string(pkg_xml) except InvalidPackage as e: raise InvalidPackage(pkg_name + ': %s' % str(e)) else: if self._evaluate_condition_context is not None: pkg.evaluate_conditions(self._evaluate_condition_context) self._packages[pkg_name] = pkg return self._packages[pkg_name] def _get_package_names(self): return self._distribution_instance.release_packages.keys()
[docs] def get_depends(self, pkg_name, depend_type, ros_packages_only=False): '''Return a set of package names which the package depends on.''' deps = self._get_dependencies(pkg_name, depend_type) if ros_packages_only: deps &= set(self._get_package_names()) return deps
[docs] def get_recursive_depends(self, pkg_name, depend_types, ros_packages_only=False, ignore_pkgs=None, limit_depth=None): '''Return a set of package names which the package (transitively) depends on.''' ignore_pkgs = set(ignore_pkgs or []) depends = set([]) # mapping from the scheduled pkg names to their dependency level pkgs_to_check = {pkg_name: 0} while pkgs_to_check: next_pkg_to_check = sorted(pkgs_to_check.keys())[0] current_level = pkgs_to_check.pop(next_pkg_to_check) if next_pkg_to_check in ignore_pkgs or (limit_depth is not None and current_level >= limit_depth): continue for depend_type in depend_types: deps = self.get_depends(next_pkg_to_check, depend_type, ros_packages_only=ros_packages_only) deps -= ignore_pkgs new_deps = deps - depends for new_dep in new_deps: if new_dep not in pkgs_to_check: pkgs_to_check[new_dep] = current_level + 1 else: pkgs_to_check[new_dep] = min(pkgs_to_check[new_dep], current_level + 1) depends |= new_deps return depends
[docs] def get_depends_on(self, pkg_name, depend_type, ignore_pkgs=None): '''Return a set of package names which depend on the package.''' ignore_pkgs = ignore_pkgs or [] depends_on = set([]) for name in self._get_package_names(): if name in ignore_pkgs: continue repo = self._get_package_repo(name) if repo is None or repo.version is None: continue deps = self._get_dependencies(name, depend_type) if pkg_name in deps: depends_on.add(name) return depends_on
[docs] def get_recursive_depends_on(self, pkg_name, depend_types, ignore_pkgs=None): '''Return a set of package names which (transitively) depend on the package.''' ignore_pkgs = ignore_pkgs or [] depends_on = set([]) pkgs_to_check = set([pkg_name]) while pkgs_to_check: next_pkg_to_check = pkgs_to_check.pop() for depend_type in depend_types: deps = self.get_depends_on(next_pkg_to_check, depend_type, ignore_pkgs=ignore_pkgs) new_deps = deps - depends_on pkgs_to_check |= new_deps depends_on |= new_deps return depends_on
def _get_dependencies(self, pkg_name, dep_type): pkg = self._get_package(pkg_name) deps = { 'build': pkg.build_depends, 'buildtool': pkg.buildtool_depends, 'build_export': pkg.build_export_depends, 'buildtool_export': pkg.buildtool_export_depends, 'exec': pkg.exec_depends, 'run': pkg.run_depends, 'test': pkg.test_depends, 'doc': pkg.doc_depends, } return set([d.name for d in deps[dep_type] if d.evaluated_condition is not False]) def _get_package_repo(self, name): return self._distribution_instance.repositories[self._distribution_instance.release_packages[name].repository_name].release_repository
[docs]class SourceDependencyWalker(DependencyWalker): def _get_package(self, pkg_name): if pkg_name not in self._packages: repo = self._distribution_instance.repositories[self._distribution_instance.source_packages[pkg_name].repository_name].source_repository assert repo is not None, "Package '%s' in repository '%s' is missing a source entry." % (pkg_name, repo.name) pkg_xml = self._distribution_instance.get_source_package_xml(pkg_name) try: pkg = parse_package_string(pkg_xml) except InvalidPackage as e: raise InvalidPackage(pkg_name + ': %s' % str(e)) self._packages[pkg_name] = pkg return self._packages[pkg_name] def _get_package_names(self): return self._distribution_instance.source_packages.keys() def _get_package_repo(self, name): return self._distribution_instance.repositories[self._distribution_instance.source_packages[name].repository_name].source_repository