Package master_sync_fkie :: Module master_sync
[frames] | no frames]

Source Code for Module master_sync_fkie.master_sync

  1  # Software License Agreement (BSD License)
 
  2  #
 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko
 
  4  # All rights reserved.
 
  5  #
 
  6  # Redistribution and use in source and binary forms, with or without
 
  7  # modification, are permitted provided that the following conditions
 
  8  # are met:
 
  9  #
 
 10  #  * Redistributions of source code must retain the above copyright
 
 11  #    notice, this list of conditions and the following disclaimer.
 
 12  #  * Redistributions in binary form must reproduce the above
 
 13  #    copyright notice, this list of conditions and the following
 
 14  #    disclaimer in the documentation and/or other materials provided
 
 15  #    with the distribution.
 
 16  #  * Neither the name of Fraunhofer nor the names of its
 
 17  #    contributors may be used to endorse or promote products derived
 
 18  #    from this software without specific prior written permission.
 
 19  #
 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 
 31  # POSSIBILITY OF SUCH DAMAGE.
 
 32  
 
 33  import uuid 
 34  import threading 
 35  import time 
 36  import xmlrpclib 
 37  import socket 
 38  
 
 39  import roslib; roslib.load_manifest('master_sync_fkie') 
 40  import rospy 
 41  
 
 42  from master_discovery_fkie.common import masteruri_from_ros, resolve_url, read_interface, create_pattern, is_empty_pattern 
 43  from sync_thread import SyncThread 
 44  from multimaster_msgs_fkie.msg import MasterState#, LinkState, LinkStatesStamped, MasterState, ROSMaster, SyncMasterInfo, SyncTopicInfo 
 45  from multimaster_msgs_fkie.srv import DiscoverMasters, GetSyncInfo, GetSyncInfoResponse 
 46  import master_discovery_fkie.interface_finder as interface_finder 
 47  from master_discovery_fkie.master_info import MasterInfo 
 48  
 
 49  
 
 50  
 
51 -class Main(object):
52 ''' 53 ''' 54 55 UPDATE_INTERVALL = 30 56
57 - def __init__(self):
58 ''' 59 Creates a new instance. Find the topic of the master_discovery node using 60 L{master_discovery_fkie.interface_finder.get_changes_topic()}. Also the 61 parameter C{~ignore_hosts} will be analyzed to exclude hosts from sync. 62 ''' 63 self.masters = {} 64 # the connection to the local service master 65 self.materuri = self.getMasteruri() 66 '''@ivar: the ROS master URI of the C{local} ROS master. ''' 67 self.__lock = threading.RLock() 68 #load interface 69 self._loadInterface() 70 # subscribe to changes notifier topics 71 topic_names = interface_finder.get_changes_topic(self.getMasteruri()) 72 self.sub_changes = dict() 73 '''@ivar: {dict} with topics C{(name: L{rospy.Subscriber})} publishes the changes of the discovered ROS masters.''' 74 for topic_name in topic_names: 75 rospy.loginfo("listen for updates on %s", topic_name) 76 self.sub_changes[topic_name] = rospy.Subscriber(topic_name, MasterState, self.handlerMasterStateMsg) 77 self.__timestamp_local = None 78 self.__own_state = None 79 self.update_timer = None 80 self.own_state_getter = None 81 self._join_threads = dict() # threads waiting for stopping the sync thread 82 # initialize the ROS services 83 rospy.Service('~get_sync_info', GetSyncInfo, self.rosservice_get_sync_info) 84 rospy.on_shutdown(self.finish) 85 self.retrieveMasters()
86
87 - def handlerMasterStateMsg(self, data):
88 ''' 89 The method to handle the received MasterState messages. Based on this message 90 new threads to synchronize with remote ROS master will be created, updated or 91 removed. 92 @param data: the received message 93 @type data: L{master_discovery_fkie.MasterState} 94 ''' 95 with self.__lock: 96 if not rospy.is_shutdown(): 97 if data.state in [MasterState.STATE_REMOVED]: 98 self.removeMaster(data.master.name) 99 elif data.state in [MasterState.STATE_NEW, MasterState.STATE_CHANGED]: 100 m = data.master 101 self.updateMaster(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri)
102
103 - def getMasteruri(self):
104 ''' 105 Requests the ROS master URI from the ROS master through the RPC interface and 106 returns it. The 'materuri' attribute will be set to the requested value. 107 @return: ROS master URI 108 @rtype: C{str} or C{None} 109 ''' 110 if not hasattr(self, 'materuri') or self.materuri is None: 111 masteruri = masteruri_from_ros() 112 master = xmlrpclib.ServerProxy(masteruri) 113 code, message, self.materuri = master.getUri(rospy.get_name()) 114 return self.materuri
115
116 - def retrieveMasters(self):
117 ''' 118 This method use the service 'list_masters' of the master_discoverer to get 119 the list of discovered ROS master. Based on this list the L{SyncThread} for 120 synchronization will be created. 121 @see: L{master_discovery_fkie.interface_finder.get_listmaster_service()} 122 ''' 123 if not rospy.is_shutdown(): 124 rospy.loginfo("Update ROS master list...") 125 service_names = interface_finder.get_listmaster_service(self.getMasteruri(), False) 126 for service_name in service_names: 127 rospy.loginfo("service 'list_masters' found on %s", service_name) 128 try: 129 with self.__lock: 130 # rospy.wait_for_service(service_name) 131 try: 132 socket.setdefaulttimeout(5) 133 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters) 134 resp = discoverMasters() 135 masters = [] 136 for m in resp.masters: 137 if not self._re_ignore_hosts.match(m.name) or self._re_sync_hosts.match(m.name): # do not sync to the master, if it is in ignore list 138 masters.append(m.name) 139 self.updateMaster(m.name, m.uri, m.timestamp, m.timestamp_local, m.discoverer_name, m.monitoruri) 140 for key in set(self.masters.keys()) - set(masters): 141 self.removeMaster(self.masters[key].name) 142 except rospy.ServiceException, e: 143 rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e)) 144 except: 145 import traceback 146 rospy.logwarn("ERROR while initial list masters: %s", traceback.format_exc()) 147 finally: 148 socket.setdefaulttimeout(None) 149 self.update_timer = threading.Timer(self.UPDATE_INTERVALL, self.retrieveMasters) 150 self.update_timer.start()
151 152
153 - def updateMaster(self, mastername, masteruri, timestamp, timestamp_local, discoverer_name, monitoruri):
154 ''' 155 Updates the timestamp of the given ROS master, or creates a new SyncThread to 156 synchronize the local master with given ROS master. 157 @param mastername: the name of the remote ROS master to update or synchronize. 158 @type mastername: C{str} 159 @param masteruri: the URI of the remote ROS master. 160 @type masteruri: C{str} 161 @param timestamp: the timestamp of the remote ROS master. 162 @type timestamp: L{float64} 163 @param timestamp_local: the timestamp of the remote ROS master. (only local changes) 164 @type timestamp_local: L{float64} 165 @param discoverer_name: the name of the remote master_discoverer node 166 @type discoverer_name: C{str} 167 @param monitoruri: the URI of the RPC interface of the remote master_discoverer node. 168 @type monitoruri: C{str} 169 ''' 170 try: 171 with self.__lock: 172 if (masteruri != self.materuri): 173 if (is_empty_pattern(self._re_ignore_hosts) or not self._re_ignore_hosts.match(mastername) 174 or (not is_empty_pattern(self._re_sync_hosts) and not self._re_sync_hosts.match(mastername) is None)): # do not sync to the master, if it is in ignore list 175 if (mastername in self.masters): 176 # updates only, if local changes are occured 177 self.masters[mastername].update(mastername, masteruri, discoverer_name, monitoruri, timestamp_local) 178 else: 179 self.masters[mastername] = SyncThread(mastername, masteruri, discoverer_name, monitoruri, 0.0, self.__sync_topics_on_demand) 180 if not self.__own_state is None: 181 self.masters[mastername].setOwnMasterState(MasterInfo.from_list(self.__own_state)) 182 # self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 183 # self.own_state_getter.start() 184 elif self.__timestamp_local != timestamp_local: 185 # get the master info from local discovery master and set it to all sync threads 186 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 187 self.own_state_getter.start() 188 except: 189 import traceback 190 rospy.logwarn("ERROR while update master[%s]: %s", str(mastername), traceback.format_exc())
191
192 - def get_own_state(self, monitoruri):
193 # get the master info from local discovery master and set it to all sync threads 194 # This function is running in a thread!!! 195 try: 196 socket.setdefaulttimeout(3) 197 own_monitor = xmlrpclib.ServerProxy(monitoruri) 198 self.__own_state = own_monitor.masterInfo() 199 own_state = MasterInfo.from_list(self.__own_state) 200 socket.setdefaulttimeout(None) 201 with self.__lock: 202 # update the state for all sync threads 203 for (mastername, s) in self.masters.iteritems(): 204 s.setOwnMasterState(own_state, self.__sync_topics_on_demand) 205 self.__timestamp_local = own_state.timestamp_local 206 except: 207 import traceback 208 print traceback.print_exc() 209 socket.setdefaulttimeout(None) 210 time.sleep(3) 211 if not self.own_state_getter is None and not rospy.is_shutdown(): 212 self.own_state_getter = threading.Thread(target=self.get_own_state, args=(monitoruri,)) 213 self.own_state_getter.start()
214
215 - def removeMaster(self, ros_master_name):
216 ''' 217 Removes the master with given name from the synchronization list. 218 @param ros_master_name: the name of the ROS master to remove. 219 @type ros_master_name: C{str} 220 ''' 221 try: 222 with self.__lock: 223 if (ros_master_name in self.masters): 224 m = self.masters.pop(ros_master_name) 225 id = uuid.uuid4() 226 thread = threading.Thread(target=self._threading_stop_sync, args=(m, id)) 227 self._join_threads[id] = thread 228 thread.start() 229 except Exception: 230 import traceback 231 rospy.logwarn("ERROR while removing master[%s]: %s", ros_master_name, traceback.format_exc())
232
233 - def _threading_stop_sync(self, sync_thread, id):
234 if isinstance(sync_thread, SyncThread): 235 rospy.loginfo(" Stop synchronization to `%s`"%sync_thread.name) 236 sync_thread.stop() 237 with self.__lock: 238 del self._join_threads[id] 239 rospy.loginfo(" Finished synchronization to `%s`"%sync_thread.name) 240 del sync_thread
241
242 - def finish(self, msg=''):
243 ''' 244 Removes all remote masters and unregister their topics and services. 245 ''' 246 rospy.loginfo("Stop synchronization...") 247 with self.__lock: 248 # stop update timer 249 rospy.loginfo(" Stop timers...") 250 if not self.update_timer is None: 251 self.update_timer.cancel() 252 # unregister from update topics 253 rospy.loginfo(" Unregister from master discovery...") 254 for (k, v) in self.sub_changes.iteritems(): 255 v.unregister() 256 self.own_state_getter = None 257 # Stop all sync threads 258 for key in self.masters.keys(): 259 rospy.loginfo(" Remove master: %s", key) 260 self.removeMaster(key) 261 # wait for their ending 262 while len(self._join_threads) > 0: 263 rospy.loginfo(" Wait for ending of %s threads ...", str(len(self._join_threads))) 264 time.sleep(1) 265 rospy.loginfo("Synchronization is now off")
266
267 - def rosservice_get_sync_info(self, req):
268 ''' 269 Callback for the ROS service to get the info to synchronized nodes. 270 ''' 271 masters = list() 272 try: 273 with self.__lock: 274 for (mastername, s) in self.masters.iteritems(): 275 masters.append(s.getSyncInfo()) 276 except: 277 import traceback 278 traceback.print_exc() 279 finally: 280 return GetSyncInfoResponse(masters)
281
282 - def _loadInterface(self):
283 interface_file = resolve_url(rospy.get_param('~interface_url', '')) 284 if interface_file: 285 rospy.loginfo("interface_url: %s", interface_file) 286 try: 287 data = read_interface(interface_file) if interface_file else {} 288 # set the ignore hosts list 289 self._re_ignore_hosts = create_pattern('ignore_hosts', data, interface_file, []) 290 # set the sync hosts list 291 self._re_sync_hosts = create_pattern('sync_hosts', data, interface_file, []) 292 self.__sync_topics_on_demand = False 293 if interface_file: 294 if data.has_key('sync_topics_on_demand'): 295 self.__sync_topics_on_demand = data['sync_topics_on_demand'] 296 elif rospy.has_param('~sync_topics_on_demand'): 297 self.__sync_topics_on_demand = rospy.get_param('~sync_topics_on_demand') 298 rospy.loginfo("sync_topics_on_demand: %s", self.__sync_topics_on_demand) 299 except: 300 import traceback 301 # kill the ros node, to notify the user about the error 302 rospy.logerr("Error on load interface: %s", traceback.format_exc()) 303 import os, signal 304 os.kill(os.getpid(), signal.SIGKILL)
305