Package master_discovery_fkie :: Module master_monitor
[frames] | no frames]

Source Code for Module master_discovery_fkie.master_monitor

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  import cStringIO 
 34  import threading 
 35  import xmlrpclib 
 36  import socket 
 37  import time 
 38  from SimpleXMLRPCServer import SimpleXMLRPCServer 
 39  from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler 
 40  from SocketServer import ThreadingMixIn 
 41   
 42  import roslib; roslib.load_manifest('master_discovery_fkie') 
 43  import roslib.network 
 44  import rospy 
 45  import rosnode 
 46   
 47  from master_discovery_fkie.msg import * 
 48  from master_discovery_fkie.srv import * 
 49  from master_info import MasterInfo, NodeInfo, TopicInfo, ServiceInfo 
 50  import interface_finder 
51 52 -class MasterConnectionException(Exception):
53 ''' 54 The default exception class. 55 ''' 56 pass
57
58 -def _succeed(args):
59 code, msg, val = args 60 if code != 1: 61 raise rosnode.ROSNodeException("remote call failed: %s"%msg) 62 return val
63
64 -class RPCThreading(ThreadingMixIn, SimpleXMLRPCServer):
65 pass
66
67 -class MasterMonitor(object):
68 ''' 69 This class provides methods to get the state from the ROS master using his 70 RPC API and test for changes. Furthermore an XML-RPC server will be created 71 to offer the complete current state of the ROS master by one method call. 72 @see: L{getState()} 73 RPC Methods: 74 @see: L{getListedMasterInfo()} or L{getMasterContacts()} as RPC: C{masterInfo()} and 75 C{masterContacts()} 76 @group RPC-methods: getListedMasterInfo, getMasterContacts 77 ''' 78 79 MAX_PING_SEC = 10.0 80
81 - def __init__(self, rpcport=11611):
82 ''' 83 Initialize method. Creates an XML-RPC server on given port and starts this 84 in its own thread. 85 @param rpcport: the port number for the XML-RPC server 86 @type rpcport: C{int} 87 ''' 88 self._state_access_lock = threading.RLock() 89 self._create_access_lock = threading.RLock() 90 self._lock = threading.RLock() 91 self.__masteruri = self._masteruri_from_ros() 92 self.__new_master_state = None 93 self.__masteruri_rpc = None 94 self.__mastername = None 95 self.__cached_nodes = dict() 96 self.__cached_services = dict() 97 self.ros_node_name = str(rospy.get_name()) 98 if rospy.has_param('~name'): 99 self.__mastername = rospy.get_param('~name') 100 self.__mastername = self.getMastername() 101 rospy.set_param('/mastername', self.__mastername) 102 103 104 self.__master_state = None 105 '''@ivar: the current state of the ROS master''' 106 self.rpcport = rpcport 107 '''@ivar: the port number of the RPC server''' 108 109 # Create an XML-RPC server 110 ready = False 111 while not ready and (not rospy.is_shutdown()): 112 try: 113 self.rpcServer = RPCThreading(('', rpcport), logRequests=False, allow_none=True) 114 rospy.loginfo("Start RPC-XML Server at %s", self.rpcServer.server_address) 115 self.rpcServer.register_introspection_functions() 116 self.rpcServer.register_function(self.getListedMasterInfo, 'masterInfo') 117 self.rpcServer.register_function(self.getMasterContacts, 'masterContacts') 118 self._rpcThread = threading.Thread(target = self.rpcServer.serve_forever) 119 self._rpcThread.setDaemon(True) 120 self._rpcThread.start() 121 ready = True 122 except socket.error: 123 rospy.logwarn(''.join(["Error while start RPC-XML server on port ", str(rpcport), ". Try again..."])) 124 time.sleep(1) 125 except: 126 import traceback 127 print traceback.format_exc()
128 129 @classmethod
130 - def _masteruri_from_ros(cls):
131 ''' 132 Returns the master URI depending on ROS distribution API. 133 @return: ROS master URI 134 @rtype C{str} 135 ''' 136 try: 137 import rospkg.distro 138 distro = rospkg.distro.current_distro_codename() 139 if distro in ['electric', 'diamondback', 'cturtle']: 140 return roslib.rosenv.get_master_uri() 141 else: 142 import rosgraph 143 return rosgraph.rosenv.get_master_uri() 144 except: 145 import os 146 return os.environ['ROS_MASTER_URI']
147
148 - def shutdown(self):
149 ''' 150 Shutdown the RPC Server. 151 ''' 152 if hasattr(self, 'rpcServer'): 153 self.rpcServer.shutdown()
154
155 - def _getNodePid(self, nodes):
156 ''' 157 Gets process id of the node. 158 @param nodename: the name of the node 159 @type nodename: C{str} 160 @param uri: the uri of the node 161 @type uri: C{str} 162 ''' 163 for (nodename, uri) in nodes.items(): 164 if not uri is None: 165 pid = None 166 try: 167 # print "_getNodePid _lock try..", threading.current_thread() 168 with self._lock: 169 if self.__cached_nodes.has_key(nodename): 170 if time.time() - self.__cached_nodes[nodename][2] < self.MAX_PING_SEC: 171 # print "_getNodePid _lock RETe", threading.current_thread() 172 return 173 # print "_getNodePid _lock RET", threading.current_thread() 174 socket.setdefaulttimeout(0.7) 175 node = xmlrpclib.ServerProxy(uri) 176 pid = _succeed(node.getPid(self.ros_node_name)) 177 except (Exception, socket.error): 178 # import traceback 179 # print traceback.format_exc() 180 master = xmlrpclib.ServerProxy(self.getMasteruri()) 181 # print "request again nodeuri for", nodename 182 code, message, new_uri = master.lookupNode(self.ros_node_name, nodename) 183 # print "_getNodePid _lock try..", threading.current_thread() 184 with self._lock: 185 self.__new_master_state.getNode(nodename).uri = None if (code == -1) else new_uri 186 try: 187 # print "remove cached node", nodename 188 del self.__cached_nodes[nodename] 189 except: 190 pass 191 # print "_getNodePid _lock RET", threading.current_thread() 192 else: 193 # print "_getNodePid _lock try..", threading.current_thread() 194 with self._lock: 195 # print "get info about node", nodename, uri 196 self.__new_master_state.getNode(nodename).pid = pid 197 self.__cached_nodes[nodename] = (uri, pid, time.time()) 198 # print "_getNodePid _lock RET", threading.current_thread() 199 finally: 200 socket.setdefaulttimeout(None)
201
202 - def _getServiceInfo(self, services):
203 ''' 204 Gets service info through the RPC interface of the service. 205 @param service: the name of the service 206 @type service: C{str} 207 @param uri: the uri of the service 208 @type uri: C{str} 209 ''' 210 for (service, uri) in services.items(): 211 with self._lock: 212 if self.__cached_services.has_key(service): 213 if time.time() - self.__cached_services[service][2] < self.MAX_PING_SEC: 214 return 215 if not uri is None: 216 type = dest_addr = dest_port = None 217 try: 218 dest_addr, dest_port = rospy.parse_rosrpc_uri(uri) 219 except: 220 # print "ERROR while get service info" 221 continue 222 # return 223 # raise ROSServiceException("service [%s] has an invalid RPC URI [%s]"%(service, uri)) 224 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 225 try: 226 # connect to service and probe it to get the headers 227 s.settimeout(0.5) 228 s.connect((dest_addr, dest_port)) 229 header = { 'probe':'1', 'md5sum':'*', 230 'callerid':self.ros_node_name, 'service':service} 231 roslib.network.write_ros_handshake_header(s, header) 232 type = roslib.network.read_ros_handshake_header(s, cStringIO.StringIO(), 2048) 233 # print "_getServiceInfo _lock try..", threading.current_thread() 234 with self._lock: 235 # print "get info about service", service, uri 236 self.__new_master_state.getService(service).type = type['type'] 237 self.__cached_services[service] = (uri, type['type'], time.time()) 238 # print "_getServiceInfo _lock RET", threading.current_thread() 239 except socket.error: 240 # print "_getServiceInfo _lock try..", threading.current_thread() 241 with self._lock: 242 try: 243 # print "delete socket service", service, uri 244 del self.__cached_services[service] 245 except: 246 pass 247 # raise ROSServiceIOException("Unable to communicate with service [%s], address [%s]"%(service, uri)) 248 # print "_getServiceInfo _lock RET", threading.current_thread() 249 except: 250 # import traceback 251 # print traceback.format_exc() 252 # print "_getServiceInfo _lock try..", threading.current_thread() 253 with self._lock: 254 try: 255 # print "delete service", service, uri 256 del self.__cached_services[service] 257 except: 258 pass 259 # print "_getServiceInfo _lock RET", threading.current_thread() 260 pass 261 finally: 262 if s is not None: 263 s.close()
264 265
266 - def getListedMasterInfo(self):
267 ''' 268 Returns a extended roscore state. 269 @return: complete roscore state as 270 271 C{(stamp, stamp_local, masteruri, name, publishers, subscribers, services, topicTypes, nodes, serviceProvider)} 272 273 - C{publishers} is of the form 274 275 C{[ [topic1, [topic1Publisher1...topic1PublisherN]] ... ]} 276 277 - C{subscribers} is of the form 278 279 C{[ [topic1, [topic1Subscriber1...topic1SubscriberN]] ... ]} 280 281 - C{services} is of the form 282 283 C{[ [service1, [service1Provider1...service1ProviderN]] ... ]} 284 285 - C{topicTypes} is a list of 286 287 C{[[topicName1, topicType1], ... ]} 288 289 - C{nodes} is a list of (the pid of remote Nodes will not be resolved) 290 291 C{[nodename, XML-RPC URI, pid, E{lb} local, remote E{rb}]} 292 293 - C{serviceProvider} is a list of (the type, serviceClass and args of remote Services will not be resolved) 294 295 C{[service, XML-RPC URI, type, E{lb} local, remote E{rb}]} 296 297 @rtype: C{(float, 298 float, 299 str, 300 str, 301 [ [str,[str] ] ], 302 [ [str,[str] ] ], 303 [ [str,[str] ] ], 304 [ [str,str] ], 305 [ [str,str,int,str] ], 306 [ [str,str,str,str] ])} 307 ''' 308 #'print "MASTERINFO ====================" 309 t = str(time.time()) 310 result = (t, t, self.getMasteruri(), str(self.getMastername()), [], [], [], [], [], [] ) 311 if not (self.__master_state is None): 312 try: 313 #'print "getListedMasterInfo _state_access_lock try...", threading.current_thread() 314 with self._state_access_lock: 315 #'print " getListedMasterInfo _state_access_lock locked", threading.current_thread() 316 result = self.__master_state.listedState() 317 #'print "getListedMasterInfo _state_access_lock RET", threading.current_thread() 318 except: 319 import traceback 320 print traceback.format_exc() 321 #'print "MASTERINFO <<<<<<<<<<<<<<<<<<<<<" 322 return result
323
324 - def getCurrentState(self):
325 #'print "getCurrentState _state_access_lock try...", threading.current_thread() 326 with self._state_access_lock: 327 #'print " getCurrentState _state_access_lock locked", threading.current_thread() 328 return self.__master_state
329 #'print "getCurrentState _state_access_lock RET", threading.current_thread() 330
331 - def updateState(self, clear_cache=False):
332 ''' 333 Gets state from the ROS master through his RPC interface. 334 @rtype: L{MasterInfo} 335 @raise MasterConnectionException: if not complete information was get from the ROS master. 336 ''' 337 #'print "updateState _create_access_lock try...", threading.current_thread() 338 with self._create_access_lock: 339 #'print " updateState _create_access_lock locked", threading.current_thread() 340 now = time.time() 341 threads = [] 342 try: 343 # import os ################### 344 # cputimes = os.times() ################### 345 # cputime_init = cputimes[0] + cputimes[1] ################### 346 #'print " updateState _lock try...", threading.current_thread() 347 self._lock.acquire(True) 348 #'print " updateState _lock locked", threading.current_thread() 349 if clear_cache: 350 self.__cached_nodes = dict() 351 self.__cached_services = dict() 352 socket.setdefaulttimeout(5) 353 # cputimes2 = os.times() ################### 354 # cputime_init2 = cputimes2[0] + cputimes2[1] ################### 355 self.__new_master_state = master_state = MasterInfo(self.getMasteruri(), self.getMastername()) 356 master = xmlrpclib.ServerProxy(self.getMasteruri()) 357 # get topic types 358 code, message, topicTypes = master.getTopicTypes(self.ros_node_name) 359 # cputimes2 = os.times() ################### 360 # print "Get types: ", (cputimes2[0] + cputimes2[1] - cputime_init2) ################### 361 # cputimes2 = os.times() ################### 362 # cputime_init2 = cputimes2[0] + cputimes2[1] ################### 363 #convert topicType list to the dict 364 topicTypesDict = {} 365 for topic, type in topicTypes: 366 topicTypesDict[topic] = type 367 # cputimes2 = os.times() ################### 368 # print "Set types: ", (cputimes2[0] + cputimes2[1] - cputime_init2) ################### 369 # cputimes2 = os.times() ################### 370 # cputime_init2 = cputimes2[0] + cputimes2[1] ################### 371 # get system state 372 code, message, state = master.getSystemState(self.ros_node_name) 373 # cputimes2 = os.times() ################### 374 # print "Get state: ", (cputimes2[0] + cputimes2[1] - cputime_init2) ################### 375 # cputimes2 = os.times() ################### 376 # cputime_init2 = cputimes2[0] + cputimes2[1] ################### 377 378 # add published topics 379 for t, l in state[0]: 380 master_state.topics = t 381 for n in l: 382 master_state.nodes = n 383 master_state.getNode(n).publishedTopics = t 384 master_state.getTopic(t).publisherNodes = n 385 master_state.getTopic(t).type = topicTypesDict.get(t, 'None') 386 # cputimes2 = os.times() ################### 387 # print "read pub topics: ", (cputimes2[0] + cputimes2[1] - cputime_init2) ################### 388 # cputimes2 = os.times() ################### 389 # cputime_init2 = cputimes2[0] + cputimes2[1] ################### 390 # add subscribed topics 391 for t, l in state[1]: 392 master_state.topics = t 393 for n in l: 394 master_state.nodes = n 395 master_state.getNode(n).subscribedTopics = t 396 master_state.getTopic(t).subscriberNodes = n 397 master_state.getTopic(t).type = topicTypesDict.get(t, 'None') 398 # cputimes2 = os.times() ################### 399 # print "read sub topics: ", (cputimes2[0] + cputimes2[1] - cputime_init2) ################### 400 # cputimes2 = os.times() ################### 401 # cputime_init2 = cputimes2[0] + cputimes2[1] ################### 402 # cputimes = os.times() ################### 403 # print "Auswertung: ", (cputimes[0] + cputimes[1] - cputime_init) ################### 404 405 # add services 406 # cputimes = os.times() ################### 407 # cputime_init = cputimes[0] + cputimes[1] ################### 408 409 services = dict() 410 tmp_slist = [] 411 # multi-call style xmlrpc to lock up the service uri 412 param_server_multi = xmlrpclib.MultiCall(master) 413 for t, l in state[2]: 414 master_state.services = t 415 for n in l: 416 master_state.nodes = n 417 master_state.getNode(n).services = t 418 service = master_state.getService(t) 419 service.serviceProvider = n 420 if self.__cached_services.has_key(service.name): 421 service.uri = self.__cached_services[service.name][0] 422 service.type = self.__cached_services[service.name][1] 423 if service.isLocal and time.time() - self.__cached_services[service.name][2] > self.MAX_PING_SEC: 424 services[service.name] = service.uri 425 else: 426 #'print "request service:", service.name 427 tmp_slist.append(service) 428 param_server_multi.lookupService(self.ros_node_name, t) 429 # code, message, service.uri = master.lookupService(rospy.get_name(), t) 430 # if (code == -1): 431 # service.uri = None 432 # elif service.isLocal: 433 # services[service.name] = service.uri 434 try: 435 r = param_server_multi() 436 for (code, msg, uri), service in zip(r, tmp_slist): 437 if code == 1: 438 service.uri = uri 439 if service.isLocal: 440 # print "service local: ", service.uri, service.masteruri 441 services[service.name] = uri 442 else: 443 self.__cached_services[service.name] = (uri, None, time.time()) 444 445 except: 446 import traceback 447 traceback.print_exc() 448 if services: 449 pidThread = threading.Thread(target = self._getServiceInfo, args=((services,))) 450 pidThread.start() 451 threads.append(pidThread) 452 453 #get additional node information 454 nodes = dict() 455 try: 456 # multi-call style xmlrpc to loock up the node uri 457 param_server_multi = xmlrpclib.MultiCall(master) 458 tmp_nlist = [] 459 for name, node in master_state.nodes.items(): 460 if self.__cached_nodes.has_key(node.name): 461 node.uri = self.__cached_nodes[node.name][0] 462 node.pid = self.__cached_nodes[node.name][1] 463 if node.isLocal and time.time() - self.__cached_nodes[node.name][2] > self.MAX_PING_SEC: 464 nodes[node.name] = node.uri 465 else: 466 #'print "request node:", node.name 467 tmp_nlist.append(node) 468 param_server_multi.lookupNode(self.ros_node_name, name) 469 r = param_server_multi() 470 for (code, msg, uri), node in zip(r, tmp_nlist): 471 if code == 1: 472 node.uri = uri 473 if node.isLocal: 474 nodes[node.name] = uri 475 else: 476 self.__cached_nodes[node.name] = (uri, None, time.time()) 477 except: 478 import traceback 479 traceback.print_exc() 480 # cputimes = os.times() ################### 481 # print "Nodes+Services:", (cputimes[0] + cputimes[1] - cputime_init), ", count nodes:", len(nodes) ################### 482 483 if nodes: 484 # get process id of the nodes 485 pidThread = threading.Thread(target = self._getNodePid, args=((nodes,))) 486 pidThread.start() 487 threads.append(pidThread) 488 489 master_state.timestamp = now 490 except socket.error, e: 491 if isinstance(e, tuple): 492 (errn, msg) = e 493 if not errn in [100, 101, 102]: 494 import traceback 495 formatted_lines = traceback.format_exc().splitlines() 496 # print "Service call failed: %s"%traceback.format_exc() 497 raise MasterConnectionException(formatted_lines[-1]) 498 else: 499 import traceback 500 raise MasterConnectionException(traceback.format_exc()) 501 except: 502 import traceback 503 formatted_lines = traceback.format_exc().splitlines() 504 # print "Service call failed: %s"%traceback.format_exc() 505 raise MasterConnectionException(formatted_lines[-1]) 506 finally: 507 self._lock.release() 508 #'print " updateState _lock RET", threading.current_thread() 509 socket.setdefaulttimeout(None) 510 511 # cputimes = os.times() ################### 512 # cputime_init = cputimes[0] + cputimes[1] ################### 513 514 # print "threads:", len(threads) 515 # wait for all threads are finished 516 while threads: 517 th = threads.pop() 518 if th.isAlive(): 519 # print "join" 520 th.join() 521 # print "release" 522 del th 523 # cputimes = os.times() ################### 524 # print "Threads:", (cputimes[0] + cputimes[1] - cputime_init) ################### 525 # print "state update of ros master", self.__masteruri, " finished" 526 # return MasterInfo.from_list(master_state.listedState()) 527 #'print "updateState _create_access_lock RET", threading.current_thread() 528 529 return master_state
530
531 - def updateSyncInfo(self):
532 ''' 533 This method can be called to update the origin ROS master URI of the nodes 534 and services in new master_state. This is only need, if a synchronization is 535 running. The synchronization service will be detect automatically by searching 536 for the service ending with C{get_sync_info}. The method will be called by 537 C{checkState()}. 538 ''' 539 #'print "updateSyncInfo _create_access_lock try...", threading.current_thread() 540 541 with self._create_access_lock: 542 #'print " updateSyncInfo _create_access_lock locked", threading.current_thread() 543 master_state = self.__new_master_state 544 sync_info = None 545 # get synchronization info, if sync node is running 546 # to determine the origin ROS MASTER URI of the nodes 547 for name, service in master_state.services.items(): 548 if service.name.endswith('get_sync_info'): 549 if interface_finder.hostFromUri(self.getMasteruri()) == interface_finder.hostFromUri(service.uri): 550 socket.setdefaulttimeout(3) 551 get_sync_info = rospy.ServiceProxy(service.name, GetSyncInfo) 552 try: 553 sync_info = get_sync_info() 554 except rospy.ServiceException, e: 555 rospy.logwarn("ERROR Service call 'get_sync_info' failed: %s", str(e)) 556 finally: 557 socket.setdefaulttimeout(None) 558 559 #update the origin ROS MASTER URI of the nodes, if sync node is running 560 if sync_info: 561 for m in sync_info.hosts: 562 for n in m.nodes: 563 try: 564 master_state.getNode(n).masteruri = m.masteruri 565 except: 566 pass 567 for s in m.services: 568 try: 569 master_state.getService(s).masteruri = m.masteruri 570 except: 571 pass
572 #'print "updateSyncInfo _create_access_lock RET", threading.current_thread() 573 574 575
576 - def getMasteruri(self):
577 ''' 578 Requests the ROS master URI from the ROS master through the RPC interface and 579 returns it. 580 @return: ROS master URI 581 @rtype: C{str} or C{None} 582 ''' 583 code = -1 584 if self.__masteruri_rpc is None: 585 master = xmlrpclib.ServerProxy(self.__masteruri) 586 code, message, self.__masteruri_rpc = master.getUri(self.ros_node_name) 587 return self.__masteruri_rpc if code >= 0 or not self.__masteruri_rpc is None else self.__masteruri
588
589 - def getMastername(self):
590 ''' 591 Returns the name of the master. If no name is set, the hostname of the 592 ROS master URI will be extracted. 593 @return: the name of the ROS master 594 @rtype: C{str} or C{None} 595 ''' 596 if self.__mastername is None: 597 try: 598 self.__mastername = interface_finder.hostFromUri(self.getMasteruri()) 599 try: 600 from urlparse import urlparse 601 master_port = urlparse(self.__masteruri).port 602 if master_port != 11311: 603 self.__mastername = '--'.join([self.__mastername, str(master_port)]) 604 except: 605 pass 606 except: 607 pass 608 return self.__mastername
609
610 - def getMasterContacts(self):
611 ''' 612 The RPC method called by XML-RPC server to request the master contact information. 613 @return: (timestamp of the ROS master state, ROS master URI, master name, name of this service, URI of this RPC server) 614 @rtype: C{(str, str, str, str, str)} 615 ''' 616 #'print "MASTERCONTACTS ====================" 617 t = 0 618 if not self.__master_state is None: 619 #'print "getMasterContacts _state_access_lock try...", threading.current_thread() 620 with self._state_access_lock: 621 #'print " getMasterContacts _state_access_lock locked", threading.current_thread() 622 t = self.__master_state.timestamp 623 #'print "getMasterContacts _state_access_lock RET", threading.current_thread() 624 #'print "MASTERCONTACTS <<<<<<<<<<<<<<<<<<<<" 625 return (str(t), str(self.getMasteruri()), str(self.getMastername()), self.ros_node_name, roslib.network.create_local_xmlrpc_uri(self.rpcport))
626
627 - def checkState(self, clear_cache=False):
628 ''' 629 Gets the state from the ROS master and compares it to the stored state. 630 @param clear_cache Clears cache for ROS nodes and sevices. Is neaded on changes on remote hosts. 631 @return: True if the ROS master state is changed 632 @rtype: C{boolean} 633 ''' 634 result = False 635 s = self.updateState(clear_cache) 636 #'print "checkState _create_access_lock try...", threading.current_thread() 637 with self._create_access_lock: 638 #'print " checkState _create_access_lock locked", threading.current_thread() 639 #'print " checkState _state_access_lock try...", threading.current_thread() 640 do_update = False 641 with self._state_access_lock: 642 #'print " checkState _state_access_lock locked", threading.current_thread() 643 if s != self.__master_state: 644 do_update = True 645 if do_update: 646 self.updateSyncInfo() 647 with self._state_access_lock: 648 # test for local changes 649 ts_local = self.__new_master_state.timestamp_local 650 if not self.__master_state is None and not self.__master_state.has_local_changes(s): 651 ts_local = self.__master_state.timestamp_local 652 self.__master_state = self.__new_master_state 653 self.__master_state.timestamp_local = ts_local 654 result = True 655 self.__master_state.check_ts = self.__new_master_state.timestamp 656 #'print " checkState _state_access_lock RET", threading.current_thread() 657 #'print "checkState _create_access_lock RET", threading.current_thread() 658 return result
659
660 - def reset(self):
661 ''' 662 Sets the master state to None. 663 ''' 664 #'print "reset _state_access_lock try...", threading.current_thread() 665 with self._state_access_lock: 666 #'print " reset _state_access_lock locked", threading.current_thread() 667 if not self.__master_state is None: 668 del self.__master_state 669 self.__master_state = None
670 #'print "reset _state_access_lock RET", threading.current_thread() 671