1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 import threading
34 import time
35 import socket
36
37 from python_qt_binding import QtCore
38
39 import roslib; roslib.load_manifest('node_manager_fkie')
40 import rospy
41
42 try:
43 from multimaster_msgs_fkie.msg import LinkStatesStamped, MasterState, ROSMaster
44 from multimaster_msgs_fkie.srv import DiscoverMasters
45 except ImportError, e:
46 import sys
47 print >> sys.stderr, "Can't import massages and services of multimaster_msgs_fkie. Is multimaster_msgs_fkie package compiled?"
48 raise ImportError(str(e))
49
50 import master_discovery_fkie.interface_finder as interface_finder
51 from master_discovery_fkie.master_monitor import MasterMonitor, MasterConnectionException
52
53
55 '''
56 A class to retrieve the ROS master list from a ROS service. The service
57 will be determine using L{master_discovery_fkie.interface_finder.get_listmaster_service()}
58
59 '''
60 masterlist_signal = QtCore.Signal(str, str, list)
61 '''@ivar: a signal with a list of the masters retrieved from the master_discovery service 'list_masters'.
62 ParameterB{:} C{masteruri}, C{service name}, C{[L{master_discovery_fkie.ROSMaster}, ...]}'''
63 masterlist_err_signal = QtCore.Signal(str, str)
64 '''@ivar: this signal is emitted if an error while calling #list_masters'
65 service of master_discovery is failed.
66 ParameterB{:} C{masteruri}, C{error}'''
67
69 QtCore.QObject.__init__(self)
70 self.__serviceThreads = {}
71 self._lock = threading.RLock()
72
74 print " Shutdown discovery listener..."
75 for key, thread in self.__serviceThreads.iteritems():
76 thread.join(3)
77 print " Discovery listener is off!"
78
80 '''
81 This method use the service 'list_masters' of the master_discovery to get
82 the list of discovered ROS master. The retrieved list will be emitted as
83 masterlist_signal.
84 @param masteruri: the ROS master URI
85 @type masteruri: C{str}
86 @param wait: wait for the service
87 @type wait: C{boolean}
88 '''
89 with self._lock:
90 if not (self.__serviceThreads.has_key(masteruri)):
91 upthread = MasterListThread(masteruri, wait)
92 upthread.master_list_signal.connect(self._on_master_list)
93 upthread.err_signal.connect(self._on_err)
94 self.__serviceThreads[masteruri] = upthread
95 upthread.start()
96
105
106 - def _on_err(self, masteruri, str):
114
115
116
118 '''
119 A thread to to retrieve the list of discovered ROS master from master_discovery
120 service and publish it by sending a QT signal.
121 '''
122 master_list_signal = QtCore.Signal(str, str, list)
123 err_signal = QtCore.Signal(str, str)
124
125 - def __init__(self, masteruri, wait, parent=None):
126 QtCore.QObject.__init__(self)
127 threading.Thread.__init__(self)
128 self._masteruri = masteruri
129 self._wait = wait
130 self.setDaemon(True)
131
133 '''
134 '''
135 if self._masteruri:
136 found = False
137 service_names = interface_finder.get_listmaster_service(self._masteruri, self._wait)
138 err_msg = ''
139 for service_name in service_names:
140 rospy.loginfo("service 'list_masters' found on %s as %s", self._masteruri, service_name)
141 if self._wait:
142 rospy.wait_for_service(service_name)
143 socket.setdefaulttimeout(3)
144 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters)
145 try:
146 resp = discoverMasters()
147 except rospy.ServiceException, e:
148 rospy.logwarn("ERROR Service call 'list_masters' failed: %s", str(e))
149 err_msg = ''.join([err_msg, '\n', service_name, ': ', str(e)])
150 else:
151 self.master_list_signal.emit(self._masteruri, service_name, resp.masters)
152 if resp.masters:
153 found = True
154 finally:
155 socket.setdefaulttimeout(None)
156 if not found:
157 self.err_signal.emit(self._masteruri, "ERROR Service call 'list_masters' failed: %s"%err_msg)
158
159
160
162 '''
163 A class to receive the ROS master state updates from a ROS topic. The topic
164 will be determine using L{master_discovery_fkie.interface_finder.get_changes_topic()}.
165 '''
166 state_signal = QtCore.Signal(MasterState)
167 '''@ivar: a signal to inform the receiver about new master state.
168 Parameter: L{master_discovery_fkie.msg.MasterState}'''
169
171 '''
172 This method creates a ROS subscriber to received the notifications of ROS
173 master updates. The retrieved messages will be emitted as state_signal.
174 @param masteruri: the ROS master URI
175 @type masteruri: C{str}
176 @param wait: wait for the topic
177 @type wait: C{boolean}
178 '''
179 found = False
180 topic_names = interface_finder.get_changes_topic(masteruri, wait)
181 self.stop()
182 self.sub_changes = []
183 for topic_name in topic_names:
184 rospy.loginfo("listen for updates on %s", topic_name)
185 sub_changes = rospy.Subscriber(topic_name, MasterState, self.handlerMasterStateMsg)
186 self.sub_changes.append(sub_changes)
187 found = True
188 return found
189
191 '''
192 Unregister the subscribed topics
193 '''
194 if hasattr(self, 'sub_changes'):
195 for s in self.sub_changes:
196 try:
197 s.unregister()
198 except Exceptio as e:
199 rospy.logwarn("Error while unregister master state topic %s"%e)
200 del self.sub_changes
201
203 '''
204 The method to handle the received MasterState messages. The received message
205 will be emitted as state_signal.
206 @param msg: the received message
207 @type msg: L{master_discovery_fkie.MasterState}
208 '''
209 self.state_signal.emit(msg)
210
211
213 '''
214 A class to receive the connections statistics from a ROS topic. The topic
215 will be determine using L{master_discovery_fkie.interface_finder.get_stats_topic()}
216 '''
217 stats_signal = QtCore.Signal(LinkStatesStamped)
218 '''@ivar: a signal with a list of link states to discovered ROS masters.
219 Paramter: L{master_discovery_fkie.msg.LinkStatesStamped}'''
220
222 '''
223 This method creates a ROS subscriber to received the notifications of
224 connection updates. The retrieved messages will be emitted as stats_signal.
225 @param masteruri: the ROS master URI
226 @type masteruri: str
227 @param wait: wait for the topic
228 @type wait: boolean
229 '''
230 found = False
231 self.stop()
232 self.sub_stats = []
233 topic_names = interface_finder.get_stats_topic(masteruri, wait)
234 for topic_name in topic_names:
235 rospy.loginfo("listen for connection statistics on %s", topic_name)
236 sub_stats = rospy.Subscriber(topic_name, LinkStatesStamped, self.handlerMasterStatsMsg)
237 self.sub_stats.append(sub_stats)
238 found = True
239 return found
240
242 '''
243 Unregister the subscribed topics.
244 '''
245 if hasattr(self, 'sub_stats'):
246 for s in self.sub_stats:
247 s.unregister()
248 del self.sub_stats
249
251 '''
252 The method to handle the received LinkStatesStamped messages. The received
253 message will be emitted as stats_signal.
254 @param msg: the received message
255 @type msg: L{master_discovery_fkie.LinkStatesStamped}
256 '''
257 self.stats_signal.emit(msg)
258
259
261 '''
262 A class to monitor the state of the master. Will be used, if no master
263 discovering is available. On changes the 'state_signal' of type
264 L{master_discovery_fkie.msg.MasterState} will be emitted.
265 '''
266 state_signal = QtCore.Signal(MasterState)
267 '''@ivar: a signal to inform the receiver about new master state.
268 Parameter: L{master_discovery_fkie.msg.MasterState}'''
269
270 err_signal = QtCore.Signal(str)
271 '''@ivar: a signal to inform about an error.
272 Parameter: L{str}'''
273
274 ROSMASTER_HZ = 1
275 '''@ivar: the rate to test ROS master for changes.'''
276
277 - def init(self, monitor_port):
278 '''
279 Creates the local monitoring. Call start() to run the local monitoring.
280 @param monitor_port: the port of the XML-RPC Server created by monitoring class.
281 @type monitor_port: C{int}
282 '''
283 self._master_monitor = MasterMonitor(monitor_port, False)
284 self._do_pause = True
285 self._do_finish = False
286
287
288 self._masteruri = self._master_monitor.getMasteruri()
289 self._local_addr = self._master_monitor.getMastername()
290 self._masterMonitorThread = threading.Thread(target = self.mastermonitor_loop)
291 self._masterMonitorThread.setDaemon(True)
292 self._masterMonitorThread.start()
293
295 '''
296 Stop the local master monitoring
297 '''
298 print " Shutdown the local master monitoring..."
299 self._do_finish = True
300 self._masterMonitorThread.join(15)
301 self._master_monitor.shutdown()
302 print " Local master monitoring is off!"
303
305 '''
306 The method test periodically the state of the ROS master. The new state will
307 be published as 'state_signal'.
308 '''
309 import os
310 current_check_hz = OwnMasterMonitoring.ROSMASTER_HZ
311 while (not rospy.is_shutdown() and not self._do_finish):
312 try:
313 if not self._do_pause:
314 cputimes = os.times()
315 cputime_init = cputimes[0] + cputimes[1]
316 if self._master_monitor.checkState():
317 mon_state = self._master_monitor.getCurrentState()
318
319 state = MasterState(MasterState.STATE_CHANGED,
320 ROSMaster(str(self._local_addr),
321 str(self._masteruri),
322 mon_state.timestamp,
323 mon_state.timestamp_local,
324 True,
325 rospy.get_name(),
326 ''.join(['http://localhost:',str(self._master_monitor.rpcport)])))
327 self.state_signal.emit(state)
328
329 cputimes = os.times()
330 cputime = cputimes[0] + cputimes[1] - cputime_init
331 if current_check_hz*cputime > 0.20:
332 current_check_hz = float(current_check_hz)/2.0
333 elif current_check_hz*cputime < 0.10 and current_check_hz < OwnMasterMonitoring.ROSMASTER_HZ:
334 current_check_hz = float(current_check_hz)*2.0
335 except MasterConnectionException, e:
336 rospy.logwarn("MasterConnectionError while master check loop: %s"%e)
337 self.err_signal.emit("Error while master check loop: %s"%e)
338 except RuntimeError, e:
339
340 rospy.logwarn("RuntimeError while master check loop: %s"%e)
341 self.err_signal.emit("Error while master check loop: %s"%e)
342 if not rospy.is_shutdown() and not self._do_finish:
343 time.sleep(1.0/current_check_hz)
344
346 '''
347 Sets the local monitoring to pause.
348 @param state: On/Off pause
349 @type state: C{boolean}
350 '''
351 if not state and self._do_pause != state:
352 self._master_monitor.reset()
353 self._do_pause = state
354
356 '''
357 @return: True if the local monitoring of the Master state is paused.
358 @rtype: C{boolean}
359 '''
360 return self._do_pause
361