00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 from __future__ import with_statement
00037
00038 PKG = 'test_rosnode'
00039 NAME = 'test_rosnode_command_line_online'
00040 import roslib; roslib.load_manifest(PKG)
00041
00042 import os
00043 import signal
00044 import sys
00045 import time
00046 import unittest
00047
00048 import rostest
00049
00050 import cStringIO
00051 from subprocess import Popen, PIPE, check_call, call
00052
00053 from roslib.scriptutil import get_param_server, script_resolve_name
00054
00055 from contextlib import contextmanager
00056 @contextmanager
00057 def fakestdout():
00058 realstdout = sys.stdout
00059 fakestdout = cStringIO.StringIO()
00060 sys.stdout = fakestdout
00061 yield fakestdout
00062 sys.stdout = realstdout
00063
00064 def tolist(b):
00065 return [x.strip() for x in b.getvalue().split('\n') if x.strip()]
00066
00067 class TestRosnode(unittest.TestCase):
00068
00069 def __init__(self, *args):
00070 unittest.TestCase.__init__(self, *args)
00071 self.vals = set()
00072
00073
00074 import rospy
00075 import std_msgs.msg
00076 rospy.init_node('test')
00077 topics = ['/chatter', '/foo/chatter', '/bar/chatter']
00078 subs = [rospy.Subscriber(t, std_msgs.msg.String, self.callback, i) for i, t in enumerate(topics)]
00079 all = set(range(0, len(topics)))
00080
00081 timeout_t = time.time() + 10.
00082 while time.time() < timeout_t and self.vals != all:
00083 time.sleep(0.1)
00084 [s.unregister() for s in subs]
00085
00086 def setUp(self):
00087 self.vals = set()
00088 self.msgs = {}
00089
00090 def callback(self, msg, val):
00091 self.vals.add(val)
00092
00093 def _check(self, expected, actual):
00094 """
00095 Make sure all elements of expected are present in actual
00096 """
00097 for t in expected:
00098 self.assert_(t in actual)
00099
00100 def _notcheck(self, not_expected, actual):
00101 """
00102 Make sure all elements of not_expected are not present in actual
00103 """
00104 for t in not_expected:
00105 self.failIf(t in actual)
00106
00107
00108 def test_rosnode_info(self):
00109 from ros import rosnode
00110 cmd = 'rosnode'
00111
00112 nodes = ['/talker',
00113 '/foo/talker',
00114 '/bar/talker',
00115 '/baz/talker1',
00116 '/baz/talker2',
00117 '/baz/talker3',
00118 '/listeners/listener',
00119 '/rosout',
00120 ]
00121 try:
00122 rosnode._rosnode_cmd_info([cmd, 'info'])
00123 self.fail("should have failed")
00124 except SystemExit, e:
00125 self.assertNotEquals(0, e.code)
00126
00127 for n in nodes:
00128 with fakestdout() as b:
00129 rosnode._rosnode_cmd_info([cmd, 'info', n])
00130 s = b.getvalue()
00131 self.assert_("Node [%s]"%n in s)
00132 self.assert_("Pid: " in s, s)
00133
00134 def test_rosnode_list(self):
00135 from ros import rosnode
00136 cmd = 'rosnode'
00137
00138 nodes = ['/talker',
00139 '/foo/talker',
00140 '/bar/talker',
00141 '/baz/talker1',
00142 '/baz/talker2',
00143 '/baz/talker3',
00144 '/rosout',
00145 ]
00146 l = rosnode.get_node_names()
00147 for t in nodes:
00148 self.assert_(t in l)
00149
00150 try:
00151 rosnode._rosnode_cmd_list([cmd, 'list', 'one', 'two'])
00152 self.fail("should have failed")
00153 except SystemExit, e:
00154 self.assertNotEquals(0, e.code)
00155
00156 with fakestdout() as b:
00157 rosnode._rosnode_cmd_list([cmd, 'list'])
00158 self._check(nodes, tolist(b))
00159 with fakestdout() as b:
00160 rosnode._rosnode_cmd_list([cmd, 'list', '/'])
00161 l = tolist(b)
00162 self._check(nodes, l)
00163 num_nodes = len(l)
00164
00165 with fakestdout() as b:
00166 rosnode._rosnode_cmd_list([cmd, 'list', '-u', '/'])
00167 l = tolist(b)
00168 self.assertEquals(num_nodes, len(l))
00169 self.failIf([n for n in l if not n.startswith('http://')])
00170
00171 with fakestdout() as b:
00172 rosnode._rosnode_cmd_list([cmd, 'list', '-a', '/'])
00173 l = tolist(b)
00174 uris = [x.split()[0] for x in l if x]
00175 names = [x.split()[1] for x in l if x]
00176 self._check(nodes, names)
00177 self.assertEquals(num_nodes, len(uris))
00178 self.failIf([n for n in uris if not n.startswith('http://')])
00179
00180
00181
00182 foon = [p for p in nodes if p.startswith('/foo/')]
00183 not_foon = [p for p in nodes if not p.startswith('/foo/')]
00184 for ns in ['foo', '/foo', '/foo/']:
00185 with fakestdout() as b:
00186 rosnode._rosnode_cmd_list([cmd, 'list', ns])
00187 self._check(foon, tolist(b))
00188 self._notcheck(not_foon, tolist(b))
00189 bazn = [p for p in nodes if p.startswith('/baz/')]
00190 not_bazn = [p for p in nodes if not p.startswith('/baz/')]
00191 for ns in ['baz', '/baz', '/baz/']:
00192 with fakestdout() as b:
00193 rosnode._rosnode_cmd_list([cmd, 'list', ns])
00194 self._check(bazn, tolist(b))
00195 self._notcheck(not_bazn, tolist(b))
00196
00197
00198 with fakestdout() as b:
00199 rosnode._rosnode_cmd_list([cmd, 'list', '/not/a/namespace/'])
00200 self.assertEquals([], tolist(b))
00201
00202
00203 def test_rosnode_usage(self):
00204 from ros import rosnode
00205 cmd = 'rosnode'
00206 for c in ['ping', 'list', 'info', 'machine', 'cleanup', 'kill']:
00207 try:
00208 with fakestdout() as b:
00209 rosnode.rosnodemain([cmd, c, '-h'])
00210 self.assert_("usage" in b.getvalue())
00211 self.fail("should have exited on usage")
00212 except SystemExit, e:
00213 self.assertEquals(0, e.code)
00214
00215 def test_rosnode_ping(self):
00216 from ros import rosnode
00217 cmd = 'rosnode'
00218
00219 self.failIf(rosnode.rosnode_ping('/fake_node', max_count=1))
00220 self.assert_(rosnode.rosnode_ping('/rosout', max_count=1))
00221 self.assert_(rosnode.rosnode_ping('/rosout', max_count=2))
00222
00223 with fakestdout() as b:
00224 self.assert_(rosnode.rosnode_ping('/rosout', max_count=2, verbose=True))
00225 s = b.getvalue()
00226 self.assert_('xmlrpc reply' in s, s)
00227 self.assert_('ping average:' in s, s)
00228
00229
00230 rosnode._rosnode_cmd_ping([cmd, 'ping', '-c', '1', '/fake_node'])
00231 with fakestdout() as b:
00232 rosnode._rosnode_cmd_ping([cmd, 'ping', '-c', '1', '/rosout'])
00233 s = b.getvalue()
00234 self.assert_('xmlrpc reply' in s, s)
00235 with fakestdout() as b:
00236 rosnode._rosnode_cmd_ping([cmd, 'ping', '-c', '1', 'rosout'])
00237 s = b.getvalue()
00238 self.assert_('xmlrpc reply' in s, s)
00239 with fakestdout() as b:
00240 rosnode._rosnode_cmd_ping([cmd, 'ping', '-c', '2', 'rosout'])
00241 s = b.getvalue()
00242 self.assertEquals(2, s.count('xmlrpc reply'))
00243
00244 def test_rosnode_ping_all(self):
00245 from ros import rosnode
00246 cmd = 'rosnode'
00247
00248 pinged, unpinged = rosnode.rosnode_ping_all(verbose=False)
00249 self.assert_('/rosout' in pinged)
00250 with fakestdout() as b:
00251 pinged, unpinged = rosnode.rosnode_ping_all(verbose=True)
00252 self.assert_('xmlrpc reply' in b.getvalue())
00253 self.assert_('/rosout' in pinged)
00254
00255 def test_rosnode_kill(self):
00256 from ros import rosnode
00257 cmd = 'rosnode'
00258 for n in ['to_kill/kill1', '/to_kill/kill2']:
00259 self.assert_(rosnode.rosnode_ping(n, max_count=1))
00260 rosnode._rosnode_cmd_kill([cmd, 'kill', n])
00261 self.failIf(rosnode.rosnode_ping(n, max_count=1))
00262
00263 def test_fullusage(self):
00264 from ros import rosnode
00265 try:
00266 rosnode._fullusage()
00267 except SystemExit: pass
00268 try:
00269 rosnode.rosnodemain(['rosnode'])
00270 except SystemExit: pass
00271 try:
00272 rosnode.rosnodemain(['rosnode', 'invalid'])
00273 except SystemExit: pass
00274
00275
00276 if __name__ == '__main__':
00277 rostest.unitrun(PKG, NAME, TestRosnode, sys.argv, coverage_packages=['rosnode'])