1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: handles maintaining registrations with master via internal listener APIs"""
36
37 from __future__ import with_statement
38
39 import socket
40 import sys
41 import logging
42 import thread
43 import threading
44 import time
45 import traceback
46 import xmlrpclib
47
48 from rospy.core import is_shutdown, xmlrpcapi, \
49 logfatal, logwarn, loginfo, logerr, logdebug, \
50 signal_shutdown, add_preshutdown_hook
51 from rospy.names import get_caller_id, get_namespace
52
53
54
55 _topic_manager = None
61
62 _service_manager = None
68
69
71 """Registration types"""
72 PUB = 'pub'
73 SUB = 'sub'
74 SRV = 'srv'
75
77 """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations"""
78
79 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
80 """
81 New pub/sub/service declared.
82 @param resolved_name: resolved topic/service name
83 @param data_type_or_uri: topic type or service uri
84 @type data_type_or_uri: str
85 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
86 @type reg_type: str
87 """
88 pass
89
90 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
91 """
92 New pub/sub/service removed.
93 @param resolved_name: topic/service name
94 @type resolved_name: str
95 @param data_type_or_uri: topic type or service uri
96 @type data_type_or_uri: str
97 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
98 @type reg_type: str
99 """
100 pass
101
103
105 """
106 ctor.
107 """
108 self.listeners = []
109 self.lock = threading.Lock()
110
112 """
113 Subscribe to notifications of pub/sub/service registration
114 changes. This is an internal API used to notify higher level
115 routines when to communicate with the master.
116 @param l: listener to subscribe
117 @type l: TopicListener
118 """
119 assert isinstance(l, RegistrationListener)
120 with self.lock:
121 self.listeners.append(l)
122
124 """
125 @param resolved_name: resolved_topic/service name
126 @type resolved_name: str
127 @param data_type_or_uri: topic type or service uri
128 @type data_type_or_uri: str
129 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
130 @type reg_type: str
131 """
132 with self.lock:
133 for l in self.listeners:
134 try:
135 l.reg_removed(resolved_name, data_type_or_uri, reg_type)
136 except Exception, e:
137 logerr("error notifying listener of removal: %s"%traceback.format_exc(e))
138
139 - def notify_added(self, resolved_name, data_type, reg_type):
140 """
141 @param resolved_name: topic/service name
142 @type resolved_name: str
143 @param data_type: topic/service type
144 @type data_type: str
145 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
146 @type reg_type: str
147 """
148 with self.lock:
149 for l in self.listeners:
150 try:
151 l.reg_added(resolved_name, data_type, reg_type)
152 except Exception, e:
153 logerr(traceback.format_exc(e))
154
156 """
157 Remove all registration listeners
158 """
159 with self.lock:
160 del self.listeners[:]
161
162 _registration_listeners = RegistrationListeners()
165
166
167
169 """
170 Registration manager used by Node implemenation.
171 Communicates with ROS Master to maintain topic registration
172 information. Also responds to publisher updates to create topic
173 connections
174 """
175
177 """
178 ctor.
179 @param handler: node API handler
180 """
181 self.logger = logging.getLogger("rospy.registration")
182 self.handler = handler
183 self.uri = self.master_uri = None
184 self.updates = []
185 self.cond = threading.Condition()
186 self.registered = False
187
188 add_preshutdown_hook(self.cleanup)
189
190 - def start(self, uri, master_uri):
191 """
192 Start the RegManager. This should be passed in as an argument to a thread
193 starter as the RegManager is designed to spin in its own thread
194 @param uri: URI of local node
195 @type uri: str
196 @param master_uri: Master URI
197 @type master_uri: str
198 """
199 self.registered = False
200 self.master_uri = master_uri
201 self.uri = uri
202 first = True
203 tm = get_topic_manager()
204 sm = get_service_manager()
205 ns = get_namespace()
206 caller_id = get_caller_id()
207 if not master_uri or master_uri == uri:
208 registered = True
209 master = None
210 else:
211 registered = False
212 master = xmlrpcapi(master_uri)
213 self.logger.info("Registering with master node %s", master_uri)
214
215 while not registered and not is_shutdown():
216 try:
217 try:
218
219 tm.lock.acquire()
220 sm.lock.acquire()
221
222 pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services()
223 for resolved_name, data_type in pub:
224 self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type)
225 code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri)
226 if code != 1:
227 logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg))
228 signal_shutdown("master/node incompatibility with register publisher")
229 for resolved_name, data_type in sub:
230 self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type)
231 code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri)
232 if code != 1:
233 logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg))
234 signal_shutdown("master/node incompatibility with register subscriber")
235 else:
236 self.publisher_update(resolved_name, val)
237 for resolved_name, service_uri in srv:
238 self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri)
239 code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri)
240 if code != 1:
241 logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg))
242 signal_shutdown("master/node incompatibility with register service")
243
244 registered = True
245
246
247 get_registration_listeners().add_listener(self)
248 finally:
249 sm.lock.release()
250 tm.lock.release()
251
252 if pub or sub:
253 logdebug("Registered [%s] with master node %s", caller_id, master_uri)
254 else:
255 logdebug("No topics to register with master node %s", master_uri)
256
257 except Exception, e:
258 if first:
259
260 logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri)
261 first = False
262 time.sleep(0.2)
263 self.registered = True
264 self.run()
265
267 """
268 Check if Node has been registered yet.
269 @return: True if registration has occurred with master
270 @rtype: bool
271 """
272 return self.registered
273
275 """
276 Main RegManager thread loop.
277 Periodically checks the update
278 queue and generates topic connections
279 """
280
281 while not self.handler.done and not is_shutdown():
282 cond = self.cond
283 try:
284 cond.acquire()
285 if not self.updates:
286 cond.wait(0.5)
287 if self.updates:
288
289 topic, uris = self.updates.pop()
290
291 self.updates = [x for x in self.updates if x[0] != topic]
292 else:
293 topic = uris = None
294 finally:
295 if cond is not None:
296 cond.release()
297
298
299
300 if uris and not self.handler.done:
301 for uri in uris:
302
303 thread.start_new_thread(self._connect_topic_thread, (topic, uri))
304
306 try:
307 code, msg, _ = self.handler._connect_topic(topic, uri)
308 if code != 1:
309 logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg)
310 except Exception, e:
311 if not is_shutdown():
312 logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))
313
315 """
316 Cleans up registrations with master and releases topic and service resources
317 @param reason: human-reasonable debug string
318 @type reason: str
319 """
320 self.logger.debug("registration cleanup starting")
321 try:
322 self.cond.acquire()
323 self.cond.notifyAll()
324 finally:
325 self.cond.release()
326
327
328 if not self.master_uri:
329 return
330
331 master = xmlrpcapi(self.master_uri)
332
333 if master is None:
334 return
335
336 caller_id = get_caller_id()
337
338
339 rl = get_registration_listeners()
340 if rl is not None:
341 rl.clear()
342
343 tm = get_topic_manager()
344 sm = get_service_manager()
345 try:
346 multi = xmlrpclib.MultiCall(master)
347 if tm is not None:
348 for resolved_name, _ in tm.get_subscriptions():
349 self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
350 multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
351 for resolved_name, _ in tm.get_publications():
352 self.logger.debug("unregisterPublisher [%s]"%resolved_name)
353 multi.unregisterPublisher(caller_id, resolved_name, self.uri)
354
355 if sm is not None:
356 for resolved_name, service_uri in sm.get_services():
357 self.logger.debug("unregisterService [%s]"%resolved_name)
358 multi.unregisterService(caller_id, resolved_name, service_uri)
359 multi()
360 except socket.error, (errno, msg):
361 if errno == 111 or errno == 61:
362 self.logger.warn("cannot unregister with master due to network issues")
363 else:
364 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc())
365 except:
366 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc())
367
368 self.logger.debug("registration cleanup: master calls complete")
369
370
371
372 if tm is not None:
373 tm.close_all()
374 if sm is not None:
375 sm.unregister_all()
376
377 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
378 """
379 RegistrationListener callback
380 @param resolved_name: resolved name of topic or service
381 @type resolved_name: str
382 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
383 @type data_type_or_uri: str
384 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
385 @type reg_type: str
386 """
387 master_uri = self.master_uri
388 if not master_uri:
389 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration")
390 else:
391 try:
392 master = xmlrpcapi(master_uri)
393 if reg_type == Registration.PUB:
394 self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri)
395 master.unregisterPublisher(get_caller_id(), resolved_name, self.uri)
396 elif reg_type == Registration.SUB:
397 self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri)
398 master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri)
399 elif reg_type == Registration.SRV:
400 self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri)
401 master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri)
402 except:
403 logwarn("unable to communicate with ROS Master, registrations are now out of sync")
404 self.logger.error(traceback.format_exc())
405
406 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
407 """
408 RegistrationListener callback
409 @param resolved_name: resolved name of topic or service
410 @type resolved_name: str
411 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
412 @type data_type_or_uri: str
413 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
414 @type reg_type: str
415 """
416
417 master_uri = self.master_uri
418 if not master_uri:
419 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration")
420 else:
421 master = xmlrpcapi(master_uri)
422 args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri)
423 registered = False
424 first = True
425 while not registered and not is_shutdown():
426 try:
427 if reg_type == Registration.PUB:
428 self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args)
429 code, msg, val = master.registerPublisher(*args)
430 if code != 1:
431 logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg))
432 elif reg_type == Registration.SUB:
433 self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args)
434 code, msg, val = master.registerSubscriber(*args)
435 if code == 1:
436 self.publisher_update(resolved_name, val)
437 else:
438
439
440 logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg))
441 elif reg_type == Registration.SRV:
442 self.logger.debug("master.registerService(%s, %s, %s, %s)"%args)
443 code, msg, val = master.registerService(*args)
444 if code != 1:
445 logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg))
446
447 registered = True
448 except Exception, e:
449 if first:
450 msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri
451 self.logger.error(str(e)+"\n"+msg)
452 print msg
453 first = False
454 time.sleep(0.2)
455
457 """
458 Inform psmanager of latest publisher list for a topic. This
459 will cause L{RegManager} to create a topic connection for all new
460 publishers (in a separate thread).
461 @param resolved_name: resolved topic name
462 @type resolved_name: str
463 @param uris: list of all publishers uris for topic
464 @type uris: [str]
465 """
466 try:
467 self.cond.acquire()
468 self.updates.append((resolved_name, uris))
469 self.cond.notifyAll()
470 finally:
471 self.cond.release()
472