1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: handles maintaining registrations with master via internal listener APIs"""
36
37
38
39 import socket
40 import sys
41 import logging
42 import threading
43 import time
44 import traceback
45 try:
46 import xmlrpc.client as xmlrpcclient
47 except ImportError:
48 import xmlrpclib as xmlrpcclient
49
50 from rospy.core import is_shutdown, is_shutdown_requested, xmlrpcapi, \
51 logfatal, logwarn, loginfo, logerr, logdebug, \
52 signal_shutdown, add_preshutdown_hook
53 from rospy.names import get_caller_id, get_namespace
54
55
56
57 _topic_manager = None
63
64 _service_manager = None
70
71
73 """Registration types"""
74 PUB = 'pub'
75 SUB = 'sub'
76 SRV = 'srv'
77
79 """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations"""
80
81 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
82 """
83 New pub/sub/service declared.
84 @param resolved_name: resolved topic/service name
85 @param data_type_or_uri: topic type or service uri
86 @type data_type_or_uri: str
87 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
88 @type reg_type: str
89 """
90 pass
91
92 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
93 """
94 New pub/sub/service removed.
95 @param resolved_name: topic/service name
96 @type resolved_name: str
97 @param data_type_or_uri: topic type or service uri
98 @type data_type_or_uri: str
99 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
100 @type reg_type: str
101 """
102 pass
103
105
107 """
108 ctor.
109 """
110 self.listeners = []
111 self.lock = threading.Lock()
112
114 """
115 Subscribe to notifications of pub/sub/service registration
116 changes. This is an internal API used to notify higher level
117 routines when to communicate with the master.
118 @param l: listener to subscribe
119 @type l: TopicListener
120 """
121 assert isinstance(l, RegistrationListener)
122 with self.lock:
123 self.listeners.append(l)
124
126 """
127 @param resolved_name: resolved_topic/service name
128 @type resolved_name: str
129 @param data_type_or_uri: topic type or service uri
130 @type data_type_or_uri: str
131 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
132 @type reg_type: str
133 """
134 with self.lock:
135 for l in self.listeners:
136 try:
137 l.reg_removed(resolved_name, data_type_or_uri, reg_type)
138 except Exception as e:
139 logerr("error notifying listener of removal: %s"%traceback.format_exc())
140
141 - def notify_added(self, resolved_name, data_type, reg_type):
142 """
143 @param resolved_name: topic/service name
144 @type resolved_name: str
145 @param data_type: topic/service type
146 @type data_type: str
147 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
148 @type reg_type: str
149 """
150 with self.lock:
151 for l in self.listeners:
152 try:
153 l.reg_added(resolved_name, data_type, reg_type)
154 except Exception as e:
155 logerr(traceback.format_exc())
156
158 """
159 Remove all registration listeners
160 """
161 if not is_shutdown_requested():
162 with self.lock:
163 del self.listeners[:]
164 else:
165
166
167 locked = self.lock.acquire(False)
168
169 del self.listeners[:]
170 if locked:
171 self.lock.release()
172
173 _registration_listeners = RegistrationListeners()
176
177
178
180 """
181 Registration manager used by Node implemenation.
182 Communicates with ROS Master to maintain topic registration
183 information. Also responds to publisher updates to create topic
184 connections
185 """
186
188 """
189 ctor.
190 @param handler: node API handler
191 """
192 self.logger = logging.getLogger("rospy.registration")
193 self.handler = handler
194 self.uri = self.master_uri = None
195 self.updates = []
196 self.cond = threading.Condition()
197 self.registered = False
198
199 add_preshutdown_hook(self.cleanup)
200
201 - def start(self, uri, master_uri):
202 """
203 Start the RegManager. This should be passed in as an argument to a thread
204 starter as the RegManager is designed to spin in its own thread
205 @param uri: URI of local node
206 @type uri: str
207 @param master_uri: Master URI
208 @type master_uri: str
209 """
210 self.registered = False
211 self.master_uri = master_uri
212 self.uri = uri
213 first = True
214 tm = get_topic_manager()
215 sm = get_service_manager()
216 ns = get_namespace()
217 caller_id = get_caller_id()
218 if not master_uri or master_uri == uri:
219 registered = True
220 master = None
221 else:
222 registered = False
223 master = xmlrpcapi(master_uri)
224 self.logger.info("Registering with master node %s", master_uri)
225
226 while not registered and not is_shutdown():
227 try:
228 try:
229
230 tm.lock.acquire()
231 sm.lock.acquire()
232
233 pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services()
234 for resolved_name, data_type in pub:
235 self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type)
236 code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri)
237 if code != 1:
238 logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg))
239 signal_shutdown("master/node incompatibility with register publisher")
240 for resolved_name, data_type in sub:
241 self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type)
242 code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri)
243 if code != 1:
244 logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg))
245 signal_shutdown("master/node incompatibility with register subscriber")
246 else:
247 self.publisher_update(resolved_name, val)
248 for resolved_name, service_uri in srv:
249 self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri)
250 code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri)
251 if code != 1:
252 logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg))
253 signal_shutdown("master/node incompatibility with register service")
254
255 registered = True
256
257
258 get_registration_listeners().add_listener(self)
259 finally:
260 sm.lock.release()
261 tm.lock.release()
262
263 if pub or sub:
264 logdebug("Registered [%s] with master node %s", caller_id, master_uri)
265 else:
266 logdebug("No topics to register with master node %s", master_uri)
267
268 except Exception as e:
269 if first:
270
271 logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri)
272 first = False
273 time.sleep(0.2)
274 self.registered = True
275 self.run()
276
278 """
279 Check if Node has been registered yet.
280 @return: True if registration has occurred with master
281 @rtype: bool
282 """
283 return self.registered
284
286 """
287 Main RegManager thread loop.
288 Periodically checks the update
289 queue and generates topic connections
290 """
291
292 while not self.handler.done and not is_shutdown():
293 cond = self.cond
294 try:
295 cond.acquire()
296 if not self.updates:
297 cond.wait(0.5)
298 if self.updates:
299
300 topic, uris = self.updates.pop()
301
302 self.updates = [x for x in self.updates if x[0] != topic]
303 else:
304 topic = uris = None
305 finally:
306 if cond is not None:
307 cond.release()
308
309 get_topic_manager().check_all()
310
311
312
313 if uris and not self.handler.done:
314 for uri in uris:
315
316 t = threading.Thread(target=self._connect_topic_thread, args=(topic, uri))
317 t.setDaemon(True)
318 t.start()
319
321 try:
322 code, msg, _ = self.handler._connect_topic(topic, uri)
323 if code != 1:
324 logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg)
325 except Exception as e:
326 if not is_shutdown():
327 logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))
328
330 """
331 Cleans up registrations with master and releases topic and service resources
332 @param reason: human-reasonable debug string
333 @type reason: str
334 """
335 self.logger.debug("registration cleanup starting")
336 try:
337 self.cond.acquire()
338 self.cond.notifyAll()
339 finally:
340 self.cond.release()
341
342
343 if not self.master_uri:
344 return
345
346 master = xmlrpcapi(self.master_uri)
347
348 if master is None:
349 return
350
351 caller_id = get_caller_id()
352
353
354 rl = get_registration_listeners()
355 if rl is not None:
356 rl.clear()
357
358 tm = get_topic_manager()
359 sm = get_service_manager()
360 try:
361 multi = xmlrpcclient.MultiCall(master)
362 if tm is not None:
363 for resolved_name, _ in tm.get_subscriptions():
364 self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
365 multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
366 for resolved_name, _ in tm.get_publications():
367 self.logger.debug("unregisterPublisher [%s]"%resolved_name)
368 multi.unregisterPublisher(caller_id, resolved_name, self.uri)
369
370 if sm is not None:
371 for resolved_name, service_uri in sm.get_services():
372 self.logger.debug("unregisterService [%s]"%resolved_name)
373 multi.unregisterService(caller_id, resolved_name, service_uri)
374 multi()
375 except socket.error as se:
376 (errno, msg) = se.args
377 if errno == 111 or errno == 61:
378 self.logger.warn("cannot unregister with master due to network issues")
379 else:
380 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc())
381 except:
382 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc())
383
384 self.logger.debug("registration cleanup: master calls complete")
385
386
387
388 if tm is not None:
389 tm.close_all()
390 if sm is not None:
391 sm.unregister_all()
392
393 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
394 """
395 RegistrationListener callback
396 @param resolved_name: resolved name of topic or service
397 @type resolved_name: str
398 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
399 @type data_type_or_uri: str
400 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
401 @type reg_type: str
402 """
403 master_uri = self.master_uri
404 if not master_uri:
405 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration")
406 else:
407 try:
408 master = xmlrpcapi(master_uri)
409 if reg_type == Registration.PUB:
410 self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri)
411 master.unregisterPublisher(get_caller_id(), resolved_name, self.uri)
412 elif reg_type == Registration.SUB:
413 self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri)
414 master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri)
415 elif reg_type == Registration.SRV:
416 self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri)
417 master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri)
418 except:
419 logwarn("unable to communicate with ROS Master, registrations are now out of sync")
420 self.logger.error(traceback.format_exc())
421
422 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
423 """
424 RegistrationListener callback
425 @param resolved_name: resolved name of topic or service
426 @type resolved_name: str
427 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
428 @type data_type_or_uri: str
429 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
430 @type reg_type: str
431 """
432
433 master_uri = self.master_uri
434 if not master_uri:
435 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration")
436 else:
437 master = xmlrpcapi(master_uri)
438 args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri)
439 registered = False
440 first = True
441 while not registered and not is_shutdown():
442 try:
443 if reg_type == Registration.PUB:
444 self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args)
445 code, msg, val = master.registerPublisher(*args)
446 if code != 1:
447 logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg))
448 elif reg_type == Registration.SUB:
449 self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args)
450 code, msg, val = master.registerSubscriber(*args)
451 if code == 1:
452 self.publisher_update(resolved_name, val)
453 else:
454
455
456 logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg))
457 elif reg_type == Registration.SRV:
458 self.logger.debug("master.registerService(%s, %s, %s, %s)"%args)
459 code, msg, val = master.registerService(*args)
460 if code != 1:
461 logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg))
462
463 registered = True
464 except Exception as e:
465 if first:
466 msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri
467 self.logger.error(str(e)+"\n"+msg)
468 print(msg)
469 first = False
470 time.sleep(0.2)
471
473 """
474 Inform psmanager of latest publisher list for a topic. This
475 will cause L{RegManager} to create a topic connection for all new
476 publishers (in a separate thread).
477 @param resolved_name: resolved topic name
478 @type resolved_name: str
479 @param uris: list of all publishers uris for topic
480 @type uris: [str]
481 """
482 try:
483 self.cond.acquire()
484 self.updates.append((resolved_name, uris))
485 self.cond.notifyAll()
486 finally:
487 self.cond.release()
488