Package master_sync_fkie :: Module sync_thread
[frames] | no frames]

Source Code for Module master_sync_fkie.sync_thread

  1    # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33   
 34  import time 
 35  import math 
 36  import threading 
 37  import xmlrpclib 
 38  import random 
 39  import socket 
 40   
 41  import roslib; roslib.load_manifest('master_sync_fkie') 
 42  import roslib.message 
 43  import rospy 
 44  import rosgraph.masterapi 
 45   
 46  from master_discovery_fkie.common import masteruri_from_ros 
 47  from master_discovery_fkie.filter_interface import FilterInterface 
 48  from multimaster_msgs_fkie.msg import SyncTopicInfo, SyncServiceInfo, SyncMasterInfo 
 49   
50 -class SyncThread(object):
51 ''' 52 A thread to synchronize the local ROS master with a remote master. While the 53 synchronization only the topic of the remote ROS master will be registered by 54 the local ROS master. The remote ROS master will be keep unchanged. 55 ''' 56 57 MAX_UPDATE_DELAY = 5 # times 58
59 - def __init__(self, name, uri, discoverer_name, monitoruri, timestamp, sync_on_demand=False):
60 ''' 61 Initialization method for the SyncThread. 62 @param name: the name of the ROS master synchronized with. 63 @type name: C{str} 64 @param uri: the URI of the ROS master synchronized with 65 @type uri: C{str} 66 @param discoverer_name: the name of the discovery node running on ROS master synchronized with. 67 @type discoverer_name: C{str} 68 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once. 69 @type monitoruri: C{str} 70 @param timestamp: The timestamp of the current state of the ROS master info. 71 @type timestamp: C{float64} 72 @param sync_on_demand: Synchronize topics on demand 73 @type sync_on_demand: bool 74 ''' 75 self.name = name 76 self.uri = uri 77 self.discoverer_name = discoverer_name 78 self.monitoruri = monitoruri 79 self.timestamp = timestamp 80 self.timestamp_local = 0. 81 self.timestamp_remote = 0. 82 83 self.localMasteruri = masteruri_from_ros() 84 rospy.logdebug("SyncThread[%s]: create this sync thread", self.name) 85 # synchronization variables 86 self.__lock_info = threading.RLock() 87 self.__lock_intern = threading.RLock() 88 self._use_filtered_method = None 89 # SyncMasterInfo with currently synchronized nodes, publisher (topic, node, nodeuri), subscriber(topic, node, nodeuri) and services 90 self.__sync_info = None 91 self.__unregistered = False 92 # a list with published topics as a tuple of (topic name, node name, node URL) 93 self.__publisher = [] 94 # a list with subscribed topics as a tuple of (topic name, node name, node URL) 95 self.__subscriber = [] 96 # a list with services as a tuple of (service name, service URL, node name, node URL) 97 self.__services = [] 98 # the state of the own ROS master is used if `sync_on_demand` is enabled or 99 # to determine the type of topic subscribed remote with `Empty` type 100 self.__own_state = None 101 102 #setup the filter 103 self._filter = FilterInterface() 104 self._filter.load(self.name, 105 ['/rosout', rospy.get_name().replace('/', '/*')+'*', self.discoverer_name.replace('/', '/*')+'*', '/*node_manager', '/*zeroconf'], [], 106 ['/rosout', '/rosout_agg'], ['/'] if sync_on_demand else [], 107 ['/*get_loggers', '/*set_logger_level'], [], 108 # do not sync the bond message of the nodelets!! 109 ['bond/Status']) 110 111 # congestion avoidance: wait for random.random*2 sec. If an update request 112 # is received try to cancel and restart the current timer. The timer can be 113 # canceled for maximal MAX_UPDATE_DELAY times. 114 self._update_timer = None 115 self._delayed_update = 0 116 self.__on_update = False
117
118 - def getSyncInfo(self):
119 ''' 120 Returns the synchronized publisher, subscriber and services. 121 @rtype: SyncMasterInfo 122 ''' 123 with self.__lock_info: 124 if self.__sync_info is None: 125 # create a sync info 126 result_set = set() 127 result_publisher = [] 128 result_subscriber = [] 129 result_services = [] 130 for (t_n, n_n, n_uri) in self.__publisher: 131 result_publisher.append(SyncTopicInfo(t_n, n_n, n_uri)) 132 result_set.add(n_n) 133 for (t_n, n_n, n_uri) in self.__subscriber: 134 result_subscriber.append(SyncTopicInfo(t_n, n_n, n_uri)) 135 result_set.add(n_n) 136 for (s_n, s_uri, n_n, n_uri) in self.__services: 137 result_services.append(SyncServiceInfo(s_n, s_uri, n_n, n_uri)) 138 result_set.add(n_n) 139 self.__sync_info = SyncMasterInfo(self.uri, list(result_set), result_publisher, result_subscriber, result_services) 140 return self.__sync_info
141
142 - def update(self, name, uri, discoverer_name, monitoruri, timestamp):
143 ''' 144 Sets a request to synchronize the local ROS master with this ROS master. 145 @note: If currently a synchronization is running this request will be ignored! 146 @param name: the name of the ROS master synchronized with. 147 @type name: C{str} 148 @param uri: the URI of the ROS master synchronized with 149 @type uri: C{str} 150 @param discoverer_name: the name of the discovery node running on ROS master synchronized with. 151 @type discoverer_name: C{str} 152 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once. 153 @type monitoruri: C{str} 154 @param timestamp: The timestamp of the current state of the ROS master info. 155 @type timestamp: C{float64} 156 ''' 157 # rospy.logdebug("SyncThread[%s]: update request", self.name) 158 with self.__lock_intern: 159 if (self.timestamp_local != timestamp): 160 rospy.logdebug("SyncThread[%s]: update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp, self.timestamp_local) 161 self.name = name 162 self.uri = uri 163 self.discoverer_name = discoverer_name 164 self.monitoruri = monitoruri 165 self.timestamp_remote = timestamp 166 self._request_update()
167 # rospy.logdebug("SyncThread[%s]: update exit", self.name) 168
169 - def setOwnMasterState(self, own_state, sync_on_demand=False):
170 ''' 171 Sets the state of the local ROS master state. If this state is not None, the topics on demand will be synchronized. 172 @param own_state: the state of the local ROS master state 173 @type own_state: C{master_discovery_fkie/MasterInfo} 174 @param sync_on_demand: if True, sync only topic, which are also local exists (Default: False) 175 @type sync_on_demand: bool 176 ''' 177 # rospy.logdebug("SyncThread[%s]: setOwnMasterState", self.name) 178 with self.__lock_intern: 179 timestamp_local = own_state.timestamp_local 180 if self.__own_state is None or (self.__own_state.timestamp_local != timestamp_local): 181 rospy.logdebug("SyncThread[%s]: local state update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp_local, self.__own_state.timestamp_local if not self.__own_state is None else float('nan')) 182 self.__own_state = own_state 183 if sync_on_demand: 184 self._filter.update_sync_topics_pattern(self.__own_state.topic_names) 185 self._request_update()
186 # rospy.logdebug("SyncThread[%s]: setOwnMasterState exit", self.name) 187
188 - def stop(self):
189 ''' 190 Stops running thread. 191 ''' 192 rospy.logdebug(" SyncThread[%s]: stop request", self.name) 193 with self.__lock_intern: 194 if not self._update_timer is None: 195 self._update_timer.cancel() 196 self._unreg_on_finish() 197 rospy.logdebug(" SyncThread[%s]: stop exit", self.name)
198
199 - def _request_update(self):
200 with self.__lock_intern: 201 r = random.random() * 2. 202 # start update timer with a random waiting time to avoid a congestion picks on changes of ROS master state 203 if self._update_timer is None or not self._update_timer.isAlive(): 204 del self._update_timer 205 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,)) 206 self._update_timer.start() 207 else: 208 if self._delayed_update < self.MAX_UPDATE_DELAY: 209 # if the timer thread can be canceled start new one 210 self._update_timer.cancel() 211 # if callback (XMLRPC request) is already running the timer is not canceled -> test for `self.__on_update` 212 if not self._update_timer.isAlive() or not self.__on_update: 213 self._delayed_update += 1 214 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,)) 215 self._update_timer.start()
216
217 - def _request_remote_state(self, handler):
218 self._delayed_update = 0 219 self.__on_update = True 220 try: 221 # connect to master_monitor rpc-xml server of remote master discovery 222 socket.setdefaulttimeout(20) 223 remote_monitor = xmlrpclib.ServerProxy(self.monitoruri) 224 # determine the getting method: older versions have not a filtered method 225 if self._use_filtered_method is None: 226 try: 227 self._use_filtered_method = 'masterInfoFiltered' in remote_monitor.system.listMethods() 228 except: 229 self._use_filtered_method = False 230 remote_state = None 231 # get the state informations 232 if self._use_filtered_method: 233 remote_state = remote_monitor.masterInfoFiltered(self._filter.to_list()) 234 else: 235 remote_state = remote_monitor.masterInfo() 236 if not self.__unregistered: 237 handler(remote_state) 238 except: 239 import traceback 240 rospy.logwarn("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc()) 241 finally: 242 self.__on_update = False 243 socket.setdefaulttimeout(None)
244
245 - def _apply_remote_state(self, remote_state):
246 try: 247 stamp = float(remote_state[0]) 248 stamp_local = float(remote_state[1]) 249 remote_masteruri = remote_state[2] 250 # remote_mastername = remote_state[3] 251 publishers = remote_state[4] 252 subscribers = remote_state[5] 253 rservices = remote_state[6] 254 topicTypes = remote_state[7] 255 nodeProviders = remote_state[8] 256 serviceProviders = remote_state[9] 257 258 # create a multicall object 259 own_master = xmlrpclib.ServerProxy(self.localMasteruri) 260 own_master_multi = xmlrpclib.MultiCall(own_master) 261 # fill the multicall object 262 handler = [] 263 # sync the publishers 264 publisher = [] 265 publisher_to_register = [] 266 for (topic, nodes) in publishers: 267 for node in nodes: 268 topictype = self._getTopicType(topic, topicTypes) 269 nodeuri = self._getNodeUri(node, nodeProviders, remote_masteruri) 270 if topictype and nodeuri and not self._doIgnoreNT(node, topic, topictype): 271 # register the nodes only once 272 if not ((topic, node, nodeuri) in self.__publisher): 273 publisher_to_register.append((topic, topictype, node, nodeuri)) 274 publisher.append((topic, node, nodeuri)) 275 # unregister not updated publishers 276 for (topic, node, nodeuri) in set(self.__publisher)-set(publisher): 277 own_master_multi.unregisterPublisher(node, topic, nodeuri) 278 handler.append(('upub', topic, node, nodeuri)) 279 #register new publishers 280 for (topic, topictype, node, nodeuri) in publisher_to_register: 281 own_master_multi.registerPublisher(node, topic, topictype, nodeuri) 282 handler.append(('pub', topic, topictype, node, nodeuri)) 283 284 # sync the subscribers 285 subscriber = [] 286 subscriber_to_register = [] 287 for (topic, nodes) in subscribers: 288 for node in nodes: 289 topictype = self._getTopicType(topic, topicTypes) 290 nodeuri = self._getNodeUri(node, nodeProviders, remote_masteruri) 291 # if remote topictype is None, try to set to the local topic type 292 if not topictype and not self.__own_state is None: 293 if topic in self.__own_state.topics: 294 topictype = self.__own_state.topics[topic].type 295 if topictype and nodeuri and not self._doIgnoreNT(node, topic, topictype): 296 # register the node as subscriber in local ROS master 297 if not ((topic, node, nodeuri) in self.__subscriber): 298 subscriber_to_register.append((topic, topictype, node, nodeuri)) 299 subscriber.append((topic, node, nodeuri)) 300 # unregister not updated topics 301 for (topic, node, nodeuri) in set(self.__subscriber)-set(subscriber): 302 own_master_multi.unregisterSubscriber(node, topic, nodeuri) 303 handler.append(('usub', topic, node, nodeuri)) 304 #register new subscriber 305 for (topic, topictype, node, nodeuri) in subscriber_to_register: 306 own_master_multi.registerSubscriber(node, topic, topictype, nodeuri) 307 handler.append(('sub', topic, topictype, node, nodeuri)) 308 309 # sync the services 310 services = [] 311 services_to_register = [] 312 for (service, nodes) in rservices: 313 for node in nodes: 314 serviceuri = self._getServiceUri(service, serviceProviders, remote_masteruri) 315 nodeuri = self._getNodeUri(node, nodeProviders, remote_masteruri) 316 if serviceuri and nodeuri and not self._doIgnoreNS(node, service): 317 # register the node as publisher in local ROS master 318 if not ((service, serviceuri, node, nodeuri) in self.__services): 319 services_to_register.append((service, serviceuri, node, nodeuri)) 320 services.append((service, serviceuri, node, nodeuri)) 321 # unregister not updated services 322 for (service, serviceuri, node, nodeuri) in set(self.__services)-set(services): 323 own_master_multi.unregisterService(node, service, serviceuri) 324 handler.append(('usrv', service, serviceuri, node, nodeuri)) 325 #register new services 326 for (service, serviceuri, node, nodeuri) in services_to_register: 327 own_master_multi.registerService(node, service, serviceuri, nodeuri) 328 handler.append(('srv', service, serviceuri, node, nodeuri)) 329 330 # execute the multicall and update the current publicher, subscriber and services 331 if not self.__unregistered: 332 with self.__lock_info: 333 self.__sync_info = None 334 self.__publisher = publisher 335 self.__subscriber = subscriber 336 self.__services = services 337 # update the local ROS master 338 socket.setdefaulttimeout(3) 339 result = own_master_multi() 340 # analyze the results of the registration call 341 for h,(code, statusMessage, r) in zip(handler, result): 342 try: 343 if h[0] == 'sub': 344 if code == -1: 345 rospy.logwarn("SyncThread[%s] topic subscription error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage)) 346 else: 347 rospy.loginfo("SyncThread[%s] topic subscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 348 if h[0] == 'sub' and code == 1 and len(r) > 0: 349 # Horrible hack: the response from registerSubscriber() can contain a 350 # list of current publishers. But we don't have a way of injecting them 351 # into rospy here. Now, if we get a publisherUpdate() from the master, 352 # everything will work. So, we ask the master if anyone is currently 353 # publishing the topic, grab the advertised type, use it to advertise 354 # ourselves, then unadvertise, triggering a publisherUpdate() along the 355 # way. 356 # We create publisher locally as a hack, to get callback set up properly for already registered local publishers 357 topicPub = rospy.Publisher(h[1], roslib.message.get_message_class(h[2])) 358 topicPub.unregister() 359 del topicPub 360 elif h[0] == 'pub': 361 if code == -1: 362 rospy.logwarn("SyncThread[%s] topic advertise error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage)) 363 else: 364 rospy.loginfo("SyncThread[%s] topic advertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 365 elif h[0] == 'usub': 366 rospy.loginfo("SyncThread[%s] topic unsubscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 367 elif h[0] == 'upub': 368 rospy.loginfo("SyncThread[%s] topic unadvertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 369 elif h[0] == 'srv': 370 if code == -1: 371 rospy.logwarn("SyncThread[%s] service registration error: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 372 else: 373 rospy.loginfo("SyncThread[%s] service registered: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 374 elif h[0] == 'usrv': 375 rospy.loginfo("SyncThread[%s] service unregistered: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 376 except: 377 import traceback 378 rospy.logerr("SyncThread[%s] ERROR while hack subscriber[%s]: %s", self.name, h[1], traceback.format_exc()) 379 # set the last synchronization time 380 self.timestamp = stamp 381 self.timestamp_local = stamp_local 382 rospy.logdebug("SyncThread[%s]: current timestamp %.9f, local %.9f", self.name, stamp, stamp_local) 383 if self.timestamp_remote > stamp_local: 384 rospy.logdebug("SyncThread[%s]: invoke next update, remote ts: %.9f", self.name, self.timestamp_remote) 385 self._update_timer = threading.Timer(random.random() * 2., self._request_remote_state, args=(self._apply_remote_state,)) 386 self._update_timer.start() 387 except: 388 import traceback 389 rospy.logwarn("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc()) 390 finally: 391 socket.setdefaulttimeout(None)
392
393 - def _unreg_on_finish(self):
394 with self.__lock_info: 395 self.__unregistered = True 396 try: 397 rospy.logdebug(" SyncThread[%s] clear all registrations", self.name) 398 socket.setdefaulttimeout(5) 399 own_master = xmlrpclib.ServerProxy(self.localMasteruri) 400 own_master_multi = xmlrpclib.MultiCall(own_master) 401 #end routine if the master was removed 402 for topic, node, uri in self.__subscriber: 403 own_master_multi.unregisterSubscriber(node, topic, uri) 404 for topic, node, uri in self.__publisher: 405 own_master_multi.unregisterPublisher(node, topic, uri) 406 for service, serviceuri, node, uri in self.__services: 407 own_master_multi.unregisterService(node, service, serviceuri) 408 rospy.logdebug(" SyncThread[%s] execute a MultiCall", self.name) 409 r = own_master_multi() 410 rospy.logdebug(" SyncThread[%s] finished", self.name) 411 except: 412 import traceback 413 rospy.logwarn("SyncThread[%s] ERROR while ending: %s", self.name, traceback.format_exc()) 414 socket.setdefaulttimeout(None)
415
416 - def _doIgnoreNT(self, node, topic, topictype):
417 return self._filter.is_ignored_topic(node, topic, topictype)
418
419 - def _doIgnoreNS(self, node, service):
420 return self._filter.is_ignored_service(node, service)
421
422 - def _getTopicType(self, topic, topicTypes):
423 for (topicname, type) in topicTypes: 424 if (topicname == topic): 425 return type.replace('None', '') 426 return None
427
428 - def _getNodeUri(self, node, nodes, remote_masteruri):
429 for (nodename, uri, masteruri, pid, local) in nodes: 430 if (nodename == node) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'): 431 # the node was registered originally to another ROS master -> do sync 432 if masteruri != self.localMasteruri: 433 return uri 434 return None
435
436 - def _getServiceUri(self, service, nodes, remote_masteruri):
437 for (servicename, uri, masteruri, type, local) in nodes: 438 if (servicename == service) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'): 439 if masteruri != self.localMasteruri: 440 return uri 441 return None
442