1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18   
 19   
 20   
 21   
 22   
 23   
 24   
 25   
 26   
 27   
 28   
 29   
 30   
 31   
 32   
 33   
 34  import random 
 35  import socket 
 36  import threading 
 37  import time 
 38  import traceback 
 39  import xmlrpclib 
 40   
 41  from multimaster_msgs_fkie.msg import SyncTopicInfo, SyncServiceInfo, SyncMasterInfo 
 42  import rospy 
 43   
 44  from master_discovery_fkie.common import masteruri_from_ros 
 45  from master_discovery_fkie.filter_interface import FilterInterface 
 46   
 47   
 49      ''' 
 50      A thread to synchronize the local ROS master with a remote master. While the 
 51      synchronization only the topic of the remote ROS master will be registered by 
 52      the local ROS master. The remote ROS master will be keep unchanged. 
 53      ''' 
 54   
 55      MAX_UPDATE_DELAY = 5   
 56   
 57      MSG_ANY_TYPE = '*' 
 58   
 59 -    def __init__(self, name, uri, discoverer_name, monitoruri, timestamp, sync_on_demand=False): 
  60          ''' 
 61          Initialization method for the SyncThread. 
 62          @param name: the name of the ROS master synchronized with. 
 63          @type name:  C{str} 
 64          @param uri: the URI of the ROS master synchronized with 
 65          @type uri:  C{str} 
 66          @param discoverer_name: the name of the discovery node running on ROS master synchronized with. 
 67          @type discoverer_name:  C{str} 
 68          @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once. 
 69          @type monitoruri:  C{str} 
 70          @param timestamp: The timestamp of the current state of the ROS master info. 
 71          @type timestamp:  C{float64} 
 72          @param sync_on_demand: Synchronize topics on demand 
 73          @type sync_on_demand: bool 
 74          ''' 
 75          self.name = name 
 76          self.uri = uri 
 77          self.discoverer_name = discoverer_name 
 78          self.monitoruri = monitoruri 
 79          self.timestamp = timestamp 
 80          self.timestamp_local = 0. 
 81          self.timestamp_remote = 0. 
 82          self._online = True 
 83          self._offline_ts = 0 
 84   
 85          self.masteruri_local = masteruri_from_ros() 
 86          rospy.logdebug("SyncThread[%s]: create this sync thread", self.name) 
 87           
 88          self.__lock_info = threading.RLock() 
 89          self.__lock_intern = threading.RLock() 
 90          self._use_filtered_method = None 
 91           
 92          self.__sync_info = None 
 93          self.__unregistered = False 
 94           
 95          self.__publisher = [] 
 96           
 97          self.__subscriber = [] 
 98           
 99          self.__services = [] 
100           
101           
102          self.__own_state = None 
103   
104           
105          self._filter = FilterInterface() 
106          self._filter.load(self.name, 
107                            ['/rosout', rospy.get_name().replace('/', '/*') + '*', self.discoverer_name.replace('/', '/*') + '*', '/*node_manager', '/*zeroconf'], [], 
108                            ['/rosout', '/rosout_agg'], ['/'] if sync_on_demand else [], 
109                            ['/*get_loggers', '/*set_logger_level'], [], 
110                             
111                            ['bond/Status'], 
112                            [], [], 
113                            []) 
114   
115           
116           
117           
118          self._update_timer = None 
119          self._delayed_update = 0 
120          self.__on_update = False 
 121   
123          ''' 
124          Returns the synchronized publisher, subscriber and services. 
125          @rtype: SyncMasterInfo 
126          ''' 
127          with self.__lock_info: 
128              if self.__sync_info is None: 
129                   
130                  result_set = set() 
131                  result_publisher = [] 
132                  result_subscriber = [] 
133                  result_services = [] 
134                  for (t_n, n_n, n_uri) in self.__publisher: 
135                      result_publisher.append(SyncTopicInfo(t_n, n_n, n_uri)) 
136                      result_set.add(n_n) 
137                  for (t_n, n_n, n_uri) in self.__subscriber: 
138                      result_subscriber.append(SyncTopicInfo(t_n, n_n, n_uri)) 
139                      result_set.add(n_n) 
140                  for (s_n, s_uri, n_n, n_uri) in self.__services: 
141                      result_services.append(SyncServiceInfo(s_n, s_uri, n_n, n_uri)) 
142                      result_set.add(n_n) 
143                  self.__sync_info = SyncMasterInfo(self.uri, list(result_set), result_publisher, result_subscriber, result_services) 
144              return self.__sync_info 
 145   
146 -    def set_online(self, value, resync_on_reconnect_timeout=0.): 
 147          if value: 
148              if not self._online: 
149                  with self.__lock_intern: 
150                      self._online = True 
151                      offline_duration = time.time() - self._offline_ts 
152                      if offline_duration >= resync_on_reconnect_timeout: 
153                          rospy.loginfo("SyncThread[%s]: perform resync after the host was offline (unregister and register again to avoid connection losses to python topic. These does not suppot reconnection!)", self.name) 
154                          if self._update_timer is not None: 
155                              self._update_timer.cancel() 
156                          self._unreg_on_finish() 
157                          self.__unregistered = False 
158                          self.__publisher = [] 
159                          self.__subscriber = [] 
160                          self.__services = [] 
161                          self.timestamp = 0. 
162                          self.timestamp_local = 0. 
163                          self.timestamp_remote = 0. 
164                      else: 
165                          rospy.loginfo("SyncThread[%s]: skip resync after the host was offline because of resync_on_reconnect_timeout=%.2f and the host was only %.2f sec offline", self.name, resync_on_reconnect_timeout, offline_duration) 
166          else: 
167              self._online = False 
168              self._offline_ts = time.time() 
 169   
170 -    def update(self, name, uri, discoverer_name, monitoruri, timestamp): 
 171          ''' 
172          Sets a request to synchronize the local ROS master with this ROS master. 
173          @note: If currently a synchronization is running this request will be ignored! 
174          @param name: the name of the ROS master synchronized with. 
175          @type name:  C{str} 
176          @param uri: the URI of the ROS master synchronized with 
177          @type uri:  C{str} 
178          @param discoverer_name: the name of the discovery node running on ROS master synchronized with. 
179          @type discoverer_name:  C{str} 
180          @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once. 
181          @type monitoruri:  C{str} 
182          @param timestamp: The timestamp of the current state of the ROS master info. 
183          @type timestamp:  C{float64} 
184          ''' 
185   
186          with self.__lock_intern: 
187              self.timestamp_remote = timestamp 
188              if (self.timestamp_local != timestamp): 
189                  rospy.logdebug("SyncThread[%s]: update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp, self.timestamp_local) 
190                  self.name = name 
191                  self.uri = uri 
192                  self.discoverer_name = discoverer_name 
193                  self.monitoruri = monitoruri 
194                  self._request_update() 
 195   
196   
197   
199          ''' 
200          Sets the state of the local ROS master state. If this state is not None, the topics on demand will be synchronized. 
201          @param own_state: the state of the local ROS master state 
202          @type own_state:  C{master_discovery_fkie/MasterInfo} 
203          @param sync_on_demand: if True, sync only topic, which are also local exists (Default: False) 
204          @type sync_on_demand:  bool 
205          ''' 
206          with self.__lock_intern: 
207              timestamp_local = own_state.timestamp_local 
208              if self.__own_state is None or (self.__own_state.timestamp_local != timestamp_local): 
209                  ownstate_ts = self.__own_state.timestamp_local if self.__own_state is not None else float('nan') 
210                  rospy.logdebug("SyncThread[%s]: local state update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp_local, ownstate_ts) 
211                  self.__own_state = own_state 
212                  if sync_on_demand: 
213                      self._filter.update_sync_topics_pattern(self.__own_state.topic_names) 
214                  self._request_update() 
 215   
217          ''' 
218          Stops running thread. 
219          ''' 
220          rospy.logdebug("  SyncThread[%s]: stop request", self.name) 
221          with self.__lock_intern: 
222              if self._update_timer is not None: 
223                  self._update_timer.cancel() 
224              self._unreg_on_finish() 
225          rospy.logdebug("  SyncThread[%s]: stop exit", self.name) 
 226   
228          with self.__lock_intern: 
229              r = random.random() * 2. 
230               
231              if self._update_timer is None or not self._update_timer.isAlive(): 
232                  del self._update_timer 
233                  self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,)) 
234                  self._update_timer.start() 
235              else: 
236                  if self._delayed_update < self.MAX_UPDATE_DELAY: 
237                       
238                      self._update_timer.cancel() 
239                       
240                      if not self._update_timer.isAlive() or not self.__on_update: 
241                          self._delayed_update += 1 
242                          self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,)) 
243                          self._update_timer.start() 
 244   
246          self._delayed_update = 0 
247          self.__on_update = True 
248          try: 
249               
250              socket.setdefaulttimeout(20) 
251              remote_monitor = xmlrpclib.ServerProxy(self.monitoruri) 
252               
253              if self._use_filtered_method is None: 
254                  try: 
255                      self._use_filtered_method = 'masterInfoFiltered' in remote_monitor.system.listMethods() 
256                  except: 
257                      self._use_filtered_method = False 
258              remote_state = None 
259               
260              rospy.loginfo("SyncThread[%s] Requesting remote state from '%s'", self.name, self.monitoruri) 
261              if self._use_filtered_method: 
262                  remote_state = remote_monitor.masterInfoFiltered(self._filter.to_list()) 
263              else: 
264                  remote_state = remote_monitor.masterInfo() 
265              if not self.__unregistered: 
266                  handler(remote_state) 
267          except: 
268              rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc()) 
269          finally: 
270              self.__on_update = False 
271              socket.setdefaulttimeout(None) 
 272   
274          rospy.loginfo("SyncThread[%s] Applying remote state...", self.name) 
275          try: 
276              stamp = float(remote_state[0]) 
277              stamp_local = float(remote_state[1]) 
278              remote_masteruri = remote_state[2] 
279   
280              publishers = remote_state[4] 
281              subscribers = remote_state[5] 
282              rservices = remote_state[6] 
283              topicTypes = remote_state[7] 
284              nodeProviders = remote_state[8] 
285              serviceProviders = remote_state[9] 
286   
287               
288              own_master = xmlrpclib.ServerProxy(self.masteruri_local) 
289              own_master_multi = xmlrpclib.MultiCall(own_master) 
290               
291              handler = [] 
292               
293              publisher = [] 
294              publisher_to_register = [] 
295              for (topic, nodes) in publishers: 
296                  for node in nodes: 
297                      topictype = self._get_topictype(topic, topicTypes) 
298                      nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 
299                      if topictype and nodeuri and not self._do_ignore_ntp(node, topic, topictype): 
300                           
301                          if not ((topic, node, nodeuri) in self.__publisher): 
302                              publisher_to_register.append((topic, topictype, node, nodeuri)) 
303                          publisher.append((topic, node, nodeuri)) 
304               
305              for (topic, node, nodeuri) in set(self.__publisher) - set(publisher): 
306                  own_master_multi.unregisterPublisher(node, topic, nodeuri) 
307                  rospy.logdebug("SyncThread[%s]: UNPUB %s[%s] %s",  
308                                self.name, node, nodeuri, topic) 
309                  handler.append(('upub', topic, node, nodeuri)) 
310               
311              for (topic, topictype, node, nodeuri) in publisher_to_register: 
312                  own_master_multi.registerPublisher(node, topic, topictype, nodeuri) 
313                  rospy.logdebug("SyncThread[%s]: PUB %s[%s] %s[%s]",  
314                                self.name, node, nodeuri, topic, topictype) 
315                  handler.append(('pub', topic, topictype, node, nodeuri)) 
316   
317               
318              subscriber = [] 
319              subscriber_to_register = [] 
320              for (topic, nodes) in subscribers: 
321                  for node in nodes: 
322                      topictype = self._get_topictype(topic, topicTypes) 
323                      nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 
324                       
325   
326   
327   
328                      if not topictype: 
329                          topictype = self.MSG_ANY_TYPE 
330                      if topictype and nodeuri and not self._do_ignore_nts(node, topic, topictype): 
331                           
332                          if not ((topic, node, nodeuri) in self.__subscriber): 
333                              subscriber_to_register.append((topic, topictype, node, nodeuri)) 
334                          subscriber.append((topic, node, nodeuri)) 
335               
336              for (topic, node, nodeuri) in set(self.__subscriber) - set(subscriber): 
337                  own_master_multi.unregisterSubscriber(node, topic, nodeuri) 
338                  rospy.logdebug("SyncThread[%s]: UNSUB %s[%s] %s",  
339                                self.name, node, nodeuri, topic) 
340                  handler.append(('usub', topic, node, nodeuri)) 
341               
342              for (topic, topictype, node, nodeuri) in subscriber_to_register: 
343                  own_master_multi.registerSubscriber(node, topic, topictype, nodeuri) 
344                  rospy.logdebug("SyncThread[%s]: SUB %s[%s] %s[%s]",  
345                                self.name, node, nodeuri, topic, topictype) 
346                  handler.append(('sub', topic, topictype, node, nodeuri)) 
347   
348               
349              services = [] 
350              services_to_register = [] 
351              for (service, nodes) in rservices: 
352                  for node in nodes: 
353                      serviceuri = self._get_serviceuri(service, serviceProviders, remote_masteruri) 
354                      nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 
355                      if serviceuri and nodeuri and not self._do_ignore_ns(node, service): 
356                           
357                          if not ((service, serviceuri, node, nodeuri) in self.__services): 
358                              services_to_register.append((service, serviceuri, node, nodeuri)) 
359                          services.append((service, serviceuri, node, nodeuri)) 
360               
361              for (service, serviceuri, node, nodeuri) in set(self.__services) - set(services): 
362                  own_master_multi.unregisterService(node, service, serviceuri) 
363                  rospy.logdebug("SyncThread[%s]: UNSRV %s[%s] %s[%s]",  
364                                self.name, node, nodeuri, service, serviceuri) 
365                  handler.append(('usrv', service, serviceuri, node, nodeuri)) 
366               
367              for (service, serviceuri, node, nodeuri) in services_to_register: 
368                  own_master_multi.registerService(node, service, serviceuri, nodeuri) 
369                  rospy.logdebug("SyncThread[%s]: SRV %s[%s] %s[%s]", 
370                                self.name, node, nodeuri, service, serviceuri) 
371                  handler.append(('srv', service, serviceuri, node, nodeuri)) 
372   
373               
374              if not self.__unregistered: 
375                  with self.__lock_info: 
376                      self.__sync_info = None 
377                      self.__publisher = publisher 
378                      self.__subscriber = subscriber 
379                      self.__services = services 
380                   
381                  socket.setdefaulttimeout(3) 
382                  result = own_master_multi() 
383                   
384                   
385                  hack_pub = set() 
386                  for h, (code, statusMessage, r) in zip(handler, result): 
387                      try: 
388                          if h[0] == 'sub': 
389                              if code == -1: 
390                                  rospy.logwarn("SyncThread[%s] topic subscription error: %s (%s), %s %s, node: %s", self.name, h[1], h[2], str(code), str(statusMessage), h[3]) 
391                              else: 
392                                  rospy.logdebug("SyncThread[%s] topic subscribed: %s, %s %s, node: %s", self.name, h[1], str(code), str(statusMessage), h[3]) 
393                          if h[0] == 'sub' and code == 1 and len(r) > 0: 
394                               
395                              hack_pub.add(h[1]) 
396                          elif h[0] == 'pub': 
397                              if code == -1: 
398                                  rospy.logwarn("SyncThread[%s] topic advertise error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage)) 
399                              else: 
400                                  rospy.logdebug("SyncThread[%s] topic advertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 
401                          elif h[0] == 'usub': 
402                              rospy.logdebug("SyncThread[%s] topic unsubscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 
403                          elif h[0] == 'upub': 
404                              rospy.logdebug("SyncThread[%s] topic unadvertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 
405                          elif h[0] == 'srv': 
406                              if code == -1: 
407                                  rospy.logwarn("SyncThread[%s] service registration error: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 
408                              else: 
409                                  rospy.logdebug("SyncThread[%s] service registered: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 
410                          elif h[0] == 'usrv': 
411                              rospy.logdebug("SyncThread[%s] service unregistered: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 
412                      except: 
413                          rospy.logerr("SyncThread[%s] ERROR while analyzing the results of the registration call [%s]: %s", self.name, h[1], traceback.format_exc()) 
414                   
415                   
416                   
417                   
418                   
419                   
420                   
421                   
422                  for m in hack_pub: 
423                      try: 
424                          rospy.loginfo("SyncThread[%s] Create and delete publisher to trigger publisherUpdate for %s", self.name, m) 
425                          topicPub = rospy.Publisher(m, rospy.msg.AnyMsg, queue_size=1) 
426                          topicPub.unregister() 
427                          del topicPub 
428                      except: 
429                          rospy.logerr("SyncThread[%s] ERROR while hack subscriber[%s]: %s", self.name, h[1], traceback.format_exc()) 
430                   
431                  self.timestamp = stamp 
432                  self.timestamp_local = stamp_local 
433                  rospy.logdebug("SyncThread[%s]: current timestamp %.9f, local %.9f", self.name, stamp, stamp_local) 
434                  if self.timestamp_remote > stamp_local: 
435                      rospy.logdebug("SyncThread[%s]: invoke next update, remote ts: %.9f", self.name, self.timestamp_remote) 
436                      self._update_timer = threading.Timer(random.random() * 2., self._request_remote_state, args=(self._apply_remote_state,)) 
437                      self._update_timer.start() 
438          except: 
439              rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc()) 
440          finally: 
441              socket.setdefaulttimeout(None) 
442          rospy.loginfo("SyncThread[%s] remote state applied.", self.name) 
 443   
445          with self.__lock_info: 
446              self.__unregistered = True 
447              try: 
448                  rospy.logdebug("    SyncThread[%s] clear all registrations", self.name) 
449                  socket.setdefaulttimeout(5) 
450                  own_master = xmlrpclib.ServerProxy(self.masteruri_local) 
451                  own_master_multi = xmlrpclib.MultiCall(own_master) 
452                   
453                  for topic, node, uri in self.__subscriber: 
454                      own_master_multi.unregisterSubscriber(node, topic, uri) 
455                  for topic, node, uri in self.__publisher: 
456                      own_master_multi.unregisterPublisher(node, topic, uri) 
457                  for service, serviceuri, node, uri in self.__services: 
458                      own_master_multi.unregisterService(node, service, serviceuri) 
459                  rospy.logdebug("    SyncThread[%s] execute a MultiCall", self.name) 
460                  _ = own_master_multi() 
461                  rospy.logdebug("    SyncThread[%s] finished", self.name) 
462              except: 
463                  rospy.logerr("SyncThread[%s] ERROR while ending: %s", self.name, traceback.format_exc()) 
464              socket.setdefaulttimeout(None) 
 465   
467          return self._filter.is_ignored_publisher(node, topic, topictype) 
 468   
470          return self._filter.is_ignored_subscriber(node, topic, topictype) 
 471   
473          return self._filter.is_ignored_service(node, service) 
 474   
476          for (topicname, topic_type) in topic_types: 
477              if (topicname == topic): 
478                  return topic_type.replace('None', '') 
479          return None 
 480   
482          for (nodename, uri, masteruri, pid, local) in nodes: 
483              if (nodename == node) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'): 
484                   
485                  if masteruri != self.masteruri_local: 
486                      return uri 
487          return None 
 488   
490          for (servicename, uri, masteruri, topic_type, local) in nodes: 
491              if (servicename == service) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'): 
492                  if masteruri != self.masteruri_local: 
493                      return uri 
494          return None 
  495