# Source code for catkin_pkg.topological_order

# Software License Agreement (BSD License)
#
# Copyright (c) 2012, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from __future__ import print_function

import copy
import sys

from .packages import find_packages
from .workspaces import get_spaces


class _PackageDecorator(object):
    """
    Wrapper around a package object adding the bookkeeping needed for
    topological ordering.

    Attributes not defined on the decorator itself are looked up on the
    wrapped package (see ``__getattr__``).

    :param package: the wrapped package object; this class reads its
      ``name``, ``exports``, ``build_depends``, ``buildtool_depends``,
      ``test_depends`` and ``run_depends`` attributes
    :param path: the path associated with the package, stored as-is
    """

    def __init__(self, package, path):
        self.package = package
        self.path = path
        # a package is a metapackage if it has an export tagged 'metapackage'
        self.is_metapackage = any(e.tagname == 'metapackage' for e in self.package.exports)
        # content of the first 'message_generator' export, or None if there is none
        self.message_generator = next(
            (e.content for e in self.package.exports if e.tagname == 'message_generator'), None)
        # names of the direct build-, buildtool- and test dependencies plus
        # the recursive run_depends of these; filled in by
        # calculate_depends_for_topological_order()
        self.depends_for_topological_order = None

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails. Dunder lookups
        # are never delegated, and neither is 'package' itself: if the
        # instance has no 'package' yet (e.g. it was created without
        # running __init__), delegating would call __getattr__('package')
        # again and recurse without bound.
        if name.startswith('__') or name == 'package':
            raise AttributeError(name)
        return getattr(self.package, name)

    def calculate_depends_for_topological_order(self, packages):
        """
        Sets self.depends_for_topological_order to the recursive
        dependencies required for topological order. It contains all
        direct build-, buildtool- and test dependencies and their
        recursive runtime dependencies. The set only contains packages
        which are in the passed packages dictionary.

        A warning is printed to stderr when a non-metapackage directly
        depends on a metapackage.

        :param packages: dict of name to ``_PackageDecorator``
        """
        self.depends_for_topological_order = set()
        direct_depends = self.package.build_depends + self.package.buildtool_depends + self.package.test_depends
        for name in [d.name for d in direct_depends]:
            # skip external dependencies, meaning names that are not known packages
            if name not in packages:
                continue
            if not self.is_metapackage and packages[name].is_metapackage:
                print('WARNING: package "%s" should not depend on metapackage "%s" but on its packages instead' % (self.name, name), file=sys.stderr)
            packages[name]._add_recursive_run_depends(packages, self.depends_for_topological_order)

    def _add_recursive_run_depends(self, packages, depends_for_topological_order):
        """
        Modifies depends_for_topological_order argument by adding
        run_depends of self recursively. Only packages which are in
        the passed packages are added and recursed into.

        :param packages: dict of name to ``_PackageDecorator``
        :param depends_for_topological_order: set to be extended
        """
        depends_for_topological_order.add(self.package.name)
        # The candidate names are collected before recursing (as a list,
        # not lazily); names already in the set at that point are left
        # out, which is what stops the recursion on circular run_depends.
        candidates = [
            d.name for d in self.package.run_depends
            if d.name in packages and d.name not in depends_for_topological_order]
        for name in candidates:
            packages[name]._add_recursive_run_depends(packages, depends_for_topological_order)


def topological_order(root_dir, whitelisted=None, blacklisted=None, underlay_workspaces=None):
    """
    Crawls the filesystem to find packages and uses their
    dependencies to return a topologically ordered list.

    :param root_dir: The path to search in, ``str``
    :param whitelisted: A list of whitelisted package names, ``list``
    :param blacklisted: A list of blacklisted package names, ``list``
    :param underlay_workspaces: A list of underlay workspaces of
      packages which might provide dependencies in case of partial
      workspaces, ``list``
    :returns: A list of tuples containing the relative path and a
      ``Package`` object, ``list``
    """
    workspace_packages = find_packages(root_dir)

    # Collect the packages of the underlayed workspaces, keyed by name.
    # The workspaces are visited in reverse so that an entry coming from
    # a workspace earlier in the list is written last and therefore wins.
    underlay_by_name = {}
    for workspace in reversed(underlay_workspaces or []):
        # since an underlay workspace might be a devel space
        # consider the spaces stored in its .catkin file
        for space in get_spaces([workspace]):
            for path, package in find_packages(space).items():
                underlay_by_name[package.name] = (path, package)

    # Re-key the surviving (path, package) pairs by path.
    # NOTE(review): presumably paths are unique across underlay spaces;
    # two pairs with the same path would collapse into one - confirm.
    underlay_by_path = dict(underlay_by_name.values())

    return topological_order_packages(
        workspace_packages,
        whitelisted=whitelisted,
        blacklisted=blacklisted,
        underlay_packages=underlay_by_path)
def topological_order_packages(packages, whitelisted=None, blacklisted=None, underlay_packages=None):
    """
    Topologically orders packages.

    The packages are wrapped, their dependencies for ordering are
    computed and the actual sorting is delegated to
    ``_sort_decorated_packages``. Underlay packages take part in the
    dependency calculation but are dropped from the result.

    :param packages: A dict mapping relative paths to ``Package``
      objects ``dict``
    :param whitelisted: A list of whitelisted package names, ``list``
    :param blacklisted: A list of blacklisted package names, ``list``
    :param underlay_packages: A dict mapping relative paths to
      ``Package`` objects ``dict``
    :returns: A list of tuples containing the relative path and a
      ``Package`` object, ``list``; entries produced by the sorter with
      a path of ``None`` are passed through unchanged
    :raises RuntimeError: if two selected packages share a name
    """
    selected = {}
    for path, package in packages.items():
        pkg_name = package.name
        # an empty / unset whitelist or blacklist disables that filter
        not_whitelisted = whitelisted and pkg_name not in whitelisted
        is_blacklisted = blacklisted and pkg_name in blacklisted
        if not_whitelisted or is_blacklisted:
            continue
        if pkg_name in selected:
            path_with_same_name = selected[pkg_name].path
            raise RuntimeError('Two packages with the same name "%s" in the workspace:\n- %s\n- %s' % (package.name, path_with_same_name, path))
        selected[pkg_name] = _PackageDecorator(package, path)

    # underlay packages are only considered when not overlayed by a
    # selected package with the same name
    underlay_only = {}
    for path, package in (underlay_packages or {}).items():
        if package.name not in selected:
            underlay_only[package.name] = _PackageDecorator(package, path)
    selected.update(underlay_only)

    # calculate transitive dependencies
    for decorator in selected.values():
        decorator.calculate_depends_for_topological_order(selected)

    ordered = _sort_decorated_packages(selected)

    # remove underlay packages from the result; entries with a path of
    # None are kept without inspecting their second element
    result = []
    for path, package in ordered:
        if path is None or package.name not in underlay_only:
            result.append((path, package))
    return result
def _reduce_cycle_set(packages_orig):
    """
    This function iteratively removes some packages from a set that are
    definitely not part of any cycle.

    When there is a cycle in the package dependencies,
    _sort_decorated_packages only knows the set of packages containing
    the cycle.

    :param packages_orig: A dict mapping package name to
      ``_PackageDecorator`` objects ``dict``; only a shallow copy of it
      is modified
    :returns: A list of package names from the input which could not
      easily be detected as not being part of a cycle; empty when every
      package was eliminated, ``list``
    """
    assert packages_orig
    packages = copy.copy(packages_orig)
    last_depended = None
    while packages:
        # names which at least one remaining package depends on
        depended = set()
        for decorator in packages.values():
            if decorator.depends_for_topological_order:
                depended.update(decorator.depends_for_topological_order)
        # a package nobody depends on cannot be part of a cycle
        for name in list(packages):
            if name not in depended:
                del packages[name]
        # stop as soon as a pass yields the same depended set as the previous one
        if last_depended and last_depended == depended:
            return list(packages)
        last_depended = depended
    return []


def _sort_decorated_packages(packages_orig):
    """
    Sorts packages according to dependency ordering, first considering
    the message generators and their recursive dependencies and then
    the rest of the packages.

    When a cycle is detected, the last entry of the result has ``None``
    as its first element and, as its second element, a string with the
    comma separated names of a superset of the guilty packages; sorting
    stops at that point.

    :param packages_orig: A dict mapping package name to
      ``_PackageDecorator`` objects ``dict``; it is deep-copied and
      not modified
    :returns: A list of two-element lists containing the relative path
      and a ``Package`` object ``list``
    """
    packages = copy.deepcopy(packages_orig)

    # mark all packages which are (recursively) dependent on by message generators
    dependency_names_to_follow = set(
        name for name, decorator in packages.items() if decorator.message_generator)
    not_marked_package_names = set(packages.keys()) - dependency_names_to_follow
    while dependency_names_to_follow:
        pkg_name = dependency_names_to_follow.pop()
        for name in packages[pkg_name].depends_for_topological_order:
            if name in not_marked_package_names:
                # mark package
                packages[name].message_generator = True
                not_marked_package_names.remove(name)
                # queue for recursion
                dependency_names_to_follow.add(name)

    ordered_packages = []
    while packages:
        # find all packages without remaining dependencies
        message_generators = []
        non_message_generators = []
        for name, decorator in packages.items():
            if not decorator.depends_for_topological_order:
                if decorator.message_generator:
                    message_generators.append(name)
                else:
                    non_message_generators.append(name)
        # first choose message generators
        candidates = message_generators or non_message_generators
        if not candidates:
            # in case of a circular dependency pass a string with
            # the names list of remaining package names, with path
            # None to indicate cycle
            ordered_packages.append([None, ', '.join(sorted(_reduce_cycle_set(packages)))])
            break

        # only take one candidate per pass since removing it from the
        # depends of the others might affect the next candidates;
        # the alphabetically first one is chosen only for convenience
        name = min(candidates)
        ordered_packages.append([packages[name].path, packages[name].package])
        # remove package from further processing
        del packages[name]
        for decorator in packages.values():
            decorator.depends_for_topological_order.discard(name)
    return ordered_packages