$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 00035 import roslib 00036 roslib.load_manifest('test_rosbag') 00037 00038 import unittest 00039 import rospy 00040 import rostest 00041 import sys 00042 from cStringIO import StringIO 00043 import time 00044 from random_messages import RandomMsgGen 00045 import subprocess 00046 import os 00047 00048 DELAY = 0.5 00049 00050 class RandomPlay(unittest.TestCase): 00051 00052 def msg_cb_topic(self, topic): 00053 def msg_cb(msg): 00054 nowtime = rospy.Time.now() 00055 00056 if self.start is None: 00057 self.start = nowtime 00058 00059 nowtime -= self.start 00060 00061 self.input.append((topic, msg, nowtime.to_sec())) 00062 00063 return msg_cb 00064 00065 00066 def test_random_play(self): 00067 rospy.init_node('random_sub', anonymous=True) 00068 00069 self.start = None 00070 self.input = [] 00071 00072 self.assertTrue(len(sys.argv[1]) > 3) 00073 00074 seed = int(sys.argv[1]) 00075 topics = int(sys.argv[2]) 00076 length = float(sys.argv[3]) 00077 scale = float(sys.argv[4]) 00078 self.use_clock = bool(int(sys.argv[5])) 00079 00080 rmg = RandomMsgGen(int(seed), topics, length) 00081 00082 subscribers = {} 00083 for (topic, msg_class) in rmg.topics(): 00084 subscribers[topic] = rospy.Subscriber(topic, msg_class, self.msg_cb_topic(topic)) 00085 00086 bagpath = os.path.join('/tmp', 'test_rosbag_random_record_%d.bag'%seed) 00087 cmd = ['rosbag', 'play', '-d', str(DELAY), '-r', str(scale)] 00088 00089 rospy.loginfo(str(cmd)) 00090 00091 if (self.use_clock): 00092 cmd += ['--clock', '--hz', '100'] 00093 00094 cmd += [bagpath] 00095 00096 f1 = subprocess.Popen(cmd) 00097 00098 while (len(self.input) < rmg.message_count()): 00099 # print "\n%d/%d\n"%(len(self.input), rmg.message_count()) 00100 time.sleep(.1) 00101 00102 self.assertEqual(len(self.input), rmg.message_count()) 00103 00104 max_late = 0 00105 max_early = 0 00106 avg_off = 0 00107 power = 0 00108 00109 for (expect_topic, expect_msg, expect_time) in rmg.messages(): 00110 00111 if (not self.use_clock): 00112 expect_time /= scale 00113 00114 buff = StringIO() 00115 expect_msg.serialize(buff) 00116 expect_msg.deserialize(buff.getvalue()) 00117 00118 msg_match = False 00119 00120 for ind in xrange(0,100): 00121 (input_topic, input_msg, input_time) = self.input[ind] 00122 00123 if (roslib.message.strify_message(expect_msg) == roslib.message.strify_message(input_msg)): 00124 msg_match = True 00125 del self.input[ind] 00126 00127 # stats 00128 diff = input_time - expect_time 00129 00130 if (diff < max_early): 00131 max_early = diff 00132 00133 if (diff > max_late): 00134 max_late = diff 00135 00136 avg_off += diff / rmg.message_count() 00137 00138 power += (diff**2) / rmg.message_count() 00139 00140 # Messages can arrive late, but never very early Both of 00141 # these bounds are much larger than they ought to be, but 00142 # you never know with a heavily loaded system. 00143 self.assertTrue(input_time - expect_time > -.5) 00144 self.assertTrue(abs(input_time - expect_time) < .5) 00145 break 00146 00147 if not msg_match: 00148 print "No match at time: %f"%expect_time 00149 00150 self.assertTrue(msg_match) 00151 00152 print "%f %f %f %f"%(max_early, max_late, avg_off, power) 00153 00154 (o1,e1) = f1.communicate() 00155 self.assertEqual(f1.returncode, 0) 00156 00157 if __name__ == '__main__': 00158 rostest.rosrun('test_rosbag', 'random_record_play', RandomPlay, sys.argv)