Package master_sync_fkie :: Module sync_thread
[frames] | no frames]

Source Code for Module master_sync_fkie.sync_thread

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33   
 34  import random 
 35  import socket 
 36  import threading 
 37  import time 
 38  import traceback 
 39  import xmlrpclib 
 40   
 41  from multimaster_msgs_fkie.msg import SyncTopicInfo, SyncServiceInfo, SyncMasterInfo 
 42  import rospy 
 43   
 44  from master_discovery_fkie.common import masteruri_from_ros 
 45  from master_discovery_fkie.filter_interface import FilterInterface 
 46   
 47   
48 -class SyncThread(object):
49 ''' 50 A thread to synchronize the local ROS master with a remote master. While the 51 synchronization only the topic of the remote ROS master will be registered by 52 the local ROS master. The remote ROS master will be keep unchanged. 53 ''' 54 55 MAX_UPDATE_DELAY = 5 # times 56 57 MSG_ANY_TYPE = '*' 58
59 - def __init__(self, name, uri, discoverer_name, monitoruri, timestamp, sync_on_demand=False):
60 ''' 61 Initialization method for the SyncThread. 62 @param name: the name of the ROS master synchronized with. 63 @type name: C{str} 64 @param uri: the URI of the ROS master synchronized with 65 @type uri: C{str} 66 @param discoverer_name: the name of the discovery node running on ROS master synchronized with. 67 @type discoverer_name: C{str} 68 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once. 69 @type monitoruri: C{str} 70 @param timestamp: The timestamp of the current state of the ROS master info. 71 @type timestamp: C{float64} 72 @param sync_on_demand: Synchronize topics on demand 73 @type sync_on_demand: bool 74 ''' 75 self.name = name 76 self.uri = uri 77 self.discoverer_name = discoverer_name 78 self.monitoruri = monitoruri 79 self.timestamp = timestamp 80 self.timestamp_local = 0. 81 self.timestamp_remote = 0. 82 self._online = True 83 self._offline_ts = 0 84 85 self.masteruri_local = masteruri_from_ros() 86 rospy.logdebug("SyncThread[%s]: create this sync thread", self.name) 87 # synchronization variables 88 self.__lock_info = threading.RLock() 89 self.__lock_intern = threading.RLock() 90 self._use_filtered_method = None 91 # SyncMasterInfo with currently synchronized nodes, publisher (topic, node, nodeuri), subscriber(topic, node, nodeuri) and services 92 self.__sync_info = None 93 self.__unregistered = False 94 # a list with published topics as a tuple of (topic name, node name, node URL) 95 self.__publisher = [] 96 # a list with subscribed topics as a tuple of (topic name, node name, node URL) 97 self.__subscriber = [] 98 # a list with services as a tuple of (service name, service URL, node name, node URL) 99 self.__services = [] 100 # the state of the own ROS master is used if `sync_on_demand` is enabled or 101 # to determine the type of topic subscribed remote with `Empty` type 102 self.__own_state = None 103 104 # setup the filter 105 self._filter = FilterInterface() 106 self._filter.load(self.name, 107 ['/rosout', rospy.get_name(), self.discoverer_name, '/node_manager', '/zeroconf'], [], 108 ['/rosout', '/rosout_agg'], ['/'] if sync_on_demand else [], 109 ['/*get_loggers', '/*set_logger_level'], [], 110 # do not sync the bond message of the nodelets!! 111 ['bond/Status'], 112 [], [], 113 []) 114 115 # congestion avoidance: wait for random.random*2 sec. If an update request 116 # is received try to cancel and restart the current timer. The timer can be 117 # canceled for maximal MAX_UPDATE_DELAY times. 118 self._update_timer = None 119 self._delayed_update = 0 120 self.__on_update = False
121
122 - def get_sync_info(self):
123 ''' 124 Returns the synchronized publisher, subscriber and services. 125 @rtype: SyncMasterInfo 126 ''' 127 with self.__lock_info: 128 if self.__sync_info is None: 129 # create a sync info 130 result_set = set() 131 result_publisher = [] 132 result_subscriber = [] 133 result_services = [] 134 for (t_n, n_n, n_uri) in self.__publisher: 135 result_publisher.append(SyncTopicInfo(t_n, n_n, n_uri)) 136 result_set.add(n_n) 137 for (t_n, n_n, n_uri) in self.__subscriber: 138 result_subscriber.append(SyncTopicInfo(t_n, n_n, n_uri)) 139 result_set.add(n_n) 140 for (s_n, s_uri, n_n, n_uri) in self.__services: 141 result_services.append(SyncServiceInfo(s_n, s_uri, n_n, n_uri)) 142 result_set.add(n_n) 143 self.__sync_info = SyncMasterInfo(self.uri, list(result_set), result_publisher, result_subscriber, result_services) 144 return self.__sync_info
145
146 - def set_online(self, value, resync_on_reconnect_timeout=0.):
147 if value: 148 if not self._online: 149 with self.__lock_intern: 150 self._online = True 151 offline_duration = time.time() - self._offline_ts 152 if offline_duration >= resync_on_reconnect_timeout: 153 rospy.loginfo("SyncThread[%s]: perform resync after the host was offline (unregister and register again to avoid connection losses to python topic. These does not suppot reconnection!)", self.name) 154 if self._update_timer is not None: 155 self._update_timer.cancel() 156 self._unreg_on_finish() 157 self.__unregistered = False 158 self.__publisher = [] 159 self.__subscriber = [] 160 self.__services = [] 161 self.timestamp = 0. 162 self.timestamp_local = 0. 163 self.timestamp_remote = 0. 164 else: 165 rospy.loginfo("SyncThread[%s]: skip resync after the host was offline because of resync_on_reconnect_timeout=%.2f and the host was only %.2f sec offline", self.name, resync_on_reconnect_timeout, offline_duration) 166 else: 167 self._online = False 168 self._offline_ts = time.time()
169
170 - def update(self, name, uri, discoverer_name, monitoruri, timestamp):
171 ''' 172 Sets a request to synchronize the local ROS master with this ROS master. 173 @note: If currently a synchronization is running this request will be ignored! 174 @param name: the name of the ROS master synchronized with. 175 @type name: C{str} 176 @param uri: the URI of the ROS master synchronized with 177 @type uri: C{str} 178 @param discoverer_name: the name of the discovery node running on ROS master synchronized with. 179 @type discoverer_name: C{str} 180 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once. 181 @type monitoruri: C{str} 182 @param timestamp: The timestamp of the current state of the ROS master info. 183 @type timestamp: C{float64} 184 ''' 185 # rospy.logdebug("SyncThread[%s]: update request", self.name) 186 with self.__lock_intern: 187 self.timestamp_remote = timestamp 188 if (self.timestamp_local != timestamp): 189 rospy.logdebug("SyncThread[%s]: update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp, self.timestamp_local) 190 self.name = name 191 self.uri = uri 192 self.discoverer_name = discoverer_name 193 self.monitoruri = monitoruri 194 self._request_update()
195 196 # rospy.logdebug("SyncThread[%s]: update exit", self.name) 197
198 - def set_own_masterstate(self, own_state, sync_on_demand=False):
199 ''' 200 Sets the state of the local ROS master state. If this state is not None, the topics on demand will be synchronized. 201 @param own_state: the state of the local ROS master state 202 @type own_state: C{master_discovery_fkie/MasterInfo} 203 @param sync_on_demand: if True, sync only topic, which are also local exists (Default: False) 204 @type sync_on_demand: bool 205 ''' 206 with self.__lock_intern: 207 timestamp_local = own_state.timestamp_local 208 if self.__own_state is None or (self.__own_state.timestamp_local != timestamp_local): 209 ownstate_ts = self.__own_state.timestamp_local if self.__own_state is not None else float('nan') 210 rospy.logdebug("SyncThread[%s]: local state update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp_local, ownstate_ts) 211 self.__own_state = own_state 212 if sync_on_demand: 213 self._filter.update_sync_topics_pattern(self.__own_state.topic_names) 214 self._request_update()
215
216 - def stop(self):
217 ''' 218 Stops running thread. 219 ''' 220 rospy.logdebug(" SyncThread[%s]: stop request", self.name) 221 with self.__lock_intern: 222 if self._update_timer is not None: 223 self._update_timer.cancel() 224 self._unreg_on_finish() 225 rospy.logdebug(" SyncThread[%s]: stop exit", self.name)
226
227 - def _request_update(self):
228 with self.__lock_intern: 229 r = random.random() * 2. 230 # start update timer with a random waiting time to avoid a congestion picks on changes of ROS master state 231 if self._update_timer is None or not self._update_timer.isAlive(): 232 del self._update_timer 233 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,)) 234 self._update_timer.start() 235 else: 236 if self._delayed_update < self.MAX_UPDATE_DELAY: 237 # if the timer thread can be canceled start new one 238 self._update_timer.cancel() 239 # if callback (XMLRPC request) is already running the timer is not canceled -> test for `self.__on_update` 240 if not self._update_timer.isAlive() or not self.__on_update: 241 self._delayed_update += 1 242 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,)) 243 self._update_timer.start()
244
245 - def _request_remote_state(self, handler):
246 self._delayed_update = 0 247 self.__on_update = True 248 try: 249 # connect to master_monitor rpc-xml server of remote master discovery 250 socket.setdefaulttimeout(20) 251 remote_monitor = xmlrpclib.ServerProxy(self.monitoruri) 252 # determine the getting method: older versions have not a filtered method 253 if self._use_filtered_method is None: 254 try: 255 self._use_filtered_method = 'masterInfoFiltered' in remote_monitor.system.listMethods() 256 except: 257 self._use_filtered_method = False 258 remote_state = None 259 # get the state informations 260 rospy.loginfo("SyncThread[%s] Requesting remote state from '%s'", self.name, self.monitoruri) 261 if self._use_filtered_method: 262 remote_state = remote_monitor.masterInfoFiltered(self._filter.to_list()) 263 else: 264 remote_state = remote_monitor.masterInfo() 265 if not self.__unregistered: 266 handler(remote_state) 267 except: 268 rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc()) 269 finally: 270 self.__on_update = False 271 socket.setdefaulttimeout(None)
272
273 - def _apply_remote_state(self, remote_state):
274 rospy.loginfo("SyncThread[%s] Applying remote state...", self.name) 275 try: 276 stamp = float(remote_state[0]) 277 stamp_local = float(remote_state[1]) 278 remote_masteruri = remote_state[2] 279 # remote_mastername = remote_state[3] 280 publishers = remote_state[4] 281 subscribers = remote_state[5] 282 rservices = remote_state[6] 283 topicTypes = remote_state[7] 284 nodeProviders = remote_state[8] 285 serviceProviders = remote_state[9] 286 287 # create a multicall object 288 own_master = xmlrpclib.ServerProxy(self.masteruri_local) 289 own_master_multi = xmlrpclib.MultiCall(own_master) 290 # fill the multicall object 291 handler = [] 292 # sync the publishers 293 publisher = [] 294 publisher_to_register = [] 295 for (topic, nodes) in publishers: 296 for node in nodes: 297 topictype = self._get_topictype(topic, topicTypes) 298 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 299 if topictype and nodeuri and not self._do_ignore_ntp(node, topic, topictype): 300 # register the nodes only once 301 if not ((topic, node, nodeuri) in self.__publisher): 302 publisher_to_register.append((topic, topictype, node, nodeuri)) 303 publisher.append((topic, node, nodeuri)) 304 # unregister not updated publishers 305 for (topic, node, nodeuri) in set(self.__publisher) - set(publisher): 306 own_master_multi.unregisterPublisher(node, topic, nodeuri) 307 rospy.logdebug("SyncThread[%s]: UNPUB %s[%s] %s", 308 self.name, node, nodeuri, topic) 309 handler.append(('upub', topic, node, nodeuri)) 310 # register new publishers 311 for (topic, topictype, node, nodeuri) in publisher_to_register: 312 own_master_multi.registerPublisher(node, topic, topictype, nodeuri) 313 rospy.logdebug("SyncThread[%s]: PUB %s[%s] %s[%s]", 314 self.name, node, nodeuri, topic, topictype) 315 handler.append(('pub', topic, topictype, node, nodeuri)) 316 317 # sync the subscribers 318 subscriber = [] 319 subscriber_to_register = [] 320 for (topic, nodes) in subscribers: 321 for node in nodes: 322 topictype = self._get_topictype(topic, topicTypes) 323 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 324 # if remote topictype is None, try to set to the local topic type 325 # if not topictype and not self.__own_state is None: 326 # if topic in self.__own_state.topics: 327 # topictype = self.__own_state.topics[topic].type 328 if not topictype: 329 topictype = self.MSG_ANY_TYPE 330 if topictype and nodeuri and not self._do_ignore_nts(node, topic, topictype): 331 # register the node as subscriber in local ROS master 332 if not ((topic, node, nodeuri) in self.__subscriber): 333 subscriber_to_register.append((topic, topictype, node, nodeuri)) 334 subscriber.append((topic, node, nodeuri)) 335 # unregister not updated topics 336 for (topic, node, nodeuri) in set(self.__subscriber) - set(subscriber): 337 own_master_multi.unregisterSubscriber(node, topic, nodeuri) 338 rospy.logdebug("SyncThread[%s]: UNSUB %s[%s] %s", 339 self.name, node, nodeuri, topic) 340 handler.append(('usub', topic, node, nodeuri)) 341 # register new subscriber 342 for (topic, topictype, node, nodeuri) in subscriber_to_register: 343 own_master_multi.registerSubscriber(node, topic, topictype, nodeuri) 344 rospy.logdebug("SyncThread[%s]: SUB %s[%s] %s[%s]", 345 self.name, node, nodeuri, topic, topictype) 346 handler.append(('sub', topic, topictype, node, nodeuri)) 347 348 # sync the services 349 services = [] 350 services_to_register = [] 351 for (service, nodes) in rservices: 352 for node in nodes: 353 serviceuri = self._get_serviceuri(service, serviceProviders, remote_masteruri) 354 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 355 if serviceuri and nodeuri and not self._do_ignore_ns(node, service): 356 # register the node as publisher in local ROS master 357 if not ((service, serviceuri, node, nodeuri) in self.__services): 358 services_to_register.append((service, serviceuri, node, nodeuri)) 359 services.append((service, serviceuri, node, nodeuri)) 360 # unregister not updated services 361 for (service, serviceuri, node, nodeuri) in set(self.__services) - set(services): 362 own_master_multi.unregisterService(node, service, serviceuri) 363 rospy.logdebug("SyncThread[%s]: UNSRV %s[%s] %s[%s]", 364 self.name, node, nodeuri, service, serviceuri) 365 handler.append(('usrv', service, serviceuri, node, nodeuri)) 366 # register new services 367 for (service, serviceuri, node, nodeuri) in services_to_register: 368 own_master_multi.registerService(node, service, serviceuri, nodeuri) 369 rospy.logdebug("SyncThread[%s]: SRV %s[%s] %s[%s]", 370 self.name, node, nodeuri, service, serviceuri) 371 handler.append(('srv', service, serviceuri, node, nodeuri)) 372 373 # execute the multicall and update the current publicher, subscriber and services 374 if not self.__unregistered: 375 with self.__lock_info: 376 self.__sync_info = None 377 self.__publisher = publisher 378 self.__subscriber = subscriber 379 self.__services = services 380 # update the local ROS master 381 socket.setdefaulttimeout(3) 382 result = own_master_multi() 383 # analyze the results of the registration call 384 # HACK param to reduce publisher creation, see line 372 385 hack_pub = set() 386 for h, (code, statusMessage, r) in zip(handler, result): 387 try: 388 if h[0] == 'sub': 389 if code == -1: 390 rospy.logwarn("SyncThread[%s] topic subscription error: %s (%s), %s %s, node: %s", self.name, h[1], h[2], str(code), str(statusMessage), h[3]) 391 else: 392 rospy.logdebug("SyncThread[%s] topic subscribed: %s, %s %s, node: %s", self.name, h[1], str(code), str(statusMessage), h[3]) 393 if h[0] == 'sub' and code == 1 and len(r) > 0: 394 # Horrible hack: see line 372 395 hack_pub.add(h[1]) 396 elif h[0] == 'pub': 397 if code == -1: 398 rospy.logwarn("SyncThread[%s] topic advertise error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage)) 399 else: 400 rospy.logdebug("SyncThread[%s] topic advertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 401 elif h[0] == 'usub': 402 rospy.logdebug("SyncThread[%s] topic unsubscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 403 elif h[0] == 'upub': 404 rospy.logdebug("SyncThread[%s] topic unadvertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 405 elif h[0] == 'srv': 406 if code == -1: 407 rospy.logwarn("SyncThread[%s] service registration error: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 408 else: 409 rospy.logdebug("SyncThread[%s] service registered: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 410 elif h[0] == 'usrv': 411 rospy.logdebug("SyncThread[%s] service unregistered: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 412 except: 413 rospy.logerr("SyncThread[%s] ERROR while analyzing the results of the registration call [%s]: %s", self.name, h[1], traceback.format_exc()) 414 # Horrible hack: the response from registerSubscriber() can contain a 415 # list of current publishers. But we don't have a way of injecting them 416 # into rospy here. Now, if we get a publisherUpdate() from the master, 417 # everything will work. So, we ask the master if anyone is currently 418 # publishing the topic, grab the advertised type, use it to advertise 419 # ourselves, then unadvertise, triggering a publisherUpdate() along the 420 # way. 421 # We create publisher locally as a hack, to get callback set up properly for already registered local publishers 422 for m in hack_pub: 423 try: 424 rospy.loginfo("SyncThread[%s] Create and delete publisher to trigger publisherUpdate for %s", self.name, m) 425 topicPub = rospy.Publisher(m, rospy.msg.AnyMsg, queue_size=1) 426 topicPub.unregister() 427 del topicPub 428 except: 429 rospy.logerr("SyncThread[%s] ERROR while hack subscriber[%s]: %s", self.name, h[1], traceback.format_exc()) 430 # set the last synchronization time 431 self.timestamp = stamp 432 self.timestamp_local = stamp_local 433 rospy.logdebug("SyncThread[%s]: current timestamp %.9f, local %.9f", self.name, stamp, stamp_local) 434 if self.timestamp_remote > stamp_local: 435 rospy.logdebug("SyncThread[%s]: invoke next update, remote ts: %.9f", self.name, self.timestamp_remote) 436 self._update_timer = threading.Timer(random.random() * 2., self._request_remote_state, args=(self._apply_remote_state,)) 437 self._update_timer.start() 438 except: 439 rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc()) 440 finally: 441 socket.setdefaulttimeout(None) 442 rospy.loginfo("SyncThread[%s] remote state applied.", self.name)
443
444 - def _unreg_on_finish(self):
445 with self.__lock_info: 446 self.__unregistered = True 447 try: 448 rospy.logdebug(" SyncThread[%s] clear all registrations", self.name) 449 socket.setdefaulttimeout(5) 450 own_master = xmlrpclib.ServerProxy(self.masteruri_local) 451 own_master_multi = xmlrpclib.MultiCall(own_master) 452 # end routine if the master was removed 453 for topic, node, uri in self.__subscriber: 454 own_master_multi.unregisterSubscriber(node, topic, uri) 455 for topic, node, uri in self.__publisher: 456 own_master_multi.unregisterPublisher(node, topic, uri) 457 for service, serviceuri, node, uri in self.__services: 458 own_master_multi.unregisterService(node, service, serviceuri) 459 rospy.logdebug(" SyncThread[%s] execute a MultiCall", self.name) 460 _ = own_master_multi() 461 rospy.logdebug(" SyncThread[%s] finished", self.name) 462 except: 463 rospy.logerr("SyncThread[%s] ERROR while ending: %s", self.name, traceback.format_exc()) 464 socket.setdefaulttimeout(None)
465
466 - def _do_ignore_ntp(self, node, topic, topictype):
467 return self._filter.is_ignored_publisher(node, topic, topictype)
468
469 - def _do_ignore_nts(self, node, topic, topictype):
470 return self._filter.is_ignored_subscriber(node, topic, topictype)
471
472 - def _do_ignore_ns(self, node, service):
473 return self._filter.is_ignored_service(node, service)
474
475 - def _get_topictype(self, topic, topic_types):
476 for (topicname, topic_type) in topic_types: 477 if (topicname == topic): 478 return topic_type.replace('None', '') 479 return None
480
481 - def _get_nodeuri(self, node, nodes, remote_masteruri):
482 for (nodename, uri, masteruri, pid, local) in nodes: 483 if (nodename == node) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'): 484 # the node was registered originally to another ROS master -> do sync 485 if masteruri != self.masteruri_local: 486 return uri 487 return None
488
489 - def _get_serviceuri(self, service, nodes, remote_masteruri):
490 for (servicename, uri, masteruri, topic_type, local) in nodes: 491 if (servicename == service) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'): 492 if masteruri != self.masteruri_local: 493 return uri 494 return None
495