$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2009, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: test_rosnode_command_line_online.py 5710 2009-08-20 03:11:04Z sfkwc $ 00035 00036 from __future__ import with_statement 00037 00038 PKG = 'test_rosnode' 00039 NAME = 'test_rosnode_command_line_online' 00040 import roslib; roslib.load_manifest(PKG) 00041 00042 import os 00043 import signal 00044 import sys 00045 import time 00046 import unittest 00047 00048 import rostest 00049 00050 import cStringIO 00051 from subprocess import Popen, PIPE, check_call, call 00052 00053 from roslib.scriptutil import get_param_server, script_resolve_name 00054 00055 from contextlib import contextmanager 00056 @contextmanager 00057 def fakestdout(): 00058 realstdout = sys.stdout 00059 fakestdout = cStringIO.StringIO() 00060 sys.stdout = fakestdout 00061 yield fakestdout 00062 sys.stdout = realstdout 00063 00064 def tolist(b): 00065 return [x.strip() for x in b.getvalue().split('\n') if x.strip()] 00066 00067 class TestRosnode(unittest.TestCase): 00068 00069 def __init__(self, *args): 00070 unittest.TestCase.__init__(self, *args) 00071 self.vals = set() 00072 00073 # wait for network to initialize 00074 import rospy 00075 import std_msgs.msg 00076 rospy.init_node('test') 00077 topics = ['/chatter', '/foo/chatter', '/bar/chatter'] 00078 subs = [rospy.Subscriber(t, std_msgs.msg.String, self.callback, i) for i, t in enumerate(topics)] 00079 all = set(range(0, len(topics))) 00080 00081 timeout_t = time.time() + 10. 00082 while time.time() < timeout_t and self.vals != all: 00083 time.sleep(0.1) 00084 [s.unregister() for s in subs] 00085 00086 def setUp(self): 00087 self.vals = set() 00088 self.msgs = {} 00089 00090 def callback(self, msg, val): 00091 self.vals.add(val) 00092 00093 def _check(self, expected, actual): 00094 """ 00095 Make sure all elements of expected are present in actual 00096 """ 00097 for t in expected: 00098 self.assert_(t in actual) 00099 00100 def _notcheck(self, not_expected, actual): 00101 """ 00102 Make sure all elements of not_expected are not present in actual 00103 """ 00104 for t in not_expected: 00105 self.failIf(t in actual) 00106 00107 00108 def test_rosnode_info(self): 00109 from ros import rosnode 00110 cmd = 'rosnode' 00111 00112 nodes = ['/talker', 00113 '/foo/talker', 00114 '/bar/talker', 00115 '/baz/talker1', 00116 '/baz/talker2', 00117 '/baz/talker3', 00118 '/listeners/listener', 00119 '/rosout', 00120 ] 00121 try: 00122 rosnode._rosnode_cmd_info([cmd, 'info']) 00123 self.fail("should have failed") 00124 except SystemExit, e: 00125 self.assertNotEquals(0, e.code) 00126 00127 for n in nodes: 00128 with fakestdout() as b: 00129 rosnode._rosnode_cmd_info([cmd, 'info', n]) 00130 s = b.getvalue() 00131 self.assert_("Node [%s]"%n in s) 00132 self.assert_("Pid: " in s, s) 00133 00134 def test_rosnode_list(self): 00135 from ros import rosnode 00136 cmd = 'rosnode' 00137 00138 nodes = ['/talker', 00139 '/foo/talker', 00140 '/bar/talker', 00141 '/baz/talker1', 00142 '/baz/talker2', 00143 '/baz/talker3', 00144 '/rosout', 00145 ] 00146 l = rosnode.get_node_names() 00147 for t in nodes: 00148 self.assert_(t in l) 00149 00150 try: 00151 rosnode._rosnode_cmd_list([cmd, 'list', 'one', 'two']) 00152 self.fail("should have failed") 00153 except SystemExit, e: 00154 self.assertNotEquals(0, e.code) 00155 00156 with fakestdout() as b: 00157 rosnode._rosnode_cmd_list([cmd, 'list']) 00158 self._check(nodes, tolist(b)) 00159 with fakestdout() as b: 00160 rosnode._rosnode_cmd_list([cmd, 'list', '/']) 00161 l = tolist(b) 00162 self._check(nodes, l) 00163 num_nodes = len(l) 00164 # test -u uris 00165 with fakestdout() as b: 00166 rosnode._rosnode_cmd_list([cmd, 'list', '-u', '/']) 00167 l = tolist(b) 00168 self.assertEquals(num_nodes, len(l)) 00169 self.failIf([n for n in l if not n.startswith('http://')]) 00170 # test -a all 00171 with fakestdout() as b: 00172 rosnode._rosnode_cmd_list([cmd, 'list', '-a', '/']) 00173 l = tolist(b) 00174 uris = [x.split()[0] for x in l if x] 00175 names = [x.split()[1] for x in l if x] 00176 self._check(nodes, names) 00177 self.assertEquals(num_nodes, len(uris)) 00178 self.failIf([n for n in uris if not n.startswith('http://')]) 00179 00180 00181 # test with namespace 00182 foon = [p for p in nodes if p.startswith('/foo/')] 00183 not_foon = [p for p in nodes if not p.startswith('/foo/')] 00184 for ns in ['foo', '/foo', '/foo/']: 00185 with fakestdout() as b: 00186 rosnode._rosnode_cmd_list([cmd, 'list', ns]) 00187 self._check(foon, tolist(b)) 00188 self._notcheck(not_foon, tolist(b)) 00189 bazn = [p for p in nodes if p.startswith('/baz/')] 00190 not_bazn = [p for p in nodes if not p.startswith('/baz/')] 00191 for ns in ['baz', '/baz', '/baz/']: 00192 with fakestdout() as b: 00193 rosnode._rosnode_cmd_list([cmd, 'list', ns]) 00194 self._check(bazn, tolist(b)) 00195 self._notcheck(not_bazn, tolist(b)) 00196 00197 # test with no match 00198 with fakestdout() as b: 00199 rosnode._rosnode_cmd_list([cmd, 'list', '/not/a/namespace/']) 00200 self.assertEquals([], tolist(b)) 00201 00202 00203 def test_rosnode_usage(self): 00204 from ros import rosnode 00205 cmd = 'rosnode' 00206 for c in ['ping', 'list', 'info', 'machine', 'cleanup', 'kill']: 00207 try: 00208 with fakestdout() as b: 00209 rosnode.rosnodemain([cmd, c, '-h']) 00210 self.assert_("usage" in b.getvalue()) 00211 self.fail("should have exited on usage") 00212 except SystemExit, e: 00213 self.assertEquals(0, e.code) 00214 00215 def test_rosnode_ping(self): 00216 from ros import rosnode 00217 cmd = 'rosnode' 00218 00219 self.failIf(rosnode.rosnode_ping('/fake_node', max_count=1)) 00220 self.assert_(rosnode.rosnode_ping('/rosout', max_count=1)) 00221 self.assert_(rosnode.rosnode_ping('/rosout', max_count=2)) 00222 00223 with fakestdout() as b: 00224 self.assert_(rosnode.rosnode_ping('/rosout', max_count=2, verbose=True)) 00225 s = b.getvalue() 00226 self.assert_('xmlrpc reply' in s, s) 00227 self.assert_('ping average:' in s, s) 00228 00229 # test via command-line API 00230 rosnode._rosnode_cmd_ping([cmd, 'ping', '-c', '1', '/fake_node']) 00231 with fakestdout() as b: 00232 rosnode._rosnode_cmd_ping([cmd, 'ping', '-c', '1', '/rosout']) 00233 s = b.getvalue() 00234 self.assert_('xmlrpc reply' in s, s) 00235 with fakestdout() as b: 00236 rosnode._rosnode_cmd_ping([cmd, 'ping', '-c', '1', 'rosout']) 00237 s = b.getvalue() 00238 self.assert_('xmlrpc reply' in s, s) 00239 with fakestdout() as b: 00240 rosnode._rosnode_cmd_ping([cmd, 'ping', '-c', '2', 'rosout']) 00241 s = b.getvalue() 00242 self.assertEquals(2, s.count('xmlrpc reply')) 00243 00244 def test_rosnode_ping_all(self): 00245 from ros import rosnode 00246 cmd = 'rosnode' 00247 00248 pinged, unpinged = rosnode.rosnode_ping_all(verbose=False) 00249 self.assert_('/rosout' in pinged) 00250 with fakestdout() as b: 00251 pinged, unpinged = rosnode.rosnode_ping_all(verbose=True) 00252 self.assert_('xmlrpc reply' in b.getvalue()) 00253 self.assert_('/rosout' in pinged) 00254 00255 def test_rosnode_kill(self): 00256 from ros import rosnode 00257 cmd = 'rosnode' 00258 for n in ['to_kill/kill1', '/to_kill/kill2']: 00259 self.assert_(rosnode.rosnode_ping(n, max_count=1)) 00260 rosnode._rosnode_cmd_kill([cmd, 'kill', n]) 00261 self.failIf(rosnode.rosnode_ping(n, max_count=1)) 00262 00263 def test_fullusage(self): 00264 from ros import rosnode 00265 try: 00266 rosnode._fullusage() 00267 except SystemExit: pass 00268 try: 00269 rosnode.rosnodemain(['rosnode']) 00270 except SystemExit: pass 00271 try: 00272 rosnode.rosnodemain(['rosnode', 'invalid']) 00273 except SystemExit: pass 00274 00275 00276 if __name__ == '__main__': 00277 rostest.unitrun(PKG, NAME, TestRosnode, sys.argv, coverage_packages=['rosnode'])