Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 from rosservice import get_service_list
00035 from rosservice import get_service_type as rosservice_get_service_type
00036 from rosservice import get_service_node as rosservice_get_service_node
00037 from rosservice import get_service_uri
00038 from rosservice import rosservice_find
00039 from rostopic import find_by_type
00040 from rostopic import get_topic_type as rosservice_get_topic_type
00041 from ros import rosnode, rosgraph
00042 from rosnode import get_node_names
00043 from rosgraph.masterapi import Master
00044
00045 from rosapi.msg import TypeDef
00046
00047
00048 def get_topics():
00049 """ Returns a list of all the active topics in the ROS system """
00050 try:
00051 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00052 return list(set([x for x, _ in publishers] + [x for x, _, in subscribers]))
00053 except:
00054 return []
00055
00056
00057 def get_topics_for_type(type):
00058 return find_by_type(type)
00059
00060
00061 def get_services():
00062 """ Returns a list of all the services advertised in the ROS system """
00063 return get_service_list()
00064
00065
00066 def get_services_for_type(service_type):
00067 """ Returns a list of services as specific service type """
00068 return rosservice_find(service_type)
00069
00070
00071 def get_nodes():
00072 """ Returns a list of all the nodes registered in the ROS system """
00073 return rosnode.get_node_names()
00074
00075
00076 def get_topic_type(topic):
00077 """ Returns the type of the specified ROS topic """
00078
00079 topic_type, _, _ = rosservice_get_topic_type(topic)
00080 if topic_type is None:
00081
00082 return ""
00083 return topic_type
00084
00085
00086 def get_service_type(service):
00087 """ Returns the type of the specified ROS service, """
00088 try:
00089 return rosservice_find(service)
00090 except:
00091 return ""
00092
00093
00094 def get_publishers(topic):
00095 """ Returns a list of node names that are publishing the specified topic """
00096 try:
00097 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00098 pubdict = dict(publishers)
00099 if topic in pubdict:
00100 return pubdict[topic]
00101 else:
00102 return []
00103 except socket.error:
00104 return []
00105
00106
00107 def get_subscribers(topic):
00108 """ Returns a list of node names that are subscribing to the specified topic """
00109 try:
00110 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00111 subdict = dict(subscribers)
00112 if topic in subdict:
00113 return subdict[topic]
00114 else:
00115 return []
00116 except socket.error:
00117 return []
00118
00119
00120 def get_service_providers(servicetype):
00121 """ Returns a list of node names that are advertising a service with the specified type """
00122 try:
00123 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00124 servdict = dict(services)
00125 if servicetype in servdict:
00126 return servdict[servicetype]
00127 else:
00128 return []
00129 except socket.error:
00130 return []
00131
00132
00133 def get_service_node(service):
00134 """ Returns the name of the node that is providing the given service, or empty string """
00135 node = rosservice_get_service_node(service)
00136 if node == None:
00137 node = ""
00138 return node
00139
00140
00141 def get_service_host(service):
00142 """ Returns the name of the machine that is hosting the given service, or empty string """
00143 uri = get_service_uri(service)
00144 if uri == None:
00145 uri = ""
00146 else:
00147 uri = uri[9:]
00148 uri = uri[:uri.find(':')]
00149 return uri