1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from rosmaster.util import remove_server_proxy
36 from rosmaster.util import xmlrpcapi
37 import rosmaster.exceptions
38
39 """Data structures for representing registration data in the Master"""
40
42 """
43 Container for node registration information. Used in master's
44 self.nodes data structure. This is effectively a reference
45 counter for the node registration information: when the
46 subscriptions and publications are empty the node registration can
47 be deleted.
48 """
"""
Set up the registration record for a single node: store the given
id and api, and start every registration list out empty.

@param id: value stored as self.id
@param api: node XML-RPC API
@type  api: str
"""
self.id, self.api = id, api
# one independent list per registration category
self.param_subscriptions = []
self.topic_subscriptions = []
self.topic_publications = []
self.services = []
60
"""
Drop every registration recorded on this NodeRef. Only the four
registration lists are replaced (each by a new empty list); the api
location and id are left untouched.
"""
for category in ('param_subscriptions',
                 'topic_subscriptions',
                 'topic_publications',
                 'services'):
    setattr(self, category, [])
69
71 """
72 @return: True if node has no active registrations
73 """
74 return sum((len(x) for x in
75 [self.param_subscriptions,
76 self.topic_subscriptions,
77 self.topic_publications,
78 self.services,])) == 0
79
80 - def add(self, type_, key):
95
111
112
113
114
"""
Method to shutdown another ROS node. Generally invoked within a
separate thread as this is used to cleanup hung nodes.

The shutdown request is best-effort: ordinary errors raised while
contacting the node are ignored. remove_server_proxy(api) is called
afterwards whether or not the request succeeded.

@param api: XML-RPC API of node to shutdown
@type  api: str
@param caller_id: name of node being shutdown
@type  caller_id: str
@param reason: human-readable reason why node is being shutdown
@type  reason: str
"""
try:
    xmlrpcapi(api).shutdown('/master', "[{}] Reason: {}".format(caller_id, reason))
except Exception:
    # Failure here is expected (the target node may be hung or already
    # gone), so it is deliberately ignored. Only Exception is caught so
    # that KeyboardInterrupt/SystemExit are not swallowed.
    pass
finally:
    remove_server_proxy(api)
132
134 """
135 All calls may result in access/modifications to node registrations
136 dictionary, so be careful to guarantee appropriate thread-safeness.
137
138 Data structure for storing a set of registrations (e.g. publications, services).
139 The underlying data storage is the same except for services, which have the
140 constraint that only one registration may be active for a given key.
141 """
142
# Registration-type identifiers. Code in this file compares a
# Registrations instance's self.type against these values
# (e.g. self.type == Registrations.SERVICE).
TOPIC_SUBSCRIPTIONS = 1
TOPIC_PUBLICATIONS = 2
SERVICE = 3
PARAM_SUBSCRIPTIONS = 4
147
165
167 """
168 @return: True if there are registrations
169 """
170 return len(self.map) != 0
171
173 """
174 @return: True if there are registrations
175 """
176 return len(self.map) != 0
177
"""
Iterate over registration keys
@return: the registration keys, exactly as returned by self.map.keys()
"""
# NOTE(review): if self.map is a plain dict, keys() is a view object on
# Python 3 rather than an iterator -- confirm callers only iterate it.
return self.map.keys()
184
186 """
187 Lookup service API URI. NOTE: this should only be valid if type==SERVICE as
188 service Registrations instances are the only ones that track service API URIs.
189 @param service: service name
190 @type service: str
191 @return str: service_api for registered key or None if
192 registration is no longer valid.
193 @type: str
194 """
195 if self.service_api_map and service in self.service_api_map:
196 caller_id, service_api = self.service_api_map[service]
197 return service_api
198 return None
199
201 """
202 Only valid if self.type != SERVICE.
203 @param key: registration key (e.g. topic/service/param name)
204 @type key: str
205 @return: caller_apis for registered key, empty list if registration is not valid
206 @rtype: [str]
207 """
208 return [api for _, api in self.map.get(key, [])]
209
"""
Emulate mapping type for has_key(): membership test on self.map.
@param key: registration key
@return: True if key is present in self.map
"""
return key in self.map
215
"""
@param key: registration key (e.g. topic/service/param name)
@type  key: str
@return: (caller_id, caller_api) for registered
  key, empty list if registration is not valid
@rtype: [(str, str),]
"""
# For a registered key this returns the list object stored in self.map
# itself, not a copy; for an unknown key a new empty list is returned.
return self.map.get(key, [])
228
"""
Test whether a key currently has an entry in self.map.
@param key: registration key (e.g. topic/service/param name)
@type  key: str
@return: True if key is registered
@rtype: bool
"""
return key in self.map
237
239 """
240 @return: state in getSystemState()-friendly format [ [key, [callerId1...callerIdN]] ... ]
241 @rtype: [str, [str]...]
242 """
243 retval = []
244 for k in self.map.keys():
245 retval.append([k, [id for id, _ in self.map[k]]])
246 return retval
247
def register(self, key, caller_id, caller_api, service_api=None):
    """
    Record (caller_id, caller_api) as a provider of key. caller_id
    must not have been previously registered with a different
    caller_api.

    The provider map is essentially a multimap: a non-service
    registration is appended to the providers of key (once), while a
    registration that carries a service_api replaces whatever was
    stored for key, since only one service provider may be active.

    @param key: registration key (e.g. topic/service/param name)
    @type  key: str
    @param caller_id: caller_id of provider
    @type  caller_id: str
    @param caller_api: API URI of provider
    @type  caller_api: str
    @param service_api: (keyword) ROS service API URI if registering a service
    @type  service_api: str
    @raise rosmaster.exceptions.InternalException: if self.type is
      Registrations.SERVICE and no service_api was given
    """
    registry = self.map
    entry = (caller_id, caller_api)
    if service_api or key not in registry:
        # first provider for this key, or a service registration
        # (which always supersedes the previous provider)
        registry[key] = [entry]
    elif entry not in registry[key]:
        registry[key].append(entry)

    if service_api:
        if self.service_api_map is None:
            self.service_api_map = {}
        self.service_api_map[key] = (caller_id, service_api)
    elif self.type == Registrations.SERVICE:
        raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE")
278
"""
Remove all registrations associated with caller_id. Keys left
without any provider are deleted from self.map; for service
registrations the matching service_api_map entries are deleted too.
@param caller_id: caller_id of provider
@type  caller_id: str
"""
registry = self.map

emptied_keys = []
for key, providers in registry.items():
    # filter the stored list in place so the same list object stays in the map
    providers[:] = [entry for entry in providers if entry[0] != caller_id]
    if not providers:
        emptied_keys.append(key)
# deletion is deferred so the map is not resized while being iterated
for key in emptied_keys:
    del registry[key]

if self.type == Registrations.SERVICE and self.service_api_map:
    stale_services = [key for key, val in self.service_api_map.items()
                      if val[0] == caller_id]
    for key in stale_services:
        del self.service_api_map[key]
306
def unregister(self, key, caller_id, caller_api, service_api=None):
    """
    Remove caller_id from the map as a provider of the specified service (key).
    Subroutine for managing provider map data structure, essentially a multimap.

    @param key: registration key (e.g. topic/service/param name)
    @type  key: str
    @param caller_id: caller_id of provider
    @type  caller_id: str
    @param caller_api: API URI of provider
    @type  caller_api: str
    @param service_api: (keyword) ROS service API URI if registering a service
    @type  service_api: str
    @return: for ease of master integration, directly returns unregister value for
      higher-level XMLRPC API. val is the number of APIs unregistered (0 or 1)
    @rtype: code, msg, val
    @raise rosmaster.exceptions.InternalException: if self.type is
      Registrations.SERVICE and no service_api was given
    """
    if not service_api:
        if self.type == Registrations.SERVICE:
            raise rosmaster.exceptions.InternalException("service_api must be specified for Registrations.SERVICE")
        # topic/param style registration: drop the (caller_id, caller_api) pair
        entry = (caller_id, caller_api)
        providers = self.map.get(key, [])
        if entry not in providers:
            return 1, "[%s] is not a known provider of [%s]" % (caller_id, key), 0
        providers.remove(entry)
        if not providers:
            del self.map[key]
        return 1, "Unregistered [%s] as provider of [%s]" % (caller_id, key), 1

    # service registration: only the current holder of the service handle may remove it
    if self.service_api_map is None:
        return 1, "[%s] is not a provider of [%s]" % (caller_id, key), 0
    if self.service_api_map.get(key, None) != (caller_id, service_api):
        return 1, "[%s] is no longer the current service api handle for [%s]" % (service_api, key), 0
    del self.service_api_map[key]
    del self.map[key]
    return 1, "Unregistered [%s] as provider of [%s]" % (caller_id, key), 1
346
348 """
349 Stores registrations for Master.
350
351 RegistrationManager is not threadsafe, so access must be externally locked as appropriate
352 """
353
367
368
370 """
371 Get a NodeRef by caller_api
372 @param caller_api: caller XML RPC URI
373 @type caller_api: str
374 @return: nodes that declare caller_api as their
375 API. 99.9% of the time this should only be one node, but we
376 allow for multiple matches as the master API does not restrict
377 this.
378 @rtype: [NodeRef]
379 """
380 matches = [n for n in self.nodes.items() if n.api == caller_api]
381 if matches:
382 return matches
383
# Return the entry stored in self.nodes under caller_id, or None when
# there is no such entry.
return self.nodes.get(caller_id, None)
386
def _register(self, r, key, caller_id, caller_api, service_api=None):
    """
    Record a registration of key for caller_id in the registrations
    object r, keeping the per-node record in step.

    The node record is obtained via self._register_node_api and key is
    added to it under r.type. If that call reports that caller_id was
    previously known under a different API, everything registered for
    caller_id is first removed from the publishers, subscribers,
    services and param_subscribers registrations.

    @param r: registrations object to register into (provides .type and .register)
    @param key: registration key (e.g. topic/service/param name)
    @type  key: str
    @param caller_id: caller_id of provider
    @type  caller_id: str
    @param caller_api: API URI of provider
    @type  caller_api: str
    @param service_api: (keyword) ROS service API URI if registering a service
    @type  service_api: str
    """
    node_ref, api_changed = self._register_node_api(caller_id, caller_api)
    node_ref.add(r.type, key)

    if api_changed:
        for registrations in (self.publishers,
                              self.subscribers,
                              self.services,
                              self.param_subscribers):
            registrations.unregister_all(caller_id)
    r.register(key, caller_id, caller_api, service_api)
398
399 - def _unregister(self, r, key, caller_id, caller_api, service_api=None):
400 node_ref = self.nodes.get(caller_id, None)
401 if node_ref != None:
402 retval = r.unregister(key, caller_id, caller_api, service_api)
403
404 if retval[2] == 1:
405 node_ref.remove(r.type, key)
406 if node_ref.is_empty():
407 del self.nodes[caller_id]
408 else:
409 retval = 1, "[%s] is not a registered node"%caller_id, 0
410 return retval
411
"""
Register service provider. Delegates to self._register using the
self.services registrations and passes service_api through.
@return: None
"""
self._register(self.services, service, caller_id, caller_api, service_api)
"""
Register topic publisher. Delegates to self._register using the
self.publishers registrations.
@return: None
"""
self._register(self.publishers, topic, caller_id, caller_api)
"""
Register topic subscriber. Delegates to self._register using the
self.subscribers registrations.
@return: None
"""
self._register(self.subscribers, topic, caller_id, caller_api)
"""
Register param subscriber. Delegates to self._register using the
self.param_subscribers registrations.
@return: None
"""
self._register(self.param_subscribers, param, caller_id, caller_api)
436
438 caller_api = None
439 return self._unregister(self.services, service, caller_id, caller_api, service_api)
440
# Unregister a topic subscriber: delegates to self._unregister with the
# self.subscribers registrations and returns its result.
return self._unregister(self.subscribers, topic, caller_id, caller_api)
# Unregister a topic publisher: delegates to self._unregister with the
# self.publishers registrations and returns its result.
return self._unregister(self.publishers, topic, caller_id, caller_api)
# Unregister a param subscriber: delegates to self._unregister with the
# self.param_subscribers registrations and returns its result.
return self._unregister(self.param_subscribers, param, caller_id, caller_api)
447
449 """
450 @param caller_id: caller_id of provider
451 @type caller_id: str
452 @param caller_api: caller_api of provider
453 @type caller_api: str
454 @return: (registration_information, changed_registration). changed_registration is true if
455 caller_api is differet than the one registered with caller_id
456 @rtype: (NodeRef, bool)
457 """
458 node_ref = self.nodes.get(caller_id, None)
459
460 bumped_api = None
461 if node_ref is not None:
462 if node_ref.api == caller_api:
463 return node_ref, False
464 else:
465 bumped_api = node_ref.api
466 self.thread_pool.queue_task(bumped_api, shutdown_node_task,
467 (bumped_api, caller_id, "new node registered with same name"))
468
469 node_ref = NodeRef(caller_id, caller_api)
470 self.nodes[caller_id] = node_ref
471 return (node_ref, bumped_api != None)
472