Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import unittest
00036 import rospy
00037 import rostest
00038 import sys
00039 from cStringIO import StringIO
00040 import time
00041 from random_messages import RandomMsgGen
00042 import subprocess
00043 import os
00044
00045 import genpy
00046
00047 DELAY = 0.5
00048
00049 class RandomPlay(unittest.TestCase):
00050
00051 def msg_cb_topic(self, topic):
00052 def msg_cb(msg):
00053 nowtime = rospy.Time.now()
00054
00055 if self.start is None:
00056 self.start = nowtime
00057
00058 nowtime -= self.start
00059
00060 self.input.append((topic, msg, nowtime.to_sec()))
00061
00062 return msg_cb
00063
00064
00065 def test_random_play(self):
00066 rospy.init_node('random_sub', anonymous=True)
00067
00068 self.start = None
00069 self.input = []
00070
00071 self.assertTrue(len(sys.argv[1]) > 3)
00072
00073 seed = int(sys.argv[1])
00074 topics = int(sys.argv[2])
00075 length = float(sys.argv[3])
00076 scale = float(sys.argv[4])
00077 self.use_clock = bool(int(sys.argv[5]))
00078
00079 rmg = RandomMsgGen(int(seed), topics, length)
00080
00081 subscribers = {}
00082 for (topic, msg_class) in rmg.topics():
00083 subscribers[topic] = rospy.Subscriber(topic, msg_class, self.msg_cb_topic(topic))
00084
00085 bagpath = os.path.join('/tmp', 'test_rosbag_random_record_%d.bag'%seed)
00086 cmd = ['rosbag', 'play', '-d', str(DELAY), '-r', str(scale)]
00087
00088 rospy.loginfo(str(cmd))
00089
00090 if (self.use_clock):
00091 cmd += ['--clock', '--hz', '100']
00092
00093 cmd += [bagpath]
00094
00095 try:
00096 f1 = subprocess.Popen(cmd)
00097
00098 last_input_count = 0
00099 while (len(self.input) < rmg.message_count()):
00100
00101 time.sleep(1.0)
00102
00103 if len(self.input) == last_input_count:
00104 rc = f1.poll()
00105 if rc is not None:
00106 self.assertEqual(rc, 0)
00107 break
00108 last_input_count = len(self.input)
00109
00110 self.assertEqual(len(self.input), rmg.message_count())
00111
00112 max_late = 0
00113 max_early = 0
00114 avg_off = 0
00115 power = 0
00116
00117 for (expect_topic, expect_msg, expect_time) in rmg.messages():
00118
00119 if (not self.use_clock):
00120 expect_time /= scale
00121
00122 buff = StringIO()
00123 expect_msg.serialize(buff)
00124 expect_msg.deserialize(buff.getvalue())
00125
00126 msg_match = False
00127
00128 for ind in xrange(0,100):
00129 (input_topic, input_msg, input_time) = self.input[ind]
00130
00131 if (genpy.message.strify_message(expect_msg) == genpy.message.strify_message(input_msg)):
00132 msg_match = True
00133 del self.input[ind]
00134
00135
00136 diff = input_time - expect_time
00137
00138 if (diff < max_early):
00139 max_early = diff
00140
00141 if (diff > max_late):
00142 max_late = diff
00143
00144 avg_off += diff / rmg.message_count()
00145
00146 power += (diff**2) / rmg.message_count()
00147
00148
00149
00150
00151 self.assertTrue(input_time - expect_time > -.5)
00152 self.assertTrue(abs(input_time - expect_time) < .5)
00153 break
00154
00155 if not msg_match:
00156 print "No match at time: %f"%expect_time
00157
00158 self.assertTrue(msg_match)
00159
00160 print "%f %f %f %f"%(max_early, max_late, avg_off, power)
00161
00162 finally:
00163 f1.communicate()
00164
00165 self.assertEqual(f1.returncode, 0)
00166
00167 if __name__ == '__main__':
00168 rostest.rosrun('test_rosbag', 'random_record_play', RandomPlay, sys.argv)