proxy.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2012, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 
00034 import fnmatch
00035 import socket
00036 from rosservice import get_service_list
00037 from rosservice import get_service_type as rosservice_get_service_type
00038 from rosservice import get_service_node as rosservice_get_service_node
00039 from rosservice import get_service_uri
00040 from rosservice import rosservice_find
00041 from rostopic import find_by_type
00042 from rostopic import get_topic_type as rosservice_get_topic_type
00043 from ros import rosnode, rosgraph
00044 from rosnode import get_node_names
00045 from rosgraph.masterapi import Master
00046 
00047 from rosapi.msg import TypeDef
00048 
00049 from .glob_helper import filter_globs, any_match
00050 
00051 
00052 def get_topics(topics_glob):
00053     """ Returns a list of all the active topics in the ROS system """
00054     try:
00055         publishers, subscribers, services = Master('/rosbridge').getSystemState()
00056         # Filter the list of topics by whether they are public before returning.
00057         return filter_globs(topics_glob,
00058                             list(set([x for x, _ in publishers] + [x for x, _, in subscribers])))
00059     except:
00060         return []
00061 
00062 
00063 def get_topics_types(topics, topics_glob):
00064     try:
00065         types = []
00066         for i in topics:
00067             types.append(get_topic_type(i, topics_glob))
00068         return types
00069     except:
00070         return[]
00071 
00072 
00073 def get_topics_for_type(type, topics_glob):
00074     # Filter the list of topics by whether they are public before returning.
00075     return filter_globs(topics_glob, find_by_type(type))
00076 
00077 
00078 def get_services(services_glob):
00079     """ Returns a list of all the services advertised in the ROS system """
00080     # Filter the list of services by whether they are public before returning.
00081     return filter_globs(services_glob, get_service_list())
00082 
00083 
00084 def get_services_for_type(service_type, services_glob):
00085     """ Returns a list of services as specific service type """
00086     # Filter the list of services by whether they are public before returning.
00087     return filter_globs(services_glob, rosservice_find(service_type))
00088 
00089 
00090 def get_nodes():
00091     """ Returns a list of all the nodes registered in the ROS system """
00092     return rosnode.get_node_names()
00093 
00094 
00095 def get_node_publications(node):
00096     """ Returns a list of topic names that are been published by the specified node """
00097     try:
00098         publishers, subscribers, services = Master('/rosbridge').getSystemState()
00099         toReturn = []
00100         for i, v in publishers:
00101             if node in v:
00102                 toReturn.append(i)
00103         toReturn.sort()
00104         return toReturn
00105     except socket.error:
00106         return []
00107 
00108 
00109 def get_node_subscriptions(node):
00110     """ Returns a list of topic names that are been subscribed by the specified node """
00111     try:
00112         publishers, subscribers, services = Master('/rosbridge').getSystemState()
00113         toReturn = []
00114         for i, v in subscribers:
00115             if node in v:
00116                 toReturn.append(i)
00117         toReturn.sort()
00118         return toReturn
00119     except socket.error:
00120         return []
00121 
00122 
00123 def get_node_services(node):
00124     """ Returns a list of service names that are been hosted by the specified node """
00125     try:
00126         publishers, subscribers, services = Master('/rosbridge').getSystemState()
00127         toReturn = []
00128         for i, v in services:
00129             if node in v:
00130                 toReturn.append(i)
00131         toReturn.sort()
00132         return toReturn
00133     except socket.error:
00134         return []
00135 
00136 
00137 def get_topic_type(topic, topics_glob):
00138     """ Returns the type of the specified ROS topic """
00139     # Check if the topic is hidden or public.
00140     # If all topics are public then the type is returned
00141     if any_match(str(topic), topics_glob):
00142         # If the topic is published, return its type
00143         topic_type, _, _ = rosservice_get_topic_type(topic)
00144         if topic_type is None:
00145             # Topic isn't published so return an empty string
00146             return ""
00147         return topic_type
00148     else:
00149         # Topic is hidden so return an empty string
00150         return ""
00151 
00152 
00153 def filter_action_servers(topics):
00154     """ Returns a list of action servers """
00155     action_servers = []
00156     possible_action_server = ''
00157     possibility = [0, 0, 0, 0, 0]
00158 
00159     action_topics = ['cancel', 'feedback', 'goal', 'result', 'status']
00160     for topic in sorted(topics):
00161         split = topic.split('/')
00162         if(len(split) >= 3):
00163             topic = split.pop()
00164             namespace = '/'.join(split)
00165             if(possible_action_server != namespace):
00166                 possible_action_server = namespace
00167                 possibility = [0, 0, 0, 0, 0]
00168             if possible_action_server == namespace and topic in action_topics:
00169                 possibility[action_topics.index(topic)] = 1
00170         if all(p == 1 for p in possibility):
00171             action_servers.append(possible_action_server)
00172 
00173     return action_servers
00174 
00175 
00176 def get_service_type(service, services_glob):
00177     """ Returns the type of the specified ROS service, """
00178     # Check if the service is hidden or public.
00179     if any_match(str(service), services_glob):
00180         try:
00181             return rosservice_get_service_type(service)
00182         except:
00183             return ""
00184     else:
00185         # Service is hidden so return an empty string.
00186         return ""
00187 
00188 
00189 def get_publishers(topic, topics_glob):
00190     """ Returns a list of node names that are publishing the specified topic """
00191     try:
00192         if any_match(str(topic), topics_glob):
00193             publishers, subscribers, services = Master('/rosbridge').getSystemState()
00194             pubdict = dict(publishers)
00195             if topic in pubdict:
00196                 return pubdict[topic]
00197             else:
00198                 return []
00199         else:
00200             return []
00201     except socket.error:
00202         return []
00203 
00204 
00205 def get_subscribers(topic, topics_glob):
00206     """ Returns a list of node names that are subscribing to the specified topic """
00207     try:
00208         if any_match(str(topic), topics_glob):
00209             publishers, subscribers, services = Master('/rosbridge').getSystemState()
00210             subdict = dict(subscribers)
00211             if topic in subdict:
00212                 return subdict[topic]
00213             else:
00214                 return []
00215         else:
00216             return []
00217     except socket.error:
00218         return []
00219 
00220 
00221 def get_service_providers(servicetype, services_glob):
00222     """ Returns a list of node names that are advertising a service with the specified type """
00223     try:
00224         if any_match(str(topic), services_glob):
00225             publishers, subscribers, services = Master('/rosbridge').getSystemState()
00226             servdict = dict(services)
00227             if servicetype in servdict:
00228                 return servdict[servicetype]
00229             else:
00230                 return []
00231         else:
00232             return []
00233     except socket.error:
00234         return []
00235 
00236 
00237 def get_service_node(service):
00238     """ Returns the name of the node that is providing the given service, or empty string """
00239     node = rosservice_get_service_node(service)
00240     if node == None:
00241         node = ""
00242     return node
00243 
00244 
00245 def get_service_host(service):
00246     """ Returns the name of the machine that is hosting the given service, or empty string """
00247     uri = get_service_uri(service)
00248     if uri == None:
00249         uri = ""
00250     else:
00251         uri = uri[9:]
00252         uri = uri[:uri.find(':')]
00253     return uri


rosapi
Author(s): Jonathan Mace
autogenerated on Thu Jun 6 2019 21:51:49