Source code for capabilities.server

# Software License Agreement (BSD License)
#
# Copyright (c) 2013, Open Source Robotics Foundation, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Open Source Robotics Foundation, Inc. nor
#    the names of its contributors may be used to endorse or promote
#    products derived from this software without specific prior
#    written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# Author: Tully Foote <tfoote@osrfoundation.org>
# Author: William Woodall <william@osrfoundation.org>

"""
This module implements the Capability server.

The Capability server provides access to queries and services related
to capabilities.
"""

from __future__ import print_function

import argparse
import logging
import os
import sys
import threading
import traceback
import uuid

import rospy

from bondpy.bondpy import Bond

from std_srvs.srv import Empty
from std_srvs.srv import EmptyResponse

from capabilities.srv import EstablishBond
from capabilities.srv import EstablishBondResponse
from capabilities.srv import GetCapabilitySpec
from capabilities.srv import GetCapabilitySpecResponse
from capabilities.srv import FreeCapability
from capabilities.srv import FreeCapabilityResponse
from capabilities.srv import GetCapabilitySpecs
from capabilities.srv import GetCapabilitySpecsResponse
from capabilities.srv import GetInterfaces
from capabilities.srv import GetInterfacesResponse
from capabilities.srv import GetNodeletManagerName
from capabilities.srv import GetNodeletManagerNameResponse
from capabilities.srv import GetProviders
from capabilities.srv import GetProvidersResponse
from capabilities.srv import GetSemanticInterfaces
from capabilities.srv import GetSemanticInterfacesResponse
from capabilities.srv import GetRemappings
from capabilities.srv import GetRemappingsResponse
from capabilities.srv import GetRunningCapabilities
from capabilities.srv import GetRunningCapabilitiesResponse
from capabilities.srv import StartCapability
from capabilities.srv import StartCapabilityResponse
from capabilities.srv import StopCapability
from capabilities.srv import StopCapabilityResponse
from capabilities.srv import UseCapability
from capabilities.srv import UseCapabilityResponse

from capabilities.discovery import package_index_from_package_path
from capabilities.discovery import spec_file_index_from_package_index
from capabilities.discovery import spec_index_from_spec_file_index

from capabilities.launch_manager import _special_nodelet_manager_capability
from capabilities.launch_manager import LaunchManager

from capabilities.msg import Capability
from capabilities.msg import CapabilityEvent
from capabilities.msg import CapabilitySpec
from capabilities.msg import Remapping
from capabilities.msg import RunningCapability

from capabilities.specs.interface import capability_interface_from_string
from capabilities.specs.semantic_interface import semantic_capability_interface_from_string

USER_SERVICE_REASON = 'user service call'

## Hack to squelch output from Service call failure ##

from rospy.impl import tcpros_service


def custom__handle_request(self, transport, request):  # pragma: no cover
    import struct
    from rospy.impl.tcpros_service import convert_return_to_response
    from rospy.service import ServiceException
    try:
        # convert return type to response Message instance
        response = convert_return_to_response(self.handler(request), self.response_class)
        self.seq += 1
        # ok byte
        transport.write_buff.write(struct.pack('<B', 1))
        transport.send_message(response, self.seq)
    except ServiceException as e:
        rospy.core.rospydebug("handler raised ServiceException: %s" % e)
        self._write_service_error(transport, "service cannot process request: %s" % e)
    except Exception as e:
        # rospy.logerr("Error processing request: %s\n%s" % (e, traceback.print_exc()))
        self._write_service_error(transport, "error processing request: %s" % e)

tcpros_service.ServiceImpl._handle_request = custom__handle_request

## End hacks ##


[docs]class CapabilityInstance(object): """Encapsulates the state of an instance of a Capability Provider This class encapsulates the state of the capability instance and provides methods for changing the states of the instance. """ def __init__(self, provider, provider_path, started_by='unknown'): self.__state = 'waiting' self.name = provider.name self.provider = provider self.provider_path = provider_path self.interface = provider.implements self.pid = None self.depends_on = [x for x in provider.dependencies] self.canceled = False self.started_by = started_by self.bonds = {} # {bond_id: reference_count} @property def reference_count(self): return sum(list(self.bonds.values())) @property def state(self): """Get the current state of the CapabilityInstance""" return self.__state
[docs] def launch(self): """Change to the 'launching' state Fails to transition if the current state is not 'waiting'. :returns: True if transition is successful, False otherwise :rtype: bool """ if self.state != 'waiting': rospy.logerr( "Capability Provider '{0}' ".format(self.name) + "cannot transition to 'launching' from anything but " + "'waiting', current state is '{0}'".format(self.state)) return False self.__state = 'launching' return True
[docs] def cancel(self): """Cancels the instance, which can only be done while it is still 'waiting' Fails to cancel if the current state is not 'waiting'. "Canceling" is achieved by setting the canceled member variable to True. :returns: True if canceling is successful, False otherwise :rtype: bool """ if self.state != 'waiting': rospy.logerr( "Capability Instance '{0}' ".format(self.name) + "cannot be canceled from anything but " + "'waiting', current state is '{0}'".format(self.state)) return False self.canceled = True return True
[docs] def launched(self, pid): """Called once the instance is "launched", changes state to 'running' Fails to transition if the current state is not 'launching'. If successful, the state changes to 'running'. :param pid: process ID of the instance being tracked :type pid: int :returns: True if transition is successful, False otherwise :rtype: bool """ self.pid = pid if self.state != 'launching': rospy.logerr( "Capability Instance '{0}' ".format(self.name) + "cannot transition to 'running' from anything but " + "'launching', current state is '{0}'".format(self.state)) return False self.__state = 'running' return True
[docs] def stopped(self): """Change to the 'stopping' state Fails to transition if the current state is not either 'running' or 'launching'. :returns: True if transition is successful, False otherwise :rtype: bool """ if self.state not in ['running', 'launching']: rospy.logerr( "Capability Instance '{0}' ".format(self.name) + "cannot transition to 'stopping' from anything but " + "'launching' or 'running', " + "current state is '{0}'".format(self.state)) return False self.__state = 'stopping' return True
[docs] def terminated(self): """Called when the instance has terminated, transitions to the 'terminated' state Fails to transition if the current state is not 'stopping'. :returns: True if transition is successful, False otherwise :rtype: bool """ result = True if self.state != 'stopping': rospy.logerr( "Capability Instance '{0}' ".format(self.name) + "terminated unexpectedly, it was previously in the " + "'{0}' state.".format(self.state)) result = False self.__state = 'terminated' return result
[docs]def get_reverse_depends(name, capability_instances): """Gets the reverse dependencies of a given Capability :param name: Name of the Capability which the instances might depend on :type name: str :param capability_instances: list of instances to search for having a dependency on the given Capability :type capability_instances: :py:obj:`list` of :py:class:`CapabilityInstance` :returns: A list of :py:class:`CapabilityInstance`'s which depend on the given Capability name :rtype: :py:obj:`list` of :py:class:`CapabilityInstance` """ rdepends = [] for instance in capability_instances: if name in instance.depends_on: rdepends.append(instance) return rdepends
[docs]class CapabilityServer(object): """A class to expose the :py:class:`discovery.SpecIndex` over a ROS API """ def __init__(self, package_paths, screen=None): self.__package_paths = package_paths self.__spec_index = None self.__graph_lock = threading.Lock() self.__capability_instances = {} self.__launch_manager = LaunchManager( screen=bool(rospy.get_param('~use_screen', screen)), nodelet_manager_name=rospy.get_param('~nodelet_manager_name', None) ) self.__debug = False self.__package_whitelist = None self.__package_blacklist = None self.__whitelist = None self.__blacklist = None self.__default_providers = {} self.__missing_default_provider_is_an_error = rospy.get_param('~missing_default_provider_is_an_error', False) self.__bonds = {}
[docs] def spin(self): """Starts the capability server by setting up ROS comms, then spins""" self.__package_whitelist = rospy.get_param('~package_whitelist', None) if not isinstance(self.__package_whitelist, (list, tuple, type(None))): msg = "~package_whitelist must be a list or null, got a '{0}'".format(type(self.__whitelist)) rospy.logerr(msg) self.__package_whitelist = None self.__package_blacklist = rospy.get_param('~package_blacklist', None) if not isinstance(self.__package_blacklist, (list, tuple, type(None))): msg = "~package_blacklist must be a list or null, got a '{0}'".format(type(self.__whitelist)) rospy.logerr(msg) self.__package_blacklist = None self.__whitelist = rospy.get_param('~whitelist', None) if not isinstance(self.__whitelist, (list, tuple, type(None))): msg = "~whitelist must be a list or null, got a '{0}'".format(type(self.__whitelist)) rospy.logerr(msg) self.__whitelist = None self.__blacklist = rospy.get_param('~blacklist', None) if not isinstance(self.__blacklist, (list, tuple, type(None))): msg = "~blacklist must be a list or null, got a '{0}'".format(type(self.__blacklist)) rospy.logerr(msg) self.__blacklist = None self.__debug = rospy.get_param('~debug', False) if self.__debug: logger = logging.getLogger('rosout') logger.setLevel(logging.DEBUG) rospy.logdebug('Debug messages enabled.') self.__load_capabilities() self.__bond_topic = rospy.get_name() + "/bonds" # Collect default arguments self.__populate_default_providers() rospy.Subscriber( '~events', CapabilityEvent, self.handle_capability_events) self.__start_capability_service = rospy.Service( '~start_capability', StartCapability, self.handle_start_capability) self.__stop_capability_service = rospy.Service( '~stop_capability', StopCapability, self.handle_stop_capability) self.__establish_bond_service = rospy.Service( '~establish_bond', EstablishBond, self.handle_establish_bond) self.__free_capability_service = rospy.Service( '~free_capability', FreeCapability, self.handle_free_capability) self.__use_capability_service = rospy.Service( '~use_capability', UseCapability, self.handle_use_capability) self.__reload_service = rospy.Service( '~reload_capabilities', Empty, self.handle_reload_request) self.__interfaces_service = rospy.Service( '~get_interfaces', GetInterfaces, self.handle_get_interfaces) self.__providers_service = rospy.Service( '~get_providers', GetProviders, self.handle_get_providers) self.__semantic_interfaces_service = rospy.Service( '~get_semantic_interfaces', GetSemanticInterfaces, self.handle_get_semantic_interfaces) self.__running_capabilities = rospy.Service( '~get_running_capabilities', GetRunningCapabilities, self.handle_get_running_capabilities) self.__capability_specs = rospy.Service( '~get_capability_specs', GetCapabilitySpecs, self.handle_get_capability_specs) self.__capability_spec = rospy.Service( '~get_capability_spec', GetCapabilitySpec, self.handle_get_capability_spec) self.__get_nodelet_manager_name_service = rospy.Service( '~get_nodelet_manager_name', GetNodeletManagerName, self.handle_get_nodelet_manager_name) self.__get_remappings_service = rospy.Service( '~get_remappings', GetRemappings, self.handle_get_remappings) rospy.loginfo("Capability Server Ready") rospy.Publisher("~events", CapabilityEvent, queue_size=1000).publish( CapabilityEvent(type=CapabilityEvent.SERVER_READY)) rospy.spin()
[docs] def shutdown(self): """Stops the capability server and cleans up any running processes""" for instance in self.__capability_instances.values(): # pragma: no cover if instance.state in ['running', 'launching']: instance.stopped() if instance.state == 'waiting': instance.cancel() self.__launch_manager.stop()
def __load_capabilities(self): package_index = package_index_from_package_path(self.__package_paths) self.spec_file_index = spec_file_index_from_package_index(package_index) # Prune packages by black and white list for package in self.spec_file_index.keys(): if self.__package_whitelist and package not in self.__package_whitelist: rospy.loginfo("Package '{0}' not in whitelist, skipping.".format(package)) del self.spec_file_index[package] elif self.__package_blacklist and package in self.__package_blacklist: rospy.loginfo("Package '{0}' in blacklist, skipping.".format(package)) del self.spec_file_index[package] # Generate spec_index from spec file index spec_index, errors = spec_index_from_spec_file_index(self.spec_file_index) if errors: rospy.logerr("Errors were encountered loading capabilities:") for error in errors: rospy.logerr(" " + str(error.__class__.__name__) + ": " + str(error)) # Prune specific capabilities based on black and white lists removed_interfaces = [] for specs, remove_func in [ (spec_index.interfaces, spec_index.remove_interface), (spec_index.semantic_interfaces, spec_index.remove_semantic_interface), (spec_index.providers, spec_index.remove_provider) ]: for spec in specs.keys(): if self.__whitelist and spec not in self.__whitelist: removed_interfaces.append(spec) remove_func(spec) rospy.loginfo("Spec '{0}' is not in the whitelist, skipping.".format(spec)) elif self.__blacklist and spec in self.__blacklist: removed_interfaces.append(spec) remove_func(spec) rospy.loginfo("Spec '{0}' is in the blacklist, skipping.".format(spec)) # Remove providers which no longer have an interface for interface in removed_interfaces: for provider in spec_index.providers.values(): if provider.implements == interface: spec_index.remove_provider(provider.name) self.__spec_index = spec_index # Prune spec_file_index spec_paths = spec_index.interface_paths.values() + \ spec_index.semantic_interface_paths.values() + \ spec_index.provider_paths.values() for package_name, package_dict in self.spec_file_index.items(): for spec_type in ['capability_interface', 'semantic_capability_interface', 'capability_provider']: package_dict[spec_type][:] = [path for path in package_dict[spec_type] if path in spec_paths] def __populate_default_providers(self): # Collect available interfaces interfaces = self.__spec_index.interface_names + self.__spec_index.semantic_interface_names for interface in interfaces: # Collect the providers for each interface providers = [n for n, p in self.__spec_index.providers.items() if p.implements == interface] if not providers: # If an interface has not providers, ignore it rospy.logwarn("No providers for capability interface '{0}', not checking for default provider." .format(interface)) continue try: # Try to get the default provider from the corresponding ros parameter self.__default_providers[interface] = rospy.get_param('~defaults/' + interface) except KeyError: # No ros parameter set for this capability interface rospy.logwarn("No default provider given for capability interface '{0}'. ".format(interface)) if len(providers) == 1: # If there is only one provider, allow it to be the default rospy.logwarn("'{0}' has only one provider, '{1}', using that as the default." .format(interface, providers[0])) self.__default_providers[interface] = providers[0] else: # Otherwise we can't decide if self.__missing_default_provider_is_an_error: rospy.logerr("Could not determine a default provider for capability interface '{0}', aborting." .format(interface)) sys.exit(-1) else: rospy.logwarn("Could not determine a default provider for capability interface '{0}'." .format(interface)) continue # Make sure the given default provider exists if self.__default_providers[interface] not in self.__spec_index.provider_names: rospy.logerr("Given default provider '{0}' for interface '{1}' does not exist." .format(self.__default_providers[interface], interface)) sys.exit(-1) # Make sure the given provider implements this interface if self.__default_providers[interface] not in providers: rospy.logerr("Given default provider '{0}' does not implment interface '{1}'." .format(self.__default_providers[interface], interface)) sys.exit(-1) # Update the interface object with the default provider iface = self.__spec_index.interfaces.get( interface, self.__spec_index.semantic_interfaces.get(interface, None)) iface.default_provider = self.__default_providers[interface] # Summarize defaults if self.__default_providers: rospy.loginfo("For each available interface, the default provider:") for interface, provider in self.__default_providers.items(): rospy.loginfo("'{0}'".format(interface)) rospy.loginfo(" => '{0}'".format(provider)) rospy.loginfo("") else: # pragma: no cover rospy.logwarn("No runnable Capabilities loaded.") def __catch_and_log(self, func, *args, **kwargs): warning_level_exceptions = ['because it is not running'] try: return func(*args, **kwargs) except Exception as exc: msg = "{0}".format(exc) log_func = rospy.logerr if [x for x in warning_level_exceptions if x in msg]: log_func = rospy.logwarn rospy.logdebug(traceback.format_exc()) log_func('{0}: {1}'.format(exc.__class__.__name__, msg)) raise
[docs] def handle_capability_events(self, event): """Callback for handling messages (events) from the /events topic This callback only process events generated by this node. :param event: ROS message encapsulating an event :type event: :py:class:`capabilities.msgs.CapabilityEvent` """ return self.__catch_and_log(self._handle_capability_events, event)
def _handle_capability_events(self, event): # Ignore any publications which we did not send (external publishers) if event._connection_header['callerid'] != rospy.get_name(): return # pragma: no cover # Ignore the `server_ready` event if event.type == event.SERVER_READY: return # Specially handle the nodelet manager if event.capability == _special_nodelet_manager_capability: if event.type == event.TERMINATED: if not rospy.is_shutdown(): rospy.logerr("Capability server's nodelet manager terminated unexpectedly.") self.shutdown() return # Update the capability capability = event.capability with self.__graph_lock: if capability not in self.__capability_instances.keys(): rospy.logerr("Unknown capability instance: '{0}'" .format(capability)) return instance = self.__capability_instances[capability] if event.type == event.LAUNCHED: if instance.canceled: # pragma: no cover # This is defensive programming, it should not happen self.__stop_capability(instance.name) else: instance.launched(event.pid) elif event.type == event.TERMINATED: instance.terminated() rospy.loginfo( "Capability Provider '{0}' for Capability '{1}' " .format(event.provider, event.capability) + "has terminated.") # Update the graph self.__update_graph() def __remove_terminated_capabilities(self): # collect all of the terminated capabilities terminated = [x for x in self.__capability_instances.values() if x.state == 'terminated'] # Remove terminated instances for instance in terminated: del self.__capability_instances[instance.interface] # Shutdown unused capabilities self.__cleanup_graph() def __cleanup_graph(self): """Iterate over the running capabilities and shutdown ones which are no longer needed For each running capability, if it was not started by the user then look at who depends on it. If no other capabilities depend on it, then shut it down. """ # Collect all running capabilities running_capabilities = [x for x in self.__capability_instances.values() if x.state == 'running'] for cap in running_capabilities: if cap.started_by == USER_SERVICE_REASON: # Started by user, do not garbage collect this continue rdepends = get_reverse_depends(cap.interface, self.__capability_instances.values()) if rdepends: # Someone depends on me, do not garbage collect this rospy.logdebug("Keeping the '{0}' provider of the '{1}' interface, ".format(cap.name, cap.interface) + "because other running capabilities depend on it.") continue if cap.state == 'running': rospy.loginfo("Stopping the '{0}' provider of the '{1}' interface, because it has no dependents left." .format(cap.name, cap.interface)) self.__stop_capability(cap.interface) elif cap.state == 'waiting': # pragma: no cover rospy.loginfo("Canceling the '{0}' provider of the '{1}' interface, because it has no dependents left." .format(cap.name, cap.interface)) cap.cancel() # Else the state is launching, stopping, or terminated # In which case launching will be caught on the next cleanup # and the latter two will get cleared out also. def __update_graph(self): # collect all of the waiting capabilities waiting = [x for x in self.__capability_instances.values() if x.state == 'waiting'] # If any of the waiting have no blocking dependencies start them for instance in waiting: blocking_dependencies = [] for dependency_name in instance.depends_on: if dependency_name not in self.__capability_instances: # pragma: no cover rospy.logerr( "Inconsistent capability run graph, '{0}' depends on " .format(instance.name) + "'{0}', ".format(dependency_name) + "which is not in the list of capability instances.") return dependency = self.__capability_instances[dependency_name] if dependency.state != 'running': blocking_dependencies.append(dependency) if not blocking_dependencies: instance.launch() self.__launch_manager.run_capability_provider( instance.provider, instance.provider_path ) # Remove any terminated capabilities self.__remove_terminated_capabilities() def __stop_capability(self, name): if name not in self.__capability_instances: rospy.logerr("Inconsistent capability run graph, asked to stop " + "capability '{0}', ".format(name) + "which is not in the list of capability instances.") return capability = self.__capability_instances[name] rdepends = get_reverse_depends(name, self.__capability_instances.values()) for cap in rdepends: if cap.state in ['stopping', 'terminated']: # pragma: no cover # It is possible that this cap was stopped by another cap in this list # This is purely defensive continue rospy.loginfo( "Capability '{0}' being stopped because its dependency '{1}' is being stopped.".format(cap.name, name)) self.__stop_capability(cap.interface) capability.stopped() self.__launch_manager.stop_capability_provider(capability.pid) def __get_provider_dependencies(self, provider): result = [] for interface, dep in provider.dependencies.items(): provider_name = dep.provider or self.__default_providers[interface] if provider_name not in self.__spec_index.providers: # This is the case where a provider depends on another interface, # but the preferred provider does not exist raise RuntimeError("Capability Provider '{0}' not found" .format(provider_name)) dep_provider = self.__spec_index.providers[provider_name] result.append((dep_provider, provider.name)) return result def __get_capability_instances_from_provider(self, provider): instances = [] providers = [(provider, USER_SERVICE_REASON)] while providers: curr, reason = providers.pop() providers.extend(self.__get_provider_dependencies(curr)) curr_path = self.__spec_index.provider_paths[curr.name] instances.append(CapabilityInstance(curr, curr_path, started_by=reason)) return instances def __get_providers_for_interface(self, interface, allow_semantic=False): valid_interfaces = [interface] if allow_semantic: # Add semantic interfaces which redefine this one valid_interfaces.extend( [k for k, v in self.__spec_index.semantic_interfaces.items() if v.redefines == interface] ) providers = dict([(n, p) for n, p in self.__spec_index.providers.items() if p.implements in valid_interfaces]) return providers # Could be empty def __start_capability(self, capability, preferred_provider): if capability not in self.__spec_index.interfaces.keys() + self.__spec_index.semantic_interfaces.keys(): raise RuntimeError("Capability '{0}' not found.".format(capability)) # If no preferred provider is given, use the default preferred_provider = preferred_provider or self.__default_providers[capability] providers = self.__get_providers_for_interface(capability, allow_semantic=True) if preferred_provider not in providers: raise RuntimeError( "Capability Provider '{0}' not found for Capability '{1}'" .format(preferred_provider, capability)) provider = providers[preferred_provider] instances = self.__get_capability_instances_from_provider(provider) with self.__graph_lock: # If the requested capability has an existing instance, we don't start it # again. Return a result that lets the callee know this happened. requested_instance = instances[0] if requested_instance.interface in self.__capability_instances: requested_instance_state = self.__capability_instances[ requested_instance.interface].state if requested_instance_state in ['running']: # Current instance is running (or will be soon) return StartCapabilityResponse.RESULT_CURRENTLY_RUNNING elif requested_instance_state in ['waiting', 'launching']: return StartCapabilityResponse.RESULT_CURRENTLY_STARTING elif requested_instance_state in ['stopping', 'terminated']: # Current instance is in the process of stopping return StartCapabilityResponse.RESULT_CURRENTLY_STOPPING else: raise RuntimeError( "Instance for capability '{0}' has improper state '{1}'" .format(capability, requested_instance_state)) for x in instances: if x.interface not in self.__capability_instances: self.__capability_instances[x.interface] = x self.__update_graph() return StartCapabilityResponse.RESULT_SUCCESS def handle_get_capability_specs(self, req): return self.__catch_and_log(self._handle_get_capability_specs, req) def _handle_get_capability_specs(self, req): rospy.loginfo("Servicing request for capability specs...") response = GetCapabilitySpecsResponse() for package_name, package_dict in self.spec_file_index.items(): for spec_type in ['capability_interface', 'semantic_capability_interface', 'capability_provider']: for path in package_dict[spec_type]: with open(path, 'r') as f: raw = f.read() default_provider = '' # If a capability interface, try to lookup the default provider iface = None if spec_type == 'capability_interface': iface = capability_interface_from_string(raw, path) if spec_type == 'semantic_capability_interface': iface = semantic_capability_interface_from_string(raw, path) if spec_type in ['capability_interface', 'semantic_capability_interface']: iface.name = '{package}/{name}'.format(package=package_name, name=iface.name) if iface.name not in self.__default_providers: default_provider = '' else: default_provider = self.__default_providers[iface.name] cs = CapabilitySpec(package_name, spec_type, raw, default_provider) response.capability_specs.append(cs) return response def handle_get_capability_spec(self, req): return self.__catch_and_log(self._handle_get_capability_spec, req) def _handle_get_capability_spec(self, req): rospy.loginfo("Servicing request for get capability spec '{0}'...".format(req.capability_spec)) response = GetCapabilitySpecResponse() for package_name, package_dict in self.spec_file_index.items(): for spec_type in ['capability_interface', 'semantic_capability_interface', 'capability_provider']: for path in package_dict[spec_type]: with open(path, 'r') as f: raw = f.read() default_provider = '' # If a capability interface, try to lookup the default provider iface = None if spec_type == 'capability_interface': iface = capability_interface_from_string(raw, path) if spec_type == 'semantic_capability_interface': iface = semantic_capability_interface_from_string(raw, path) if spec_type in ['capability_interface', 'semantic_capability_interface']: iface.name = '{package}/{name}'.format(package=package_name, name=iface.name) if iface.name not in self.__default_providers: default_provider = '' else: default_provider = self.__default_providers[iface.name] if iface and iface.name == req.capability_spec: response.capability_spec = CapabilitySpec(package_name, spec_type, raw, default_provider) return response raise RuntimeError("Could not find requested spec '{0}'".format(req.capability_spec)) def handle_start_capability(self, req): return self.__catch_and_log(self._handle_start_capability, req) def _handle_start_capability(self, req): msg = "Request to start capability '{0}'".format(req.capability) if req.preferred_provider: msg += " with provider '{0}'".format(req.preferred_provider) rospy.loginfo(msg) ret = self.__start_capability(req.capability, req.preferred_provider) return StartCapabilityResponse(ret) def handle_stop_capability(self, req): return self.__catch_and_log(self._handle_stop_capability, req) def _handle_stop_capability(self, req): rospy.loginfo("Request to stop capability '{0}'".format(req.capability)) capability = req.capability if capability not in self.__capability_instances: raise RuntimeError("No Capability '{0}' running".format(capability)) self.__stop_capability(capability) return StopCapabilityResponse(True) def handle_establish_bond(self, req): return self.__catch_and_log(self._handle_establish_bond, req) def _handle_establish_bond(self, req): rospy.loginfo("Request to establish a bond") bond_id = str(uuid.uuid1()) def on_formed(): rospy.loginfo("Bond formed with bond_id of '{0}'" .format(bond_id)) def on_broken(): # if bond_id in self.__bonds: rospy.loginfo("Bond with bond id '{0}' was broken, freeing associated capabilities" .format(bond_id)) self.__free_capabilities_by_bond_id(bond_id) del self.__bonds[bond_id] self.__bonds[bond_id] = Bond(self.__bond_topic, bond_id, on_broken=on_broken, on_formed=on_formed) self.__bonds[bond_id].start() return EstablishBondResponse(bond_id) def __free_capabilities_by_bond_id(self, bond_id): if bond_id in self.__bonds: for capability in self.__capability_instances.values(): if bond_id in capability.bonds: del capability.bonds[bond_id] if capability.reference_count == 0: rospy.loginfo("Capability '{0}' being stopped because it has zero references" .format(capability.interface)) self.__stop_capability(capability.interface) def __free_capability(self, capability_name, bond_id): if capability_name not in self.__capability_instances: # If you update this exception's message, then update the corresponding code # in capabilities.client.CapabilitiesClient.free_capability() raise RuntimeError("Cannot free Capability '{0}', because it is not running".format(capability_name)) capability = self.__capability_instances[capability_name] if bond_id not in capability.bonds: raise RuntimeError("Given bond_id '{0}' not associated with given capability '{1}'" .format(bond_id, capability_name)) if capability.bonds[bond_id] == 0: # pragma: no cover # this is defensive, it should never happen raise RuntimeError("Cannot free capability '{0}' for bond_id '{1}', it already has a reference count of 0" .format(capability_name, bond_id)) capability.bonds[bond_id] -= 1 if capability.reference_count == 0: rospy.loginfo("Capability '{0}' being stopped because it has zero references" .format(capability.interface)) self.__stop_capability(capability.interface) def handle_free_capability(self, req): return self.__catch_and_log(self._handle_free_capability, req) def _handle_free_capability(self, req): rospy.loginfo("Request to free usage of capability '{0}' (bond id '{1}')" .format(req.capability, req.bond_id)) self.__free_capability(req.capability, req.bond_id) return FreeCapabilityResponse() def handle_use_capability(self, req): return self.__catch_and_log(self._handle_use_capability, req) def _handle_use_capability(self, req): msg = "Request to use capability '{0}'".format(req.capability) if req.preferred_provider: msg += " with provider '{0}'".format(req.preferred_provider) rospy.loginfo(msg) # Make sure the bond_id is valid if req.bond_id not in self.__bonds: raise RuntimeError("Invalid bond_id given to ~use_capability: '{0}'".format(req.bond_id)) # Start the capability if it is not already running if req.capability not in self.__capability_instances: # This will raise if it failes to start the capability self.__start_capability(req.capability, req.preferred_provider) assert req.capability in self.__capability_instances # Should be true # Get a handle ont he capability capability = self.__capability_instances[req.capability] if req.preferred_provider and capability.name != req.preferred_provider: raise RuntimeError("Requested to use capability '{0}' with preferred provider '{1}', " .format(capability.interface, req.preferred_provider) + "but the capability is already running with provider '{0}'" .format(capability.name)) if req.bond_id not in capability.bonds: capability.bonds[req.bond_id] = 0 capability.bonds[req.bond_id] += 1 return UseCapabilityResponse() def handle_reload_request(self, req): return self.__catch_and_log(self._handle_reload_request, req) def _handle_reload_request(self, req): rospy.loginfo("Reloading capabilities...") self.__load_capabilities() return EmptyResponse() def handle_get_interfaces(self, req): return self.__catch_and_log(self._handle_get_interfaces, req) def _handle_get_interfaces(self, req): return GetInterfacesResponse(self.__spec_index.interface_names) def handle_get_providers(self, req): return self.__catch_and_log(self._handle_get_providers, req) def _handle_get_providers(self, req): if req.interface: if req.interface not in self.__spec_index.interfaces.keys() + self.__spec_index.semantic_interfaces.keys(): raise RuntimeError("Capability Interface '{0}' not found.".format(req.interface)) providers = self.__get_providers_for_interface(req.interface, allow_semantic=req.include_semantic).keys() default_provider = self.__default_providers[req.interface] else: providers = self.__spec_index.provider_names default_provider = '' return GetProvidersResponse(providers, default_provider) def handle_get_semantic_interfaces(self, req): return self.__catch_and_log(self._handle_get_semantic_interfaces, req) def _handle_get_semantic_interfaces(self, req): if req.interface: sifaces = [si.name for si in self.__spec_index.semantic_interfaces.values() if si.redefines == req.interface] else: sifaces = self.__spec_index.semantic_interface_names return GetSemanticInterfacesResponse(sifaces) def handle_get_running_capabilities(self, req): return self.__catch_and_log(self._handle_get_running_capabilities, req) def _handle_get_running_capabilities(self, req): resp = GetRunningCapabilitiesResponse() for instance in self.__capability_instances.values(): if instance.state not in ['running']: # pragma: no cover continue running_capability = RunningCapability() running_capability.capability = Capability(instance.interface, instance.name) running_capability.started_by = instance.started_by running_capability.pid = instance.pid rdepends = get_reverse_depends(instance.interface, self.__capability_instances.values()) for dep in rdepends: running_capability.dependent_capabilities.append(Capability(dep.interface, dep.name)) resp.running_capabilities.append(running_capability) return resp def handle_get_nodelet_manager_name(self, req): return self.__catch_and_log(self._handle_get_nodelet_manager_name, req) def _handle_get_nodelet_manager_name(self, req): resp = GetNodeletManagerNameResponse() resp.nodelet_manager_name = self.__launch_manager.nodelet_manager_name return resp def handle_get_remappings(self, req): return self.__catch_and_log(self._handle_get_remappings, req) def _handle_get_remappings(self, req): interface = None if req.spec in self.__capability_instances.keys(): interface = self.__capability_instances[req.spec] else: providers = dict([(i.provider.name, i) for i in self.__capability_instances.values()]) if req.spec not in providers: raise RuntimeError("Spec '{0}' is neither a running Interface nor a running Provider." .format(req.spec)) interface = providers[req.spec] resp = GetRemappingsResponse() remappings = { 'topics': {}, 'services': {}, 'actions': {}, 'parameters': {} } # Iterate this instance and its recursive dependencies for iface in reversed(self.__get_capability_instances_from_provider(interface.provider)): # For each iterate over their remappings and add them to the combined remappings, # flattening the remappings as you go for map_type, mapping in iface.provider.remappings_by_type.items(): assert map_type in remappings remappings[map_type].update(mapping) # Collapse remapping chains for mapping in remappings.values(): for key, value in mapping.items(): if value in mapping: mapping[key] = mapping[value] del mapping[value] for map_type, mapping in remappings.items(): resp_mapping = getattr(resp, map_type) for key, value in mapping.items(): remapping = Remapping() remapping.key = key remapping.value = value resp_mapping.append(remapping) return resp
def create_parser(): parser = argparse.ArgumentParser(description="Runs the capability server") add = parser.add_argument add('package_path', nargs='?', help="Overrides ROS_PACKAGE_PATH when discovering capabilities") add('--screen', '-s', action='store_true', default=False, help="Passes `--screen` down to roslaunch, `use_screen` rosparam takes priority.") return parser def main(sysargv=None): sys.argv = rospy.myargv(argv=sys.argv) parser = create_parser() args = parser.parse_args(sysargv) ros_package_path = args.package_path or os.getenv('ROS_PACKAGE_PATH', '') ros_package_path = [x for x in ros_package_path.split(':') if x] if not ros_package_path: sys.exit('No package paths specified, set ROS_PACKAGE_PATH or ' 'pass them as an argument') # Extend the ROS_PACKAGE_PATH os.environ['ROS_PACKAGE_PATH'] = ':'.join( os.getenv('ROS_PACKAGE_PATH', '').split(':') + ros_package_path) rospy.init_node('capability_server') capability_server = CapabilityServer(ros_package_path, screen=args.screen) capability_server.spin() capability_server.shutdown()