00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 from __future__ import with_statement
00037
00038 NAME = 'test_rostopic'
00039 import roslib; roslib.load_manifest('test_rostopic')
00040
00041 import os
00042 import sys
00043 import unittest
00044 import cStringIO
00045 import time
00046
00047 import rostest
00048 import rospy
00049 import std_msgs.msg
00050
00051 from subprocess import Popen, PIPE, check_call, call
00052
00053 from contextlib import contextmanager
00054
00055 @contextmanager
00056 def fakestdout():
00057 realstdout = sys.stdout
00058 fakestdout = cStringIO.StringIO()
00059 sys.stdout = fakestdout
00060 yield fakestdout
00061 sys.stdout = realstdout
00062
00063 def todict(s):
00064 d = {}
00065 for l in s.split('\n'):
00066 key, p, val = l.partition(':')
00067 if p:
00068 d[key] = val.strip()
00069 return d
00070
00071 class TestRostopic(unittest.TestCase):
00072
00073 def __init__(self, *args):
00074 unittest.TestCase.__init__(self, *args)
00075 self.vals = set()
00076
00077
00078 rospy.init_node('test')
00079 topics = ['/chatter', '/foo/chatter', '/bar/chatter']
00080 subs = [rospy.Subscriber(t, std_msgs.msg.String, self.callback, i) for i, t in enumerate(topics)]
00081 all = set(range(0, len(topics)))
00082
00083 timeout_t = time.time() + 10.
00084 while time.time() < timeout_t and self.vals != all and not rospy.is_shutdown():
00085 time.sleep(0.1)
00086 [s.unregister() for s in subs]
00087 if rospy.is_shutdown():
00088 self.fail("shutdown")
00089
00090 def test_offline(self):
00091 from ros import rostopic
00092 orig_uri = os.environ['ROS_MASTER_URI']
00093 os.environ['ROS_MASTER_URI'] = 'http://fake_host:12356'
00094
00095 try:
00096 c = 'rostopic'
00097
00098 try:
00099 rostopic._rostopic_cmd_list([c, 'list'])
00100 self.fail("should have raised ROSTopicIOException")
00101 except rostopic.ROSTopicIOException: pass
00102
00103 try:
00104 rostopic._rostopic_cmd_info([c, 'info', '/chatter'])
00105 self.fail("should have raised ROSTopicIOException")
00106 except rostopic.ROSTopicIOException: pass
00107
00108 try:
00109 rostopic._rostopic_cmd_type([c, 'type', '/chatter'])
00110 self.fail("should have raised ROSTopicIOException")
00111 except rostopic.ROSTopicIOException: pass
00112
00113 try:
00114 rostopic._rostopic_cmd_find([c, 'find', 'std_msgs/String'])
00115 self.fail("should have raised ROSTopicIOException")
00116 except rostopic.ROSTopicIOException: pass
00117
00118 finally:
00119 os.environ['ROS_MASTER_URI'] = orig_uri
00120
00121 def test_cmd_type(self):
00122 from ros import rostopic
00123 cmd = 'rostopic'
00124 s = '/rosout_agg'
00125 t = 'rosgraph_msgs/Log'
00126
00127 try:
00128 rostopic.rostopicmain([cmd, 'type', 'fake'])
00129 self.fail("should have exited")
00130 except SystemExit, e:
00131 self.assertNotEquals(0, e.code)
00132
00133 for s in ['/chatter', 'chatter', 'foo/chatter', '/bar/chatter']:
00134 with fakestdout() as b:
00135 rostopic.rostopicmain([cmd, 'type', s])
00136 v = b.getvalue().strip()
00137 self.assertEquals('std_msgs/String', v)
00138
00139 def test_main(self):
00140 from ros import rostopic
00141 c = 'rostopic'
00142 try:
00143 rostopic.rostopicmain([c])
00144 self.fail("should have exited with error")
00145 except SystemExit, e:
00146 self.assertNotEquals(0, e.code)
00147 try:
00148 rostopic.rostopicmain([c, 'foo'])
00149 self.fail("should have exited with error")
00150 except SystemExit, e:
00151 self.assertNotEquals(0, e.code)
00152
00153 def test_cmd_pub(self):
00154 from ros import rostopic
00155 cmd = 'rostopic'
00156
00157
00158
00159
00160
00161 invalid = [['-r', '--once', '/foo', 'std_msgs/String', 'hello'],
00162 ['-r', 'a', '/foo', 'std_msgs/String', 'hello'],
00163 ['-r', '--', '-1', '/foo', 'std_msgs/String', 'hello'],
00164 [],
00165 ['/foo'],
00166 ['/foo', 'std_msgs/String', 'a: b: !!c: d: e'],
00167 ]
00168 for i in invalid:
00169 try:
00170 rostopic.rostopicmain([cmd, 'pub'] + i)
00171 self.fail("should have exited with error"+str(i))
00172 except SystemExit, e:
00173 self.assert_(e.code != 0)
00174
00175
00176
00177 def test_full_usage(self):
00178 from ros import rostopic
00179 try:
00180 rostopic._fullusage()
00181 self.fail("should have caused system exit")
00182 except SystemExit: pass
00183
00184 def test_get_topic_type(self):
00185 from ros import rostopic
00186
00187 self.assertEquals((None, None, None), rostopic.get_topic_type('/fake', blocking=False))
00188
00189 t, n, f = rostopic.get_topic_type('/rosout', blocking=False)
00190 self.assertEquals('rosgraph_msgs/Log', t)
00191 self.assertEquals('/rosout', n)
00192 self.assert_(f is None)
00193
00194 t, n, f = rostopic.get_topic_type('/rosout/name', blocking=False)
00195 self.assertEquals('rosgraph_msgs/Log', t)
00196 self.assertEquals('/rosout', n)
00197 self.failIf(f is None)
00198 from rosgraph_msgs.msg import Log
00199 self.assertEquals("bob", f(Log(name="bob")))
00200
00201 def test_get_topic_class(self):
00202 from ros import rostopic
00203
00204 self.assertEquals((None, None, None), rostopic.get_topic_class('/fake'))
00205
00206 from rosgraph_msgs.msg import Log
00207 c, n, f = rostopic.get_topic_class('/rosout')
00208 self.assertEquals(Log, c)
00209 self.assertEquals('/rosout', n)
00210 self.assert_(f is None)
00211
00212 c, n, f = rostopic.get_topic_class('/rosout/name')
00213 self.assertEquals(c, Log)
00214 self.assertEquals('/rosout', n)
00215 self.failIf(f is None)
00216 self.assertEquals("bob", f(Log(name="bob")))
00217
00218 def test_cmd_info(self):
00219 from ros import rostopic
00220 cmd = 'rostopic'
00221
00222 try:
00223 rostopic.rostopicmain([cmd, 'info'])
00224 self.fail("should have exited with error")
00225 except SystemExit: pass
00226 try:
00227 rostopic.rostopicmain([cmd, 'info', '/fake_topic'])
00228 self.fail("should have exited with error")
00229 except SystemExit: pass
00230 try:
00231 rostopic.rostopicmain([cmd, 'info', '/chatter', '/bar/chatter'])
00232 self.fail("should have exited with error")
00233 except SystemExit: pass
00234
00235 with fakestdout() as b:
00236 rostopic.rostopicmain([cmd, 'info', 'rosout'])
00237 v = b.getvalue()
00238 for s in ["Publishers:", "Subscribers", "Type: rosgraph_msgs/Log", " * /rosout"]:
00239 self.assert_(s in v, "failed on %s: %s"%(s, v))
00240 with fakestdout() as b:
00241 rostopic.rostopicmain([cmd, 'info', '/chatter'])
00242 v = b.getvalue()
00243 for s in ["Publishers:", "Subscribers", "Type: std_msgs/String", " * /talker"]:
00244 self.assert_(s in v, "failed on %s: %s"%(s, v))
00245
00246 def test_cmd_find(self):
00247 from ros import rostopic
00248 cmd = 'rostopic'
00249
00250 try:
00251 rostopic.rostopicmain([cmd, 'find'])
00252 self.fail("arg parsing should have failed")
00253 except SystemExit: pass
00254 try:
00255 rostopic.rostopicmain([cmd, 'find', 'std_msgs/String', 'std_msgs/Int32'])
00256 self.fail("arg parsing should have failed")
00257 except SystemExit: pass
00258
00259
00260 with fakestdout() as b:
00261 rostopic.rostopicmain([cmd, 'find', 'std_msgs/String'])
00262 d = [x for x in b.getvalue().split('\n') if x.strip()]
00263 v = ['/foo/chatter', '/bar/chatter', '/chatter']
00264 self.assertEquals(set(v), set(d))
00265
00266 def callback(self, msg, val):
00267 self.vals.add(val)
00268
00269 def test_cmd_list(self):
00270 from ros import rostopic
00271 cmd = 'rostopic'
00272 s = '/add_two_ints'
00273
00274
00275 for invalid in [['-ps'], ['-b' 'file.bag', '-s'], ['-b' 'file.bag', '-p']]:
00276 try:
00277 rostopic.rostopicmain([cmd, 'list'] + invalid)
00278 self.fail("should have failed")
00279 except SystemExit: pass
00280
00281
00282 rostopic.rostopicmain([cmd, 'list'])
00283
00284
00285 topics = ['/chatter', '/foo/chatter', '/bar/chatter', '/rosout', '/rosout_agg']
00286
00287 with fakestdout() as b:
00288 rostopic.rostopicmain([cmd, 'list'])
00289 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00290 self.failIf(set(topics)-set(v))
00291
00292
00293 topics = ['/chatter', '/foo/chatter', '/bar/chatter', '/rosout', '/rosout_agg']
00294 with fakestdout() as b:
00295 rostopic.rostopicmain([cmd, 'list', '-p'])
00296 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00297 self.failIf(set(topics)-set(v))
00298 self.failIf('/clock' in v)
00299
00300
00301 topics = ['/rosout']
00302 with fakestdout() as b:
00303 rostopic.rostopicmain([cmd, 'list', '-s'])
00304 v = [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00305 self.failIf(set(topics)-set(v), "%s vs. %s"%(topics, v))
00306 self.failIf('/chatter' in v)
00307
00308
00309 with fakestdout() as b:
00310 rostopic.rostopicmain([cmd, 'list', '-v'])
00311 v = b.getvalue()
00312 self.assert_("Published topics:" in v)
00313 self.assert_("Subscribed topics:" in v)
00314
00315 with fakestdout() as b:
00316 rostopic.rostopicmain([cmd, 'list', '-vs'])
00317 v = b.getvalue()
00318 self.failIf("Published topics:" in v)
00319 self.assert_("Subscribed topics:" in v)
00320
00321 with fakestdout() as b:
00322 rostopic.rostopicmain([cmd, 'list', '-vp'])
00323 v = b.getvalue()
00324 self.assert_("Published topics:" in v)
00325 self.failIf("Subscribed topics:" in v)
00326
00327
00328 try:
00329 rostopic.rostopicmain([cmd, 'list', 'rosout', 'rosout_agg'])
00330 self.fail("should have caused parser error")
00331 except SystemExit:
00332 pass
00333
00334
00335 for n in topics:
00336 with fakestdout() as b:
00337 rostopic.rostopicmain([cmd, 'list', n])
00338 self.assertEquals(n, b.getvalue().strip())
00339
00340
00341 with fakestdout() as b:
00342 rostopic.rostopicmain([cmd, 'list', 'rosout'])
00343 self.assertEquals('/rosout', b.getvalue().strip())
00344
00345
00346 with fakestdout() as b:
00347 rostopic.rostopicmain([cmd, 'list', '/foo'])
00348 self.assertEquals('/foo/chatter', b.getvalue().strip())
00349 with fakestdout() as b:
00350 rostopic.rostopicmain([cmd, 'list', 'bar'])
00351 self.assertEquals('/bar/chatter', b.getvalue().strip())
00352
00353 if __name__ == '__main__':
00354 rostest.unitrun('test_rostopic', NAME, TestRostopic, sys.argv, coverage_packages=['rostopic'])