Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import unittest
00036 import rospy
00037 import rostest
00038 import sys
00039 from cStringIO import StringIO
00040 import time
00041 from random_messages import RandomMsgGen
00042 import subprocess
00043 import os
00044
00045 import genpy
00046
00047 DELAY = 0.5
00048
00049 class RandomPlay(unittest.TestCase):
00050
00051 def msg_cb_topic(self, topic):
00052 def msg_cb(msg):
00053 nowtime = rospy.Time.now()
00054
00055 if self.start is None:
00056 self.start = nowtime
00057
00058 nowtime -= self.start
00059
00060 self.input.append((topic, msg, nowtime.to_sec()))
00061
00062 return msg_cb
00063
00064
00065 def test_random_play(self):
00066 rospy.init_node('random_sub', anonymous=True)
00067
00068 self.start = None
00069 self.input = []
00070
00071 self.assertTrue(len(sys.argv[1]) > 3)
00072
00073 seed = int(sys.argv[1])
00074 topics = int(sys.argv[2])
00075 length = float(sys.argv[3])
00076 scale = float(sys.argv[4])
00077 self.use_clock = bool(int(sys.argv[5]))
00078
00079 rmg = RandomMsgGen(int(seed), topics, length)
00080
00081 subscribers = {}
00082 for (topic, msg_class) in rmg.topics():
00083 subscribers[topic] = rospy.Subscriber(topic, msg_class, self.msg_cb_topic(topic))
00084
00085 bagpath = os.path.join('/tmp', 'test_rosbag_random_record_%d.bag'%seed)
00086 cmd = ['rosbag', 'play', '-d', str(DELAY), '-r', str(scale)]
00087
00088 rospy.loginfo(str(cmd))
00089
00090 if (self.use_clock):
00091 cmd += ['--clock', '--hz', '100']
00092
00093 cmd += [bagpath]
00094
00095 f1 = subprocess.Popen(cmd)
00096
00097 while (len(self.input) < rmg.message_count()):
00098
00099 time.sleep(.1)
00100
00101 self.assertEqual(len(self.input), rmg.message_count())
00102
00103 max_late = 0
00104 max_early = 0
00105 avg_off = 0
00106 power = 0
00107
00108 for (expect_topic, expect_msg, expect_time) in rmg.messages():
00109
00110 if (not self.use_clock):
00111 expect_time /= scale
00112
00113 buff = StringIO()
00114 expect_msg.serialize(buff)
00115 expect_msg.deserialize(buff.getvalue())
00116
00117 msg_match = False
00118
00119 for ind in xrange(0,100):
00120 (input_topic, input_msg, input_time) = self.input[ind]
00121
00122 if (genpy.message.strify_message(expect_msg) == genpy.message.strify_message(input_msg)):
00123 msg_match = True
00124 del self.input[ind]
00125
00126
00127 diff = input_time - expect_time
00128
00129 if (diff < max_early):
00130 max_early = diff
00131
00132 if (diff > max_late):
00133 max_late = diff
00134
00135 avg_off += diff / rmg.message_count()
00136
00137 power += (diff**2) / rmg.message_count()
00138
00139
00140
00141
00142 self.assertTrue(input_time - expect_time > -.5)
00143 self.assertTrue(abs(input_time - expect_time) < .5)
00144 break
00145
00146 if not msg_match:
00147 print "No match at time: %f"%expect_time
00148
00149 self.assertTrue(msg_match)
00150
00151 print "%f %f %f %f"%(max_early, max_late, avg_off, power)
00152
00153 (o1,e1) = f1.communicate()
00154 self.assertEqual(f1.returncode, 0)
00155
00156 if __name__ == '__main__':
00157 rostest.rosrun('test_rosbag', 'random_record_play', RandomPlay, sys.argv)