1
2
3
4
5
6
7
8
9
10
11 import os
12 import socket
13 import errno
14 import rospy
15 import rosgraph
16 import rostopic
17 import rosservice
18 import roslib.names
19 import xmlrpclib
20 from rosmaster.util import xmlrpcapi
21 try:
22 import urllib.parse as urlparse
23 except ImportError:
24 import urlparse
25 import re
26 from gateway_msgs.msg import Rule, ConnectionType
27
28
29 import utils
30
31
32 PUBLISHER = ConnectionType.PUBLISHER
33 SUBSCRIBER = ConnectionType.SUBSCRIBER
34 SERVICE = ConnectionType.SERVICE
35 ACTION_SERVER = ConnectionType.ACTION_SERVER
36 ACTION_CLIENT = ConnectionType.ACTION_CLIENT
37
38
39
40
41
42
44
53
54 - def update(self, new_system_state=None):
55 '''
56 Currently completely regenerating the connections dictionary and then taking
57 diffs. Could be faster if we took diffs on the system state instead, but that's
58 a bit more awkward since each element has a variable list of nodes that we'd have
59 to check against to get good diffs. e.g.
60 old_publishers = ['/chatter', ['/talker']]
61 new_publishers = ['/chatter', ['/talker', '/babbler']]
62 '''
63 if new_system_state is None:
64 publishers, subscribers, services = self._get_system_state()
65 else:
66 publishers = new_system_state[PUBLISHER]
67 subscribers = new_system_state[SUBSCRIBER]
68 services = new_system_state[SERVICE]
69 action_servers, publishers, subscribers = self._get_action_servers(publishers, subscribers)
70 action_clients, publishers, subscribers = self._get_action_clients(publishers, subscribers)
71 connections = utils.create_empty_connection_type_dictionary()
72 connections[PUBLISHER] = self._get_connections_from_pub_sub_list(publishers, PUBLISHER)
73 connections[SUBSCRIBER] = self._get_connections_from_pub_sub_list(subscribers, SUBSCRIBER)
74 connections[SERVICE] = self._get_connections_from_service_list(services, SERVICE)
75 connections[ACTION_SERVER] = self._get_connections_from_action_list(action_servers, ACTION_SERVER)
76 connections[ACTION_CLIENT] = self._get_connections_from_action_list(action_clients, ACTION_CLIENT)
77
78
79 diff = lambda l1, l2: [x for x in l1 if x not in l2]
80 new_connections = utils.create_empty_connection_type_dictionary()
81 lost_connections = utils.create_empty_connection_type_dictionary()
82 for connection_type in utils.connection_types:
83 new_connections[connection_type] = diff(connections[connection_type], self._connections[connection_type])
84 lost_connections[connection_type] = diff(self._connections[connection_type], connections[connection_type])
85 self._connections = connections
86 return new_connections, lost_connections
87
89
90 available = False
91 for candidate in topic_node_list:
92 if candidate[0] == topic and node in candidate[1]:
93 available = True
94 break
95 return available
96
98 connections = []
99 for action in connection_list:
100 action_name = action[0]
101
102
103
104 nodes = action[1]
105 for node in nodes:
106
107
108
109
110 rule = Rule(connection_type, action_name, node)
111 connection = utils.Connection(rule, None, None)
112 connections.append(connection)
113 return connections
114
116 connections = []
117 for service in connection_list:
118 service_name = service[0]
119
120 nodes = service[1]
121 for node in nodes:
122
123
124
125
126 rule = Rule(connection_type, service_name, node)
127 connection = utils.Connection(rule, None, None)
128 connections.append(connection)
129 return connections
130
132 connections = []
133 for topic in connection_list:
134 topic_name = topic[0]
135
136
137 nodes = topic[1]
138 for node in nodes:
139
140
141
142
143 rule = Rule(connection_type, topic_name, node)
144 connection = utils.Connection(rule, None, None)
145 connections.append(connection)
146 return connections
147
149 '''
150 Return actions and pruned publisher, subscriber lists.
151
152 @param publishers
153 @type list of publishers in the form returned by rosgraph.Master.get_system_state
154 @param subscribers
155 @type list of subscribers in the form returned by rosgraph.Master.get_system_state
156 @return list of actions, pruned_publishers, pruned_subscribers
157 @rtype [base_topic, [nodes]], as param type, as param type
158 '''
159
160 actions = []
161 for goal_candidate in pubs:
162 if re.search('\/goal$', goal_candidate[0]):
163
164 base_topic = re.sub('\/goal$', '', goal_candidate[0])
165 nodes = goal_candidate[1]
166 action_nodes = []
167
168
169 for node in nodes:
170 is_action = True
171 is_action &= self._is_topic_node_in_list(base_topic + '/goal', node, pubs)
172 is_action &= self._is_topic_node_in_list(base_topic + '/cancel', node, pubs)
173 is_action &= self._is_topic_node_in_list(base_topic + '/status', node, subs)
174 is_action &= self._is_topic_node_in_list(base_topic + '/feedback', node, subs)
175 is_action &= self._is_topic_node_in_list(base_topic + '/result', node, subs)
176
177 if is_action:
178 action_nodes.append(node)
179
180 if len(action_nodes) != 0:
181
182 actions.append([base_topic, action_nodes])
183
184 for connection in pubs:
185 if connection[0] in [base_topic + '/goal', base_topic + '/cancel']:
186 for node in action_nodes:
187 try:
188 connection[1].remove(node)
189 except ValueError:
190 rospy.logerr("Gateway : couldn't remove an action publisher from the master connections list [%s][%s]" % (connection[0], node))
191 for connection in subs:
192 if connection[0] in [base_topic + '/status', base_topic + '/feedback', base_topic + '/result']:
193 for node in action_nodes:
194 try:
195 connection[1].remove(node)
196 except ValueError:
197 rospy.logerr("Gateway : couldn't remove an action subscriber from the master connections list [%s][%s]" % (connection[0], node))
198 pubs[:] = [connection for connection in pubs if len(connection[1]) != 0]
199 subs[:] = [connection for connection in subs if len(connection[1]) != 0]
200 return actions, pubs, subs
201
203 '''
204 Return action servers and pruned publisher, subscriber lists.
205
206 @param publishers
207 @type list of publishers in the form returned by rosgraph.Master.get_system_state
208 @param subscribers
209 @type list of subscribers in the form returned by rosgraph.Master.get_system_state
210 @return list of actions, pruned_publishers, pruned_subscribers
211 @rtype [base_topic, [nodes]], as param type, as param type
212 '''
213 actions, subs, pubs = self._get_actions(subscribers, publishers)
214 return actions, pubs, subs
215
217 '''
218 Return action clients and pruned publisher, subscriber lists.
219
220 @param publishers
221 @type list of publishers in the form returned by rosgraph.Master.get_system_state
222 @param subscribers
223 @type list of subscribers in the form returned by rosgraph.Master.get_system_state
224 @return list of actions, pruned_publishers, pruned_subscribers
225 @rtype [base_topic, [nodes]], as param type, as param type
226 '''
227 actions, pubs, subs = self._get_actions(publishers, subscribers)
228 return actions, pubs, subs
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
291 '''
292 Representing a ros master (local ros master). Just contains a
293 few utility methods for retrieving master related information as well
294 as handles for registering and unregistering rules that have
295 been pulled or flipped in from another gateway.
296 '''
297
299 rosgraph.Master.__init__(self, rospy.get_name())
300
301 self.get_system_state = self.getSystemState
302 self._connection_cache = ConnectionCache(self.get_system_state)
303
304
305
306
307
309 '''
310 Registers a rule with the local master.
311
312 @param registration : registration details
313 @type utils.Registration
314
315 @return the updated registration object (only adds an anonymously generated local node name)
316 @rtype utils.Registration
317 '''
318 registration.local_node = self._get_anonymous_node_name(registration.connection.rule.node)
319 rospy.logdebug("Gateway : registering a new node [%s] for [%s]" % (registration.local_node, registration))
320
321
322
323
324 node_master = rosgraph.Master(registration.local_node)
325 if registration.connection.rule.type == PUBLISHER:
326 node_master.registerPublisher(registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
327 return registration
328 elif registration.connection.rule.type == SUBSCRIBER:
329 self._register_subscriber(node_master, registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
330 return registration
331 elif registration.connection.rule.type == SERVICE:
332 if rosservice.get_service_node(registration.connection.rule.name):
333 rospy.logwarn("Gateway : tried to register a service that is already locally available, aborting [%s]" % registration.connection.rule.name)
334 return None
335 else:
336 if registration.connection.rule.name is None:
337 rospy.logerr("Gateway : tried to register a service with name set to None [%s, %s, %s]" % (registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri))
338 if registration.connection.type_info is None:
339 rospy.logerr("Gateway : tried to register a service with type_info set to None [%s, %s, %s]" % (registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri))
340 if registration.connection.xmlrpc_uri is None:
341 rospy.logerr("Gateway : tried to register a service with xmlrpc_uri set to None [%s, %s, %s]" % (registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri))
342 node_master.registerService(registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
343 return registration
344 elif registration.connection.rule.type == ACTION_SERVER:
345
346 self._register_subscriber(node_master, registration.connection.rule.name + "/goal", registration.connection.type_info + "ActionGoal", registration.connection.xmlrpc_uri)
347 self._register_subscriber(node_master, registration.connection.rule.name + "/cancel", "actionlib_msgs/GoalID", registration.connection.xmlrpc_uri)
348 node_master.registerPublisher(registration.connection.rule.name + "/status", "actionlib_msgs/GoalStatusArray", registration.connection.xmlrpc_uri)
349 node_master.registerPublisher(registration.connection.rule.name + "/feedback", registration.connection.type_info + "ActionFeedback", registration.connection.xmlrpc_uri)
350 node_master.registerPublisher(registration.connection.rule.name + "/result", registration.connection.type_info + "ActionResult", registration.connection.xmlrpc_uri)
351 return registration
352 elif registration.connection.rule.type == ACTION_CLIENT:
353 node_master.registerPublisher(registration.connection.rule.name + "/goal", registration.connection.type_info + "ActionGoal", registration.connection.xmlrpc_uri)
354 node_master.registerPublisher(registration.connection.rule.name + "/cancel", "actionlib_msgs/GoalID", registration.connection.xmlrpc_uri)
355 self._register_subscriber(node_master, registration.connection.rule.name + "/status", "actionlib_msgs/GoalStatusArray", registration.connection.xmlrpc_uri)
356 self._register_subscriber(node_master, registration.connection.rule.name + "/feedback", registration.connection.type_info + "ActionFeedback", registration.connection.xmlrpc_uri)
357 self._register_subscriber(node_master, registration.connection.rule.name + "/result", registration.connection.type_info + "ActionResult", registration.connection.xmlrpc_uri)
358 return registration
359 return None
360
362 '''
363 Unregisters a rule with the local master.
364
365 @param registration : registration details for an existing gateway registered rule
366 @type utils.Registration
367 '''
368 node_master = rosgraph.Master(registration.local_node)
369 rospy.logdebug("Gateway : unregistering local node [%s] for [%s]" % (registration.local_node, registration))
370 if registration.connection.rule.type == PUBLISHER:
371 node_master.unregisterPublisher(registration.connection.rule.name, registration.connection.xmlrpc_uri)
372 elif registration.connection.rule.type == SUBSCRIBER:
373 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name)
374 elif registration.connection.rule.type == SERVICE:
375 node_master.unregisterService(registration.connection.rule.name, registration.connection.type_info)
376 elif registration.connection.rule.type == ACTION_SERVER:
377 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/goal")
378 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/cancel")
379 node_master.unregisterPublisher(registration.connection.rule.name + "/status", registration.connection.xmlrpc_uri)
380 node_master.unregisterPublisher(registration.connection.rule.name + "/feedback", registration.connection.xmlrpc_uri)
381 node_master.unregisterPublisher(registration.connection.rule.name + "/result", registration.connection.xmlrpc_uri)
382 elif registration.connection.rule.type == ACTION_CLIENT:
383 node_master.unregisterPublisher(registration.connection.rule.name + "/goal", registration.connection.xmlrpc_uri)
384 node_master.unregisterPublisher(registration.connection.rule.name + "/cancel", registration.connection.xmlrpc_uri)
385 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/status")
386 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/feedback")
387 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/result")
388
390 '''
391 This one is not necessary, since you can pretty much guarantee the
392 existence of the subscriber here, but it pays to be safe - we've seen
393 some errors come out here when the ROS_MASTER_URI was only set to
394 localhost.
395
396 @param node_master : node-master xmlrpc method handler
397 @param type_info : type of the subscriber message
398 @param xmlrpc_uri : the uri of the node (xmlrpc server)
399 @type string
400 @param name : fully resolved subscriber name
401 '''
402
403
404 pub_uri_list = node_master.registerSubscriber(name, type_info, xmlrpc_uri)
405 try:
406
407 xmlrpcapi(xmlrpc_uri).publisherUpdate('/master', name, pub_uri_list)
408 except socket.error, v:
409 errorcode = v[0]
410 if errorcode != errno.ECONNREFUSED:
411 rospy.logerr("Gateway : error registering subscriber (is ROS_MASTER_URI and ROS_HOSTNAME or ROS_IP correctly set?)")
412 rospy.logerr("Gateway : errorcode [%s] xmlrpc_uri [%s]" % (str(errorcode), xmlrpc_uri))
413 raise
414 else:
415 pass
416
418 '''
419 It is a special case as it requires xmlrpc handling to inform the subscriber of
420 the disappearance of publishers it was connected to. It also needs to handle the
421 case when that information doesn't get to the subscriber because it is down.
422
423 @param node_master : node-master xmlrpc method handler
424 @param xmlrpc_uri : the uri of the node (xmlrpc server)
425 @type string
426 @param name : fully resolved subscriber name
427 '''
428
429
430 try:
431 xmlrpcapi(xmlrpc_uri).publisherUpdate('/master', name, [])
432 except socket.error, v:
433 errorcode = v[0]
434 if errorcode != errno.ECONNREFUSED:
435 raise
436 else:
437 pass
438 except xmlrpclib.Fault:
439
440
441
442
443 pass
444 node_master.unregisterSubscriber(name, xmlrpc_uri)
445
446
447
448
449
451 '''
452 Creates all the extra details to create a connection object from a
453 rule.
454
455 @param connection_type : the connection type (one of gateway_msgs.msg.ConnectionType)
456 @type string
457 @param name : the name of the connection
458 @type string
459 @param node : the master node name it comes from
460 @param string
461
462 @return the utils.Connection object complete with type_info and xmlrpc_uri
463 @type utils.Connection
464 '''
465 xmlrpc_uri = self.lookupNode(node)
466 connections = []
467 if connection_type == PUBLISHER or connection_type == SUBSCRIBER:
468 type_info = rostopic.get_topic_type(name)[0]
469 connections.append(utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri))
470 elif connection_type == SERVICE:
471 type_info = rosservice.get_service_uri(name)
472 connections.append(utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri))
473 elif connection_type == ACTION_SERVER:
474 type_info = rostopic.get_topic_type(name + '/goal')[0]
475 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/goal', node), type_info, xmlrpc_uri))
476 type_info = rostopic.get_topic_type(name + '/cancel')[0]
477 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/cancel', node), type_info, xmlrpc_uri))
478 type_info = rostopic.get_topic_type(name + '/status')[0]
479 connections.append(utils.Connection(Rule(PUBLISHER, name + '/status', node), type_info, xmlrpc_uri))
480 type_info = rostopic.get_topic_type(name + '/feedback')[0]
481 connections.append(utils.Connection(Rule(PUBLISHER, name + '/feedback', node), type_info, xmlrpc_uri))
482 type_info = rostopic.get_topic_type(name + '/result')[0]
483 connections.append(utils.Connection(Rule(PUBLISHER, name + '/result', node), type_info, xmlrpc_uri))
484 elif connection_type == ACTION_CLIENT:
485 type_info = rostopic.get_topic_type(name + '/goal')[0]
486 connections.append(utils.Connection(Rule(PUBLISHER, name + '/goal', node), type_info, xmlrpc_uri))
487 type_info = rostopic.get_topic_type(name + '/cancel')[0]
488 connections.append(utils.Connection(Rule(PUBLISHER, name + '/cancel', node), type_info, xmlrpc_uri))
489 type_info = rostopic.get_topic_type(name + '/status')[0]
490 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/status', node), type_info, xmlrpc_uri))
491 type_info = rostopic.get_topic_type(name + '/feedback')[0]
492 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/feedback', node), type_info, xmlrpc_uri))
493 type_info = rostopic.get_topic_type(name + '/result')[0]
494 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/result', node), type_info, xmlrpc_uri))
495 return connections
496
498 '''
499 Creates all the extra details to create a connection object from an
500 advertisement rule. This is a bit different to the previous one - we just need
501 the type and single node uri that everything originates from (don't need to generate all
502 the pub/sub connections themselves.
503
504 Probably flips could be merged into this sometime, but it'd be a bit gnarly.
505
506 @param connection_type : the connection type (one of gateway_msgs.msg.ConnectionType)
507 @type string
508 @param name : the name of the connection
509 @type string
510 @param node : the master node name it comes from
511 @param string
512
513 @return the utils.Connection object complete with type_info and xmlrpc_uri
514 @type utils.Connection
515 '''
516 xmlrpc_uri = self.lookupNode(node)
517 if connection_type == PUBLISHER or connection_type == SUBSCRIBER:
518 type_info = rostopic.get_topic_type(name)[0]
519 connection = utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri)
520 elif connection_type == SERVICE:
521 type_info = rosservice.get_service_uri(name)
522 connection = utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri)
523 elif connection_type == ACTION_SERVER or connection_type == ACTION_CLIENT:
524 goal_topic = name + '/goal'
525 goal_topic_type = rostopic.get_topic_type(goal_topic)
526 type_info = re.sub('ActionGoal$', '', goal_topic_type[0])
527 connection = utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri)
528 return connection
529
531 o = urlparse.urlparse(rosgraph.get_master_uri())
532 if o.hostname == 'localhost':
533 ros_ip = ''
534 try:
535 ros_ip = os.environ['ROS_IP']
536 except Exception:
537 try:
538
539 ros_ip = os.environ['ROS_HOSTNAME']
540 except Exception:
541
542 rospy.logwarn("Gateway: no valid ip found for this host, just setting 'localhost'")
543 return 'localhost'
544 return ros_ip
545 else:
546 return o.hostname
547
549 unused_new_connections, unused_lost_connections = self._connection_cache.update()
550
551
552 return self._connection_cache._connections
553
555 t = topic[1:len(topic)]
556 name = roslib.names.anonymous_name(t)
557 return name
558
559
560
561
562
564 '''
565 Assists a script to find the (hopefully) unique gateway namespace.
566 Note that unique is a necessary condition, there should only be one
567 gateway per ros system.
568
569 @return Namespace of the gateway node.
570 @rtype string
571 '''
572 unused_publishers, unused_subscribers, services = self.get_system_state()
573 for service in services:
574 service_name = service[0]
575 if re.search(r'remote_gateway_info', service_name):
576 if service_name == '/remote_gateway_info':
577 return "/"
578 else:
579 return re.sub(r'/remote_gateway_info', '', service_name)
580 return None
581