Package rospy :: Package impl :: Module registration

Source Code for Module rospy.impl.registration

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  """Internal use: handles maintaining registrations with master via internal listener APIs""" 
 36   
 37   
 38   
 39  import errno 
 40  import socket 
 41  import sys 
 42  import logging 
 43  import threading 
 44  import time 
 45  import traceback 
 46  try: 
 47      import xmlrpc.client as xmlrpcclient 
 48  except ImportError: 
 49      import xmlrpclib as xmlrpcclient 
 50   
 51  from rospy.core import is_shutdown, is_shutdown_requested, xmlrpcapi, \ 
 52      logfatal, logwarn, loginfo, logerr, logdebug, \ 
 53      signal_shutdown, add_preshutdown_hook 
 54  from rospy.names import get_caller_id, get_namespace 
 55   
 56  # topic manager and service manager singletons 
 57   
 58  _topic_manager = None 
59 -def set_topic_manager(tm):
60 global _topic_manager 61 _topic_manager = tm
62 -def get_topic_manager():
63 return _topic_manager
_service_manager = None


def set_service_manager(sm):
    """
    Store the service manager singleton for this module.
    @param sm: service manager instance to hand out from L{get_service_manager}
    """
    global _service_manager
    _service_manager = sm


def get_service_manager():
    """
    @return: the object last passed to L{set_service_manager}, or None
    if no service manager has been set
    """
    return _service_manager
71 72
class Registration(object):
    """
    Registration types: string tags passed as C{reg_type} to the
    registration listener callbacks.
    """
    # publisher, subscriber and service registrations, respectively
    PUB, SUB, SRV = 'pub', 'sub', 'srv'
78
class RegistrationListener(object):
    """
    Listener API for subscribing to changes in Publisher/Subscriber/Service
    declarations. Both callbacks are no-ops here; subclasses override the
    ones they care about.
    """

    def reg_added(self, resolved_name, data_type_or_uri, reg_type):
        """
        Called when a new pub/sub/service has been declared.
        @param resolved_name: resolved topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """

    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        Called when an existing pub/sub/service has been removed.
        @param resolved_name: resolved topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
class RegistrationListeners(object):
    """
    Holds a list of L{RegistrationListener} instances and fans
    registration added/removed notifications out to each of them.
    Access to the list is guarded by C{self.lock}.
    """

    def __init__(self):
        """
        ctor. Starts with an empty listener list.
        """
        self.lock = threading.Lock()
        self.listeners = []

    def add_listener(self, l):
        """
        Subscribe to notifications of pub/sub/service registration
        changes. This is an internal API used to notify higher level
        routines when to communicate with the master.
        @param l: listener to subscribe
        @type  l: RegistrationListener
        """
        assert isinstance(l, RegistrationListener)
        with self.lock:
            self.listeners.append(l)

    def notify_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        Invoke C{reg_removed} on every listener. An exception raised by
        one listener is logged and does not stop the remaining ones.
        @param resolved_name: resolved topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        with self.lock:
            for listener in self.listeners:
                try:
                    listener.reg_removed(resolved_name, data_type_or_uri, reg_type)
                except Exception:
                    logerr("error notifying listener of removal: %s"%traceback.format_exc())

    def notify_added(self, resolved_name, data_type, reg_type):
        """
        Invoke C{reg_added} on every listener. An exception raised by
        one listener is logged and does not stop the remaining ones.
        @param resolved_name: topic/service name
        @type  resolved_name: str
        @param data_type: topic/service type
        @type  data_type: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        with self.lock:
            for listener in self.listeners:
                try:
                    listener.reg_added(resolved_name, data_type, reg_type)
                except Exception:
                    logerr(traceback.format_exc())

    def clear(self):
        """
        Remove all registration listeners
        """
        if is_shutdown_requested():
            # during shutdown a notify_added/removed may currently hold
            # the lock, so only try to take it without blocking ...
            got_lock = self.lock.acquire(False)
            # ... and empty the list whether or not we got it
            del self.listeners[:]
            if got_lock:
                self.lock.release()
            return

        with self.lock:
            del self.listeners[:]
_registration_listeners = RegistrationListeners()


def get_registration_listeners():
    """
    @return: the module-level L{RegistrationListeners} instance
    @rtype: RegistrationListeners
    """
    return _registration_listeners
177 178 # RegManager's main purpose is to collect all client->master communication in one place 179
class RegManager(RegistrationListener):
    """
    Registration manager used by Node implementation.
    Communicates with ROS Master to maintain topic registration
    information. Also responds to publisher updates to create topic
    connections
    """

    def __init__(self, handler):
        """
        ctor.
        @param handler: node API handler
        """
        self.logger = logging.getLogger("rospy.registration")
        self.handler = handler
        self.uri = self.master_uri = None
        # pending (resolved_name, uris) publisher updates, consumed by run()
        self.updates = []
        self.cond = threading.Condition() #for locking/notifying updates
        self.registered = False
        # cleanup has to occur before official shutdown
        add_preshutdown_hook(self.cleanup)

    def start(self, uri, master_uri):
        """
        Start the RegManager. This should be passed in as an argument to a thread
        starter as the RegManager is designed to spin in its own thread
        @param uri: URI of local node
        @type  uri: str
        @param master_uri: Master URI
        @type  master_uri: str
        """
        self.registered = False
        self.master_uri = master_uri
        self.uri = uri
        first = True
        tm = get_topic_manager()
        sm = get_service_manager()
        caller_id = get_caller_id()
        if not master_uri or master_uri == uri:
            # nothing to register with: skip the registration loop entirely
            registered = True
            master = None
        else:
            registered = False
            master = xmlrpcapi(master_uri)
            self.logger.info("Registering with master node %s", master_uri)

        while not registered and not is_shutdown():
            try:
                # prevent TopicManager and ServiceManager from accepting registrations until we are done.
                # Each lock is only released if its acquire succeeded.
                tm.lock.acquire()
                try:
                    sm.lock.acquire()
                    try:
                        pub, sub = self._register_current_state(master, caller_id, uri, tm, sm)
                        registered = True

                        # Subscribe to updates to our state
                        get_registration_listeners().add_listener(self)
                    finally:
                        sm.lock.release()
                finally:
                    tm.lock.release()

                if pub or sub:
                    logdebug("Registered [%s] with master node %s", caller_id, master_uri)
                else:
                    logdebug("No topics to register with master node %s", master_uri)

            except Exception:
                if first:
                    # this use to print to console always, arguable whether or not this should be subjected to same configuration options as logging
                    logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri)
                    first = False
                time.sleep(0.2)
        # NOTE: this is set unconditionally, i.e. also when the loop above
        # was left because of shutdown rather than a successful registration
        self.registered = True
        self.run()

    def _register_current_state(self, master, caller_id, uri, tm, sm):
        """
        Register every publication, subscription and service currently
        known to the topic/service managers with the master. A rejected
        call (code != 1) is logged as fatal and requests shutdown, but
        the remaining registrations are still attempted.
        @param master: XML-RPC proxy for the master
        @param caller_id: caller id of this node
        @type  caller_id: str
        @param uri: URI of local node
        @type  uri: str
        @param tm: topic manager
        @param sm: service manager
        @return: (publications, subscriptions) that were registered
        """
        pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services()
        for resolved_name, data_type in pub:
            self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type)
            code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri)
            if code != 1:
                logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg))
                signal_shutdown("master/node incompatibility with register publisher")
        for resolved_name, data_type in sub:
            self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type)
            code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri)
            if code != 1:
                logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg))
                signal_shutdown("master/node incompatibility with register subscriber")
            else:
                # val is queued as the current publisher list for this topic
                self.publisher_update(resolved_name, val)
        for resolved_name, service_uri in srv:
            self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri)
            code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri)
            if code != 1:
                logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg))
                signal_shutdown("master/node incompatibility with register service")
        return pub, sub

    def is_registered(self):
        """
        Check if Node has been registered yet.
        @return: True if registration has occurred with master
        @rtype: bool
        """
        return self.registered

    def run(self):
        """
        Main RegManager thread loop.
        Periodically checks the update
        queue and generates topic connections
        """
        #Connect the topics
        while not self.handler.done and not is_shutdown():
            with self.cond:
                if not self.updates:
                    # woken early by publisher_update()/cleanup(), else times out
                    self.cond.wait(0.5)
                if self.updates:
                    #work from the end as these are the most up-to-date
                    topic, uris = self.updates.pop()
                    #filter out older updates for same topic
                    self.updates = [x for x in self.updates if x[0] != topic]
                else:
                    topic = uris = None

            get_topic_manager().check_all()

            #call _connect_topic on all URIs as it can check to see whether
            #or not a connection exists.
            if uris and not self.handler.done:
                for uri in uris:
                    # #1141: have to multithread this to prevent a bad publisher from hanging us
                    t = threading.Thread(target=self._connect_topic_thread, args=(topic, uri))
                    t.daemon = True
                    t.start()

    def _connect_topic_thread(self, topic, uri):
        """
        Thread target: ask the handler to connect topic to the publisher
        at uri. Failures are only logged at debug level.
        @param topic: resolved topic name
        @type  topic: str
        @param uri: publisher URI
        @type  uri: str
        """
        try:
            code, msg, _ = self.handler._connect_topic(topic, uri)
            if code != 1:
                logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg)
        except Exception:
            if not is_shutdown():
                logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))

    def cleanup(self, reason):
        """
        Cleans up registrations with master and releases topic and service resources
        @param reason: human-reasonable debug string
        @type  reason: str
        """
        self.logger.debug("registration cleanup starting")
        # wake up anything waiting on the update condition (see run())
        with self.cond:
            self.cond.notify_all()

        # we never successfully initialized master_uri
        if not self.master_uri:
            return

        master = xmlrpcapi(self.master_uri)
        # we never successfully initialized master
        if master is None:
            return

        caller_id = get_caller_id()

        # clear the registration listeners as we are going to do a quick unregister here
        rl = get_registration_listeners()
        if rl is not None:
            rl.clear()

        tm = get_topic_manager()
        sm = get_service_manager()
        try:
            # batch all unregister calls into a single XML-RPC multicall
            multi = xmlrpcclient.MultiCall(master)
            if tm is not None:
                for resolved_name, _ in tm.get_subscriptions():
                    self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
                    multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
                for resolved_name, _ in tm.get_publications():
                    self.logger.debug("unregisterPublisher [%s]"%resolved_name)
                    multi.unregisterPublisher(caller_id, resolved_name, self.uri)

            if sm is not None:
                for resolved_name, service_uri in sm.get_services():
                    self.logger.debug("unregisterService [%s]"%resolved_name)
                    multi.unregisterService(caller_id, resolved_name, service_uri)
            multi()
        except socket.error as se:
            # read the errno attribute instead of unpacking se.args: args is
            # not always a 2-tuple, and a failed unpack here would abort the
            # cleanup before the local resources below are released
            se_errno = getattr(se, 'errno', None)
            unreachable = [errno.ECONNREFUSED]
            if hasattr(errno, 'ENODATA'):
                # ENODATA is not defined on every platform
                unreachable.append(errno.ENODATA)
            if se_errno in unreachable: #can't talk to master, nothing we can do about it
                self.logger.warning("cannot unregister with master due to network issues")
            else:
                self.logger.warning("unclean shutdown\n%s"%traceback.format_exc())
        except:
            # deliberately broad: whatever went wrong talking to the master,
            # the local topic/service resources below must still be released
            self.logger.warning("unclean shutdown\n%s"%traceback.format_exc())

        self.logger.debug("registration cleanup: master calls complete")

        #TODO: cleanup() should actually be orchestrated by a separate
        #cleanup routine that calls the reg manager/sm/tm
        if tm is not None:
            tm.close_all()
        if sm is not None:
            sm.unregister_all()

    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        RegistrationListener callback
        @param resolved_name: resolved name of topic or service
        @type  resolved_name: str
        @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        master_uri = self.master_uri
        if not master_uri:
            self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration")
            return

        try:
            master = xmlrpcapi(master_uri)
            if reg_type == Registration.PUB:
                self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri)
                master.unregisterPublisher(get_caller_id(), resolved_name, self.uri)
            elif reg_type == Registration.SUB:
                self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri)
                master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri)
            elif reg_type == Registration.SRV:
                self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri)
                master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri)
        except Exception:
            # best effort: a single failed unregister is logged, not retried
            logwarn("unable to communicate with ROS Master, registrations are now out of sync")
            self.logger.error(traceback.format_exc())

    def reg_added(self, resolved_name, data_type_or_uri, reg_type):
        """
        RegistrationListener callback. Retries every 0.2s until the
        master call goes through or the node is shut down.
        @param resolved_name: resolved name of topic or service
        @type  resolved_name: str
        @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        #TODO: this needs to be made robust to master outages
        master_uri = self.master_uri
        if not master_uri:
            self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration")
            return

        master = xmlrpcapi(master_uri)
        args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri)
        registered = False
        first = True
        while not registered and not is_shutdown():
            try:
                if reg_type == Registration.PUB:
                    self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args)
                    code, msg, val = master.registerPublisher(*args)
                    if code != 1:
                        logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg))
                elif reg_type == Registration.SUB:
                    self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args)
                    code, msg, val = master.registerSubscriber(*args)
                    if code == 1:
                        self.publisher_update(resolved_name, val)
                    else:
                        # this is potentially worth exiting over. in the future may want to add a retry
                        # timer
                        logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg))
                elif reg_type == Registration.SRV:
                    self.logger.debug("master.registerService(%s, %s, %s, %s)"%args)
                    code, msg, val = master.registerService(*args)
                    if code != 1:
                        logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg))

                # a completed call ends the retry loop even if the master rejected it
                registered = True
            except Exception as e:
                if first:
                    msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri
                    self.logger.error(str(e)+"\n"+msg)
                    print(msg)
                    first = False
                time.sleep(0.2)

    def publisher_update(self, resolved_name, uris):
        """
        Inform psmanager of latest publisher list for a topic.  This
        will cause L{RegManager} to create a topic connection for all new
        publishers (in a separate thread).
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @param uris: list of all publishers uris for topic
        @type  uris: [str]
        """
        with self.cond:
            self.updates.append((resolved_name, uris))
            self.cond.notify_all()
489