36 from rosservice
import get_service_list
37 from rosservice
import get_service_type
as rosservice_get_service_type
38 from rosservice
import get_service_node
as rosservice_get_service_node
39 from rosservice
import get_service_uri
40 from rosservice
import rosservice_find
41 from rostopic
import find_by_type
42 from rostopic
import get_topic_type
as rosservice_get_topic_type
43 from rosnode
import get_node_names
44 from rosgraph.masterapi
import Master
46 from rosapi.msg
import TypeDef
48 from .glob_helper
import filter_globs, any_match
52 """ Returns a list of all the active topics in the ROS system """ 57 master = Master(
'/rosbridge')
58 topic_types = master.getTopicTypes()
59 publishers, subscribers, services = master.getSystemState()
60 topics = set([x
for x, _
in publishers] + [x
for x, _
in subscribers])
64 topic_types = [x
for x
in topic_types
if x[0]
in topics]
67 unknown_type = topics.difference([x
for x, _
in topic_types])
68 return zip(* topic_types + [[x,
'']
for x
in unknown_type])
79 """ Returns a list of all the services advertised in the ROS system """ 85 """ Returns a list of services of a specific service type """ 87 return filter_globs(services_glob, rosservice_find(service_type))
91 """ Returns a list of all the nodes registered in the ROS system """ 92 return get_node_names()
96 """ Returns a list of topic names that are being published by the specified node """ 98 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
100 for i, v
in publishers:
110 """ Returns a list of topic names that are being subscribed to by the specified node """ 112 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
114 for i, v
in subscribers:
124 """ Returns a list of service names that are being hosted by the specified node """ 126 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
128 for i, v
in services:
138 """ Returns the type of the specified ROS topic """ 143 topic_type, _, _ = rosservice_get_topic_type(topic)
144 if topic_type
is None:
154 """ Returns a list of action servers """ 156 possible_action_server =
'' 157 possibility = [0, 0, 0, 0, 0]
159 action_topics = [
'cancel',
'feedback',
'goal',
'result',
'status']
160 for topic
in sorted(topics):
161 split = topic.split(
'/')
164 namespace =
'/'.join(split)
165 if(possible_action_server != namespace):
166 possible_action_server = namespace
167 possibility = [0, 0, 0, 0, 0]
168 if possible_action_server == namespace
and topic
in action_topics:
169 possibility[action_topics.index(topic)] = 1
170 if all(p == 1
for p
in possibility):
171 action_servers.append(possible_action_server)
173 return action_servers
177 """ Returns the type of the specified ROS service """ 179 if any_match(str(service), services_glob):
181 return rosservice_get_service_type(service)
190 """ Returns a list of node names that are publishing the specified topic """ 193 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
194 pubdict = dict(publishers)
196 return pubdict[topic]
206 """ Returns a list of node names that are subscribing to the specified topic """ 209 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
210 subdict = dict(subscribers)
212 return subdict[topic]
222 """ Returns a list of node names that are advertising a service with the specified type """ 223 _, _, services = Master(
'/rosbridge').getSystemState()
225 service_type_providers = []
226 for service, providers
in services:
229 if service_type == queried_type:
230 service_type_providers += providers
231 return service_type_providers
235 """ Returns the name of the node that is providing the given service, or empty string """ 236 node = rosservice_get_service_node(service)
243 """ Returns the name of the machine that is hosting the given service, or empty string """ 244 uri = get_service_uri(service)
249 uri = uri[:uri.find(
':')]
def get_service_host(service)
def get_service_providers(queried_type, services_glob)
def get_publishers(topic, topics_glob)
def get_node_subscriptions(node)
def get_service_type(service, services_glob)
def filter_globs(globs, full_list)
def any_match(query, globs)
def filter_action_servers(topics)
def get_service_node(service)
def get_node_publications(node)
def get_topics_for_type(type, topics_glob)
def get_topic_type(topic, topics_glob)
def get_topics_and_types(topics_glob)
def get_subscribers(topic, topics_glob)
def get_services_for_type(service_type, services_glob)
def get_services(services_glob)
def get_node_services(node)