Package rosgraph :: Package impl :: Module graph
[frames] | no frames]

Source Code for Module rosgraph.impl.graph

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id: graph.py 14254 2011-07-12 17:55:04Z kwc $ 
 34   
 35  """ 
 36  Data structures and library for representing ROS Computation Graph state. 
 37  """ 
 38   
 39  import sys 
 40  import time 
 41  import itertools 
 42  import random 
 43  import logging 
 44  import traceback 
 45  import xmlrpclib 
 46  import socket 
 47   
 48  import roslib.scriptutil  
 49   
 50  logger = logging.getLogger('rosgraph.graph') 
 51   
 52  _ROS_NAME = '/rosviz' 
 53   
54 -def topic_node(topic):
55 """ 56 In order to prevent topic/node name aliasing, we have to remap 57 topic node names. Currently we just prepend a space, which is 58 an illegal ROS name and thus not aliased. 59 @return str: topic mapped to a graph node name. 60 """ 61 return ' ' + topic
62 -def node_topic(node):
63 """ 64 Inverse of topic_node 65 @return str: undo topic_node() operation 66 """ 67 return node[1:]
68
69 -class BadNode(object):
70 """ 71 Data structure for storing info about a 'bad' node 72 """ 73 74 ## no connectivity 75 DEAD = 0 76 ## intermittent connectivity 77 WONKY = 1 78
79 - def __init__(self, name, type, reason):
80 """ 81 @param type: DEAD | WONKY 82 @type type: int 83 """ 84 self.name =name 85 self.reason = reason 86 self.type = type
87
88 -class EdgeList(object):
89 """ 90 Data structure for storing Edge instances 91 """ 92 __slots__ = ['edges_by_start', 'edges_by_end']
93 - def __init__(self):
94 # in order to make it easy to purge edges, we double-index them 95 self.edges_by_start = {} 96 self.edges_by_end = {}
97
98 - def __iter__(self):
99 return itertools.chain(*[v for v in self.edges_by_start.itervalues()])
100
101 - def has(self, edge):
102 """ 103 @return: True if edge is in edge list 104 @rtype: bool 105 """ 106 key = edge.key 107 return key in self.edges_by_start and \ 108 edge in self.edges_by_start[key]
109
110 - def add(self, edge):
111 """ 112 Add an edge to our internal representation. not multi-thread safe 113 @param edge: edge to add 114 @type edge: Edge 115 """ 116 # see note in __init__ 117 def update_map(map, key, edge): 118 if key in map: 119 l = map[key] 120 if not edge in l: 121 l.append(edge) 122 return True 123 else: 124 return False 125 else: 126 map[key] = [edge] 127 return True
128 129 updated = update_map(self.edges_by_start, edge.key, edge) 130 updated = update_map(self.edges_by_end, edge.rkey, edge) or updated 131 return updated
132
133 - def add_edges(self, start, dest, direction, label=''):
134 """ 135 Create Edge instances for args and add resulting edges to edge 136 list. Convenience method to avoid repetitve logging, etc... 137 @param edge_list: data structure to add edge to 138 @type edge_list: EdgeList 139 @param start: name of start node. If None, warning will be logged and add fails 140 @type start: str 141 @param dest: name of start node. If None, warning will be logged and add fails 142 @type dest: str 143 @param direction: direction string (i/o/b) 144 @type direction: str 145 @return: True if update occured 146 @rtype: bool 147 """ 148 149 # the warnings should generally be temporary, occuring of the 150 # master/node information becomes stale while we are still 151 # doing an update 152 updated = False 153 if not start: 154 logger.warn("cannot add edge: cannot map start [%s] to a node name", start) 155 elif not dest: 156 logger.warn("cannot add edge: cannot map dest [%s] to a node name", dest) 157 else: 158 for args in edge_args(start, dest, direction, label): 159 updated = self.add(Edge(*args)) or updated 160 return updated
161
162 - def delete_all(self, node):
163 """ 164 Delete all edges that start or end at node 165 @param node: name of node 166 @type node: str 167 """ 168 def matching(map, pref): 169 return [map[k] for k in map.iterkeys() if k.startswith(pref)]
170 171 pref = node+"|" 172 edge_lists = matching(self.edges_by_start, pref) + matching(self.edges_by_start, pref) 173 for el in edge_lists: 174 for e in el: 175 self.delete(e) 176
177 - def delete(self, edge):
178 # see note in __init__ 179 def update_map(map, key, edge): 180 if key in map: 181 edges = map[key] 182 if edge in edges: 183 edges.remove(edge) 184 return True
185 update_map(self.edges_by_start, edge.key, edge) 186 update_map(self.edges_by_end, edge.rkey, edge) 187
188 -class Edge(object):
189 """ 190 Data structure for representing ROS node graph edge 191 """ 192 193 __slots__ = ['start', 'end', 'label', 'key', 'rkey']
194 - def __init__(self, start, end, label=''):
195 self.start = start 196 self.end = end 197 self.label = label 198 self.key = "%s|%s"%(self.start, self.label) 199 # reverse key, indexed from end 200 self.rkey = "%s|%s"%(self.end, self.label)
201
202 - def __ne__(self, other):
203 return self.start != other.start or self.end != other.end
204 - def __str__(self):
205 return "%s->%s"%(self.start, self.end)
206 - def __eq__(self, other):
207 return self.start == other.start and self.end == other.end
208
209 -def edge_args(start, dest, direction, label):
210 """ 211 compute argument ordering for Edge constructor based on direction flag 212 @param direction str: 'i', 'o', or 'b' (in/out/bidir) relative to \a start 213 @param start str: name of starting node 214 @param start dest: name of destination node 215 """ 216 edge_args = [] 217 if direction in ['o', 'b']: 218 edge_args.append((start, dest, label)) 219 if direction in ['i', 'b']: 220 edge_args.append((dest, start, label)) 221 return edge_args
222 223
224 -class Graph(object):
225 """ 226 Utility class for polling ROS statistics from running ROS graph. 227 Not multi-thread-safe 228 """ 229
230 - def __init__(self, node_ns='/', topic_ns='/'):
231 self.master = roslib.scriptutil.get_master() 232 233 self.node_ns = node_ns or '/' 234 self.topic_ns = topic_ns or '/' 235 236 # ROS nodes 237 self.nn_nodes = set([]) 238 # ROS topic nodes 239 self.nt_nodes = set([]) 240 241 # ROS nodes that aren't responding quickly 242 self.bad_nodes = {} 243 import threading 244 self.bad_nodes_lock = threading.Lock() 245 246 # ROS services 247 self.srvs = set([]) 248 # ROS node->node transport connections 249 self.nn_edges = EdgeList() 250 # ROS node->topic connections 251 self.nt_edges = EdgeList() 252 # ROS all node->topic connections, including empty 253 self.nt_all_edges = EdgeList() 254 255 # node names to URI map 256 self.node_uri_map = {} # { node_name_str : uri_str } 257 # reverse map URIs to node names 258 self.uri_node_map = {} # { uri_str : node_name_str } 259 260 # time we last contacted master 261 self.last_master_refresh = 0 262 self.last_node_refresh = {} 263 264 # time we last communicated with master 265 # seconds until master data is considered stale 266 self.master_stale = 5.0 267 # time we last communicated with node 268 # seconds until node data is considered stale 269 self.node_stale = 5.0 #seconds
270 271
272 - def set_master_stale(self, stale_secs):
273 """ 274 @param stale_secs: seconds that data is considered fresh 275 @type stale_secs: double 276 """ 277 self.master_stale = stale_secs
278
279 - def set_node_stale(self, stale_secs):
280 """ 281 @param stale_secs: seconds that data is considered fresh 282 @type stale_secs: double 283 """ 284 self.node_stale = stale_secs
285
286 - def _master_refresh(self):
287 """ 288 @return: True if nodes information was updated 289 @rtype: bool 290 """ 291 logger.debug("master refresh: starting") 292 updated = False 293 #TODO: getSystemState probably needs to return URIs instead 294 try: 295 code, msg, val = self.master.getSystemState(_ROS_NAME) 296 except socket.error, msg: 297 print >> sys.stderr, "Unable to contact master", msg 298 logger.error("unable to contact master: %s", msg) 299 return False 300 if code != 1: 301 print >> sys.stderr, "Unable to contact master: %s"%msg 302 logger.error("unable to contact master: %s", msg) 303 updated = False 304 else: 305 pubs, subs, srvs = val 306 307 nodes = [] 308 nt_all_edges = self.nt_all_edges 309 nt_nodes = self.nt_nodes 310 for state, direction in ((pubs, 'o'), (subs, 'i')): 311 for topic, l in state: 312 if topic.startswith(self.topic_ns): 313 nodes.extend([n for n in l if n.startswith(self.node_ns)]) 314 nt_nodes.add(topic_node(topic)) 315 for node in l: 316 updated = nt_all_edges.add_edges( 317 node, topic_node(topic), direction) or updated 318 319 nodes = set(nodes) 320 321 srvs = set([s for s, _ in srvs]) 322 purge = None 323 if nodes ^ self.nn_nodes: 324 purge = self.nn_nodes - nodes 325 self.nn_nodes = nodes 326 updated = True 327 if srvs ^ self.srvs: 328 self.srvs = srvs 329 updated = True 330 331 if purge: 332 logger.debug("following nodes and related edges will be purged: %s", ','.join(purge)) 333 for p in purge: 334 logger.debug('purging edges for node %s', p) 335 self.nn_edges.delete_all(p) 336 self.nt_edges.delete_all(p) 337 self.nt_all_edges.delete_all(p) 338 339 logger.debug("master refresh: done, updated[%s]", updated) 340 return updated
341
342 - def _mark_bad_node(self, node, reason):
343 try: 344 # bad nodes are updated in a separate thread, so lock 345 self.bad_nodes_lock.acquire() 346 if node in self.bad_nodes: 347 self.bad_nodes[node].type = BadNode.DEAD 348 else: 349 self.bad_nodes[node] = (BadNode(node, BadNode.DEAD, reason)) 350 finally: 351 self.bad_nodes_lock.release()
352
353 - def _unmark_bad_node(self, node, reason):
354 """ 355 Promotes bad node to 'wonky' status. 356 """ 357 try: 358 # bad nodes are updated in a separate thread, so lock 359 self.bad_nodes_lock.acquire() 360 bad = self.bad_nodes[node] 361 bad.type = BadNode.WONKY 362 finally: 363 self.bad_nodes_lock.release()
364
365 - def _node_refresh_businfo(self, node, api, bad_node=False):
366 """ 367 Retrieve bus info from the node and update nodes and edges as appropriate 368 @param node: node name 369 @type node: str 370 @param api: XML-RPC proxy 371 @type api: ServerProxy 372 @param bad_node: If True, node has connectivity issues and 373 should be treated differently 374 @type bad_node: bool 375 """ 376 try: 377 logger.debug("businfo: contacting node [%s] for bus info", node) 378 379 # unmark bad node, though it stays on the bad list 380 if bad_node: 381 self._unmark_bad_node(node) 382 # Lower the socket timeout as we cannot abide by slow HTTP timeouts. 383 # If a node cannot meet this timeout, it goes on the bad list 384 # TODO: override transport instead. 385 old_timeout = socket.getdefaulttimeout() 386 if bad_node: 387 #even stricter timeout for bad_nodes right now 388 socket.setdefaulttimeout(0.2) 389 else: 390 socket.setdefaulttimeout(1.0) 391 392 code, msg, bus_info = api.getBusInfo(_ROS_NAME) 393 394 socket.setdefaulttimeout(old_timeout) 395 except Exception, e: 396 # node is (still) bad 397 self._mark_bad_node(node, str(e)) 398 code = -1 399 msg = traceback.format_exc() 400 401 updated = False 402 if code != 1: 403 logger.error("cannot get stats info from node [%s]: %s", node, msg) 404 else: 405 # [[connectionId1, destinationId1, direction1, transport1, ...]... ] 406 for info in bus_info: 407 # #3579 bad node, ignore 408 if len(info) < 5: 409 continue 410 411 connection_id = info[0] 412 dest_id = info[1] 413 direction = info[2] 414 transport = info[3] 415 topic = info[4] 416 if len(info) > 5: 417 connected = info[5] 418 else: 419 connected = True #backwards compatibility 420 421 if connected and topic.startswith(self.topic_ns): 422 # blindly add as we will be able to catch state change via edges. 423 # this currently means we don't cleanup topics 424 self.nt_nodes.add(topic_node(topic)) 425 426 # update node->topic->node graph edges 427 updated = self.nt_edges.add_edges(node, topic_node(topic), direction) or updated 428 429 # update node->node graph edges 430 if dest_id.startswith('http://'): 431 #print "FOUND URI", dest_id 432 dest_name = self.uri_node_map.get(dest_id, None) 433 updated = self.nn_edges.add_edges(node, dest_name, direction, topic) or updated 434 else: 435 #TODO: anyting to do here? 436 pass 437 return updated
438
439 - def _node_refresh(self, node, bad_node=False):
440 """ 441 Contact node for stats/connectivity information 442 @param node: name of node to contact 443 @type node: str 444 @param bad_node: if True, node has connectivity issues 445 @type bad_node: bool 446 @return: True if node was successfully contacted 447 @rtype bool 448 """ 449 # TODO: I'd like for master to provide this information in 450 # getSystemState() instead to prevent the extra connection per node 451 updated = False 452 uri = self._node_uri_refresh(node) 453 try: 454 if uri: 455 api = xmlrpclib.ServerProxy(uri) 456 updated = self._node_refresh_businfo(node, api, bad_node) 457 except KeyError, e: 458 logger.warn('cannot contact node [%s] as it is not in the lookup table'%node) 459 return updated
460
461 - def _node_uri_refresh(self, node):
462 try: 463 code, msg, uri = self.master.lookupNode(_ROS_NAME, node) 464 except: 465 code = -1 466 msg = traceback.format_exc() 467 if code != 1: 468 #TODO:REMOVE 469 print >> sys.stderr, "Unknown node reported: %s"%msg 470 logger.warn("master reported error in node lookup: %s"%msg) 471 return None 472 else: 473 # update maps 474 self.node_uri_map[node] = uri 475 self.uri_node_map[uri] = node 476 return uri
477
478 - def _node_uri_refresh_all(self):
479 """ 480 Build self.node_uri_map and self.uri_node_map using master as a 481 lookup service. This will make N requests to the master for N 482 nodes, so this should only be used sparingly 483 """ 484 for node in self.nn_nodes: 485 self._node_uri_refresh(node)
486
487 - def bad_update(self):
488 """ 489 Update loop for nodes with bad connectivity. We box them separately 490 so that we can maintain the good performance of the normal update loop. 491 Once a node is on the bad list it stays there. 492 """ 493 last_node_refresh = self.last_node_refresh 494 495 # nodes left to check 496 try: 497 self.bad_nodes_lock.acquire() 498 # make copy due to multithreading 499 update_queue = self.bad_nodes.values()[:] 500 finally: 501 self.bad_nodes_lock.release() 502 503 # return value. True if new data differs from old 504 updated = False 505 # number of nodes we checked 506 num_nodes = 0 507 508 start_time = time.time() 509 while update_queue: 510 # figure out the next node to contact 511 next = update_queue.pop() 512 # rate limit talking to any particular node 513 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale): 514 updated = self._node_refresh(next.name, True) or updated 515 # include in random offset (max 1/5th normal update interval) 516 # to help spread out updates 517 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0) 518 num_nodes += 1 519 520 # small yield to keep from torquing the processor 521 time.sleep(0.01) 522 end_time = time.time() 523 #print "Update (bad nodes) took %ss for %s nodes"%((end_time-start_time), num_nodes) 524 logger.debug("ROS stats (bad nodes) update took %ss"%(end_time-start_time)) 525 return updated
526
527 - def update(self):
528 """ 529 Update all the stats. This method may take awhile to complete as it will 530 communicate with all nodes + master. 531 """ 532 533 last_node_refresh = self.last_node_refresh 534 535 # nodes left to check 536 update_queue = None 537 # True if there are still more stats to fetch this cycle 538 work_to_do = True 539 # return value. True if new data differs from old 540 updated = False 541 # number of nodes we checked 542 num_nodes = 0 543 544 start_time = time.time() 545 while work_to_do: 546 547 # each time through the loop try to talk to either the master 548 # or a node. stop when we have talked to everybody. 549 550 # get a new node list from the master 551 if time.time() > (self.last_master_refresh + self.master_stale): 552 updated = self._master_refresh() 553 if self.last_master_refresh == 0: 554 # first time we contact the master, also refresh our full lookup tables 555 self._node_uri_refresh_all() 556 557 self.last_master_refresh = time.time() 558 # contact the nodes for stats 559 else: 560 # initialize update_queue based on most current nodes list 561 if update_queue is None: 562 update_queue = list(self.nn_nodes) 563 # no nodes left to contact 564 elif not update_queue: 565 work_to_do = False 566 # contact next node 567 else: 568 # figure out the next node to contact 569 next = update_queue.pop() 570 # rate limit talking to any particular node 571 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale): 572 updated = self._node_refresh(next) or updated 573 # include in random offset (max 1/5th normal update interval) 574 # to help spread out updates 575 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0) 576 num_nodes += 1 577 578 # small yield to keep from torquing the processor 579 time.sleep(0.01) 580 end_time = time.time() 581 #print "Update took %ss for %s nodes"%((end_time-start_time), num_nodes) 582 logger.debug("ROS stats update took %ss"%(end_time-start_time)) 583 return updated
584