1  
 
  2  
 
  3  
 
  4  
 
  5  
 
  6  
 
  7  
 
  8  
 
  9  
 
 10  
 
 11  
 
 12  
 
 13  
 
 14  
 
 15  
 
 16  
 
 17  
 
 18  
 
 19  
 
 20  
 
 21  
 
 22  
 
 23  
 
 24  
 
 25  
 
 26  
 
 27  
 
 28  
 
 29  
 
 30  
 
 31  
 
 32  
 
 33  import socket 
 34  import threading 
 35  import time 
 36  import uuid 
 37  import xmlrpclib 
 38  
 
 39  from multimaster_msgs_fkie.msg import MasterState   
 40  from multimaster_msgs_fkie.srv import DiscoverMasters, GetSyncInfo, GetSyncInfoResponse 
 41  import rospy 
 42  
 
 43  from master_discovery_fkie.common import masteruri_from_master, resolve_url, read_interface, create_pattern, is_empty_pattern 
 44  from master_discovery_fkie.master_info import MasterInfo 
 45  import master_discovery_fkie.interface_finder as interface_finder 
 46  
 
 47  from .sync_thread import SyncThread 
 48  
 
 49  
 
 51      '''
 
 52      ''' 
 53  
 
 54      UPDATE_INTERVALL = 30 
 55  
 
 57          '''
 
 58          Creates a new instance. Find the topic of the master_discovery node using
 
 59          U{master_discovery_fkie.interface_finder.get_changes_topic()
 
 60          <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>}.
 
 61          Also the parameter C{~ignore_hosts} will be analyzed to exclude hosts from sync.
 
 62          ''' 
 63          self.masters = {} 
 64          
 
 65          self.materuri = masteruri_from_master() 
 66          '''@ivar: the ROS master URI of the C{local} ROS master. ''' 
 67          self.__lock = threading.RLock() 
 68          
 
 69          self._load_interface() 
 70          
 
 71          topic_names = interface_finder.get_changes_topic(masteruri_from_master()) 
 72          self.sub_changes = dict() 
 73          '''@ivar: `dict` with topics {name: U{rospy.Subscriber<http://docs.ros.org/api/rospy/html/rospy.topics.Subscriber-class.html>}} publishes the changes of the discovered ROS masters.''' 
 74          for topic_name in topic_names: 
 75              rospy.loginfo("listen for updates on %s", topic_name) 
 76              self.sub_changes[topic_name] = rospy.Subscriber(topic_name, MasterState, self._rosmsg_callback_master_state) 
 77          self.__timestamp_local = None 
 78          self.__own_state = None 
 79          self.update_timer = None 
 80          self.own_state_getter = None 
 81          self._join_threads = dict()   
 82          
 
 83          rospy.Service('~get_sync_info', GetSyncInfo, self._rosservice_get_sync_info) 
 84          rospy.on_shutdown(self.finish) 
 85          self.obtain_masters() 
  86  
 
 88          '''
 
 89          The method to handle the received MasterState messages. Based on this message
 
 90          new threads to synchronize with remote ROS master will be created, updated or
 
 91          removed.
 
 92          @param data: the received message
 
 93          @type data: U{master_discovery_fkie.MasterState
 
 94          <http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}
 
 95          ''' 
 96          with self.__lock: 
 97              if not rospy.is_shutdown(): 
 98                  if data.state in [MasterState.STATE_REMOVED]: 
 99                      self.remove_master(data.master.name) 
100                  elif data.state in [MasterState.STATE_NEW, MasterState.STATE_CHANGED]: 
101                      m = data.master 
102                      self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online) 
 103  
 
104 -    def obtain_masters(self): 
 105          '''
 
106          This method use the service 'list_masters' of the master_discoverer to get
 
107          the list of discovered ROS master. Based on this list the L{SyncThread} for
 
108          synchronization will be created.
 
109          @see: U{master_discovery_fkie.interface_finder.get_listmaster_service()
 
110              <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>}
 
111          ''' 
112          if not rospy.is_shutdown(): 
113              service_names = interface_finder.get_listmaster_service(masteruri_from_master(), False) 
114              for service_name in service_names: 
115                  try: 
116                      with self.__lock: 
117                          try: 
118                              socket.setdefaulttimeout(5) 
119                              discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters) 
120                              resp = discoverMasters() 
121                              masters = [] 
122                              master_names = [m.name for m in resp.masters] 
123                              rospy.loginfo("ROS masters obtained from '%s': %s", service_name, master_names) 
124                              for m in resp.masters: 
125                                  if not self._re_ignore_hosts.match(m.name) or self._re_sync_hosts.match(m.name):   
126                                      masters.append(m.name) 
127                                  self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online) 
128                              for key in set(self.masters.keys()) - set(masters): 
129                                  self.remove_master(self.masters[key].name) 
130                          except rospy.ServiceException, e: 
131                              rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e)) 
132                  except: 
133                      import traceback 
134                      rospy.logwarn("ERROR while initial list masters: %s", traceback.format_exc()) 
135                  finally: 
136                      socket.setdefaulttimeout(None) 
137              self.update_timer = threading.Timer(self.UPDATE_INTERVALL, self.obtain_masters) 
138              self.update_timer.start() 
 139  
 
140 -    def update_master(self, mastername, masteruri, timestamp, timestamp_local, discoverer_name, monitoruri, online): 
 141          '''
 
142          Updates the timestamp of the given ROS master, or creates a new L{SyncThread} to
 
143          synchronize the local master with given ROS master.
 
144          @param mastername: the name of the remote ROS master to update or synchronize.
 
145          @type mastername: C{str}
 
146          @param masteruri: the URI of the remote ROS master.
 
147          @type masteruri: C{str}
 
148          @param timestamp: the timestamp of the remote ROS master.
 
149          @type timestamp: C{float64}
 
150          @param timestamp_local: the timestamp of the remote ROS master. (only local changes)
 
151          @type timestamp_local: C{float64}
 
152          @param discoverer_name: the name of the remote master_discoverer node
 
153          @type discoverer_name: C{str}
 
154          @param monitoruri: the URI of the RPC interface of the remote master_discoverer node.
 
155          @type monitoruri: C{str}
 
156          @param online: the current state on the master.
 
157          @type online: C{bool}
 
158          ''' 
159          try: 
160              with self.__lock: 
161                  if (masteruri != self.materuri): 
162                      if (is_empty_pattern(self._re_ignore_hosts) or not self._re_ignore_hosts.match(mastername) or
 
163                              (not is_empty_pattern(self._re_sync_hosts) and self._re_sync_hosts.match(mastername) is not None)): 
164                          
 
165                          if self.__resync_on_reconnect and mastername in self.masters: 
166                              self.masters[mastername].set_online(online, self.__resync_on_reconnect_timeout) 
167                          if online: 
168                              if mastername in self.masters: 
169                                      
 
170                                  self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local) 
171                              else: 
172                                  self.masters[mastername] = SyncThread(mastername, masteruri, discoverer_name, monitoruri, 0.0, self.__sync_topics_on_demand) 
173                                  if self.__own_state is not None: 
174                                      self.masters[mastername].set_own_masterstate(MasterInfo.from_list(self.__own_state)) 
175                                  self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local) 
176                  elif self.__timestamp_local != timestamp_local and self.__sync_topics_on_demand: 
177                      
 
178                      self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 
179                      self.own_state_getter.start() 
180          except: 
181              import traceback 
182              rospy.logwarn("ERROR while update master[%s]: %s", str(mastername), traceback.format_exc()) 
 183  
 
184 -    def get_own_state(self, monitoruri): 
 185          '''
 
186          Gets the master info from local master discovery and set it to all sync threads.
 
187          This function is running in a thread!!!
 
188          ''' 
189          try: 
190              socket.setdefaulttimeout(3) 
191              own_monitor = xmlrpclib.ServerProxy(monitoruri) 
192              self.__own_state = own_monitor.masterInfo() 
193              own_state = MasterInfo.from_list(self.__own_state) 
194              socket.setdefaulttimeout(None) 
195              with self.__lock: 
196                  
 
197                  for (_, s) in self.masters.iteritems(): 
198                      s.set_own_masterstate(own_state, self.__sync_topics_on_demand) 
199                  self.__timestamp_local = own_state.timestamp_local 
200          except: 
201              import traceback 
202              rospy.logwarn("ERROR while getting own state from '%s': %s", monitoruri, traceback.format_exc()) 
203              socket.setdefaulttimeout(None) 
204              time.sleep(3) 
205              if self.own_state_getter is not None and not rospy.is_shutdown(): 
206                  self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 
207                  self.own_state_getter.start() 
 208  
 
209 -    def remove_master(self, ros_master_name): 
 210          '''
 
211          Removes the master with given name from the synchronization list.
 
212          @param ros_master_name: the name of the ROS master to remove.
 
213          @type ros_master_name: C{str}
 
214          ''' 
215          try: 
216              with self.__lock: 
217                  if ros_master_name in self.masters: 
218                      m = self.masters.pop(ros_master_name) 
219                      ident = uuid.uuid4() 
220                      thread = threading.Thread(target=self._threading_stop_sync, args=(m, ident)) 
221                      self._join_threads[ident] = thread 
222                      thread.start() 
223          except Exception: 
224              import traceback 
225              rospy.logwarn("ERROR while removing master[%s]: %s", ros_master_name, traceback.format_exc()) 
 226  
 
227 -    def _threading_stop_sync(self, sync_thread, ident): 
 228          if isinstance(sync_thread, SyncThread): 
229              rospy.loginfo("  Stop synchronization to `%s`" % sync_thread.name) 
230              sync_thread.stop() 
231              with self.__lock: 
232                  del self._join_threads[ident] 
233              rospy.loginfo("  Finished synchronization to `%s`" % sync_thread.name) 
234              del sync_thread 
 235  
 
236 -    def finish(self, msg=''): 
 237          '''
 
238          Removes all remote masters and unregister their topics and services.
 
239          ''' 
240          rospy.loginfo("Stop synchronization...") 
241          with self.__lock: 
242              
 
243              rospy.loginfo("  Stop timers...") 
244              if self.update_timer is not None: 
245                  self.update_timer.cancel() 
246              
 
247              rospy.loginfo("  Unregister from master discovery...") 
248              for (_, v) in self.sub_changes.iteritems(): 
249                  v.unregister() 
250              self.own_state_getter = None 
251              
 
252              for key in self.masters.keys(): 
253                  rospy.loginfo("  Remove master: %s", key) 
254                  self.remove_master(key) 
255          
 
256          while len(self._join_threads) > 0: 
257              rospy.loginfo("  Wait for ending of %s threads ...", str(len(self._join_threads))) 
258              time.sleep(1) 
259          rospy.loginfo("Synchronization is now off") 
 260  
 
262          '''
 
263          Callback for the ROS service to get the info to synchronized nodes.
 
264          ''' 
265          masters = list() 
266          try: 
267              with self.__lock: 
268                  for (_, s) in self.masters.iteritems(): 
269                      masters.append(s.get_sync_info()) 
270          except: 
271              import traceback 
272              traceback.print_exc() 
273          finally: 
274              return GetSyncInfoResponse(masters) 
 275  
 
276 -    def _load_interface(self): 
 277          interface_file = resolve_url(rospy.get_param('~interface_url', '')) 
278          if interface_file: 
279              rospy.loginfo("interface_url: %s", interface_file) 
280          try: 
281              data = read_interface(interface_file) if interface_file else {} 
282              
 
283              self._re_ignore_hosts = create_pattern('ignore_hosts', data, interface_file, []) 
284              
 
285              self._re_sync_hosts = create_pattern('sync_hosts', data, interface_file, []) 
286              self.__sync_topics_on_demand = False 
287              if interface_file: 
288                  if 'sync_topics_on_demand' in data: 
289                      self.__sync_topics_on_demand = data['sync_topics_on_demand'] 
290              elif rospy.has_param('~sync_topics_on_demand'): 
291                  self.__sync_topics_on_demand = rospy.get_param('~sync_topics_on_demand') 
292              rospy.loginfo("sync_topics_on_demand: %s", self.__sync_topics_on_demand) 
293              self.__resync_on_reconnect = rospy.get_param('~resync_on_reconnect', True) 
294              rospy.loginfo("resync_on_reconnect: %s", self.__resync_on_reconnect) 
295              self.__resync_on_reconnect_timeout = rospy.get_param('~resync_on_reconnect_timeout', 0) 
296              rospy.loginfo("resync_on_reconnect_timeout: %s", self.__resync_on_reconnect_timeout) 
297          except: 
298              import traceback 
299              
 
300              rospy.logerr("Error on load interface: %s", traceback.format_exc()) 
301              import os 
302              import signal 
303              os.kill(os.getpid(), signal.SIGKILL) 
  304