$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2009, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: test_rosnode_command_line_online.py 5516 2009-08-06 01:55:16Z sfkwc $ 00035 00036 PKG = 'test_rosnode' 00037 NAME = 'test_rosnode_command_line_online' 00038 import roslib; roslib.load_manifest(PKG) 00039 00040 import os 00041 import signal 00042 import sys 00043 import time 00044 import unittest 00045 00046 import rospy 00047 import rostest 00048 00049 import std_msgs.msg 00050 00051 from subprocess import Popen, PIPE, check_call, call 00052 00053 def run_for(cmd, secs): 00054 popen = Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True) 00055 timeout_t = time.time() + secs 00056 while time.time() < timeout_t: 00057 time.sleep(0.1) 00058 os.kill(popen.pid, signal.SIGKILL) 00059 00060 class TestRosnodeOnline(unittest.TestCase): 00061 00062 def setUp(self): 00063 self.vals = set() 00064 self.msgs = {} 00065 00066 def callback(self, msg, val): 00067 self.vals.add(val) 00068 self.msgs[val] = msg 00069 00070 def test_rosnode(self): 00071 topics = ['/chatter', '/foo/chatter', '/bar/chatter'] 00072 00073 # wait for network to initialize 00074 rospy.init_node('test') 00075 nodes = ['/talker', '/foo/talker', '/bar/talker', rospy.get_caller_id()] 00076 00077 for i, t in enumerate(topics): 00078 rospy.Subscriber(t, std_msgs.msg.String, self.callback, i) 00079 all = set(range(0, len(topics))) 00080 00081 timeout_t = time.time() + 10. 00082 while time.time() < timeout_t and self.vals != all: 00083 time.sleep(0.1) 00084 self.assertEquals(self.vals, all, "failed to initialize graph correctly") 00085 00086 00087 # network is initialized 00088 cmd = 'rosnode' 00089 00090 # list 00091 # - we aren't matching against the core services as those can make the test suites brittle 00092 output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0] 00093 l = set(output.split()) 00094 for t in nodes: 00095 self.assert_(t in l, "%s not in %s"%(t, l)) 00096 00097 output = Popen([cmd, 'list', '-a'], stdout=PIPE).communicate()[0] 00098 l = set(output.split()) 00099 for t in nodes: 00100 for e in l: 00101 if t in e: 00102 break 00103 else: 00104 self.fail("did not find [%s] in list [%s]"%(t, l)) 00105 00106 output = Popen([cmd, 'list', '-u'], stdout=PIPE).communicate()[0] 00107 l = set(output.split()) 00108 self.assert_(len(l), "list -u is empty") 00109 for e in l: 00110 self.assert_(e.startswith('http://')) 00111 00112 for name in nodes: 00113 # type 00114 output = Popen([cmd, 'info', name], stdout=PIPE).communicate()[0] 00115 # not really validating output as much as making sure it's not broken 00116 self.assert_(name in output) 00117 self.assert_('chatter' in output) 00118 self.assert_('Publications' in output) 00119 self.assert_('Subscriptions' in output) 00120 00121 if 0: 00122 #ping 00123 stdout, stderr = run_for([cmd, 'ping', name], 3.) 00124 00125 if __name__ == '__main__': 00126 rostest.run(PKG, NAME, TestRosnodeOnline, sys.argv)