Package master_sync_fkie :: Module master_sync
[frames] | no frames]

Source Code for Module master_sync_fkie.master_sync

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Fraunhofer nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  
 
 33  import socket 
 34  import threading 
 35  import time 
 36  import uuid 
 37  try: 
 38      import xmlrpclib 
 39  except ImportError: 
 40      import xmlrpc.client as xmlrpclib 
 41  
 
 42  from multimaster_msgs_fkie.msg import MasterState  # , LinkState, LinkStatesStamped, MasterState, ROSMaster, SyncMasterInfo, SyncTopicInfo 
 43  from multimaster_msgs_fkie.srv import DiscoverMasters, GetSyncInfo, GetSyncInfoResponse 
 44  import rospy 
 45  
 
 46  from master_discovery_fkie.common import masteruri_from_master, resolve_url, read_interface, create_pattern, is_empty_pattern 
 47  from master_discovery_fkie.master_info import MasterInfo 
 48  import master_discovery_fkie.interface_finder as interface_finder 
 49  
 
 50  from .sync_thread import SyncThread 
 51  
 
 52  
 
53 -class Main(object):
54 ''' 55 ''' 56 57 UPDATE_INTERVALL = 30 58
59 - def __init__(self):
60 ''' 61 Creates a new instance. Find the topic of the master_discovery node using 62 U{master_discovery_fkie.interface_finder.get_changes_topic() 63 <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>}. 64 Also the parameter C{~ignore_hosts} will be analyzed to exclude hosts from sync. 65 ''' 66 self.masters = {} 67 # the connection to the local service master 68 self.materuri = masteruri_from_master() 69 '''@ivar: the ROS master URI of the C{local} ROS master. ''' 70 self.__lock = threading.RLock() 71 # load interface 72 self._load_interface() 73 # subscribe to changes notifier topics 74 topic_names = interface_finder.get_changes_topic(masteruri_from_master()) 75 self.sub_changes = dict() 76 '''@ivar: `dict` with topics {name: U{rospy.Subscriber<http://docs.ros.org/api/rospy/html/rospy.topics.Subscriber-class.html>}} publishes the changes of the discovered ROS masters.''' 77 for topic_name in topic_names: 78 rospy.loginfo("listen for updates on %s", topic_name) 79 self.sub_changes[topic_name] = rospy.Subscriber(topic_name, MasterState, self._rosmsg_callback_master_state) 80 self.__timestamp_local = None 81 self.__own_state = None 82 self.update_timer = None 83 self.own_state_getter = None 84 self._join_threads = dict() # threads waiting for stopping the sync thread 85 # initialize the ROS services 86 rospy.Service('~get_sync_info', GetSyncInfo, self._rosservice_get_sync_info) 87 rospy.on_shutdown(self.finish) 88 self.obtain_masters()
89
90 - def _rosmsg_callback_master_state(self, data):
91 ''' 92 The method to handle the received MasterState messages. Based on this message 93 new threads to synchronize with remote ROS master will be created, updated or 94 removed. 95 @param data: the received message 96 @type data: U{master_discovery_fkie.MasterState 97 <http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>} 98 ''' 99 with self.__lock: 100 if not rospy.is_shutdown(): 101 if data.state in [MasterState.STATE_REMOVED]: 102 self.remove_master(data.master.name) 103 elif data.state in [MasterState.STATE_NEW, MasterState.STATE_CHANGED]: 104 m = data.master 105 self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online)
106
107 - def obtain_masters(self):
108 ''' 109 This method use the service 'list_masters' of the master_discoverer to get 110 the list of discovered ROS master. Based on this list the L{SyncThread} for 111 synchronization will be created. 112 @see: U{master_discovery_fkie.interface_finder.get_listmaster_service() 113 <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>} 114 ''' 115 if not rospy.is_shutdown(): 116 service_names = interface_finder.get_listmaster_service(masteruri_from_master(), False) 117 for service_name in service_names: 118 try: 119 with self.__lock: 120 try: 121 socket.setdefaulttimeout(5) 122 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters) 123 resp = discoverMasters() 124 masters = [] 125 master_names = [m.name for m in resp.masters] 126 rospy.loginfo("ROS masters obtained from '%s': %s", service_name, master_names) 127 for m in resp.masters: 128 if self._can_sync(m.name): # do not sync to the master, if it is in ignore list or not in filled sync list 129 masters.append(m.name) 130 self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online) 131 for key in set(self.masters.keys()) - set(masters): 132 self.remove_master(self.masters[key].name) 133 except rospy.ServiceException as e: 134 rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e)) 135 except: 136 import traceback 137 rospy.logwarn("ERROR while initial list masters: %s", traceback.format_exc()) 138 finally: 139 socket.setdefaulttimeout(None) 140 self.update_timer = threading.Timer(self.UPDATE_INTERVALL, self.obtain_masters) 141 self.update_timer.start()
142
143 - def update_master(self, mastername, masteruri, timestamp, timestamp_local, discoverer_name, monitoruri, online):
144 ''' 145 Updates the timestamp of the given ROS master, or creates a new L{SyncThread} to 146 synchronize the local master with given ROS master. 147 @param mastername: the name of the remote ROS master to update or synchronize. 148 @type mastername: C{str} 149 @param masteruri: the URI of the remote ROS master. 150 @type masteruri: C{str} 151 @param timestamp: the timestamp of the remote ROS master. 152 @type timestamp: C{float64} 153 @param timestamp_local: the timestamp of the remote ROS master. (only local changes) 154 @type timestamp_local: C{float64} 155 @param discoverer_name: the name of the remote master_discoverer node 156 @type discoverer_name: C{str} 157 @param monitoruri: the URI of the RPC interface of the remote master_discoverer node. 158 @type monitoruri: C{str} 159 @param online: the current state on the master. 160 @type online: C{bool} 161 ''' 162 try: 163 with self.__lock: 164 if (masteruri != self.materuri): 165 if self._can_sync(mastername): 166 # do not sync to the master, if it is in ignore list 167 if self.__resync_on_reconnect and mastername in self.masters: 168 self.masters[mastername].set_online(online, self.__resync_on_reconnect_timeout) 169 if online: 170 if mastername in self.masters: 171 # updates only, if local changes are occured 172 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local) 173 else: 174 self.masters[mastername] = SyncThread(mastername, masteruri, discoverer_name, monitoruri, 0.0, self.__sync_topics_on_demand) 175 if self.__own_state is not None: 176 self.masters[mastername].set_own_masterstate(MasterInfo.from_list(self.__own_state)) 177 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local) 178 elif self.__timestamp_local != timestamp_local and self.__sync_topics_on_demand: 179 # get the master info from local discovery master and set it to all sync threads 180 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 181 self.own_state_getter.start() 182 except: 183 import traceback 184 rospy.logwarn("ERROR while update master[%s]: %s", str(mastername), traceback.format_exc())
185
186 - def get_own_state(self, monitoruri):
187 ''' 188 Gets the master info from local master discovery and set it to all sync threads. 189 This function is running in a thread!!! 190 ''' 191 try: 192 socket.setdefaulttimeout(3) 193 own_monitor = xmlrpclib.ServerProxy(monitoruri) 194 self.__own_state = own_monitor.masterInfo() 195 own_state = MasterInfo.from_list(self.__own_state) 196 socket.setdefaulttimeout(None) 197 with self.__lock: 198 # update the state for all sync threads 199 for (_, s) in self.masters.items(): 200 s.set_own_masterstate(own_state, self.__sync_topics_on_demand) 201 self.__timestamp_local = own_state.timestamp_local 202 except: 203 import traceback 204 rospy.logwarn("ERROR while getting own state from '%s': %s", monitoruri, traceback.format_exc()) 205 socket.setdefaulttimeout(None) 206 time.sleep(3) 207 if self.own_state_getter is not None and not rospy.is_shutdown(): 208 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 209 self.own_state_getter.start()
210
211 - def remove_master(self, ros_master_name):
212 ''' 213 Removes the master with given name from the synchronization list. 214 @param ros_master_name: the name of the ROS master to remove. 215 @type ros_master_name: C{str} 216 ''' 217 try: 218 with self.__lock: 219 if ros_master_name in self.masters: 220 m = self.masters.pop(ros_master_name) 221 ident = uuid.uuid4() 222 thread = threading.Thread(target=self._threading_stop_sync, args=(m, ident)) 223 self._join_threads[ident] = thread 224 thread.start() 225 except Exception: 226 import traceback 227 rospy.logwarn("ERROR while removing master[%s]: %s", ros_master_name, traceback.format_exc())
228
229 - def _threading_stop_sync(self, sync_thread, ident):
230 if isinstance(sync_thread, SyncThread): 231 rospy.loginfo(" Stop synchronization to `%s`" % sync_thread.name) 232 sync_thread.stop() 233 with self.__lock: 234 del self._join_threads[ident] 235 rospy.loginfo(" Finished synchronization to `%s`" % sync_thread.name) 236 del sync_thread
237
238 - def finish(self, msg=''):
239 ''' 240 Removes all remote masters and unregister their topics and services. 241 ''' 242 rospy.loginfo("Stop synchronization...") 243 with self.__lock: 244 # stop update timer 245 rospy.loginfo(" Stop timers...") 246 if self.update_timer is not None: 247 self.update_timer.cancel() 248 # unregister from update topics 249 rospy.loginfo(" Unregister from master discovery...") 250 for (_, v) in self.sub_changes.items(): 251 v.unregister() 252 self.own_state_getter = None 253 # Stop all sync threads 254 for key in self.masters.keys(): 255 rospy.loginfo(" Remove master: %s", key) 256 self.remove_master(key) 257 # wait for their ending 258 while len(self._join_threads) > 0: 259 rospy.loginfo(" Wait for ending of %s threads ...", str(len(self._join_threads))) 260 time.sleep(1) 261 rospy.loginfo("Synchronization is now off")
262
263 - def _rosservice_get_sync_info(self, req):
264 ''' 265 Callback for the ROS service to get the info to synchronized nodes. 266 ''' 267 masters = list() 268 try: 269 with self.__lock: 270 for (_, s) in self.masters.items(): 271 masters.append(s.get_sync_info()) 272 except: 273 import traceback 274 traceback.print_exc() 275 finally: 276 return GetSyncInfoResponse(masters)
277
278 - def _load_interface(self):
279 interface_file = resolve_url(rospy.get_param('~interface_url', '')) 280 if interface_file: 281 rospy.loginfo("interface_url: %s", interface_file) 282 try: 283 data = read_interface(interface_file) if interface_file else {} 284 # set the ignore hosts list 285 self._re_ignore_hosts = create_pattern('ignore_hosts', data, interface_file, []) 286 # set the sync hosts list 287 self._re_sync_hosts = create_pattern('sync_hosts', data, interface_file, []) 288 self.__sync_topics_on_demand = False 289 if interface_file: 290 if 'sync_topics_on_demand' in data: 291 self.__sync_topics_on_demand = data['sync_topics_on_demand'] 292 elif rospy.has_param('~sync_topics_on_demand'): 293 self.__sync_topics_on_demand = rospy.get_param('~sync_topics_on_demand') 294 rospy.loginfo("sync_topics_on_demand: %s", self.__sync_topics_on_demand) 295 self.__resync_on_reconnect = rospy.get_param('~resync_on_reconnect', True) 296 rospy.loginfo("resync_on_reconnect: %s", self.__resync_on_reconnect) 297 self.__resync_on_reconnect_timeout = rospy.get_param('~resync_on_reconnect_timeout', 0) 298 rospy.loginfo("resync_on_reconnect_timeout: %s", self.__resync_on_reconnect_timeout) 299 except: 300 import traceback 301 # kill the ros node, to notify the user about the error 302 rospy.logerr("Error on load interface: %s", traceback.format_exc()) 303 import os 304 import signal 305 os.kill(os.getpid(), signal.SIGKILL)
306
307 - def _can_sync(self, mastername):
308 result = False 309 if is_empty_pattern(self._re_ignore_hosts): 310 if is_empty_pattern(self._re_sync_hosts): 311 result = True 312 elif self._re_sync_hosts.match(mastername) is not None: 313 result = True 314 elif self._re_ignore_hosts.match(mastername) is None: 315 result = True 316 elif not is_empty_pattern(self._re_sync_hosts): 317 if self._re_sync_hosts.match(mastername) is not None: 318 result = True 319 return result
320