Source code for master_discovery_fkie.interface_finder

# Software License Agreement (BSD License)
#
# Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above
#    copyright notice, this list of conditions and the following
#    disclaimer in the documentation and/or other materials provided
#    with the distribution.
#  * Neither the name of Fraunhofer nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import time
import xmlrpclib

import rospy
from .common import get_hostname

def get_changes_topic(masteruri, wait=True):
    '''
    Find the topics of type `master_discovery_fkie.msg.MasterState <http://www.ros.org/doc/api/master_discovery_fkie/html/msg/MasterState.html>`_
    that are published on the host of the ROS master.

    The lookup is delegated to :func:`_get_topic` with the type suffix
    ``'MasterState'``. If `wait` is ``True`` the call blocks and retries once
    per second until at least one topic was found or rospy is shut down.
    If `wait` is ``False`` the master is queried once and the (possibly
    empty) result is returned.

    :param masteruri: the URI of the ROS master

    :type masteruri: str

    :param wait: check every second for the topic

    :type wait: bool

    :return: the list with names of the topics of type `master_discovery_fkie.msg.MasterState <http://www.ros.org/doc/api/master_discovery_fkie/html/msg/MasterState.html>`_

    :rtype: list of strings
    '''
    type_suffix = 'MasterState'
    return _get_topic(masteruri, type_suffix, wait)
def get_stats_topic(masteruri, wait=True):
    '''
    Find the topics of type `master_discovery_fkie.msg.LinkStatesStamped <http://www.ros.org/doc/api/master_discovery_fkie/html/msg/LinkStatesStamped.html>`_
    that are published on the host of the ROS master.

    The lookup is delegated to :func:`_get_topic` with the type suffix
    ``'LinkStatesStamped'``. If `wait` is ``True`` the call blocks and retries
    once per second until at least one topic was found or rospy is shut down.
    If `wait` is ``False`` the master is queried once and the (possibly
    empty) result is returned.

    :param masteruri: the URI of the ROS master

    :type masteruri: str

    :param wait: check every second for the topic

    :type wait: bool

    :return: the list of names of the topics with type `master_discovery_fkie.msg.LinkStatesStamped <http://www.ros.org/doc/api/master_discovery_fkie/html/msg/LinkStatesStamped.html>`_

    :rtype: list of strings
    '''
    type_suffix = 'LinkStatesStamped'
    return _get_topic(masteruri, type_suffix, wait)
def _get_topic(masteruri, ttype, wait=True):
    '''
    Search in publishers of ROS master for topics whose type ends with the
    given type name and return their names, if they are published by a node
    running on the same host as the ROS master (the hostname of the node URI
    is compared with the hostname of `masteruri` using `get_hostname`).

    If `wait` is ``True`` the master is polled once per second until at least
    one topic was found or rospy is shut down. If `wait` is ``False`` the
    master is queried once and the (possibly empty) result is returned.

    Errors raised by the XML-RPC calls themselves are not caught here.

    :param masteruri: the URI of the ROS master

    :type masteruri: str

    :param ttype: the (ending of the) type of the topic

    :type ttype: str

    :param wait: check every second for the topic

    :type wait: bool

    :return: the list of names of the found topics; each topic is listed
             once, even if several local nodes publish it

    :rtype: list of strings
    '''
    result = []
    while not result and not rospy.is_shutdown():
        master = xmlrpclib.ServerProxy(masteruri)
        caller_id = rospy.get_name()
        # get the system state to resolve the published nodes
        code, msg, state = master.getSystemState(caller_id)
        if code == 1:
            # read topic types; only useful if the system state is valid
            code, msg, topic_types = master.getPublishedTopics(caller_id, '')
        if code == 1:
            master_host = get_hostname(masteruri)
            # search for topics with the requested type
            for topic, topic_type in topic_types:
                if not topic_type.endswith(ttype):
                    continue
                # get the names of the publisher nodes; state[0] is the
                # publishers part of the system state
                for pub_topic, pub_nodes in state[0]:
                    if pub_topic != topic or topic in result:
                        continue
                    # get the URI of the publisher nodes; separate names are
                    # used so the outer status and topic list are not clobbered
                    for node in pub_nodes:
                        node_code, _, node_uri = master.lookupNode(caller_id, node)
                        # only local publishers are taken into account
                        if node_code == 1 and get_hostname(node_uri) == master_host:
                            result.append(topic)
                            # one local publisher is enough, avoid duplicates
                            break
            if not result and wait:
                rospy.logwarn("Master_discovery node appear not to running. Wait for topic with type '%s'." % ttype)
                time.sleep(1)
        elif wait:
            # either the system state or the topic list could not be read
            rospy.logwarn("Can't get published topics from ROS master: %s, %s. Will keep trying!" % (code, msg))
            time.sleep(1)
        if not wait:
            return result
    return result
def get_listmaster_service(masteruri, wait=True):
    '''
    Find the services whose name ends with `list_masters` and which are
    provided on the host of the ROS master.

    The lookup is delegated to :func:`_get_service` with the name suffix
    ``'list_masters'``. If `wait` is ``True`` the call blocks and retries once
    per second until at least one service was found or rospy is shut down.
    If `wait` is ``False`` the master is queried once and the (possibly
    empty) result is returned.

    :param masteruri: the URI of the ROS master

    :type masteruri: str

    :param wait: check every second for the service

    :type wait: bool

    :return: the list with names of the services ending with `list_masters`

    :rtype: list of strings
    '''
    name_suffix = 'list_masters'
    return _get_service(masteruri, name_suffix, wait)
def get_refresh_service(masteruri, wait=True):
    '''
    Find the services whose name ends with `refresh` and which are provided
    on the host of the ROS master.

    The lookup is delegated to :func:`_get_service` with the name suffix
    ``'refresh'``. If `wait` is ``True`` the call blocks and retries once per
    second until at least one service was found or rospy is shut down.
    If `wait` is ``False`` the master is queried once and the (possibly
    empty) result is returned.

    :param masteruri: the URI of the ROS master

    :type masteruri: str

    :param wait: check every second for the service

    :type wait: bool

    :return: the list with names of the services ending with `refresh`

    :rtype: list of strings
    '''
    name_suffix = 'refresh'
    return _get_service(masteruri, name_suffix, wait)
def _get_service(masteruri, name, wait=True):
    '''
    Search in services of ROS master for services whose name ends with the
    given name and return their names, if the hostname of the service URI
    equals the hostname of `masteruri` (both determined with `get_hostname`).

    If `wait` is ``True`` the master is polled once per second until at least
    one service was found or rospy is shut down. If `wait` is ``False`` the
    master is queried once and the (possibly empty) result is returned.

    :param masteruri: the URI of the ROS master

    :type masteruri: str

    :param name: the ending name of the service

    :type name: str

    :param wait: check every second for the service

    :type wait: bool

    :return: the list with names of the services ending with `name`

    :rtype: list of strings
    '''
    found = []
    while not (found or rospy.is_shutdown()):
        proxy = xmlrpclib.ServerProxy(masteruri)
        status, status_msg, state = proxy.getSystemState(rospy.get_name())
        if status != 1:
            # the master did not deliver its state; nothing can be found yet
            if wait:
                rospy.logwarn("can't get state from ROS master: %s, %s" % (status, status_msg))
                time.sleep(1)
        else:
            # the state consists of publishers, subscribers and services;
            # only the services part is needed here
            _publishers, _subscribers, services = state
            for service_name, _providers in services:
                if not service_name.endswith(name):
                    continue
                # resolve the URI of the service to keep local services only
                lookup_status, _lookup_msg, service_uri = proxy.lookupService(rospy.get_name(), service_name)
                if lookup_status == 1 and get_hostname(service_uri) == get_hostname(masteruri):
                    found.append(service_name)
            if wait and not found:
                rospy.logwarn("Master_discovery node appear not to running. Wait for service '%s'." % name)
                time.sleep(1)
        if not wait:
            # single attempt requested: hand back whatever was found
            return found
    return found