Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 from rospy import get_published_topics
00035 from rosservice import get_service_list
00036 from rosservice import get_service_type as rosservice_get_service_type
00037 from rosservice import get_service_node as rosservice_get_service_node
00038 from rosservice import get_service_uri
00039 from rostopic import find_by_type
00040 from ros import rosnode, rosgraph
00041 from rosnode import get_node_names
00042 from rosgraph.masterapi import Master
00043
00044 from rosapi.msg import TypeDef
00045
00046
00047 def get_topics():
00048 """ Returns a list of all the topics being published in the ROS system """
00049 return [x[0] for x in get_published_topics()]
00050
00051
00052 def get_topics_for_type(type):
00053 return find_by_type(type)
00054
00055
00056 def get_services():
00057 """ Returns a list of all the services advertised in the ROS system """
00058 return get_service_list()
00059
00060
00061 def get_nodes():
00062 """ Returns a list of all the nodes registered in the ROS system """
00063 return rosnode.get_node_names()
00064
00065
00066 def get_topic_type(topic):
00067 """ Returns the type of the specified ROS topic """
00068
00069 for x in get_published_topics():
00070 if x[0]==topic:
00071 return x[1]
00072
00073 return ""
00074
00075
00076 def get_service_type(service):
00077 """ Returns the type of the specified ROS service, """
00078 try:
00079 return rosservice_get_service_type(service)
00080 except:
00081 return ""
00082
00083
00084 def get_publishers(topic):
00085 """ Returns a list of node names that are publishing the specified topic """
00086 try:
00087 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00088 pubdict = dict(publishers)
00089 if topic in pubdict:
00090 return pubdict[topic]
00091 else:
00092 return []
00093 except socket.error:
00094 return []
00095
00096
00097 def get_subscribers(topic):
00098 """ Returns a list of node names that are subscribing to the specified topic """
00099 try:
00100 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00101 subdict = dict(subscribers)
00102 if topic in subdict:
00103 return subdict[topic]
00104 else:
00105 return []
00106 except socket.error:
00107 return []
00108
00109
00110 def get_service_providers(servicetype):
00111 """ Returns a list of node names that are advertising a service with the specified type """
00112 try:
00113 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00114 servdict = dict(services)
00115 if servicetype in servdict:
00116 return servdict[servicetype]
00117 else:
00118 return []
00119 except socket.error:
00120 return []
00121
00122
00123 def get_service_node(service):
00124 """ Returns the name of the node that is providing the given service, or empty string """
00125 node = rosservice_get_service_node(service)
00126 if node==None:
00127 node = ""
00128 return node
00129
00130
00131 def get_service_host(service):
00132 """ Returns the name of the machine that is hosting the given service, or empty string """
00133 uri = get_service_uri(service)
00134 if uri==None:
00135 uri = ""
00136 else:
00137 uri = uri[9:]
00138 uri = uri[:uri.find(':')]
00139 return uri