1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """Internal use: handles maintaining registrations with master via internal listener APIs"""
36
37
38
39 import socket
40 import sys
41 import logging
42 try:
43 import _thread
44 except ImportError:
45 import thread as _thread
46 import threading
47 import time
48 import traceback
49 try:
50 import xmlrpc.client as xmlrpcclient
51 except ImportError:
52 import xmlrpclib as xmlrpcclient
53
54 from rospy.core import is_shutdown, xmlrpcapi, \
55 logfatal, logwarn, loginfo, logerr, logdebug, \
56 signal_shutdown, add_preshutdown_hook
57 from rospy.names import get_caller_id, get_namespace
58
59
60
61 _topic_manager = None
67
68 _service_manager = None
74
75
77 """Registration types"""
78 PUB = 'pub'
79 SUB = 'sub'
80 SRV = 'srv'
81
83 """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations"""
84
def reg_added(self, resolved_name, data_type_or_uri, reg_type):
    """
    Callback for a newly declared publisher, subscriber or service.
    This implementation does nothing.
    @param resolved_name: resolved topic/service name
    @type  resolved_name: str
    @param data_type_or_uri: topic type or service uri
    @type  data_type_or_uri: str
    @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
    @type  reg_type: str
    """
    return None
95
def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
    """
    Callback for a publisher, subscriber or service declaration that
    has been removed. This implementation does nothing.
    @param resolved_name: topic/service name
    @type  resolved_name: str
    @param data_type_or_uri: topic type or service uri
    @type  data_type_or_uri: str
    @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
    @type  reg_type: str
    """
    return None
107
109
111 """
112 ctor.
113 """
114 self.listeners = []
115 self.lock = threading.Lock()
116
def add_listener(self, l):
    """
    Register a listener for pub/sub/service registration changes.
    Internal API: higher level routines use these notifications to
    decide when to communicate with the master.
    @param l: listener to subscribe
    @type  l: RegistrationListener
    """
    assert isinstance(l, RegistrationListener)
    # the listener list is shared with the notify/clear methods, so
    # only touch it while holding the lock
    with self.lock:
        self.listeners.append(l)
128
def notify_removed(self, resolved_name, data_type_or_uri, reg_type):
    """
    Call reg_removed() on every subscribed listener. An exception
    raised by one listener is logged and does not prevent the
    remaining listeners from being notified.
    @param resolved_name: resolved_topic/service name
    @type  resolved_name: str
    @param data_type_or_uri: topic type or service uri
    @type  data_type_or_uri: str
    @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
    @type  reg_type: str
    """
    with self.lock:
        for listener in self.listeners:
            try:
                listener.reg_removed(resolved_name, data_type_or_uri, reg_type)
            except Exception:
                # format_exc() reports the exception currently being
                # handled; its first argument is a depth limit, so the
                # exception object must not be passed to it.
                logerr("error notifying listener of removal: %s"%traceback.format_exc())
144
def notify_added(self, resolved_name, data_type, reg_type):
    """
    Call reg_added() on every subscribed listener. An exception
    raised by one listener is logged and does not prevent the
    remaining listeners from being notified.
    @param resolved_name: topic/service name
    @type  resolved_name: str
    @param data_type: topic/service type
    @type  data_type: str
    @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
    @type  reg_type: str
    """
    with self.lock:
        for listener in self.listeners:
            try:
                listener.reg_added(resolved_name, data_type, reg_type)
            except Exception:
                # format_exc() reports the exception currently being
                # handled; its first argument is a depth limit, so the
                # exception object must not be passed to it.
                logerr(traceback.format_exc())
160
def clear(self):
    """
    Drop every registered listener. The list object itself is kept
    and emptied in place.
    """
    with self.lock:
        self.listeners[:] = []
167
168 _registration_listeners = RegistrationListeners()
171
172
173
175 """
176 Registration manager used by Node implementation.
177 Communicates with ROS Master to maintain topic registration
178 information. Also responds to publisher updates to create topic
179 connections
180 """
181
183 """
184 ctor.
185 @param handler: node API handler
186 """
187 self.logger = logging.getLogger("rospy.registration")
188 self.handler = handler
189 self.uri = self.master_uri = None
190 self.updates = []
191 self.cond = threading.Condition()
192 self.registered = False
193
194 add_preshutdown_hook(self.cleanup)
195
def start(self, uri, master_uri):
    """
    Register the node's current publications, subscriptions and
    services with the master, retrying every 0.2s until that succeeds
    or the node shuts down, and then enter run(). Because run() loops,
    this is meant to be handed to a thread starter rather than called
    inline.
    @param uri: URI of local node
    @type  uri: str
    @param master_uri: Master URI
    @type  master_uri: str
    """
    self.registered = False
    self.master_uri = master_uri
    self.uri = uri
    first_attempt = True
    topics = get_topic_manager()
    services = get_service_manager()
    ns = get_namespace()
    caller_id = get_caller_id()

    # nothing to register when no master URI is given or when it is
    # this node's own URI
    standalone = not master_uri or master_uri == uri
    registered = standalone
    master = None
    if not standalone:
        master = xmlrpcapi(master_uri)
        self.logger.info("Registering with master node %s", master_uri)

    while not (registered or is_shutdown()):
        try:
            try:
                # both manager locks are held for the whole registration
                # pass and released in reverse order below
                topics.lock.acquire()
                services.lock.acquire()

                pub = topics.get_publications()
                sub = topics.get_subscriptions()
                srv = services.get_services()

                for resolved_name, data_type in pub:
                    self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type)
                    status, detail, _ = master.registerPublisher(caller_id, resolved_name, data_type, uri)
                    if status != 1:
                        logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, detail))
                        signal_shutdown("master/node incompatibility with register publisher")

                for resolved_name, data_type in sub:
                    self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type)
                    status, detail, publishers = master.registerSubscriber(caller_id, resolved_name, data_type, uri)
                    if status == 1:
                        # the master's reply value is queued as the
                        # publisher list for this topic
                        self.publisher_update(resolved_name, publishers)
                    else:
                        logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, detail))
                        signal_shutdown("master/node incompatibility with register subscriber")

                for resolved_name, service_uri in srv:
                    self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri)
                    status, detail, _ = master.registerService(caller_id, resolved_name, service_uri, uri)
                    if status != 1:
                        logfatal("cannot register service [%s] with master: %s"%(resolved_name, detail))
                        signal_shutdown("master/node incompatibility with register service")

                registered = True

                # from now on this object is told about registration changes
                get_registration_listeners().add_listener(self)
            finally:
                services.lock.release()
                topics.lock.release()

            if pub or sub:
                logdebug("Registered [%s] with master node %s", caller_id, master_uri)
            else:
                logdebug("No topics to register with master node %s", master_uri)

        except Exception:
            # only the first failure is reported; later retries are silent
            if first_attempt:
                logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri)
                first_attempt = False
            time.sleep(0.2)

    self.registered = True
    self.run()
271
273 """
274 Check if Node has been registered yet.
275 @return: True if registration has occurred with master
276 @rtype: bool
277 """
278 return self.registered
279
def run(self):
    """
    Main RegManager thread loop.
    Periodically checks the update queue and starts one connection
    thread per publisher URI of the dequeued topic. Returns once the
    handler is done or the node is shut down.
    """
    while not self.handler.done and not is_shutdown():
        cond = self.cond
        # the context manager guarantees the condition is released
        # exactly when it was acquired
        with cond:
            if not self.updates:
                # sleep until notified or 0.5s pass, so the loop
                # condition is re-checked regularly
                cond.wait(0.5)
            if self.updates:
                # take the most recently queued update and discard any
                # other queued entries for the same topic
                topic, uris = self.updates.pop()
                self.updates = [x for x in self.updates if x[0] != topic]
            else:
                topic = uris = None

        # connections are started after the condition is released
        if uris and not self.handler.done:
            for uri in uris:
                _thread.start_new_thread(self._connect_topic_thread, (topic, uri))
310
def _connect_topic_thread(self, topic, uri):
    """
    Ask the handler to connect to one publisher of a topic. Failures
    are only logged at debug level; exceptions never propagate.
    @param topic: topic name passed to the handler
    @param uri: publisher URI passed to the handler
    """
    try:
        status, detail, _ = self.handler._connect_topic(topic, uri)
        if status == 1:
            return
        logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, detail)
    except Exception:
        # errors raised while shutting down are not reported
        if is_shutdown():
            return
        logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc()))
319
def cleanup(self, reason):
    """
    Cleans up registrations with master and releases topic and service resources.
    Unregistering with the master is best-effort: any failure is
    logged and the local topic/service resources are still released.
    @param reason: human-readable debug string (not used by this method)
    @type  reason: str
    """
    self.logger.debug("registration cleanup starting")
    # wake every thread waiting on the update condition
    with self.cond:
        self.cond.notify_all()

    # without a master URI there is nothing to unregister
    if not self.master_uri:
        return

    master = xmlrpcapi(self.master_uri)
    if master is None:
        return

    caller_id = get_caller_id()

    # remove all registration listeners
    rl = get_registration_listeners()
    if rl is not None:
        rl.clear()

    tm = get_topic_manager()
    sm = get_service_manager()
    try:
        # batch every unregister request into a single XML-RPC multicall
        multi = xmlrpcclient.MultiCall(master)
        if tm is not None:
            for resolved_name, _ in tm.get_subscriptions():
                self.logger.debug("unregisterSubscriber [%s]"%resolved_name)
                multi.unregisterSubscriber(caller_id, resolved_name, self.uri)
            for resolved_name, _ in tm.get_publications():
                self.logger.debug("unregisterPublisher [%s]"%resolved_name)
                multi.unregisterPublisher(caller_id, resolved_name, self.uri)

        if sm is not None:
            for resolved_name, service_uri in sm.get_services():
                self.logger.debug("unregisterService [%s]"%resolved_name)
                multi.unregisterService(caller_id, resolved_name, service_uri)
        multi()
    except socket.error as se:
        # args is not always an (errno, message) pair (a timeout, for
        # example, carries a single argument), so do not unpack it
        err_code = se.args[0] if se.args else None
        # 111 and 61 are ECONNREFUSED on Linux and macOS/BSD respectively
        if err_code in (111, 61):
            self.logger.warning("cannot unregister with master due to network issues")
        else:
            self.logger.warning("unclean shutdown\n%s"%traceback.format_exc())
    except:
        # deliberately broad: nothing raised here may prevent the local
        # resources below from being released
        self.logger.warning("unclean shutdown\n%s"%traceback.format_exc())

    self.logger.debug("registration cleanup: master calls complete")

    # release local resources whether or not the master was reachable
    if tm is not None:
        tm.close_all()
    if sm is not None:
        sm.unregister_all()
383
def reg_removed(self, resolved_name, data_type_or_uri, reg_type):
    """
    RegistrationListener callback: forward the removal of a
    publisher, subscriber or service to the master. Communication
    errors are logged and swallowed.
    @param resolved_name: resolved name of topic or service
    @type  resolved_name: str
    @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
    @type  data_type_or_uri: str
    @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
    @type  reg_type: str
    """
    target_uri = self.master_uri
    if not target_uri:
        self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration")
        return

    try:
        proxy = xmlrpcapi(target_uri)
        if reg_type == Registration.PUB:
            self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri)
            proxy.unregisterPublisher(get_caller_id(), resolved_name, self.uri)
        elif reg_type == Registration.SUB:
            self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri)
            proxy.unregisterSubscriber(get_caller_id(), resolved_name, self.uri)
        elif reg_type == Registration.SRV:
            self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri)
            proxy.unregisterService(get_caller_id(), resolved_name, data_type_or_uri)
    except:
        # best effort: the failure is reported but never raised to the caller
        logwarn("unable to communicate with ROS Master, registrations are now out of sync")
        self.logger.error(traceback.format_exc())
412
def reg_added(self, resolved_name, data_type_or_uri, reg_type):
    """
    RegistrationListener callback: forward a new publisher,
    subscriber or service declaration to the master, retrying every
    0.2s on communication errors until it goes through or the node
    shuts down.
    @param resolved_name: resolved name of topic or service
    @type  resolved_name: str
    @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs).
    @type  data_type_or_uri: str
    @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV}
    @type  reg_type: str
    """
    target_uri = self.master_uri
    if not target_uri:
        self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration")
        return

    proxy = xmlrpcapi(target_uri)
    args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri)
    first_attempt = True
    while not is_shutdown():
        try:
            if reg_type == Registration.PUB:
                self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args)
                status, detail, _ = proxy.registerPublisher(*args)
                if status != 1:
                    logfatal("unable to register publication [%s] with master: %s"%(resolved_name, detail))
            elif reg_type == Registration.SUB:
                self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args)
                status, detail, publishers = proxy.registerSubscriber(*args)
                if status != 1:
                    logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, detail))
                else:
                    # the master's reply value is queued as the
                    # publisher list for this topic
                    self.publisher_update(resolved_name, publishers)
            elif reg_type == Registration.SRV:
                self.logger.debug("master.registerService(%s, %s, %s, %s)"%args)
                status, detail, _ = proxy.registerService(*args)
                if status != 1:
                    logfatal("unable to register service [%s] with master: %s"%(resolved_name, detail))

            # the call completed (even if the master rejected it): stop retrying
            break
        except Exception as e:
            # only the first failure is reported; later retries are silent
            if first_attempt:
                notice = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%target_uri
                self.logger.error(str(e)+"\n"+notice)
                print(notice)
                first_attempt = False
            time.sleep(0.2)
462
def publisher_update(self, resolved_name, uris):
    """
    Inform psmanager of latest publisher list for a topic. The update
    is appended to the queue consumed by the L{RegManager} thread
    loop, which creates a topic connection for each publisher (in a
    separate thread), and every thread waiting on the update
    condition is woken.
    @param resolved_name: resolved topic name
    @type  resolved_name: str
    @param uris: list of all publishers uris for topic
    @type  uris: [str]
    """
    # the queue is only modified while the condition is held
    with self.cond:
        self.updates.append((resolved_name, uris))
        self.cond.notify_all()
479