Package rosmaster :: Module registrations

Source Code for Module rosmaster.registrations

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from rosmaster.util import xmlrpcapi 
 36  import rosmaster.exceptions 
 37   
 38  """Data structures for representing registration data in the Master""" 
 39   
40 -class NodeRef(object):
41 """ 42 Container for node registration information. Used in master's 43 self.nodes data structure. This is effectively a reference 44 counter for the node registration information: when the 45 subscriptions and publications are empty the node registration can 46 be deleted. 47 """
48 - def __init__(self, id, api):
49 """ 50 ctor 51 @param api str: node XML-RPC API 52 """ 53 self.id = id 54 self.api = api 55 self.param_subscriptions = [] 56 self.topic_subscriptions = [] 57 self.topic_publications = [] 58 self.services = []
59
60 - def clear(self):
61 """ 62 Delete all state from this NodeRef except for the api location 63 """ 64 self.param_subscriptions = [] 65 self.topic_subscriptions = [] 66 self.topic_publications = [] 67 self.services = []
68
69 - def is_empty(self):
70 """ 71 @return: True if node has no active registrations 72 """ 73 return sum((len(x) for x in 74 [self.param_subscriptions, 75 self.topic_subscriptions, 76 self.topic_publications, 77 self.services,])) == 0
78
79 - def add(self, type_, key):
80 if type_ == Registrations.TOPIC_SUBSCRIPTIONS: 81 if not key in self.topic_subscriptions: 82 self.topic_subscriptions.append(key) 83 elif type_ == Registrations.TOPIC_PUBLICATIONS: 84 if not key in self.topic_publications: 85 self.topic_publications.append(key) 86 elif type_ == Registrations.SERVICE: 87 if not key in self.services: 88 self.services.append(key) 89 elif type_ == Registrations.PARAM_SUBSCRIPTIONS: 90 if not key in self.param_subscriptions: 91 self.param_subscriptions.append(key) 92 else: 93 raise rosmaster.exceptions.InternalException("internal bug")
94
95 - def remove(self, type_, key):
96 if type_ == Registrations.TOPIC_SUBSCRIPTIONS: 97 if key in self.topic_subscriptions: 98 self.topic_subscriptions.remove(key) 99 elif type_ == Registrations.TOPIC_PUBLICATIONS: 100 if key in self.topic_publications: 101 self.topic_publications.remove(key) 102 elif type_ == Registrations.SERVICE: 103 if key in self.services: 104 self.services.remove(key) 105 elif type_ == Registrations.PARAM_SUBSCRIPTIONS: 106 if key in self.param_subscriptions: 107 self.param_subscriptions.remove(key) 108 else: 109 raise rosmaster.exceptions.InternalException("internal bug")
110 111 # NOTE: I'm not terribly happy that this task has leaked into the data model. need 112 # to refactor to get this back into masterslave. 113
114 -def shutdown_node_task(api, caller_id, reason):
115 """ 116 Method to shutdown another ROS node. Generally invoked within a 117 separate thread as this is used to cleanup hung nodes. 118 119 @param api: XML-RPC API of node to shutdown 120 @type api: str 121 @param caller_id: name of node being shutdown 122 @type caller_id: str 123 @param reason: human-readable reason why node is being shutdown 124 @type reason: str 125 """ 126 try: 127 xmlrpcapi(api).shutdown('/master', reason) 128 except: 129 pass #expected in many common cases
130
131 -class Registrations(object):
132 """ 133 All calls may result in access/modifications to node registrations 134 dictionary, so be careful to guarantee appropriate thread-safeness. 135 136 Data structure for storing a set of registrations (e.g. publications, services). 137 The underlying data storage is the same except for services, which have the 138 constraint that only one registration may be active for a given key. 139 """ 140 141 TOPIC_SUBSCRIPTIONS = 1 142 TOPIC_PUBLICATIONS = 2 143 SERVICE = 3 144 PARAM_SUBSCRIPTIONS = 4 145
146 - def __init__(self, type_):
147 """ 148 ctor. 149 @param type_: one of [ TOPIC_SUBSCRIPTIONS, 150 TOPIC_PUBLICATIONS, SERVICE, PARAM_SUBSCRIPTIONS ] 151 @type type_: int 152 """ 153 if not type_ in [ 154 Registrations.TOPIC_SUBSCRIPTIONS, 155 Registrations.TOPIC_PUBLICATIONS, 156 Registrations.SERVICE, 157 Registrations.PARAM_SUBSCRIPTIONS ]: 158 raise rosmaster.exceptions.InternalException("invalid registration type: %s"%type_) 159 self.type = type_ 160 ## { key: [(caller_id, caller_api)] } 161 self.map = {} 162 self.service_api_map = None
163
164 - def __nonzero__(self):
165 """ 166 @return: True if there are no registrations 167 """ 168 return len(self.map) != 0
169
170 - def iterkeys(self):
171 """ 172 Iterate over registration keys 173 @return: iterator for registration keys 174 """ 175 return self.map.iterkeys()
176
177 - def get_service_api(self, service):
178 """ 179 Lookup service API URI. NOTE: this should only be valid if type==SERVICE as 180 service Registrations instances are the only ones that track service API URIs. 181 @param service: service name 182 @type service: str 183 @return str: service_api for registered key or None if 184 registration is no longer valid. 185 @type: str 186 """ 187 if self.service_api_map and service in self.service_api_map: 188 caller_id, service_api = self.service_api_map[service] 189 return service_api 190 return None
191
192 - def get_apis(self, key):
193 """ 194 Only valid if self.type != SERVICE. 195 @param key: registration key (e.g. topic/service/param name) 196 @type key: str 197 @return: caller_apis for registered key, empty list if registration is not valid 198 @rtype: [str] 199 """ 200 return [api for _, api in self.map.get(key, [])]
201
202 - def __contains__(self, key):
203 """ 204 Emulate mapping type for has_key() 205 """ 206 return key in self.map
207
208 - def __getitem__(self, key):
209 """ 210 @param key: registration key (e.g. topic/service/param name) 211 @type key: str 212 @return: (caller_id, caller_api) for registered 213 key, empty list if registration is not valid 214 @rtype: [(str, str),] 215 """ 216 # unlike get_apis, returns the caller_id to prevent any race 217 # conditions that can occur if caller_id/caller_apis change 218 # due to a new node. 219 return self.map.get(key, [])
220
221 - def has_key(self, key):
222 """ 223 @param key: registration key (e.g. topic/service/param name) 224 @type key: str 225 @return: True if key is registered 226 @rtype: bool 227 """ 228 return key in self.map
229
230 - def get_state(self):
231 """ 232 @return: state in getSystemState()-friendly format [ [key, [callerId1...callerIdN]] ... ] 233 @rtype: [str, [str]...] 234 """ 235 retval = [] 236 for k in self.map.iterkeys(): 237 retval.append([k, [id for id, _ in self.map[k]]]) 238 return retval
239
240 - def register(self, key, caller_id, caller_api, service_api=None):
241 """ 242 Add caller_id into the map as a provider of the specified 243 service (key). caller_id must not have been previously 244 registered with a different caller_api. 245 246 Subroutine for managing provider map data structure (essentially a multimap). 247 @param key: registration key (e.g. topic/service/param name) 248 @type key: str 249 @param caller_id: caller_id of provider 250 @type caller_id: str 251 @param caller_api: API URI of provider 252 @type caller_api: str 253 @param service_api: (keyword) ROS service API URI if registering a service 254 @type service_api: str 255 """ 256 map = self.map 257 if key in map and not service_api: 258 providers = map[key] 259 if not (caller_id, caller_api) in providers: 260 providers.append((caller_id, caller_api)) 261 else: 262 map[key] = providers = [(caller_id, caller_api)] 263 264 if service_api: 265 if self.service_api_map is None: 266 self.service_api_map = {} 267 self.service_api_map[key] = (caller_id, service_api) 268 elif self.type == Registrations.SERVICE: 269 raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE")
270
271 - def unregister_all(self, caller_id):
272 """ 273 Remove all registrations associated with caller_id 274 @param caller_id: caller_id of provider 275 @type caller_id: str 276 """ 277 map = self.map 278 # fairly expensive 279 dead_keys = [] 280 for key in map: 281 providers = map[key] 282 # find all matching entries 283 to_remove = [(id, api) for id, api in providers if id == caller_id] 284 # purge them 285 for r in to_remove: 286 providers.remove(r) 287 if not providers: 288 dead_keys.append(key) 289 for k in dead_keys: 290 del self.map[k] 291 if self.type == Registrations.SERVICE and self.service_api_map: 292 del dead_keys[:] 293 for key, val in self.service_api_map.iteritems(): 294 if val[0] == caller_id: 295 dead_keys.append(key) 296 for k in dead_keys: 297 del self.service_api_map[k]
298
299 - def unregister(self, key, caller_id, caller_api, service_api=None):
300 """ 301 Remove caller_id from the map as a provider of the specified service (key). 302 Subroutine for managing provider map data structure, essentially a multimap 303 @param key: registration key (e.g. topic/service/param name) 304 @type key: str 305 @param caller_id: caller_id of provider 306 @type caller_id: str 307 @param caller_api: API URI of provider 308 @type caller_api: str 309 @param service_api: (keyword) ROS service API URI if registering a service 310 @type service_api: str 311 @return: for ease of master integration, directly returns unregister value for 312 higher-level XMLRPC API. val is the number of APIs unregistered (0 or 1) 313 @rtype: code, msg, val 314 """ 315 # if we are unregistering a topic, validate against the caller_api 316 if service_api: 317 # validate against the service_api 318 if self.service_api_map is None: 319 return 1, "[%s] is not a provider of [%s]"%(caller_id, key), 0 320 if self.service_api_map.get(key, None) != (caller_id, service_api): 321 return 1, "[%s] is no longer the current service api handle for [%s]"%(service_api, key), 0 322 else: 323 del self.service_api_map[key] 324 del self.map[key] 325 # caller_api is None for unregister service, so we can't validate as well 326 return 1, "Unregistered [%s] as provider of [%s]"%(caller_id, key), 1 327 elif self.type == Registrations.SERVICE: 328 raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE") 329 else: 330 providers = self.map.get(key, []) 331 if (caller_id, caller_api) in providers: 332 providers.remove((caller_id, caller_api)) 333 if not providers: 334 del self.map[key] 335 return 1, "Unregistered [%s] as provider of [%s]"%(caller_id, key), 1 336 else: 337 return 1, "[%s] is not a known provider of [%s]"%(caller_id, key), 0
338
339 -class RegistrationManager(object):
340 """ 341 Stores registrations for Master. 342 343 RegistrationManager is not threadsafe, so access must be externally locked as appropriate 344 """ 345
346 - def __init__(self, thread_pool):
347 """ 348 ctor. 349 @param thread_pool: thread pool for queueing tasks 350 @type thread_pool: ThreadPool 351 """ 352 self.nodes = {} 353 self.thread_pool = thread_pool 354 355 self.publishers = Registrations(Registrations.TOPIC_PUBLICATIONS) 356 self.subscribers = Registrations(Registrations.TOPIC_SUBSCRIPTIONS) 357 self.services = Registrations(Registrations.SERVICE) 358 self.param_subscribers = Registrations(Registrations.PARAM_SUBSCRIPTIONS)
359 360
361 - def reverse_lookup(self, caller_api):
362 """ 363 Get a NodeRef by caller_api 364 @param caller_api: caller XML RPC URI 365 @type caller_api: str 366 @return: nodes that declare caller_api as their 367 API. 99.9% of the time this should only be one node, but we 368 allow for multiple matches as the master API does not restrict 369 this. 370 @rtype: [NodeRef] 371 """ 372 matches = [n for n in self.nodes.iteritems() if n.api == caller_api] 373 if matches: 374 return matches
375
376 - def get_node(self, caller_id):
377 return self.nodes.get(caller_id, None)
378
379 - def _register(self, r, key, caller_id, caller_api, service_api=None):
380 # update node information 381 node_ref, changed = self._register_node_api(caller_id, caller_api) 382 node_ref.add(r.type, key) 383 # update pub/sub/service indicies 384 if changed: 385 self.publishers.unregister_all(caller_id) 386 self.subscribers.unregister_all(caller_id) 387 self.services.unregister_all(caller_id) 388 self.param_subscribers.unregister_all(caller_id) 389 r.register(key, caller_id, caller_api, service_api)
390
391 - def _unregister(self, r, key, caller_id, caller_api, service_api=None):
392 node_ref = self.nodes.get(caller_id, None) 393 if node_ref != None: 394 retval = r.unregister(key, caller_id, caller_api, service_api) 395 # check num removed field, if 1, unregister is valid 396 if retval[2] == 1: 397 node_ref.remove(r.type, key) 398 if node_ref.is_empty(): 399 del self.nodes[caller_id] 400 else: 401 retval = 1, "[%s] is not a registered node"%caller_id, 0 402 return retval
403
404 - def register_service(self, service, caller_id, caller_api, service_api):
405 """ 406 Register service provider 407 @return: None 408 """ 409 self._register(self.services, service, caller_id, caller_api, service_api)
410 - def register_publisher(self, topic, caller_id, caller_api):
411 """ 412 Register topic publisher 413 @return: None 414 """ 415 self._register(self.publishers, topic, caller_id, caller_api)
416 - def register_subscriber(self, topic, caller_id, caller_api):
417 """ 418 Register topic subscriber 419 @return: None 420 """ 421 self._register(self.subscribers, topic, caller_id, caller_api)
422 - def register_param_subscriber(self, param, caller_id, caller_api):
423 """ 424 Register param subscriber 425 @return: None 426 """ 427 self._register(self.param_subscribers, param, caller_id, caller_api)
428
429 - def unregister_service(self, service, caller_id, service_api):
430 caller_api = None 431 return self._unregister(self.services, service, caller_id, caller_api, service_api)
432
433 - def unregister_subscriber(self, topic, caller_id, caller_api):
434 return self._unregister(self.subscribers, topic, caller_id, caller_api)
435 - def unregister_publisher(self, topic, caller_id, caller_api):
436 return self._unregister(self.publishers, topic, caller_id, caller_api)
437 - def unregister_param_subscriber(self, param, caller_id, caller_api):
438 return self._unregister(self.param_subscribers, param, caller_id, caller_api)
439
440 - def _register_node_api(self, caller_id, caller_api):
441 """ 442 @param caller_id: caller_id of provider 443 @type caller_id: str 444 @param caller_api: caller_api of provider 445 @type caller_api: str 446 @return: (registration_information, changed_registration). changed_registration is true if 447 caller_api is differet than the one registered with caller_id 448 @rtype: (NodeRef, bool) 449 """ 450 node_ref = self.nodes.get(caller_id, None) 451 452 bumped_api = None 453 if node_ref is not None: 454 if node_ref.api == caller_api: 455 return node_ref, False 456 else: 457 bumped_api = node_ref.api 458 self.thread_pool.queue_task(bumped_api, shutdown_node_task, 459 (bumped_api, caller_id, "new node registered with same name")) 460 461 node_ref = NodeRef(caller_id, caller_api) 462 self.nodes[caller_id] = node_ref 463 return (node_ref, bumped_api != None)
464