39 from cStringIO
import StringIO
41 from io
import StringIO
43 from random_messages
import RandomMsgGen
52 rospy.init_node(
'random_pub')
54 if (len(sys.argv) < 2):
55 raise Exception(
"Expected seed as first argument")
57 seed = int(sys.argv[1])
59 seed = int(sys.argv[1])
60 topics = int(sys.argv[2])
61 length = float(sys.argv[3])
67 for (topic, msg_class)
in rmg.topics():
68 publishers[topic] = rospy.Publisher(topic, msg_class)
70 bagpath = os.path.join(
'/tmp',
'test_rosbag_random_record_%d'%seed)
71 cmd = [
'rosbag',
'record',
'-a',
'-O', bagpath]
72 f1 = subprocess.Popen(cmd)
76 os.kill(f1.pid, signal.SIGKILL)
80 atexit.register(finalkill)
83 rospy.sleep(rospy.Duration.from_sec(5.0))
85 start = rospy.Time.now()
86 for (topic, msg, time)
in rmg.messages():
87 d = start + rospy.Duration.from_sec(time) - rospy.Time.now()
89 publishers[topic].publish(msg)
92 rospy.sleep(rospy.Duration.from_sec(5.0))
95 os.kill(-os.getpgrp(), signal.SIGINT)
98 rospy.sleep(rospy.Duration.from_sec(5.0))
101 while (f1.poll()
is None):
103 os.kill(f1.pid, signal.SIGKILL)
106 rospy.sleep(rospy.Duration.from_sec(1.0))
108 self.assertEqual(f1.returncode, 0)
111 if __name__ ==
'__main__':
112 rostest.rosrun(
'test_rosbag',
'random_record_play', RandomRecord, sys.argv)
def test_random_record(self)