1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: handles maintaining registrations with master via internal listener APIs"""
36
37
38
39 import errno
40 import socket
41 import sys
42 import logging
43 import threading
44 import time
45 import traceback
46 try:
47 import xmlrpc.client as xmlrpcclient
48 except ImportError:
49 import xmlrpclib as xmlrpcclient
50
51 from rospy.core import is_shutdown, is_shutdown_requested, xmlrpcapi, \
52 logfatal, logwarn, loginfo, logerr, logdebug, \
53 signal_shutdown, add_preshutdown_hook
54 from rospy.names import get_caller_id, get_namespace
55
56
57
58 _topic_manager = None
64
65 _service_manager = None
71
72
74 """Registration types"""
75 PUB = 'pub'
76 SUB = 'sub'
77 SRV = 'srv'
78
80 """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations"""
81
82 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
83 """
84 New pub/sub/service declared.
85 @param resolved_name: resolved topic/service name
86 @param data_type_or_uri: topic type or service uri
87 @type data_type_or_uri: str
88 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
89 @type reg_type: str
90 """
91 pass
92
93 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
94 """
95 New pub/sub/service removed.
96 @param resolved_name: topic/service name
97 @type resolved_name: str
98 @param data_type_or_uri: topic type or service uri
99 @type data_type_or_uri: str
100 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
101 @type reg_type: str
102 """
103 pass
104
106
108 """
109 ctor.
110 """
111 self.listeners = []
112 self.lock = threading.Lock()
113
115 """
116 Subscribe to notifications of pub/sub/service registration
117 changes. This is an internal API used to notify higher level
118 routines when to communicate with the master.
119 @param l: listener to subscribe
120 @type l: TopicListener
121 """
122 assert isinstance(l, RegistrationListener)
123 with self.lock:
124 self.listeners.append(l)
125
127 """
128 @param resolved_name: resolved_topic/service name
129 @type resolved_name: str
130 @param data_type_or_uri: topic type or service uri
131 @type data_type_or_uri: str
132 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
133 @type reg_type: str
134 """
135 with self.lock:
136 for l in self.listeners:
137 try:
138 l.reg_removed(resolved_name, data_type_or_uri, reg_type)
139 except Exception as e:
140 logerr("error notifying listener of removal: %s"%traceback.format_exc())
141
142 - def notify_added(self, resolved_name, data_type, reg_type):
143 """
144 @param resolved_name: topic/service name
145 @type resolved_name: str
146 @param data_type: topic/service type
147 @type data_type: str
148 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
149 @type reg_type: str
150 """
151 with self.lock:
152 for l in self.listeners:
153 try:
154 l.reg_added(resolved_name, data_type, reg_type)
155 except Exception as e:
156 logerr(traceback.format_exc())
157
159 """
160 Remove all registration listeners
161 """
162 if not is_shutdown_requested():
163 with self.lock:
164 del self.listeners[:]
165 else:
166
167
168 locked = self.lock.acquire(False)
169
170 del self.listeners[:]
171 if locked:
172 self.lock.release()
173
174 _registration_listeners = RegistrationListeners()
177
178
179
181 """
182 Registration manager used by Node implemenation.
183 Communicates with ROS Master to maintain topic registration
184 information. Also responds to publisher updates to create topic
185 connections
186 """
187
189 """
190 ctor.
191 @param handler: node API handler
192 """
193 self.logger = logging.getLogger("rospy.registration")
194 self.handler = handler
195 self.uri = self.master_uri = None
196 self.updates = []
197 self.cond = threading.Condition()
198 self.registered = False
199
200 add_preshutdown_hook(self.cleanup)
201
202 - def start(self, uri, master_uri):
203 """
204 Start the RegManager. This should be passed in as an argument to a thread
205 starter as the RegManager is designed to spin in its own thread
206 @param uri: URI of local node
207 @type uri: str
208 @param master_uri: Master URI
209 @type master_uri: str
210 """
211 self.registered = False
212 self.master_uri = master_uri
213 self.uri = uri
214 first = True
215 tm = get_topic_manager()
216 sm = get_service_manager()
217 ns = get_namespace()
218 caller_id = get_caller_id()
219 if not master_uri or master_uri == uri:
220 registered = True
221 master = None
222 else:
223 registered = False
224 master = xmlrpcapi(master_uri)
225 self.logger.info("Registering with master node %s", master_uri)
226
227 while not registered and not is_shutdown():
228 try:
229 try:
230
231 tm.lock.acquire()
232 sm.lock.acquire()
233
234 pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services()
235 for resolved_name, data_type in pub:
236 self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type)
237 code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri)
238 if code != 1:
239 logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg))
240 signal_shutdown("master/node incompatibility with register publisher")
241 for resolved_name, data_type in sub:
242 self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type)
243 code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri)
244 if code != 1:
245 logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg))
246 signal_shutdown("master/node incompatibility with register subscriber")
247 else:
248 self.publisher_update(resolved_name, val)
249 for resolved_name, service_uri in srv:
250 self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri)
251 code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri)
252 if code != 1:
253 logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg))
254 signal_shutdown("master/node incompatibility with register service")
255
256 registered = True
257
258
259 get_registration_listeners().add_listener(self)
260 finally:
261 sm.lock.release()
262 tm.lock.release()
263
264 if pub or sub:
265 logdebug("Registered [%s] with master node %s", caller_id, master_uri)
266 else:
267 logdebug("No topics to register with master node %s", master_uri)
268
269 except Exception as e:
270 if first:
271
272 logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri)
273 first = False
274 time.sleep(0.2)
275 self.registered = True
276 self.run()
277
279 """
280 Check if Node has been registered yet.
281 @return: True if registration has occurred with master
282 @rtype: bool
283 """
284 return self.registered
285
287 """
288 Main RegManager thread loop.
289 Periodically checks the update
290 queue and generates topic connections
291 """
292
293 while not self.handler.done and not is_shutdown():
294 cond = self.cond
295 try:
296 cond.acquire()
297 if not self.updates:
298 cond.wait(0.5)
299 if self.updates:
300
301 topic, uris = self.updates.pop()
302
303 self.updates = [x for x in self.updates if x[0] != topic]
304 else:
305 topic = uris = None
306 finally:
307 if cond is not None:
308 cond.release()
309
310 get_topic_manager().check_all()
311
312
313
314 if uris and not self.handler.done:
315 for uri in uris:
316
317 t = threading.Thread(target=self._connect_topic_thread, args=(topic, uri))
318 t.daemon = True
319 t.start()
320
322 try:
323 code, msg, _ = self.handler._connect_topic(topic, uri)
324 if code != 1:
325 logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg)
326 except Exception as e:
327 if not is_shutdown():
328 logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))
329
331 """
332 Cleans up registrations with master and releases topic and service resources
333 @param reason: human-reasonable debug string
334 @type reason: str
335 """
336 self.logger.debug("registration cleanup starting")
337 try:
338 self.cond.acquire()
339 self.cond.notifyAll()
340 finally:
341 self.cond.release()
342
343
344 if not self.master_uri:
345 return
346
347 master = xmlrpcapi(self.master_uri)
348
349 if master is None:
350 return
351
352 caller_id = get_caller_id()
353
354
355 rl = get_registration_listeners()
356 if rl is not None:
357 rl.clear()
358
359 tm = get_topic_manager()
360 sm = get_service_manager()
361 try:
362 multi = xmlrpcclient.MultiCall(master)
363 if tm is not None:
364 for resolved_name, _ in tm.get_subscriptions():
365 self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
366 multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
367 for resolved_name, _ in tm.get_publications():
368 self.logger.debug("unregisterPublisher [%s]"%resolved_name)
369 multi.unregisterPublisher(caller_id, resolved_name, self.uri)
370
371 if sm is not None:
372 for resolved_name, service_uri in sm.get_services():
373 self.logger.debug("unregisterService [%s]"%resolved_name)
374 multi.unregisterService(caller_id, resolved_name, service_uri)
375 multi()
376 except socket.error as se:
377 (se_errno, msg) = se.args
378 if se_errno == errno.ECONNREFUSED or se_errno == errno.ENODATA:
379 self.logger.warn("cannot unregister with master due to network issues")
380 else:
381 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc())
382 except:
383 self.logger.warn("unclean shutdown\n%s"%traceback.format_exc())
384
385 self.logger.debug("registration cleanup: master calls complete")
386
387
388
389 if tm is not None:
390 tm.close_all()
391 if sm is not None:
392 sm.unregister_all()
393
394 - def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
395 """
396 RegistrationListener callback
397 @param resolved_name: resolved name of topic or service
398 @type resolved_name: str
399 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
400 @type data_type_or_uri: str
401 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
402 @type reg_type: str
403 """
404 master_uri = self.master_uri
405 if not master_uri:
406 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration")
407 else:
408 try:
409 master = xmlrpcapi(master_uri)
410 if reg_type == Registration.PUB:
411 self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri)
412 master.unregisterPublisher(get_caller_id(), resolved_name, self.uri)
413 elif reg_type == Registration.SUB:
414 self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri)
415 master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri)
416 elif reg_type == Registration.SRV:
417 self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri)
418 master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri)
419 except:
420 logwarn("unable to communicate with ROS Master, registrations are now out of sync")
421 self.logger.error(traceback.format_exc())
422
423 - def reg_added(self, resolved_name, data_type_or_uri, reg_type):
424 """
425 RegistrationListener callback
426 @param resolved_name: resolved name of topic or service
427 @type resolved_name: str
428 @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
429 @type data_type_or_uri: str
430 @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
431 @type reg_type: str
432 """
433
434 master_uri = self.master_uri
435 if not master_uri:
436 self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration")
437 else:
438 master = xmlrpcapi(master_uri)
439 args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri)
440 registered = False
441 first = True
442 while not registered and not is_shutdown():
443 try:
444 if reg_type == Registration.PUB:
445 self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args)
446 code, msg, val = master.registerPublisher(*args)
447 if code != 1:
448 logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg))
449 elif reg_type == Registration.SUB:
450 self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args)
451 code, msg, val = master.registerSubscriber(*args)
452 if code == 1:
453 self.publisher_update(resolved_name, val)
454 else:
455
456
457 logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg))
458 elif reg_type == Registration.SRV:
459 self.logger.debug("master.registerService(%s, %s, %s, %s)"%args)
460 code, msg, val = master.registerService(*args)
461 if code != 1:
462 logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg))
463
464 registered = True
465 except Exception as e:
466 if first:
467 msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri
468 self.logger.error(str(e)+"\n"+msg)
469 print(msg)
470 first = False
471 time.sleep(0.2)
472
474 """
475 Inform psmanager of latest publisher list for a topic. This
476 will cause L{RegManager} to create a topic connection for all new
477 publishers (in a separate thread).
478 @param resolved_name: resolved topic name
479 @type resolved_name: str
480 @param uris: list of all publishers uris for topic
481 @type uris: [str]
482 """
483 try:
484 self.cond.acquire()
485 self.updates.append((resolved_name, uris))
486 self.cond.notifyAll()
487 finally:
488 self.cond.release()
489