1
2
3
4
5
6
7
8
9
10
11 import os
12 import socket
13 import errno
14 import rospy
15 import rosgraph
16 import rostopic
17 import rosservice
18 import roslib.names
19 from rosmaster.util import xmlrpcapi
20 try:
21 import urllib.parse as urlparse
22 except ImportError:
23 import urlparse
24 import re
25 from gateway_msgs.msg import Rule, ConnectionType
26
27
28 import utils
29
30
31 PUBLISHER = ConnectionType.PUBLISHER
32 SUBSCRIBER = ConnectionType.SUBSCRIBER
33 SERVICE = ConnectionType.SERVICE
34 ACTION_SERVER = ConnectionType.ACTION_SERVER
35 ACTION_CLIENT = ConnectionType.ACTION_CLIENT
36
37
38
39
40
41
43
52
53 - def update(self, new_system_state=None):
54 '''
55 Currently completely regenerating the connections dictionary and then taking
56 diffs. Could be faster if we took diffs on the system state instead, but that's
57 a bit more awkward since each element has a variable list of nodes that we'd have
58 to check against to get good diffs. e.g.
59 old_publishers = ['/chatter', ['/talker']]
60 new_publishers = ['/chatter', ['/talker', '/babbler']]
61 '''
62 if new_system_state is None:
63 publishers, subscribers, services = self._get_system_state()
64 else:
65 publishers = new_system_state[PUBLISHER]
66 subscribers = new_system_state[SUBSCRIBER]
67 services = new_system_state[SERVICE]
68 action_servers, publishers, subscribers = self._get_action_servers(publishers, subscribers)
69 action_clients, publishers, subscribers = self._get_action_clients(publishers, subscribers)
70 connections = utils.create_empty_connection_type_dictionary()
71 connections[PUBLISHER] = self._get_connections_from_pub_sub_list(publishers, PUBLISHER)
72 connections[SUBSCRIBER] = self._get_connections_from_pub_sub_list(subscribers, SUBSCRIBER)
73 connections[SERVICE] = self._get_connections_from_service_list(services, SERVICE)
74 connections[ACTION_SERVER] = self._get_connections_from_action_list(action_servers, ACTION_SERVER)
75 connections[ACTION_CLIENT] = self._get_connections_from_action_list(action_clients, ACTION_CLIENT)
76
77
78 diff = lambda l1, l2: [x for x in l1 if x not in l2]
79 new_connections = utils.create_empty_connection_type_dictionary()
80 lost_connections = utils.create_empty_connection_type_dictionary()
81 for connection_type in utils.connection_types:
82 new_connections[connection_type] = diff(connections[connection_type], self._connections[connection_type])
83 lost_connections[connection_type] = diff(self._connections[connection_type], connections[connection_type])
84 self._connections = connections
85 return new_connections, lost_connections
86
88
89 available = False
90 for candidate in topic_node_list:
91 if candidate[0] == topic and node in candidate[1]:
92 available = True
93 break
94 return available
95
97 connections = []
98 for action in connection_list:
99 action_name = action[0]
100
101
102
103 nodes = action[1]
104 for node in nodes:
105
106
107
108
109 rule = Rule(connection_type, action_name, node)
110 connection = utils.Connection(rule, None, None)
111 connections.append(connection)
112 return connections
113
115 connections = []
116 for service in connection_list:
117 service_name = service[0]
118
119 nodes = service[1]
120 for node in nodes:
121
122
123
124
125 rule = Rule(connection_type, service_name, node)
126 connection = utils.Connection(rule, None, None)
127 connections.append(connection)
128 return connections
129
131 connections = []
132 for topic in connection_list:
133 topic_name = topic[0]
134
135
136 nodes = topic[1]
137 for node in nodes:
138
139
140
141
142 rule = Rule(connection_type, topic_name, node)
143 connection = utils.Connection(rule, None, None)
144 connections.append(connection)
145 return connections
146
148 '''
149 Return actions and pruned publisher, subscriber lists.
150
151 @param publishers
152 @type list of publishers in the form returned by rosgraph.Master.get_system_state
153 @param subscribers
154 @type list of subscribers in the form returned by rosgraph.Master.get_system_state
155 @return list of actions, pruned_publishers, pruned_subscribers
156 @rtype [base_topic, [nodes]], as param type, as param type
157 '''
158
159 actions = []
160 for goal_candidate in pubs:
161 if re.search('\/goal$', goal_candidate[0]):
162
163 base_topic = re.sub('\/goal$', '', goal_candidate[0])
164 nodes = goal_candidate[1]
165 action_nodes = []
166
167
168 for node in nodes:
169 is_action = True
170 is_action &= self._is_topic_node_in_list(base_topic + '/goal', node, pubs)
171 is_action &= self._is_topic_node_in_list(base_topic + '/cancel', node, pubs)
172 is_action &= self._is_topic_node_in_list(base_topic + '/status', node, subs)
173 is_action &= self._is_topic_node_in_list(base_topic + '/feedback', node, subs)
174 is_action &= self._is_topic_node_in_list(base_topic + '/result', node, subs)
175
176 if is_action:
177 action_nodes.append(node)
178
179 if len(action_nodes) != 0:
180
181 actions.append([base_topic, action_nodes])
182
183 for connection in pubs:
184 if connection[0] in [base_topic + '/goal', base_topic + '/cancel']:
185 for node in action_nodes:
186 try:
187 connection[1].remove(node)
188 except ValueError:
189 rospy.logerr("Gateway : couldn't remove an action publisher from the master connections list [%s][%s]" % (connection[0], node))
190 for connection in subs:
191 if connection[0] in [base_topic + '/status', base_topic + '/feedback', base_topic + '/result']:
192 for node in action_nodes:
193 try:
194 connection[1].remove(node)
195 except ValueError:
196 rospy.logerr("Gateway : couldn't remove an action subscriber from the master connections list [%s][%s]" % (connection[0], node))
197 pubs[:] = [connection for connection in pubs if len(connection[1]) != 0]
198 subs[:] = [connection for connection in subs if len(connection[1]) != 0]
199 return actions, pubs, subs
200
202 '''
203 Return action servers and pruned publisher, subscriber lists.
204
205 @param publishers
206 @type list of publishers in the form returned by rosgraph.Master.get_system_state
207 @param subscribers
208 @type list of subscribers in the form returned by rosgraph.Master.get_system_state
209 @return list of actions, pruned_publishers, pruned_subscribers
210 @rtype [base_topic, [nodes]], as param type, as param type
211 '''
212 actions, subs, pubs = self._get_actions(subscribers, publishers)
213 return actions, pubs, subs
214
216 '''
217 Return action clients and pruned publisher, subscriber lists.
218
219 @param publishers
220 @type list of publishers in the form returned by rosgraph.Master.get_system_state
221 @param subscribers
222 @type list of subscribers in the form returned by rosgraph.Master.get_system_state
223 @return list of actions, pruned_publishers, pruned_subscribers
224 @rtype [base_topic, [nodes]], as param type, as param type
225 '''
226 actions, pubs, subs = self._get_actions(publishers, subscribers)
227 return actions, pubs, subs
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
290 '''
291 Representing a ros master (local ros master). Just contains a
292 few utility methods for retrieving master related information as well
293 as handles for registering and unregistering rules that have
294 been pulled or flipped in from another gateway.
295 '''
296
298 rosgraph.Master.__init__(self, rospy.get_name())
299
300 self.get_system_state = self.getSystemState
301 self._connection_cache = ConnectionCache(self.get_system_state)
302
303
304
305
306
308 '''
309 Registers a rule with the local master.
310
311 @param registration : registration details
312 @type utils.Registration
313
314 @return the updated registration object (only adds an anonymously generated local node name)
315 @rtype utils.Registration
316 '''
317 registration.local_node = self._get_anonymous_node_name(registration.connection.rule.node)
318 rospy.logdebug("Gateway : registering a new node [%s] for [%s]" % (registration.local_node, registration))
319
320
321
322
323 node_master = rosgraph.Master(registration.local_node)
324 if registration.connection.rule.type == PUBLISHER:
325 node_master.registerPublisher(registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
326 return registration
327 elif registration.connection.rule.type == SUBSCRIBER:
328 self._register_subscriber(node_master, registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
329 return registration
330 elif registration.connection.rule.type == SERVICE:
331 if rosservice.get_service_node(registration.connection.rule.name):
332 rospy.logwarn("Gateway : tried to register a service that is already locally available, aborting [%s]" % registration.connection.rule.name)
333 return None
334 else:
335 node_master.registerService(registration.connection.rule.name, registration.connection.type_info, registration.connection.xmlrpc_uri)
336 return registration
337 elif registration.connection.rule.type == ACTION_SERVER:
338
339 self._register_subscriber(node_master, registration.connection.rule.name + "/goal", registration.connection.type_info + "ActionGoal", registration.connection.xmlrpc_uri)
340 self._register_subscriber(node_master, registration.connection.rule.name + "/cancel", "actionlib_msgs/GoalID", registration.connection.xmlrpc_uri)
341 node_master.registerPublisher(registration.connection.rule.name + "/status", "actionlib_msgs/GoalStatusArray", registration.connection.xmlrpc_uri)
342 node_master.registerPublisher(registration.connection.rule.name + "/feedback", registration.connection.type_info + "ActionFeedback", registration.connection.xmlrpc_uri)
343 node_master.registerPublisher(registration.connection.rule.name + "/result", registration.connection.type_info + "ActionResult", registration.connection.xmlrpc_uri)
344 return registration
345 elif registration.connection.rule.type == ACTION_CLIENT:
346 node_master.registerPublisher(registration.connection.rule.name + "/goal", registration.connection.type_info + "ActionGoal", registration.connection.xmlrpc_uri)
347 node_master.registerPublisher(registration.connection.rule.name + "/cancel", "actionlib_msgs/GoalID", registration.connection.xmlrpc_uri)
348 self._register_subscriber(node_master, registration.connection.rule.name + "/status", "actionlib_msgs/GoalStatusArray", registration.connection.xmlrpc_uri)
349 self._register_subscriber(node_master, registration.connection.rule.name + "/feedback", registration.connection.type_info + "ActionFeedback", registration.connection.xmlrpc_uri)
350 self._register_subscriber(node_master, registration.connection.rule.name + "/result", registration.connection.type_info + "ActionResult", registration.connection.xmlrpc_uri)
351 return registration
352 return None
353
355 '''
356 Unregisters a rule with the local master.
357
358 @param registration : registration details for an existing gateway registered rule
359 @type utils.Registration
360 '''
361 node_master = rosgraph.Master(registration.local_node)
362 rospy.logdebug("Gateway : unregistering local node [%s] for [%s]" % (registration.local_node, registration))
363 if registration.connection.rule.type == PUBLISHER:
364 node_master.unregisterPublisher(registration.connection.rule.name, registration.connection.xmlrpc_uri)
365 elif registration.connection.rule.type == SUBSCRIBER:
366 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name)
367 elif registration.connection.rule.type == SERVICE:
368 node_master.unregisterService(registration.connection.rule.name, registration.connection.type_info)
369 elif registration.connection.rule.type == ACTION_SERVER:
370 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/goal")
371 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/cancel")
372 node_master.unregisterPublisher(registration.connection.rule.name + "/status", registration.connection.xmlrpc_uri)
373 node_master.unregisterPublisher(registration.connection.rule.name + "/feedback", registration.connection.xmlrpc_uri)
374 node_master.unregisterPublisher(registration.connection.rule.name + "/result", registration.connection.xmlrpc_uri)
375 elif registration.connection.rule.type == ACTION_CLIENT:
376 node_master.unregisterPublisher(registration.connection.rule.name + "/goal", registration.connection.xmlrpc_uri)
377 node_master.unregisterPublisher(registration.connection.rule.name + "/cancel", registration.connection.xmlrpc_uri)
378 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/status")
379 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/feedback")
380 self._unregister_subscriber(node_master, registration.connection.xmlrpc_uri, registration.connection.rule.name + "/result")
381
383 '''
384 This one is not necessary, since you can pretty much guarantee the
385 existence of the subscriber here, but it pays to be safe - we've seen
386 some errors come out here when the ROS_MASTER_URI was only set to
387 localhost.
388
389 @param node_master : node-master xmlrpc method handler
390 @param type_info : type of the subscriber message
391 @param xmlrpc_uri : the uri of the node (xmlrpc server)
392 @type string
393 @param name : fully resolved subscriber name
394 '''
395
396
397 pub_uri_list = node_master.registerSubscriber(name, type_info, xmlrpc_uri)
398 try:
399
400 xmlrpcapi(xmlrpc_uri).publisherUpdate('/master', name, pub_uri_list)
401 except socket.error, v:
402 errorcode = v[0]
403 if errorcode != errno.ECONNREFUSED:
404 rospy.logerr("Gateway : error registering subscriber (is ROS_MASTER_URI and ROS_HOSTNAME or ROS_IP correctly set?)")
405 rospy.logerr("Gateway : errorcode [%s] xmlrpc_uri [%s]" % (str(errorcode), xmlrpc_uri))
406 raise
407 else:
408 pass
409
411 '''
412 It is a special case as it requires xmlrpc handling to inform the subscriber of
413 the disappearance of publishers it was connected to. It also needs to handle the
414 case when that information doesn't get to the subscriber because it is down.
415
416 @param node_master : node-master xmlrpc method handler
417 @param xmlrpc_uri : the uri of the node (xmlrpc server)
418 @type string
419 @param name : fully resolved subscriber name
420 '''
421
422
423 try:
424 xmlrpcapi(xmlrpc_uri).publisherUpdate('/master', name, [])
425 except socket.error, v:
426 errorcode = v[0]
427 if errorcode != errno.ECONNREFUSED:
428 raise
429 else:
430 pass
431 node_master.unregisterSubscriber(name, xmlrpc_uri)
432
433
434
435
436
438 '''
439 Creates all the extra details to create a connection object from a
440 rule.
441
442 @param connection_type : the connection type (one of gateway_msgs.msg.ConnectionType)
443 @type string
444 @param name : the name of the connection
445 @type string
446 @param node : the master node name it comes from
447 @param string
448
449 @return the utils.Connection object complete with type_info and xmlrpc_uri
450 @type utils.Connection
451 '''
452 xmlrpc_uri = self.lookupNode(node)
453 connections = []
454 if connection_type == PUBLISHER or connection_type == SUBSCRIBER:
455 type_info = rostopic.get_topic_type(name)[0]
456 connections.append(utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri))
457 elif connection_type == SERVICE:
458 type_info = rosservice.get_service_uri(name)
459 connections.append(utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri))
460 elif connection_type == ACTION_SERVER:
461 type_info = rostopic.get_topic_type(name + '/goal')[0]
462 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/goal', node), type_info, xmlrpc_uri))
463 type_info = rostopic.get_topic_type(name + '/cancel')[0]
464 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/cancel', node), type_info, xmlrpc_uri))
465 type_info = rostopic.get_topic_type(name + '/status')[0]
466 connections.append(utils.Connection(Rule(PUBLISHER, name + '/status', node), type_info, xmlrpc_uri))
467 type_info = rostopic.get_topic_type(name + '/feedback')[0]
468 connections.append(utils.Connection(Rule(PUBLISHER, name + '/feedback', node), type_info, xmlrpc_uri))
469 type_info = rostopic.get_topic_type(name + '/result')[0]
470 connections.append(utils.Connection(Rule(PUBLISHER, name + '/result', node), type_info, xmlrpc_uri))
471 elif connection_type == ACTION_CLIENT:
472 type_info = rostopic.get_topic_type(name + '/goal')[0]
473 connections.append(utils.Connection(Rule(PUBLISHER, name + '/goal', node), type_info, xmlrpc_uri))
474 type_info = rostopic.get_topic_type(name + '/cancel')[0]
475 connections.append(utils.Connection(Rule(PUBLISHER, name + '/cancel', node), type_info, xmlrpc_uri))
476 type_info = rostopic.get_topic_type(name + '/status')[0]
477 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/status', node), type_info, xmlrpc_uri))
478 type_info = rostopic.get_topic_type(name + '/feedback')[0]
479 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/feedback', node), type_info, xmlrpc_uri))
480 type_info = rostopic.get_topic_type(name + '/result')[0]
481 connections.append(utils.Connection(Rule(SUBSCRIBER, name + '/result', node), type_info, xmlrpc_uri))
482 return connections
483
485 '''
486 Creates all the extra details to create a connection object from an
487 advertisement rule. This is a bit different to the previous one - we just need
488 the type and single node uri that everything originates from (don't need to generate all
489 the pub/sub connections themselves.
490
491 Probably flips could be merged into this sometime, but it'd be a bit gnarly.
492
493 @param connection_type : the connection type (one of gateway_msgs.msg.ConnectionType)
494 @type string
495 @param name : the name of the connection
496 @type string
497 @param node : the master node name it comes from
498 @param string
499
500 @return the utils.Connection object complete with type_info and xmlrpc_uri
501 @type utils.Connection
502 '''
503 xmlrpc_uri = self.lookupNode(node)
504 if connection_type == PUBLISHER or connection_type == SUBSCRIBER:
505 type_info = rostopic.get_topic_type(name)[0]
506 connection = utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri)
507 elif connection_type == SERVICE:
508 type_info = rosservice.get_service_uri(name)
509 connection = utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri)
510 elif connection_type == ACTION_SERVER or connection_type == ACTION_CLIENT:
511 goal_topic = name + '/goal'
512 goal_topic_type = rostopic.get_topic_type(goal_topic)
513 type_info = re.sub('ActionGoal$', '', goal_topic_type[0])
514 connection = utils.Connection(Rule(connection_type, name, node), type_info, xmlrpc_uri)
515 return connection
516
518 o = urlparse.urlparse(rosgraph.get_master_uri())
519 if o.hostname == 'localhost':
520 ros_ip = ''
521 try:
522 ros_ip = os.environ['ROS_IP']
523 except Exception:
524 try:
525
526 ros_ip = os.environ['ROS_HOSTNAME']
527 except Exception:
528
529 rospy.logwarn("Gateway: no valid ip found for this host, just setting 'localhost'")
530 return 'localhost'
531 return ros_ip
532 else:
533 return o.hostname
534
536 unused_new_connections, unused_lost_connections = self._connection_cache.update()
537
538
539 return self._connection_cache._connections
540
542 t = topic[1:len(topic)]
543 name = roslib.names.anonymous_name(t)
544 return name
545
546
547
548
549
551 '''
552 Assists a script to find the (hopefully) unique gateway namespace.
553 Note that unique is a necessary condition, there should only be one
554 gateway per ros system.
555
556 @return Namespace of the gateway node.
557 @rtype string
558 '''
559 unused_publishers, unused_subscribers, services = self.get_system_state()
560 for service in services:
561 service_name = service[0]
562 if re.search(r'remote_gateway_info', service_name):
563 if service_name == '/remote_gateway_info':
564 return "/"
565 else:
566 return re.sub(r'/remote_gateway_info', '', service_name)
567 return None
568