00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 PKG = 'test_rosnode'
00037 NAME = 'test_rosnode_command_line_online'
00038 import roslib; roslib.load_manifest(PKG)
00039
00040 import os
00041 import signal
00042 import sys
00043 import time
00044 import unittest
00045
00046 import rospy
00047 import rostest
00048
00049 import std_msgs.msg
00050
00051 from subprocess import Popen, PIPE, check_call, call
00052
00053 def run_for(cmd, secs):
00054 popen = Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True)
00055 timeout_t = time.time() + secs
00056 while time.time() < timeout_t:
00057 time.sleep(0.1)
00058 os.kill(popen.pid, signal.SIGKILL)
00059
00060 class TestRosnodeOnline(unittest.TestCase):
00061
00062 def setUp(self):
00063 self.vals = set()
00064 self.msgs = {}
00065
00066 def callback(self, msg, val):
00067 self.vals.add(val)
00068 self.msgs[val] = msg
00069
00070 def test_rosnode(self):
00071 topics = ['/chatter', '/foo/chatter', '/bar/chatter']
00072
00073
00074 rospy.init_node('test')
00075 nodes = ['/talker', '/foo/talker', '/bar/talker', rospy.get_caller_id()]
00076
00077 for i, t in enumerate(topics):
00078 rospy.Subscriber(t, std_msgs.msg.String, self.callback, i)
00079 all = set(range(0, len(topics)))
00080
00081 timeout_t = time.time() + 10.
00082 while time.time() < timeout_t and self.vals != all:
00083 time.sleep(0.1)
00084 self.assertEquals(self.vals, all, "failed to initialize graph correctly")
00085
00086
00087
00088 cmd = 'rosnode'
00089
00090
00091
00092 output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0]
00093 l = set(output.split())
00094 for t in nodes:
00095 self.assert_(t in l, "%s not in %s"%(t, l))
00096
00097 output = Popen([cmd, 'list', '-a'], stdout=PIPE).communicate()[0]
00098 l = set(output.split())
00099 for t in nodes:
00100 for e in l:
00101 if t in e:
00102 break
00103 else:
00104 self.fail("did not find [%s] in list [%s]"%(t, l))
00105
00106 output = Popen([cmd, 'list', '-u'], stdout=PIPE).communicate()[0]
00107 l = set(output.split())
00108 self.assert_(len(l), "list -u is empty")
00109 for e in l:
00110 self.assert_(e.startswith('http://'))
00111
00112 for name in nodes:
00113
00114 output = Popen([cmd, 'info', name], stdout=PIPE).communicate()[0]
00115
00116 self.assert_(name in output)
00117 self.assert_('chatter' in output)
00118 self.assert_('Publications' in output)
00119 self.assert_('Subscriptions' in output)
00120
00121 if 0:
00122
00123 stdout, stderr = run_for([cmd, 'ping', name], 3.)
00124
00125 if __name__ == '__main__':
00126 rostest.run(PKG, NAME, TestRosnodeOnline, sys.argv)