Package roswtf :: Module graph
[frames] | no frames]

Source Code for Module roswtf.graph

  1  # Software License Agreement (BSD License) 
  2  # 
  3  # Copyright (c) 2009, Willow Garage, Inc. 
  4  # All rights reserved. 
  5  # 
  6  # Redistribution and use in source and binary forms, with or without 
  7  # modification, are permitted provided that the following conditions 
  8  # are met: 
  9  # 
 10  #  * Redistributions of source code must retain the above copyright 
 11  #    notice, this list of conditions and the following disclaimer. 
 12  #  * Redistributions in binary form must reproduce the above 
 13  #    copyright notice, this list of conditions and the following 
 14  #    disclaimer in the documentation and/or other materials provided 
 15  #    with the distribution. 
 16  #  * Neither the name of Willow Garage, Inc. nor the names of its 
 17  #    contributors may be used to endorse or promote products derived 
 18  #    from this software without specific prior written permission. 
 19  # 
 20  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
 21  # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
 22  # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
 23  # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
 24  # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
 25  # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
 26  # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
 27  # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
 28  # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
 29  # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
 30  # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 31  # POSSIBILITY OF SUCH DAMAGE. 
 32  # 
 33  # Revision $Id$ 
 34   
 35  from __future__ import with_statement 
 36   
 37  import os 
 38  import itertools 
 39  import socket 
 40  import sys 
 41  import time 
 42  import xmlrpclib 
 43   
 44  import rospkg.environment 
 45   
 46  import rosgraph 
 47  import rosgraph.rosenv 
 48  import rosgraph.network 
 49   
 50  import rosnode 
 51  import rosservice 
 52   
 53  from roswtf.context import WtfException 
 54  from roswtf.environment import paths, is_executable 
 55  from roswtf.model import WtfWarning, WtfError 
 56  from roswtf.rules import warning_rule, error_rule 
 57   
58 -def _businfo(ctx, node, bus_info):
59 # [[connectionId1, destinationId1, direction1, transport1, ...]... ] 60 edges = [] 61 for info in bus_info: 62 #connection_id = info[0] 63 dest_id = info[1] 64 if dest_id.startswith('http://'): 65 if dest_id in ctx.uri_node_map: 66 dest_id = ctx.uri_node_map[dest_id] 67 else: 68 dest_id = 'unknown (%s)'%dest_id 69 direction = info[2] 70 #transport = info[3] 71 topic = info[4] 72 if len(info) > 5: 73 connected = info[5] 74 else: 75 connected = True #backwards compatibility 76 77 if connected: 78 if direction == 'i': 79 edges.append((topic, dest_id, node)) 80 elif direction == 'o': 81 edges.append((topic, node, dest_id)) 82 elif direction == 'b': 83 print >> sys.stderr, "cannot handle bidirectional edges" 84 else: 85 raise Exception() 86 87 return edges
88
89 -def unexpected_edges(ctx):
90 if not ctx.system_state or not ctx.nodes: 91 return 92 unexpected = set(ctx.actual_edges) - set(ctx.expected_edges) 93 return ["%s->%s (%s)"%(p, s, t) for (t, p, s) in unexpected]
94
95 -def missing_edges(ctx):
96 if not ctx.system_state or not ctx.nodes: 97 return 98 missing = set(ctx.expected_edges) - set(ctx.actual_edges) 99 return ["%s->%s (%s)"%(p, s, t) for (t, p, s) in missing]
100
101 -def ping_check(ctx):
102 if not ctx.system_state or not ctx.nodes: 103 return 104 _, unpinged = rosnode.rosnode_ping_all() 105 return unpinged
106
107 -def simtime_check(ctx):
108 if ctx.use_sim_time: 109 master = rosgraph.Master('/roswtf') 110 try: 111 pubtopics = master.getPublishedTopics('/') 112 except rosgraph.MasterException: 113 ctx.errors.append(WtfError("Cannot talk to ROS master")) 114 raise WtfException("roswtf lost connection to the ROS Master at %s"%rosgraph.rosenv.get_master_uri()) 115 116 for topic, _ in pubtopics: 117 if topic in ['/time', '/clock']: 118 return 119 return True
120 121 ## contact each service and make sure it returns a header
122 -def probe_all_services(ctx):
123 master = rosgraph.Master('/roswtf') 124 errors = [] 125 for service_name in ctx.services: 126 try: 127 service_uri = master.lookupService(service_name) 128 except: 129 ctx.errors.append(WtfError("cannot contact ROS Master at %s"%rosgraph.rosenv.get_master_uri())) 130 raise WtfException("roswtf lost connection to the ROS Master at %s"%rosgraph.rosenv.get_master_uri()) 131 132 try: 133 headers = rosservice.get_service_headers(service_name, service_uri) 134 if not headers: 135 errors.append("service [%s] did not return service headers"%service_name) 136 except rosgraph.network.ROSHandshakeException as e: 137 errors.append("service [%s] appears to be malfunctioning"%service_name) 138 except Exception as e: 139 errors.append("service [%s] appears to be malfunctioning: %s"%(service_name, e)) 140 return errors
141
142 -def unconnected_subscriptions(ctx):
143 ret = '' 144 whitelist = ['/reset_time'] 145 if ctx.use_sim_time: 146 for sub, l in ctx.unconnected_subscriptions.iteritems(): 147 l = [t for t in l if t not in whitelist] 148 if l: 149 ret += ' * %s:\n'%sub 150 ret += ''.join([" * %s\n"%t for t in l]) 151 else: 152 for sub, l in ctx.unconnected_subscriptions.iteritems(): 153 l = [t for t in l if t not in ['/time', '/clock']] 154 if l: 155 ret += ' * %s:\n'%sub 156 ret += ''.join([" * %s\n"%t for t in l]) 157 return ret
158 159 graph_warnings = [ 160 (unconnected_subscriptions, "The following node subscriptions are unconnected:\n"), 161 (unexpected_edges, "The following nodes are unexpectedly connected:"), 162 ] 163 164 graph_errors = [ 165 (simtime_check, "/use_simtime is set but no publisher of /clock is present"), 166 (ping_check, "Could not contact the following nodes:"), 167 (missing_edges, "The following nodes should be connected but aren't:"), 168 (probe_all_services, "Errors connecting to the following services:"), 169 ] 170
171 -def topic_timestamp_drift(ctx, t):
172 #TODO: get msg_class, if msg_class has header, receive a message 173 # and compare its time to ros time 174 if 0: 175 rospy.Subscriber(t, msg_class)
176 177 #TODO: these are mainly future enhancements. It's unclear to me whether or not this will be 178 #useful as most of the generic rules are capable of targetting these problems as well. 179 #The only rule that in particular seems useful is the timestamp drift. It may be too 180 #expensive otherwise to run, though it would be interesting to attempt to receive a 181 #message from every single topic. 182 183 #TODO: parameter audit? 184 service_errors = [ 185 ] 186 service_warnings = [ 187 ] 188 topic_errors = [ 189 (topic_timestamp_drift, "Timestamp drift:") 190 ] 191 topic_warnings = [ 192 ] 193 node_errors = [ 194 ] 195 node_warnings = [ 196 ] 197 198 ## cache sim_time calculation sot that multiple rules can use
199 -def _compute_sim_time(ctx):
200 param_server = rosgraph.Master('/roswtf') 201 ctx.use_sim_time = False 202 try: 203 val = simtime = param_server.getParam('/use_sim_time') 204 if val: 205 ctx.use_sim_time = True 206 except: 207 pass
208
209 -def _compute_system_state(ctx):
210 socket.setdefaulttimeout(3.0) 211 master = rosgraph.Master('/roswtf') 212 213 # store system state 214 try: 215 val = master.getSystemState() 216 except rosgraph.MasterException: 217 return 218 ctx.system_state = val 219 220 pubs, subs, srvs = val 221 222 # compute list of topics and services 223 topics = [] 224 for t, _ in itertools.chain(pubs, subs): 225 topics.append(t) 226 services = [] 227 service_providers = [] 228 for s, l in srvs: 229 services.append(s) 230 service_providers.extend(l) 231 ctx.topics = topics 232 ctx.services = services 233 ctx.service_providers = service_providers 234 235 # compute list of nodes 236 nodes = [] 237 for s in val: 238 for t, l in s: 239 nodes.extend(l) 240 ctx.nodes = list(set(nodes)) #uniq 241 242 # - compute reverse mapping of URI->nodename 243 count = 0 244 start = time.time() 245 for n in ctx.nodes: 246 count += 1 247 try: 248 val = master.lookupNode(n) 249 except socket.error: 250 ctx.errors.append(WtfError("cannot contact ROS Master at %s"%rosgraph.rosenv.get_master_uri())) 251 raise WtfException("roswtf lost connection to the ROS Master at %s"%rosgraph.rosenv.get_master_uri()) 252 ctx.uri_node_map[val] = n 253 end = time.time() 254 # - time thresholds currently very arbitrary 255 if count: 256 if ((end - start) / count) > 1.: 257 ctx.warnings.append(WtfError("Communication with master is very slow (>1s average)")) 258 elif (end - start) / count > .5: 259 ctx.warnings.append(WtfWarning("Communication with master is very slow (>0.5s average)"))
260 261 import threading
262 -class NodeInfoThread(threading.Thread):
263 - def __init__(self, n, ctx, master, actual_edges, lock):
264 threading.Thread.__init__(self) 265 self.master = master 266 self.actual_edges = actual_edges 267 self.lock = lock 268 self.n = n 269 self.done = False 270 self.ctx = ctx
271
272 - def run(self):
273 ctx = self.ctx 274 master = self.master 275 actual_edges = self.actual_edges 276 lock = self.lock 277 n = self.n 278 279 try: 280 socket.setdefaulttimeout(3.0) 281 with lock: #Apparently get_api_uri is not thread safe... 282 node_api = rosnode.get_api_uri(master, n) 283 284 285 if not node_api: 286 with lock: 287 ctx.errors.append(WtfError("Master does not have lookup information for node [%s]"%n)) 288 return 289 290 node = xmlrpclib.ServerProxy(node_api) 291 start = time.time() 292 socket.setdefaulttimeout(3.0) 293 code, msg, bus_info = node.getBusInfo('/roswtf') 294 end = time.time() 295 with lock: 296 if (end-start) > 1.: 297 ctx.warnings.append(WtfWarning("Communication with node [%s] is very slow"%n)) 298 if code != 1: 299 ctx.warnings.append(WtfWarning("Node [%s] would not return bus info"%n)) 300 elif not bus_info: 301 if not n in ctx.service_providers: 302 ctx.warnings.append(WtfWarning("Node [%s] is not connected to anything"%n)) 303 else: 304 edges = _businfo(ctx, n, bus_info) 305 actual_edges.extend(edges) 306 except socket.error, e: 307 pass #ignore as we have rules to catch this 308 except Exception, e: 309 ctx.errors.append(WtfError("Communication with [%s] raised an error: %s"%(n, str(e)))) 310 finally: 311 self.done = True
312 313 314 ## retrieve graph state from master and related nodes once so we don't overload 315 ## the network
316 -def _compute_connectivity(ctx):
317 socket.setdefaulttimeout(3.0) 318 master = rosgraph.Master('/roswtf') 319 320 # Compute list of expected edges and unconnected subscriptions 321 pubs, subs, _ = ctx.system_state 322 expected_edges = [] # [(topic, publisher, subscriber),] 323 unconnected_subscriptions = {} # { subscriber : [topics] } 324 325 # - build up a dictionary of publishers keyed by topic 326 pub_dict = {} 327 for t, pub_list in pubs: 328 pub_dict[t] = pub_list 329 # - iterate through subscribers and add edge to each publisher of topic 330 for t, sub_list in subs: 331 for sub in sub_list: 332 if t in pub_dict: 333 expected_edges.extend([(t, pub, sub) for pub in pub_dict[t]]) 334 elif sub in unconnected_subscriptions: 335 unconnected_subscriptions[sub].append(t) 336 else: 337 unconnected_subscriptions[sub] = [t] 338 339 # compute actual edges 340 actual_edges = [] 341 lock = threading.Lock() 342 threads = [] 343 for n in ctx.nodes: 344 t =NodeInfoThread(n, ctx, master, actual_edges, lock) 345 threads.append(t) 346 t.start() 347 348 # spend up to a minute waiting for threads to complete. each 349 # thread has a 3-second timeout, but this will spike load 350 timeout_t = time.time() + 60.0 351 while time.time() < timeout_t and [t for t in threads if not t.done]: 352 time.sleep(0.5) 353 354 ctx.expected_edges = expected_edges 355 ctx.actual_edges = actual_edges 356 ctx.unconnected_subscriptions = unconnected_subscriptions
357
358 -def _compute_online_context(ctx):
359 # have to compute sim time first 360 _compute_sim_time(ctx) 361 _compute_system_state(ctx) 362 _compute_connectivity(ctx)
363
364 -def wtf_check_graph(ctx, names=None):
365 master_uri = ctx.ros_master_uri 366 #TODO: master rules 367 # - check for stale master state 368 369 # TODO: get the type for each topic from each publisher and see if they match up 370 371 master = rosgraph.Master('/roswtf') 372 try: 373 master.getPid() 374 except rospkg.MasterException: 375 warning_rule((True, "Cannot communicate with master, ignoring online checks"), True, ctx) 376 return 377 378 # fill in ctx info so we only have to compute once 379 print "analyzing graph..." 380 _compute_online_context(ctx) 381 print "... done analyzing graph" 382 383 if names: 384 check_topics = [t for t in names if t in ctx.topics] 385 check_services = [t for t in names if t in ctx.services] 386 check_nodes = [t for t in names if t in ctx.nodes] 387 unknown = [t for t in names if t not in check_topics + check_services + check_nodes] 388 if unknown: 389 raise WtfException("The following names were not found in the list of nodes, topics, or services:\n%s"%(''.join([" * %s\n"%t for t in unknown]))) 390 391 for t in check_topics: 392 for r in topic_warnings: 393 warning_rule(r, r[0](ctx, t), ctx) 394 for r in topic_errors: 395 error_rule(r, r[0](ctx, t), ctx) 396 for s in check_services: 397 for r in service_warnings: 398 warning_rule(r, r[0](ctx, s), ctx) 399 for r in service_errors: 400 error_rule(r, r[0](ctx, s), ctx) 401 for n in check_nodes: 402 for r in node_warnings: 403 warning_rule(r, r[0](ctx, n), ctx) 404 for r in node_errors: 405 error_rule(r, r[0](ctx, n), ctx) 406 407 408 print "running graph rules..." 409 for r in graph_warnings: 410 warning_rule(r, r[0](ctx), ctx) 411 for r in graph_errors: 412 error_rule(r, r[0](ctx), ctx) 413 print "... done running graph rules"
414