1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 from python_qt_binding.QtCore import QObject, Signal
34 import socket
35 import threading
36 import time
37
38 import rospy
39
40 from master_discovery_fkie.master_monitor import MasterMonitor, MasterConnectionException
41 import master_discovery_fkie.interface_finder as interface_finder
42
43
44 try:
45 import std_srvs.srv
46 from multimaster_msgs_fkie.msg import LinkStatesStamped, MasterState, ROSMaster
47 from multimaster_msgs_fkie.srv import DiscoverMasters
48 except ImportError, e:
49 import sys
50 print >> sys.stderr, "Can't import messages and services of multimaster_msgs_fkie. Is multimaster_msgs_fkie package compiled?"
51 raise ImportError(str(e))
52
53
55 '''
56 A class to retrieve the ROS master list from a ROS service. The service
57 will be determine using U{master_discovery_fkie.interface_finder.get_listmaster_service()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>}
58
59 '''
60 masterlist_signal = Signal(str, str, list)
61 '''@ivar: a signal with a list of the masters retrieved from the master_discovery service 'list_masters'.
62 ParameterB{:} C{masteruri}, C{service name}, C{[U{master_discovery_fkie.msg.ROSMaster<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/ROSMaster.html>}, ...]}'''
63 masterlist_err_signal = Signal(str, str)
64 '''@ivar: this signal is emitted if an error while calling #list_masters'
65 service of master_discovery is failed.
66 ParameterB{:} C{masteruri}, C{error}'''
67
69 QObject.__init__(self)
70 self.__serviceThreads = {}
71 self.__refreshThreads = {}
72 self._lock = threading.RLock()
73
75 print " Shutdown discovery listener..."
76 for _, thread in self.__serviceThreads.iteritems():
77 thread.join(3)
78 for _, thread in self.__refreshThreads.iteritems():
79 thread.join(3)
80 print " Discovery listener is off!"
81
83 '''
84 This method use the service 'list_masters' of the master_discovery to get
85 the list of discovered ROS master. The retrieved list will be emitted as
86 masterlist_signal.
87 @param masteruri: the ROS master URI
88 @type masteruri: C{str}
89 @param wait: wait for the service
90 @type wait: C{boolean}
91 '''
92 with self._lock:
93 if masteruri not in self.__serviceThreads:
94 upthread = MasterListThread(masteruri, wait)
95 upthread.master_list_signal.connect(self._on_master_list)
96 upthread.err_signal.connect(self._on_err)
97 self.__serviceThreads[masteruri] = upthread
98 upthread.start()
99
100 - def refresh(self, masteruri, wait=False):
101 '''
102 This method use the service 'refresh' of the master_discovery to refresh the
103 discovered masters.
104 @param masteruri: the ROS master URI
105 @type masteruri: C{str}
106 @param wait: wait for the service
107 @type wait: C{boolean}
108 '''
109 with self._lock:
110 if masteruri not in self.__refreshThreads:
111 upthread = MasterRefreshThread(masteruri, wait)
112 upthread.ok_signal.connect(self._on_ok)
113 upthread.err_signal.connect(self._on_err)
114 self.__refreshThreads[masteruri] = upthread
115 upthread.start()
116
118 with self._lock:
119 try:
120 thread = self.__serviceThreads.pop(masteruri)
121 del thread
122 except KeyError:
123 pass
124 self.masterlist_signal.emit(masteruri, service_name, items)
125
126 - def _on_err(self, masteruri, msg, on_refresh):
127 with self._lock:
128 try:
129 if on_refresh:
130 thread = self.__refreshThreads.pop(masteruri)
131 del thread
132 else:
133 thread = self.__serviceThreads.pop(masteruri)
134 del thread
135 except KeyError:
136 pass
137 self.masterlist_err_signal.emit(masteruri, msg)
138
140 with self._lock:
141 try:
142 thread = self.__refreshThreads.pop(masteruri)
143 del thread
144 except KeyError:
145 pass
146
147
149 '''
150 A thread to to retrieve the list of discovered ROS master from master_discovery
151 service and publish it by sending a QT signal.
152 '''
153 master_list_signal = Signal(str, str, list)
154 err_signal = Signal(str, str, bool)
155
156 - def __init__(self, masteruri, wait, parent=None):
157 QObject.__init__(self)
158 threading.Thread.__init__(self)
159 self._masteruri = masteruri
160 self._wait = wait
161 self.setDaemon(True)
162
164 '''
165 '''
166 if self._masteruri:
167 found = False
168 service_names = interface_finder.get_listmaster_service(self._masteruri, self._wait)
169 err_msg = ''
170 for service_name in service_names:
171 rospy.logdebug("service 'list_masters' found on %s as %s", self._masteruri, service_name)
172 if self._wait:
173 rospy.wait_for_service(service_name)
174 socket.setdefaulttimeout(3)
175 discoverMasters = rospy.ServiceProxy(service_name, DiscoverMasters)
176 try:
177 resp = discoverMasters()
178 except rospy.ServiceException, e:
179 rospy.logwarn("Service call 'list_masters' failed: %s", str(e))
180 self.err_signal.emit(self._masteruri, "Service call '%s' failed: %s" % (service_name, err_msg), False)
181 else:
182 if resp.masters:
183 self.master_list_signal.emit(self._masteruri, service_name, resp.masters)
184 found = True
185 else:
186 self.err_signal.emit(self._masteruri, "local 'master_discovery' reports empty master list, it seems he has a problem", False)
187 finally:
188 socket.setdefaulttimeout(None)
189 if not found:
190 self.err_signal.emit(self._masteruri, "no service 'list_masters' found on %s" % self._masteruri, False)
191
192
194 '''
195 A thread to call the refresh service of master discovery.
196 '''
197 ok_signal = Signal(str)
198 err_signal = Signal(str, str, bool)
199
200 - def __init__(self, masteruri, wait, parent=None):
201 QObject.__init__(self)
202 threading.Thread.__init__(self)
203 self._masteruri = masteruri
204 self._wait = wait
205 self.setDaemon(True)
206
208 '''
209 '''
210 if self._masteruri:
211 service_names = interface_finder.get_refresh_service(self._masteruri, self._wait)
212 err_msg = ''
213 for service_name in service_names:
214 rospy.logdebug("service 'refresh' found on %s as %s", self._masteruri, service_name)
215 if self._wait:
216 rospy.wait_for_service(service_name)
217 socket.setdefaulttimeout(3)
218 refreshMasters = rospy.ServiceProxy(service_name, std_srvs.srv.Empty)
219 try:
220 _ = refreshMasters()
221 self.ok_signal.emit(self._masteruri)
222 except rospy.ServiceException, e:
223 rospy.logwarn("ERROR Service call 'refresh' failed: %s", str(e))
224 self.err_signal.emit(self._masteruri, "ERROR Service call 'refresh' failed: %s" % err_msg, True)
225 finally:
226 socket.setdefaulttimeout(None)
227
228
230 '''
231 A class to receive the ROS master state updates from a ROS topic. The topic
232 will be determine using U{master_discovery_fkie.interface_finder.get_changes_topic()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>}.
233 '''
234 state_signal = Signal(MasterState)
235 '''@ivar: a signal to inform the receiver about new master state.
236 Parameter: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}'''
237
239 '''
240 This method creates a ROS subscriber to received the notifications of ROS
241 master updates. The retrieved messages will be emitted as state_signal.
242 @param masteruri: the ROS master URI
243 @type masteruri: C{str}
244 @param wait: wait for the topic
245 @type wait: C{boolean}
246 @return: the topic name or an empty string
247 '''
248 result = ''
249 topic_names = interface_finder.get_changes_topic(masteruri, wait)
250 self.stop()
251 self.sub_changes = []
252 for topic_name in topic_names:
253 rospy.loginfo("listen for updates on %s", topic_name)
254 sub_changes = rospy.Subscriber(topic_name, MasterState, self.handlerMasterStateMsg)
255 self.sub_changes.append(sub_changes)
256 result = topic_name
257 return result
258
260 '''
261 Unregister the subscribed topics
262 '''
263 if hasattr(self, 'sub_changes'):
264 for s in self.sub_changes:
265 try:
266 s.unregister()
267 except Exception as e:
268 rospy.logwarn("Error while unregister master state topic %s" % e)
269 del self.sub_changes
270
272 '''
273 The method to handle the received MasterState messages. The received message
274 will be emitted as state_signal.
275 @param msg: the received message
276 @type msg: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}
277 '''
278 self.state_signal.emit(msg)
279
280
282 '''
283 A class to receive the connections statistics from a ROS topic. The topic
284 will be determine using U{master_discovery_fkie.interface_finder.get_stats_topic()<http://docs.ros.org/kinetic/api/master_discovery_fkie/html/modules.html#interface-finder-module>}
285 '''
286 stats_signal = Signal(LinkStatesStamped)
287 '''@ivar: a signal with a list of link states to discovered ROS masters.
288 Paramter: U{master_discovery_fkie.msg.LinkStatesStamped<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/LinkStatesStamped.html>}'''
289
291 '''
292 This method creates a ROS subscriber to received the notifications of
293 connection updates. The retrieved messages will be emitted as stats_signal.
294 @param masteruri: the ROS master URI
295 @type masteruri: str
296 @param wait: wait for the topic
297 @type wait: boolean
298 @return: the topic name or an empty string
299 '''
300 result = ''
301 self.stop()
302 self.sub_stats = []
303 topic_names = interface_finder.get_stats_topic(masteruri, wait)
304 for topic_name in topic_names:
305 rospy.loginfo("listen for connection statistics on %s", topic_name)
306 sub_stats = rospy.Subscriber(topic_name, LinkStatesStamped, self.handlerMasterStatsMsg)
307 self.sub_stats.append(sub_stats)
308 result = topic_name
309 return result
310
312 '''
313 Unregister the subscribed topics.
314 '''
315 if hasattr(self, 'sub_stats'):
316 for s in self.sub_stats:
317 s.unregister()
318 del self.sub_stats
319
321 '''
322 The method to handle the received LinkStatesStamped messages. The received
323 message will be emitted as stats_signal.
324 @param msg: the received message
325 @type msg: U{master_discovery_fkie.msg.LinkStatesStamped<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/LinkStatesStamped.html>}
326 '''
327 self.stats_signal.emit(msg)
328
329
331 '''
332 A class to monitor the state of the master. Will be used, if no master
333 discovering is available. On changes the 'state_signal' of type
334 U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>} will be emitted.
335 '''
336 state_signal = Signal(MasterState)
337 '''@ivar: a signal to inform the receiver about new master state.
338 Parameter: U{master_discovery_fkie.msg.MasterState<http://docs.ros.org/api/multimaster_msgs_fkie/html/msg/MasterState.html>}'''
339
340 err_signal = Signal(str)
341 '''@ivar: a signal to inform about an error.
342 Parameter: L{str}'''
343
344 ROSMASTER_HZ = 1
345 '''@ivar: the rate to test ROS master for changes.'''
346
347 - def init(self, monitor_port):
348 '''
349 Creates the local monitoring. Call start() to run the local monitoring.
350 @param monitor_port: the port of the XML-RPC Server created by monitoring class.
351 @type monitor_port: C{int}
352 '''
353 self._master_monitor = MasterMonitor(monitor_port, False)
354 self._do_pause = True
355 self._do_finish = False
356 self._masteruri = self._master_monitor.getMasteruri()
357 self._local_addr = self._master_monitor.getMastername()
358 self._masterMonitorThread = threading.Thread(target=self.mastermonitor_loop)
359 self._masterMonitorThread.setDaemon(True)
360 self._masterMonitorThread.start()
361 self._last_error = (time.time(), None)
362
364 return not self._do_pause
365
367 '''
368 Stop the local master monitoring
369 '''
370 self._do_finish = True
371 if self._master_monitor.is_running():
372 print " Shutdown the local master monitoring..."
373 self._masterMonitorThread.join(15)
374 self._master_monitor.shutdown()
375 print " Local master monitoring is off!"
376
378 '''
379 The method test periodically the state of the ROS master. The new state will
380 be published as 'state_signal'.
381 '''
382 import os
383 current_check_hz = OwnMasterMonitoring.ROSMASTER_HZ
384 while (not rospy.is_shutdown() and not self._do_finish):
385 try:
386 if not self._do_pause:
387 cputimes = os.times()
388 cputime_init = cputimes[0] + cputimes[1]
389 if self._master_monitor.checkState():
390 mon_state = self._master_monitor.getCurrentState()
391
392 state = MasterState(MasterState.STATE_CHANGED,
393 ROSMaster(str(self._local_addr),
394 str(self._masteruri),
395 mon_state.timestamp,
396 mon_state.timestamp_local,
397 True,
398 rospy.get_name(),
399 ''.join(['http://localhost:', str(self._master_monitor.rpcport)])))
400 self.state_signal.emit(state)
401
402 cputimes = os.times()
403 cputime = cputimes[0] + cputimes[1] - cputime_init
404 if current_check_hz * cputime > 0.20:
405 current_check_hz = float(current_check_hz) / 2.0
406 elif current_check_hz * cputime < 0.10 and current_check_hz < OwnMasterMonitoring.ROSMASTER_HZ:
407 current_check_hz = float(current_check_hz) * 2.0
408 except MasterConnectionException, mce:
409 self._handle_exception("MasterConnectionException while master check loop", mce)
410 except RuntimeError, ree:
411
412 self._handle_exception("RuntimeError while master check loop", ree)
413 if not rospy.is_shutdown() and not self._do_finish:
414 time.sleep(1.0 / current_check_hz)
415
417 text = '%s: %s' % (prefix, exception)
418 if self._last_error[1] != text or time.time() - self._last_error[0] > 60:
419 self._last_error = (time.time(), text)
420 rospy.logwarn(text)
421 self.err_signal.emit(text)
422
424 '''
425 Sets the local monitoring to pause.
426 @param state: On/Off pause
427 @type state: C{boolean}
428 '''
429 if not state and self._do_pause != state:
430 self._master_monitor.reset()
431 self._do_pause = state
432
434 '''
435 @return: True if the local monitoring of the Master state is paused.
436 @rtype: C{boolean}
437 '''
438 return self._do_pause
439