Package rospy :: Package impl :: Module registration

Source Code for Module rospy.impl.registration

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: handles maintaining registrations with master via internal listener APIs""" 
 36   
 37   
 38   
 39  import socket 
 40  import sys 
 41  import logging 
 42  import threading 
 43  import time 
 44  import traceback 
 45  try: 
 46      import xmlrpc.client as xmlrpcclient 
 47  except ImportError: 
 48      import xmlrpclib as xmlrpcclient 
 49   
 50  from rospy.core import is_shutdown, is_shutdown_requested, xmlrpcapi, \ 
 51      logfatal, logwarn, loginfo, logerr, logdebug, \ 
 52      signal_shutdown, add_preshutdown_hook 
 53  from rospy.names import get_caller_id, get_namespace 
 54   
 55  # topic manager and service manager singletons 
 56   
 57  _topic_manager = None 
58 -def set_topic_manager(tm):
59 global _topic_manager 60 _topic_manager = tm
def get_topic_manager():
    """
    @return: current value of the module-level topic manager singleton
        (whatever was last passed to L{set_topic_manager})
    """
    return _topic_manager
# Module-level service manager singleton; None until set_service_manager() stores one.
_service_manager = None


def set_service_manager(sm):
    """
    Store the service manager singleton for this module.
    @param sm: object to store; L{get_service_manager} returns it unchanged
    """
    global _service_manager
    _service_manager = sm
def get_service_manager():
    """
    @return: current value of the module-level service manager singleton
        (whatever was last passed to L{set_service_manager})
    """
    return _service_manager
70 71
class Registration(object):
    """Registration types"""
    # String tags passed as the reg_type argument of the listener callbacks:
    # publisher, subscriber and service respectively.
    PUB, SUB, SRV = 'pub', 'sub', 'srv'
77
class RegistrationListener(object):
    """
    Listener API for subscribing to changes in Publisher/Subscriber/Service declarations.
    Both callbacks do nothing in this base class; subclasses override the ones they need.
    """

    def reg_added(self, resolved_name, data_type_or_uri, reg_type):
        """
        Callback for a newly declared pub/sub/service.
        @param resolved_name: resolved topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """

    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        Callback for a removed pub/sub/service.
        @param resolved_name: topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
class RegistrationListeners(object):
    """
    Holds a list of L{RegistrationListener} objects and fans registration
    added/removed notifications out to each of them. Access to the list is
    guarded by C{self.lock}.
    """

    def __init__(self):
        """
        ctor.
        """
        self.listeners = []
        self.lock = threading.Lock()

    def add_listener(self, l):
        """
        Subscribe to notifications of pub/sub/service registration
        changes. This is an internal API used to notify higher level
        routines when to communicate with the master.
        @param l: listener to subscribe
        @type  l: RegistrationListener
        """
        assert isinstance(l, RegistrationListener)
        with self.lock:
            self.listeners.append(l)

    def _notify(self, callback_name, action, resolved_name, data_type_or_uri, reg_type):
        """
        Invoke the named callback on every listener while holding the lock.
        An exception raised by one listener is logged and does not stop the
        remaining listeners from being notified.
        @param callback_name: 'reg_added' or 'reg_removed'
        @type  callback_name: str
        @param action: word describing the event, used only in the error log message
        @type  action: str
        """
        with self.lock:
            for listener in self.listeners:
                try:
                    getattr(listener, callback_name)(resolved_name, data_type_or_uri, reg_type)
                except Exception:
                    logerr("error notifying listener of %s: %s"%(action, traceback.format_exc()))

    def notify_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        Call reg_removed() on every listener.
        @param resolved_name: resolved_topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        self._notify('reg_removed', 'removal', resolved_name, data_type_or_uri, reg_type)

    def notify_added(self, resolved_name, data_type, reg_type):
        """
        Call reg_added() on every listener.
        @param resolved_name: topic/service name
        @type  resolved_name: str
        @param data_type: topic/service type
        @type  data_type: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        self._notify('reg_added', 'addition', resolved_name, data_type, reg_type)

    def clear(self):
        """
        Remove all registration listeners
        """
        if not is_shutdown_requested():
            with self.lock:
                del self.listeners[:]
        else:
            # when being in shutdown phase the lock might not be lockable
            # if a notify_added/removed is currently ongoing, so only try
            # a non-blocking acquire
            locked = self.lock.acquire(False)
            # remove all listeners anyway
            del self.listeners[:]
            if locked:
                self.lock.release()
# Module-level RegistrationListeners singleton, created when the module is loaded.
_registration_listeners = RegistrationListeners()


def get_registration_listeners():
    """
    @return: the module-level L{RegistrationListeners} singleton
    """
    return _registration_listeners
176 177 # RegManager's main purpose is to collect all client->master communication in one place 178
class RegManager(RegistrationListener):
    """
    Registration manager used by Node implementation.
    Communicates with ROS Master to maintain topic registration
    information. Also responds to publisher updates to create topic
    connections
    """

    def __init__(self, handler):
        """
        ctor. Also installs L{cleanup} as a pre-shutdown hook.
        @param handler: node API handler
        """
        self.logger = logging.getLogger("rospy.registration")
        self.handler = handler
        self.uri = self.master_uri = None
        # pending (resolved_name, publisher_uris) tuples, consumed by run()
        self.updates = []
        self.cond = threading.Condition() #for locking/notifying updates
        self.registered = False
        # cleanup has to occur before official shutdown
        add_preshutdown_hook(self.cleanup)

    def start(self, uri, master_uri):
        """
        Start the RegManager. This should be passed in as an argument to a thread
        starter as the RegManager is designed to spin in its own thread.
        Registration with the master is retried every 0.2 seconds until it
        succeeds or the node shuts down; afterwards this method enters L{run}
        and only returns when that loop ends.
        @param uri: URI of local node
        @type  uri: str
        @param master_uri: Master URI
        @type  master_uri: str
        """
        self.registered = False
        self.master_uri = master_uri
        self.uri = uri
        first = True
        tm = get_topic_manager()
        sm = get_service_manager()
        caller_id = get_caller_id()
        if not master_uri or master_uri == uri:
            # no master to talk to (or we are the master): nothing to register
            registered = True
            master = None
        else:
            registered = False
            master = xmlrpcapi(master_uri)
            self.logger.info("Registering with master node %s", master_uri)

        while not registered and not is_shutdown():
            try:
                pub, sub = self._register_all(master, caller_id, uri, tm, sm)
                registered = True

                if pub or sub:
                    logdebug("Registered [%s] with master node %s", caller_id, master_uri)
                else:
                    logdebug("No topics to register with master node %s", master_uri)

            except Exception:
                if first:
                    # this use to print to console always, arguable whether or not this should be subjected to same configuration options as logging
                    logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri)
                    first = False
                time.sleep(0.2)
        self.registered = True
        self.run()

    def _register_all(self, master, caller_id, uri, tm, sm):
        """
        Register all current publications, subscriptions and services with the
        master, then subscribe this object to later registration changes.
        Runs with both the topic manager and service manager locks held.
        @param master: XML-RPC proxy for the master
        @param caller_id: caller ID of this node
        @param uri: URI of local node
        @param tm: topic manager
        @param sm: service manager
        @return: (publications, subscriptions) as returned by the topic manager
        """
        # prevent TopicManager and ServiceManager from accepting registrations until we are done.
        # Each acquire sits directly before the try whose finally releases it, so a failed
        # acquire can never cause a release of a lock that is not held.
        tm.lock.acquire()
        try:
            sm.lock.acquire()
            try:
                pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services()
                for resolved_name, data_type in pub:
                    self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type)
                    code, msg, _ = master.registerPublisher(caller_id, resolved_name, data_type, uri)
                    if code != 1:
                        logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg))
                        signal_shutdown("master/node incompatibility with register publisher")
                for resolved_name, data_type in sub:
                    self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type)
                    code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri)
                    if code != 1:
                        logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg))
                        signal_shutdown("master/node incompatibility with register subscriber")
                    else:
                        # val is handed on as the publisher URI list for this topic
                        self.publisher_update(resolved_name, val)
                for resolved_name, service_uri in srv:
                    self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri)
                    code, msg, _ = master.registerService(caller_id, resolved_name, service_uri, uri)
                    if code != 1:
                        logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg))
                        signal_shutdown("master/node incompatibility with register service")

                # Subscribe to updates to our state
                get_registration_listeners().add_listener(self)
            finally:
                sm.lock.release()
        finally:
            tm.lock.release()
        return pub, sub

    def is_registered(self):
        """
        Check if Node has been registered yet.
        @return: True if registration has occurred with master
        @rtype: bool
        """
        return self.registered

    def run(self):
        """
        Main RegManager thread loop.
        Periodically checks the update
        queue and generates topic connections
        """
        #Connect the topics
        while not self.handler.done and not is_shutdown():
            with self.cond:
                if not self.updates:
                    # woken early by publisher_update()/cleanup(), otherwise poll again after 0.5s
                    self.cond.wait(0.5)
                if self.updates:
                    #work from the end as these are the most up-to-date
                    topic, uris = self.updates.pop()
                    #filter out older updates for same topic
                    self.updates = [x for x in self.updates if x[0] != topic]
                else:
                    topic = uris = None

            get_topic_manager().check_all()

            #call _connect_topic on all URIs as it can check to see whether
            #or not a connection exists.
            if uris and not self.handler.done:
                for uri in uris:
                    # #1141: have to multithread this to prevent a bad publisher from hanging us
                    t = threading.Thread(target=self._connect_topic_thread, args=(topic, uri))
                    t.daemon = True
                    t.start()

    def _connect_topic_thread(self, topic, uri):
        """
        Thread target: ask the handler to connect topic to the publisher at uri.
        Failures are only logged at debug level.
        @param topic: resolved topic name
        @param uri: publisher URI
        """
        try:
            code, msg, _ = self.handler._connect_topic(topic, uri)
            if code != 1:
                logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg)
        except Exception:
            if not is_shutdown():
                logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))

    def cleanup(self, reason):
        """
        Cleans up registrations with master and releases topic and service resources
        @param reason: human-readable debug string
        @type  reason: str
        """
        self.logger.debug("registration cleanup starting")
        # wake up run() if it is waiting for updates
        with self.cond:
            self.cond.notify_all()

        # we never successfully initialized master_uri
        if not self.master_uri:
            return

        master = xmlrpcapi(self.master_uri)
        # we never successfully initialized master
        if master is None:
            return

        caller_id = get_caller_id()

        # clear the registration listeners as we are going to do a quick unregister here
        rl = get_registration_listeners()
        if rl is not None:
            rl.clear()

        tm = get_topic_manager()
        sm = get_service_manager()
        try:
            # batch all unregister calls into one multicall request
            multi = xmlrpcclient.MultiCall(master)
            if tm is not None:
                for resolved_name, _ in tm.get_subscriptions():
                    self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
                    multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
                for resolved_name, _ in tm.get_publications():
                    self.logger.debug("unregisterPublisher [%s]"%resolved_name)
                    multi.unregisterPublisher(caller_id, resolved_name, self.uri)

            if sm is not None:
                for resolved_name, service_uri in sm.get_services():
                    self.logger.debug("unregisterService [%s]"%resolved_name)
                    multi.unregisterService(caller_id, resolved_name, service_uri)
            multi()
        except socket.error as se:
            # not every socket error carries (errno, msg); e.g. a timeout has a
            # single argument, so never unpack args blindly
            err_no = se.args[0] if se.args else None
            if err_no == 111 or err_no == 61: #can't talk to master, nothing we can do about it
                self.logger.warning("cannot unregister with master due to network issues")
            else:
                self.logger.warning("unclean shutdown\n%s"%traceback.format_exc())
        except:
            # deliberately broad: whatever goes wrong here, the local
            # topic/service resources below must still be released
            self.logger.warning("unclean shutdown\n%s"%traceback.format_exc())

        self.logger.debug("registration cleanup: master calls complete")

        #TODO: cleanup() should actually be orchestrated by a separate
        #cleanup routine that calls the reg manager/sm/tm
        if tm is not None:
            tm.close_all()
        if sm is not None:
            sm.unregister_all()

    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        RegistrationListener callback. Unregisters the topic/service with the
        master; a communication failure is logged and not retried.
        @param resolved_name: resolved name of topic or service
        @type  resolved_name: str
        @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        master_uri = self.master_uri
        if not master_uri:
            self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration")
            return
        try:
            master = xmlrpcapi(master_uri)
            if reg_type == Registration.PUB:
                self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri)
                master.unregisterPublisher(get_caller_id(), resolved_name, self.uri)
            elif reg_type == Registration.SUB:
                self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri)
                master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri)
            elif reg_type == Registration.SRV:
                self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri)
                master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri)
        except Exception:
            logwarn("unable to communicate with ROS Master, registrations are now out of sync")
            self.logger.error(traceback.format_exc())

    def reg_added(self, resolved_name, data_type_or_uri, reg_type):
        """
        RegistrationListener callback. Registers the topic/service with the
        master, retrying every 0.2 seconds on communication errors until the
        call goes through or the node shuts down. A non-success return code
        from the master is logged as fatal and is not retried.
        @param resolved_name: resolved name of topic or service
        @type  resolved_name: str
        @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        #TODO: this needs to be made robust to master outages
        master_uri = self.master_uri
        if not master_uri:
            self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration")
            return

        master = xmlrpcapi(master_uri)
        args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri)
        registered = False
        first = True
        while not registered and not is_shutdown():
            try:
                if reg_type == Registration.PUB:
                    self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args)
                    code, msg, _ = master.registerPublisher(*args)
                    if code != 1:
                        logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg))
                elif reg_type == Registration.SUB:
                    self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args)
                    code, msg, val = master.registerSubscriber(*args)
                    if code == 1:
                        self.publisher_update(resolved_name, val)
                    else:
                        # this is potentially worth exiting over. in the future may want to add a retry
                        # timer
                        logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg))
                elif reg_type == Registration.SRV:
                    self.logger.debug("master.registerService(%s, %s, %s, %s)"%args)
                    code, msg, _ = master.registerService(*args)
                    if code != 1:
                        logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg))

                registered = True
            except Exception as e:
                if first:
                    # only report the first failure; later retries are silent
                    msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri
                    self.logger.error(str(e)+"\n"+msg)
                    print(msg)
                    first = False
                time.sleep(0.2)

    def publisher_update(self, resolved_name, uris):
        """
        Inform psmanager of latest publisher list for a topic.  This
        will cause L{RegManager} to create a topic connection for all new
        publishers (in a separate thread).
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @param uris: list of all publishers uris for topic
        @type  uris: [str]
        """
        with self.cond:
            self.updates.append((resolved_name, uris))
            self.cond.notify_all()
488