Package master_sync_fkie :: Module sync_thread
[frames] | no frames]

Source Code for Module master_sync_fkie.sync_thread

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33   
 34  import random 
 35  import socket 
 36  import threading 
 37  import time 
 38  import traceback 
 39  try: 
 40      import xmlrpclib 
 41  except ImportError: 
 42      import xmlrpc.client as xmlrpclib 
 43   
 44  from multimaster_msgs_fkie.msg import SyncTopicInfo, SyncServiceInfo, SyncMasterInfo 
 45  import rospy 
 46   
 47  from master_discovery_fkie.common import masteruri_from_ros 
 48  from master_discovery_fkie.filter_interface import FilterInterface 
 49   
 50   
51 -class SyncThread(object):
52 ''' 53 A thread to synchronize the local ROS master with a remote master. While the 54 synchronization only the topic of the remote ROS master will be registered by 55 the local ROS master. The remote ROS master will be keep unchanged. 56 ''' 57 58 MAX_UPDATE_DELAY = 5 # times 59 60 MSG_ANY_TYPE = '*' 61
62 - def __init__(self, name, uri, discoverer_name, monitoruri, timestamp, sync_on_demand=False):
63 ''' 64 Initialization method for the SyncThread. 65 @param name: the name of the ROS master synchronized with. 66 @type name: C{str} 67 @param uri: the URI of the ROS master synchronized with 68 @type uri: C{str} 69 @param discoverer_name: the name of the discovery node running on ROS master synchronized with. 70 @type discoverer_name: C{str} 71 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once. 72 @type monitoruri: C{str} 73 @param timestamp: The timestamp of the current state of the ROS master info. 74 @type timestamp: C{float64} 75 @param sync_on_demand: Synchronize topics on demand 76 @type sync_on_demand: bool 77 ''' 78 self.name = name 79 self.uri = uri 80 self.discoverer_name = discoverer_name 81 self.monitoruri = monitoruri 82 self.timestamp = timestamp 83 self.timestamp_local = 0. 84 self.timestamp_remote = 0. 85 self._online = True 86 self._offline_ts = 0 87 88 self.masteruri_local = masteruri_from_ros() 89 rospy.logdebug("SyncThread[%s]: create this sync thread", self.name) 90 # synchronization variables 91 self.__lock_info = threading.RLock() 92 self.__lock_intern = threading.RLock() 93 self._use_filtered_method = None 94 # SyncMasterInfo with currently synchronized nodes, publisher (topic, node, nodeuri), subscriber(topic, node, nodeuri) and services 95 self.__sync_info = None 96 self.__unregistered = False 97 # a list with published topics as a tuple of (topic name, node name, node URL) 98 self.__publisher = [] 99 # a list with subscribed topics as a tuple of (topic name, node name, node URL) 100 self.__subscriber = [] 101 # a list with services as a tuple of (service name, service URL, node name, node URL) 102 self.__services = [] 103 # the state of the own ROS master is used if `sync_on_demand` is enabled or 104 # to determine the type of topic subscribed remote with `Empty` type 105 self.__own_state = None 106 107 # setup the filter 108 self._filter = FilterInterface() 109 self._filter.load(self.name, 110 ['/rosout', rospy.get_name(), self.discoverer_name, '/node_manager', '/zeroconf'], [], 111 ['/rosout', '/rosout_agg'], ['/'] if sync_on_demand else [], 112 ['/*get_loggers', '/*set_logger_level'], [], 113 # do not sync the bond message of the nodelets!! 114 ['bond/Status'], 115 [], [], 116 []) 117 118 # congestion avoidance: wait for random.random*2 sec. If an update request 119 # is received try to cancel and restart the current timer. The timer can be 120 # canceled for maximal MAX_UPDATE_DELAY times. 121 self._update_timer = None 122 self._delayed_update = 0 123 self.__on_update = False
124
125 - def get_sync_info(self):
126 ''' 127 Returns the synchronized publisher, subscriber and services. 128 @rtype: SyncMasterInfo 129 ''' 130 with self.__lock_info: 131 if self.__sync_info is None: 132 # create a sync info 133 result_set = set() 134 result_publisher = [] 135 result_subscriber = [] 136 result_services = [] 137 for (t_n, n_n, n_uri) in self.__publisher: 138 result_publisher.append(SyncTopicInfo(t_n, n_n, n_uri)) 139 result_set.add(n_n) 140 for (t_n, n_n, n_uri) in self.__subscriber: 141 result_subscriber.append(SyncTopicInfo(t_n, n_n, n_uri)) 142 result_set.add(n_n) 143 for (s_n, s_uri, n_n, n_uri) in self.__services: 144 result_services.append(SyncServiceInfo(s_n, s_uri, n_n, n_uri)) 145 result_set.add(n_n) 146 self.__sync_info = SyncMasterInfo(self.uri, list(result_set), result_publisher, result_subscriber, result_services) 147 return self.__sync_info
148
149 - def set_online(self, value, resync_on_reconnect_timeout=0.):
150 if value: 151 if not self._online: 152 with self.__lock_intern: 153 self._online = True 154 offline_duration = time.time() - self._offline_ts 155 if offline_duration >= resync_on_reconnect_timeout: 156 rospy.loginfo("SyncThread[%s]: perform resync after the host was offline (unregister and register again to avoid connection losses to python topic. These does not suppot reconnection!)", self.name) 157 if self._update_timer is not None: 158 self._update_timer.cancel() 159 self._unreg_on_finish() 160 self.__unregistered = False 161 self.__publisher = [] 162 self.__subscriber = [] 163 self.__services = [] 164 self.timestamp = 0. 165 self.timestamp_local = 0. 166 self.timestamp_remote = 0. 167 else: 168 rospy.loginfo("SyncThread[%s]: skip resync after the host was offline because of resync_on_reconnect_timeout=%.2f and the host was only %.2f sec offline", self.name, resync_on_reconnect_timeout, offline_duration) 169 else: 170 self._online = False 171 self._offline_ts = time.time()
172
173 - def update(self, name, uri, discoverer_name, monitoruri, timestamp):
174 ''' 175 Sets a request to synchronize the local ROS master with this ROS master. 176 @note: If currently a synchronization is running this request will be ignored! 177 @param name: the name of the ROS master synchronized with. 178 @type name: C{str} 179 @param uri: the URI of the ROS master synchronized with 180 @type uri: C{str} 181 @param discoverer_name: the name of the discovery node running on ROS master synchronized with. 182 @type discoverer_name: C{str} 183 @param monitoruri: The URI of RPC server of the discovery node to get the ROS master state by calling a method only once. 184 @type monitoruri: C{str} 185 @param timestamp: The timestamp of the current state of the ROS master info. 186 @type timestamp: C{float64} 187 ''' 188 # rospy.logdebug("SyncThread[%s]: update request", self.name) 189 with self.__lock_intern: 190 self.timestamp_remote = timestamp 191 if (self.timestamp_local != timestamp): 192 rospy.logdebug("SyncThread[%s]: update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp, self.timestamp_local) 193 self.name = name 194 self.uri = uri 195 self.discoverer_name = discoverer_name 196 self.monitoruri = monitoruri 197 self._request_update()
198 199 # rospy.logdebug("SyncThread[%s]: update exit", self.name) 200
201 - def set_own_masterstate(self, own_state, sync_on_demand=False):
202 ''' 203 Sets the state of the local ROS master state. If this state is not None, the topics on demand will be synchronized. 204 @param own_state: the state of the local ROS master state 205 @type own_state: C{master_discovery_fkie/MasterInfo} 206 @param sync_on_demand: if True, sync only topic, which are also local exists (Default: False) 207 @type sync_on_demand: bool 208 ''' 209 with self.__lock_intern: 210 timestamp_local = own_state.timestamp_local 211 if self.__own_state is None or (self.__own_state.timestamp_local != timestamp_local): 212 ownstate_ts = self.__own_state.timestamp_local if self.__own_state is not None else float('nan') 213 rospy.logdebug("SyncThread[%s]: local state update notify new timestamp(%.9f), old(%.9f)", self.name, timestamp_local, ownstate_ts) 214 self.__own_state = own_state 215 if sync_on_demand: 216 self._filter.update_sync_topics_pattern(self.__own_state.topic_names) 217 self._request_update()
218
219 - def stop(self):
220 ''' 221 Stops running thread. 222 ''' 223 rospy.logdebug(" SyncThread[%s]: stop request", self.name) 224 with self.__lock_intern: 225 if self._update_timer is not None: 226 self._update_timer.cancel() 227 self._unreg_on_finish() 228 rospy.logdebug(" SyncThread[%s]: stop exit", self.name)
229
230 - def _request_update(self):
231 with self.__lock_intern: 232 r = random.random() * 2. 233 # start update timer with a random waiting time to avoid a congestion picks on changes of ROS master state 234 if self._update_timer is None or not self._update_timer.isAlive(): 235 del self._update_timer 236 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,)) 237 self._update_timer.start() 238 else: 239 if self._delayed_update < self.MAX_UPDATE_DELAY: 240 # if the timer thread can be canceled start new one 241 self._update_timer.cancel() 242 # if callback (XMLRPC request) is already running the timer is not canceled -> test for `self.__on_update` 243 if not self._update_timer.isAlive() or not self.__on_update: 244 self._delayed_update += 1 245 self._update_timer = threading.Timer(r, self._request_remote_state, args=(self._apply_remote_state,)) 246 self._update_timer.start()
247
248 - def _request_remote_state(self, handler):
249 self._delayed_update = 0 250 self.__on_update = True 251 try: 252 # connect to master_monitor rpc-xml server of remote master discovery 253 socket.setdefaulttimeout(20) 254 remote_monitor = xmlrpclib.ServerProxy(self.monitoruri) 255 # determine the getting method: older versions have not a filtered method 256 if self._use_filtered_method is None: 257 try: 258 self._use_filtered_method = 'masterInfoFiltered' in remote_monitor.system.listMethods() 259 except: 260 self._use_filtered_method = False 261 remote_state = None 262 # get the state informations 263 rospy.loginfo("SyncThread[%s] Requesting remote state from '%s'", self.name, self.monitoruri) 264 if self._use_filtered_method: 265 remote_state = remote_monitor.masterInfoFiltered(self._filter.to_list()) 266 else: 267 remote_state = remote_monitor.masterInfo() 268 if not self.__unregistered: 269 handler(remote_state) 270 except: 271 rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc()) 272 finally: 273 self.__on_update = False 274 socket.setdefaulttimeout(None)
275
276 - def _apply_remote_state(self, remote_state):
277 rospy.loginfo("SyncThread[%s] Applying remote state...", self.name) 278 try: 279 stamp = float(remote_state[0]) 280 stamp_local = float(remote_state[1]) 281 remote_masteruri = remote_state[2] 282 # remote_mastername = remote_state[3] 283 publishers = remote_state[4] 284 subscribers = remote_state[5] 285 rservices = remote_state[6] 286 topicTypes = remote_state[7] 287 nodeProviders = remote_state[8] 288 serviceProviders = remote_state[9] 289 290 # create a multicall object 291 own_master = xmlrpclib.ServerProxy(self.masteruri_local) 292 own_master_multi = xmlrpclib.MultiCall(own_master) 293 # fill the multicall object 294 handler = [] 295 # sync the publishers 296 publisher = [] 297 publisher_to_register = [] 298 for (topic, nodes) in publishers: 299 for node in nodes: 300 topictype = self._get_topictype(topic, topicTypes) 301 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 302 if topictype and nodeuri and not self._do_ignore_ntp(node, topic, topictype): 303 # register the nodes only once 304 if not ((topic, node, nodeuri) in self.__publisher): 305 publisher_to_register.append((topic, topictype, node, nodeuri)) 306 publisher.append((topic, node, nodeuri)) 307 # unregister not updated publishers 308 for (topic, node, nodeuri) in set(self.__publisher) - set(publisher): 309 own_master_multi.unregisterPublisher(node, topic, nodeuri) 310 rospy.logdebug("SyncThread[%s]: UNPUB %s[%s] %s", 311 self.name, node, nodeuri, topic) 312 handler.append(('upub', topic, node, nodeuri)) 313 # register new publishers 314 for (topic, topictype, node, nodeuri) in publisher_to_register: 315 own_master_multi.registerPublisher(node, topic, topictype, nodeuri) 316 rospy.logdebug("SyncThread[%s]: PUB %s[%s] %s[%s]", 317 self.name, node, nodeuri, topic, topictype) 318 handler.append(('pub', topic, topictype, node, nodeuri)) 319 320 # sync the subscribers 321 subscriber = [] 322 subscriber_to_register = [] 323 for (topic, nodes) in subscribers: 324 for node in nodes: 325 topictype = self._get_topictype(topic, topicTypes) 326 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 327 # if remote topictype is None, try to set to the local topic type 328 # if not topictype and not self.__own_state is None: 329 # if topic in self.__own_state.topics: 330 # topictype = self.__own_state.topics[topic].type 331 if not topictype: 332 topictype = self.MSG_ANY_TYPE 333 if topictype and nodeuri and not self._do_ignore_nts(node, topic, topictype): 334 # register the node as subscriber in local ROS master 335 if not ((topic, node, nodeuri) in self.__subscriber): 336 subscriber_to_register.append((topic, topictype, node, nodeuri)) 337 subscriber.append((topic, node, nodeuri)) 338 # unregister not updated topics 339 for (topic, node, nodeuri) in set(self.__subscriber) - set(subscriber): 340 own_master_multi.unregisterSubscriber(node, topic, nodeuri) 341 rospy.logdebug("SyncThread[%s]: UNSUB %s[%s] %s", 342 self.name, node, nodeuri, topic) 343 handler.append(('usub', topic, node, nodeuri)) 344 # register new subscriber 345 for (topic, topictype, node, nodeuri) in subscriber_to_register: 346 own_master_multi.registerSubscriber(node, topic, topictype, nodeuri) 347 rospy.logdebug("SyncThread[%s]: SUB %s[%s] %s[%s]", 348 self.name, node, nodeuri, topic, topictype) 349 handler.append(('sub', topic, topictype, node, nodeuri)) 350 351 # sync the services 352 services = [] 353 services_to_register = [] 354 for (service, nodes) in rservices: 355 for node in nodes: 356 serviceuri = self._get_serviceuri(service, serviceProviders, remote_masteruri) 357 nodeuri = self._get_nodeuri(node, nodeProviders, remote_masteruri) 358 if serviceuri and nodeuri and not self._do_ignore_ns(node, service): 359 # register the node as publisher in local ROS master 360 if not ((service, serviceuri, node, nodeuri) in self.__services): 361 services_to_register.append((service, serviceuri, node, nodeuri)) 362 services.append((service, serviceuri, node, nodeuri)) 363 # unregister not updated services 364 for (service, serviceuri, node, nodeuri) in set(self.__services) - set(services): 365 own_master_multi.unregisterService(node, service, serviceuri) 366 rospy.logdebug("SyncThread[%s]: UNSRV %s[%s] %s[%s]", 367 self.name, node, nodeuri, service, serviceuri) 368 handler.append(('usrv', service, serviceuri, node, nodeuri)) 369 # register new services 370 for (service, serviceuri, node, nodeuri) in services_to_register: 371 own_master_multi.registerService(node, service, serviceuri, nodeuri) 372 rospy.logdebug("SyncThread[%s]: SRV %s[%s] %s[%s]", 373 self.name, node, nodeuri, service, serviceuri) 374 handler.append(('srv', service, serviceuri, node, nodeuri)) 375 376 # execute the multicall and update the current publicher, subscriber and services 377 if not self.__unregistered: 378 with self.__lock_info: 379 self.__sync_info = None 380 self.__publisher = publisher 381 self.__subscriber = subscriber 382 self.__services = services 383 # update the local ROS master 384 socket.setdefaulttimeout(3) 385 result = own_master_multi() 386 # analyze the results of the registration call 387 # HACK param to reduce publisher creation, see line 372 388 hack_pub = set() 389 for h, (code, statusMessage, r) in zip(handler, result): 390 try: 391 if h[0] == 'sub': 392 if code == -1: 393 rospy.logwarn("SyncThread[%s] topic subscription error: %s (%s), %s %s, node: %s", self.name, h[1], h[2], str(code), str(statusMessage), h[3]) 394 else: 395 rospy.logdebug("SyncThread[%s] topic subscribed: %s, %s %s, node: %s", self.name, h[1], str(code), str(statusMessage), h[3]) 396 if h[0] == 'sub' and code == 1 and len(r) > 0: 397 # Horrible hack: see line 372 398 hack_pub.add(h[1]) 399 elif h[0] == 'pub': 400 if code == -1: 401 rospy.logwarn("SyncThread[%s] topic advertise error: %s (%s), %s %s", self.name, h[1], h[2], str(code), str(statusMessage)) 402 else: 403 rospy.logdebug("SyncThread[%s] topic advertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 404 elif h[0] == 'usub': 405 rospy.logdebug("SyncThread[%s] topic unsubscribed: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 406 elif h[0] == 'upub': 407 rospy.logdebug("SyncThread[%s] topic unadvertised: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 408 elif h[0] == 'srv': 409 if code == -1: 410 rospy.logwarn("SyncThread[%s] service registration error: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 411 else: 412 rospy.logdebug("SyncThread[%s] service registered: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 413 elif h[0] == 'usrv': 414 rospy.logdebug("SyncThread[%s] service unregistered: %s, %s %s", self.name, h[1], str(code), str(statusMessage)) 415 except: 416 rospy.logerr("SyncThread[%s] ERROR while analyzing the results of the registration call [%s]: %s", self.name, h[1], traceback.format_exc()) 417 # Horrible hack: the response from registerSubscriber() can contain a 418 # list of current publishers. But we don't have a way of injecting them 419 # into rospy here. Now, if we get a publisherUpdate() from the master, 420 # everything will work. So, we ask the master if anyone is currently 421 # publishing the topic, grab the advertised type, use it to advertise 422 # ourselves, then unadvertise, triggering a publisherUpdate() along the 423 # way. 424 # We create publisher locally as a hack, to get callback set up properly for already registered local publishers 425 for m in hack_pub: 426 try: 427 rospy.loginfo("SyncThread[%s] Create and delete publisher to trigger publisherUpdate for %s", self.name, m) 428 topicPub = rospy.Publisher(m, rospy.msg.AnyMsg, queue_size=1) 429 topicPub.unregister() 430 del topicPub 431 except: 432 rospy.logerr("SyncThread[%s] ERROR while hack subscriber[%s]: %s", self.name, h[1], traceback.format_exc()) 433 # set the last synchronization time 434 self.timestamp = stamp 435 self.timestamp_local = stamp_local 436 rospy.logdebug("SyncThread[%s]: current timestamp %.9f, local %.9f", self.name, stamp, stamp_local) 437 if self.timestamp_remote > stamp_local: 438 rospy.logdebug("SyncThread[%s]: invoke next update, remote ts: %.9f", self.name, self.timestamp_remote) 439 self._update_timer = threading.Timer(random.random() * 2., self._request_remote_state, args=(self._apply_remote_state,)) 440 self._update_timer.start() 441 except: 442 rospy.logerr("SyncThread[%s] ERROR: %s", self.name, traceback.format_exc()) 443 finally: 444 socket.setdefaulttimeout(None) 445 rospy.loginfo("SyncThread[%s] remote state applied.", self.name)
446
447 - def _unreg_on_finish(self):
448 with self.__lock_info: 449 self.__unregistered = True 450 try: 451 rospy.logdebug(" SyncThread[%s] clear all registrations", self.name) 452 socket.setdefaulttimeout(5) 453 own_master = xmlrpclib.ServerProxy(self.masteruri_local) 454 own_master_multi = xmlrpclib.MultiCall(own_master) 455 # end routine if the master was removed 456 for topic, node, uri in self.__subscriber: 457 own_master_multi.unregisterSubscriber(node, topic, uri) 458 for topic, node, uri in self.__publisher: 459 own_master_multi.unregisterPublisher(node, topic, uri) 460 for service, serviceuri, node, uri in self.__services: 461 own_master_multi.unregisterService(node, service, serviceuri) 462 rospy.logdebug(" SyncThread[%s] execute a MultiCall", self.name) 463 _ = own_master_multi() 464 rospy.logdebug(" SyncThread[%s] finished", self.name) 465 except: 466 rospy.logerr("SyncThread[%s] ERROR while ending: %s", self.name, traceback.format_exc()) 467 socket.setdefaulttimeout(None)
468
469 - def _do_ignore_ntp(self, node, topic, topictype):
470 return self._filter.is_ignored_publisher(node, topic, topictype)
471
472 - def _do_ignore_nts(self, node, topic, topictype):
473 return self._filter.is_ignored_subscriber(node, topic, topictype)
474
475 - def _do_ignore_ns(self, node, service):
476 return self._filter.is_ignored_service(node, service)
477
478 - def _get_topictype(self, topic, topic_types):
479 for (topicname, topic_type) in topic_types: 480 if (topicname == topic): 481 return topic_type.replace('None', '') 482 return None
483
484 - def _get_nodeuri(self, node, nodes, remote_masteruri):
485 for (nodename, uri, masteruri, pid, local) in nodes: 486 if (nodename == node) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'): 487 # the node was registered originally to another ROS master -> do sync 488 if masteruri != self.masteruri_local: 489 return uri 490 return None
491
492 - def _get_serviceuri(self, service, nodes, remote_masteruri):
493 for (servicename, uri, masteruri, topic_type, local) in nodes: 494 if (servicename == service) and ((self._filter.sync_remote_nodes() and masteruri == remote_masteruri) or local == 'local'): 495 if masteruri != self.masteruri_local: 496 return uri 497 return None
498