Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 import unittest
00035 import rospy
00036 import rostest
00037 import sys
00038 try:
00039 from cStringIO import StringIO
00040 except ImportError:
00041 from io import StringIO
00042 import time
00043 from random_messages import RandomMsgGen
00044 import subprocess
00045 import signal
00046 import os
00047 import atexit
00048
class RandomRecord(unittest.TestCase):
    """Publish a generated message stream while ``rosbag record`` captures it."""

    def test_random_record(self):
        """Record generated messages and check that the recorder exits cleanly.

        Three positional command-line arguments are read from ``sys.argv``
        and handed to ``RandomMsgGen``:

        1. seed   (int)   -- also used in the output bag file name
        2. topics (int)
        3. length (float)

        The test starts ``rosbag record -a`` as a child process, publishes
        every message yielded by the generator at its scheduled offset,
        interrupts the recorder, and asserts that it returned exit code 0.

        Raises:
            Exception: if fewer than three arguments were supplied.
        """
        rospy.init_node('random_pub')

        # sys.argv[1], [2] and [3] are all read below, so require all of them
        # up front instead of failing later with a bare IndexError.
        if len(sys.argv) < 4:
            raise Exception("Expected seed, topics and length as arguments")

        seed = int(sys.argv[1])
        topics = int(sys.argv[2])
        length = float(sys.argv[3])

        rmg = RandomMsgGen(seed, topics, length)

        # One publisher per generated topic, keyed by topic name.
        publishers = {}
        for (topic, msg_class) in rmg.topics():
            publishers[topic] = rospy.Publisher(topic, msg_class)

        bagpath = os.path.join('/tmp', 'test_rosbag_random_record_%d' % seed)
        cmd = ['rosbag', 'record', '-a', '-O', bagpath]
        f1 = subprocess.Popen(cmd)

        def finalkill():
            """Best-effort SIGKILL of the recorder when the interpreter exits."""
            try:
                os.kill(f1.pid, signal.SIGKILL)
            except OSError:
                # The recorder is already gone (or cannot be signalled);
                # there is nothing left to clean up.
                pass

        atexit.register(finalkill)

        # Pause before publishing -- presumably so the recorder has time to
        # start up and subscribe; the 5 s figure is not derived from anything
        # visible here.
        rospy.sleep(rospy.Duration.from_sec(5.0))

        start = rospy.Time.now()
        # `offset` is the message's scheduled time in seconds relative to
        # `start` (named so it does not shadow the imported `time` module).
        for (topic, msg, offset) in rmg.messages():
            d = start + rospy.Duration.from_sec(offset) - rospy.Time.now()
            rospy.sleep(d)
            publishers[topic].publish(msg)

        # Pause again after the last message before stopping the recorder.
        rospy.sleep(rospy.Duration.from_sec(5.0))

        # A negative pid makes kill() signal a whole process group, so this
        # sends SIGINT to every process in this test's own group.
        # NOTE(review): presumably this is how the recorder (and anything it
        # spawned) gets interrupted; it also signals this process -- confirm
        # that is intended before narrowing it to f1.pid.
        os.kill(-os.getpgrp(), signal.SIGINT)

        # Give the recorder time to react to the interrupt.
        rospy.sleep(rospy.Duration.from_sec(5.0))

        # Keep sending SIGKILL once a second until poll() reports that the
        # recorder has exited, rather than blocking on it indefinitely.
        while f1.poll() is None:
            try:
                os.kill(f1.pid, signal.SIGKILL)
            except OSError:
                # Process vanished between poll() and kill(); the next
                # poll() will pick up its exit status.
                pass
            rospy.sleep(rospy.Duration.from_sec(1.0))

        self.assertEqual(f1.returncode, 0)
00109
00110
if __name__ == "__main__":
    # Script entry point: hand the RandomRecord test case to the rostest
    # runner together with the raw command-line arguments.
    package_name = "test_rosbag"
    test_name = "random_record_play"
    rostest.rosrun(package_name, test_name, RandomRecord, sys.argv)