1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 """
36 Data structures and library for representing ROS Computation Graph state.
37 """
38
39 import sys
40 import time
41 import itertools
42 import random
43 import logging
44 import traceback
45 import xmlrpclib
46 import socket
47
48 import roslib.scriptutil
49
50 logger = logging.getLogger('rosgraph.graph')
51
52 _ROS_NAME = '/rosviz'
53
55 """
56 In order to prevent topic/node name aliasing, we have to remap
57 topic node names. Currently we just prepend a space, which is
58 an illegal ROS name and thus not aliased.
59 @return str: topic mapped to a graph node name.
60 """
61 return ' ' + topic
63 """
64 Inverse of topic_node
65 @return str: undo topic_node() operation
66 """
67 return node[1:]
68
70 """
71 Data structure for storing info about a 'bad' node
72 """
73
74
75 DEAD = 0
76
77 WONKY = 1
78
80 """
81 @param type: DEAD | WONKY
82 @type type: int
83 """
84 self.name =name
85 self.reason = reason
86 self.type = type
87
89 """
90 Data structure for storing Edge instances
91 """
92 __slots__ = ['edges_by_start', 'edges_by_end']
97
99 return itertools.chain(*[v for v in self.edges_by_start.itervalues()])
100
101 - def has(self, edge):
109
110 - def add(self, edge):
111 """
112 Add an edge to our internal representation. not multi-thread safe
113 @param edge: edge to add
114 @type edge: Edge
115 """
116
117 def update_map(map, key, edge):
118 if key in map:
119 l = map[key]
120 if not edge in l:
121 l.append(edge)
122 return True
123 else:
124 return False
125 else:
126 map[key] = [edge]
127 return True
128
129 updated = update_map(self.edges_by_start, edge.key, edge)
130 updated = update_map(self.edges_by_end, edge.rkey, edge) or updated
131 return updated
132
133 - def add_edges(self, start, dest, direction, label=''):
134 """
135 Create Edge instances for args and add resulting edges to edge
136 list. Convenience method to avoid repetitve logging, etc...
137 @param edge_list: data structure to add edge to
138 @type edge_list: EdgeList
139 @param start: name of start node. If None, warning will be logged and add fails
140 @type start: str
141 @param dest: name of start node. If None, warning will be logged and add fails
142 @type dest: str
143 @param direction: direction string (i/o/b)
144 @type direction: str
145 @return: True if update occured
146 @rtype: bool
147 """
148
149
150
151
152 updated = False
153 if not start:
154 logger.warn("cannot add edge: cannot map start [%s] to a node name", start)
155 elif not dest:
156 logger.warn("cannot add edge: cannot map dest [%s] to a node name", dest)
157 else:
158 for args in edge_args(start, dest, direction, label):
159 updated = self.add(Edge(*args)) or updated
160 return updated
161
163 """
164 Delete all edges that start or end at node
165 @param node: name of node
166 @type node: str
167 """
168 def matching(map, pref):
169 return [map[k] for k in map.iterkeys() if k.startswith(pref)]
170
171 pref = node+"|"
172 edge_lists = matching(self.edges_by_start, pref) + matching(self.edges_by_start, pref)
173 for el in edge_lists:
174 for e in el:
175 self.delete(e)
176
178
179 def update_map(map, key, edge):
180 if key in map:
181 edges = map[key]
182 if edge in edges:
183 edges.remove(edge)
184 return True
185 update_map(self.edges_by_start, edge.key, edge)
186 update_map(self.edges_by_end, edge.rkey, edge)
187
189 """
190 Data structure for representing ROS node graph edge
191 """
192
193 __slots__ = ['start', 'end', 'label', 'key', 'rkey']
194 - def __init__(self, start, end, label=''):
201
205 return "%s->%s"%(self.start, self.end)
208
209 -def edge_args(start, dest, direction, label):
210 """
211 compute argument ordering for Edge constructor based on direction flag
212 @param direction str: 'i', 'o', or 'b' (in/out/bidir) relative to \a start
213 @param start str: name of starting node
214 @param start dest: name of destination node
215 """
216 edge_args = []
217 if direction in ['o', 'b']:
218 edge_args.append((start, dest, label))
219 if direction in ['i', 'b']:
220 edge_args.append((dest, start, label))
221 return edge_args
222
223
225 """
226 Utility class for polling ROS statistics from running ROS graph.
227 Not multi-thread-safe
228 """
229
230 - def __init__(self, node_ns='/', topic_ns='/'):
231 self.master = roslib.scriptutil.get_master()
232
233 self.node_ns = node_ns or '/'
234 self.topic_ns = topic_ns or '/'
235
236
237 self.nn_nodes = set([])
238
239 self.nt_nodes = set([])
240
241
242 self.bad_nodes = {}
243 import threading
244 self.bad_nodes_lock = threading.Lock()
245
246
247 self.srvs = set([])
248
249 self.nn_edges = EdgeList()
250
251 self.nt_edges = EdgeList()
252
253 self.nt_all_edges = EdgeList()
254
255
256 self.node_uri_map = {}
257
258 self.uri_node_map = {}
259
260
261 self.last_master_refresh = 0
262 self.last_node_refresh = {}
263
264
265
266 self.master_stale = 5.0
267
268
269 self.node_stale = 5.0
270
271
273 """
274 @param stale_secs: seconds that data is considered fresh
275 @type stale_secs: double
276 """
277 self.master_stale = stale_secs
278
280 """
281 @param stale_secs: seconds that data is considered fresh
282 @type stale_secs: double
283 """
284 self.node_stale = stale_secs
285
287 """
288 @return: True if nodes information was updated
289 @rtype: bool
290 """
291 logger.debug("master refresh: starting")
292 updated = False
293
294 try:
295 code, msg, val = self.master.getSystemState(_ROS_NAME)
296 except socket.error, msg:
297 print >> sys.stderr, "Unable to contact master", msg
298 logger.error("unable to contact master: %s", msg)
299 return False
300 if code != 1:
301 print >> sys.stderr, "Unable to contact master: %s"%msg
302 logger.error("unable to contact master: %s", msg)
303 updated = False
304 else:
305 pubs, subs, srvs = val
306
307 nodes = []
308 nt_all_edges = self.nt_all_edges
309 nt_nodes = self.nt_nodes
310 for state, direction in ((pubs, 'o'), (subs, 'i')):
311 for topic, l in state:
312 if topic.startswith(self.topic_ns):
313 nodes.extend([n for n in l if n.startswith(self.node_ns)])
314 nt_nodes.add(topic_node(topic))
315 for node in l:
316 updated = nt_all_edges.add_edges(
317 node, topic_node(topic), direction) or updated
318
319 nodes = set(nodes)
320
321 srvs = set([s for s, _ in srvs])
322 purge = None
323 if nodes ^ self.nn_nodes:
324 purge = self.nn_nodes - nodes
325 self.nn_nodes = nodes
326 updated = True
327 if srvs ^ self.srvs:
328 self.srvs = srvs
329 updated = True
330
331 if purge:
332 logger.debug("following nodes and related edges will be purged: %s", ','.join(purge))
333 for p in purge:
334 logger.debug('purging edges for node %s', p)
335 self.nn_edges.delete_all(p)
336 self.nt_edges.delete_all(p)
337 self.nt_all_edges.delete_all(p)
338
339 logger.debug("master refresh: done, updated[%s]", updated)
340 return updated
341
343 try:
344
345 self.bad_nodes_lock.acquire()
346 if node in self.bad_nodes:
347 self.bad_nodes[node].type = BadNode.DEAD
348 else:
349 self.bad_nodes[node] = (BadNode(node, BadNode.DEAD, reason))
350 finally:
351 self.bad_nodes_lock.release()
352
354 """
355 Promotes bad node to 'wonky' status.
356 """
357 try:
358
359 self.bad_nodes_lock.acquire()
360 bad = self.bad_nodes[node]
361 bad.type = BadNode.WONKY
362 finally:
363 self.bad_nodes_lock.release()
364
366 """
367 Retrieve bus info from the node and update nodes and edges as appropriate
368 @param node: node name
369 @type node: str
370 @param api: XML-RPC proxy
371 @type api: ServerProxy
372 @param bad_node: If True, node has connectivity issues and
373 should be treated differently
374 @type bad_node: bool
375 """
376 try:
377 logger.debug("businfo: contacting node [%s] for bus info", node)
378
379
380 if bad_node:
381 self._unmark_bad_node(node)
382
383
384
385 old_timeout = socket.getdefaulttimeout()
386 if bad_node:
387
388 socket.setdefaulttimeout(0.2)
389 else:
390 socket.setdefaulttimeout(1.0)
391
392 code, msg, bus_info = api.getBusInfo(_ROS_NAME)
393
394 socket.setdefaulttimeout(old_timeout)
395 except Exception, e:
396
397 self._mark_bad_node(node, str(e))
398 code = -1
399 msg = traceback.format_exc()
400
401 updated = False
402 if code != 1:
403 logger.error("cannot get stats info from node [%s]: %s", node, msg)
404 else:
405
406 for info in bus_info:
407
408 if len(info) < 5:
409 continue
410
411 connection_id = info[0]
412 dest_id = info[1]
413 direction = info[2]
414 transport = info[3]
415 topic = info[4]
416 if len(info) > 5:
417 connected = info[5]
418 else:
419 connected = True
420
421 if connected and topic.startswith(self.topic_ns):
422
423
424 self.nt_nodes.add(topic_node(topic))
425
426
427 updated = self.nt_edges.add_edges(node, topic_node(topic), direction) or updated
428
429
430 if dest_id.startswith('http://'):
431
432 dest_name = self.uri_node_map.get(dest_id, None)
433 updated = self.nn_edges.add_edges(node, dest_name, direction, topic) or updated
434 else:
435
436 pass
437 return updated
438
440 """
441 Contact node for stats/connectivity information
442 @param node: name of node to contact
443 @type node: str
444 @param bad_node: if True, node has connectivity issues
445 @type bad_node: bool
446 @return: True if node was successfully contacted
447 @rtype bool
448 """
449
450
451 updated = False
452 uri = self._node_uri_refresh(node)
453 try:
454 if uri:
455 api = xmlrpclib.ServerProxy(uri)
456 updated = self._node_refresh_businfo(node, api, bad_node)
457 except KeyError, e:
458 logger.warn('cannot contact node [%s] as it is not in the lookup table'%node)
459 return updated
460
462 try:
463 code, msg, uri = self.master.lookupNode(_ROS_NAME, node)
464 except:
465 code = -1
466 msg = traceback.format_exc()
467 if code != 1:
468
469 print >> sys.stderr, "Unknown node reported: %s"%msg
470 logger.warn("master reported error in node lookup: %s"%msg)
471 return None
472 else:
473
474 self.node_uri_map[node] = uri
475 self.uri_node_map[uri] = node
476 return uri
477
479 """
480 Build self.node_uri_map and self.uri_node_map using master as a
481 lookup service. This will make N requests to the master for N
482 nodes, so this should only be used sparingly
483 """
484 for node in self.nn_nodes:
485 self._node_uri_refresh(node)
486
488 """
489 Update loop for nodes with bad connectivity. We box them separately
490 so that we can maintain the good performance of the normal update loop.
491 Once a node is on the bad list it stays there.
492 """
493 last_node_refresh = self.last_node_refresh
494
495
496 try:
497 self.bad_nodes_lock.acquire()
498
499 update_queue = self.bad_nodes.values()[:]
500 finally:
501 self.bad_nodes_lock.release()
502
503
504 updated = False
505
506 num_nodes = 0
507
508 start_time = time.time()
509 while update_queue:
510
511 next = update_queue.pop()
512
513 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale):
514 updated = self._node_refresh(next.name, True) or updated
515
516
517 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0)
518 num_nodes += 1
519
520
521 time.sleep(0.01)
522 end_time = time.time()
523
524 logger.debug("ROS stats (bad nodes) update took %ss"%(end_time-start_time))
525 return updated
526
528 """
529 Update all the stats. This method may take awhile to complete as it will
530 communicate with all nodes + master.
531 """
532
533 last_node_refresh = self.last_node_refresh
534
535
536 update_queue = None
537
538 work_to_do = True
539
540 updated = False
541
542 num_nodes = 0
543
544 start_time = time.time()
545 while work_to_do:
546
547
548
549
550
551 if time.time() > (self.last_master_refresh + self.master_stale):
552 updated = self._master_refresh()
553 if self.last_master_refresh == 0:
554
555 self._node_uri_refresh_all()
556
557 self.last_master_refresh = time.time()
558
559 else:
560
561 if update_queue is None:
562 update_queue = list(self.nn_nodes)
563
564 elif not update_queue:
565 work_to_do = False
566
567 else:
568
569 next = update_queue.pop()
570
571 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale):
572 updated = self._node_refresh(next) or updated
573
574
575 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0)
576 num_nodes += 1
577
578
579 time.sleep(0.01)
580 end_time = time.time()
581
582 logger.debug("ROS stats update took %ss"%(end_time-start_time))
583 return updated
584