1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from __future__ import print_function
36
37 """
38 Data structures and library for representing ROS Computation Graph state.
39 """
40
41 import sys
42 import time
43 import itertools
44 import random
45 import logging
46 import traceback
47 try:
48 from xmlrpc.client import ServerProxy
49 except ImportError:
50 from xmlrpclib import ServerProxy
51 import socket
52
53 import rosgraph.masterapi
54
55 logger = logging.getLogger('rosgraph.graph')
56
57 _ROS_NAME = '/rosviz'
58
60 """
61 In order to prevent topic/node name aliasing, we have to remap
62 topic node names. Currently we just prepend a space, which is
63 an illegal ROS name and thus not aliased.
64 @return str: topic mapped to a graph node name.
65 """
66 return ' ' + topic
68 """
69 Inverse of topic_node
70 @return str: undo topic_node() operation
71 """
72 return node[1:]
73
75 """
76 Data structure for storing info about a 'bad' node
77 """
78
79
80 DEAD = 0
81
82 WONKY = 1
83
85 """
86 @param type: DEAD | WONKY
87 @type type: int
88 """
89 self.name =name
90 self.reason = reason
91 self.type = type
92
94 """
95 Data structure for storing Edge instances
96 """
97 __slots__ = ['edges_by_start', 'edges_by_end']
102
104 return itertools.chain(*[v for v in self.edges_by_start.values()])
105
106 - def has(self, edge):
108
117
118 - def add(self, edge):
119 """
120 Add an edge to our internal representation. not multi-thread safe
121 @param edge: edge to add
122 @type edge: Edge
123 """
124
125 def update_map(map, key, edge):
126 if key in map:
127 l = map[key]
128 if not edge in l:
129 l.append(edge)
130 return True
131 else:
132 return False
133 else:
134 map[key] = [edge]
135 return True
136
137 updated = update_map(self.edges_by_start, edge.key, edge)
138 updated = update_map(self.edges_by_end, edge.rkey, edge) or updated
139 return updated
140
141 - def add_edges(self, start, dest, direction, label=''):
142 """
143 Create Edge instances for args and add resulting edges to edge
144 list. Convenience method to avoid repetitve logging, etc...
145 @param edge_list: data structure to add edge to
146 @type edge_list: EdgeList
147 @param start: name of start node. If None, warning will be logged and add fails
148 @type start: str
149 @param dest: name of start node. If None, warning will be logged and add fails
150 @type dest: str
151 @param direction: direction string (i/o/b)
152 @type direction: str
153 @return: True if update occurred
154 @rtype: bool
155 """
156
157
158
159
160 updated = False
161 if not start:
162 logger.warn("cannot add edge: cannot map start [%s] to a node name", start)
163 elif not dest:
164 logger.warn("cannot add edge: cannot map dest [%s] to a node name", dest)
165 else:
166 for args in edge_args(start, dest, direction, label):
167 updated = self.add(Edge(*args)) or updated
168 return updated
169
171 """
172 Delete all edges that start or end at node
173 @param node: name of node
174 @type node: str
175 """
176 def matching(map, pref):
177 return [map[k] for k in map.keys() if k.startswith(pref)]
178
179 pref = node+"|"
180 edge_lists = matching(self.edges_by_start, pref) + matching(self.edges_by_end, pref)
181 for el in edge_lists:
182 for e in el:
183 self.delete(e)
184
186
187 def update_map(map, key, edge):
188 if key in map:
189 edges = map[key]
190 if edge in edges:
191 edges.remove(edge)
192 return True
193 update_map(self.edges_by_start, edge.key, edge)
194 update_map(self.edges_by_end, edge.rkey, edge)
195
197 """
198 Data structure for representing ROS node graph edge
199 """
200
201 __slots__ = ['start', 'end', 'label', 'key', 'rkey']
202 - def __init__(self, start, end, label=''):
209
213 return "%s->%s"%(self.start, self.end)
216
217 -def edge_args(start, dest, direction, label):
218 """
219 compute argument ordering for Edge constructor based on direction flag
220 @param direction str: 'i', 'o', or 'b' (in/out/bidir) relative to \a start
221 @param start str: name of starting node
222 @param start dest: name of destination node
223 """
224 edge_args = []
225 if direction in ['o', 'b']:
226 edge_args.append((start, dest, label))
227 if direction in ['i', 'b']:
228 edge_args.append((dest, start, label))
229 return edge_args
230
231
233 """
234 Utility class for polling ROS statistics from running ROS graph.
235 Not multi-thread-safe
236 """
237
238 - def __init__(self, node_ns='/', topic_ns='/'):
239 self.master = rosgraph.masterapi.Master(_ROS_NAME)
240
241 self.node_ns = node_ns or '/'
242 self.topic_ns = topic_ns or '/'
243
244
245 self.nn_nodes = set([])
246
247 self.nt_nodes = set([])
248
249
250 self.bad_nodes = {}
251 import threading
252 self.bad_nodes_lock = threading.Lock()
253
254
255 self.srvs = set([])
256
257 self.nn_edges = EdgeList()
258
259 self.nt_edges = EdgeList()
260
261 self.nt_all_edges = EdgeList()
262
263
264 self.node_uri_map = {}
265
266 self.uri_node_map = {}
267
268
269 self.last_master_refresh = 0
270 self.last_node_refresh = {}
271
272
273
274 self.master_stale = 5.0
275
276
277 self.node_stale = 5.0
278
279
281 """
282 @param stale_secs: seconds that data is considered fresh
283 @type stale_secs: double
284 """
285 self.master_stale = stale_secs
286
288 """
289 @param stale_secs: seconds that data is considered fresh
290 @type stale_secs: double
291 """
292 self.node_stale = stale_secs
293
295 """
296 @return: True if nodes information was updated
297 @rtype: bool
298 """
299 logger.debug("master refresh: starting")
300 updated = False
301 try:
302 val = self.master.getSystemState()
303 except rosgraph.masterapi.MasterException as e:
304 print("Unable to contact master", str(e), file=sys.stderr)
305 logger.error("unable to contact master: %s", str(e))
306 return False
307
308 pubs, subs, srvs = val
309
310 nodes = []
311 nt_all_edges = self.nt_all_edges
312 nt_nodes = self.nt_nodes
313 for state, direction in ((pubs, 'o'), (subs, 'i')):
314 for topic, l in state:
315 if topic.startswith(self.topic_ns):
316 nodes.extend([n for n in l if n.startswith(self.node_ns)])
317 nt_nodes.add(topic_node(topic))
318 for node in l:
319 updated = nt_all_edges.add_edges(
320 node, topic_node(topic), direction) or updated
321
322 nodes = set(nodes)
323
324 srvs = set([s for s, _ in srvs])
325 purge = None
326 if nodes ^ self.nn_nodes:
327 purge = self.nn_nodes - nodes
328 self.nn_nodes = nodes
329 updated = True
330 if srvs ^ self.srvs:
331 self.srvs = srvs
332 updated = True
333
334 if purge:
335 logger.debug("following nodes and related edges will be purged: %s", ','.join(purge))
336 for p in purge:
337 logger.debug('purging edges for node %s', p)
338 self.nn_edges.delete_all(p)
339 self.nt_edges.delete_all(p)
340 self.nt_all_edges.delete_all(p)
341
342 logger.debug("master refresh: done, updated[%s]", updated)
343 return updated
344
346 try:
347
348 self.bad_nodes_lock.acquire()
349 if node in self.bad_nodes:
350 self.bad_nodes[node].type = BadNode.DEAD
351 else:
352 self.bad_nodes[node] = (BadNode(node, BadNode.DEAD, reason))
353 finally:
354 self.bad_nodes_lock.release()
355
357 """
358 Promotes bad node to 'wonky' status.
359 """
360 try:
361
362 self.bad_nodes_lock.acquire()
363 bad = self.bad_nodes[node]
364 bad.type = BadNode.WONKY
365 finally:
366 self.bad_nodes_lock.release()
367
369 """
370 Retrieve bus info from the node and update nodes and edges as appropriate
371 @param node: node name
372 @type node: str
373 @param api: XML-RPC proxy
374 @type api: ServerProxy
375 @param bad_node: If True, node has connectivity issues and
376 should be treated differently
377 @type bad_node: bool
378 """
379 try:
380 logger.debug("businfo: contacting node [%s] for bus info", node)
381
382
383 if bad_node:
384 self._unmark_bad_node(node)
385
386
387
388 old_timeout = socket.getdefaulttimeout()
389 if bad_node:
390
391 socket.setdefaulttimeout(0.2)
392 else:
393 socket.setdefaulttimeout(1.0)
394
395 code, msg, bus_info = api.getBusInfo(_ROS_NAME)
396
397 socket.setdefaulttimeout(old_timeout)
398 except Exception as e:
399
400 self._mark_bad_node(node, str(e))
401 code = -1
402 msg = traceback.format_exc()
403
404 updated = False
405 if code != 1:
406 logger.error("cannot get stats info from node [%s]: %s", node, msg)
407 else:
408
409 for info in bus_info:
410
411 if len(info) < 5:
412 continue
413
414 connection_id = info[0]
415 dest_id = info[1]
416 direction = info[2]
417 transport = info[3]
418 topic = info[4]
419 if len(info) > 5:
420 connected = info[5]
421 else:
422 connected = True
423
424 if connected and topic.startswith(self.topic_ns):
425
426
427 self.nt_nodes.add(topic_node(topic))
428
429
430 updated = self.nt_edges.add_edges(node, topic_node(topic), direction) or updated
431
432
433 if dest_id.startswith('http://'):
434
435 dest_name = self.uri_node_map.get(dest_id, None)
436 updated = self.nn_edges.add_edges(node, dest_name, direction, topic) or updated
437 else:
438
439 pass
440 return updated
441
443 """
444 Contact node for stats/connectivity information
445 @param node: name of node to contact
446 @type node: str
447 @param bad_node: if True, node has connectivity issues
448 @type bad_node: bool
449 @return: True if node was successfully contacted
450 @rtype bool
451 """
452
453
454 updated = False
455 uri = self._node_uri_refresh(node)
456 try:
457 if uri:
458 api = ServerProxy(uri)
459 updated = self._node_refresh_businfo(node, api, bad_node)
460 except KeyError as e:
461 logger.warn('cannot contact node [%s] as it is not in the lookup table'%node)
462 return updated
463
465 try:
466 uri = self.master.lookupNode(node)
467 except:
468 msg = traceback.format_exc()
469 logger.warn("master reported error in node lookup: %s"%msg)
470 return None
471
472 self.node_uri_map[node] = uri
473 self.uri_node_map[uri] = node
474 return uri
475
477 """
478 Build self.node_uri_map and self.uri_node_map using master as a
479 lookup service. This will make N requests to the master for N
480 nodes, so this should only be used sparingly
481 """
482 for node in self.nn_nodes:
483 self._node_uri_refresh(node)
484
486 """
487 Update loop for nodes with bad connectivity. We box them separately
488 so that we can maintain the good performance of the normal update loop.
489 Once a node is on the bad list it stays there.
490 """
491 last_node_refresh = self.last_node_refresh
492
493
494 try:
495 self.bad_nodes_lock.acquire()
496
497 update_queue = self.bad_nodes.values()[:]
498 finally:
499 self.bad_nodes_lock.release()
500
501
502 updated = False
503
504 num_nodes = 0
505
506 start_time = time.time()
507 while update_queue:
508
509 next = update_queue.pop()
510
511 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale):
512 updated = self._node_refresh(next.name, True) or updated
513
514
515 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0)
516 num_nodes += 1
517
518
519 time.sleep(0.01)
520 end_time = time.time()
521
522 logger.debug("ROS stats (bad nodes) update took %ss"%(end_time-start_time))
523 return updated
524
526 """
527 Update all the stats. This method may take a while to complete as it will
528 communicate with all nodes + master.
529 """
530
531 last_node_refresh = self.last_node_refresh
532
533
534 update_queue = None
535
536 work_to_do = True
537
538 updated = False
539
540 num_nodes = 0
541
542 start_time = time.time()
543 while work_to_do:
544
545
546
547
548
549 if time.time() > (self.last_master_refresh + self.master_stale):
550 updated = self._master_refresh()
551 if self.last_master_refresh == 0:
552
553 self._node_uri_refresh_all()
554
555 self.last_master_refresh = time.time()
556
557 else:
558
559 if update_queue is None:
560 update_queue = list(self.nn_nodes)
561
562 elif not update_queue:
563 work_to_do = False
564
565 else:
566
567 next = update_queue.pop()
568
569 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale):
570 updated = self._node_refresh(next) or updated
571
572
573 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0)
574 num_nodes += 1
575
576
577 time.sleep(0.01)
578 end_time = time.time()
579
580 logger.debug("ROS stats update took %ss"%(end_time-start_time))
581 return updated
582