Package rospy :: Package impl :: Module registration

Source Code for Module rospy.impl.registration

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: registration.py 11753 2010-10-25 06:23:19Z kwc $ 
 34   
 35  """Internal use: handles maintaining registrations with master via internal listener APIs""" 
 36   
 37  from __future__ import with_statement 
 38   
 39  import socket 
 40  import sys 
 41  import logging 
 42  import thread 
 43  import threading 
 44  import time 
 45  import traceback 
 46  import xmlrpclib 
 47   
 48  from rospy.core import is_shutdown, xmlrpcapi, \ 
 49      logfatal, logwarn, loginfo, logerr, logdebug, \ 
 50      signal_shutdown, add_preshutdown_hook 
 51  from rospy.names import get_caller_id, get_namespace 
 52   
 53  # topic manager and service manager singletons 
 54   
 55  _topic_manager = None 
56 -def set_topic_manager(tm):
57 global _topic_manager 58 _topic_manager = tm
59 -def get_topic_manager():
60 return _topic_manager
# Module-wide service manager singleton; stays None until set_service_manager() is called.
_service_manager = None


def set_service_manager(sm):
    """
    Install the module-wide service manager.
    @param sm: object to store as the service manager singleton
    """
    global _service_manager
    _service_manager = sm


def get_service_manager():
    """
    @return: the object last passed to L{set_service_manager}, or None
    if it has not been called yet
    """
    return _service_manager
68 69
class Registration(object):
    """Registration types"""
    # String tags passed as the reg_type argument of the listener callbacks:
    # publisher, subscriber and service registrations respectively.
    PUB, SUB, SRV = 'pub', 'sub', 'srv'
75
class RegistrationListener(object):
    """
    Callback interface for objects that want to be told when a
    Publisher/Subscriber/Service declaration is added or removed.
    Both callbacks are no-ops here; subclasses override the ones they need.
    """

    def reg_added(self, resolved_name, data_type_or_uri, reg_type):
        """
        Called when a new pub/sub/service has been declared.
        @param resolved_name: resolved topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """

    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        Called when an existing pub/sub/service declaration has been removed.
        @param resolved_name: topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
class RegistrationListeners(object):
    """
    Collection of L{RegistrationListener} instances. Registration
    changes reported through L{notify_added} / L{notify_removed} are
    forwarded to every listener in the collection. All access to the
    listener list is done while holding C{self.lock}.
    """

    def __init__(self):
        """
        ctor.
        """
        self.listeners = []
        # guards self.listeners; it is also held for the whole time the
        # listeners are being notified
        self.lock = threading.Lock()

    def add_listener(self, l):
        """
        Subscribe to notifications of pub/sub/service registration
        changes. This is an internal API used to notify higher level
        routines when to communicate with the master.
        @param l: listener to subscribe
        @type  l: RegistrationListener
        """
        assert isinstance(l, RegistrationListener)
        with self.lock:
            self.listeners.append(l)

    def notify_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        Call reg_removed() on every listener. An exception raised by
        one listener is logged and does not stop the remaining
        listeners from being notified.
        @param resolved_name: resolved_topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        with self.lock:
            for l in self.listeners:
                try:
                    l.reg_removed(resolved_name, data_type_or_uri, reg_type)
                except Exception:
                    # format_exc() reports the exception currently being
                    # handled; its only positional argument is a traceback
                    # depth limit, so the exception must not be passed in.
                    logerr("error notifying listener of removal: %s"%traceback.format_exc())

    def notify_added(self, resolved_name, data_type, reg_type):
        """
        Call reg_added() on every listener. An exception raised by
        one listener is logged and does not stop the remaining
        listeners from being notified.
        @param resolved_name: topic/service name
        @type  resolved_name: str
        @param data_type: topic/service type
        @type  data_type: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        with self.lock:
            for l in self.listeners:
                try:
                    l.reg_added(resolved_name, data_type, reg_type)
                except Exception:
                    # see notify_removed(): format_exc() takes no exception argument
                    logerr(traceback.format_exc())

    def clear(self):
        """
        Remove all registration listeners
        """
        with self.lock:
            # clear in place so that other references to the list see the change
            del self.listeners[:]
# Single module-wide listener collection, created when the module is loaded.
_registration_listeners = RegistrationListeners()


def get_registration_listeners():
    """
    @return: the module-wide registration listener collection
    @rtype: L{RegistrationListeners}
    """
    return _registration_listeners
165 166 # RegManager's main purpose is to collect all client->master communication in one place 167
class RegManager(RegistrationListener):
    """
    Registration manager used by Node implementation.
    Communicates with ROS Master to maintain topic registration
    information. Also responds to publisher updates to create topic
    connections
    """

    def __init__(self, handler):
        """
        ctor.
        @param handler: node API handler
        """
        self.logger = logging.getLogger("rospy.registration")
        self.handler = handler
        self.uri = self.master_uri = None
        # pending (resolved_name, uris) publisher updates, consumed by run()
        self.updates = []
        self.cond = threading.Condition() #for locking/notifying updates
        self.registered = False
        # cleanup has to occur before official shutdown
        add_preshutdown_hook(self.cleanup)

    def start(self, uri, master_uri):
        """
        Start the RegManager. This should be passed in as an argument to a thread
        starter as the RegManager is designed to spin in its own thread.
        Registers all current publications, subscriptions and services
        with the master (retrying every 0.2s until it succeeds or the
        node shuts down) and then enters L{run}, so this call does not
        return until L{run} does.
        @param uri: URI of local node
        @type  uri: str
        @param master_uri: Master URI
        @type  master_uri: str
        """
        self.registered = False
        self.master_uri = master_uri
        self.uri = uri
        first = True
        tm = get_topic_manager()
        sm = get_service_manager()
        caller_id = get_caller_id()
        if not master_uri or master_uri == uri:
            # no master given, or the master URI is this node's own URI:
            # there is nothing to register with
            registered = True
            master = None
        else:
            registered = False
            master = xmlrpcapi(master_uri)
            self.logger.info("Registering with master node %s", master_uri)

        while not registered and not is_shutdown():
            try:
                # prevent TopicManager and ServiceManager from accepting registrations until we are done.
                # Each lock is acquired right before its own try/finally so
                # that a lock is only released if it was actually obtained.
                tm.lock.acquire()
                try:
                    sm.lock.acquire()
                    try:
                        pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services()
                        for resolved_name, data_type in pub:
                            self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type)
                            code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri)
                            if code != 1:
                                logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg))
                                signal_shutdown("master/node incompatibility with register publisher")
                        for resolved_name, data_type in sub:
                            self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type)
                            code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri)
                            if code != 1:
                                logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg))
                                signal_shutdown("master/node incompatibility with register subscriber")
                            else:
                                # val is handed on as the publisher URI list for this topic
                                self.publisher_update(resolved_name, val)
                        for resolved_name, service_uri in srv:
                            self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri)
                            code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri)
                            if code != 1:
                                logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg))
                                signal_shutdown("master/node incompatibility with register service")

                        registered = True

                        # Subscribe to updates to our state
                        get_registration_listeners().add_listener(self)
                    finally:
                        sm.lock.release()
                finally:
                    tm.lock.release()

                if pub or sub:
                    logdebug("Registered [%s] with master node %s", caller_id, master_uri)
                else:
                    logdebug("No topics to register with master node %s", master_uri)

            except Exception:
                if first:
                    # this used to print to console always, arguable whether or not this should be subjected to same configuration options as logging
                    logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri)
                    first = False
                time.sleep(0.2)
        self.registered = True
        self.run()

    def is_registered(self):
        """
        Check if Node has been registered yet.
        @return: True if registration has occurred with master
        @rtype: bool
        """
        return self.registered

    def run(self):
        """
        Main RegManager thread loop.
        Periodically checks the update
        queue and generates topic connections.
        Loops until the handler is done or the node is shut down.
        """
        #Connect the topics
        while not self.handler.done and not is_shutdown():
            with self.cond:
                if not self.updates:
                    # wake up at least every 0.5s so the loop condition is re-checked
                    self.cond.wait(0.5)
                if self.updates:
                    #work from the end as these are the most up-to-date
                    topic, uris = self.updates.pop()
                    #filter out older updates for same topic
                    self.updates = [x for x in self.updates if x[0] != topic]
                else:
                    topic = uris = None

            #call _connect_topic on all URIs as it can check to see whether
            #or not a connection exists.
            if uris and not self.handler.done:
                for uri in uris:
                    # #1141: have to multithread this to prevent a bad publisher from hanging us
                    thread.start_new_thread(self._connect_topic_thread, (topic, uri))

    def _connect_topic_thread(self, topic, uri):
        """
        Thread body started by L{run}: asks the handler to connect
        topic to the publisher at uri. Failures are only logged at
        debug level.
        @param topic: resolved topic name
        @param uri: publisher URI to connect to
        """
        try:
            code, msg, _ = self.handler._connect_topic(topic, uri)
            if code != 1:
                logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg)
        except Exception:
            if not is_shutdown():
                logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))

    def cleanup(self, reason):
        """
        Cleans up registrations with master and releases topic and service resources
        @param reason: human-readable debug string (not used by this method)
        @type  reason: str
        """
        self.logger.debug("registration cleanup starting")
        # wake up run() if it is waiting for updates
        with self.cond:
            self.cond.notifyAll()

        # we never successfully initialized master_uri
        if not self.master_uri:
            return

        master = xmlrpcapi(self.master_uri)
        # we never successfully initialized master
        if master is None:
            return

        caller_id = get_caller_id()

        # clear the registration listeners as we are going to do a quick unregister here
        rl = get_registration_listeners()
        if rl is not None:
            rl.clear()

        tm = get_topic_manager()
        sm = get_service_manager()
        try:
            # batch every unregister call into a single multicall request
            multi = xmlrpclib.MultiCall(master)
            if tm is not None:
                for resolved_name, _ in tm.get_subscriptions():
                    self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
                    multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
                for resolved_name, _ in tm.get_publications():
                    self.logger.debug("unregisterPublisher [%s]"%resolved_name)
                    multi.unregisterPublisher(caller_id, resolved_name, self.uri)

            if sm is not None:
                for resolved_name, service_uri in sm.get_services():
                    self.logger.debug("unregisterService [%s]"%resolved_name)
                    multi.unregisterService(caller_id, resolved_name, service_uri)
            multi()
        except socket.error:
            # Not every socket.error carries an (errno, message) pair (a
            # timeout has a single argument), so take the first argument, if
            # any, instead of unpacking. sys.exc_info() is used so that no
            # version-specific exception-binding syntax is needed.
            err = sys.exc_info()[1]
            err_code = err.args[0] if err.args else None
            # 111 and 61 are ECONNREFUSED on Linux and on BSD/Mac OS X
            if err_code in (111, 61): #can't talk to master, nothing we can do about it
                self.logger.warn("cannot unregister with master due to network issues")
            else:
                self.logger.warn("unclean shutdown\n%s"%traceback.format_exc())
        except:
            # kept as a catch-all so that a failure here never prevents the
            # tm/sm teardown below from running
            self.logger.warn("unclean shutdown\n%s"%traceback.format_exc())

        self.logger.debug("registration cleanup: master calls complete")

        #TODO: cleanup() should actually be orchestrated by a separate
        #cleanup routine that calls the reg manager/sm/tm
        if tm is not None:
            tm.close_all()
        if sm is not None:
            sm.unregister_all()

    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        RegistrationListener callback. Unregisters the publisher,
        subscriber or service with the master; communication errors are
        logged and not raised.
        @param resolved_name: resolved name of topic or service
        @type  resolved_name: str
        @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        master_uri = self.master_uri
        if not master_uri:
            self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration")
        else:
            try:
                master = xmlrpcapi(master_uri)
                if reg_type == Registration.PUB:
                    self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri)
                    master.unregisterPublisher(get_caller_id(), resolved_name, self.uri)
                elif reg_type == Registration.SUB:
                    self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri)
                    master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri)
                elif reg_type == Registration.SRV:
                    self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri)
                    master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri)
            except:
                # best effort: nothing propagates to the caller
                logwarn("unable to communicate with ROS Master, registrations are now out of sync")
                self.logger.error(traceback.format_exc())

    def reg_added(self, resolved_name, data_type_or_uri, reg_type):
        """
        RegistrationListener callback. Registers the new publisher,
        subscriber or service with the master, retrying every 0.2s
        while the call raises and the node is not shut down. A
        non-success status code from the master is logged but not
        retried.
        @param resolved_name: resolved name of topic or service
        @type  resolved_name: str
        @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        #TODO: this needs to be made robust to master outages
        master_uri = self.master_uri
        if not master_uri:
            self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration")
        else:
            master = xmlrpcapi(master_uri)
            args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri)
            registered = False
            first = True
            while not registered and not is_shutdown():
                try:
                    if reg_type == Registration.PUB:
                        self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args)
                        code, msg, val = master.registerPublisher(*args)
                        if code != 1:
                            logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg))
                    elif reg_type == Registration.SUB:
                        self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args)
                        code, msg, val = master.registerSubscriber(*args)
                        if code == 1:
                            self.publisher_update(resolved_name, val)
                        else:
                            # this is potentially worth exiting over. in the future may want to add a retry
                            # timer
                            logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg))
                    elif reg_type == Registration.SRV:
                        self.logger.debug("master.registerService(%s, %s, %s, %s)"%args)
                        code, msg, val = master.registerService(*args)
                        if code != 1:
                            logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg))

                    registered = True
                except Exception:
                    if first:
                        # only report the first failure; later retries are silent
                        msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri
                        # sys.exc_info()[1] is the exception being handled
                        self.logger.error(str(sys.exc_info()[1])+"\n"+msg)
                        print(msg)
                        first = False
                    time.sleep(0.2)

    def publisher_update(self, resolved_name, uris):
        """
        Inform psmanager of latest publisher list for a topic.  This
        will cause L{RegManager} to create a topic connection for all new
        publishers (in a separate thread).
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @param uris: list of all publishers uris for topic
        @type  uris: [str]
        """
        with self.cond:
            # queue the update and wake up run(), which consumes it
            self.updates.append((resolved_name, uris))
            self.cond.notifyAll()
472