Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 import unittest
00036 import rospy
00037 import rostest
00038 import sys
00039 from cStringIO import StringIO
00040 import time
00041 from random_messages import RandomMsgGen
00042 import subprocess
00043 import os
00044 
00045 import genpy
00046 
00047 DELAY = 0.5
00048 
00049 class RandomPlay(unittest.TestCase):
00050 
00051   def msg_cb_topic(self, topic):
00052     def msg_cb(msg):
00053       nowtime = rospy.Time.now()
00054 
00055       if self.start is None:
00056         self.start = nowtime
00057 
00058       nowtime -= self.start
00059 
00060       self.input.append((topic, msg, nowtime.to_sec()))
00061 
00062     return msg_cb
00063 
00064 
00065   def test_random_play(self):
00066     rospy.init_node('random_sub', anonymous=True)
00067 
00068     self.start = None
00069     self.input = []
00070 
00071     self.assertTrue(len(sys.argv[1]) > 3)
00072 
00073     seed    = int(sys.argv[1])
00074     topics  = int(sys.argv[2])
00075     length  = float(sys.argv[3])
00076     scale   = float(sys.argv[4])
00077     self.use_clock = bool(int(sys.argv[5]))
00078 
00079     rmg = RandomMsgGen(int(seed), topics, length)
00080 
00081     subscribers = {}
00082     for (topic, msg_class) in rmg.topics():
00083       subscribers[topic] = rospy.Subscriber(topic, msg_class, self.msg_cb_topic(topic))
00084 
00085     bagpath = os.path.join('/tmp', 'test_rosbag_random_record_%d.bag'%seed)
00086     cmd = ['rosbag', 'play', '-d', str(DELAY), '-r', str(scale)]
00087     
00088     rospy.loginfo(str(cmd))
00089 
00090     if (self.use_clock):
00091       cmd += ['--clock', '--hz', '100']
00092 
00093     cmd += [bagpath]
00094 
00095     f1 = subprocess.Popen(cmd)
00096 
00097     while (len(self.input) < rmg.message_count()):
00098 
00099       time.sleep(.1)
00100 
00101     self.assertEqual(len(self.input), rmg.message_count())
00102 
00103     max_late = 0
00104     max_early = 0
00105     avg_off = 0
00106     power = 0
00107 
00108     for (expect_topic, expect_msg, expect_time) in rmg.messages():
00109 
00110       if (not self.use_clock):
00111         expect_time /= scale
00112 
00113       buff = StringIO()
00114       expect_msg.serialize(buff)
00115       expect_msg.deserialize(buff.getvalue())
00116 
00117       msg_match = False
00118 
00119       for ind in xrange(0,100):
00120         (input_topic, input_msg, input_time) = self.input[ind]
00121 
00122         if (genpy.message.strify_message(expect_msg) == genpy.message.strify_message(input_msg)):
00123           msg_match = True
00124           del self.input[ind]
00125 
00126           
00127           diff = input_time - expect_time
00128 
00129           if (diff < max_early):
00130             max_early = diff
00131 
00132           if (diff > max_late):
00133             max_late = diff
00134 
00135           avg_off += diff / rmg.message_count()
00136 
00137           power += (diff**2) / rmg.message_count()
00138 
00139           
00140           
00141           
00142           self.assertTrue(input_time - expect_time > -.5)
00143           self.assertTrue(abs(input_time - expect_time) < .5)
00144           break
00145 
00146       if not msg_match:
00147         print "No match at time: %f"%expect_time
00148 
00149       self.assertTrue(msg_match)
00150 
00151     print "%f %f %f %f"%(max_early, max_late, avg_off, power)
00152 
00153     (o1,e1) = f1.communicate()    
00154     self.assertEqual(f1.returncode, 0)
00155 
00156 if __name__ == '__main__':
00157   rostest.rosrun('test_rosbag', 'random_record_play', RandomPlay, sys.argv)