$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2009, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id: test_rostopic_command_line_offline.py 6214 2009-09-18 23:38:22Z kwc $ 00035 00036 from __future__ import with_statement 00037 00038 NAME = 'test_rostopic' 00039 import roslib; roslib.load_manifest('test_rostopic') 00040 00041 import os 00042 import sys 00043 import unittest 00044 import cStringIO 00045 import time 00046 00047 import rostest 00048 import rospy 00049 import std_msgs.msg 00050 00051 from subprocess import Popen, PIPE, check_call, call 00052 00053 from contextlib import contextmanager 00054 00055 @contextmanager 00056 def fakestdout(): 00057 realstdout = sys.stdout 00058 fakestdout = cStringIO.StringIO() 00059 sys.stdout = fakestdout 00060 yield fakestdout 00061 sys.stdout = realstdout 00062 00063 def todict(s): 00064 d = {} 00065 for l in s.split('\n'): 00066 key, p, val = l.partition(':') 00067 if p: 00068 d[key] = val.strip() 00069 return d 00070 00071 class TestRostopic(unittest.TestCase): 00072 00073 def __init__(self, *args): 00074 unittest.TestCase.__init__(self, *args) 00075 self.vals = set() 00076 00077 # wait for network to initialize 00078 rospy.init_node('test') 00079 topics = ['/chatter', '/foo/chatter', '/bar/chatter'] 00080 subs = [rospy.Subscriber(t, std_msgs.msg.String, self.callback, i) for i, t in enumerate(topics)] 00081 all = set(range(0, len(topics))) 00082 00083 timeout_t = time.time() + 10. 00084 while time.time() < timeout_t and self.vals != all and not rospy.is_shutdown(): 00085 time.sleep(0.1) 00086 [s.unregister() for s in subs] 00087 if rospy.is_shutdown(): 00088 self.fail("shutdown") 00089 00090 def test_offline(self): 00091 from ros import rostopic 00092 orig_uri = os.environ['ROS_MASTER_URI'] 00093 os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356' 00094 00095 try: 00096 c = 'rostopic' 00097 00098 try: 00099 rostopic._rostopic_cmd_list([c, 'list']) 00100 self.fail("should have raised ROSTopicIOException") 00101 except rostopic.ROSTopicIOException: pass 00102 00103 try: 00104 rostopic._rostopic_cmd_info([c, 'info', '/chatter']) 00105 self.fail("should have raised ROSTopicIOException") 00106 except rostopic.ROSTopicIOException: pass 00107 00108 try: 00109 rostopic._rostopic_cmd_type([c, 'type', '/chatter']) 00110 self.fail("should have raised ROSTopicIOException") 00111 except rostopic.ROSTopicIOException: pass 00112 00113 try: 00114 rostopic._rostopic_cmd_find([c, 'find', 'std_msgs/String']) 00115 self.fail("should have raised ROSTopicIOException") 00116 except rostopic.ROSTopicIOException: pass 00117 00118 finally: 00119 os.environ['ROS_MASTER_URI'] = orig_uri 00120 00121 def test_cmd_type(self): 00122 from ros import rostopic 00123 cmd = 'rostopic' 00124 s = '/rosout_agg' 00125 t = 'rosgraph_msgs/Log' 00126 00127 try: 00128 rostopic.rostopicmain([cmd, 'type', 'fake']) 00129 self.fail("should have exited") 00130 except SystemExit, e: 00131 self.assertNotEquals(0, e.code) 00132 00133 for s in ['/chatter', 'chatter', 'foo/chatter', '/bar/chatter']: 00134 with fakestdout() as b: 00135 rostopic.rostopicmain([cmd, 'type', s]) 00136 v = b.getvalue().strip() 00137 self.assertEquals('std_msgs/String', v) 00138 00139 def test_main(self): 00140 from ros import rostopic 00141 c = 'rostopic' 00142 try: 00143 rostopic.rostopicmain([c]) 00144 self.fail("should have exited with error") 00145 except SystemExit, e: 00146 self.assertNotEquals(0, e.code) 00147 try: 00148 rostopic.rostopicmain([c, 'foo']) 00149 self.fail("should have exited with error") 00150 except SystemExit, e: 00151 self.assertNotEquals(0, e.code) 00152 00153 def test_cmd_pub(self): 00154 from ros import rostopic 00155 cmd = 'rostopic' 00156 00157 # we can't actually test functional behavior because rostopic 00158 # needs control over a node, though we can probably change the 00159 # source code to just check for the existing node. 00160 00161 invalid = [['-r', '--once', '/foo', 'std_msgs/String', 'hello'], 00162 ['-r', 'a', '/foo', 'std_msgs/String', 'hello'], 00163 ['-r', '--', '-1', '/foo', 'std_msgs/String', 'hello'], 00164 [], 00165 ['/foo'], 00166 ['/foo', 'std_msgs/String', 'a: b: !!c: d: e'], 00167 ] 00168 for i in invalid: 00169 try: 00170 rostopic.rostopicmain([cmd, 'pub'] + i) 00171 self.fail("should have exited with error"+str(i)) 00172 except SystemExit, e: 00173 self.assert_(e.code != 0) 00174 00175 00176 00177 def test_full_usage(self): 00178 from ros import rostopic 00179 try: 00180 rostopic._fullusage() 00181 self.fail("should have caused system exit") 00182 except SystemExit: pass 00183 00184 def test_get_topic_type(self): 00185 from ros import rostopic 00186 00187 self.assertEquals((None, None, None), rostopic.get_topic_type('/fake', blocking=False)) 00188 00189 t, n, f = rostopic.get_topic_type('/rosout', blocking=False) 00190 self.assertEquals('rosgraph_msgs/Log', t) 00191 self.assertEquals('/rosout', n) 00192 self.assert_(f is None) 00193 00194 t, n, f = rostopic.get_topic_type('/rosout/name', blocking=False) 00195 self.assertEquals('rosgraph_msgs/Log', t) 00196 self.assertEquals('/rosout', n) 00197 self.failIf(f is None) 00198 from rosgraph_msgs.msg import Log 00199 self.assertEquals("bob", f(Log(name="bob"))) 00200 00201 def test_get_topic_class(self): 00202 from ros import rostopic 00203 00204 self.assertEquals((None, None, None), rostopic.get_topic_class('/fake')) 00205 00206 from rosgraph_msgs.msg import Log 00207 c, n, f = rostopic.get_topic_class('/rosout') 00208 self.assertEquals(Log, c) 00209 self.assertEquals('/rosout', n) 00210 self.assert_(f is None) 00211 00212 c, n, f = rostopic.get_topic_class('/rosout/name') 00213 self.assertEquals(c, Log) 00214 self.assertEquals('/rosout', n) 00215 self.failIf(f is None) 00216 self.assertEquals("bob", f(Log(name="bob"))) 00217 00218 def test_cmd_info(self): 00219 from ros import rostopic 00220 cmd = 'rostopic' 00221 00222 try: 00223 rostopic.rostopicmain([cmd, 'info']) 00224 self.fail("should have exited with error") 00225 except SystemExit: pass 00226 try: 00227 rostopic.rostopicmain([cmd, 'info', '/fake_topic']) 00228 self.fail("should have exited with error") 00229 except SystemExit: pass 00230 try: 00231 rostopic.rostopicmain([cmd, 'info', '/chatter', '/bar/chatter']) 00232 self.fail("should have exited with error") 00233 except SystemExit: pass 00234 00235 with fakestdout() as b: 00236 rostopic.rostopicmain([cmd, 'info', 'rosout']) 00237 v = b.getvalue() 00238 for s in ["Publishers:", "Subscribers", "Type: rosgraph_msgs/Log", " * /rosout"]: 00239 self.assert_(s in v, "failed on %s: %s"%(s, v)) 00240 with fakestdout() as b: 00241 rostopic.rostopicmain([cmd, 'info', '/chatter']) 00242 v = b.getvalue() 00243 for s in ["Publishers:", "Subscribers", "Type: std_msgs/String", " * /talker"]: 00244 self.assert_(s in v, "failed on %s: %s"%(s, v)) 00245 00246 def test_cmd_find(self): 00247 from ros import rostopic 00248 cmd = 'rostopic' 00249 00250 try: 00251 rostopic.rostopicmain([cmd, 'find']) 00252 self.fail("arg parsing should have failed") 00253 except SystemExit: pass 00254 try: 00255 rostopic.rostopicmain([cmd, 'find', 'std_msgs/String', 'std_msgs/Int32']) 00256 self.fail("arg parsing should have failed") 00257 except SystemExit: pass 00258 00259 00260 with fakestdout() as b: 00261 rostopic.rostopicmain([cmd, 'find', 'std_msgs/String']) 00262 d = [x for x in b.getvalue().split('\n') if x.strip()] 00263 v = ['/foo/chatter', '/bar/chatter', '/chatter'] 00264 self.assertEquals(set(v), set(d)) 00265 00266 def callback(self, msg, val): 00267 self.vals.add(val) 00268 00269 def test_cmd_list(self): 00270 from ros import rostopic 00271 cmd = 'rostopic' 00272 s = '/add_two_ints' 00273 00274 # test failures 00275 for invalid in [['-ps'], ['-b' 'file.bag', '-s'], ['-b' 'file.bag', '-p']]: 00276 try: 00277 rostopic.rostopicmain([cmd, 'list'] + invalid) 00278 self.fail("should have failed") 00279 except SystemExit: pass 00280 00281 # test main entry 00282 rostopic.rostopicmain([cmd, 'list']) 00283 00284 # test directly 00285 topics = ['/chatter', '/foo/chatter', '/bar/chatter', '/rosout', '/rosout_agg'] 00286 00287 with fakestdout() as b: 00288 rostopic.rostopicmain([cmd, 'list']) 00289 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()] 00290 self.failIf(set(topics)-set(v)) 00291 00292 # publishers-only 00293 topics = ['/chatter', '/foo/chatter', '/bar/chatter', '/rosout', '/rosout_agg'] 00294 with fakestdout() as b: 00295 rostopic.rostopicmain([cmd, 'list', '-p']) 00296 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()] 00297 self.failIf(set(topics)-set(v)) 00298 self.failIf('/clock' in v) 00299 00300 # subscribers-only 00301 topics = ['/rosout'] 00302 with fakestdout() as b: 00303 rostopic.rostopicmain([cmd, 'list', '-s']) 00304 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()] 00305 self.failIf(set(topics)-set(v), "%s vs. %s"%(topics, v)) 00306 self.failIf('/chatter' in v) 00307 00308 # turn on verbosity, not checking output as it's not as stable 00309 with fakestdout() as b: 00310 rostopic.rostopicmain([cmd, 'list', '-v']) 00311 v = b.getvalue() 00312 self.assert_("Published topics:" in v) 00313 self.assert_("Subscribed topics:" in v) 00314 00315 with fakestdout() as b: 00316 rostopic.rostopicmain([cmd, 'list', '-vs']) 00317 v = b.getvalue() 00318 self.failIf("Published topics:" in v) 00319 self.assert_("Subscribed topics:" in v) 00320 00321 with fakestdout() as b: 00322 rostopic.rostopicmain([cmd, 'list', '-vp']) 00323 v = b.getvalue() 00324 self.assert_("Published topics:" in v) 00325 self.failIf("Subscribed topics:" in v) 00326 00327 # test with multiple topic names 00328 try: 00329 rostopic.rostopicmain([cmd, 'list', 'rosout', 'rosout_agg']) 00330 self.fail("should have caused parser error") 00331 except SystemExit: 00332 pass 00333 00334 # test with resolved names 00335 for n in topics: 00336 with fakestdout() as b: 00337 rostopic.rostopicmain([cmd, 'list', n]) 00338 self.assertEquals(n, b.getvalue().strip()) 00339 00340 # test with relative names 00341 with fakestdout() as b: 00342 rostopic.rostopicmain([cmd, 'list', 'rosout']) 00343 self.assertEquals('/rosout', b.getvalue().strip()) 00344 00345 # test with namespaces 00346 with fakestdout() as b: 00347 rostopic.rostopicmain([cmd, 'list', '/foo']) 00348 self.assertEquals('/foo/chatter', b.getvalue().strip()) 00349 with fakestdout() as b: 00350 rostopic.rostopicmain([cmd, 'list', 'bar']) 00351 self.assertEquals('/bar/chatter', b.getvalue().strip()) 00352 00353 if __name__ == '__main__': 00354 rostest.unitrun('test_rostopic', NAME, TestRostopic, sys.argv, coverage_packages=['rostopic'])