36 from rosservice
import get_service_list
37 from rosservice
import get_service_type
as rosservice_get_service_type
38 from rosservice
import get_service_node
as rosservice_get_service_node
39 from rosservice
import get_service_uri
40 from rosservice
import rosservice_find
41 from rostopic
import find_by_type
42 from rostopic
import get_topic_type
as rosservice_get_topic_type
43 from rosnode
import get_node_names
44 from rosgraph.masterapi
import Master
46 from rosapi.msg
import TypeDef
48 from .glob_helper
import filter_globs, any_match
52 """ Returns a list of all the active topics in the ROS system """
57 master = Master(
'/rosbridge')
58 topic_types = master.getTopicTypes()
59 publishers, subscribers, services = master.getSystemState()
60 topics = set([x
for x, _
in publishers] + [x
for x, _
in subscribers])
64 topic_types = [x
for x
in topic_types
if x[0]
in topics]
67 unknown_type = topics.difference([x
for x, _
in topic_types])
68 topic_types.extend([[x,
'']
for x
in unknown_type])
70 return zip(*topic_types)
83 """ Returns a list of all the services advertised in the ROS system """
89 """ Returns a list of services of a specific service type """
91 return filter_globs(services_glob, rosservice_find(service_type))
95 """ Returns a list of all the nodes registered in the ROS system """
96 return get_node_names()
100 """ Returns a list of topic names that are being published by the specified node """
102 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
104 for i, v
in publishers:
114 """ Returns a list of topic names that are being subscribed to by the specified node """
116 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
118 for i, v
in subscribers:
128 """ Returns a list of service names that are being hosted by the specified node """
130 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
132 for i, v
in services:
142 """ Returns the type of the specified ROS topic """
147 topic_type, _, _ = rosservice_get_topic_type(topic)
148 if topic_type
is None:
158 """ Returns a list of action servers """
160 possible_action_server =
''
161 possibility = [0, 0, 0, 0, 0]
163 action_topics = [
'cancel',
'feedback',
'goal',
'result',
'status']
164 for topic
in sorted(topics):
165 split = topic.split(
'/')
168 namespace =
'/'.join(split)
169 if(possible_action_server != namespace):
170 possible_action_server = namespace
171 possibility = [0, 0, 0, 0, 0]
172 if possible_action_server == namespace
and topic
in action_topics:
173 possibility[action_topics.index(topic)] = 1
174 if all(p == 1
for p
in possibility):
175 action_servers.append(possible_action_server)
177 return action_servers
181 """ Returns the type of the specified ROS service """
183 if any_match(str(service), services_glob):
185 return rosservice_get_service_type(service)
194 """ Returns a list of node names that are publishing the specified topic """
197 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
198 pubdict = dict(publishers)
200 return pubdict[topic]
210 """ Returns a list of node names that are subscribing to the specified topic """
213 publishers, subscribers, services = Master(
'/rosbridge').getSystemState()
214 subdict = dict(subscribers)
216 return subdict[topic]
226 """ Returns a list of node names that are advertising a service with the specified type """
227 _, _, services = Master(
'/rosbridge').getSystemState()
229 service_type_providers = []
230 for service, providers
in services:
233 if service_type == queried_type:
234 service_type_providers += providers
235 return service_type_providers
239 """ Returns the name of the node that is providing the given service, or empty string """
240 node = rosservice_get_service_node(service)
247 """ Returns the name of the machine that is hosting the given service, or empty string """
248 uri = get_service_uri(service)
253 uri = uri[:uri.find(
':')]