Package rospy :: Package impl :: Module registration

Source Code for Module rospy.impl.registration

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: handles maintaining registrations with master via internal listener APIs""" 
 36   
 37   
 38   
 39  import socket 
 40  import sys 
 41  import logging 
 42  import threading 
 43  import time 
 44  import traceback 
 45  try: 
 46      import xmlrpc.client as xmlrpcclient 
 47  except ImportError: 
 48      import xmlrpclib as xmlrpcclient 
 49   
 50  from rospy.core import is_shutdown, is_shutdown_requested, xmlrpcapi, \ 
 51      logfatal, logwarn, loginfo, logerr, logdebug, \ 
 52      signal_shutdown, add_preshutdown_hook 
 53  from rospy.names import get_caller_id, get_namespace 
 54   
 55  # topic manager and service manager singletons 
 56   
 57  _topic_manager = None 
58 -def set_topic_manager(tm):
59 global _topic_manager 60 _topic_manager = tm
61 -def get_topic_manager():
62 return _topic_manager
63 64 _service_manager = None
65 -def set_service_manager(sm):
66 global _service_manager 67 _service_manager = sm
68 -def get_service_manager():
69 return _service_manager
70 71
72 -class Registration(object):
73 """Registration types""" 74 PUB = 'pub' 75 SUB = 'sub' 76 SRV = 'srv'
77
78 -class RegistrationListener(object):
79 """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations""" 80
81 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
82 """ 83 New pub/sub/service declared. 84 @param resolved_name: resolved topic/service name 85 @param data_type_or_uri: topic type or service uri 86 @type data_type_or_uri: str 87 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 88 @type reg_type: str 89 """ 90 pass
91
92 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
93 """ 94 New pub/sub/service removed. 95 @param resolved_name: topic/service name 96 @type resolved_name: str 97 @param data_type_or_uri: topic type or service uri 98 @type data_type_or_uri: str 99 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 100 @type reg_type: str 101 """ 102 pass 103
104 -class RegistrationListeners(object):
105
106 - def __init__(self):
107 """ 108 ctor. 109 """ 110 self.listeners = [] 111 self.lock = threading.Lock()
112
113 - def add_listener(self, l):
114 """ 115 Subscribe to notifications of pub/sub/service registration 116 changes. This is an internal API used to notify higher level 117 routines when to communicate with the master. 118 @param l: listener to subscribe 119 @type l: TopicListener 120 """ 121 assert isinstance(l, RegistrationListener) 122 with self.lock: 123 self.listeners.append(l)
124
125 - def notify_removed(self, resolved_name, data_type_or_uri, reg_type):
126 """ 127 @param resolved_name: resolved_topic/service name 128 @type resolved_name: str 129 @param data_type_or_uri: topic type or service uri 130 @type data_type_or_uri: str 131 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 132 @type reg_type: str 133 """ 134 with self.lock: 135 for l in self.listeners: 136 try: 137 l.reg_removed(resolved_name, data_type_or_uri, reg_type) 138 except Exception as e: 139 logerr("error notifying listener of removal: %s"%traceback.format_exc())
140
141 - def notify_added(self, resolved_name, data_type, reg_type):
142 """ 143 @param resolved_name: topic/service name 144 @type resolved_name: str 145 @param data_type: topic/service type 146 @type data_type: str 147 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 148 @type reg_type: str 149 """ 150 with self.lock: 151 for l in self.listeners: 152 try: 153 l.reg_added(resolved_name, data_type, reg_type) 154 except Exception as e: 155 logerr(traceback.format_exc())
156
157 - def clear(self):
158 """ 159 Remove all registration listeners 160 """ 161 if not is_shutdown_requested(): 162 with self.lock: 163 del self.listeners[:] 164 else: 165 # when being in shutdown phase the lock might not be lockable 166 # if a notify_added/removed is currently ongoing 167 locked = self.lock.acquire(False) 168 # remove all listeners anyway 169 del self.listeners[:] 170 if locked: 171 self.lock.release()
172 173 _registration_listeners = RegistrationListeners()
174 -def get_registration_listeners():
175 return _registration_listeners
176 177 # RegManager's main purpose is to collect all client->master communication in one place 178
179 -class RegManager(RegistrationListener):
180 """ 181 Registration manager used by Node implemenation. 182 Communicates with ROS Master to maintain topic registration 183 information. Also responds to publisher updates to create topic 184 connections 185 """ 186
187 - def __init__(self, handler):
188 """ 189 ctor. 190 @param handler: node API handler 191 """ 192 self.logger = logging.getLogger("rospy.registration") 193 self.handler = handler 194 self.uri = self.master_uri = None 195 self.updates = [] 196 self.cond = threading.Condition() #for locking/notifying updates 197 self.registered = False 198 # cleanup has to occur before official shutdown 199 add_preshutdown_hook(self.cleanup)
200
201 - def start(self, uri, master_uri):
202 """ 203 Start the RegManager. This should be passed in as an argument to a thread 204 starter as the RegManager is designed to spin in its own thread 205 @param uri: URI of local node 206 @type uri: str 207 @param master_uri: Master URI 208 @type master_uri: str 209 """ 210 self.registered = False 211 self.master_uri = master_uri 212 self.uri = uri 213 first = True 214 tm = get_topic_manager() 215 sm = get_service_manager() 216 ns = get_namespace() 217 caller_id = get_caller_id() 218 if not master_uri or master_uri == uri: 219 registered = True 220 master = None 221 else: 222 registered = False 223 master = xmlrpcapi(master_uri) 224 self.logger.info("Registering with master node %s", master_uri) 225 226 while not registered and not is_shutdown(): 227 try: 228 try: 229 # prevent TopicManager and ServiceManager from accepting registrations until we are done 230 tm.lock.acquire() 231 sm.lock.acquire() 232 233 pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services() 234 for resolved_name, data_type in pub: 235 self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type) 236 code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri) 237 if code != 1: 238 logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg)) 239 signal_shutdown("master/node incompatibility with register publisher") 240 for resolved_name, data_type in sub: 241 self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type) 242 code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri) 243 if code != 1: 244 logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg)) 245 signal_shutdown("master/node incompatibility with register subscriber") 246 else: 247 self.publisher_update(resolved_name, val) 248 for resolved_name, service_uri in srv: 249 self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri) 250 code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri) 251 if code != 1: 252 logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg)) 253 signal_shutdown("master/node incompatibility with register service") 254 255 registered = True 256 257 # Subscribe to updates to our state 258 get_registration_listeners().add_listener(self) 259 finally: 260 sm.lock.release() 261 tm.lock.release() 262 263 if pub or sub: 264 logdebug("Registered [%s] with master node %s", caller_id, master_uri) 265 else: 266 logdebug("No topics to register with master node %s", master_uri) 267 268 except Exception as e: 269 if first: 270 # this use to print to console always, arguable whether or not this should be subjected to same configuration options as logging 271 logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri) 272 first = False 273 time.sleep(0.2) 274 self.registered = True 275 self.run()
276
277 - def is_registered(self):
278 """ 279 Check if Node has been registered yet. 280 @return: True if registration has occurred with master 281 @rtype: bool 282 """ 283 return self.registered
284
285 - def run(self):
286 """ 287 Main RegManager thread loop. 288 Periodically checks the update 289 queue and generates topic connections 290 """ 291 #Connect the topics 292 while not self.handler.done and not is_shutdown(): 293 cond = self.cond 294 try: 295 cond.acquire() 296 if not self.updates: 297 cond.wait(0.5) 298 if self.updates: 299 #work from the end as these are the most up-to-date 300 topic, uris = self.updates.pop() 301 #filter out older updates for same topic 302 self.updates = [x for x in self.updates if x[0] != topic] 303 else: 304 topic = uris = None 305 finally: 306 if cond is not None: 307 cond.release() 308 309 #call _connect_topic on all URIs as it can check to see whether 310 #or not a connection exists. 311 if uris and not self.handler.done: 312 for uri in uris: 313 # #1141: have to multithread this to prevent a bad publisher from hanging us 314 t = threading.Thread(target=self._connect_topic_thread, args=(topic, uri)) 315 t.setDaemon(True) 316 t.start()
317
318 - def _connect_topic_thread(self, topic, uri):
319 try: 320 code, msg, _ = self.handler._connect_topic(topic, uri) 321 if code != 1: 322 logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg) 323 except Exception as e: 324 if not is_shutdown(): 325 logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))
326
327 - def cleanup(self, reason):
328 """ 329 Cleans up registrations with master and releases topic and service resources 330 @param reason: human-reasonable debug string 331 @type reason: str 332 """ 333 self.logger.debug("registration cleanup starting") 334 try: 335 self.cond.acquire() 336 self.cond.notifyAll() 337 finally: 338 self.cond.release() 339 340 # we never successfully initialized master_uri 341 if not self.master_uri: 342 return 343 344 master = xmlrpcapi(self.master_uri) 345 # we never successfully initialized master 346 if master is None: 347 return 348 349 caller_id = get_caller_id() 350 351 # clear the registration listeners as we are going to do a quick unregister here 352 rl = get_registration_listeners() 353 if rl is not None: 354 rl.clear() 355 356 tm = get_topic_manager() 357 sm = get_service_manager() 358 try: 359 multi = xmlrpcclient.MultiCall(master) 360 if tm is not None: 361 for resolved_name, _ in tm.get_subscriptions(): 362 self.logger.debug("unregisterSubscriber [%s]"%resolved_name) 363 multi.unregisterSubscriber(caller_id, resolved_name, self.uri) 364 for resolved_name, _ in tm.get_publications(): 365 self.logger.debug("unregisterPublisher [%s]"%resolved_name) 366 multi.unregisterPublisher(caller_id, resolved_name, self.uri) 367 368 if sm is not None: 369 for resolved_name, service_uri in sm.get_services(): 370 self.logger.debug("unregisterService [%s]"%resolved_name) 371 multi.unregisterService(caller_id, resolved_name, service_uri) 372 multi() 373 except socket.error as se: 374 (errno, msg) = se.args 375 if errno == 111 or errno == 61: #can't talk to master, nothing we can do about it 376 self.logger.warn("cannot unregister with master due to network issues") 377 else: 378 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc()) 379 except: 380 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc()) 381 382 self.logger.debug("registration cleanup: master calls complete") 383 384 #TODO: cleanup() should actually be orchestrated by a separate 385 #cleanup routine that calls the reg manager/sm/tm 386 if tm is not None: 387 tm.close_all() 388 if sm is not None: 389 sm.unregister_all()
390
391 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
392 """ 393 RegistrationListener callback 394 @param resolved_name: resolved name of topic or service 395 @type resolved_name: str 396 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs). 397 @type data_type_or_uri: str 398 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 399 @type reg_type: str 400 """ 401 master_uri = self.master_uri 402 if not master_uri: 403 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration") 404 else: 405 try: 406 master = xmlrpcapi(master_uri) 407 if reg_type == Registration.PUB: 408 self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri) 409 master.unregisterPublisher(get_caller_id(), resolved_name, self.uri) 410 elif reg_type == Registration.SUB: 411 self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri) 412 master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri) 413 elif reg_type == Registration.SRV: 414 self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri) 415 master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri) 416 except: 417 logwarn("unable to communicate with ROS Master, registrations are now out of sync") 418 self.logger.error(traceback.format_exc())
419
420 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
421 """ 422 RegistrationListener callback 423 @param resolved_name: resolved name of topic or service 424 @type resolved_name: str 425 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs). 426 @type data_type_or_uri: str 427 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 428 @type reg_type: str 429 """ 430 #TODO: this needs to be made robust to master outages 431 master_uri = self.master_uri 432 if not master_uri: 433 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration") 434 else: 435 master = xmlrpcapi(master_uri) 436 args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri) 437 registered = False 438 first = True 439 while not registered and not is_shutdown(): 440 try: 441 if reg_type == Registration.PUB: 442 self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args) 443 code, msg, val = master.registerPublisher(*args) 444 if code != 1: 445 logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg)) 446 elif reg_type == Registration.SUB: 447 self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args) 448 code, msg, val = master.registerSubscriber(*args) 449 if code == 1: 450 self.publisher_update(resolved_name, val) 451 else: 452 # this is potentially worth exiting over. in the future may want to add a retry 453 # timer 454 logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg)) 455 elif reg_type == Registration.SRV: 456 self.logger.debug("master.registerService(%s, %s, %s, %s)"%args) 457 code, msg, val = master.registerService(*args) 458 if code != 1: 459 logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg)) 460 461 registered = True 462 except Exception as e: 463 if first: 464 msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri 465 self.logger.error(str(e)+"\n"+msg) 466 print(msg) 467 first = False 468 time.sleep(0.2)
469
470 - def publisher_update(self, resolved_name, uris):
471 """ 472 Inform psmanager of latest publisher list for a topic. This 473 will cause L{RegManager} to create a topic connection for all new 474 publishers (in a separate thread). 475 @param resolved_name: resolved topic name 476 @type resolved_name: str 477 @param uris: list of all publishers uris for topic 478 @type uris: [str] 479 """ 480 try: 481 self.cond.acquire() 482 self.updates.append((resolved_name, uris)) 483 self.cond.notifyAll() 484 finally: 485 self.cond.release()
486