Package rosgraph :: Package impl :: Module graph
[frames] | no frames]

Source Code for Module rosgraph.impl.graph

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2008, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from __future__ import print_function 
 36   
 37  """ 
 38  Data structures and library for representing ROS Computation Graph state. 
 39  """ 
 40   
 41  import sys 
 42  import time 
 43  import itertools 
 44  import random 
 45  import logging 
 46  import traceback 
 47  try: 
 48      from xmlrpc.client import ServerProxy 
 49  except ImportError: 
 50      from xmlrpclib import ServerProxy 
 51  import socket 
 52   
 53  import rosgraph.masterapi 
 54   
 55  logger = logging.getLogger('rosgraph.graph') 
 56   
 57  _ROS_NAME = '/rosviz' 
 58   
59 -def topic_node(topic):
60 """ 61 In order to prevent topic/node name aliasing, we have to remap 62 topic node names. Currently we just prepend a space, which is 63 an illegal ROS name and thus not aliased. 64 @return str: topic mapped to a graph node name. 65 """ 66 return ' ' + topic
67 -def node_topic(node):
68 """ 69 Inverse of topic_node 70 @return str: undo topic_node() operation 71 """ 72 return node[1:]
73
74 -class BadNode(object):
75 """ 76 Data structure for storing info about a 'bad' node 77 """ 78 79 ## no connectivity 80 DEAD = 0 81 ## intermittent connectivity 82 WONKY = 1 83
84 - def __init__(self, name, type, reason):
85 """ 86 @param type: DEAD | WONKY 87 @type type: int 88 """ 89 self.name =name 90 self.reason = reason 91 self.type = type
92
93 -class EdgeList(object):
94 """ 95 Data structure for storing Edge instances 96 """ 97 __slots__ = ['edges_by_start', 'edges_by_end']
98 - def __init__(self):
99 # in order to make it easy to purge edges, we double-index them 100 self.edges_by_start = {} 101 self.edges_by_end = {}
102
103 - def __iter__(self):
104 return itertools.chain(*[v for v in self.edges_by_start.values()])
105
106 - def has(self, edge):
107 """ 108 @return: True if edge is in edge list 109 @rtype: bool 110 """ 111 key = edge.key 112 return key in self.edges_by_start and \ 113 edge in self.edges_by_start[key]
114
115 - def add(self, edge):
116 """ 117 Add an edge to our internal representation. not multi-thread safe 118 @param edge: edge to add 119 @type edge: Edge 120 """ 121 # see note in __init__ 122 def update_map(map, key, edge): 123 if key in map: 124 l = map[key] 125 if not edge in l: 126 l.append(edge) 127 return True 128 else: 129 return False 130 else: 131 map[key] = [edge] 132 return True
133 134 updated = update_map(self.edges_by_start, edge.key, edge) 135 updated = update_map(self.edges_by_end, edge.rkey, edge) or updated 136 return updated
137
138 - def add_edges(self, start, dest, direction, label=''):
139 """ 140 Create Edge instances for args and add resulting edges to edge 141 list. Convenience method to avoid repetitve logging, etc... 142 @param edge_list: data structure to add edge to 143 @type edge_list: EdgeList 144 @param start: name of start node. If None, warning will be logged and add fails 145 @type start: str 146 @param dest: name of start node. If None, warning will be logged and add fails 147 @type dest: str 148 @param direction: direction string (i/o/b) 149 @type direction: str 150 @return: True if update occured 151 @rtype: bool 152 """ 153 154 # the warnings should generally be temporary, occuring of the 155 # master/node information becomes stale while we are still 156 # doing an update 157 updated = False 158 if not start: 159 logger.warn("cannot add edge: cannot map start [%s] to a node name", start) 160 elif not dest: 161 logger.warn("cannot add edge: cannot map dest [%s] to a node name", dest) 162 else: 163 for args in edge_args(start, dest, direction, label): 164 updated = self.add(Edge(*args)) or updated 165 return updated
166
167 - def delete_all(self, node):
168 """ 169 Delete all edges that start or end at node 170 @param node: name of node 171 @type node: str 172 """ 173 def matching(map, pref): 174 return [map[k] for k in map.keys() if k.startswith(pref)]
175 176 pref = node+"|" 177 edge_lists = matching(self.edges_by_start, pref) + matching(self.edges_by_end, pref) 178 for el in edge_lists: 179 for e in el: 180 self.delete(e) 181
182 - def delete(self, edge):
183 # see note in __init__ 184 def update_map(map, key, edge): 185 if key in map: 186 edges = map[key] 187 if edge in edges: 188 edges.remove(edge) 189 return True
190 update_map(self.edges_by_start, edge.key, edge) 191 update_map(self.edges_by_end, edge.rkey, edge) 192
193 -class Edge(object):
194 """ 195 Data structure for representing ROS node graph edge 196 """ 197 198 __slots__ = ['start', 'end', 'label', 'key', 'rkey']
199 - def __init__(self, start, end, label=''):
200 self.start = start 201 self.end = end 202 self.label = label 203 self.key = "%s|%s"%(self.start, self.label) 204 # reverse key, indexed from end 205 self.rkey = "%s|%s"%(self.end, self.label)
206
207 - def __ne__(self, other):
208 return self.start != other.start or self.end != other.end
209 - def __str__(self):
210 return "%s->%s"%(self.start, self.end)
211 - def __eq__(self, other):
212 return self.start == other.start and self.end == other.end
213
214 -def edge_args(start, dest, direction, label):
215 """ 216 compute argument ordering for Edge constructor based on direction flag 217 @param direction str: 'i', 'o', or 'b' (in/out/bidir) relative to \a start 218 @param start str: name of starting node 219 @param start dest: name of destination node 220 """ 221 edge_args = [] 222 if direction in ['o', 'b']: 223 edge_args.append((start, dest, label)) 224 if direction in ['i', 'b']: 225 edge_args.append((dest, start, label)) 226 return edge_args
227 228
229 -class Graph(object):
230 """ 231 Utility class for polling ROS statistics from running ROS graph. 232 Not multi-thread-safe 233 """ 234
235 - def __init__(self, node_ns='/', topic_ns='/'):
236 self.master = rosgraph.masterapi.Master(_ROS_NAME) 237 238 self.node_ns = node_ns or '/' 239 self.topic_ns = topic_ns or '/' 240 241 # ROS nodes 242 self.nn_nodes = set([]) 243 # ROS topic nodes 244 self.nt_nodes = set([]) 245 246 # ROS nodes that aren't responding quickly 247 self.bad_nodes = {} 248 import threading 249 self.bad_nodes_lock = threading.Lock() 250 251 # ROS services 252 self.srvs = set([]) 253 # ROS node->node transport connections 254 self.nn_edges = EdgeList() 255 # ROS node->topic connections 256 self.nt_edges = EdgeList() 257 # ROS all node->topic connections, including empty 258 self.nt_all_edges = EdgeList() 259 260 # node names to URI map 261 self.node_uri_map = {} # { node_name_str : uri_str } 262 # reverse map URIs to node names 263 self.uri_node_map = {} # { uri_str : node_name_str } 264 265 # time we last contacted master 266 self.last_master_refresh = 0 267 self.last_node_refresh = {} 268 269 # time we last communicated with master 270 # seconds until master data is considered stale 271 self.master_stale = 5.0 272 # time we last communicated with node 273 # seconds until node data is considered stale 274 self.node_stale = 5.0 #seconds
275 276
277 - def set_master_stale(self, stale_secs):
278 """ 279 @param stale_secs: seconds that data is considered fresh 280 @type stale_secs: double 281 """ 282 self.master_stale = stale_secs
283
284 - def set_node_stale(self, stale_secs):
285 """ 286 @param stale_secs: seconds that data is considered fresh 287 @type stale_secs: double 288 """ 289 self.node_stale = stale_secs
290
291 - def _master_refresh(self):
292 """ 293 @return: True if nodes information was updated 294 @rtype: bool 295 """ 296 logger.debug("master refresh: starting") 297 updated = False 298 try: 299 val = self.master.getSystemState() 300 except rosgraph.masterapi.MasterException as e: 301 print("Unable to contact master", str(e), file=sys.stderr) 302 logger.error("unable to contact master: %s", str(e)) 303 return False 304 305 pubs, subs, srvs = val 306 307 nodes = [] 308 nt_all_edges = self.nt_all_edges 309 nt_nodes = self.nt_nodes 310 for state, direction in ((pubs, 'o'), (subs, 'i')): 311 for topic, l in state: 312 if topic.startswith(self.topic_ns): 313 nodes.extend([n for n in l if n.startswith(self.node_ns)]) 314 nt_nodes.add(topic_node(topic)) 315 for node in l: 316 updated = nt_all_edges.add_edges( 317 node, topic_node(topic), direction) or updated 318 319 nodes = set(nodes) 320 321 srvs = set([s for s, _ in srvs]) 322 purge = None 323 if nodes ^ self.nn_nodes: 324 purge = self.nn_nodes - nodes 325 self.nn_nodes = nodes 326 updated = True 327 if srvs ^ self.srvs: 328 self.srvs = srvs 329 updated = True 330 331 if purge: 332 logger.debug("following nodes and related edges will be purged: %s", ','.join(purge)) 333 for p in purge: 334 logger.debug('purging edges for node %s', p) 335 self.nn_edges.delete_all(p) 336 self.nt_edges.delete_all(p) 337 self.nt_all_edges.delete_all(p) 338 339 logger.debug("master refresh: done, updated[%s]", updated) 340 return updated
341
342 - def _mark_bad_node(self, node, reason):
343 try: 344 # bad nodes are updated in a separate thread, so lock 345 self.bad_nodes_lock.acquire() 346 if node in self.bad_nodes: 347 self.bad_nodes[node].type = BadNode.DEAD 348 else: 349 self.bad_nodes[node] = (BadNode(node, BadNode.DEAD, reason)) 350 finally: 351 self.bad_nodes_lock.release()
352
353 - def _unmark_bad_node(self, node, reason):
354 """ 355 Promotes bad node to 'wonky' status. 356 """ 357 try: 358 # bad nodes are updated in a separate thread, so lock 359 self.bad_nodes_lock.acquire() 360 bad = self.bad_nodes[node] 361 bad.type = BadNode.WONKY 362 finally: 363 self.bad_nodes_lock.release()
364
365 - def _node_refresh_businfo(self, node, api, bad_node=False):
366 """ 367 Retrieve bus info from the node and update nodes and edges as appropriate 368 @param node: node name 369 @type node: str 370 @param api: XML-RPC proxy 371 @type api: ServerProxy 372 @param bad_node: If True, node has connectivity issues and 373 should be treated differently 374 @type bad_node: bool 375 """ 376 try: 377 logger.debug("businfo: contacting node [%s] for bus info", node) 378 379 # unmark bad node, though it stays on the bad list 380 if bad_node: 381 self._unmark_bad_node(node) 382 # Lower the socket timeout as we cannot abide by slow HTTP timeouts. 383 # If a node cannot meet this timeout, it goes on the bad list 384 # TODO: override transport instead. 385 old_timeout = socket.getdefaulttimeout() 386 if bad_node: 387 #even stricter timeout for bad_nodes right now 388 socket.setdefaulttimeout(0.2) 389 else: 390 socket.setdefaulttimeout(1.0) 391 392 code, msg, bus_info = api.getBusInfo(_ROS_NAME) 393 394 socket.setdefaulttimeout(old_timeout) 395 except Exception as e: 396 # node is (still) bad 397 self._mark_bad_node(node, str(e)) 398 code = -1 399 msg = traceback.format_exc() 400 401 updated = False 402 if code != 1: 403 logger.error("cannot get stats info from node [%s]: %s", node, msg) 404 else: 405 # [[connectionId1, destinationId1, direction1, transport1, ...]... ] 406 for info in bus_info: 407 # #3579 bad node, ignore 408 if len(info) < 5: 409 continue 410 411 connection_id = info[0] 412 dest_id = info[1] 413 direction = info[2] 414 transport = info[3] 415 topic = info[4] 416 if len(info) > 5: 417 connected = info[5] 418 else: 419 connected = True #backwards compatibility 420 421 if connected and topic.startswith(self.topic_ns): 422 # blindly add as we will be able to catch state change via edges. 423 # this currently means we don't cleanup topics 424 self.nt_nodes.add(topic_node(topic)) 425 426 # update node->topic->node graph edges 427 updated = self.nt_edges.add_edges(node, topic_node(topic), direction) or updated 428 429 # update node->node graph edges 430 if dest_id.startswith('http://'): 431 #print("FOUND URI", dest_id) 432 dest_name = self.uri_node_map.get(dest_id, None) 433 updated = self.nn_edges.add_edges(node, dest_name, direction, topic) or updated 434 else: 435 #TODO: anyting to do here? 436 pass 437 return updated
438
439 - def _node_refresh(self, node, bad_node=False):
440 """ 441 Contact node for stats/connectivity information 442 @param node: name of node to contact 443 @type node: str 444 @param bad_node: if True, node has connectivity issues 445 @type bad_node: bool 446 @return: True if node was successfully contacted 447 @rtype bool 448 """ 449 # TODO: I'd like for master to provide this information in 450 # getSystemState() instead to prevent the extra connection per node 451 updated = False 452 uri = self._node_uri_refresh(node) 453 try: 454 if uri: 455 api = ServerProxy(uri) 456 updated = self._node_refresh_businfo(node, api, bad_node) 457 except KeyError as e: 458 logger.warn('cannot contact node [%s] as it is not in the lookup table'%node) 459 return updated
460
461 - def _node_uri_refresh(self, node):
462 try: 463 uri = self.master.lookupNode(node) 464 except: 465 msg = traceback.format_exc() 466 logger.warn("master reported error in node lookup: %s"%msg) 467 return None 468 # update maps 469 self.node_uri_map[node] = uri 470 self.uri_node_map[uri] = node 471 return uri
472
473 - def _node_uri_refresh_all(self):
474 """ 475 Build self.node_uri_map and self.uri_node_map using master as a 476 lookup service. This will make N requests to the master for N 477 nodes, so this should only be used sparingly 478 """ 479 for node in self.nn_nodes: 480 self._node_uri_refresh(node)
481
482 - def bad_update(self):
483 """ 484 Update loop for nodes with bad connectivity. We box them separately 485 so that we can maintain the good performance of the normal update loop. 486 Once a node is on the bad list it stays there. 487 """ 488 last_node_refresh = self.last_node_refresh 489 490 # nodes left to check 491 try: 492 self.bad_nodes_lock.acquire() 493 # make copy due to multithreading 494 update_queue = self.bad_nodes.values()[:] 495 finally: 496 self.bad_nodes_lock.release() 497 498 # return value. True if new data differs from old 499 updated = False 500 # number of nodes we checked 501 num_nodes = 0 502 503 start_time = time.time() 504 while update_queue: 505 # figure out the next node to contact 506 next = update_queue.pop() 507 # rate limit talking to any particular node 508 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale): 509 updated = self._node_refresh(next.name, True) or updated 510 # include in random offset (max 1/5th normal update interval) 511 # to help spread out updates 512 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0) 513 num_nodes += 1 514 515 # small yield to keep from torquing the processor 516 time.sleep(0.01) 517 end_time = time.time() 518 #print("Update (bad nodes) took %ss for %s nodes"%((end_time-start_time), num_nodes)) 519 logger.debug("ROS stats (bad nodes) update took %ss"%(end_time-start_time)) 520 return updated
521
522 - def update(self):
523 """ 524 Update all the stats. This method may take awhile to complete as it will 525 communicate with all nodes + master. 526 """ 527 528 last_node_refresh = self.last_node_refresh 529 530 # nodes left to check 531 update_queue = None 532 # True if there are still more stats to fetch this cycle 533 work_to_do = True 534 # return value. True if new data differs from old 535 updated = False 536 # number of nodes we checked 537 num_nodes = 0 538 539 start_time = time.time() 540 while work_to_do: 541 542 # each time through the loop try to talk to either the master 543 # or a node. stop when we have talked to everybody. 544 545 # get a new node list from the master 546 if time.time() > (self.last_master_refresh + self.master_stale): 547 updated = self._master_refresh() 548 if self.last_master_refresh == 0: 549 # first time we contact the master, also refresh our full lookup tables 550 self._node_uri_refresh_all() 551 552 self.last_master_refresh = time.time() 553 # contact the nodes for stats 554 else: 555 # initialize update_queue based on most current nodes list 556 if update_queue is None: 557 update_queue = list(self.nn_nodes) 558 # no nodes left to contact 559 elif not update_queue: 560 work_to_do = False 561 # contact next node 562 else: 563 # figure out the next node to contact 564 next = update_queue.pop() 565 # rate limit talking to any particular node 566 if time.time() > (last_node_refresh.get(next, 0.0) + self.node_stale): 567 updated = self._node_refresh(next) or updated 568 # include in random offset (max 1/5th normal update interval) 569 # to help spread out updates 570 last_node_refresh[next] = time.time() + (random.random() * self.node_stale / 5.0) 571 num_nodes += 1 572 573 # small yield to keep from torquing the processor 574 time.sleep(0.01) 575 end_time = time.time() 576 #print("Update took %ss for %s nodes"%((end_time-start_time), num_nodes)) 577 logger.debug("ROS stats update took %ss"%(end_time-start_time)) 578 return updated
579