1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 from __future__ import print_function
36
37 """
38 Data structures and library for representing ROS Computation Graph state.
39 """
40
41 import sys
42 import time
43 import itertools
44 import random
45 import logging
46 import traceback
47 try:
48 from xmlrpc.client import ServerProxy
49 except ImportError:
50 from xmlrpclib import ServerProxy
51 import socket
52
53 import rosgraph.masterapi
54
55 logger = logging.getLogger('rosgraph.graph')
56
57 _ROS_NAME = '/rosviz'
58
60 """
61 In order to prevent topic/node name aliasing, we have to remap
62 topic node names. Currently we just prepend a space, which is
63 an illegal ROS name and thus not aliased.
64 @return str: topic mapped to a graph node name.
65 """
66 return ' ' + topic
68 """
69 Inverse of topic_node
70 @return str: undo topic_node() operation
71 """
72 return node[1:]
73
75 """
76 Data structure for storing info about a 'bad' node
77 """
78
79
80 DEAD = 0
81
82 WONKY = 1
83
85 """
86 @param type: DEAD | WONKY
87 @type type: int
88 """
89 self.name =name
90 self.reason = reason
91 self.type = type
92
94 """
95 Data structure for storing Edge instances
96 """
97 __slots__ = ['edges_by_start', 'edges_by_end']
102
104 return itertools.chain(*[v for v in self.edges_by_start.values()])
105
106 - def has(self, edge):
114
115 - def add(self, edge):
116 """
117 Add an edge to our internal representation. not multi-thread safe
118 @param edge: edge to add
119 @type edge: Edge
120 """
121
122 def update_map(map, key, edge):
123 if key in map:
124 l = map[key]
125 if not edge in l:
126 l.append(edge)
127 return True
128 else:
129 return False
130 else:
131 map[key] = [edge]
132 return True
133
134 updated = update_map(self.edges_by_start, edge.key, edge)
135 updated = update_map(self.edges_by_end, edge.rkey, edge) or updated
136 return updated
137
138 - def add_edges(self, start, dest, direction, label=''):
139 """
140 Create Edge instances for args and add resulting edges to edge
141 list. Convenience method to avoid repetitve logging, etc...
142 @param edge_list: data structure to add edge to
143 @type edge_list: EdgeList
144 @param start: name of start node. If None, warning will be logged and add fails
145 @type start: str
146 @param dest: name of start node. If None, warning will be logged and add fails
147 @type dest: str
148 @param direction: direction string (i/o/b)
149 @type direction: str
150 @return: True if update occured
151 @rtype: bool
152 """
153
154
155
156
157 updated = False
158 if not start:
159 logger.warn("cannot add edge: cannot map start [%s] to a node name", start)
160 elif not dest:
161 logger.warn("cannot add edge: cannot map dest [%s] to a node name", dest)
162 else:
163 for args in edge_args(start, dest, direction, label):
164 updated = self.add(Edge(*args)) or updated
165 return updated
166
168 """
169 Delete all edges that start or end at node
170 @param node: name of node
171 @type node: str
172 """
173 def matching(map, pref):
174 return [map[k] for k in map.keys() if k.startswith(pref)]
175
176 pref = node+"|"
177 edge_lists = matching(self.edges_by_start, pref) + matching(self.edges_by_end, pref)
178 for el in edge_lists:
179 for e in el:
180 self.delete(e)
181
183
184 def update_map(map, key, edge):
185 if key in map:
186 edges = map[key]
187 if edge in edges:
188 edges.remove(edge)
189 return True
190 update_map(self.edges_by_start, edge.key, edge)
191 update_map(self.edges_by_end, edge.rkey, edge)
192
194 """
195 Data structure for representing ROS node graph edge
196 """
197
198 __slots__ = ['start', 'end', 'label', 'key', 'rkey']
199 - def __init__(self, start, end, label=''):
206
210 return "%s->%s"%(self.start, self.end)
213
214 -def edge_args(start, dest, direction, label):
215 """
216 compute argument ordering for Edge constructor based on direction flag
217 @param direction str: 'i', 'o', or 'b' (in/out/bidir) relative to \a start
218 @param start str: name of starting node
219 @param start dest: name of destination node
220 """
221 edge_args = []
222 if direction in ['o', 'b']:
223 edge_args.append((start, dest, label))
224 if direction in ['i', 'b']:
225 edge_args.append((dest, start, label))
226 return edge_args
227
228
230 """
231 Utility class for polling ROS statistics from running ROS graph.
232 Not multi-thread-safe
233 """
234
235 - def __init__(self, node_ns='/', topic_ns='/'):
236 self.master = rosgraph.masterapi.Master(_ROS_NAME)
237
238 self.node_ns = node_ns or '/'
239 self.topic_ns = topic_ns or '/'
240
241
242 self.nn_nodes = set([])
243
244 self.nt_nodes = set([])
245
246
247 self.bad_nodes = {}
248 import threading
249 self.bad_nodes_lock = threading.Lock()
250
251
252 self.srvs = set([])
253
254 self.nn_edges = EdgeList()
255
256 self.nt_edges = EdgeList()
257
258 self.nt_all_edges = EdgeList()
259
260
261 self.node_uri_map = {}
262
263 self.uri_node_map = {}
264
265
266 self.last_master_refresh = 0
267 self.last_node_refresh = {}
268
269
270
271 self.master_stale = 5.0
272
273
274 self.node_stale = 5.0
275
276
278 """
279 @param stale_secs: seconds that data is considered fresh
280 @type stale_secs: double
281 """
282 self.master_stale = stale_secs
283
285 """
286 @param stale_secs: seconds that data is considered fresh
287 @type stale_secs: double
288 """
289 self.node_stale = stale_secs
290
292 """
293 @return: True if nodes information was updated
294 @rtype: bool
295 """
296 logger.debug("master refresh: starting")
297 updated = False
298 try:
299 val = self.master.getSystemState()
300 except rosgraph.masterapi.MasterException as e:
301 print("Unable to contact master", str(e), file=sys.stderr)
302 logger.error("unable to contact master: %s", str(e))
303 return False
304
305 pubs, subs, srvs = val
306
307 nodes = []
308 nt_all_edges = self.nt_all_edges
309 nt_nodes = self.nt_nodes
310 for state, direction in ((pubs, 'o'), (subs, 'i')):
311 for topic, l in state:
312 if topic.startswith(self.topic_ns):
313 nodes.extend([n for n in l if n.startswith(self.node_ns)])
314 nt_nodes.add(topic_node(topic))
315 for node in l:
316 updated = nt_all_edges.add_edges(
317 node, topic_node(topic), direction) or updated
318
319 nodes = set(nodes)
320
321 srvs = set([s for s, _ in srvs])
322 purge = None
323 if nodes ^ self.nn_nodes:
324 purge = self.nn_nodes - nodes
325 self.nn_nodes = nodes
326 updated = True
327 if srvs ^ self.srvs:
328 self.srvs = srvs
329 updated = True
330
331 if purge:
332 logger.debug("following nodes and related edges will be purged: %s", ','.join(purge))
333 for p in purge:
334 logger.debug('purging edges for node %s', p)
335 self.nn_edges.delete_all(p)
336 self.nt_edges.delete_all(p)
337 self.nt_all_edges.delete_all(p)
338
339 logger.debug("master refresh: done, updated[%s]", updated)
340 return updated
341
343 try:
344
345 self.bad_nodes_lock.acquire()
346 if node in self.bad_nodes:
347 self.bad_nodes[node].type = BadNode.DEAD
348 else:
349 self.bad_nodes[node] = (BadNode(node, BadNode.DEAD, reason))
350 finally:
351 self.bad_nodes_lock.release()
352
354 """
355 Promotes bad node to 'wonky' status.
356 """
357 try:
358
359 self.bad_nodes_lock.acquire()
360 bad = self.bad_nodes[node]
361 bad.type = BadNode.WONKY
362 finally:
363 self.bad_nodes_lock.release()
364
366 """
367 Retrieve bus info from the node and update nodes and edges as appropriate
368 @param node: node name
369 @type node: str
370 @param api: XML-RPC proxy
371 @type api: ServerProxy
372 @param bad_node: If True, node has connectivity issues and
373 should be treated differently
374 @type bad_node: bool
375 """
376 try:
377 logger.debug("businfo: contacting node [%s] for bus info", node)
378
379
380 if bad_node:
381 self._unmark_bad_node(node)
382
383
384
385 old_timeout = socket.getdefaulttimeout()
386 if bad_node:
387
388 socket.setdefaulttimeout(0.2)
389 else:
390 socket.setdefaulttimeout(1.0)
391
392 code, msg, bus_info = api.getBusInfo(_ROS_NAME)
393
394 socket.setdefaulttimeout(old_timeout)
395 except Exception as e:
396
397 self._mark_bad_node(node, str(e))
398 code = -1
399 msg = traceback.format_exc()
400
401 updated = False
402 if code != 1:
403 logger.error("cannot get stats info from node [%s]: %s", node, msg)
404 else:
405
406 for info in bus_info:
407
408 if len(info) < 5:
409 continue
410
411 connection_id = info[0]
412 dest_id = info[1]
413 direction = info[2]
414 transport = info[3]
415 topic = info[4]
416 if len(info) > 5:
417 connected = info[5]
418 else:
419 connected = True
420
421 if connected and topic.startswith(self.topic_ns):
422
423
424 self.nt_nodes.add(topic_node(topic))
425
426
427 updated = self.nt_edges.add_edges(node, topic_node(topic), direction) or updated
428
429
430 if dest_id.startswith('http://'):
431
432 dest_name = self.uri_node_map.get(dest_id, None)
433 updated = self.nn_edges.add_edges(node, dest_name, direction, topic) or updated
434 else:
435
436 pass
437 return updated
438
440 """
441 Contact node for stats/connectivity information
442 @param node: name of node to contact
443 @type node: str
444 @param bad_node: if True, node has connectivity issues
445 @type bad_node: bool
446 @return: True if node was successfully contacted
447 @rtype bool
448 """
449
450
451 updated = False
452 uri = self._node_uri_refresh(node)
453 try:
454 if uri:
455 api = ServerProxy(uri)
456 updated = self._node_refresh_businfo(node, api, bad_node)
457 except KeyError as e:
458 logger.warn('cannot contact node [%s] as it is not in the lookup table'%node)
459 return updated
460
462 try:
463 uri = self.master.lookupNode(node)
464 except:
465 msg = traceback.format_exc()
466 logger.warn("master reported error in node lookup: %s"%msg)
467 return None
468
469 self.node_uri_map[node] = uri
470 self.uri_node_map[uri] = node
471 return uri
472
474 """
475 Build self.node_uri_map and self.uri_node_map using master as a
476 lookup service. This will make N requests to the master for N
477 nodes, so this should only be used sparingly
478 """
479 for node in self.nn_nodes:
480 self._node_uri_refresh(node)
481
483 """
484 Update loop for nodes with bad connectivity. We box them separately
485 so that we can maintain the good performance of the normal update loop.
486 Once a node is on the bad list it stays there.
487 """
488 last_node_refresh = self.last_node_refresh
489
490
491 try:
492 self.bad_nodes_lock.acquire()
493
494 update_queue = self.bad_nodes.values()[:]
495 finally:
496 self.bad_nodes_lock.release()
497
498
499 updated = False
500
501 num_nodes = 0
502
503 start_time = time.time()
504 while update_queue:
505
506 next = update_queue.pop()
507
508 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale):
509 updated = self._node_refresh(next.name, True) or updated
510
511
512 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0)
513 num_nodes += 1
514
515
516 time.sleep(0.01)
517 end_time = time.time()
518
519 logger.debug("ROS stats (bad nodes) update took %ss"%(end_time-start_time))
520 return updated
521
523 """
524 Update all the stats. This method may take awhile to complete as it will
525 communicate with all nodes + master.
526 """
527
528 last_node_refresh = self.last_node_refresh
529
530
531 update_queue = None
532
533 work_to_do = True
534
535 updated = False
536
537 num_nodes = 0
538
539 start_time = time.time()
540 while work_to_do:
541
542
543
544
545
546 if time.time() > (self.last_master_refresh + self.master_stale):
547 updated = self._master_refresh()
548 if self.last_master_refresh == 0:
549
550 self._node_uri_refresh_all()
551
552 self.last_master_refresh = time.time()
553
554 else:
555
556 if update_queue is None:
557 update_queue = list(self.nn_nodes)
558
559 elif not update_queue:
560 work_to_do = False
561
562 else:
563
564 next = update_queue.pop()
565
566 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale):
567 updated = self._node_refresh(next) or updated
568
569
570 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0)
571 num_nodes += 1
572
573
574 time.sleep(0.01)
575 end_time = time.time()
576
577 logger.debug("ROS stats update took %ss"%(end_time-start_time))
578 return updated
579