Package node_manager_fkie :: Module discovery_listener
[frames] | no frames]

Source Code for Module node_manager_fkie.discovery_listener

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2012, Fraunhofer FKIE/US, Alexander Tiderko 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Fraunhofer nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32   
 33  import threading 
 34  import time 
 35  import socket 
 36   
 37  from python_qt_binding import QtCore 
 38   
 39  import roslib; roslib.load_manifest('node_manager_fkie') 
 40  import rospy 
 41   
 42  try: 
 43    from multimaster_msgs_fkie.msg import LinkStatesStamped, MasterState, ROSMaster#, LinkState, SyncMasterInfo, SyncTopicInfo 
 44    from multimaster_msgs_fkie.srv import DiscoverMasters#, GetSyncInfo 
 45  except ImportError, e: 
 46    import sys 
 47    print >> sys.stderr, "Can't import massages and services of multimaster_msgs_fkie. Is multimaster_msgs_fkie package compiled?" 
 48    raise ImportError(str(e)) 
 49   
 50  import master_discovery_fkie.interface_finder as interface_finder 
 51  from master_discovery_fkie.master_monitor import MasterMonitor, MasterConnectionException 
 52   
 53   
54 -class MasterListService(QtCore.QObject):
55 ''' 56 A class to retrieve the ROS master list from a ROS service. The service 57 will be determine using L{master_discovery_fkie.interface_finder.get_listmaster_service()} 58 59 ''' 60 masterlist_signal = QtCore.Signal(str, str, list) 61 '''@ivar: a signal with a list of the masters retrieved from the master_discovery service 'list_masters'. 62 ParameterB{:} C{masteruri}, C{service name}, C{[L{master_discovery_fkie.ROSMaster}, ...]}''' 63 masterlist_err_signal = QtCore.Signal(str, str) 64 '''@ivar: this signal is emitted if an error while calling #list_masters' 65 service of master_discovery is failed. 66 ParameterB{:} C{masteruri}, C{error}''' 67
68 - def __init__(self):
69 QtCore.QObject.__init__(self) 70 self.__serviceThreads = {} 71 self._lock = threading.RLock()
72
73 - def stop(self):
74 print " Shutdown discovery listener..." 75 for key, thread in self.__serviceThreads.iteritems(): 76 thread.join(3) 77 print " Discovery listener is off!"
78
79 - def retrieveMasterList(self, masteruri, wait=False):
80 ''' 81 This method use the service 'list_masters' of the master_discovery to get 82 the list of discovered ROS master. The retrieved list will be emitted as 83 masterlist_signal. 84 @param masteruri: the ROS master URI 85 @type masteruri: C{str} 86 @param wait: wait for the service 87 @type wait: C{boolean} 88 ''' 89 with self._lock: 90 if not (self.__serviceThreads.has_key(masteruri)): 91 upthread = MasterListThread(masteruri, wait) 92 upthread.master_list_signal.connect(self._on_master_list) 93 upthread.err_signal.connect(self._on_err) 94 self.__serviceThreads[masteruri] = upthread 95 upthread.start()
96
97 - def _on_master_list(self, masteruri, service_name, items):
98 with self._lock: 99 try: 100 thread = self.__serviceThreads.pop(masteruri) 101 del thread 102 except KeyError: 103 pass 104 self.masterlist_signal.emit(masteruri, service_name, items)
105
106 - def _on_err(self, masteruri, str):
107 with self._lock: 108 try: 109 thread = self.__serviceThreads.pop(masteruri) 110 del thread 111 except KeyError: 112 pass 113 self.masterlist_err_signal.emit(masteruri, str)
114 115 116
117 -class MasterListThread(QtCore.QObject, threading.Thread):
118 ''' 119 A thread to to retrieve the list of discovered ROS master from master_discovery 120 service and publish it by sending a QT signal. 121 ''' 122 master_list_signal = QtCore.Signal(str, str, list) 123 err_signal = QtCore.Signal(str, str) 124
125 - def __init__(self, masteruri, wait, parent=None):
126 QtCore.QObject.__init__(self) 127 threading.Thread.__init__(self) 128 self._masteruri = masteruri 129 self._wait = wait 130 self.setDaemon(True)
131
132 - def run(self):
133 ''' 134 ''' 135 if self._masteruri: 136 found = False 137 service_names = interface_finder.get_listmaster_service(self._masteruri, self._wait) 138 err_msg = '' 139 for service_name in service_names: 140 rospy.loginfo("service 'list_masters' found on %s as %s", self._masteruri, service_name) 141 if self._wait: 142 rospy.wait_for_service(service_name) 143 socket.setdefaulttimeout(3) 144 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters) 145 try: 146 resp = discoverMasters() 147 except rospy.ServiceException, e: 148 rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e)) 149 err_msg = ''.join([err_msg, '\n', service_name, ': ', str(e)]) 150 else: 151 self.master_list_signal.emit(self._masteruri, service_name, resp.masters) 152 if resp.masters: 153 found = True 154 finally: 155 socket.setdefaulttimeout(None) 156 if not found: 157 self.err_signal.emit(self._masteruri, "ERROR Service call 'list_masters' failed: %s"%err_msg)
158 159 160
161 -class MasterStateTopic(QtCore.QObject):
162 ''' 163 A class to receive the ROS master state updates from a ROS topic. The topic 164 will be determine using L{master_discovery_fkie.interface_finder.get_changes_topic()}. 165 ''' 166 state_signal = QtCore.Signal(MasterState) 167 '''@ivar: a signal to inform the receiver about new master state. 168 Parameter: L{master_discovery_fkie.msg.MasterState}''' 169
170 - def registerByROS(self, masteruri, wait=False):
171 ''' 172 This method creates a ROS subscriber to received the notifications of ROS 173 master updates. The retrieved messages will be emitted as state_signal. 174 @param masteruri: the ROS master URI 175 @type masteruri: C{str} 176 @param wait: wait for the topic 177 @type wait: C{boolean} 178 ''' 179 found = False 180 topic_names = interface_finder.get_changes_topic(masteruri, wait) 181 self.stop() 182 self.sub_changes = [] 183 for topic_name in topic_names: 184 rospy.loginfo("listen for updates on %s", topic_name) 185 sub_changes = rospy.Subscriber(topic_name, MasterState, self.handlerMasterStateMsg) 186 self.sub_changes.append(sub_changes) 187 found = True 188 return found
189
190 - def stop(self):
191 ''' 192 Unregister the subscribed topics 193 ''' 194 if hasattr(self, 'sub_changes'): 195 for s in self.sub_changes: 196 try: 197 s.unregister() 198 except Exceptio as e: 199 rospy.logwarn("Error while unregister master state topic %s"%e) 200 del self.sub_changes
201
202 - def handlerMasterStateMsg(self, msg):
203 ''' 204 The method to handle the received MasterState messages. The received message 205 will be emitted as state_signal. 206 @param msg: the received message 207 @type msg: L{master_discovery_fkie.MasterState} 208 ''' 209 self.state_signal.emit(msg)
210 211
212 -class MasterStatisticTopic(QtCore.QObject):
213 ''' 214 A class to receive the connections statistics from a ROS topic. The topic 215 will be determine using L{master_discovery_fkie.interface_finder.get_stats_topic()} 216 ''' 217 stats_signal = QtCore.Signal(LinkStatesStamped) 218 '''@ivar: a signal with a list of link states to discovered ROS masters. 219 Paramter: L{master_discovery_fkie.msg.LinkStatesStamped}''' 220
221 - def registerByROS(self, masteruri, wait=False):
222 ''' 223 This method creates a ROS subscriber to received the notifications of 224 connection updates. The retrieved messages will be emitted as stats_signal. 225 @param masteruri: the ROS master URI 226 @type masteruri: str 227 @param wait: wait for the topic 228 @type wait: boolean 229 ''' 230 found = False 231 self.stop() 232 self.sub_stats = [] 233 topic_names = interface_finder.get_stats_topic(masteruri, wait) 234 for topic_name in topic_names: 235 rospy.loginfo("listen for connection statistics on %s", topic_name) 236 sub_stats = rospy.Subscriber(topic_name, LinkStatesStamped, self.handlerMasterStatsMsg) 237 self.sub_stats.append(sub_stats) 238 found = True 239 return found
240
241 - def stop(self):
242 ''' 243 Unregister the subscribed topics. 244 ''' 245 if hasattr(self, 'sub_stats'): 246 for s in self.sub_stats: 247 s.unregister() 248 del self.sub_stats
249
250 - def handlerMasterStatsMsg(self, msg):
251 ''' 252 The method to handle the received LinkStatesStamped messages. The received 253 message will be emitted as stats_signal. 254 @param msg: the received message 255 @type msg: L{master_discovery_fkie.LinkStatesStamped} 256 ''' 257 self.stats_signal.emit(msg)
258 259
260 -class OwnMasterMonitoring(QtCore.QObject):
261 ''' 262 A class to monitor the state of the master. Will be used, if no master 263 discovering is available. On changes the 'state_signal' of type 264 L{master_discovery_fkie.msg.MasterState} will be emitted. 265 ''' 266 state_signal = QtCore.Signal(MasterState) 267 '''@ivar: a signal to inform the receiver about new master state. 268 Parameter: L{master_discovery_fkie.msg.MasterState}''' 269 270 err_signal = QtCore.Signal(str) 271 '''@ivar: a signal to inform about an error. 272 Parameter: L{str}''' 273 274 ROSMASTER_HZ = 1 275 '''@ivar: the rate to test ROS master for changes.''' 276
277 - def init(self, monitor_port):
278 ''' 279 Creates the local monitoring. Call start() to run the local monitoring. 280 @param monitor_port: the port of the XML-RPC Server created by monitoring class. 281 @type monitor_port: C{int} 282 ''' 283 self._master_monitor = MasterMonitor(monitor_port, False) 284 self._do_pause = True 285 self._do_finish = False 286 # self._local_addr = roslib.network.get_local_address() 287 # self._masteruri = roslib.rosenv.get_master_uri() 288 self._masteruri = self._master_monitor.getMasteruri() 289 self._local_addr = self._master_monitor.getMastername() 290 self._masterMonitorThread = threading.Thread(target = self.mastermonitor_loop) 291 self._masterMonitorThread.setDaemon(True) 292 self._masterMonitorThread.start()
293
294 - def stop(self):
295 ''' 296 Stop the local master monitoring 297 ''' 298 print " Shutdown the local master monitoring..." 299 self._do_finish = True 300 self._masterMonitorThread.join(15) 301 self._master_monitor.shutdown() 302 print " Local master monitoring is off!"
303
304 - def mastermonitor_loop(self):
305 ''' 306 The method test periodically the state of the ROS master. The new state will 307 be published as 'state_signal'. 308 ''' 309 import os 310 current_check_hz = OwnMasterMonitoring.ROSMASTER_HZ 311 while (not rospy.is_shutdown() and not self._do_finish): 312 try: 313 if not self._do_pause: 314 cputimes = os.times() 315 cputime_init = cputimes[0] + cputimes[1] 316 if self._master_monitor.checkState(): 317 mon_state = self._master_monitor.getCurrentState() 318 # publish the new state 319 state = MasterState(MasterState.STATE_CHANGED, 320 ROSMaster(str(self._local_addr), 321 str(self._masteruri), 322 mon_state.timestamp, 323 mon_state.timestamp_local, 324 True, 325 rospy.get_name(), 326 ''.join(['http://localhost:',str(self._master_monitor.rpcport)]))) 327 self.state_signal.emit(state) 328 # adapt the check rate to the CPU usage time 329 cputimes = os.times() 330 cputime = cputimes[0] + cputimes[1] - cputime_init 331 if current_check_hz*cputime > 0.20: 332 current_check_hz = float(current_check_hz)/2.0 333 elif current_check_hz*cputime < 0.10 and current_check_hz < OwnMasterMonitoring.ROSMASTER_HZ: 334 current_check_hz = float(current_check_hz)*2.0 335 except MasterConnectionException, e: 336 rospy.logwarn("MasterConnectionError while master check loop: %s"%e) 337 self.err_signal.emit("Error while master check loop: %s"%e) 338 except RuntimeError, e: 339 # will thrown on exit of the app while try to emit the signal 340 rospy.logwarn("RuntimeError while master check loop: %s"%e) 341 self.err_signal.emit("Error while master check loop: %s"%e) 342 if not rospy.is_shutdown() and not self._do_finish: 343 time.sleep(1.0/current_check_hz)
344
345 - def pause(self, state):
346 ''' 347 Sets the local monitoring to pause. 348 @param state: On/Off pause 349 @type state: C{boolean} 350 ''' 351 if not state and self._do_pause != state: 352 self._master_monitor.reset() 353 self._do_pause = state
354
355 - def isPaused(self):
356 ''' 357 @return: True if the local monitoring of the Master state is paused. 358 @rtype: C{boolean} 359 ''' 360 return self._do_pause
361