1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33   
 34   
 35  """Internal use: handles maintaining registrations with master via internal listener APIs""" 
 36   
 37   
 38   
 39  import socket 
 40  import sys 
 41  import logging 
 42  try: 
 43          import _thread 
 44  except ImportError: 
 45      import thread as _thread 
 46  import threading 
 47  import time 
 48  import traceback 
 49  try: 
 50      import xmlrpc.client as xmlrpcclient 
 51  except ImportError: 
 52      import xmlrpclib as xmlrpcclient 
 53   
 54  from rospy.core import is_shutdown, xmlrpcapi, \ 
 55      logfatal, logwarn, loginfo, logerr, logdebug, \ 
 56      signal_shutdown, add_preshutdown_hook 
 57  from rospy.names import get_caller_id, get_namespace 
 58   
 59   
 60   
# Module-level slot for the topic manager; starts out as None.
# NOTE(review): presumably this is what get_topic_manager() (called by the
# registration code below, defined outside this view) returns -- confirm.
_topic_manager = None
 67   
# Module-level slot for the service manager; starts out as None.
# NOTE(review): presumably this is what get_service_manager() (called by the
# registration code below, defined outside this view) returns -- confirm.
_service_manager = None
 74   
 75       
 77      """Registration types""" 
 78      PUB = 'pub' 
 79      SUB = 'sub' 
 80      SRV = 'srv' 
  81       
 83      """Listener API for subscribing to changes in Publisher/Subscriber/Service declarations""" 
 84   
 85 -    def reg_added(self, resolved_name, data_type_or_uri, reg_type):  
  86          """ 
 87          New pub/sub/service declared. 
 88          @param resolved_name: resolved topic/service name 
 89          @param data_type_or_uri: topic type or service uri 
 90          @type  data_type_or_uri: str 
 91          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
 92          @type  reg_type: str 
 93          """ 
 94          pass 
  95       
 96 -    def reg_removed(self, resolved_name, data_type_or_uri, reg_type):  
  97          """ 
 98          New pub/sub/service removed. 
 99          @param resolved_name: topic/service name 
100          @type  resolved_name: str 
101          @param data_type_or_uri: topic type or service uri 
102          @type  data_type_or_uri: str 
103          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
104          @type  reg_type: str 
105          """ 
106          pass 
107   
109       
111          """ 
112          ctor. 
113          """ 
114          self.listeners = [] 
115          self.lock = threading.Lock() 
 116   
118          """ 
119          Subscribe to notifications of pub/sub/service registration 
120          changes. This is an internal API used to notify higher level 
121          routines when to communicate with the master. 
122          @param l: listener to subscribe 
        @type  l: RegistrationListener
124          """ 
125          assert isinstance(l, RegistrationListener) 
126          with self.lock: 
127              self.listeners.append(l) 
 128   
130          """ 
131          @param resolved_name: resolved_topic/service name 
132          @type  resolved_name: str 
133          @param data_type_or_uri: topic type or service uri 
134          @type  data_type_or_uri: str 
135          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
136          @type  reg_type: str 
137          """ 
138          with self.lock: 
139              for l in self.listeners: 
140                  try: 
141                      l.reg_removed(resolved_name, data_type_or_uri, reg_type) 
142                  except Exception as e: 
143                      logerr("error notifying listener of removal: %s"%traceback.format_exc(e)) 
 144               
145 -    def notify_added(self, resolved_name, data_type, reg_type): 
 146          """ 
147          @param resolved_name: topic/service name 
148          @type  resolved_name: str 
149          @param data_type: topic/service type 
150          @type  data_type: str 
151          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
152          @type  reg_type: str 
153          """ 
154          with self.lock: 
155              for l in self.listeners: 
156                  try: 
157                      l.reg_added(resolved_name, data_type, reg_type) 
158                  except Exception as e: 
159                      logerr(traceback.format_exc(e)) 
 160                       
162          """ 
163          Remove all registration listeners 
164          """ 
165          with self.lock: 
166              del self.listeners[:] 
  167               
# Module-level RegistrationListeners instance, created at import time.
# NOTE(review): presumably the object returned by
# get_registration_listeners() (defined outside this view) -- confirm.
_registration_listeners = RegistrationListeners()
171   
172   
173   
175      """ 
    Registration manager used by Node implementation.
177      Communicates with ROS Master to maintain topic registration 
178      information. Also responds to publisher updates to create topic 
179      connections 
180      """ 
181   
183          """ 
184          ctor. 
185          @param handler: node API handler 
186          """ 
187          self.logger = logging.getLogger("rospy.registration") 
188          self.handler = handler 
189          self.uri = self.master_uri = None 
190          self.updates = [] 
191          self.cond = threading.Condition()  
192          self.registered = False 
193           
194          add_preshutdown_hook(self.cleanup) 
 195           
196 -    def start(self, uri, master_uri): 
 197          """ 
198          Start the RegManager. This should be passed in as an argument to a thread 
199          starter as the RegManager is designed to spin in its own thread 
200          @param uri: URI of local node 
201          @type  uri: str 
202          @param master_uri: Master URI 
203          @type  master_uri: str 
204          """ 
205          self.registered = False  
206          self.master_uri = master_uri 
207          self.uri = uri 
208          first = True 
209          tm = get_topic_manager() 
210          sm = get_service_manager() 
211          ns = get_namespace() 
212          caller_id = get_caller_id() 
213          if not master_uri or master_uri == uri: 
214              registered = True 
215              master = None 
216          else: 
217              registered = False 
218              master = xmlrpcapi(master_uri) 
219              self.logger.info("Registering with master node %s", master_uri) 
220   
221          while not registered and not is_shutdown(): 
222              try: 
223                  try: 
224                       
225                      tm.lock.acquire() 
226                      sm.lock.acquire()                     
227   
228                      pub, sub, srv = tm.get_publications(), tm.get_subscriptions(), sm.get_services() 
229                      for resolved_name, data_type in pub: 
230                          self.logger.info("Registering publisher topic [%s] type [%s] with master", resolved_name, data_type) 
231                          code, msg, val = master.registerPublisher(caller_id, resolved_name, data_type, uri) 
232                          if code != 1: 
233                              logfatal("cannot register publication topic [%s] with master: %s"%(resolved_name, msg)) 
234                              signal_shutdown("master/node incompatibility with register publisher") 
235                      for resolved_name, data_type in sub: 
236                          self.logger.info("registering subscriber topic [%s] type [%s] with master", resolved_name, data_type) 
237                          code, msg, val = master.registerSubscriber(caller_id, resolved_name, data_type, uri) 
238                          if code != 1: 
239                              logfatal("cannot register subscription topic [%s] with master: %s"%(resolved_name, msg)) 
240                              signal_shutdown("master/node incompatibility with register subscriber")                         
241                          else: 
242                              self.publisher_update(resolved_name, val) 
243                      for resolved_name, service_uri in srv: 
244                          self.logger.info("registering service [%s] uri [%s] with master", resolved_name, service_uri) 
245                          code, msg, val = master.registerService(caller_id, resolved_name, service_uri, uri) 
246                          if code != 1: 
247                              logfatal("cannot register service [%s] with master: %s"%(resolved_name, msg)) 
248                              signal_shutdown("master/node incompatibility with register service")                         
249    
250                      registered = True 
251                       
252                       
253                      get_registration_listeners().add_listener(self) 
254                  finally: 
255                      sm.lock.release()                     
256                      tm.lock.release() 
257                   
258                  if pub or sub: 
259                      logdebug("Registered [%s] with master node %s", caller_id, master_uri) 
260                  else: 
261                      logdebug("No topics to register with master node %s", master_uri) 
262                       
263              except Exception as e: 
264                  if first: 
265                       
266                      logerr("Unable to immediately register with master node [%s]: master may not be running yet. Will keep trying."%master_uri) 
267                      first = False 
268                  time.sleep(0.2) 
269          self.registered = True 
270          self.run() 
 271           
273          """ 
274          Check if Node has been registered yet. 
275          @return: True if registration has occurred with master 
276          @rtype: bool 
277          """ 
278          return self.registered  
 279   
281          """ 
282          Main RegManager thread loop. 
283          Periodically checks the update 
284          queue and generates topic connections 
285          """ 
286           
287          while not self.handler.done and not is_shutdown(): 
288              cond = self.cond 
289              try: 
290                  cond.acquire() 
291                  if not self.updates: 
292                      cond.wait(0.5) 
293                  if self.updates: 
294                       
295                      topic, uris = self.updates.pop() 
296                       
297                      self.updates = [x for x in self.updates if x[0] != topic] 
298                  else: 
299                      topic = uris = None 
300              finally: 
301                  if cond is not None: 
302                      cond.release() 
303   
304               
305               
306              if uris and not self.handler.done: 
307                  for uri in uris: 
308                       
309                      _thread.start_new_thread(self._connect_topic_thread, (topic, uri)) 
 310   
312          try: 
313              code, msg, _ = self.handler._connect_topic(topic, uri) 
314              if code != 1: 
315                  logdebug("Unable to connect subscriber to publisher [%s] for topic [%s]: %s", uri, topic, msg) 
316          except Exception as e: 
317              if not is_shutdown(): 
318                  logdebug("Unable to connect to publisher [%s] for topic [%s]: %s"%(uri, topic, traceback.format_exc())) 
 319           
321          """ 
322          Cleans up registrations with master and releases topic and service resources 
        @param reason: human-readable debug string
324          @type  reason: str 
325          """ 
326          self.logger.debug("registration cleanup starting") 
327          try: 
328              self.cond.acquire() 
329              self.cond.notifyAll() 
330          finally: 
331              self.cond.release()         
332   
333           
334          if not self.master_uri: 
335              return 
336           
337          master = xmlrpcapi(self.master_uri) 
338           
339          if master is None: 
340              return 
341           
342          caller_id = get_caller_id() 
343   
344           
345          rl = get_registration_listeners() 
346          if rl is not None: 
347              rl.clear() 
348               
349          tm = get_topic_manager() 
350          sm = get_service_manager() 
351          try: 
352              multi = xmlrpcclient.MultiCall(master) 
353              if tm is not None: 
354                  for resolved_name, _ in tm.get_subscriptions(): 
355                      self.logger.debug("unregisterSubscriber [%s]"%resolved_name) 
356                      multi.unregisterSubscriber(caller_id, resolved_name, self.uri) 
357                  for resolved_name, _ in tm.get_publications(): 
358                      self.logger.debug("unregisterPublisher [%s]"%resolved_name)                     
359                      multi.unregisterPublisher(caller_id, resolved_name, self.uri) 
360   
361              if sm is not None: 
362                  for resolved_name, service_uri in sm.get_services(): 
363                      self.logger.debug("unregisterService [%s]"%resolved_name)  
364                      multi.unregisterService(caller_id, resolved_name, service_uri) 
365              multi() 
366          except socket.error as se: 
367              (errno, msg) = se.args 
368              if errno == 111 or errno == 61:  
369                  self.logger.warn("cannot unregister with master due to network issues") 
370              else: 
371                  self.logger.warn("unclean shutdown\n%s"%traceback.format_exc()) 
372          except: 
373              self.logger.warn("unclean shutdown\n%s"%traceback.format_exc()) 
374   
375          self.logger.debug("registration cleanup: master calls complete")             
376   
377           
378           
379          if tm is not None: 
380              tm.close_all() 
381          if sm is not None: 
382              sm.unregister_all() 
 383   
384 -    def reg_removed(self, resolved_name, data_type_or_uri, reg_type): 
 385          """ 
386          RegistrationListener callback 
387          @param resolved_name: resolved name of topic or service 
388          @type  resolved_name: str 
389          @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs). 
390          @type  data_type_or_uri: str 
391          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
392          @type  reg_type: str 
393          """ 
394          master_uri = self.master_uri 
395          if not master_uri: 
396              self.logger.error("Registrar: master_uri is not set yet, cannot inform master of deregistration") 
397          else: 
398              try: 
399                  master = xmlrpcapi(master_uri) 
400                  if reg_type == Registration.PUB: 
401                      self.logger.debug("unregisterPublisher(%s, %s)", resolved_name, self.uri) 
402                      master.unregisterPublisher(get_caller_id(), resolved_name, self.uri) 
403                  elif reg_type == Registration.SUB:             
404                      self.logger.debug("unregisterSubscriber(%s, %s)", resolved_name, data_type_or_uri) 
405                      master.unregisterSubscriber(get_caller_id(), resolved_name, self.uri) 
406                  elif reg_type == Registration.SRV: 
407                      self.logger.debug("unregisterService(%s, %s)", resolved_name, data_type_or_uri) 
408                      master.unregisterService(get_caller_id(), resolved_name, data_type_or_uri) 
409              except: 
410                  logwarn("unable to communicate with ROS Master, registrations are now out of sync") 
411                  self.logger.error(traceback.format_exc()) 
 412       
413 -    def reg_added(self, resolved_name, data_type_or_uri, reg_type): 
 414          """ 
415          RegistrationListener callback 
416          @param resolved_name: resolved name of topic or service 
417          @type  resolved_name: str 
418          @param data_type_or_uri: either the data type (for topic regs) or the service URI (for service regs). 
419          @type  data_type_or_uri: str 
420          @param reg_type: Valid values are L{Registration.PUB}, L{Registration.SUB}, L{Registration.SRV} 
421          @type  reg_type: str 
422          """ 
423           
424          master_uri = self.master_uri 
425          if not master_uri: 
426              self.logger.error("Registrar: master_uri is not set yet, cannot inform master of registration") 
427          else: 
428              master = xmlrpcapi(master_uri) 
429              args = (get_caller_id(), resolved_name, data_type_or_uri, self.uri) 
430              registered = False 
431              first = True 
432              while not registered and not is_shutdown(): 
433                  try: 
434                      if reg_type == Registration.PUB: 
435                          self.logger.debug("master.registerPublisher(%s, %s, %s, %s)"%args) 
436                          code, msg, val = master.registerPublisher(*args) 
437                          if code != 1: 
438                              logfatal("unable to register publication [%s] with master: %s"%(resolved_name, msg)) 
439                      elif reg_type == Registration.SUB: 
440                          self.logger.debug("master.registerSubscriber(%s, %s, %s, %s)"%args) 
441                          code, msg, val = master.registerSubscriber(*args) 
442                          if code == 1: 
443                              self.publisher_update(resolved_name, val) 
444                          else: 
445                               
446                               
447                              logfatal("unable to register subscription [%s] with master: %s"%(resolved_name, msg)) 
448                      elif reg_type == Registration.SRV: 
449                          self.logger.debug("master.registerService(%s, %s, %s, %s)"%args) 
450                          code, msg, val = master.registerService(*args) 
451                          if code != 1: 
452                              logfatal("unable to register service [%s] with master: %s"%(resolved_name, msg)) 
453                           
454                      registered = True 
455                  except Exception as e: 
456                      if first: 
457                          msg = "Unable to register with master node [%s]: master may not be running yet. Will keep trying."%master_uri 
458                          self.logger.error(str(e)+"\n"+msg) 
459                          print(msg) 
460                          first = False 
461                      time.sleep(0.2) 
 462   
464          """ 
465          Inform psmanager of latest publisher list for a topic.  This 
466          will cause L{RegManager} to create a topic connection for all new 
467          publishers (in a separate thread). 
468          @param resolved_name: resolved topic name 
469          @type  resolved_name: str 
470          @param uris: list of all publishers uris for topic 
471          @type  uris: [str] 
472          """ 
473          try: 
474              self.cond.acquire() 
475              self.updates.append((resolved_name, uris)) 
476              self.cond.notifyAll()               
477          finally: 
478              self.cond.release() 
  479