Source code for rospkg.rospack

# Software License Agreement (BSD License)
#
# Copyright (c) 2011, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Willow Garage, Inc. nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import os
from threading import Lock

try:
    from xml.etree.cElementTree import ElementTree
except ImportError:
    from xml.etree.ElementTree import ElementTree

from .common import MANIFEST_FILE, PACKAGE_FILE, ResourceNotFound, STACK_FILE
from .environment import get_ros_paths
from .manifest import InvalidManifest, parse_manifest_file
from .stack import InvalidStack, parse_stack_file

_cache_lock = Lock()


def list_by_path(manifest_name, path, cache):
    """
    List ROS stacks or packages within the specified path.

    The cache will be updated with the resource->path
    mappings. list_by_path() does NOT returned cached results
    -- it only updates the cache.

    :param manifest_name: MANIFEST_FILE or STACK_FILE, ``str``
    :param path: path to list resources in, ``str``
    :param cache: path cache to update. Maps resource name to directory path, ``{str: str}``
    :returns: complete list of resources in ROS environment, ``[str]``
    """
    resources = []
    path = os.path.abspath(path)
    basename = os.path.basename
    for d, dirs, files in os.walk(path, topdown=True, followlinks=True):
        if 'CATKIN_IGNORE' in files:
            del dirs[:]
            continue  # leaf
        if PACKAGE_FILE in files:
            # parse package.xml and decide if it matches the search criteria
            root = ElementTree(None, os.path.join(d, PACKAGE_FILE))
            is_metapackage = root.find('./export/metapackage') is not None
            if (
                (manifest_name == STACK_FILE and is_metapackage) or
                (manifest_name == MANIFEST_FILE and not is_metapackage) or
                manifest_name == PACKAGE_FILE
            ):
                resource_name = root.findtext('name').strip(' \n\r\t')
                if resource_name not in resources:
                    resources.append(resource_name)
                    if cache is not None:
                        cache[resource_name] = d
                del dirs[:]
                continue  # leaf
        if manifest_name in files:
            resource_name = basename(d)
            if resource_name not in resources:
                resources.append(resource_name)
                if cache is not None:
                    cache[resource_name] = d
            del dirs[:]
            continue  # leaf
        elif MANIFEST_FILE in files or PACKAGE_FILE in files:
            # noop if manifest_name==MANIFEST_FILE, but a good
            # optimization for stacks.
            del dirs[:]
            continue  # leaf
        elif 'rospack_nosubdirs' in files:
            del dirs[:]
            continue   # leaf
        # remove hidden dirs (esp. .svn/.git)
        [dirs.remove(di) for di in dirs if di[0] == '.']
    return resources


class ManifestManager(object):
    """
    Base class implementation for :class:`RosPack` and
    :class:`RosStack`.  This class indexes resources on paths with
    where manifests denote the precense of the resource.  NOTE: for
    performance reasons, instances cache information and will not
    reflect changes made on disk or to environment configuration.
    """

    def __init__(self, manifest_name, ros_paths=None):
        """
        ctor. subclasses are expected to use *manifest_name*
        to customize behavior of ManifestManager.

        :param manifest_name: MANIFEST_FILE or STACK_FILE
        :param ros_paths: Ordered list of paths to search for
          resources. If `None` (default), use environment ROS path.
        """
        self._manifest_name = manifest_name

        if ros_paths is None:
            self._ros_paths = get_ros_paths()
        else:
            self._ros_paths = ros_paths

        self._manifests = {}
        self._depends_cache = {}
        self._rosdeps_cache = {}
        self._location_cache = None
        self._custom_cache = {}

    @classmethod
    def get_instance(cls, ros_paths=None):
        """
        Reuse an existing instance for the specified ros_paths instead of creating a new one.
        Only works for subclasses, as the ManifestManager itself expects two args for the ctor.

        :param ros_paths: Ordered list of paths to search for
          resources. If `None` (default), use environment ROS path.
        """
        if not hasattr(cls, '_instances'):
            # add class variable _instances to cls
            cls._instances = {}

        # generate instance_key from ros_paths variable
        if ros_paths is None:
            ros_paths = get_ros_paths()
        instance_key = str(tuple(ros_paths))

        if instance_key not in cls._instances:
            # create and cache new instance
            cls._instances[instance_key] = cls(ros_paths)
        return cls._instances[instance_key]

    def get_ros_paths(self):
        return self._ros_paths[:]
    ros_paths = property(get_ros_paths, doc="Get ROS paths of this instance")

    def get_manifest(self, name):
        """
        :raises: :exc:`InvalidManifest`
        """
        if name in self._manifests:
            return self._manifests[name]
        else:
            return self._load_manifest(name)

    def _update_location_cache(self):
        global _cache_lock
        # ensure self._location_cache is not checked while it is being updated
        # (i.e. while it is not None, but also not completely populated)
        with _cache_lock:
            if self._location_cache is not None:
                return
            # initialize cache
            cache = self._location_cache = {}
            # nothing to search, #3680
            if not self._ros_paths:
                return
            # crawl paths using our own logic, in reverse order to get
            # correct precedence
            for path in reversed(self._ros_paths):
                list_by_path(self._manifest_name, path, cache)

    def list(self):
        """
        List resources.

        :returns: complete list of package names in ROS environment, ``[str]``
        """
        self._update_location_cache()
        return list(self._location_cache.keys())

    def get_path(self, name):
        """
        :param name: package name, ``str``
        :returns: filesystem path of package
        :raises: :exc:`ResourceNotFound`
        """
        self._update_location_cache()
        if name not in self._location_cache:
            raise ResourceNotFound(name, ros_paths=self._ros_paths)
        else:
            return self._location_cache[name]

    def _load_manifest(self, name):
        """
        :raises: :exc:`ResourceNotFound`
        """
        retval = self._manifests[name] = parse_manifest_file(self.get_path(name), self._manifest_name, rospack=self)
        return retval

    def get_depends(self, name, implicit=True):
        """
        Get dependencies of a resource.  If implicit is ``True``, this
        includes implicit (recursive) dependencies.

        :param name: resource name, ``str``
        :param implicit: include implicit (recursive) dependencies, ``bool``

        :returns: list of names of dependencies, ``[str]``
        :raises: :exc:`InvalidManifest` If resource or any of its
          dependencies have an invalid manifest.
        """
        if not implicit:
            m = self.get_manifest(name)
            return [d.name for d in m.depends]
        else:
            if name in self._depends_cache:
                return self._depends_cache[name]

            # take the union of all dependencies
            names = [p.name for p in self.get_manifest(name).depends]

            # assign key before recursive call to prevent infinite case
            self._depends_cache[name] = s = set()

            for p in names:
                s.update(self.get_depends(p, implicit))
            # add in our own deps
            s.update(names)
            # cache the return value as a list
            s = list(s)
            self._depends_cache[name] = s
            return s

    def get_depends_on(self, name, implicit=True):
        """
        Get resources that depend on a resource.  If implicit is ``True``, this
        includes implicit (recursive) dependency relationships.

        NOTE: this does *not* raise :exc:`rospkg.InvalidManifest` if
        there are invalid manifests found.

        :param name: resource name, ``str``
        :param implicit: include implicit (recursive) dependencies, ``bool``

        :returns: list of names of dependencies, ``[str]``
        """
        depends_on = []
        if not implicit:
            # have to examine all dependencies
            for r in self.list():
                if r == name:
                    continue
                try:
                    m = self.get_manifest(r)
                    if any(d for d in m.depends if d.name == name):
                        depends_on.append(r)
                except InvalidManifest:
                    # robust to bad packages
                    pass
                except ResourceNotFound:
                    # robust to bad packages
                    pass
        else:
            # Computing implicit dependencies requires examining the
            # dependencies of all packages.  As we already implement
            # this logic in get_depends(), we simply reuse it here for
            # the reverse calculation.  This enables us to use the
            # same dependency cache that get_depends() uses.  The
            # efficiency is roughly the same due to the caching.
            for r in self.list():
                if r == name:
                    continue
                try:
                    depends = self.get_depends(r, implicit=True)
                    if name in depends:
                        depends_on.append(r)
                except InvalidManifest:
                    # robust to bad packages
                    pass
                except ResourceNotFound:
                    # robust to bad packages
                    pass
        return depends_on

    def get_custom_cache(self, key, default=None):
        return self._custom_cache.get(key, default)

    def set_custom_cache(self, key, value):
        self._custom_cache[key] = value


[docs]class RosPack(ManifestManager): """ Utility class for querying properties about ROS packages. This should be used when querying properties about multiple packages. NOTE 1: for performance reasons, RosPack caches information about packages. NOTE 2: RosPack is not thread-safe. Example:: from rospkg import RosPack rp = RosPack() packages = rp.list() path = rp.get_path('rospy') depends = rp.get_depends('roscpp') direct_depends = rp.get_depends('roscpp', implicit=False) """ def __init__(self, ros_paths=None): """ :param ros_paths: Ordered list of paths to search for resources. If `None` (default), use environment ROS path. """ super(RosPack, self).__init__(MANIFEST_FILE, ros_paths) self._rosdeps_cache = {}
[docs] def get_rosdeps(self, package, implicit=True): """ Collect rosdeps of specified package into a dictionary. :param package: package name, ``str`` :param implicit: include implicit (recursive) rosdeps, ``bool`` :returns: list of rosdep names, ``[str]`` """ if implicit: return self._implicit_rosdeps(package) else: m = self.get_manifest(package) return [d.name for d in m.rosdeps]
def _implicit_rosdeps(self, package): """ Compute recursive rosdeps of a single package and cache the result in self._rosdeps_cache. :param package: package name, ``str`` :returns: list of rosdeps, ``[str]`` """ if package in self._rosdeps_cache: return self._rosdeps_cache[package] # set the key before recursive call to prevent infinite case self._rosdeps_cache[package] = s = set() # take the union of all dependencies packages = self.get_depends(package, implicit=True) for p in packages: s.update(self.get_rosdeps(p, implicit=False)) # add in our own deps m = self.get_manifest(package) s.update([d.name for d in m.rosdeps]) # cache the return value as a list s = list(s) self._rosdeps_cache[package] = s return s
[docs] def stack_of(self, package): """ :param package: package name, ``str`` :returns: name of stack that package is in, or None if package is not part of a stack, ``str`` :raises: :exc:`ResourceNotFound` If package cannot be located """ d = self.get_path(package) while d and os.path.dirname(d) != d: stack_file = os.path.join(d, STACK_FILE) if os.path.exists(stack_file): return os.path.basename(d) else: d = os.path.dirname(d)
[docs]class RosStack(ManifestManager): """ Utility class for querying properties about ROS stacks. This should be used when querying properties about multiple stacks. NOTE 1: for performance reasons, RosStack caches information about stacks. NOTE 2: RosStack is not thread-safe. """ def __init__(self, ros_paths=None): """ :param ros_paths: Ordered list of paths to search for resources. If `None` (default), use environment ROS path. """ super(RosStack, self).__init__(STACK_FILE, ros_paths)
[docs] def packages_of(self, stack): """ :returns: name of packages that are part of stack, ``[str]`` :raises: :exc:`ResourceNotFound` If stack cannot be located """ return list_by_path(MANIFEST_FILE, self.get_path(stack), {})
[docs] def get_stack_version(self, stack): """ :param env: override environment variables, ``{str: str}`` :returns: version number of stack, or None if stack is unversioned, ``str`` """ return get_stack_version_by_dir(self.get_path(stack))
# #2022
[docs]def expand_to_packages(names, rospack, rosstack): """ Expand names into a list of packages. Names can either be of packages or stacks. :param names: names of stacks or packages, ``[str]`` :returns: ([packages], [not_found]). Returns two lists. The first is of packages names. The second is a list of names for which no matching stack or package was found. Lists may have duplicates. ``([str], [str])`` """ if type(names) not in (tuple, list): raise ValueError("names must be a list of strings") # do full package list first. This forces an entire tree # crawl. This is less efficient for a small list of names, but # much more efficient for many names. package_list = rospack.list() valid = [] invalid = [] for n in names: if n not in package_list: try: valid.extend(rosstack.packages_of(n)) except ResourceNotFound: invalid.append(n) else: valid.append(n) return valid, invalid
[docs]def get_stack_version_by_dir(stack_dir): """ Get stack version where stack_dir points to root directory of stack. :param env: override environment variables, ``{str: str}`` :returns: version number of stack, or None if stack is unversioned, ``str`` :raises: :exc:`IOError` :raises: :exc:`InvalidStack` """ catkin_stack_filename = os.path.join(stack_dir, 'stack.xml') if os.path.isfile(catkin_stack_filename): try: stack = parse_stack_file(catkin_stack_filename) return stack.version except InvalidStack: pass cmake_filename = os.path.join(stack_dir, 'CMakeLists.txt') if os.path.isfile(cmake_filename): with open(cmake_filename) as f: try: return _get_cmake_version(f.read()) except ValueError: return None else: return None
def _get_cmake_version(text): """ :raises :exc:`ValueError` If version number in CMakeLists.txt cannot be parsed correctly """ import re for l in text.split('\n'): if l.strip().startswith('rosbuild_make_distribution'): x_re = re.compile(r'[()]') lsplit = x_re.split(l.strip()) if len(lsplit) < 2: raise ValueError("couldn't find version number in CMakeLists.txt:\n\n%s" % l) version = lsplit[1] if version: return version else: raise ValueError("cannot parse version number in CMakeLists.txt:\n\n%s" % l)
[docs]def get_package_name(path): """ Get the name of the ROS package that contains *path*. This is determined by finding the nearest parent ``manifest.xml`` file. This routine may not traverse package setups that rely on internal symlinks within the package itself. :param path: filesystem path :return: Package name or ``None`` if package cannot be found, ``str`` """ # NOTE: the realpath is going to create issues with symlinks, most # likely. parent = os.path.dirname(os.path.realpath(path)) # walk up until we hit ros root or ros/pkg while not os.path.exists(os.path.join(path, MANIFEST_FILE)) and not os.path.exists(os.path.join(path, PACKAGE_FILE)) and parent != path: path = parent parent = os.path.dirname(path) # check termination condition if os.path.exists(os.path.join(path, MANIFEST_FILE)): return os.path.basename(os.path.abspath(path)) elif os.path.exists(os.path.join(path, PACKAGE_FILE)): root = ElementTree(None, os.path.join(path, PACKAGE_FILE)) return root.findtext('name') else: return None