Package node_manager_fkie :: Module discovery_listener
[frames] | no frames]

Source Code for Module node_manager_fkie.discovery_listener

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  from python_qt_binding.QtCore import QObject, Signal 
 34  import socket 
 35  import threading 
 36  import time 
 37   
 38  import rospy 
 39   
 40  from master_discovery_fkie.master_monitor import MasterMonitor, MasterConnectionException 
 41  import master_discovery_fkie.interface_finder as interface_finder 
 42   
 43  from node_manager_fkie.common import utf8 
 44   
 45  try: 
 46      import std_srvs.srv 
 47      from multimaster_msgs_fkie.msg import LinkStatesStamped, MasterState, ROSMaster  # , LinkState, SyncMasterInfo, SyncTopicInfo 
 48      from multimaster_msgs_fkie.srv import DiscoverMasters  # , GetSyncInfo 
 49  except ImportError, e: 
 50      import sys 
 51      print >> sys.stderr, "Can't import messages and services of multimaster_msgs_fkie. Is multimaster_msgs_fkie package compiled?" 
 52      raise ImportError(utf8(e)) 
 53   
 54   
55 -class MasterListService(QObject):
56 ''' 57 A class to retrieve the ROS master list from a ROS service. The service 58 will be determine using U{master_discovery_fkie.interface_finder.get_listmaster_service()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>} 59 60 ''' 61 masterlist_signal = Signal(str, str, list) 62 '''@ivar: a signal with a list of the masters retrieved from the master_discovery service 'list_masters'. 63 ParameterB{:} C{masteruri}, C{service name}, C{[U{master_discovery_fkie.msg.ROSMaster<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/ROSMaster.html>}, ...]}''' 64 masterlist_err_signal = Signal(str, str) 65 '''@ivar: this signal is emitted if an error while calling #list_masters' 66 service of master_discovery is failed. 67 ParameterB{:} C{masteruri}, C{error}''' 68
69 - def __init__(self):
70 QObject.__init__(self) 71 self.__serviceThreads = {} 72 self.__refreshThreads = {} 73 self._lock = threading.RLock()
74
75 - def stop(self):
76 print " Shutdown discovery listener..." 77 for _, thread in self.__serviceThreads.iteritems(): 78 thread.join(3) 79 for _, thread in self.__refreshThreads.iteritems(): 80 thread.join(3) 81 print " Discovery listener is off!"
82
83 - def retrieveMasterList(self, masteruri, wait=False):
84 ''' 85 This method use the service 'list_masters' of the master_discovery to get 86 the list of discovered ROS master. The retrieved list will be emitted as 87 masterlist_signal. 88 @param masteruri: the ROS master URI 89 @type masteruri: C{str} 90 @param wait: wait for the service 91 @type wait: C{boolean} 92 ''' 93 with self._lock: 94 if masteruri not in self.__serviceThreads: 95 upthread = MasterListThread(masteruri, wait) 96 upthread.master_list_signal.connect(self._on_master_list) 97 upthread.err_signal.connect(self._on_err) 98 self.__serviceThreads[masteruri] = upthread 99 upthread.start()
100
101 - def refresh(self, masteruri, wait=False):
102 ''' 103 This method use the service 'refresh' of the master_discovery to refresh the 104 discovered masters. 105 @param masteruri: the ROS master URI 106 @type masteruri: C{str} 107 @param wait: wait for the service 108 @type wait: C{boolean} 109 ''' 110 with self._lock: 111 if masteruri not in self.__refreshThreads: 112 upthread = MasterRefreshThread(masteruri, wait) 113 upthread.ok_signal.connect(self._on_ok) 114 upthread.err_signal.connect(self._on_err) 115 self.__refreshThreads[masteruri] = upthread 116 upthread.start()
117
118 - def _on_master_list(self, masteruri, service_name, items):
119 with self._lock: 120 try: 121 thread = self.__serviceThreads.pop(masteruri) 122 del thread 123 except KeyError: 124 pass 125 self.masterlist_signal.emit(masteruri, service_name, items)
126
127 - def _on_err(self, masteruri, msg, on_refresh):
128 with self._lock: 129 try: 130 if on_refresh: 131 thread = self.__refreshThreads.pop(masteruri) 132 del thread 133 else: 134 thread = self.__serviceThreads.pop(masteruri) 135 del thread 136 except KeyError: 137 pass 138 self.masterlist_err_signal.emit(masteruri, msg)
139
140 - def _on_ok(self, masteruri):
141 with self._lock: 142 try: 143 thread = self.__refreshThreads.pop(masteruri) 144 del thread 145 except KeyError: 146 pass
147 148
149 -class MasterListThread(QObject, threading.Thread):
150 ''' 151 A thread to to retrieve the list of discovered ROS master from master_discovery 152 service and publish it by sending a QT signal. 153 ''' 154 master_list_signal = Signal(str, str, list) 155 err_signal = Signal(str, str, bool) 156
157 - def __init__(self, masteruri, wait, parent=None):
158 QObject.__init__(self) 159 threading.Thread.__init__(self) 160 self._masteruri = masteruri 161 self._wait = wait 162 self.setDaemon(True)
163
164 - def run(self):
165 ''' 166 ''' 167 if self._masteruri: 168 found = False 169 service_names = interface_finder.get_listmaster_service(self._masteruri, self._wait) 170 err_msg = '' 171 for service_name in service_names: 172 rospy.logdebug("service 'list_masters' found on %s as %s", self._masteruri, service_name) 173 if self._wait: 174 rospy.wait_for_service(service_name) 175 socket.setdefaulttimeout(3) 176 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters) 177 try: 178 resp = discoverMasters() 179 except rospy.ServiceException, e: 180 err_msg = "Service call 'list_masters' failed: %s" % utf8(e) 181 rospy.logwarn(err_msg) 182 self.err_signal.emit(self._masteruri, "Service call '%s' failed: %s" % (service_name, err_msg), False) 183 else: 184 if resp.masters: 185 self.master_list_signal.emit(self._masteruri, service_name, resp.masters) 186 found = True 187 else: 188 self.err_signal.emit(self._masteruri, "local 'master_discovery' reports empty master list, it seems he has a problem", False) 189 finally: 190 socket.setdefaulttimeout(None) 191 if not found: 192 self.err_signal.emit(self._masteruri, "no service 'list_masters' found on %s" % self._masteruri, False)
193 194
195 -class MasterRefreshThread(QObject, threading.Thread):
196 ''' 197 A thread to call the refresh service of master discovery. 198 ''' 199 ok_signal = Signal(str) 200 err_signal = Signal(str, str, bool) 201
202 - def __init__(self, masteruri, wait, parent=None):
203 QObject.__init__(self) 204 threading.Thread.__init__(self) 205 self._masteruri = masteruri 206 self._wait = wait 207 self.setDaemon(True)
208
209 - def run(self):
210 ''' 211 ''' 212 if self._masteruri: 213 service_names = interface_finder.get_refresh_service(self._masteruri, self._wait) 214 err_msg = '' 215 for service_name in service_names: 216 rospy.logdebug("service 'refresh' found on %s as %s", self._masteruri, service_name) 217 if self._wait: 218 rospy.wait_for_service(service_name) 219 socket.setdefaulttimeout(3) 220 refreshMasters = rospy.ServiceProxy(service_name, std_srvs.srv.Empty) 221 try: 222 _ = refreshMasters() 223 self.ok_signal.emit(self._masteruri) 224 except rospy.ServiceException, e: 225 rospy.logwarn("ERROR Service call 'refresh' failed: %s", utf8(e)) 226 self.err_signal.emit(self._masteruri, "ERROR Service call 'refresh' failed: %s" % utf8(err_msg), True) 227 finally: 228 socket.setdefaulttimeout(None)
229 230
231 -class MasterStateTopic(QObject):
232 ''' 233 A class to receive the ROS master state updates from a ROS topic. The topic 234 will be determine using U{master_discovery_fkie.interface_finder.get_changes_topic()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>}. 235 ''' 236 state_signal = Signal(MasterState) 237 '''@ivar: a signal to inform the receiver about new master state. 238 Parameter: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}''' 239
240 - def registerByROS(self, masteruri, wait=False):
241 ''' 242 This method creates a ROS subscriber to received the notifications of ROS 243 master updates. The retrieved messages will be emitted as state_signal. 244 @param masteruri: the ROS master URI 245 @type masteruri: C{str} 246 @param wait: wait for the topic 247 @type wait: C{boolean} 248 @return: the topic name or an empty string 249 ''' 250 result = '' 251 topic_names = interface_finder.get_changes_topic(masteruri, wait) 252 self.stop() 253 self.sub_changes = [] 254 for topic_name in topic_names: 255 rospy.loginfo("listen for updates on %s", topic_name) 256 sub_changes = rospy.Subscriber(topic_name, MasterState, self.handlerMasterStateMsg) 257 self.sub_changes.append(sub_changes) 258 result = topic_name 259 return result
260
261 - def stop(self):
262 ''' 263 Unregister the subscribed topics 264 ''' 265 if hasattr(self, 'sub_changes'): 266 for s in self.sub_changes: 267 try: 268 s.unregister() 269 except Exception as e: 270 rospy.logwarn("Error while unregister master state topic %s" % e) 271 del self.sub_changes
272
273 - def handlerMasterStateMsg(self, msg):
274 ''' 275 The method to handle the received MasterState messages. The received message 276 will be emitted as state_signal. 277 @param msg: the received message 278 @type msg: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>} 279 ''' 280 self.state_signal.emit(msg)
281 282
283 -class MasterStatisticTopic(QObject):
284 ''' 285 A class to receive the connections statistics from a ROS topic. The topic 286 will be determine using U{master_discovery_fkie.interface_finder.get_stats_topic()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>} 287 ''' 288 stats_signal = Signal(LinkStatesStamped) 289 '''@ivar: a signal with a list of link states to discovered ROS masters. 290 Paramter: U{master_discovery_fkie.msg.LinkStatesStamped<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/LinkStatesStamped.html>}''' 291
292 - def registerByROS(self, masteruri, wait=False):
293 ''' 294 This method creates a ROS subscriber to received the notifications of 295 connection updates. The retrieved messages will be emitted as stats_signal. 296 @param masteruri: the ROS master URI 297 @type masteruri: str 298 @param wait: wait for the topic 299 @type wait: boolean 300 @return: the topic name or an empty string 301 ''' 302 result = '' 303 self.stop() 304 self.sub_stats = [] 305 topic_names = interface_finder.get_stats_topic(masteruri, wait) 306 for topic_name in topic_names: 307 rospy.loginfo("listen for connection statistics on %s", topic_name) 308 sub_stats = rospy.Subscriber(topic_name, LinkStatesStamped, self.handlerMasterStatsMsg) 309 self.sub_stats.append(sub_stats) 310 result = topic_name 311 return result
312
313 - def stop(self):
314 ''' 315 Unregister the subscribed topics. 316 ''' 317 if hasattr(self, 'sub_stats'): 318 for s in self.sub_stats: 319 s.unregister() 320 del self.sub_stats
321
322 - def handlerMasterStatsMsg(self, msg):
323 ''' 324 The method to handle the received LinkStatesStamped messages. The received 325 message will be emitted as stats_signal. 326 @param msg: the received message 327 @type msg: U{master_discovery_fkie.msg.LinkStatesStamped<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/LinkStatesStamped.html>} 328 ''' 329 self.stats_signal.emit(msg)
330 331
332 -class OwnMasterMonitoring(QObject):
333 ''' 334 A class to monitor the state of the master. Will be used, if no master 335 discovering is available. On changes the 'state_signal' of type 336 U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>} will be emitted. 337 ''' 338 state_signal = Signal(MasterState) 339 '''@ivar: a signal to inform the receiver about new master state. 340 Parameter: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}''' 341 342 err_signal = Signal(str) 343 '''@ivar: a signal to inform about an error. 344 Parameter: L{str}''' 345 346 ROSMASTER_HZ = 1 347 '''@ivar: the rate to test ROS master for changes.''' 348
349 - def init(self, monitor_port):
350 ''' 351 Creates the local monitoring. Call start() to run the local monitoring. 352 @param monitor_port: the port of the XML-RPC Server created by monitoring class. 353 @type monitor_port: C{int} 354 ''' 355 self._master_monitor = MasterMonitor(monitor_port, False) 356 self._do_pause = True 357 self._do_finish = False 358 self._masteruri = self._master_monitor.getMasteruri() 359 self._local_addr = self._master_monitor.getMastername() 360 self._masterMonitorThread = threading.Thread(target=self.mastermonitor_loop) 361 self._masterMonitorThread.setDaemon(True) 362 self._masterMonitorThread.start() 363 self._last_error = (time.time(), None)
364
365 - def is_running(self):
366 return not self._do_pause
367
368 - def stop(self):
369 ''' 370 Stop the local master monitoring 371 ''' 372 self._do_finish = True 373 if self._master_monitor.is_running(): 374 print " Shutdown the local master monitoring..." 375 self._masterMonitorThread.join(15) 376 self._master_monitor.shutdown() 377 print " Local master monitoring is off!"
378
379 - def mastermonitor_loop(self):
380 ''' 381 The method test periodically the state of the ROS master. The new state will 382 be published as 'state_signal'. 383 ''' 384 import os 385 current_check_hz = OwnMasterMonitoring.ROSMASTER_HZ 386 while (not rospy.is_shutdown() and not self._do_finish): 387 try: 388 if not self._do_pause: 389 cputimes = os.times() 390 cputime_init = cputimes[0] + cputimes[1] 391 if self._master_monitor.checkState(): 392 mon_state = self._master_monitor.getCurrentState() 393 # publish the new state 394 state = MasterState(MasterState.STATE_CHANGED, 395 ROSMaster(utf8(self._local_addr), 396 utf8(self._masteruri), 397 mon_state.timestamp, 398 mon_state.timestamp_local, 399 True, 400 rospy.get_name(), 401 ''.join(['http://localhost:', utf8(self._master_monitor.rpcport)]))) 402 self.state_signal.emit(state) 403 # adapt the check rate to the CPU usage time 404 cputimes = os.times() 405 cputime = cputimes[0] + cputimes[1] - cputime_init 406 if current_check_hz * cputime > 0.20: 407 current_check_hz = float(current_check_hz) / 2.0 408 elif current_check_hz * cputime < 0.10 and current_check_hz < OwnMasterMonitoring.ROSMASTER_HZ: 409 current_check_hz = float(current_check_hz) * 2.0 410 except MasterConnectionException, mce: 411 self._handle_exception("MasterConnectionException while master check loop", mce) 412 except RuntimeError, ree: 413 # will thrown on exit of the app while try to emit the signal 414 self._handle_exception("RuntimeError while master check loop", ree) 415 if not rospy.is_shutdown() and not self._do_finish: 416 time.sleep(1.0 / current_check_hz)
417
418 - def _handle_exception(self, prefix, exception):
419 text = '%s: %s' % (prefix, exception) 420 if self._last_error[1] != text or time.time() - self._last_error[0] > 60: 421 self._last_error = (time.time(), text) 422 rospy.logwarn(text) 423 self.err_signal.emit(text)
424
425 - def pause(self, state):
426 ''' 427 Sets the local monitoring to pause. 428 @param state: On/Off pause 429 @type state: C{boolean} 430 ''' 431 if not state and self._do_pause != state: 432 self._master_monitor.reset() 433 self._do_pause = state
434
435 - def isPaused(self):
436 ''' 437 @return: True if the local monitoring of the Master state is paused. 438 @rtype: C{boolean} 439 ''' 440 return self._do_pause
441