1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 import cStringIO
34 import threading
35 import xmlrpclib
36 import socket
37 import time
38 from SimpleXMLRPCServer import SimpleXMLRPCServer
39 from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
40 from SocketServer import ThreadingMixIn
41
42 import roslib; roslib.load_manifest('master_discovery_fkie')
43 import roslib.network
44 import rospy
45 import rosnode
46
47 from master_discovery_fkie.msg import *
48 from master_discovery_fkie.srv import *
49 from master_info import MasterInfo, NodeInfo, TopicInfo, ServiceInfo
50 import interface_finder
53 '''
54 The default exception class.
55 '''
56 pass
57
59 code, msg, val = args
60 if code != 1:
61 raise rosnode.ROSNodeException("remote call failed: %s"%msg)
62 return val
63
66
68 '''
69 This class provides methods to get the state from the ROS master using his
70 RPC API and test for changes. Furthermore an XML-RPC server will be created
71 to offer the complete current state of the ROS master by one method call.
72 @see: L{getState()}
73 RPC Methods:
74 @see: L{getListedMasterInfo()} or L{getMasterContacts()} as RPC: C{masterInfo()} and
75 C{masterContacts()}
76 @group RPC-methods: getListedMasterInfo, getMasterContacts
77 '''
78
79 MAX_PING_SEC = 10.0
80
82 '''
83 Initialize method. Creates an XML-RPC server on given port and starts this
84 in its own thread.
85 @param rpcport: the port number for the XML-RPC server
86 @type rpcport: C{int}
87 '''
88 self._state_access_lock = threading.RLock()
89 self._create_access_lock = threading.RLock()
90 self._lock = threading.RLock()
91 self.__masteruri = self._masteruri_from_ros()
92 self.__new_master_state = None
93 self.__masteruri_rpc = None
94 self.__mastername = None
95 self.__cached_nodes = dict()
96 self.__cached_services = dict()
97 self.ros_node_name = str(rospy.get_name())
98 if rospy.has_param('~name'):
99 self.__mastername = rospy.get_param('~name')
100 self.__mastername = self.getMastername()
101 rospy.set_param('/mastername', self.__mastername)
102
103
104 self.__master_state = None
105 '''@ivar: the current state of the ROS master'''
106 self.rpcport = rpcport
107 '''@ivar: the port number of the RPC server'''
108
109
110 ready = False
111 while not ready and (not rospy.is_shutdown()):
112 try:
113 self.rpcServer = RPCThreading(('', rpcport), logRequests=False, allow_none=True)
114 rospy.loginfo("Start RPC-XML Server at %s", self.rpcServer.server_address)
115 self.rpcServer.register_introspection_functions()
116 self.rpcServer.register_function(self.getListedMasterInfo, 'masterInfo')
117 self.rpcServer.register_function(self.getMasterContacts, 'masterContacts')
118 self._rpcThread = threading.Thread(target = self.rpcServer.serve_forever)
119 self._rpcThread.setDaemon(True)
120 self._rpcThread.start()
121 ready = True
122 except socket.error:
123 rospy.logwarn(''.join(["Error while start RPC-XML server on port ", str(rpcport), ". Try again..."]))
124 time.sleep(1)
125 except:
126 import traceback
127 print traceback.format_exc()
128
129 @classmethod
131 '''
132 Returns the master URI depending on ROS distribution API.
133 @return: ROS master URI
134 @rtype C{str}
135 '''
136 try:
137 import rospkg.distro
138 distro = rospkg.distro.current_distro_codename()
139 if distro in ['electric', 'diamondback', 'cturtle']:
140 return roslib.rosenv.get_master_uri()
141 else:
142 import rosgraph
143 return rosgraph.rosenv.get_master_uri()
144 except:
145 import os
146 return os.environ['ROS_MASTER_URI']
147
149 '''
150 Shutdown the RPC Server.
151 '''
152 if hasattr(self, 'rpcServer'):
153 self.rpcServer.shutdown()
154
156 '''
157 Gets process id of the node.
158 @param nodename: the name of the node
159 @type nodename: C{str}
160 @param uri: the uri of the node
161 @type uri: C{str}
162 '''
163 for (nodename, uri) in nodes.items():
164 if not uri is None:
165 pid = None
166 try:
167
168 with self._lock:
169 if self.__cached_nodes.has_key(nodename):
170 if time.time() - self.__cached_nodes[nodename][2] < self.MAX_PING_SEC:
171
172 return
173
174 socket.setdefaulttimeout(0.7)
175 node = xmlrpclib.ServerProxy(uri)
176 pid = _succeed(node.getPid(self.ros_node_name))
177 except (Exception, socket.error):
178
179
180 master = xmlrpclib.ServerProxy(self.getMasteruri())
181
182 code, message, new_uri = master.lookupNode(self.ros_node_name, nodename)
183
184 with self._lock:
185 self.__new_master_state.getNode(nodename).uri = None if (code == -1) else new_uri
186 try:
187
188 del self.__cached_nodes[nodename]
189 except:
190 pass
191
192 else:
193
194 with self._lock:
195
196 self.__new_master_state.getNode(nodename).pid = pid
197 self.__cached_nodes[nodename] = (uri, pid, time.time())
198
199 finally:
200 socket.setdefaulttimeout(None)
201
203 '''
204 Gets service info through the RPC interface of the service.
205 @param service: the name of the service
206 @type service: C{str}
207 @param uri: the uri of the service
208 @type uri: C{str}
209 '''
210 for (service, uri) in services.items():
211 with self._lock:
212 if self.__cached_services.has_key(service):
213 if time.time() - self.__cached_services[service][2] < self.MAX_PING_SEC:
214 return
215 if not uri is None:
216 type = dest_addr = dest_port = None
217 try:
218 dest_addr, dest_port = rospy.parse_rosrpc_uri(uri)
219 except:
220
221 continue
222
223
224 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
225 try:
226
227 s.settimeout(0.5)
228 s.connect((dest_addr, dest_port))
229 header = { 'probe':'1', 'md5sum':'*',
230 'callerid':self.ros_node_name, 'service':service}
231 roslib.network.write_ros_handshake_header(s, header)
232 type = roslib.network.read_ros_handshake_header(s, cStringIO.StringIO(), 2048)
233
234 with self._lock:
235
236 self.__new_master_state.getService(service).type = type['type']
237 self.__cached_services[service] = (uri, type['type'], time.time())
238
239 except socket.error:
240
241 with self._lock:
242 try:
243
244 del self.__cached_services[service]
245 except:
246 pass
247
248
249 except:
250
251
252
253 with self._lock:
254 try:
255
256 del self.__cached_services[service]
257 except:
258 pass
259
260 pass
261 finally:
262 if s is not None:
263 s.close()
264
265
267 '''
268 Returns a extended roscore state.
269 @return: complete roscore state as
270
271 C{(stamp, stamp_local, masteruri, name, publishers, subscribers, services, topicTypes, nodes, serviceProvider)}
272
273 - C{publishers} is of the form
274
275 C{[ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]}
276
277 - C{subscribers} is of the form
278
279 C{[ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]}
280
281 - C{services} is of the form
282
283 C{[ [service1, [service1Provider1...service1ProviderN]] ... ]}
284
285 - C{topicTypes} is a list of
286
287 C{[[topicName1, topicType1], ... ]}
288
289 - C{nodes} is a list of (the pid of remote Nodes will not be resolved)
290
291 C{[nodename, XML-RPC URI, pid, E{lb} local, remote E{rb}]}
292
293 - C{serviceProvider} is a list of (the type, serviceClass and args of remote Services will not be resolved)
294
295 C{[service, XML-RPC URI, type, E{lb} local, remote E{rb}]}
296
297 @rtype: C{(float,
298 float,
299 str,
300 str,
301 [ [str,[str] ] ],
302 [ [str,[str] ] ],
303 [ [str,[str] ] ],
304 [ [str,str] ],
305 [ [str,str,int,str] ],
306 [ [str,str,str,str] ])}
307 '''
308
309 t = str(time.time())
310 result = (t, t, self.getMasteruri(), str(self.getMastername()), [], [], [], [], [], [] )
311 if not (self.__master_state is None):
312 try:
313
314 with self._state_access_lock:
315
316 result = self.__master_state.listedState()
317
318 except:
319 import traceback
320 print traceback.format_exc()
321
322 return result
323
325
326 with self._state_access_lock:
327
328 return self.__master_state
329
330
332 '''
333 Gets state from the ROS master through his RPC interface.
334 @rtype: L{MasterInfo}
335 @raise MasterConnectionException: if not complete information was get from the ROS master.
336 '''
337
338 with self._create_access_lock:
339
340 now = time.time()
341 threads = []
342 try:
343
344
345
346
347 self._lock.acquire(True)
348
349 if clear_cache:
350 self.__cached_nodes = dict()
351 self.__cached_services = dict()
352 socket.setdefaulttimeout(5)
353
354
355 self.__new_master_state = master_state = MasterInfo(self.getMasteruri(), self.getMastername())
356 master = xmlrpclib.ServerProxy(self.getMasteruri())
357
358 code, message, topicTypes = master.getTopicTypes(self.ros_node_name)
359
360
361
362
363
364 topicTypesDict = {}
365 for topic, type in topicTypes:
366 topicTypesDict[topic] = type
367
368
369
370
371
372 code, message, state = master.getSystemState(self.ros_node_name)
373
374
375
376
377
378
379 for t, l in state[0]:
380 master_state.topics = t
381 for n in l:
382 master_state.nodes = n
383 master_state.getNode(n).publishedTopics = t
384 master_state.getTopic(t).publisherNodes = n
385 master_state.getTopic(t).type = topicTypesDict.get(t, 'None')
386
387
388
389
390
391 for t, l in state[1]:
392 master_state.topics = t
393 for n in l:
394 master_state.nodes = n
395 master_state.getNode(n).subscribedTopics = t
396 master_state.getTopic(t).subscriberNodes = n
397 master_state.getTopic(t).type = topicTypesDict.get(t, 'None')
398
399
400
401
402
403
404
405
406
407
408
409 services = dict()
410 tmp_slist = []
411
412 param_server_multi = xmlrpclib.MultiCall(master)
413 for t, l in state[2]:
414 master_state.services = t
415 for n in l:
416 master_state.nodes = n
417 master_state.getNode(n).services = t
418 service = master_state.getService(t)
419 service.serviceProvider = n
420 if self.__cached_services.has_key(service.name):
421 service.uri = self.__cached_services[service.name][0]
422 service.type = self.__cached_services[service.name][1]
423 if service.isLocal and time.time() - self.__cached_services[service.name][2] > self.MAX_PING_SEC:
424 services[service.name] = service.uri
425 else:
426
427 tmp_slist.append(service)
428 param_server_multi.lookupService(self.ros_node_name, t)
429
430
431
432
433
434 try:
435 r = param_server_multi()
436 for (code, msg, uri), service in zip(r, tmp_slist):
437 if code == 1:
438 service.uri = uri
439 if service.isLocal:
440
441 services[service.name] = uri
442 else:
443 self.__cached_services[service.name] = (uri, None, time.time())
444
445 except:
446 import traceback
447 traceback.print_exc()
448 if services:
449 pidThread = threading.Thread(target = self._getServiceInfo, args=((services,)))
450 pidThread.start()
451 threads.append(pidThread)
452
453
454 nodes = dict()
455 try:
456
457 param_server_multi = xmlrpclib.MultiCall(master)
458 tmp_nlist = []
459 for name, node in master_state.nodes.items():
460 if self.__cached_nodes.has_key(node.name):
461 node.uri = self.__cached_nodes[node.name][0]
462 node.pid = self.__cached_nodes[node.name][1]
463 if node.isLocal and time.time() - self.__cached_nodes[node.name][2] > self.MAX_PING_SEC:
464 nodes[node.name] = node.uri
465 else:
466
467 tmp_nlist.append(node)
468 param_server_multi.lookupNode(self.ros_node_name, name)
469 r = param_server_multi()
470 for (code, msg, uri), node in zip(r, tmp_nlist):
471 if code == 1:
472 node.uri = uri
473 if node.isLocal:
474 nodes[node.name] = uri
475 else:
476 self.__cached_nodes[node.name] = (uri, None, time.time())
477 except:
478 import traceback
479 traceback.print_exc()
480
481
482
483 if nodes:
484
485 pidThread = threading.Thread(target = self._getNodePid, args=((nodes,)))
486 pidThread.start()
487 threads.append(pidThread)
488
489 master_state.timestamp = now
490 except socket.error, e:
491 if isinstance(e, tuple):
492 (errn, msg) = e
493 if not errn in [100, 101, 102]:
494 import traceback
495 formatted_lines = traceback.format_exc().splitlines()
496
497 raise MasterConnectionException(formatted_lines[-1])
498 else:
499 import traceback
500 raise MasterConnectionException(traceback.format_exc())
501 except:
502 import traceback
503 formatted_lines = traceback.format_exc().splitlines()
504
505 raise MasterConnectionException(formatted_lines[-1])
506 finally:
507 self._lock.release()
508
509 socket.setdefaulttimeout(None)
510
511
512
513
514
515
516 while threads:
517 th = threads.pop()
518 if th.isAlive():
519
520 th.join()
521
522 del th
523
524
525
526
527
528
529 return master_state
530
532 '''
533 This method can be called to update the origin ROS master URI of the nodes
534 and services in new master_state. This is only need, if a synchronization is
535 running. The synchronization service will be detect automatically by searching
536 for the service ending with C{get_sync_info}. The method will be called by
537 C{checkState()}.
538 '''
539
540
541 with self._create_access_lock:
542
543 master_state = self.__new_master_state
544 sync_info = None
545
546
547 for name, service in master_state.services.items():
548 if service.name.endswith('get_sync_info'):
549 if interface_finder.hostFromUri(self.getMasteruri()) == interface_finder.hostFromUri(service.uri):
550 socket.setdefaulttimeout(3)
551 get_sync_info = rospy.ServiceProxy(service.name, GetSyncInfo)
552 try:
553 sync_info = get_sync_info()
554 except rospy.ServiceException, e:
555 rospy.logwarn("ERROR Service call 'get_sync_info' failed: %s", str(e))
556 finally:
557 socket.setdefaulttimeout(None)
558
559
560 if sync_info:
561 for m in sync_info.hosts:
562 for n in m.nodes:
563 try:
564 master_state.getNode(n).masteruri = m.masteruri
565 except:
566 pass
567 for s in m.services:
568 try:
569 master_state.getService(s).masteruri = m.masteruri
570 except:
571 pass
572
573
574
575
577 '''
578 Requests the ROS master URI from the ROS master through the RPC interface and
579 returns it.
580 @return: ROS master URI
581 @rtype: C{str} or C{None}
582 '''
583 code = -1
584 if self.__masteruri_rpc is None:
585 master = xmlrpclib.ServerProxy(self.__masteruri)
586 code, message, self.__masteruri_rpc = master.getUri(self.ros_node_name)
587 return self.__masteruri_rpc if code >= 0 or not self.__masteruri_rpc is None else self.__masteruri
588
590 '''
591 Returns the name of the master. If no name is set, the hostname of the
592 ROS master URI will be extracted.
593 @return: the name of the ROS master
594 @rtype: C{str} or C{None}
595 '''
596 if self.__mastername is None:
597 try:
598 self.__mastername = interface_finder.hostFromUri(self.getMasteruri())
599 try:
600 from urlparse import urlparse
601 master_port = urlparse(self.__masteruri).port
602 if master_port != 11311:
603 self.__mastername = '--'.join([self.__mastername, str(master_port)])
604 except:
605 pass
606 except:
607 pass
608 return self.__mastername
609
626
628 '''
629 Gets the state from the ROS master and compares it to the stored state.
630 @param clear_cache Clears cache for ROS nodes and sevices. Is neaded on changes on remote hosts.
631 @return: True if the ROS master state is changed
632 @rtype: C{boolean}
633 '''
634 result = False
635 s = self.updateState(clear_cache)
636
637 with self._create_access_lock:
638
639
640 do_update = False
641 with self._state_access_lock:
642
643 if s != self.__master_state:
644 do_update = True
645 if do_update:
646 self.updateSyncInfo()
647 with self._state_access_lock:
648
649 ts_local = self.__new_master_state.timestamp_local
650 if not self.__master_state is None and not self.__master_state.has_local_changes(s):
651 ts_local = self.__master_state.timestamp_local
652 self.__master_state = self.__new_master_state
653 self.__master_state.timestamp_local = ts_local
654 result = True
655 self.__master_state.check_ts = self.__new_master_state.timestamp
656
657
658 return result
659
661 '''
662 Sets the master state to None.
663 '''
664
665 with self._state_access_lock:
666
667 if not self.__master_state is None:
668 del self.__master_state
669 self.__master_state = None
670
671