1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33   
 34   
 35  """Internal use: handles maintaining registrations with master via internal listener APIs""" 
 36   
 37   
 38   
 39  import socket 
 40  import sys 
 41  import logging 
 42  import threading 
 43  import time 
 44  import traceback 
 45  try: 
 46      import xmlrpc.client as xmlrpcclient 
 47  except ImportError: 
 48      import xmlrpclib as xmlrpcclient 
 49   
 50  from rospy.core import is_shutdown, is_shutdown_requested, xmlrpcapi, \ 
 51      logfatal, logwarn, loginfo, logerr, logdebug, \ 
 52      signal_shutdown, add_preshutdown_hook 
 53  from rospy.names import get_caller_id, get_namespace 
 54   
 55   
 56   
 57  _topic_manager = None 
 63   
 64  _service_manager = None 
 70   
 71       
 73      """Registration types""" 
 74      PUB = 'pub' 
 75      SUB = 'sub' 
 76      SRV = 'srv' 
  77       
 79      """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations""" 
 80   
 81 -    def reg_added(self, resolved_name, data_type_or_uri, reg_type):  
  82          """ 
 83          New pub/sub/service declared. 
 84          @param resolved_name: resolved topic/service name 
 85          @param data_type_or_uri: topic type or service uri 
 86          @type  data_type_or_uri: str 
 87          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
 88          @type  reg_type: str 
 89          """ 
 90          pass 
  91       
 92 -    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):  
  93          """ 
 94          New pub/sub/service removed. 
 95          @param resolved_name: topic/service name 
 96          @type  resolved_name: str 
 97          @param data_type_or_uri: topic type or service uri 
 98          @type  data_type_or_uri: str 
 99          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
100          @type  reg_type: str 
101          """ 
102          pass 
103   
105       
107          """ 
108          ctor. 
109          """ 
110          self.listeners = [] 
111          self.lock = threading.Lock() 
 112   
114          """ 
115          Subscribe to notifications of pub/sub/service registration 
116          changes. This is an internal API used to notify higher level 
117          routines when to communicate with the master. 
118          @param l: listener to subscribe 
119          @type  l: TopicListener 
120          """ 
121          assert isinstance(l, RegistrationListener) 
122          with self.lock: 
123              self.listeners.append(l) 
 124   
126          """ 
127          @param resolved_name: resolved_topic/service name 
128          @type  resolved_name: str 
129          @param data_type_or_uri: topic type or service uri 
130          @type  data_type_or_uri: str 
131          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
132          @type  reg_type: str 
133          """ 
134          with self.lock: 
135              for l in self.listeners: 
136                  try: 
137                      l.reg_removed(resolved_name, data_type_or_uri, reg_type) 
138                  except Exception as e: 
139                      logerr("error notifying listener of removal: %s"%traceback.format_exc()) 
 140               
141 -    def notify_added(self, resolved_name, data_type, reg_type): 
 142          """ 
143          @param resolved_name: topic/service name 
144          @type  resolved_name: str 
145          @param data_type: topic/service type 
146          @type  data_type: str 
147          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
148          @type  reg_type: str 
149          """ 
150          with self.lock: 
151              for l in self.listeners: 
152                  try: 
153                      l.reg_added(resolved_name, data_type, reg_type) 
154                  except Exception as e: 
155                      logerr(traceback.format_exc()) 
 156                       
158          """ 
159          Remove all registration listeners 
160          """ 
161          if not is_shutdown_requested(): 
162              with self.lock: 
163                  del self.listeners[:] 
164          else: 
165               
166               
167              locked = self.lock.acquire(False) 
168               
169              del self.listeners[:] 
170              if locked: 
171                  self.lock.release() 
  172               
173  _registration_listeners = RegistrationListeners() 
176   
177   
178   
180      """ 
181      Registration manager used by Node implemenation. 
182      Communicates with ROS Master to maintain topic registration 
183      information. Also responds to publisher updates to create topic 
184      connections 
185      """ 
186   
188          """ 
189          ctor. 
190          @param handler: node API handler 
191          """ 
192          self.logger = logging.getLogger("rospy.registration") 
193          self.handler = handler 
194          self.uri = self.master_uri = None 
195          self.updates = [] 
196          self.cond = threading.Condition()  
197          self.registered = False 
198           
199          add_preshutdown_hook(self.cleanup) 
 200           
201 -    def start(self, uri, master_uri): 
 202          """ 
203          Start the RegManager. This should be passed in as an argument to a thread 
204          starter as the RegManager is designed to spin in its own thread 
205          @param uri: URI of local node 
206          @type  uri: str 
207          @param master_uri: Master URI 
208          @type  master_uri: str 
209          """ 
210          self.registered = False  
211          self.master_uri = master_uri 
212          self.uri = uri 
213          first = True 
214          tm = get_topic_manager() 
215          sm = get_service_manager() 
216          ns = get_namespace() 
217          caller_id = get_caller_id() 
218          if not master_uri or master_uri == uri: 
219              registered = True 
220              master = None 
221          else: 
222              registered = False 
223              master = xmlrpcapi(master_uri) 
224              self.logger.info("Registering with master node %s", master_uri) 
225   
226          while not registered and not is_shutdown(): 
227              try: 
228                  try: 
229                       
230                      tm.lock.acquire() 
231                      sm.lock.acquire()                     
232   
233                      pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services() 
234                      for resolved_name, data_type in pub: 
235                          self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type) 
236                          code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri) 
237                          if code != 1: 
238                              logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg)) 
239                              signal_shutdown("master/node incompatibility with register publisher") 
240                      for resolved_name, data_type in sub: 
241                          self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type) 
242                          code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri) 
243                          if code != 1: 
244                              logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg)) 
245                              signal_shutdown("master/node incompatibility with register subscriber")                         
246                          else: 
247                              self.publisher_update(resolved_name, val) 
248                      for resolved_name, service_uri in srv: 
249                          self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri) 
250                          code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri) 
251                          if code != 1: 
252                              logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg)) 
253                              signal_shutdown("master/node incompatibility with register service")                         
254    
255                      registered = True 
256                       
257                       
258                      get_registration_listeners().add_listener(self) 
259                  finally: 
260                      sm.lock.release()                     
261                      tm.lock.release() 
262                   
263                  if pub or sub: 
264                      logdebug("Registered [%s] with master node %s", caller_id, master_uri) 
265                  else: 
266                      logdebug("No topics to register with master node %s", master_uri) 
267                       
268              except Exception as e: 
269                  if first: 
270                       
271                      logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri) 
272                      first = False 
273                  time.sleep(0.2) 
274          self.registered = True 
275          self.run() 
 276           
278          """ 
279          Check if Node has been registered yet. 
280          @return: True if registration has occurred with master 
281          @rtype: bool 
282          """ 
283          return self.registered  
 284   
286          """ 
287          Main RegManager thread loop. 
288          Periodically checks the update 
289          queue and generates topic connections 
290          """ 
291           
292          while not self.handler.done and not is_shutdown(): 
293              cond = self.cond 
294              try: 
295                  cond.acquire() 
296                  if not self.updates: 
297                      cond.wait(0.5) 
298                  if self.updates: 
299                       
300                      topic, uris = self.updates.pop() 
301                       
302                      self.updates = [x for x in self.updates if x[0] != topic] 
303                  else: 
304                      topic = uris = None 
305              finally: 
306                  if cond is not None: 
307                      cond.release() 
308   
309              get_topic_manager().check_all() 
310   
311               
312               
313              if uris and not self.handler.done: 
314                  for uri in uris: 
315                       
316                      t = threading.Thread(target=self._connect_topic_thread, args=(topic, uri)) 
317                      t.setDaemon(True) 
318                      t.start() 
 319   
321          try: 
322              code, msg, _ = self.handler._connect_topic(topic, uri) 
323              if code != 1: 
324                  logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg) 
325          except Exception as e: 
326              if not is_shutdown(): 
327                  logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc())) 
 328           
330          """ 
331          Cleans up registrations with master and releases topic and service resources 
332          @param reason: human-reasonable debug string 
333          @type  reason: str 
334          """ 
335          self.logger.debug("registration cleanup starting") 
336          try: 
337              self.cond.acquire() 
338              self.cond.notifyAll() 
339          finally: 
340              self.cond.release()         
341   
342           
343          if not self.master_uri: 
344              return 
345           
346          master = xmlrpcapi(self.master_uri) 
347           
348          if master is None: 
349              return 
350           
351          caller_id = get_caller_id() 
352   
353           
354          rl = get_registration_listeners() 
355          if rl is not None: 
356              rl.clear() 
357               
358          tm = get_topic_manager() 
359          sm = get_service_manager() 
360          try: 
361              multi = xmlrpcclient.MultiCall(master) 
362              if tm is not None: 
363                  for resolved_name, _ in tm.get_subscriptions(): 
364                      self.logger.debug("unregisterSubscriber [%s]"%resolved_name) 
365                      multi.unregisterSubscriber(caller_id, resolved_name, self.uri) 
366                  for resolved_name, _ in tm.get_publications(): 
367                      self.logger.debug("unregisterPublisher [%s]"%resolved_name)                     
368                      multi.unregisterPublisher(caller_id, resolved_name, self.uri) 
369   
370              if sm is not None: 
371                  for resolved_name, service_uri in sm.get_services(): 
372                      self.logger.debug("unregisterService [%s]"%resolved_name)  
373                      multi.unregisterService(caller_id, resolved_name, service_uri) 
374              multi() 
375          except socket.error as se: 
376              (errno, msg) = se.args 
377              if errno == 111 or errno == 61:  
378                  self.logger.warn("cannot unregister with master due to network issues") 
379              else: 
380                  self.logger.warn("unclean shutdown\n%s"%traceback.format_exc()) 
381          except: 
382              self.logger.warn("unclean shutdown\n%s"%traceback.format_exc()) 
383   
384          self.logger.debug("registration cleanup: master calls complete")             
385   
386           
387           
388          if tm is not None: 
389              tm.close_all() 
390          if sm is not None: 
391              sm.unregister_all() 
 392   
393 -    def reg_removed(self, resolved_name, data_type_or_uri, reg_type): 
 394          """ 
395          RegistrationListener callback 
396          @param resolved_name: resolved name of topic or service 
397          @type  resolved_name: str 
398          @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs). 
399          @type  data_type_or_uri: str 
400          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
401          @type  reg_type: str 
402          """ 
403          master_uri = self.master_uri 
404          if not master_uri: 
405              self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration") 
406          else: 
407              try: 
408                  master = xmlrpcapi(master_uri) 
409                  if reg_type == Registration.PUB: 
410                      self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri) 
411                      master.unregisterPublisher(get_caller_id(), resolved_name, self.uri) 
412                  elif reg_type == Registration.SUB:             
413                      self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri) 
414                      master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri) 
415                  elif reg_type == Registration.SRV: 
416                      self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri) 
417                      master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri) 
418              except: 
419                  logwarn("unable to communicate with ROS Master, registrations are now out of sync") 
420                  self.logger.error(traceback.format_exc()) 
 421       
422 -    def reg_added(self, resolved_name, data_type_or_uri, reg_type): 
 423          """ 
424          RegistrationListener callback 
425          @param resolved_name: resolved name of topic or service 
426          @type  resolved_name: str 
427          @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs). 
428          @type  data_type_or_uri: str 
429          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
430          @type  reg_type: str 
431          """ 
432           
433          master_uri = self.master_uri 
434          if not master_uri: 
435              self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration") 
436          else: 
437              master = xmlrpcapi(master_uri) 
438              args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri) 
439              registered = False 
440              first = True 
441              while not registered and not is_shutdown(): 
442                  try: 
443                      if reg_type == Registration.PUB: 
444                          self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args) 
445                          code, msg, val = master.registerPublisher(*args) 
446                          if code != 1: 
447                              logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg)) 
448                      elif reg_type == Registration.SUB: 
449                          self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args) 
450                          code, msg, val = master.registerSubscriber(*args) 
451                          if code == 1: 
452                              self.publisher_update(resolved_name, val) 
453                          else: 
454                               
455                               
456                              logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg)) 
457                      elif reg_type == Registration.SRV: 
458                          self.logger.debug("master.registerService(%s, %s, %s, %s)"%args) 
459                          code, msg, val = master.registerService(*args) 
460                          if code != 1: 
461                              logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg)) 
462                           
463                      registered = True 
464                  except Exception as e: 
465                      if first: 
466                          msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri 
467                          self.logger.error(str(e)+"\n"+msg) 
468                          print(msg) 
469                          first = False 
470                      time.sleep(0.2) 
 471   
473          """ 
474          Inform psmanager of latest publisher list for a topic.  This 
475          will cause L{RegManager} to create a topic connection for all new 
476          publishers (in a separate thread). 
477          @param resolved_name: resolved topic name 
478          @type  resolved_name: str 
479          @param uris: list of all publishers uris for topic 
480          @type  uris: [str] 
481          """ 
482          try: 
483              self.cond.acquire() 
484              self.updates.append((resolved_name, uris)) 
485              self.cond.notifyAll()               
486          finally: 
487              self.cond.release() 
  488