Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import unittest
00035 import rospy
00036 import rostest
00037 import sys
00038 from cStringIO import StringIO
00039 import time
00040 from random_messages import RandomMsgGen
00041 import subprocess
00042 import signal
00043 import os
00044 import atexit
00045
00046 class RandomRecord(unittest.TestCase):
00047
00048 def test_random_record(self):
00049 rospy.init_node('random_pub')
00050
00051 if (len(sys.argv) < 2):
00052 raise Exception("Expected seed as first argument")
00053
00054 seed = int(sys.argv[1])
00055
00056 seed = int(sys.argv[1])
00057 topics = int(sys.argv[2])
00058 length = float(sys.argv[3])
00059
00060 rmg = RandomMsgGen(seed, topics, length)
00061
00062 publishers = {}
00063
00064 for (topic, msg_class) in rmg.topics():
00065 publishers[topic] = rospy.Publisher(topic, msg_class)
00066
00067 bagpath = os.path.join('/tmp', 'test_rosbag_random_record_%d'%seed)
00068 cmd = ['rosbag', 'record', '-a', '-O', bagpath]
00069 f1 = subprocess.Popen(cmd)
00070
00071 def finalkill():
00072 try:
00073 os.kill(f1.pid, signal.SIGKILL)
00074 except:
00075 pass
00076
00077 atexit.register(finalkill)
00078
00079
00080 rospy.sleep(rospy.Duration.from_sec(5.0))
00081
00082 start = rospy.Time.now()
00083 for (topic, msg, time) in rmg.messages():
00084 d = start + rospy.Duration.from_sec(time) - rospy.Time.now()
00085 rospy.sleep(d)
00086 publishers[topic].publish(msg)
00087
00088
00089 rospy.sleep(rospy.Duration.from_sec(5.0))
00090
00091
00092 os.kill(-os.getpgrp(), signal.SIGINT)
00093
00094
00095 rospy.sleep(rospy.Duration.from_sec(5.0))
00096
00097
00098 while (f1.poll() is None):
00099 try:
00100 os.kill(f1.pid, signal.SIGKILL)
00101 except:
00102 pass
00103 rospy.sleep(rospy.Duration.from_sec(1.0))
00104
00105 self.assertEqual(f1.returncode, 0)
00106
00107
00108 if __name__ == '__main__':
00109 rostest.rosrun('test_rosbag', 'random_record_play', RandomRecord, sys.argv)