proxy.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2012, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 
34 import fnmatch
35 import socket
36 from rosservice import get_service_list
37 from rosservice import get_service_type as rosservice_get_service_type
38 from rosservice import get_service_node as rosservice_get_service_node
39 from rosservice import get_service_uri
40 from rosservice import rosservice_find
41 from rostopic import find_by_type
42 from rostopic import get_topic_type as rosservice_get_topic_type
43 from rosnode import get_node_names
44 from rosgraph.masterapi import Master
45 
46 from rosapi.msg import TypeDef
47 
48 from .glob_helper import filter_globs, any_match
49 
50 
def get_topics_and_types(topics_glob):
    """ Returns the names and message types of the active topics in the ROS system

    Args:
        topics_glob: glob patterns handed to ``filter_globs`` to select the
            topics that may be reported.

    Returns:
        A pair ``(names, types)`` of equally long lists; ``types[i]`` is the
        message type of ``names[i]``, or ``''`` when the master reports no
        type for that topic. ``([], [])`` is returned when nothing matches
        or when querying the master fails.
    """
    try:
        # Function getTopicTypes also returns inactive topics and does not
        # return topics with unknown message types, so it must be compared
        # to results from getSystemState.
        master = Master('/rosbridge')
        topic_types = master.getTopicTypes()
        publishers, subscribers, _ = master.getSystemState()
        active = set(name for name, _ in publishers)
        active.update(name for name, _ in subscribers)

        # Filter the set of topics by whether they are public.
        visible = set(filter_globs(topics_glob, active))

        names = []
        types = []
        for name, type_name in topic_types:
            if name in visible:
                names.append(name)
                types.append(type_name)

        # Add topics with unknown type messages.
        for name in visible.difference(names):
            names.append(name)
            types.append('')
        return names, types
    except Exception:
        # Best effort: any failure while talking to the master yields an
        # empty result, but KeyboardInterrupt/SystemExit are not swallowed.
        return [], []
75 
76 
def get_topics_for_type(type, topics_glob):
    """ Returns the topics of the given message type, filtered by topics_glob """
    topics_of_type = find_by_type(type)
    # Only topics that pass the glob filter (i.e. public ones) are reported.
    return filter_globs(topics_glob, topics_of_type)
80 
81 
def get_services(services_glob):
    """ Returns the services advertised in the ROS system, filtered by services_glob """
    advertised = get_service_list()
    # Only services that pass the glob filter (i.e. public ones) are reported.
    return filter_globs(services_glob, advertised)
86 
87 
def get_services_for_type(service_type, services_glob):
    """ Returns the services of the specified service type, filtered by services_glob """
    services_of_type = rosservice_find(service_type)
    # Only services that pass the glob filter (i.e. public ones) are reported.
    return filter_globs(services_glob, services_of_type)
92 
93 
def get_nodes():
    """ Returns the names of all the nodes registered in the ROS system """
    node_names = get_node_names()
    return node_names
97 
98 
def get_node_publications(node):
    """ Returns a sorted list of topic names that are being published by the specified node

    An empty list is returned when the master cannot be reached
    (socket.error).
    """
    try:
        publishers, _, _ = Master('/rosbridge').getSystemState()
    except socket.error:
        return []
    # Each entry pairs a topic name with the nodes publishing it.
    return sorted(topic for topic, nodes in publishers if node in nodes)
111 
112 
def get_node_subscriptions(node):
    """ Returns a sorted list of topic names that are being subscribed to by the specified node

    An empty list is returned when the master cannot be reached
    (socket.error).
    """
    try:
        _, subscribers, _ = Master('/rosbridge').getSystemState()
    except socket.error:
        return []
    # Each entry pairs a topic name with the nodes subscribed to it.
    return sorted(topic for topic, nodes in subscribers if node in nodes)
125 
126 
def get_node_services(node):
    """ Returns a sorted list of service names that are being hosted by the specified node

    An empty list is returned when the master cannot be reached
    (socket.error).
    """
    try:
        _, _, services = Master('/rosbridge').getSystemState()
    except socket.error:
        return []
    # Each entry pairs a service name with the nodes providing it.
    return sorted(name for name, nodes in services if node in nodes)
139 
140 
def get_topic_type(topic, topics_glob):
    """ Returns the type of the specified ROS topic, or an empty string """
    # A topic that does not match the globs is hidden: report no type.
    if not any_match(str(topic), topics_glob):
        return ""

    # Only the first element of the lookup result (the type) is needed.
    topic_type, _, _ = rosservice_get_topic_type(topic)

    # None means the topic isn't published, which is reported as "".
    return "" if topic_type is None else topic_type
155 
156 
def filter_action_servers(topics):
    """ Returns a list of action servers found among the given topic names

    A namespace counts as an action server when all five action topics
    (cancel, feedback, goal, result, status) exist directly below it.

    Args:
        topics: iterable of topic name strings, e.g. '/fibonacci/goal'.

    Returns:
        List of action server namespaces, each listed once, in the order in
        which they are completed while walking the sorted topic names.
    """
    action_topics = frozenset(['cancel', 'feedback', 'goal', 'result', 'status'])
    action_servers = []
    # namespace -> action topic names seen so far. Tracking every namespace
    # separately keeps detection correct when topics of a nested namespace
    # sort in between the action topics of their parent, and prevents a
    # completed server from being reported again for later topics.
    seen_by_namespace = {}

    for topic in sorted(topics):
        parts = topic.split('/')
        # At least '<something>/<namespace>/<name>' is required.
        if len(parts) < 3:
            continue
        name = parts.pop()
        if name not in action_topics:
            continue
        namespace = '/'.join(parts)
        seen = seen_by_namespace.setdefault(namespace, set())
        if name in seen:
            # Duplicate input entry; the namespace was already accounted for.
            continue
        seen.add(name)
        if len(seen) == len(action_topics):
            action_servers.append(namespace)

    return action_servers
178 
179 
def get_service_type(service, services_glob):
    """ Returns the type of the specified ROS service, or an empty string

    An empty string is returned when the service does not match
    services_glob (it is hidden), when the lookup raises, or when the
    lookup yields no type.
    """
    # Check if the service is hidden or public.
    if not any_match(str(service), services_glob):
        # Service is hidden so return an empty string.
        return ""
    try:
        service_type = rosservice_get_service_type(service)
    except Exception:
        # Best effort: a failed lookup is reported as "no type", but
        # KeyboardInterrupt/SystemExit are not swallowed.
        return ""
    # NOTE(review): the lookup appears able to yield None for an unknown
    # service; report that as "" like the other type getters here do.
    return "" if service_type is None else service_type
191 
192 
def get_publishers(topic, topics_glob):
    """ Returns a list of node names that are publishing the specified topic """
    try:
        # Hidden topics (no glob match) report no publishers.
        if not any_match(str(topic), topics_glob):
            return []
        publishers, _, _ = Master('/rosbridge').getSystemState()
        # Unknown topics fall back to an empty list.
        return dict(publishers).get(topic, [])
    except socket.error:
        return []
207 
208 
def get_subscribers(topic, topics_glob):
    """ Returns a list of node names that are subscribing to the specified topic """
    try:
        # Hidden topics (no glob match) report no subscribers.
        if not any_match(str(topic), topics_glob):
            return []
        _, subscribers, _ = Master('/rosbridge').getSystemState()
        # Unknown topics fall back to an empty list.
        return dict(subscribers).get(topic, [])
    except socket.error:
        return []
223 
224 
def get_service_providers(queried_type, services_glob):
    """ Returns a list of node names that are advertising a service with the specified type

    Args:
        queried_type: service type to look for.
        services_glob: glob patterns deciding which services are visible;
            passed through to get_service_type.

    Returns:
        The provider node names of every visible service whose type equals
        queried_type, concatenated in the order reported by the master.
    """
    _, _, services = Master('/rosbridge').getSystemState()

    service_type_providers = []
    for service, providers in services:
        service_type = get_service_type(service, services_glob)

        # get_service_type yields an empty value for hidden services and
        # failed lookups; that placeholder must never match, otherwise an
        # empty queried_type would expose the providers of hidden services.
        if service_type and service_type == queried_type:
            service_type_providers.extend(providers)
    return service_type_providers
236 
237 
def get_service_node(service):
    """ Returns the name of the node that is providing the given service, or empty string """
    node = rosservice_get_service_node(service)
    # Identity check against None; a missing provider is reported as "".
    return "" if node is None else node
244 
245 
def get_service_host(service):
    """ Returns the name of the machine that is hosting the given service, or empty string

    The host is taken from the service URI (presumably of the form
    'rosrpc://host:port' -- the original code skipped a 9-character prefix,
    which is the length of 'rosrpc://').
    """
    uri = get_service_uri(service)
    if uri is None:
        return ""
    # Drop the scheme prefix, if any, instead of blindly skipping 9 chars.
    authority = uri.split('://', 1)[-1]
    # Cut at the first '/' or ':'; unlike slicing with str.find, partition
    # leaves the host intact when the URI carries no port.
    return authority.partition('/')[0].partition(':')[0]
rosapi.proxy.get_node_subscriptions
def get_node_subscriptions(node)
Definition: proxy.py:113
rosapi.proxy.get_service_type
def get_service_type(service, services_glob)
Definition: proxy.py:180
rosapi.proxy.filter_action_servers
def filter_action_servers(topics)
Definition: proxy.py:157
rosapi.proxy.get_subscribers
def get_subscribers(topic, topics_glob)
Definition: proxy.py:209
rosapi.glob_helper.filter_globs
def filter_globs(globs, full_list)
Definition: glob_helper.py:30
rosapi.proxy.get_services_for_type
def get_services_for_type(service_type, services_glob)
Definition: proxy.py:88
rosapi.proxy.get_topics_for_type
def get_topics_for_type(type, topics_glob)
Definition: proxy.py:77
rosapi.proxy.get_publishers
def get_publishers(topic, topics_glob)
Definition: proxy.py:193
rosapi.proxy.get_topic_type
def get_topic_type(topic, topics_glob)
Definition: proxy.py:141
rosapi.proxy.get_topics_and_types
def get_topics_and_types(topics_glob)
Definition: proxy.py:51
rosapi.proxy.get_node_publications
def get_node_publications(node)
Definition: proxy.py:99
rosapi.glob_helper.any_match
def any_match(query, globs)
Definition: glob_helper.py:38
rosapi.proxy.get_service_host
def get_service_host(service)
Definition: proxy.py:246
rosapi.proxy.get_service_providers
def get_service_providers(queried_type, services_glob)
Definition: proxy.py:225
rosapi.proxy.get_services
def get_services(services_glob)
Definition: proxy.py:82
rosapi.proxy.get_service_node
def get_service_node(service)
Definition: proxy.py:238
rosapi.proxy.get_node_services
def get_node_services(node)
Definition: proxy.py:127
rosapi.proxy.get_nodes
def get_nodes()
Definition: proxy.py:94


rosapi
Author(s): Jonathan Mace
autogenerated on Wed Feb 12 2025 03:14:55