Go to the documentation of this file.00001
00002 from rospy import get_published_topics
00003 from rosservice import get_service_list
00004 from rosservice import get_service_type as rosservice_get_service_type
00005 from rosservice import get_service_node as rosservice_get_service_node
00006 from rosservice import get_service_uri
00007 from rostopic import find_by_type
00008 from ros import rosnode, rosgraph
00009 from rosnode import get_node_names
00010 from rosgraph.masterapi import Master
00011
00012 from rosapi.msg import TypeDef
00013
00014
00015 def get_topics():
00016 """ Returns a list of all the topics being published in the ROS system """
00017 return [x[0] for x in get_published_topics()]
00018
00019
00020 def get_topics_for_type(type):
00021 return find_by_type(type)
00022
00023
00024 def get_services():
00025 """ Returns a list of all the services advertised in the ROS system """
00026 return get_service_list()
00027
00028
00029 def get_nodes():
00030 """ Returns a list of all the nodes registered in the ROS system """
00031 return rosnode.get_node_names()
00032
00033
00034 def get_topic_type(topic):
00035 """ Returns the type of the specified ROS topic """
00036
00037 for x in get_published_topics():
00038 if x[0]==topic:
00039 return x[1]
00040
00041 return ""
00042
00043
00044 def get_service_type(service):
00045 """ Returns the type of the specified ROS service, """
00046 try:
00047 return rosservice_get_service_type(service)
00048 except:
00049 return ""
00050
00051
00052 def get_publishers(topic):
00053 """ Returns a list of node names that are publishing the specified topic """
00054 try:
00055 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00056 pubdict = dict(publishers)
00057 if topic in pubdict:
00058 return pubdict[topic]
00059 else:
00060 return []
00061 except socket.error:
00062 return []
00063
00064
00065 def get_subscribers(topic):
00066 """ Returns a list of node names that are subscribing to the specified topic """
00067 try:
00068 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00069 subdict = dict(subscribers)
00070 if topic in subdict:
00071 return subdict[topic]
00072 else:
00073 return []
00074 except socket.error:
00075 return []
00076
00077
00078 def get_service_providers(servicetype):
00079 """ Returns a list of node names that are advertising a service with the specified type """
00080 try:
00081 publishers, subscribers, services = Master('/rosbridge').getSystemState()
00082 servdict = dict(services)
00083 if servicetype in servdict:
00084 return servdict[servicetype]
00085 else:
00086 return []
00087 except socket.error:
00088 return []
00089
00090
00091 def get_service_node(service):
00092 """ Returns the name of the node that is providing the given service, or empty string """
00093 node = rosservice_get_service_node(service)
00094 if node==None:
00095 node = ""
00096 return node
00097
00098
00099 def get_service_host(service):
00100 """ Returns the name of the machine that is hosting the given service, or empty string """
00101 uri = get_service_uri(service)
00102 if uri==None:
00103 uri = ""
00104 else:
00105 uri = uri[9:]
00106 uri = uri[:uri.find(':')]
00107 return uri