Source code for rocon_python_comms.services

#
# License: BSD
#   https://raw.github.com/robotics-in-concert/rocon_tools/license/LICENSE
#
##############################################################################
# Description
##############################################################################

"""
.. module:: services
   :platform: Unix
   :synopsis: Useful methods relating to ros services.


This module contains anything relating to introspection or manipulation
of ros services.

----

"""
##############################################################################
# Imports
##############################################################################

import rosgraph
import rospy
import socket
import time
from rosservice import ROSServiceIOException, get_service_headers
from rosgraph.network import ROSHandshakeException

# Local imports
from .exceptions import NotFoundException

##############################################################################
# Find topics/services
##############################################################################


[docs]def service_is_available(service_name): ''' Check whether the specific service is validated or not as retrieving from master state. :param: str service_name service name checking validtation :returns: result of service's validation :rtype: bool ''' master = rosgraph.Master(rospy.get_name()) _, _, services = master.getSystemState() is_available = False for srv_name, unused_node_name in services: if service_name == srv_name: is_available = True break return is_available
[docs]def find_service(service_type, timeout=rospy.rostime.Duration(5.0), unique=False): ''' Do a lookup to find services of the type specified. This will raise exceptions if it times out or returns multiple values. It can apply the additional logic of whether this should return a single unique result, or a list. Under the hood this calls out to the ros master for a list of registered services and it parses that to determine the result. If nothing is found, it loops around internally on a 10Hz loop until the result is found or the specified timeout is reached. .. warning:: This api is particularly dangerous, especially if used across a wireless connection as it creates a socket connection to the master's lookupService api for every service on the system while doing its hunting. More information in: * https://github.com/robotics-in-concert/rocon_tools/issues/68 Usage: .. code-block:: python from rocon_python_comms import find_service try: service_name = rocon_python_comms.find_service('rocon_interaction_msgs/SetInteractions', timeout=rospy.rostime.Duration(15.0), unique=True) except rocon_python_comms.NotFoundException as e: rospy.logwarn("failed to find the set_interactions service.") :param str service_type: service type specification, e.g. concert_msgs/GetInteractions :param rospy.Duration timeout: raise an exception if nothing is found before this timeout occurs. :param bool unique: flag to select the lookup behaviour (single/multiple results) :returns: the fully resolved name of the service (unique) or list of names (non-unique) :rtype: str :raises: :exc:`.NotFoundException` :raises: :exc:`rospy.ROSInterruptException` : if ros has shut down while searching. ''' # we could use rosservice_find here, but that throws exceptions and aborts if it comes # across any rosservice on the system which is no longer valid. To be robust against this # I've just pulled the internals of rosservice_find (ugh its fugly) and replicated it here # with the only difference in that I continue over that exception. unique_service_name = None service_names = [] start_time = time.time() timeout_time = start_time + timeout.to_sec() master = rosgraph.Master(rospy.get_name()) while not rospy.is_shutdown() and time.time() < timeout_time and not service_names: services_information = [] try: _, _, services = master.getSystemState() for service_name, unused_node_name in services: service_uri = master.lookupService(service_name) services_information.append((service_name, service_uri)) except (rosgraph.masterapi.Error, rosgraph.masterapi.Failure, socket.error) as e: raise NotFoundException("unable to communicate with the master [%s]" % str(e)) for (service_name, service_uri) in services_information: try: next_service_type = get_service_headers(service_name, service_uri).get('type', None) except ROSHandshakeException: rospy.logwarn("Handshake exception after waiting for {0} secs".format(time.time() - start_time)) # happens when shutting down (should handle this somehow) continue except ROSServiceIOException: # should also catch socket.error? # ignore this - it is usually a sign of a bad service that could be thrown # up by somebody else and not what we're trying to find. If we can skip past it # here we can be robust to other people's problems. continue if next_service_type == service_type: service_names.append(service_name) if unique: if len(service_names) > 1: raise NotFoundException("multiple services found %s." % service_names) elif len(service_names) == 1: unique_service_name = service_names[0] if not service_names: rospy.rostime.wallsleep(0.1) if not service_names: if rospy.is_shutdown(): raise rospy.ROSInterruptException("ros shut down") else: raise NotFoundException("timed out or ros") return unique_service_name if unique_service_name else service_names