Package rospy :: Package impl :: Module registration

Source Code for Module rospy.impl.registration

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: registration.py 14618 2011-08-09 16:37:22Z kwc $ 
 34   
 35  """Internal use: handles maintaining registrations with master via internal listener APIs""" 
 36   
 37   
 38   
 39  import socket 
 40  import sys 
 41  import logging 
 42  try: 
 43          import _thread 
 44  except ImportError: 
 45      import thread as _thread 
 46  import threading 
 47  import time 
 48  import traceback 
 49  try: 
 50      import xmlrpc.client as xmlrpcclient 
 51  except ImportError: 
 52      import xmlrpclib as xmlrpcclient 
 53   
 54  from rospy.core import is_shutdown, xmlrpcapi, \ 
 55      logfatal, logwarn, loginfo, logerr, logdebug, \ 
 56      signal_shutdown, add_preshutdown_hook 
 57  from rospy.names import get_caller_id, get_namespace 
 58   
 59  # topic manager and service manager singletons 
 60   
 61  _topic_manager = None 
62 -def set_topic_manager(tm):
63 global _topic_manager 64 _topic_manager = tm
65 -def get_topic_manager():
66 return _topic_manager
67 68 _service_manager = None
69 -def set_service_manager(sm):
70 global _service_manager 71 _service_manager = sm
72 -def get_service_manager():
73 return _service_manager
74 75
76 -class Registration(object):
77 """Registration types""" 78 PUB = 'pub' 79 SUB = 'sub' 80 SRV = 'srv'
81
82 -class RegistrationListener(object):
83 """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations""" 84
85 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
86 """ 87 New pub/sub/service declared. 88 @param resolved_name: resolved topic/service name 89 @param data_type_or_uri: topic type or service uri 90 @type data_type_or_uri: str 91 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 92 @type reg_type: str 93 """ 94 pass
95
96 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
97 """ 98 New pub/sub/service removed. 99 @param resolved_name: topic/service name 100 @type resolved_name: str 101 @param data_type_or_uri: topic type or service uri 102 @type data_type_or_uri: str 103 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 104 @type reg_type: str 105 """ 106 pass 107
108 -class RegistrationListeners(object):
109
110 - def __init__(self):
111 """ 112 ctor. 113 """ 114 self.listeners = [] 115 self.lock = threading.Lock()
116
117 - def add_listener(self, l):
118 """ 119 Subscribe to notifications of pub/sub/service registration 120 changes. This is an internal API used to notify higher level 121 routines when to communicate with the master. 122 @param l: listener to subscribe 123 @type l: TopicListener 124 """ 125 assert isinstance(l, RegistrationListener) 126 with self.lock: 127 self.listeners.append(l)
128
129 - def notify_removed(self, resolved_name, data_type_or_uri, reg_type):
130 """ 131 @param resolved_name: resolved_topic/service name 132 @type resolved_name: str 133 @param data_type_or_uri: topic type or service uri 134 @type data_type_or_uri: str 135 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 136 @type reg_type: str 137 """ 138 with self.lock: 139 for l in self.listeners: 140 try: 141 l.reg_removed(resolved_name, data_type_or_uri, reg_type) 142 except Exception as e: 143 logerr("error notifying listener of removal: %s"%traceback.format_exc(e))
144
145 - def notify_added(self, resolved_name, data_type, reg_type):
146 """ 147 @param resolved_name: topic/service name 148 @type resolved_name: str 149 @param data_type: topic/service type 150 @type data_type: str 151 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 152 @type reg_type: str 153 """ 154 with self.lock: 155 for l in self.listeners: 156 try: 157 l.reg_added(resolved_name, data_type, reg_type) 158 except Exception as e: 159 logerr(traceback.format_exc(e))
160
161 - def clear(self):
162 """ 163 Remove all registration listeners 164 """ 165 with self.lock: 166 del self.listeners[:]
167 168 _registration_listeners = RegistrationListeners()
169 -def get_registration_listeners():
170 return _registration_listeners
171 172 # RegManager's main purpose is to collect all client->master communication in one place 173
174 -class RegManager(RegistrationListener):
175 """ 176 Registration manager used by Node implemenation. 177 Communicates with ROS Master to maintain topic registration 178 information. Also responds to publisher updates to create topic 179 connections 180 """ 181
182 - def __init__(self, handler):
183 """ 184 ctor. 185 @param handler: node API handler 186 """ 187 self.logger = logging.getLogger("rospy.registration") 188 self.handler = handler 189 self.uri = self.master_uri = None 190 self.updates = [] 191 self.cond = threading.Condition() #for locking/notifying updates 192 self.registered = False 193 # cleanup has to occur before official shutdown 194 add_preshutdown_hook(self.cleanup)
195
196 - def start(self, uri, master_uri):
197 """ 198 Start the RegManager. This should be passed in as an argument to a thread 199 starter as the RegManager is designed to spin in its own thread 200 @param uri: URI of local node 201 @type uri: str 202 @param master_uri: Master URI 203 @type master_uri: str 204 """ 205 self.registered = False 206 self.master_uri = master_uri 207 self.uri = uri 208 first = True 209 tm = get_topic_manager() 210 sm = get_service_manager() 211 ns = get_namespace() 212 caller_id = get_caller_id() 213 if not master_uri or master_uri == uri: 214 registered = True 215 master = None 216 else: 217 registered = False 218 master = xmlrpcapi(master_uri) 219 self.logger.info("Registering with master node %s", master_uri) 220 221 while not registered and not is_shutdown(): 222 try: 223 try: 224 # prevent TopicManager and ServiceManager from accepting registrations until we are done 225 tm.lock.acquire() 226 sm.lock.acquire() 227 228 pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services() 229 for resolved_name, data_type in pub: 230 self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type) 231 code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri) 232 if code != 1: 233 logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg)) 234 signal_shutdown("master/node incompatibility with register publisher") 235 for resolved_name, data_type in sub: 236 self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type) 237 code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri) 238 if code != 1: 239 logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg)) 240 signal_shutdown("master/node incompatibility with register subscriber") 241 else: 242 self.publisher_update(resolved_name, val) 243 for resolved_name, service_uri in srv: 244 self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri) 245 code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri) 246 if code != 1: 247 logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg)) 248 signal_shutdown("master/node incompatibility with register service") 249 250 registered = True 251 252 # Subscribe to updates to our state 253 get_registration_listeners().add_listener(self) 254 finally: 255 sm.lock.release() 256 tm.lock.release() 257 258 if pub or sub: 259 logdebug("Registered [%s] with master node %s", caller_id, master_uri) 260 else: 261 logdebug("No topics to register with master node %s", master_uri) 262 263 except Exception as e: 264 if first: 265 # this use to print to console always, arguable whether or not this should be subjected to same configuration options as logging 266 logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri) 267 first = False 268 time.sleep(0.2) 269 self.registered = True 270 self.run()
271
272 - def is_registered(self):
273 """ 274 Check if Node has been registered yet. 275 @return: True if registration has occurred with master 276 @rtype: bool 277 """ 278 return self.registered
279
280 - def run(self):
281 """ 282 Main RegManager thread loop. 283 Periodically checks the update 284 queue and generates topic connections 285 """ 286 #Connect the topics 287 while not self.handler.done and not is_shutdown(): 288 cond = self.cond 289 try: 290 cond.acquire() 291 if not self.updates: 292 cond.wait(0.5) 293 if self.updates: 294 #work from the end as these are the most up-to-date 295 topic, uris = self.updates.pop() 296 #filter out older updates for same topic 297 self.updates = [x for x in self.updates if x[0] != topic] 298 else: 299 topic = uris = None 300 finally: 301 if cond is not None: 302 cond.release() 303 304 #call _connect_topic on all URIs as it can check to see whether 305 #or not a connection exists. 306 if uris and not self.handler.done: 307 for uri in uris: 308 # #1141: have to multithread this to prevent a bad publisher from hanging us 309 _thread.start_new_thread(self._connect_topic_thread, (topic, uri))
310
311 - def _connect_topic_thread(self, topic, uri):
312 try: 313 code, msg, _ = self.handler._connect_topic(topic, uri) 314 if code != 1: 315 logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg) 316 except Exception as e: 317 if not is_shutdown(): 318 logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))
319
320 - def cleanup(self, reason):
321 """ 322 Cleans up registrations with master and releases topic and service resources 323 @param reason: human-reasonable debug string 324 @type reason: str 325 """ 326 self.logger.debug("registration cleanup starting") 327 try: 328 self.cond.acquire() 329 self.cond.notifyAll() 330 finally: 331 self.cond.release() 332 333 # we never successfully initialized master_uri 334 if not self.master_uri: 335 return 336 337 master = xmlrpcapi(self.master_uri) 338 # we never successfully initialized master 339 if master is None: 340 return 341 342 caller_id = get_caller_id() 343 344 # clear the registration listeners as we are going to do a quick unregister here 345 rl = get_registration_listeners() 346 if rl is not None: 347 rl.clear() 348 349 tm = get_topic_manager() 350 sm = get_service_manager() 351 try: 352 multi = xmlrpcclient.MultiCall(master) 353 if tm is not None: 354 for resolved_name, _ in tm.get_subscriptions(): 355 self.logger.debug("unregisterSubscriber [%s]"%resolved_name) 356 multi.unregisterSubscriber(caller_id, resolved_name, self.uri) 357 for resolved_name, _ in tm.get_publications(): 358 self.logger.debug("unregisterPublisher [%s]"%resolved_name) 359 multi.unregisterPublisher(caller_id, resolved_name, self.uri) 360 361 if sm is not None: 362 for resolved_name, service_uri in sm.get_services(): 363 self.logger.debug("unregisterService [%s]"%resolved_name) 364 multi.unregisterService(caller_id, resolved_name, service_uri) 365 multi() 366 except socket.error as se: 367 (errno, msg) = se.args 368 if errno == 111 or errno == 61: #can't talk to master, nothing we can do about it 369 self.logger.warn("cannot unregister with master due to network issues") 370 else: 371 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc()) 372 except: 373 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc()) 374 375 self.logger.debug("registration cleanup: master calls complete") 376 377 #TODO: cleanup() should actually be orchestrated by a separate 378 #cleanup routine that calls the reg manager/sm/tm 379 if tm is not None: 380 tm.close_all() 381 if sm is not None: 382 sm.unregister_all()
383
384 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
385 """ 386 RegistrationListener callback 387 @param resolved_name: resolved name of topic or service 388 @type resolved_name: str 389 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs). 390 @type data_type_or_uri: str 391 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 392 @type reg_type: str 393 """ 394 master_uri = self.master_uri 395 if not master_uri: 396 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration") 397 else: 398 try: 399 master = xmlrpcapi(master_uri) 400 if reg_type == Registration.PUB: 401 self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri) 402 master.unregisterPublisher(get_caller_id(), resolved_name, self.uri) 403 elif reg_type == Registration.SUB: 404 self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri) 405 master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri) 406 elif reg_type == Registration.SRV: 407 self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri) 408 master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri) 409 except: 410 logwarn("unable to communicate with ROS Master, registrations are now out of sync") 411 self.logger.error(traceback.format_exc())
412
413 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
414 """ 415 RegistrationListener callback 416 @param resolved_name: resolved name of topic or service 417 @type resolved_name: str 418 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs). 419 @type data_type_or_uri: str 420 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 421 @type reg_type: str 422 """ 423 #TODO: this needs to be made robust to master outages 424 master_uri = self.master_uri 425 if not master_uri: 426 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration") 427 else: 428 master = xmlrpcapi(master_uri) 429 args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri) 430 registered = False 431 first = True 432 while not registered and not is_shutdown(): 433 try: 434 if reg_type == Registration.PUB: 435 self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args) 436 code, msg, val = master.registerPublisher(*args) 437 if code != 1: 438 logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg)) 439 elif reg_type == Registration.SUB: 440 self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args) 441 code, msg, val = master.registerSubscriber(*args) 442 if code == 1: 443 self.publisher_update(resolved_name, val) 444 else: 445 # this is potentially worth exiting over. in the future may want to add a retry 446 # timer 447 logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg)) 448 elif reg_type == Registration.SRV: 449 self.logger.debug("master.registerService(%s, %s, %s, %s)"%args) 450 code, msg, val = master.registerService(*args) 451 if code != 1: 452 logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg)) 453 454 registered = True 455 except Exception as e: 456 if first: 457 msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri 458 self.logger.error(str(e)+"\n"+msg) 459 print(msg) 460 first = False 461 time.sleep(0.2)
462
463 - def publisher_update(self, resolved_name, uris):
464 """ 465 Inform psmanager of latest publisher list for a topic. This 466 will cause L{RegManager} to create a topic connection for all new 467 publishers (in a separate thread). 468 @param resolved_name: resolved topic name 469 @type resolved_name: str 470 @param uris: list of all publishers uris for topic 471 @type uris: [str] 472 """ 473 try: 474 self.cond.acquire() 475 self.updates.append((resolved_name, uris)) 476 self.cond.notifyAll() 477 finally: 478 self.cond.release()
479