Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import unittest
00036 import rospy
00037 import rostest
00038 import sys
00039 try:
00040 from cStringIO import StringIO
00041 except ImportError:
00042 from io import StringIO
00043 import time
00044 from random_messages import RandomMsgGen
00045 import subprocess
00046 import os
00047
00048 import genpy
00049
00050 DELAY = 0.5
00051
00052 class RandomPlay(unittest.TestCase):
00053
00054 def msg_cb_topic(self, topic):
00055 def msg_cb(msg):
00056 nowtime = rospy.Time.now()
00057
00058 if self.start is None:
00059 self.start = nowtime
00060
00061 nowtime -= self.start
00062
00063 self.input.append((topic, msg, nowtime.to_sec()))
00064
00065 return msg_cb
00066
00067
00068 def test_random_play(self):
00069 rospy.init_node('random_sub', anonymous=True)
00070
00071 self.start = None
00072 self.input = []
00073
00074 self.assertTrue(len(sys.argv[1]) > 3)
00075
00076 seed = int(sys.argv[1])
00077 topics = int(sys.argv[2])
00078 length = float(sys.argv[3])
00079 scale = float(sys.argv[4])
00080 self.use_clock = bool(int(sys.argv[5]))
00081
00082 rmg = RandomMsgGen(int(seed), topics, length)
00083
00084 subscribers = {}
00085 for (topic, msg_class) in rmg.topics():
00086 subscribers[topic] = rospy.Subscriber(topic, msg_class, self.msg_cb_topic(topic))
00087
00088 bagpath = os.path.join('/tmp', 'test_rosbag_random_record_%d.bag'%seed)
00089 cmd = ['rosbag', 'play', '-d', str(DELAY), '-r', str(scale)]
00090
00091 rospy.loginfo(str(cmd))
00092
00093 if (self.use_clock):
00094 cmd += ['--clock', '--hz', '100']
00095
00096 cmd += [bagpath]
00097
00098 try:
00099 f1 = subprocess.Popen(cmd)
00100
00101 last_input_count = 0
00102 while (len(self.input) < rmg.message_count()):
00103
00104 time.sleep(1.0)
00105
00106 if len(self.input) == last_input_count:
00107 rc = f1.poll()
00108 if rc is not None:
00109 self.assertEqual(rc, 0)
00110 break
00111 last_input_count = len(self.input)
00112
00113 self.assertEqual(len(self.input), rmg.message_count())
00114
00115 max_late = 0
00116 max_early = 0
00117 avg_off = 0
00118 power = 0
00119
00120 for (expect_topic, expect_msg, expect_time) in rmg.messages():
00121
00122 if (not self.use_clock):
00123 expect_time /= scale
00124
00125 buff = StringIO()
00126 expect_msg.serialize(buff)
00127 expect_msg.deserialize(buff.getvalue())
00128
00129 msg_match = False
00130
00131 for ind in range(0,100):
00132 (input_topic, input_msg, input_time) = self.input[ind]
00133
00134 if (genpy.message.strify_message(expect_msg) == genpy.message.strify_message(input_msg)):
00135 msg_match = True
00136 del self.input[ind]
00137
00138
00139 diff = input_time - expect_time
00140
00141 if (diff < max_early):
00142 max_early = diff
00143
00144 if (diff > max_late):
00145 max_late = diff
00146
00147 avg_off += diff / rmg.message_count()
00148
00149 power += (diff**2) / rmg.message_count()
00150
00151
00152
00153
00154 self.assertTrue(input_time - expect_time > -.5)
00155 self.assertTrue(abs(input_time - expect_time) < .5)
00156 break
00157
00158 if not msg_match:
00159 print("No match at time: %f" % expect_time)
00160
00161 self.assertTrue(msg_match)
00162
00163 print("%f %f %f %f"%(max_early, max_late, avg_off, power))
00164
00165 finally:
00166 f1.communicate()
00167
00168 self.assertEqual(f1.returncode, 0)
00169
00170 if __name__ == '__main__':
00171 rostest.rosrun('test_rosbag', 'random_record_play', RandomPlay, sys.argv)