Package master_sync_fkie :: Module master_sync
[frames] | no frames]

Source Code for Module master_sync_fkie.master_sync

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Fraunhofer nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  
 
 33  import socket 
 34  import threading 
 35  import time 
 36  import uuid 
 37  import xmlrpclib 
 38  
 
 39  from multimaster_msgs_fkie.msg import MasterState  # , LinkState, LinkStatesStamped, MasterState, ROSMaster, SyncMasterInfo, SyncTopicInfo 
 40  from multimaster_msgs_fkie.srv import DiscoverMasters, GetSyncInfo, GetSyncInfoResponse 
 41  import rospy 
 42  
 
 43  from master_discovery_fkie.common import masteruri_from_master, resolve_url, read_interface, create_pattern, is_empty_pattern 
 44  from master_discovery_fkie.master_info import MasterInfo 
 45  import master_discovery_fkie.interface_finder as interface_finder 
 46  
 
 47  from .sync_thread import SyncThread 
 48  
 
 49  
 
50 -class Main(object):
51 ''' 52 ''' 53 54 UPDATE_INTERVALL = 30 55
56 - def __init__(self):
57 ''' 58 Creates a new instance. Find the topic of the master_discovery node using 59 U{master_discovery_fkie.interface_finder.get_changes_topic() 60 <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>}. 61 Also the parameter C{~ignore_hosts} will be analyzed to exclude hosts from sync. 62 ''' 63 self.masters = {} 64 # the connection to the local service master 65 self.materuri = masteruri_from_master() 66 '''@ivar: the ROS master URI of the C{local} ROS master. ''' 67 self.__lock = threading.RLock() 68 # load interface 69 self._load_interface() 70 # subscribe to changes notifier topics 71 topic_names = interface_finder.get_changes_topic(masteruri_from_master()) 72 self.sub_changes = dict() 73 '''@ivar: `dict` with topics {name: U{rospy.Subscriber<http://docs.ros.org/api/rospy/html/rospy.topics.Subscriber-class.html>}} publishes the changes of the discovered ROS masters.''' 74 for topic_name in topic_names: 75 rospy.loginfo("listen for updates on %s", topic_name) 76 self.sub_changes[topic_name] = rospy.Subscriber(topic_name, MasterState, self._rosmsg_callback_master_state) 77 self.__timestamp_local = None 78 self.__own_state = None 79 self.update_timer = None 80 self.own_state_getter = None 81 self._join_threads = dict() # threads waiting for stopping the sync thread 82 # initialize the ROS services 83 rospy.Service('~get_sync_info', GetSyncInfo, self._rosservice_get_sync_info) 84 rospy.on_shutdown(self.finish) 85 self.obtain_masters()
86
87 - def _rosmsg_callback_master_state(self, data):
88 ''' 89 The method to handle the received MasterState messages. Based on this message 90 new threads to synchronize with remote ROS master will be created, updated or 91 removed. 92 @param data: the received message 93 @type data: U{master_discovery_fkie.MasterState 94 <http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>} 95 ''' 96 with self.__lock: 97 if not rospy.is_shutdown(): 98 if data.state in [MasterState.STATE_REMOVED]: 99 self.remove_master(data.master.name) 100 elif data.state in [MasterState.STATE_NEW, MasterState.STATE_CHANGED]: 101 m = data.master 102 self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online)
103
104 - def obtain_masters(self):
105 ''' 106 This method use the service 'list_masters' of the master_discoverer to get 107 the list of discovered ROS master. Based on this list the L{SyncThread} for 108 synchronization will be created. 109 @see: U{master_discovery_fkie.interface_finder.get_listmaster_service() 110 <http://docs.ros.org/api/master_discovery_fkie/html/modules.html#interface-finder-module>} 111 ''' 112 if not rospy.is_shutdown(): 113 service_names = interface_finder.get_listmaster_service(masteruri_from_master(), False) 114 for service_name in service_names: 115 try: 116 with self.__lock: 117 try: 118 socket.setdefaulttimeout(5) 119 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters) 120 resp = discoverMasters() 121 masters = [] 122 master_names = [m.name for m in resp.masters] 123 rospy.loginfo("ROS masters obtained from '%s': %s", service_name, master_names) 124 for m in resp.masters: 125 if self._can_sync(m.name): # do not sync to the master, if it is in ignore list or not in filled sync list 126 masters.append(m.name) 127 self.update_master(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri, m.online) 128 for key in set(self.masters.keys()) - set(masters): 129 self.remove_master(self.masters[key].name) 130 except rospy.ServiceException, e: 131 rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e)) 132 except: 133 import traceback 134 rospy.logwarn("ERROR while initial list masters: %s", traceback.format_exc()) 135 finally: 136 socket.setdefaulttimeout(None) 137 self.update_timer = threading.Timer(self.UPDATE_INTERVALL, self.obtain_masters) 138 self.update_timer.start()
139
140 - def update_master(self, mastername, masteruri, timestamp, timestamp_local, discoverer_name, monitoruri, online):
141 ''' 142 Updates the timestamp of the given ROS master, or creates a new L{SyncThread} to 143 synchronize the local master with given ROS master. 144 @param mastername: the name of the remote ROS master to update or synchronize. 145 @type mastername: C{str} 146 @param masteruri: the URI of the remote ROS master. 147 @type masteruri: C{str} 148 @param timestamp: the timestamp of the remote ROS master. 149 @type timestamp: C{float64} 150 @param timestamp_local: the timestamp of the remote ROS master. (only local changes) 151 @type timestamp_local: C{float64} 152 @param discoverer_name: the name of the remote master_discoverer node 153 @type discoverer_name: C{str} 154 @param monitoruri: the URI of the RPC interface of the remote master_discoverer node. 155 @type monitoruri: C{str} 156 @param online: the current state on the master. 157 @type online: C{bool} 158 ''' 159 try: 160 with self.__lock: 161 if (masteruri != self.materuri): 162 if self._can_sync(mastername): 163 # do not sync to the master, if it is in ignore list 164 if self.__resync_on_reconnect and mastername in self.masters: 165 self.masters[mastername].set_online(online, self.__resync_on_reconnect_timeout) 166 if online: 167 if mastername in self.masters: 168 # updates only, if local changes are occured 169 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local) 170 else: 171 self.masters[mastername] = SyncThread(mastername, masteruri, discoverer_name, monitoruri, 0.0, self.__sync_topics_on_demand) 172 if self.__own_state is not None: 173 self.masters[mastername].set_own_masterstate(MasterInfo.from_list(self.__own_state)) 174 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local) 175 elif self.__timestamp_local != timestamp_local and self.__sync_topics_on_demand: 176 # get the master info from local discovery master and set it to all sync threads 177 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 178 self.own_state_getter.start() 179 except: 180 import traceback 181 rospy.logwarn("ERROR while update master[%s]: %s", str(mastername), traceback.format_exc())
182
183 - def get_own_state(self, monitoruri):
184 ''' 185 Gets the master info from local master discovery and set it to all sync threads. 186 This function is running in a thread!!! 187 ''' 188 try: 189 socket.setdefaulttimeout(3) 190 own_monitor = xmlrpclib.ServerProxy(monitoruri) 191 self.__own_state = own_monitor.masterInfo() 192 own_state = MasterInfo.from_list(self.__own_state) 193 socket.setdefaulttimeout(None) 194 with self.__lock: 195 # update the state for all sync threads 196 for (_, s) in self.masters.iteritems(): 197 s.set_own_masterstate(own_state, self.__sync_topics_on_demand) 198 self.__timestamp_local = own_state.timestamp_local 199 except: 200 import traceback 201 rospy.logwarn("ERROR while getting own state from '%s': %s", monitoruri, traceback.format_exc()) 202 socket.setdefaulttimeout(None) 203 time.sleep(3) 204 if self.own_state_getter is not None and not rospy.is_shutdown(): 205 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 206 self.own_state_getter.start()
207
208 - def remove_master(self, ros_master_name):
209 ''' 210 Removes the master with given name from the synchronization list. 211 @param ros_master_name: the name of the ROS master to remove. 212 @type ros_master_name: C{str} 213 ''' 214 try: 215 with self.__lock: 216 if ros_master_name in self.masters: 217 m = self.masters.pop(ros_master_name) 218 ident = uuid.uuid4() 219 thread = threading.Thread(target=self._threading_stop_sync, args=(m, ident)) 220 self._join_threads[ident] = thread 221 thread.start() 222 except Exception: 223 import traceback 224 rospy.logwarn("ERROR while removing master[%s]: %s", ros_master_name, traceback.format_exc())
225
226 - def _threading_stop_sync(self, sync_thread, ident):
227 if isinstance(sync_thread, SyncThread): 228 rospy.loginfo(" Stop synchronization to `%s`" % sync_thread.name) 229 sync_thread.stop() 230 with self.__lock: 231 del self._join_threads[ident] 232 rospy.loginfo(" Finished synchronization to `%s`" % sync_thread.name) 233 del sync_thread
234
235 - def finish(self, msg=''):
236 ''' 237 Removes all remote masters and unregister their topics and services. 238 ''' 239 rospy.loginfo("Stop synchronization...") 240 with self.__lock: 241 # stop update timer 242 rospy.loginfo(" Stop timers...") 243 if self.update_timer is not None: 244 self.update_timer.cancel() 245 # unregister from update topics 246 rospy.loginfo(" Unregister from master discovery...") 247 for (_, v) in self.sub_changes.iteritems(): 248 v.unregister() 249 self.own_state_getter = None 250 # Stop all sync threads 251 for key in self.masters.keys(): 252 rospy.loginfo(" Remove master: %s", key) 253 self.remove_master(key) 254 # wait for their ending 255 while len(self._join_threads) > 0: 256 rospy.loginfo(" Wait for ending of %s threads ...", str(len(self._join_threads))) 257 time.sleep(1) 258 rospy.loginfo("Synchronization is now off")
259
260 - def _rosservice_get_sync_info(self, req):
261 ''' 262 Callback for the ROS service to get the info to synchronized nodes. 263 ''' 264 masters = list() 265 try: 266 with self.__lock: 267 for (_, s) in self.masters.iteritems(): 268 masters.append(s.get_sync_info()) 269 except: 270 import traceback 271 traceback.print_exc() 272 finally: 273 return GetSyncInfoResponse(masters)
274
275 - def _load_interface(self):
276 interface_file = resolve_url(rospy.get_param('~interface_url', '')) 277 if interface_file: 278 rospy.loginfo("interface_url: %s", interface_file) 279 try: 280 data = read_interface(interface_file) if interface_file else {} 281 # set the ignore hosts list 282 self._re_ignore_hosts = create_pattern('ignore_hosts', data, interface_file, []) 283 # set the sync hosts list 284 self._re_sync_hosts = create_pattern('sync_hosts', data, interface_file, []) 285 self.__sync_topics_on_demand = False 286 if interface_file: 287 if 'sync_topics_on_demand' in data: 288 self.__sync_topics_on_demand = data['sync_topics_on_demand'] 289 elif rospy.has_param('~sync_topics_on_demand'): 290 self.__sync_topics_on_demand = rospy.get_param('~sync_topics_on_demand') 291 rospy.loginfo("sync_topics_on_demand: %s", self.__sync_topics_on_demand) 292 self.__resync_on_reconnect = rospy.get_param('~resync_on_reconnect', True) 293 rospy.loginfo("resync_on_reconnect: %s", self.__resync_on_reconnect) 294 self.__resync_on_reconnect_timeout = rospy.get_param('~resync_on_reconnect_timeout', 0) 295 rospy.loginfo("resync_on_reconnect_timeout: %s", self.__resync_on_reconnect_timeout) 296 except: 297 import traceback 298 # kill the ros node, to notify the user about the error 299 rospy.logerr("Error on load interface: %s", traceback.format_exc()) 300 import os 301 import signal 302 os.kill(os.getpid(), signal.SIGKILL)
303
304 - def _can_sync(self, mastername):
305 result = False 306 if is_empty_pattern(self._re_ignore_hosts): 307 if is_empty_pattern(self._re_sync_hosts): 308 result = True 309 elif self._re_sync_hosts.match(mastername) is not None: 310 result = True 311 elif self._re_ignore_hosts.match(mastername) is None: 312 result = True 313 elif not is_empty_pattern(self._re_sync_hosts): 314 if self._re_sync_hosts.match(mastername) is not None: 315 result = True 316 return result
317