00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import heapq
00037 import os
00038 import shutil
00039 import sys
00040 import time
00041 import unittest
00042
00043 import genpy
00044
00045 import rosbag
00046 from rosbag import bag
00047 import rospy
00048 from std_msgs.msg import Int32
00049 from std_msgs.msg import ColorRGBA
00050
00051 class TestRosbag(unittest.TestCase):
00052 def setUp(self):
00053 pass
00054
00055 def test_opening_stream_works(self):
00056 f = open('/tmp/test_opening_stream_works.bag', 'w')
00057 with rosbag.Bag(f, 'w') as b:
00058 for i in range(10):
00059 msg = Int32()
00060 msg.data = i
00061 b.write('/int', msg)
00062
00063 f = open('/tmp/test_opening_stream_works.bag', 'r')
00064 b = rosbag.Bag(f, 'r')
00065 self.assert_(len(list(b.read_messages())) == 10)
00066 b.close()
00067
00068 def test_invalid_bag_arguments_fails(self):
00069 f = '/tmp/test_invalid_bad_arguments_fails.bag'
00070
00071 def fn1(): rosbag.Bag('')
00072 def fn2(): rosbag.Bag(None)
00073 def fn3(): rosbag.Bag(f, 'z')
00074 def fn4(): rosbag.Bag(f, 'r', compression='foobar')
00075 def fn5(): rosbag.Bag(f, 'r', chunk_threshold=-1000)
00076 for fn in [fn1, fn2, fn3, fn4, fn5]:
00077 self.failUnlessRaises(ValueError, fn)
00078
00079 def test_io_on_close_fails(self):
00080 def fn():
00081 b = rosbag.Bag('/tmp/test_io_close_fails.bag', 'w')
00082 b.close()
00083 size = b.size()
00084 self.failUnlessRaises(ValueError, fn)
00085
00086 def test_write_invalid_message_fails(self):
00087 def fn():
00088 with rosbag.Bag('/tmp/test_write_invalid_message_fails.bag', 'w') as b:
00089 b.write(None, None, None)
00090 self.failUnlessRaises(ValueError, fn)
00091
00092 def test_simple_write_uncompressed_works(self):
00093 with rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag', 'w') as b:
00094 msg_count = 0
00095 for i in range(5, 0, -1):
00096 msg = Int32()
00097 msg.data = i
00098 t = genpy.Time.from_sec(i)
00099 b.write('/ints' + str(i), msg, t)
00100 msg_count += 1
00101
00102 msgs = list(rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag').read_messages())
00103
00104 self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00105
00106 for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00107 self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00108
00109 def test_writing_nonchronological_works(self):
00110 with rosbag.Bag('/tmp/test_writing_nonchronological_works.bag', 'w') as b:
00111 msg_count = 0
00112 for i in range(5, 0, -1):
00113 msg = Int32()
00114 msg.data = i
00115 t = genpy.Time.from_sec(i)
00116 b.write('/ints', msg, t)
00117 msg_count += 1
00118
00119 msgs = list(rosbag.Bag('/tmp/test_writing_nonchronological_works.bag').read_messages())
00120
00121 self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00122
00123 for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00124 self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00125
00126 def test_large_write_works(self):
00127 for compression in [rosbag.Compression.NONE, rosbag.Compression.BZ2]:
00128 with rosbag.Bag('/tmp/test_large_write_works.bag', 'w', compression=compression) as b:
00129 msg_count = 0
00130 for i in range(10000):
00131 msg = Int32()
00132 msg.data = i
00133 t = genpy.Time.from_sec(i)
00134 b.write('/ints', msg, t)
00135 msg_count += 1
00136
00137 msgs = list(rosbag.Bag('/tmp/test_large_write_works.bag').read_messages())
00138
00139 self.assertEquals(len(msgs), msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00140
00141 for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00142 self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00143
00144 def test_get_messages_time_range_works(self):
00145 with rosbag.Bag('/tmp/test_get_messages_time_range_works.bag', 'w') as b:
00146 for i in range(30):
00147 msg = Int32()
00148 msg.data = i
00149 t = genpy.Time.from_sec(i)
00150 b.write('/ints', msg, t)
00151
00152 start_time = genpy.Time.from_sec(3)
00153 end_time = genpy.Time.from_sec(7)
00154 msgs = list(rosbag.Bag('/tmp/test_get_messages_time_range_works.bag').read_messages(topics='/ints', start_time=start_time, end_time=end_time))
00155
00156 self.assertEquals(len(msgs), 5)
00157
00158 def test_get_messages_filter_works(self):
00159 with rosbag.Bag('/tmp/test_get_messages_filter_works.bag', 'w') as b:
00160 for i in range(30):
00161 msg = Int32()
00162 msg.data = i
00163 t = genpy.Time.from_sec(i)
00164 b.write('/ints' + str(i), msg, t)
00165
00166 def filter(topic, datatype, md5sum, msg_def, header):
00167 return '5' in topic and datatype == Int32._type and md5sum == Int32._md5sum and msg_def == Int32._full_text
00168
00169 self.assertEquals(len(list(rosbag.Bag('/tmp/test_get_messages_filter_works.bag').read_messages(connection_filter=filter))), 3)
00170
00171 def test_rosbag_filter(self):
00172 inbag_filename = '/tmp/test_rosbag_filter__1.bag'
00173 outbag_filename = '/tmp/test_rosbag_filter__2.bag'
00174
00175 with rosbag.Bag(inbag_filename, 'w') as b:
00176 for i in range(30):
00177 msg = Int32()
00178 msg.data = i
00179 t = genpy.Time.from_sec(i)
00180 b.write('/ints' + str(i), msg, t)
00181
00182 expression = "(int(t.secs) == m.data) and (topic == '/ints' + str(m.data)) and (m.data >= 15 and m.data < 20)"
00183
00184 os.system('rosbag filter %s %s "%s"' % (inbag_filename, outbag_filename, expression))
00185
00186 msgs = list(rosbag.Bag(outbag_filename).read_messages())
00187
00188 self.assertEquals(len(msgs), 5)
00189
00190 def test_reindex_works(self):
00191 fn = '/tmp/test_reindex_works.bag'
00192
00193 chunk_threshold = 1024
00194
00195 with rosbag.Bag(fn, 'w', chunk_threshold=chunk_threshold) as b:
00196 for i in range(100):
00197 for j in range(5):
00198 msg = Int32()
00199 msg.data = i
00200 b.write('/topic%d' % j, msg)
00201 file_header_pos = b._file_header_pos
00202
00203 start_index = 4117 + chunk_threshold * 2 + chunk_threshold / 2
00204
00205 trunc_filename = '%s.trunc%s' % os.path.splitext(fn)
00206 reindex_filename = '%s.reindex%s' % os.path.splitext(fn)
00207
00208 for trunc_index in range(start_index, start_index + chunk_threshold):
00209 shutil.copy(fn, trunc_filename)
00210
00211 with open(trunc_filename, 'r+b') as f:
00212 f.seek(file_header_pos)
00213 header = {
00214 'op': bag._pack_uint8(bag._OP_FILE_HEADER),
00215 'index_pos': bag._pack_uint64(0),
00216 'conn_count': bag._pack_uint32(0),
00217 'chunk_count': bag._pack_uint32(0)
00218 }
00219 bag._write_record(f, header, padded_size=bag._FILE_HEADER_LENGTH)
00220 f.truncate(trunc_index)
00221
00222 shutil.copy(trunc_filename, reindex_filename)
00223
00224 try:
00225 b = rosbag.Bag(reindex_filename, 'a', allow_unindexed=True)
00226 except Exception, ex:
00227 pass
00228 for done in b.reindex():
00229 pass
00230 b.close()
00231
00232 msgs = list(rosbag.Bag(reindex_filename, 'r'))
00233
00234 def test_future_version_works(self):
00235 fn = '/tmp/test_future_version_2.1.bag'
00236
00237 with rosbag.Bag(fn, 'w', chunk_threshold=256) as b:
00238 for i in range(10):
00239 msg = Int32()
00240 msg.data = i
00241 b.write('/int', msg)
00242
00243 header = { 'op': bag._pack_uint8(max(bag._OP_CODES.iterkeys()) + 1) }
00244 data = 'ABCDEFGHI123456789'
00245 bag._write_record(b._file, header, data)
00246
00247 b._file.seek(0)
00248 b._file.write('#ROSBAG V%d.%d\n' % (b._version / 100, (b._version % 100) + 1))
00249 b._file.seek(0, os.SEEK_END)
00250
00251 with rosbag.Bag(fn) as b:
00252 for topic, msg, t in b:
00253 pass
00254
00255 def _print_bag_records(self, fn):
00256 with open(fn) as f:
00257 f.seek(0, os.SEEK_END)
00258 size = f.tell()
00259 f.seek(0)
00260
00261 version_line = f.readline().rstrip()
00262 print version_line
00263
00264 while f.tell() < size:
00265 header = bag._read_header(f)
00266 op = bag._read_uint8_field(header, 'op')
00267 data = bag._read_record_data(f)
00268
00269 print bag._OP_CODES.get(op, op)
00270
00271 if __name__ == '__main__':
00272 import rostest
00273 PKG='rosbag'
00274 rostest.run(PKG, 'TestRosbag', TestRosbag, sys.argv)