1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 import threading
34 import xmlrpclib
35 import copy
36 import sys
37 import socket
38 import time
39
40 import roslib; roslib.load_manifest('master_discovery_fkie')
41 import rospy
42 import roslib.network
43
44 from master_discovery_fkie.msg import *
45 from master_discovery_fkie.srv import *
46 from master_monitor import MasterMonitor, MasterConnectionException
47 from udp import McastSocket
51 '''
52 The class stores all information about the remote ROS master and the all
53 received heartbeat messages of the remote node. On first contact a theaded
54 connection to remote discoverer will be established to get additional
55 information about the ROS master.
56 '''
57 - def __init__(self, monitoruri, heartbeat_rate=1., timestamp=0.0, timestamp_local=0.0, callback_master_state=None):
58 '''
59 Initialize method for the DiscoveredMaster class.
60 @param monitoruri: The URI of the remote RPC server, which moniter the ROS master
61 @type monitoruri: C{str}
62 @param heartbeat_rate: The remote rate, which is used to send the heartbeat messages.
63 @type heartbeat_rate: C{float} (Default: C{1.})
64 @param timestamp: The timestamp of the state of the remoter ROS master
65 @type timestamp: C{float} (Default: c{0})
66 @param timestamp_local: The timestamp of the state of the remoter ROS master, without the changes maked while a synchronization.
67 @type timestamp_local: C{float} (Default: c{0})
68 @param callback_master_state: the callback method to publish the changes of the ROS masters
69 @type callback_master_state: C{<method>(master_discovery_fkie/MasterState)} (Default: C{None})
70 '''
71 self.masteruri = None
72 self.mastername = None
73 self.timestamp = timestamp
74 self.timestamp_local = timestamp_local
75 self.discoverername = None
76 self.monitoruri = monitoruri
77 self.heartbeat_rate = heartbeat_rate
78 self.heartbeats = list()
79 self.last_heartbeat_ts = time.time()
80 self.online = False
81 self.callback_master_state = callback_master_state
82
83 self._retrieveThread = threading.Thread(target = self.__retrieveMasterinfo)
84 self._retrieveThread.setDaemon(True)
85 self._retrieveThread.start()
86
88 '''
89 Adds a new heartbeat measurement. If it is a new timestamp a ROS message
90 about the change of this ROS master will be published into ROS network.
91 @param timestamp: The new timestamp of the ROS master state
92 @type timestamp: C{float}
93 @param timestamp_local: The timestamp of the state of the remoter ROS master, without the changes maked while a synchronization.
94 @type timestamp_local: C{float} (Default: c{0})
95 @param rate: The remote rate, which is used to send the heartbeat messages.
96 @type rate: C{float}
97 @return: True, on changes
98 '''
99 cur_time = time.time()
100 self.heartbeats.append(cur_time)
101 self.last_heartbeat_ts = cur_time
102
103 if self.heartbeat_rate != rate:
104 self.heartbeat_rate = rate
105 self.heartbeats = list()
106
107 if (self.timestamp != timestamp or not self.online or self.timestamp_local != timestamp_local):
108 self.timestamp = timestamp
109 self.timestamp_local = timestamp_local
110 if not (self.masteruri is None):
111
112 self.online = True
113 if not (self.callback_master_state is None):
114 self.callback_master_state(MasterState(MasterState.STATE_CHANGED,
115 ROSMaster(str(self.mastername),
116 self.masteruri,
117 self.timestamp,
118 self.timestamp_local,
119 self.online,
120 self.discoverername,
121 self.monitoruri)))
122 return True
123 return False
124
126 '''
127 Removes all hearbeat measurements, which are older as the given timestamp.
128 @param timestamp: heartbeats older this timestamp will be removed.
129 @type timestamp: C{float}
130 @return: the count of removed heartbeats
131 @rtype: C{int}
132 '''
133 do_remove = True
134 removed = 0
135 while do_remove:
136 if len(self.heartbeats) > 0 and self.heartbeats[0] < timestamp:
137 del self.heartbeats[0]
138 removed = removed + 1
139 else:
140 do_remove = False
141 return removed
142
157
159 '''
160 Connects to the remote RPC server of the discoverer node and gets the
161 information about the Master URI, name of the service, and other. The
162 getMasterInfo() method will be used. On problems the connection will be
163 reestablished until the information will be get successful.
164 '''
165 if not (self.monitoruri is None):
166 while self._retrieveThread.is_alive() and not rospy.is_shutdown() and (self.mastername is None):
167 try:
168 remote_monitor = xmlrpclib.ServerProxy(self.monitoruri)
169 timestamp, masteruri, mastername, nodename, monitoruri = remote_monitor.masterContacts()
170 except:
171 import traceback
172 rospy.logwarn("socket error: %s", traceback.format_exc())
173 time.sleep(1)
174 else:
175 if float(timestamp) != 0:
176 self.masteruri = masteruri
177 self.mastername = mastername
178 self.discoverername = nodename
179
180 self.timestamp = float(timestamp)
181 self.online = True
182
183 if not (self.callback_master_state is None):
184 self.callback_master_state(MasterState(MasterState.STATE_NEW,
185 ROSMaster(str(self.mastername),
186 self.masteruri,
187 self.timestamp,
188 self.timestamp,
189 self.online,
190 self.discoverername,
191 self.monitoruri)))
192 else:
193 time.sleep(1)
194
198 '''
199 The class to publish the current state of the ROS master.
200 '''
201
202 VERSION = 2
203 '''@ivar: the version of the packet format described by L{HEARTBEAT_FMT}'''
204 '''
205 Version 1: 'cBBiiH'
206 one character 'R'
207 unsigned char: version of the hearbeat message
208 unsigned char: rate of the heartbeat message in HZ*10. Maximal rate: 25.5 Hz -> value 255
209 int: secs of the ROS Master state
210 int: nsecs of the ROS Master state
211 unsigned short: the port number of the RPC Server of the remote ROS-Core monitor
212 int: secs of the ROS Master state (only local changes)
213 int: nsecs of the ROS Master state (only local changes)
214 Version 2: 'cBBiiH'
215 Version 1
216 int: secs of the ROS Master state (only local changes). Changes while sync will be ignored.
217 int: nsecs of the ROS Master state (only local changes). Changes while sync will be ignored.
218 '''
219 HEARTBEAT_FMT = 'cBBiiHii'
220 ''' @ivar: packet format description, see: U{http://docs.python.org/library/struct.html} '''
221 HEARTBEAT_HZ = 2
222 ''' @ivar: the send rate of the heartbeat packets in hz (Default: 2 Hz)'''
223 MEASUREMENT_INTERVALS = 5
224 ''' @ivar: the count of intervals (1 sec) used for a quality calculation. If
225 HEARTBEAT_HZ is smaller then 1, MEASUREMENT_INTERVALS will be divided by HEARTBEAT_HZ value.
226 (Default: 5 sec are used to determine the link qaulity)'''
227 TIMEOUT_FACTOR = 1.4
228 ''' @ivar: the timeout is defined by calculated measurement duration multiplied by TIMEOUT_FAKTOR. '''
229 ROSMASTER_HZ = 1
230 ''' @ivar: the test rate of ROS master state in Hz (Default: 1 Hz). '''
231 REMOVE_AFTER = 300
232 ''' @ivar: remove an offline host after this time in [sec] (Default: 300 sec). '''
233
234 - def __init__(self, mcast_port, mcast_group, monitor_port):
235 '''
236 Initialize method for the Discoverer class
237 @param mcast_port: The port used to publish and receive the multicast messages.
238 @type mcast_port: int
239 @param mcast_group: The IPv4 or IPv6 multicast group used for discovering over nodes.
240 @type mcast_group: str
241 @param monitor_port: The port of the RPC Server, used to get more information about the ROS master.
242 @type monitor_port: int
243 '''
244 threading.Thread.__init__(self)
245 self.do_finish = False
246 self.__lock = threading.RLock()
247
248 self.masters = dict()
249 self._changed = False
250 self.static_hosts = []
251 if rospy.has_param('~rosmaster_hz'):
252 Discoverer.ROSMASTER_HZ = rospy.get_param('~rosmaster_hz')
253 if rospy.has_param('~heartbeat_hz'):
254 Discoverer.HEARTBEAT_HZ = rospy.get_param('~heartbeat_hz')
255 if rospy.has_param('~measurement_intervals'):
256 Discoverer.MEASUREMENT_INTERVALS = rospy.get_param('~measurement_intervals')
257 if rospy.has_param('~timeout_factor'):
258 Discoverer.TIMEOUT_FACTOR = rospy.get_param('~timeout_factor')
259 if rospy.has_param('~remove_after'):
260 Discoverer.REMOVE_AFTER = rospy.get_param('~remove_after')
261 if rospy.has_param('~static_hosts'):
262 self.static_hosts[len(self.static_hosts):] = rospy.get_param('~static_hosts')
263
264 rospy.loginfo("Check the ROS Master[Hz]: " + str(Discoverer.ROSMASTER_HZ))
265 rospy.loginfo("Heart beat [Hz]: " + str(Discoverer.HEARTBEAT_HZ))
266 rospy.loginfo("Static hosts: " + str(self.static_hosts))
267 self.current_check_hz = Discoverer.ROSMASTER_HZ
268 self.pubstats = rospy.Publisher("~linkstats", LinkStatesStamped)
269
270
271
272 local_addr = roslib.network.get_local_address()
273 if (local_addr in ['localhost', '127.0.0.1']):
274 sys.exit("'%s' is not reachable for other systems. Change the ROS_MASTER_URI!", local_addr)
275
276 self.mcast_port = mcast_port
277 self.mcast_group = mcast_group
278 rospy.loginfo("Start broadcasting at ('%s', %d)", mcast_group, mcast_port)
279 self._init_mcast_socket(True)
280
281
282
283
284
285
286
287 self.pubchanges = rospy.Publisher("~changes", MasterState)
288
289 rospy.Service('~list_masters', DiscoverMasters, self.rosservice_list_masters)
290
291
292 self._recvThread = threading.Thread(target = self.recv_loop)
293 self._recvThread.setDaemon(True)
294 self._recvThread.start()
295
296
297 self.master_monitor = MasterMonitor(monitor_port)
298 self._masterMonitorThread = threading.Thread(target = self.checkROSMaster_loop)
299 self._masterMonitorThread.setDaemon(True)
300 self._masterMonitorThread.start()
301
302
303 try:
304 self._statsTimer = threading.Timer(1, self.timed_stats_calculation)
305 self._statsTimer.start()
306 except:
307 rospy.logwarn("ROS Timer is not available! Statistic calculation and timeouts are deactivated!")
308
309 rospy.on_shutdown(self.finish)
310
312 rospy.loginfo("Init multicast socket")
313
314 self.msocket = msocket = McastSocket(self.mcast_port, self.mcast_group)
315
316 if not msocket.hasEnabledMulticastIface() and doexit_on_error:
317 sys.exit("No enabled multicast interfaces available!\nAdd multicast support e.g. sudo ifconfig eth0 multicast")
318
344
346 '''
347 The run method is used for periodic send small multicast messages. This
348 messages simulates the heartbeat and are used to detect other running
349 nodes associated with ROS master.
350 '''
351 while (not rospy.is_shutdown()) and not self.do_finish:
352 if not self.master_monitor.getMasteruri() is None:
353 t = 0
354 local_t = 0
355 if not self.master_monitor.getCurrentState() is None:
356 t = self.master_monitor.getCurrentState().timestamp
357 local_t = self.master_monitor.getCurrentState().timestamp_local
358 msg = struct.pack(Discoverer.HEARTBEAT_FMT,'R', Discoverer.VERSION, int(Discoverer.HEARTBEAT_HZ*10), int(t), int((t-(int(t))) * 1000000000), self.master_monitor.rpcport,
359 int(local_t), int((local_t-(int(local_t))) * 1000000000))
360 try:
361 self.msocket.send2group(msg)
362 for a in self.static_hosts:
363 try:
364 self.msocket.send2addr(msg, a)
365 except socket.gaierror as e:
366 rospy.logwarn("send to static host: " + str(a) + " failed: " + str(e))
367 except Exception as e:
368 rospy.logwarn(e)
369 self._init_mcast_socket()
370 time.sleep(1.0/Discoverer.HEARTBEAT_HZ)
371 msg = struct.pack(Discoverer.HEARTBEAT_FMT,'R', Discoverer.VERSION, int(Discoverer.HEARTBEAT_HZ*10), -1, -1, self.master_monitor.rpcport, -1, -1)
372 self.msocket.send2group(msg)
373 for a in self.static_hosts:
374 rospy.loginfo("send Discoverer.HEARTBEAT_FMT to static host: " + str(a))
375 self.msocket.send2addr(msg, a)
376 self.msocket.close()
377
379 '''
380 The method test periodically the state of the ROS master. The new state will
381 be published as heartbeat messages.
382 '''
383 import os
384 try_count = 0
385 while (not rospy.is_shutdown()) and not self.do_finish:
386 try:
387 cputimes = os.times()
388 cputime_init = cputimes[0] + cputimes[1]
389 if self.master_monitor.checkState(self._changed):
390
391 pass
392 with self.__lock:
393 self._changed = False
394
395 cputimes = os.times()
396 cputime = cputimes[0] + cputimes[1] - cputime_init
397 if self.current_check_hz*cputime > 0.20:
398 self.current_check_hz = float(self.current_check_hz)/2.0
399 elif self.current_check_hz*cputime < 0.10 and float(self.current_check_hz)*2.0 < Discoverer.ROSMASTER_HZ:
400 self.current_check_hz = float(self.current_check_hz)*2.0
401 try_count = 0
402 except MasterConnectionException, e:
403 try_count = try_count + 1
404 if try_count == 5:
405 rospy.logerr("Communication with ROS Master failed: %s", e)
406
407
408
409 with self.__lock:
410 current_time = time.time()
411 to_remove = []
412 for (k, v) in self.masters.iteritems():
413 if Discoverer.REMOVE_AFTER > 0 and current_time - v.last_heartbeat_ts > Discoverer.REMOVE_AFTER:
414 to_remove.append(k)
415 if not v.mastername is None:
416 self.publish_masterstate(MasterState(MasterState.STATE_REMOVED,
417 ROSMaster(str(v.mastername),
418 v.masteruri,
419 v.timestamp,
420 v.timestamp_local,
421 v.online,
422 v.discoverername,
423 v.monitoruri)))
424 for r in to_remove:
425 del self.masters[r]
426 time.sleep(1.0/self.current_check_hz)
427
429 '''
430 This method handles the received multicast messages.
431 '''
432 while self.msocket and (not rospy.is_shutdown()) and not self.do_finish:
433 try:
434 (msg, address) = self.msocket.recvfrom(1024)
435 except socket.timeout:
436
437 pass
438 except socket.error:
439 import traceback
440 rospy.logwarn("socket error: %s", traceback.format_exc())
441 else:
442 if not self.do_finish:
443 try:
444 (version, msg_tuple) = self.msg2masterState(msg, address)
445 if (version == Discoverer.VERSION):
446 (r, version, rate, secs, nsecs, monitor_port, secs_l, nsecs_l) = msg_tuple
447 master_key = (address, monitor_port)
448
449 if secs == -1 or secs_l == -1:
450 with self.__lock:
451 if self.masters.has_key(master_key):
452 master = self.masters[master_key]
453 if not master.mastername is None:
454 self.publish_masterstate(MasterState(MasterState.STATE_REMOVED,
455 ROSMaster(str(master.mastername),
456 master.masteruri,
457 master.timestamp,
458 master.timestamp_local,
459 False,
460 master.discoverername,
461 master.monitoruri)))
462 del self.masters[master_key]
463
464 elif self.masters.has_key(master_key):
465 with self.__lock:
466 changed = self.masters[master_key].addHeartbeat(float(secs)+float(nsecs)/1000000000.0, float(secs_l)+float(nsecs_l)/1000000000.0, float(rate)/10.0,)
467 if not self._changed:
468 self._changed = changed
469
470 else:
471
472 with self.__lock:
473 self.masters[master_key] = DiscoveredMaster(monitoruri=''.join(['http://', address[0],':',str(monitor_port)]),
474 heartbeat_rate=float(rate)/10.0,
475 timestamp=float(secs)+float(nsecs)/1000000000.0,
476 timestamp_local=float(secs_l)+float(nsecs_l)/1000000000.0,
477 callback_master_state=self.publish_masterstate)
478 except Exception, e:
479 import traceback
480 print traceback.format_exc()
481 rospy.logwarn("Error while decode message: %s", str(e))
482
483 @classmethod
485 '''
486 @return: parses the hearbeat message and return a tuple of
487 version and values corresponding with current version of message.
488 @see L{Discoverer.HEARTBEAT_FMT}
489 @raise Exception on invalid message
490 @rtype: C{(unsigned char, tuple corresponding to L{Discoverer.HEARTBEAT_FMT})}
491 '''
492 if len(msg) > 2:
493 (r,) = struct.unpack('c', msg[0])
494 (version,) = struct.unpack('B', msg[1])
495 if (version == Discoverer.VERSION):
496 if (r == 'R'):
497 if len(msg) == struct.calcsize(Discoverer.HEARTBEAT_FMT):
498 return (version, struct.unpack(Discoverer.HEARTBEAT_FMT, msg))
499 else:
500 raise Exception(' '.join(["wrong initial discovery message char", str(r), "received from", str(address)]))
501 elif (version > Discoverer.VERSION):
502 raise Exception(' '.join(["newer heartbeat version", str(version), "(own:", str(Discoverer.VERSION), ") detected, please update your master_discovery"]))
503 elif (version < Discoverer.VERSION):
504 raise Exception(' '.join(["old heartbeat version", str(version), "detected (current:", str(Discoverer.VERSION),"), please update master_discovery on", str(address)]))
505 else:
506 raise Exception(' '.join(["heartbeat version", str(Discoverer.VERSION), "expected, received:", str(version)]))
507 raise Exception("massage is to small")
508
510 '''
511 This method will be called by a timer and has two jobs:
512 1. set the masters offline, if no heartbeat messages are received a long time
513 2. calculate the quality of known links
514 @see: L{float}
515 '''
516 result = LinkStatesStamped()
517 current_time = time.time()
518 result.header.stamp.secs = int(current_time)
519 result.header.stamp.nsecs = int((current_time - result.header.stamp.secs) * 1000000000)
520 with self.__lock:
521 for (k, v) in self.masters.iteritems():
522 quality = -1.0
523 if not (v.mastername is None):
524 rate = v.heartbeat_rate
525 measurement_duration = Discoverer.MEASUREMENT_INTERVALS
526 if rate < 1.:
527 measurement_duration = Discoverer.MEASUREMENT_INTERVALS / rate
528
529 ts_oldest = current_time - measurement_duration
530 removed_ts = v.removeHeartbeats(ts_oldest)
531
532 if current_time - v.last_heartbeat_ts > (measurement_duration * Discoverer.TIMEOUT_FACTOR):
533 v.setOffline()
534
535 if v.online:
536 beats_count = len(v.heartbeats)
537 expected_count = rate * measurement_duration
538 if expected_count > 0:
539 quality = float(beats_count) / float(expected_count) * 100.0
540 if quality > 100.0:
541 quality = 100.0
542 result.links.append(LinkState(v.mastername, quality))
543
544 self.publish_stats(result)
545 try:
546 if not rospy.is_shutdown():
547 self._statsTimer = threading.Timer(1, self.timed_stats_calculation)
548 self._statsTimer.start()
549 except:
550 pass
551
553 '''
554 Publishes the given state to the ROS network. This method is thread safe.
555 @param master_state: the master state to publish
556 @type master_state: L{master_discovery_fkie.MasterState}
557 '''
558 with self.__lock:
559 try:
560 self.pubchanges.publish(master_state)
561 except:
562 import traceback
563 traceback.print_exc()
564
566 '''
567 Publishes the link quality states to the ROS network.This method is thread safe.
568 @param stats: the link quality states to publish
569 @type stats: L{master_discovery_fkie.LinkStatesStamped}
570 '''
571 with self.__lock:
572 try:
573 self.pubstats.publish(stats)
574 except:
575 import traceback
576 traceback.print_exc()
577
598