Package roswtf :: Module graph
[frames] | no frames]

Source Code for Module roswtf.graph

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2009, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from __future__ import print_function 
 36  from __future__ import with_statement 
 37   
 38  import os 
 39  import itertools 
 40  import socket 
 41  import sys 
 42  import time 
 43  try: 
 44      from xmlrpc.client import ServerProxy 
 45  except ImportError: 
 46      from xmlrpclib import ServerProxy 
 47   
 48  import rospkg.environment 
 49   
 50  import rosgraph 
 51  import rosgraph.rosenv 
 52  import rosgraph.network 
 53   
 54  import rosnode 
 55  import rosservice 
 56   
 57  from roswtf.context import WtfException 
 58  from roswtf.environment import paths, is_executable 
 59  from roswtf.model import WtfWarning, WtfError 
 60  from roswtf.rules import warning_rule, error_rule 
 61   
62 -def _businfo(ctx, node, bus_info):
63 # [[connectionId1, destinationId1, direction1, transport1, ...]... ] 64 edges = [] 65 for info in bus_info: 66 #connection_id = info[0] 67 dest_id = info[1] 68 if dest_id.startswith('http://'): 69 if dest_id in ctx.uri_node_map: 70 dest_id = ctx.uri_node_map[dest_id] 71 else: 72 dest_id = 'unknown (%s)'%dest_id 73 direction = info[2] 74 #transport = info[3] 75 topic = info[4] 76 if len(info) > 5: 77 connected = info[5] 78 else: 79 connected = True #backwards compatibility 80 81 if connected: 82 if direction == 'i': 83 edges.append((topic, dest_id, node)) 84 elif direction == 'o': 85 edges.append((topic, node, dest_id)) 86 elif direction == 'b': 87 print("cannot handle bidirectional edges", file=sys.stderr) 88 else: 89 raise Exception() 90 91 return edges
92
93 -def unexpected_edges(ctx):
94 if not ctx.system_state or not ctx.nodes: 95 return 96 unexpected = set(ctx.actual_edges) - set(ctx.expected_edges) 97 return ["%s->%s (%s)"%(p, s, t) for (t, p, s) in unexpected]
98
99 -def missing_edges(ctx):
100 if not ctx.system_state or not ctx.nodes: 101 return 102 missing = set(ctx.expected_edges) - set(ctx.actual_edges) 103 return ["%s->%s (%s)"%(p, s, t) for (t, p, s) in missing]
104
105 -def ping_check(ctx):
106 if not ctx.system_state or not ctx.nodes: 107 return 108 _, unpinged = rosnode.rosnode_ping_all() 109 return unpinged
110
111 -def simtime_check(ctx):
112 if ctx.use_sim_time: 113 master = rosgraph.Master('/roswtf') 114 try: 115 pubtopics = master.getPublishedTopics('/') 116 except rosgraph.MasterException: 117 ctx.errors.append(WtfError("Cannot talk to ROS master")) 118 raise WtfException("roswtf lost connection to the ROS Master at %s"%rosgraph.rosenv.get_master_uri()) 119 120 for topic, _ in pubtopics: 121 if topic in ['/time', '/clock']: 122 return 123 return True
124 125 ## contact each service and make sure it returns a header
126 -def probe_all_services(ctx):
127 master = rosgraph.Master('/roswtf') 128 errors = [] 129 for service_name in ctx.services: 130 try: 131 service_uri = master.lookupService(service_name) 132 except: 133 ctx.errors.append(WtfError("cannot contact ROS Master at %s"%rosgraph.rosenv.get_master_uri())) 134 raise WtfException("roswtf lost connection to the ROS Master at %s"%rosgraph.rosenv.get_master_uri()) 135 136 try: 137 headers = rosservice.get_service_headers(service_name, service_uri) 138 if not headers: 139 errors.append("service [%s] did not return service headers"%service_name) 140 except rosgraph.network.ROSHandshakeException as e: 141 errors.append("service [%s] appears to be malfunctioning"%service_name) 142 except Exception as e: 143 errors.append("service [%s] appears to be malfunctioning: %s"%(service_name, e)) 144 return errors
145
146 -def unconnected_subscriptions(ctx):
147 ret = '' 148 whitelist = ['/reset_time'] 149 if ctx.use_sim_time: 150 for sub, l in ctx.unconnected_subscriptions.items(): 151 l = [t for t in l if t not in whitelist] 152 if l: 153 ret += ' * %s:\n'%sub 154 ret += ''.join([" * %s\n"%t for t in l]) 155 else: 156 for sub, l in ctx.unconnected_subscriptions.items(): 157 l = [t for t in l if t not in ['/time', '/clock']] 158 if l: 159 ret += ' * %s:\n'%sub 160 ret += ''.join([" * %s\n"%t for t in l]) 161 return ret
162 163 graph_warnings = [ 164 (unconnected_subscriptions, "The following node subscriptions are unconnected:\n"), 165 (unexpected_edges, "The following nodes are unexpectedly connected:"), 166 ] 167 168 graph_errors = [ 169 (simtime_check, "/use_sim_time is set but no publisher of /clock is present"), 170 (ping_check, "Could not contact the following nodes:"), 171 (missing_edges, "The following nodes should be connected but aren't:"), 172 (probe_all_services, "Errors connecting to the following services:"), 173 ] 174
175 -def topic_timestamp_drift(ctx, t):
176 #TODO: get msg_class, if msg_class has header, receive a message 177 # and compare its time to ros time 178 if 0: 179 rospy.Subscriber(t, msg_class)
180 181 #TODO: these are mainly future enhancements. It's unclear to me whether or not this will be 182 #useful as most of the generic rules are capable of targeting these problems as well. 183 #The only rule that in particular seems useful is the timestamp drift. It may be too 184 #expensive otherwise to run, though it would be interesting to attempt to receive a 185 #message from every single topic. 186 187 #TODO: parameter audit? 188 service_errors = [ 189 ] 190 service_warnings = [ 191 ] 192 topic_errors = [ 193 (topic_timestamp_drift, "Timestamp drift:") 194 ] 195 topic_warnings = [ 196 ] 197 node_errors = [ 198 ] 199 node_warnings = [ 200 ] 201 202 ## cache sim_time calculation sot that multiple rules can use
203 -def _compute_sim_time(ctx):
204 param_server = rosgraph.Master('/roswtf') 205 ctx.use_sim_time = False 206 try: 207 val = simtime = param_server.getParam('/use_sim_time') 208 if val: 209 ctx.use_sim_time = True 210 except: 211 pass
212
213 -def _compute_system_state(ctx):
214 socket.setdefaulttimeout(3.0) 215 master = rosgraph.Master('/roswtf') 216 217 # store system state 218 try: 219 val = master.getSystemState() 220 except rosgraph.MasterException: 221 return 222 ctx.system_state = val 223 224 pubs, subs, srvs = val 225 226 # compute list of topics and services 227 topics = [] 228 for t, _ in itertools.chain(pubs, subs): 229 topics.append(t) 230 services = [] 231 service_providers = [] 232 for s, l in srvs: 233 services.append(s) 234 service_providers.extend(l) 235 ctx.topics = topics 236 ctx.services = services 237 ctx.service_providers = service_providers 238 239 # compute list of nodes 240 nodes = [] 241 for s in val: 242 for t, l in s: 243 nodes.extend(l) 244 ctx.nodes = list(set(nodes)) #uniq 245 246 # - compute reverse mapping of URI->nodename 247 count = 0 248 start = time.time() 249 for n in ctx.nodes: 250 count += 1 251 try: 252 val = master.lookupNode(n) 253 except socket.error: 254 ctx.errors.append(WtfError("cannot contact ROS Master at %s"%rosgraph.rosenv.get_master_uri())) 255 raise WtfException("roswtf lost connection to the ROS Master at %s"%rosgraph.rosenv.get_master_uri()) 256 ctx.uri_node_map[val] = n 257 end = time.time() 258 # - time thresholds currently very arbitrary 259 if count: 260 if ((end - start) / count) > 1.: 261 ctx.warnings.append(WtfError("Communication with master is very slow (>1s average)")) 262 elif (end - start) / count > .5: 263 ctx.warnings.append(WtfWarning("Communication with master is very slow (>0.5s average)"))
264 265 import threading
266 -class NodeInfoThread(threading.Thread):
267 - def __init__(self, n, ctx, master, actual_edges, lock):
268 threading.Thread.__init__(self) 269 self.master = master 270 self.actual_edges = actual_edges 271 self.lock = lock 272 self.n = n 273 self.done = False 274 self.ctx = ctx
275
276 - def run(self):
277 ctx = self.ctx 278 master = self.master 279 actual_edges = self.actual_edges 280 lock = self.lock 281 n = self.n 282 283 try: 284 socket.setdefaulttimeout(3.0) 285 with lock: #Apparently get_api_uri is not thread safe... 286 node_api = rosnode.get_api_uri(master, n) 287 288 289 if not node_api: 290 with lock: 291 ctx.errors.append(WtfError("Master does not have lookup information for node [%s]"%n)) 292 return 293 294 node = ServerProxy(node_api) 295 start = time.time() 296 socket.setdefaulttimeout(3.0) 297 code, msg, bus_info = node.getBusInfo('/roswtf') 298 end = time.time() 299 with lock: 300 if (end-start) > 1.: 301 ctx.warnings.append(WtfWarning("Communication with node [%s] is very slow"%n)) 302 if code != 1: 303 ctx.warnings.append(WtfWarning("Node [%s] would not return bus info"%n)) 304 elif not bus_info: 305 if not n in ctx.service_providers: 306 ctx.warnings.append(WtfWarning("Node [%s] is not connected to anything"%n)) 307 else: 308 edges = _businfo(ctx, n, bus_info) 309 actual_edges.extend(edges) 310 except socket.error: 311 pass #ignore as we have rules to catch this 312 except Exception as e: 313 ctx.errors.append(WtfError("Communication with [%s] raised an error: %s"%(n, str(e)))) 314 finally: 315 self.done = True
316 317 318 ## retrieve graph state from master and related nodes once so we don't overload 319 ## the network
320 -def _compute_connectivity(ctx):
321 socket.setdefaulttimeout(3.0) 322 master = rosgraph.Master('/roswtf') 323 324 # Compute list of expected edges and unconnected subscriptions 325 pubs, subs, _ = ctx.system_state 326 expected_edges = [] # [(topic, publisher, subscriber),] 327 unconnected_subscriptions = {} # { subscriber : [topics] } 328 329 # - build up a dictionary of publishers keyed by topic 330 pub_dict = {} 331 for t, pub_list in pubs: 332 pub_dict[t] = pub_list 333 # - iterate through subscribers and add edge to each publisher of topic 334 for t, sub_list in subs: 335 for sub in sub_list: 336 if t in pub_dict: 337 expected_edges.extend([(t, pub, sub) for pub in pub_dict[t]]) 338 elif sub in unconnected_subscriptions: 339 unconnected_subscriptions[sub].append(t) 340 else: 341 unconnected_subscriptions[sub] = [t] 342 343 # compute actual edges 344 actual_edges = [] 345 lock = threading.Lock() 346 threads = [] 347 for n in ctx.nodes: 348 t =NodeInfoThread(n, ctx, master, actual_edges, lock) 349 threads.append(t) 350 t.start() 351 352 # spend up to a minute waiting for threads to complete. each 353 # thread has a 3-second timeout, but this will spike load 354 timeout_t = time.time() + 60.0 355 while time.time() < timeout_t and [t for t in threads if not t.done]: 356 time.sleep(0.5) 357 358 ctx.expected_edges = expected_edges 359 ctx.actual_edges = actual_edges 360 ctx.unconnected_subscriptions = unconnected_subscriptions
361
362 -def _compute_online_context(ctx):
363 # have to compute sim time first 364 _compute_sim_time(ctx) 365 _compute_system_state(ctx) 366 _compute_connectivity(ctx)
367
368 -def wtf_check_graph(ctx, names=None):
369 master_uri = ctx.ros_master_uri 370 #TODO: master rules 371 # - check for stale master state 372 373 # TODO: get the type for each topic from each publisher and see if they match up 374 375 master = rosgraph.Master('/roswtf') 376 try: 377 master.getPid() 378 except rospkg.MasterException: 379 warning_rule((True, "Cannot communicate with master, ignoring online checks"), True, ctx) 380 return 381 382 # fill in ctx info so we only have to compute once 383 print("analyzing graph...") 384 _compute_online_context(ctx) 385 print("... done analyzing graph") 386 387 if names: 388 check_topics = [t for t in names if t in ctx.topics] 389 check_services = [t for t in names if t in ctx.services] 390 check_nodes = [t for t in names if t in ctx.nodes] 391 unknown = [t for t in names if t not in check_topics + check_services + check_nodes] 392 if unknown: 393 raise WtfException("The following names were not found in the list of nodes, topics, or services:\n%s"%(''.join([" * %s\n"%t for t in unknown]))) 394 395 for t in check_topics: 396 for r in topic_warnings: 397 warning_rule(r, r[0](ctx, t), ctx) 398 for r in topic_errors: 399 error_rule(r, r[0](ctx, t), ctx) 400 for s in check_services: 401 for r in service_warnings: 402 warning_rule(r, r[0](ctx, s), ctx) 403 for r in service_errors: 404 error_rule(r, r[0](ctx, s), ctx) 405 for n in check_nodes: 406 for r in node_warnings: 407 warning_rule(r, r[0](ctx, n), ctx) 408 for r in node_errors: 409 error_rule(r, r[0](ctx, n), ctx) 410 411 412 print("running graph rules...") 413 for r in graph_warnings: 414 warning_rule(r, r[0](ctx), ctx) 415 for r in graph_errors: 416 error_rule(r, r[0](ctx), ctx) 417 print("... done running graph rules")
418