$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2009, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 # Revision $Id$ 00035 00036 PKG = 'test_rostopic' 00037 NAME = 'test_rostopic_command_line_online' 00038 import roslib; roslib.load_manifest(PKG) 00039 00040 import os 00041 import signal 00042 import sys 00043 import time 00044 import unittest 00045 00046 import rospy 00047 import rostest 00048 00049 import std_msgs.msg 00050 00051 from subprocess import Popen, PIPE, check_call, call 00052 00053 def run_for(cmd, secs): 00054 popen = Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True) 00055 timeout_t = time.time() + secs 00056 while time.time() < timeout_t: 00057 time.sleep(0.1) 00058 os.kill(popen.pid, signal.SIGKILL) 00059 00060 class TestRostopicOnline(unittest.TestCase): 00061 00062 def setUp(self): 00063 self.vals = set() 00064 self.msgs = {} 00065 00066 def callback(self, msg, val): 00067 self.vals.add(val) 00068 self.msgs[val] = msg 00069 00070 def test_rostopic(self): 00071 topics = ['/chatter', '/foo/chatter', '/bar/chatter'] 00072 00073 # wait for network to initialize 00074 rospy.init_node('test') 00075 for i, t in enumerate(topics): 00076 rospy.Subscriber(t, std_msgs.msg.String, self.callback, i) 00077 all = set(range(0, len(topics))) 00078 00079 timeout_t = time.time() + 10. 00080 while time.time() < timeout_t and self.vals != all: 00081 time.sleep(0.1) 00082 00083 # network is initialized 00084 cmd = 'rostopic' 00085 names = ['/chatter', 'foo/chatter'] 00086 00087 # list 00088 # - we aren't matching against the core services as those can make the test suites brittle 00089 output = Popen([cmd, 'list'], stdout=PIPE).communicate()[0] 00090 l = set(output.split()) 00091 for t in topics: 00092 self.assert_(t in l) 00093 00094 for name in names: 00095 # type 00096 output = Popen([cmd, 'type', name], stdout=PIPE).communicate()[0] 00097 self.assertEquals('std_msgs/String', output.strip()) 00098 00099 # find 00100 output = Popen([cmd, 'find', 'std_msgs/String'], stdout=PIPE).communicate()[0] 00101 values = [n.strip() for n in output.split('\n') if n.strip()] 00102 self.assertEquals(set(values), set(topics)) 00103 00104 #echo 00105 # test with -c option to get command to terminate 00106 count = 3 00107 output = Popen([cmd, 'echo', name, '-n', str(count)], stdout=PIPE).communicate()[0] 00108 values = [n.strip() for n in output.split('\n') if n.strip()] 00109 values = [n for n in values if n != '---'] 00110 self.assertEquals(count, len(values), "wrong number of echos in output:\n"+str(values)) 00111 for n in values: 00112 self.assert_('data: hello world ' in n, n) 00113 00114 if 0: 00115 #bw 00116 stdout, stderr = run_for([cmd, 'bw', name], 3.) 00117 self.assert_('average:' in stdout, "OUTPUT: %s\n%s"%(stdout,stderr)) 00118 00119 # hz 00120 stdout, stderr = run_for([cmd, 'hz', name], 2.) 00121 self.assert_('average rate:' in stdout) 00122 00123 # pub 00124 # - pub wait until ctrl-C, so we have to wait then kill it 00125 if 1: 00126 s = 'hello' 00127 t = '/pub/chatter' 00128 key = len(topics) 00129 rospy.Subscriber(t, std_msgs.msg.String, self.callback, key) 00130 00131 #TODO: correct popen call 00132 args = [cmd, 'pub', t, 'std_msgs/String', s] 00133 popen = Popen(args, stdout=PIPE, stderr=PIPE, close_fds=True) 00134 00135 # - give rostopic pub 5 seconds to send us a message 00136 all = set(range(0, key+1)) 00137 timeout_t = time.time() + 5. 00138 while time.time() < timeout_t and self.vals != all: 00139 time.sleep(0.1) 00140 # - check published value 00141 msg = self.msgs[key] 00142 self.assertEquals(s, msg.data) 00143 00144 os.kill(popen.pid, signal.SIGKILL) 00145 00146 # test with dictionary 00147 t = '/pub2/chatter' 00148 key = len(topics)+1 00149 rospy.Subscriber(t, std_msgs.msg.String, self.callback, key) 00150 00151 args = [cmd, 'pub', t, 'std_msgs/String', "{data: %s}"%s] 00152 popen = Popen(args, stdout=PIPE, stderr=PIPE, close_fds=True) 00153 00154 # - give rostopic pub 5 seconds to send us a message 00155 all = set(range(0, key+2)) 00156 timeout_t = time.time() + 5. 00157 while time.time() < timeout_t and self.vals != all: 00158 time.sleep(0.1) 00159 00160 # - check published value 00161 try: 00162 msg = self.msgs[key] 00163 except KeyError: 00164 self.fail("no message received on "+str(key)) 00165 self.assertEquals(s, msg.data) 00166 00167 os.kill(popen.pid, signal.SIGKILL) 00168 00169 if __name__ == '__main__': 00170 rostest.run(PKG, NAME, TestRostopicOnline, sys.argv)