Package node_manager_fkie :: Module discovery_listener
[frames] | no frames]

Source Code for Module node_manager_fkie.discovery_listener

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  from python_qt_binding.QtCore import QObject, Signal 
 34  import socket 
 35  import threading 
 36  import time 
 37   
 38  import rospy 
 39   
 40  from master_discovery_fkie.master_monitor import MasterMonitor, MasterConnectionException 
 41  import master_discovery_fkie.interface_finder as interface_finder 
 42   
 43   
 44  try: 
 45      import std_srvs.srv 
 46      from multimaster_msgs_fkie.msg import LinkStatesStamped, MasterState, ROSMaster  # , LinkState, SyncMasterInfo, SyncTopicInfo 
 47      from multimaster_msgs_fkie.srv import DiscoverMasters  # , GetSyncInfo 
 48  except ImportError, e: 
 49      import sys 
 50      print >> sys.stderr, "Can't import messages and services of multimaster_msgs_fkie. Is multimaster_msgs_fkie package compiled?" 
 51      raise ImportError(str(e)) 
 52   
 53   
54 -class MasterListService(QObject):
55 ''' 56 A class to retrieve the ROS master list from a ROS service. The service 57 will be determine using U{master_discovery_fkie.interface_finder.get_listmaster_service()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>} 58 59 ''' 60 masterlist_signal = Signal(str, str, list) 61 '''@ivar: a signal with a list of the masters retrieved from the master_discovery service 'list_masters'. 62 ParameterB{:} C{masteruri}, C{service name}, C{[U{master_discovery_fkie.msg.ROSMaster<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/ROSMaster.html>}, ...]}''' 63 masterlist_err_signal = Signal(str, str) 64 '''@ivar: this signal is emitted if an error while calling #list_masters' 65 service of master_discovery is failed. 66 ParameterB{:} C{masteruri}, C{error}''' 67
68 - def __init__(self):
69 QObject.__init__(self) 70 self.__serviceThreads = {} 71 self.__refreshThreads = {} 72 self._lock = threading.RLock()
73
74 - def stop(self):
75 print " Shutdown discovery listener..." 76 for _, thread in self.__serviceThreads.iteritems(): 77 thread.join(3) 78 for _, thread in self.__refreshThreads.iteritems(): 79 thread.join(3) 80 print " Discovery listener is off!"
81
82 - def retrieveMasterList(self, masteruri, wait=False):
83 ''' 84 This method use the service 'list_masters' of the master_discovery to get 85 the list of discovered ROS master. The retrieved list will be emitted as 86 masterlist_signal. 87 @param masteruri: the ROS master URI 88 @type masteruri: C{str} 89 @param wait: wait for the service 90 @type wait: C{boolean} 91 ''' 92 with self._lock: 93 if masteruri not in self.__serviceThreads: 94 upthread = MasterListThread(masteruri, wait) 95 upthread.master_list_signal.connect(self._on_master_list) 96 upthread.err_signal.connect(self._on_err) 97 self.__serviceThreads[masteruri] = upthread 98 upthread.start()
99
100 - def refresh(self, masteruri, wait=False):
101 ''' 102 This method use the service 'refresh' of the master_discovery to refresh the 103 discovered masters. 104 @param masteruri: the ROS master URI 105 @type masteruri: C{str} 106 @param wait: wait for the service 107 @type wait: C{boolean} 108 ''' 109 with self._lock: 110 if masteruri not in self.__refreshThreads: 111 upthread = MasterRefreshThread(masteruri, wait) 112 upthread.ok_signal.connect(self._on_ok) 113 upthread.err_signal.connect(self._on_err) 114 self.__refreshThreads[masteruri] = upthread 115 upthread.start()
116
117 - def _on_master_list(self, masteruri, service_name, items):
118 with self._lock: 119 try: 120 thread = self.__serviceThreads.pop(masteruri) 121 del thread 122 except KeyError: 123 pass 124 self.masterlist_signal.emit(masteruri, service_name, items)
125
126 - def _on_err(self, masteruri, msg, on_refresh):
127 with self._lock: 128 try: 129 if on_refresh: 130 thread = self.__refreshThreads.pop(masteruri) 131 del thread 132 else: 133 thread = self.__serviceThreads.pop(masteruri) 134 del thread 135 except KeyError: 136 pass 137 self.masterlist_err_signal.emit(masteruri, msg)
138
139 - def _on_ok(self, masteruri):
140 with self._lock: 141 try: 142 thread = self.__refreshThreads.pop(masteruri) 143 del thread 144 except KeyError: 145 pass
146 147
148 -class MasterListThread(QObject, threading.Thread):
149 ''' 150 A thread to to retrieve the list of discovered ROS master from master_discovery 151 service and publish it by sending a QT signal. 152 ''' 153 master_list_signal = Signal(str, str, list) 154 err_signal = Signal(str, str, bool) 155
156 - def __init__(self, masteruri, wait, parent=None):
157 QObject.__init__(self) 158 threading.Thread.__init__(self) 159 self._masteruri = masteruri 160 self._wait = wait 161 self.setDaemon(True)
162
163 - def run(self):
164 ''' 165 ''' 166 if self._masteruri: 167 found = False 168 service_names = interface_finder.get_listmaster_service(self._masteruri, self._wait) 169 err_msg = '' 170 for service_name in service_names: 171 rospy.logdebug("service 'list_masters' found on %s as %s", self._masteruri, service_name) 172 if self._wait: 173 rospy.wait_for_service(service_name) 174 socket.setdefaulttimeout(3) 175 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters) 176 try: 177 resp = discoverMasters() 178 except rospy.ServiceException, e: 179 rospy.logwarn("Service call 'list_masters' failed: %s", str(e)) 180 self.err_signal.emit(self._masteruri, "Service call '%s' failed: %s" % (service_name, err_msg), False) 181 else: 182 if resp.masters: 183 self.master_list_signal.emit(self._masteruri, service_name, resp.masters) 184 found = True 185 else: 186 self.err_signal.emit(self._masteruri, "local 'master_discovery' reports empty master list, it seems he has a problem", False) 187 finally: 188 socket.setdefaulttimeout(None) 189 if not found: 190 self.err_signal.emit(self._masteruri, "no service 'list_masters' found on %s" % self._masteruri, False)
191 192
193 -class MasterRefreshThread(QObject, threading.Thread):
194 ''' 195 A thread to call the refresh service of master discovery. 196 ''' 197 ok_signal = Signal(str) 198 err_signal = Signal(str, str, bool) 199
200 - def __init__(self, masteruri, wait, parent=None):
201 QObject.__init__(self) 202 threading.Thread.__init__(self) 203 self._masteruri = masteruri 204 self._wait = wait 205 self.setDaemon(True)
206
207 - def run(self):
208 ''' 209 ''' 210 if self._masteruri: 211 service_names = interface_finder.get_refresh_service(self._masteruri, self._wait) 212 err_msg = '' 213 for service_name in service_names: 214 rospy.logdebug("service 'refresh' found on %s as %s", self._masteruri, service_name) 215 if self._wait: 216 rospy.wait_for_service(service_name) 217 socket.setdefaulttimeout(3) 218 refreshMasters = rospy.ServiceProxy(service_name, std_srvs.srv.Empty) 219 try: 220 _ = refreshMasters() 221 self.ok_signal.emit(self._masteruri) 222 except rospy.ServiceException, e: 223 rospy.logwarn("ERROR Service call 'refresh' failed: %s", str(e)) 224 self.err_signal.emit(self._masteruri, "ERROR Service call 'refresh' failed: %s" % err_msg, True) 225 finally: 226 socket.setdefaulttimeout(None)
227 228
229 -class MasterStateTopic(QObject):
230 ''' 231 A class to receive the ROS master state updates from a ROS topic. The topic 232 will be determine using U{master_discovery_fkie.interface_finder.get_changes_topic()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>}. 233 ''' 234 state_signal = Signal(MasterState) 235 '''@ivar: a signal to inform the receiver about new master state. 236 Parameter: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}''' 237
238 - def registerByROS(self, masteruri, wait=False):
239 ''' 240 This method creates a ROS subscriber to received the notifications of ROS 241 master updates. The retrieved messages will be emitted as state_signal. 242 @param masteruri: the ROS master URI 243 @type masteruri: C{str} 244 @param wait: wait for the topic 245 @type wait: C{boolean} 246 @return: the topic name or an empty string 247 ''' 248 result = '' 249 topic_names = interface_finder.get_changes_topic(masteruri, wait) 250 self.stop() 251 self.sub_changes = [] 252 for topic_name in topic_names: 253 rospy.loginfo("listen for updates on %s", topic_name) 254 sub_changes = rospy.Subscriber(topic_name, MasterState, self.handlerMasterStateMsg) 255 self.sub_changes.append(sub_changes) 256 result = topic_name 257 return result
258
259 - def stop(self):
260 ''' 261 Unregister the subscribed topics 262 ''' 263 if hasattr(self, 'sub_changes'): 264 for s in self.sub_changes: 265 try: 266 s.unregister() 267 except Exception as e: 268 rospy.logwarn("Error while unregister master state topic %s" % e) 269 del self.sub_changes
270
271 - def handlerMasterStateMsg(self, msg):
272 ''' 273 The method to handle the received MasterState messages. The received message 274 will be emitted as state_signal. 275 @param msg: the received message 276 @type msg: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>} 277 ''' 278 self.state_signal.emit(msg)
279 280
281 -class MasterStatisticTopic(QObject):
282 ''' 283 A class to receive the connections statistics from a ROS topic. The topic 284 will be determine using U{master_discovery_fkie.interface_finder.get_stats_topic()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>} 285 ''' 286 stats_signal = Signal(LinkStatesStamped) 287 '''@ivar: a signal with a list of link states to discovered ROS masters. 288 Paramter: U{master_discovery_fkie.msg.LinkStatesStamped<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/LinkStatesStamped.html>}''' 289
290 - def registerByROS(self, masteruri, wait=False):
291 ''' 292 This method creates a ROS subscriber to received the notifications of 293 connection updates. The retrieved messages will be emitted as stats_signal. 294 @param masteruri: the ROS master URI 295 @type masteruri: str 296 @param wait: wait for the topic 297 @type wait: boolean 298 @return: the topic name or an empty string 299 ''' 300 result = '' 301 self.stop() 302 self.sub_stats = [] 303 topic_names = interface_finder.get_stats_topic(masteruri, wait) 304 for topic_name in topic_names: 305 rospy.loginfo("listen for connection statistics on %s", topic_name) 306 sub_stats = rospy.Subscriber(topic_name, LinkStatesStamped, self.handlerMasterStatsMsg) 307 self.sub_stats.append(sub_stats) 308 result = topic_name 309 return result
310
311 - def stop(self):
312 ''' 313 Unregister the subscribed topics. 314 ''' 315 if hasattr(self, 'sub_stats'): 316 for s in self.sub_stats: 317 s.unregister() 318 del self.sub_stats
319
320 - def handlerMasterStatsMsg(self, msg):
321 ''' 322 The method to handle the received LinkStatesStamped messages. The received 323 message will be emitted as stats_signal. 324 @param msg: the received message 325 @type msg: U{master_discovery_fkie.msg.LinkStatesStamped<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/LinkStatesStamped.html>} 326 ''' 327 self.stats_signal.emit(msg)
328 329
330 -class OwnMasterMonitoring(QObject):
331 ''' 332 A class to monitor the state of the master. Will be used, if no master 333 discovering is available. On changes the 'state_signal' of type 334 U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>} will be emitted. 335 ''' 336 state_signal = Signal(MasterState) 337 '''@ivar: a signal to inform the receiver about new master state. 338 Parameter: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}''' 339 340 err_signal = Signal(str) 341 '''@ivar: a signal to inform about an error. 342 Parameter: L{str}''' 343 344 ROSMASTER_HZ = 1 345 '''@ivar: the rate to test ROS master for changes.''' 346
347 - def init(self, monitor_port):
348 ''' 349 Creates the local monitoring. Call start() to run the local monitoring. 350 @param monitor_port: the port of the XML-RPC Server created by monitoring class. 351 @type monitor_port: C{int} 352 ''' 353 self._master_monitor = MasterMonitor(monitor_port, False) 354 self._do_pause = True 355 self._do_finish = False 356 self._masteruri = self._master_monitor.getMasteruri() 357 self._local_addr = self._master_monitor.getMastername() 358 self._masterMonitorThread = threading.Thread(target=self.mastermonitor_loop) 359 self._masterMonitorThread.setDaemon(True) 360 self._masterMonitorThread.start() 361 self._last_error = (time.time(), None)
362
363 - def is_running(self):
364 return not self._do_pause
365
366 - def stop(self):
367 ''' 368 Stop the local master monitoring 369 ''' 370 self._do_finish = True 371 if self._master_monitor.is_running(): 372 print " Shutdown the local master monitoring..." 373 self._masterMonitorThread.join(15) 374 self._master_monitor.shutdown() 375 print " Local master monitoring is off!"
376
377 - def mastermonitor_loop(self):
378 ''' 379 The method test periodically the state of the ROS master. The new state will 380 be published as 'state_signal'. 381 ''' 382 import os 383 current_check_hz = OwnMasterMonitoring.ROSMASTER_HZ 384 while (not rospy.is_shutdown() and not self._do_finish): 385 try: 386 if not self._do_pause: 387 cputimes = os.times() 388 cputime_init = cputimes[0] + cputimes[1] 389 if self._master_monitor.checkState(): 390 mon_state = self._master_monitor.getCurrentState() 391 # publish the new state 392 state = MasterState(MasterState.STATE_CHANGED, 393 ROSMaster(str(self._local_addr), 394 str(self._masteruri), 395 mon_state.timestamp, 396 mon_state.timestamp_local, 397 True, 398 rospy.get_name(), 399 ''.join(['http://localhost:', str(self._master_monitor.rpcport)]))) 400 self.state_signal.emit(state) 401 # adapt the check rate to the CPU usage time 402 cputimes = os.times() 403 cputime = cputimes[0] + cputimes[1] - cputime_init 404 if current_check_hz * cputime > 0.20: 405 current_check_hz = float(current_check_hz) / 2.0 406 elif current_check_hz * cputime < 0.10 and current_check_hz < OwnMasterMonitoring.ROSMASTER_HZ: 407 current_check_hz = float(current_check_hz) * 2.0 408 except MasterConnectionException, mce: 409 self._handle_exception("MasterConnectionException while master check loop", mce) 410 except RuntimeError, ree: 411 # will thrown on exit of the app while try to emit the signal 412 self._handle_exception("RuntimeError while master check loop", ree) 413 if not rospy.is_shutdown() and not self._do_finish: 414 time.sleep(1.0 / current_check_hz)
415
416 - def _handle_exception(self, prefix, exception):
417 text = '%s: %s' % (prefix, exception) 418 if self._last_error[1] != text or time.time() - self._last_error[0] > 60: 419 self._last_error = (time.time(), text) 420 rospy.logwarn(text) 421 self.err_signal.emit(text)
422
423 - def pause(self, state):
424 ''' 425 Sets the local monitoring to pause. 426 @param state: On/Off pause 427 @type state: C{boolean} 428 ''' 429 if not state and self._do_pause != state: 430 self._master_monitor.reset() 431 self._do_pause = state
432
433 - def isPaused(self):
434 ''' 435 @return: True if the local monitoring of the Master state is paused. 436 @rtype: C{boolean} 437 ''' 438 return self._do_pause
439