1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
"""Internal use: handles maintaining registrations with master via internal listener APIs"""
36
37
38
39 import socket
40 import sys
41 import logging
42 import threading
43 import time
44 import traceback
45 try:
46 import xmlrpc.client as xmlrpcclient
47 except ImportError:
48 import xmlrpclib as xmlrpcclient
49
50 from rospy.core import is_shutdown, is_shutdown_requested, xmlrpcapi, \
51 logfatal, logwarn, loginfo, logerr, logdebug, \
52 signal_shutdown, add_preshutdown_hook
53 from rospy.names import get_caller_id, get_namespace
54
55
56
# Module-level manager singletons. Both start out unset (None); the code that
# installs and reads them is not visible in this part of the file.
_topic_manager = None

_service_manager = None
70
class Registration(object):
    """Registration types"""
    # Values passed as ``reg_type`` to the registration listener callbacks.
    PUB = 'pub'
    SUB = 'sub'
    SRV = 'srv'
class RegistrationListener(object):
    """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations"""

    def reg_added(self, resolved_name, data_type_or_uri, reg_type):
        """
        New pub/sub/service declared. The base implementation does nothing.
        @param resolved_name: resolved topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        pass

    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        Existing pub/sub/service removed. The base implementation does nothing.
        @param resolved_name: topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        pass
103
class RegistrationListeners(object):
    """
    Lock-protected collection of L{RegistrationListener} instances that
    fans out added/removed notifications to every subscribed listener.
    """

    def __init__(self):
        """
        ctor.
        """
        self.listeners = []
        # guards every read and write of self.listeners
        self.lock = threading.Lock()

    def add_listener(self, l):
        """
        Subscribe to notifications of pub/sub/service registration
        changes. This is an internal API used to notify higher level
        routines when to communicate with the master.
        @param l: listener to subscribe
        @type  l: RegistrationListener
        """
        assert isinstance(l, RegistrationListener)
        with self.lock:
            self.listeners.append(l)

    def notify_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        Call reg_removed() on every listener. A listener that raises is
        logged and does not prevent the remaining listeners from being called.
        @param resolved_name: resolved_topic/service name
        @type  resolved_name: str
        @param data_type_or_uri: topic type or service uri
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        with self.lock:
            for l in self.listeners:
                try:
                    l.reg_removed(resolved_name, data_type_or_uri, reg_type)
                except Exception:
                    # format_exc() takes no exception argument: its first
                    # parameter is a frame limit, so passing the exception
                    # raises TypeError on Python 3.
                    logerr("error notifying listener of removal: %s"%traceback.format_exc())

    def notify_added(self, resolved_name, data_type, reg_type):
        """
        Call reg_added() on every listener. A listener that raises is
        logged and does not prevent the remaining listeners from being called.
        @param resolved_name: topic/service name
        @type  resolved_name: str
        @param data_type: topic/service type
        @type  data_type: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        with self.lock:
            for l in self.listeners:
                try:
                    l.reg_added(resolved_name, data_type, reg_type)
                except Exception:
                    # see notify_removed(): format_exc() must be called without the exception
                    logerr(traceback.format_exc())

    def clear(self):
        """
        Remove all registration listeners
        """
        if not is_shutdown_requested():
            with self.lock:
                del self.listeners[:]
        else:
            # Shutdown has been requested: do not block waiting for the lock.
            # The list is cleared whether or not the lock could be taken.
            locked = self.lock.acquire(False)
            try:
                del self.listeners[:]
            finally:
                if locked:
                    self.lock.release()
172
# Module-level listener registry, created once at import time.
_registration_listeners = RegistrationListeners()
176
177
class RegManager(RegistrationListener):
    """
    Registration manager used by Node implementation.
    Communicates with ROS Master to maintain topic registration
    information. Also responds to publisher updates to create topic
    connections
    """

    def __init__(self, handler):
        """
        ctor.
        @param handler: node API handler
        """
        self.logger = logging.getLogger("rospy.registration")
        self.handler = handler
        self.uri = self.master_uri = None
        # pending (topic, publisher uris) updates, consumed by run()
        self.updates = []
        # guards self.updates and wakes run() when an update is queued
        self.cond = threading.Condition()
        self.registered = False
        # have cleanup() invoked as a pre-shutdown hook
        add_preshutdown_hook(self.cleanup)

    def start(self, uri, master_uri):
        """
        Start the RegManager. This should be passed in as an argument to a thread
        starter as the RegManager is designed to spin in its own thread.
        Registers all currently declared publications, subscriptions and
        services with the master (retrying every 0.2s until it succeeds or
        the node shuts down), then enters run().
        @param uri: URI of local node
        @type  uri: str
        @param master_uri: Master URI
        @type  master_uri: str
        """
        self.registered = False
        self.master_uri = master_uri
        self.uri = uri
        first = True
        tm = get_topic_manager()
        sm = get_service_manager()
        caller_id = get_caller_id()
        if not master_uri or master_uri == uri:
            # no separate master to talk to: skip the registration loop
            registered = True
            master = None
        else:
            registered = False
            master = xmlrpcapi(master_uri)
            self.logger.info("Registering with master node %s", master_uri)

        while not registered and not is_shutdown():
            try:
                # Hold both manager locks while the current declarations are
                # snapshotted and registered (presumably so no declaration can
                # slip in between the snapshot and add_listener -- confirm
                # against the managers). Each lock is acquired before the
                # try whose finally releases it, so a failed acquire can
                # never trigger a release of an unheld lock.
                tm.lock.acquire()
                try:
                    sm.lock.acquire()
                    try:
                        pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services()
                        self._register_declarations(master, caller_id, uri, pub, sub, srv)
                        registered = True

                        # from now on, be told about every registration change
                        get_registration_listeners().add_listener(self)
                    finally:
                        sm.lock.release()
                finally:
                    tm.lock.release()

                if pub or sub:
                    logdebug("Registered [%s] with master node %s", caller_id, master_uri)
                else:
                    logdebug("No topics to register with master node %s", master_uri)

            except Exception:
                if first:
                    # only report the first failure; later retries are silent
                    logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri)
                    first = False
                time.sleep(0.2)
        self.registered = True
        self.run()

    def _register_declarations(self, master, caller_id, uri, pub, sub, srv):
        """
        Register the given publications, subscriptions and services with
        the master. A non-success status code from the master is logged as
        fatal and triggers node shutdown; exceptions propagate to the caller.
        @param master: XML-RPC proxy to the master
        @param caller_id: caller ID of this node
        @type  caller_id: str
        @param uri: URI of local node
        @type  uri: str
        @param pub: (resolved_name, data_type) pairs to register as publications
        @param sub: (resolved_name, data_type) pairs to register as subscriptions
        @param srv: (resolved_name, service_uri) pairs to register as services
        """
        for resolved_name, data_type in pub:
            self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type)
            code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri)
            if code != 1:
                logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg))
                signal_shutdown("master/node incompatibility with register publisher")
        for resolved_name, data_type in sub:
            self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type)
            code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri)
            if code != 1:
                logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg))
                signal_shutdown("master/node incompatibility with register subscriber")
            else:
                # val is queued as the publisher list for this topic
                self.publisher_update(resolved_name, val)
        for resolved_name, service_uri in srv:
            self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri)
            code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri)
            if code != 1:
                logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg))
                signal_shutdown("master/node incompatibility with register service")

    def is_registered(self):
        """
        Check if Node has been registered yet.
        @return: True if registration has occurred with master
        @rtype: bool
        """
        return self.registered

    def run(self):
        """
        Main RegManager thread loop.
        Periodically checks the update
        queue and generates topic connections
        """
        while not self.handler.done and not is_shutdown():
            cond = self.cond
            with cond:
                if not self.updates:
                    # wake up at least every 0.5s to re-check the exit conditions
                    cond.wait(0.5)
                if self.updates:
                    # take the most recently queued update ...
                    topic, uris = self.updates.pop()
                    # ... and drop any older updates queued for the same topic
                    self.updates = [x for x in self.updates if x[0] != topic]
                else:
                    topic = uris = None

            # Connect outside the lock, one thread per publisher URI, so a
            # slow connection attempt cannot stall this loop.
            if uris and not self.handler.done:
                for uri in uris:
                    t = threading.Thread(target=self._connect_topic_thread, args=(topic, uri))
                    t.daemon = True
                    t.start()

    def _connect_topic_thread(self, topic, uri):
        """
        Thread body: ask the handler to connect to one publisher of a topic.
        Failures are only logged at debug level.
        @param topic: resolved topic name
        @type  topic: str
        @param uri: publisher URI
        @type  uri: str
        """
        try:
            code, msg, _ = self.handler._connect_topic(topic, uri)
            if code != 1:
                logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg)
        except Exception:
            if not is_shutdown():
                logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))

    def cleanup(self, reason):
        """
        Cleans up registrations with master and releases topic and service resources
        @param reason: human-readable debug string (not used by this method)
        @type  reason: str
        """
        self.logger.debug("registration cleanup starting")
        # wake run() so it can re-check its exit conditions
        with self.cond:
            self.cond.notify_all()

        # no master was ever configured: nothing to unregister
        if not self.master_uri:
            return

        master = xmlrpcapi(self.master_uri)
        if master is None:
            return

        caller_id = get_caller_id()

        # Drop the listeners first: everything is unregistered in bulk below.
        rl = get_registration_listeners()
        if rl is not None:
            rl.clear()

        tm = get_topic_manager()
        sm = get_service_manager()
        try:
            # batch all unregister calls into a single XML-RPC multicall
            multi = xmlrpcclient.MultiCall(master)
            if tm is not None:
                for resolved_name, _ in tm.get_subscriptions():
                    self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
                    multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
                for resolved_name, _ in tm.get_publications():
                    self.logger.debug("unregisterPublisher [%s]"%resolved_name)
                    multi.unregisterPublisher(caller_id, resolved_name, self.uri)

            if sm is not None:
                for resolved_name, service_uri in sm.get_services():
                    self.logger.debug("unregisterService [%s]"%resolved_name)
                    multi.unregisterService(caller_id, resolved_name, service_uri)
            multi()
        except socket.error as se:
            # Read the error number from the attribute instead of unpacking
            # se.args: single-argument socket errors (e.g. timeouts) would
            # make the unpack raise and abort the cleanup.
            err_code = getattr(se, 'errno', None)
            if err_code == 111 or err_code == 61:
                # 111 / 61: connection refused (Linux / BSD numbering)
                self.logger.warning("cannot unregister with master due to network issues")
            else:
                self.logger.warning("unclean shutdown\n%s"%traceback.format_exc())
        except:
            # Deliberately catch-all: whatever happens while talking to the
            # master, the local resources below must still be released.
            self.logger.warning("unclean shutdown\n%s"%traceback.format_exc())

        self.logger.debug("registration cleanup: master calls complete")

        # release local topic and service resources
        if tm is not None:
            tm.close_all()
        if sm is not None:
            sm.unregister_all()

    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
        """
        RegistrationListener callback: unregister the topic/service with the master.
        @param resolved_name: resolved name of topic or service
        @type  resolved_name: str
        @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        master_uri = self.master_uri
        if not master_uri:
            self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration")
            return
        try:
            master = xmlrpcapi(master_uri)
            if reg_type == Registration.PUB:
                self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri)
                master.unregisterPublisher(get_caller_id(), resolved_name, self.uri)
            elif reg_type == Registration.SUB:
                # log the URI that is actually sent to the master
                self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, self.uri)
                master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri)
            elif reg_type == Registration.SRV:
                self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri)
                master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri)
        except Exception:
            logwarn("unable to communicate with ROS Master, registrations are now out of sync")
            self.logger.error(traceback.format_exc())

    def reg_added(self, resolved_name, data_type_or_uri, reg_type):
        """
        RegistrationListener callback: register the topic/service with the
        master, retrying every 0.2s until the call goes through or the node
        shuts down.
        @param resolved_name: resolved name of topic or service
        @type  resolved_name: str
        @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
        @type  data_type_or_uri: str
        @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
        @type  reg_type: str
        """
        master_uri = self.master_uri
        if not master_uri:
            self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration")
            return

        master = xmlrpcapi(master_uri)
        args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri)
        registered = False
        first = True
        while not registered and not is_shutdown():
            try:
                if reg_type == Registration.PUB:
                    self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args)
                    code, msg, val = master.registerPublisher(*args)
                    if code != 1:
                        logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg))
                elif reg_type == Registration.SUB:
                    self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args)
                    code, msg, val = master.registerSubscriber(*args)
                    if code == 1:
                        # val is queued as the publisher list for this topic
                        self.publisher_update(resolved_name, val)
                    else:
                        logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg))
                elif reg_type == Registration.SRV:
                    self.logger.debug("master.registerService(%s, %s, %s, %s)"%args)
                    code, msg, val = master.registerService(*args)
                    if code != 1:
                        logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg))

                # The master answered (whatever the status code): stop retrying.
                registered = True
            except Exception as e:
                if first:
                    # only report the first failure; later retries are silent
                    msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri
                    self.logger.error(str(e)+"\n"+msg)
                    print(msg)
                    first = False
                time.sleep(0.2)

    def publisher_update(self, resolved_name, uris):
        """
        Inform psmanager of latest publisher list for a topic.  This
        will cause L{RegManager} to create a topic connection for all new
        publishers (in a separate thread).
        @param resolved_name: resolved topic name
        @type  resolved_name: str
        @param uris: list of all publishers uris for topic
        @type  uris: [str]
        """
        with self.cond:
            self.updates.append((resolved_name, uris))
            self.cond.notify_all()
486