Package rosgraph :: Package impl :: Module graph
[frames] | no frames]

Source Code for Module rosgraph.impl.graph

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from __future__ import print_function 
 36   
 37  """ 
 38  Data structures and library for representing ROS Computation Graph state. 
 39  """ 
 40   
 41  import sys 
 42  import time 
 43  import itertools 
 44  import random 
 45  import logging 
 46  import traceback 
 47  try: 
 48      from xmlrpc.client import ServerProxy 
 49  except ImportError: 
 50      from xmlrpclib import ServerProxy 
 51  import socket 
 52   
 53  import rosgraph.masterapi 
 54   
 55  logger = logging.getLogger('rosgraph.graph') 
 56   
 57  _ROS_NAME = '/rosviz' 
 58   
59 -def topic_node(topic):
60 """ 61 In order to prevent topic/node name aliasing, we have to remap 62 topic node names. Currently we just prepend a space, which is 63 an illegal ROS name and thus not aliased. 64 @return str: topic mapped to a graph node name. 65 """ 66 return ' ' + topic
67 -def node_topic(node):
68 """ 69 Inverse of topic_node 70 @return str: undo topic_node() operation 71 """ 72 return node[1:]
73
74 -class BadNode(object):
75 """ 76 Data structure for storing info about a 'bad' node 77 """ 78 79 ## no connectivity 80 DEAD = 0 81 ## intermittent connectivity 82 WONKY = 1 83
84 - def __init__(self, name, type, reason):
85 """ 86 @param type: DEAD | WONKY 87 @type type: int 88 """ 89 self.name =name 90 self.reason = reason 91 self.type = type
92
93 -class EdgeList(object):
94 """ 95 Data structure for storing Edge instances 96 """ 97 __slots__ = ['edges_by_start', 'edges_by_end']
98 - def __init__(self):
99 # in order to make it easy to purge edges, we double-index them 100 self.edges_by_start = {} 101 self.edges_by_end = {}
102
103 - def __iter__(self):
104 return itertools.chain(*[v for v in self.edges_by_start.values()])
105
106 - def has(self, edge):
107 return edge in self
108
109 - def __contains__(self, edge):
110 """ 111 @return: True if edge is in edge list 112 @rtype: bool 113 """ 114 key = edge.key 115 return key in self.edges_by_start and \ 116 edge in self.edges_by_start[key]
117
118 - def add(self, edge):
119 """ 120 Add an edge to our internal representation. not multi-thread safe 121 @param edge: edge to add 122 @type edge: Edge 123 """ 124 # see note in __init__ 125 def update_map(map, key, edge): 126 if key in map: 127 l = map[key] 128 if not edge in l: 129 l.append(edge) 130 return True 131 else: 132 return False 133 else: 134 map[key] = [edge] 135 return True
136 137 updated = update_map(self.edges_by_start, edge.key, edge) 138 updated = update_map(self.edges_by_end, edge.rkey, edge) or updated 139 return updated
140
141 - def add_edges(self, start, dest, direction, label=''):
142 """ 143 Create Edge instances for args and add resulting edges to edge 144 list. Convenience method to avoid repetitve logging, etc... 145 @param edge_list: data structure to add edge to 146 @type edge_list: EdgeList 147 @param start: name of start node. If None, warning will be logged and add fails 148 @type start: str 149 @param dest: name of start node. If None, warning will be logged and add fails 150 @type dest: str 151 @param direction: direction string (i/o/b) 152 @type direction: str 153 @return: True if update occured 154 @rtype: bool 155 """ 156 157 # the warnings should generally be temporary, occuring of the 158 # master/node information becomes stale while we are still 159 # doing an update 160 updated = False 161 if not start: 162 logger.warn("cannot add edge: cannot map start [%s] to a node name", start) 163 elif not dest: 164 logger.warn("cannot add edge: cannot map dest [%s] to a node name", dest) 165 else: 166 for args in edge_args(start, dest, direction, label): 167 updated = self.add(Edge(*args)) or updated 168 return updated
169
170 - def delete_all(self, node):
171 """ 172 Delete all edges that start or end at node 173 @param node: name of node 174 @type node: str 175 """ 176 def matching(map, pref): 177 return [map[k] for k in map.keys() if k.startswith(pref)]
178 179 pref = node+"|" 180 edge_lists = matching(self.edges_by_start, pref) + matching(self.edges_by_end, pref) 181 for el in edge_lists: 182 for e in el: 183 self.delete(e) 184
185 - def delete(self, edge):
186 # see note in __init__ 187 def update_map(map, key, edge): 188 if key in map: 189 edges = map[key] 190 if edge in edges: 191 edges.remove(edge) 192 return True
193 update_map(self.edges_by_start, edge.key, edge) 194 update_map(self.edges_by_end, edge.rkey, edge) 195
196 -class Edge(object):
197 """ 198 Data structure for representing ROS node graph edge 199 """ 200 201 __slots__ = ['start', 'end', 'label', 'key', 'rkey']
202 - def __init__(self, start, end, label=''):
203 self.start = start 204 self.end = end 205 self.label = label 206 self.key = "%s|%s"%(self.start, self.label) 207 # reverse key, indexed from end 208 self.rkey = "%s|%s"%(self.end, self.label)
209
210 - def __ne__(self, other):
211 return self.start != other.start or self.end != other.end
212 - def __str__(self):
213 return "%s->%s"%(self.start, self.end)
214 - def __eq__(self, other):
215 return self.start == other.start and self.end == other.end
216
217 -def edge_args(start, dest, direction, label):
218 """ 219 compute argument ordering for Edge constructor based on direction flag 220 @param direction str: 'i', 'o', or 'b' (in/out/bidir) relative to \a start 221 @param start str: name of starting node 222 @param start dest: name of destination node 223 """ 224 edge_args = [] 225 if direction in ['o', 'b']: 226 edge_args.append((start, dest, label)) 227 if direction in ['i', 'b']: 228 edge_args.append((dest, start, label)) 229 return edge_args
230 231
232 -class Graph(object):
233 """ 234 Utility class for polling ROS statistics from running ROS graph. 235 Not multi-thread-safe 236 """ 237
238 - def __init__(self, node_ns='/', topic_ns='/'):
239 self.master = rosgraph.masterapi.Master(_ROS_NAME) 240 241 self.node_ns = node_ns or '/' 242 self.topic_ns = topic_ns or '/' 243 244 # ROS nodes 245 self.nn_nodes = set([]) 246 # ROS topic nodes 247 self.nt_nodes = set([]) 248 249 # ROS nodes that aren't responding quickly 250 self.bad_nodes = {} 251 import threading 252 self.bad_nodes_lock = threading.Lock() 253 254 # ROS services 255 self.srvs = set([]) 256 # ROS node->node transport connections 257 self.nn_edges = EdgeList() 258 # ROS node->topic connections 259 self.nt_edges = EdgeList() 260 # ROS all node->topic connections, including empty 261 self.nt_all_edges = EdgeList() 262 263 # node names to URI map 264 self.node_uri_map = {} # { node_name_str : uri_str } 265 # reverse map URIs to node names 266 self.uri_node_map = {} # { uri_str : node_name_str } 267 268 # time we last contacted master 269 self.last_master_refresh = 0 270 self.last_node_refresh = {} 271 272 # time we last communicated with master 273 # seconds until master data is considered stale 274 self.master_stale = 5.0 275 # time we last communicated with node 276 # seconds until node data is considered stale 277 self.node_stale = 5.0 #seconds
278 279
280 - def set_master_stale(self, stale_secs):
281 """ 282 @param stale_secs: seconds that data is considered fresh 283 @type stale_secs: double 284 """ 285 self.master_stale = stale_secs
286
287 - def set_node_stale(self, stale_secs):
288 """ 289 @param stale_secs: seconds that data is considered fresh 290 @type stale_secs: double 291 """ 292 self.node_stale = stale_secs
293
294 - def _master_refresh(self):
295 """ 296 @return: True if nodes information was updated 297 @rtype: bool 298 """ 299 logger.debug("master refresh: starting") 300 updated = False 301 try: 302 val = self.master.getSystemState() 303 except rosgraph.masterapi.MasterException as e: 304 print("Unable to contact master", str(e), file=sys.stderr) 305 logger.error("unable to contact master: %s", str(e)) 306 return False 307 308 pubs, subs, srvs = val 309 310 nodes = [] 311 nt_all_edges = self.nt_all_edges 312 nt_nodes = self.nt_nodes 313 for state, direction in ((pubs, 'o'), (subs, 'i')): 314 for topic, l in state: 315 if topic.startswith(self.topic_ns): 316 nodes.extend([n for n in l if n.startswith(self.node_ns)]) 317 nt_nodes.add(topic_node(topic)) 318 for node in l: 319 updated = nt_all_edges.add_edges( 320 node, topic_node(topic), direction) or updated 321 322 nodes = set(nodes) 323 324 srvs = set([s for s, _ in srvs]) 325 purge = None 326 if nodes ^ self.nn_nodes: 327 purge = self.nn_nodes - nodes 328 self.nn_nodes = nodes 329 updated = True 330 if srvs ^ self.srvs: 331 self.srvs = srvs 332 updated = True 333 334 if purge: 335 logger.debug("following nodes and related edges will be purged: %s", ','.join(purge)) 336 for p in purge: 337 logger.debug('purging edges for node %s', p) 338 self.nn_edges.delete_all(p) 339 self.nt_edges.delete_all(p) 340 self.nt_all_edges.delete_all(p) 341 342 logger.debug("master refresh: done, updated[%s]", updated) 343 return updated
344
345 - def _mark_bad_node(self, node, reason):
346 try: 347 # bad nodes are updated in a separate thread, so lock 348 self.bad_nodes_lock.acquire() 349 if node in self.bad_nodes: 350 self.bad_nodes[node].type = BadNode.DEAD 351 else: 352 self.bad_nodes[node] = (BadNode(node, BadNode.DEAD, reason)) 353 finally: 354 self.bad_nodes_lock.release()
355
356 - def _unmark_bad_node(self, node, reason):
357 """ 358 Promotes bad node to 'wonky' status. 359 """ 360 try: 361 # bad nodes are updated in a separate thread, so lock 362 self.bad_nodes_lock.acquire() 363 bad = self.bad_nodes[node] 364 bad.type = BadNode.WONKY 365 finally: 366 self.bad_nodes_lock.release()
367
368 - def _node_refresh_businfo(self, node, api, bad_node=False):
369 """ 370 Retrieve bus info from the node and update nodes and edges as appropriate 371 @param node: node name 372 @type node: str 373 @param api: XML-RPC proxy 374 @type api: ServerProxy 375 @param bad_node: If True, node has connectivity issues and 376 should be treated differently 377 @type bad_node: bool 378 """ 379 try: 380 logger.debug("businfo: contacting node [%s] for bus info", node) 381 382 # unmark bad node, though it stays on the bad list 383 if bad_node: 384 self._unmark_bad_node(node) 385 # Lower the socket timeout as we cannot abide by slow HTTP timeouts. 386 # If a node cannot meet this timeout, it goes on the bad list 387 # TODO: override transport instead. 388 old_timeout = socket.getdefaulttimeout() 389 if bad_node: 390 #even stricter timeout for bad_nodes right now 391 socket.setdefaulttimeout(0.2) 392 else: 393 socket.setdefaulttimeout(1.0) 394 395 code, msg, bus_info = api.getBusInfo(_ROS_NAME) 396 397 socket.setdefaulttimeout(old_timeout) 398 except Exception as e: 399 # node is (still) bad 400 self._mark_bad_node(node, str(e)) 401 code = -1 402 msg = traceback.format_exc() 403 404 updated = False 405 if code != 1: 406 logger.error("cannot get stats info from node [%s]: %s", node, msg) 407 else: 408 # [[connectionId1, destinationId1, direction1, transport1, ...]... ] 409 for info in bus_info: 410 # #3579 bad node, ignore 411 if len(info) < 5: 412 continue 413 414 connection_id = info[0] 415 dest_id = info[1] 416 direction = info[2] 417 transport = info[3] 418 topic = info[4] 419 if len(info) > 5: 420 connected = info[5] 421 else: 422 connected = True #backwards compatibility 423 424 if connected and topic.startswith(self.topic_ns): 425 # blindly add as we will be able to catch state change via edges. 426 # this currently means we don't cleanup topics 427 self.nt_nodes.add(topic_node(topic)) 428 429 # update node->topic->node graph edges 430 updated = self.nt_edges.add_edges(node, topic_node(topic), direction) or updated 431 432 # update node->node graph edges 433 if dest_id.startswith('http://'): 434 #print("FOUND URI", dest_id) 435 dest_name = self.uri_node_map.get(dest_id, None) 436 updated = self.nn_edges.add_edges(node, dest_name, direction, topic) or updated 437 else: 438 #TODO: anyting to do here? 439 pass 440 return updated
441
442 - def _node_refresh(self, node, bad_node=False):
443 """ 444 Contact node for stats/connectivity information 445 @param node: name of node to contact 446 @type node: str 447 @param bad_node: if True, node has connectivity issues 448 @type bad_node: bool 449 @return: True if node was successfully contacted 450 @rtype bool 451 """ 452 # TODO: I'd like for master to provide this information in 453 # getSystemState() instead to prevent the extra connection per node 454 updated = False 455 uri = self._node_uri_refresh(node) 456 try: 457 if uri: 458 api = ServerProxy(uri) 459 updated = self._node_refresh_businfo(node, api, bad_node) 460 except KeyError as e: 461 logger.warn('cannot contact node [%s] as it is not in the lookup table'%node) 462 return updated
463
464 - def _node_uri_refresh(self, node):
465 try: 466 uri = self.master.lookupNode(node) 467 except: 468 msg = traceback.format_exc() 469 logger.warn("master reported error in node lookup: %s"%msg) 470 return None 471 # update maps 472 self.node_uri_map[node] = uri 473 self.uri_node_map[uri] = node 474 return uri
475
476 - def _node_uri_refresh_all(self):
477 """ 478 Build self.node_uri_map and self.uri_node_map using master as a 479 lookup service. This will make N requests to the master for N 480 nodes, so this should only be used sparingly 481 """ 482 for node in self.nn_nodes: 483 self._node_uri_refresh(node)
484
485 - def bad_update(self):
486 """ 487 Update loop for nodes with bad connectivity. We box them separately 488 so that we can maintain the good performance of the normal update loop. 489 Once a node is on the bad list it stays there. 490 """ 491 last_node_refresh = self.last_node_refresh 492 493 # nodes left to check 494 try: 495 self.bad_nodes_lock.acquire() 496 # make copy due to multithreading 497 update_queue = self.bad_nodes.values()[:] 498 finally: 499 self.bad_nodes_lock.release() 500 501 # return value. True if new data differs from old 502 updated = False 503 # number of nodes we checked 504 num_nodes = 0 505 506 start_time = time.time() 507 while update_queue: 508 # figure out the next node to contact 509 next = update_queue.pop() 510 # rate limit talking to any particular node 511 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale): 512 updated = self._node_refresh(next.name, True) or updated 513 # include in random offset (max 1/5th normal update interval) 514 # to help spread out updates 515 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0) 516 num_nodes += 1 517 518 # small yield to keep from torquing the processor 519 time.sleep(0.01) 520 end_time = time.time() 521 #print("Update (bad nodes) took %ss for %s nodes"%((end_time-start_time), num_nodes)) 522 logger.debug("ROS stats (bad nodes) update took %ss"%(end_time-start_time)) 523 return updated
524
525 - def update(self):
526 """ 527 Update all the stats. This method may take awhile to complete as it will 528 communicate with all nodes + master. 529 """ 530 531 last_node_refresh = self.last_node_refresh 532 533 # nodes left to check 534 update_queue = None 535 # True if there are still more stats to fetch this cycle 536 work_to_do = True 537 # return value. True if new data differs from old 538 updated = False 539 # number of nodes we checked 540 num_nodes = 0 541 542 start_time = time.time() 543 while work_to_do: 544 545 # each time through the loop try to talk to either the master 546 # or a node. stop when we have talked to everybody. 547 548 # get a new node list from the master 549 if time.time() > (self.last_master_refresh + self.master_stale): 550 updated = self._master_refresh() 551 if self.last_master_refresh == 0: 552 # first time we contact the master, also refresh our full lookup tables 553 self._node_uri_refresh_all() 554 555 self.last_master_refresh = time.time() 556 # contact the nodes for stats 557 else: 558 # initialize update_queue based on most current nodes list 559 if update_queue is None: 560 update_queue = list(self.nn_nodes) 561 # no nodes left to contact 562 elif not update_queue: 563 work_to_do = False 564 # contact next node 565 else: 566 # figure out the next node to contact 567 next = update_queue.pop() 568 # rate limit talking to any particular node 569 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale): 570 updated = self._node_refresh(next) or updated 571 # include in random offset (max 1/5th normal update interval) 572 # to help spread out updates 573 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0) 574 num_nodes += 1 575 576 # small yield to keep from torquing the processor 577 time.sleep(0.01) 578 end_time = time.time() 579 #print("Update took %ss for %s nodes"%((end_time-start_time), num_nodes)) 580 logger.debug("ROS stats update took %ss"%(end_time-start_time)) 581 return updated
582