00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 import heapq
00037 import os
00038 import shutil
00039 import sys
00040 import time
00041 import unittest
00042
00043 import genpy
00044
00045 import rosbag
00046 from rosbag import bag
00047 import rospy
00048 from std_msgs.msg import Int32
00049 from std_msgs.msg import ColorRGBA
00050 from std_msgs.msg import String
00051
00052 class TestRosbag(unittest.TestCase):
00053 def setUp(self):
00054 pass
00055
00056 def test_opening_stream_works(self):
00057 f = open('/tmp/test_opening_stream_works.bag', 'wb')
00058 with rosbag.Bag(f, 'w') as b:
00059 for i in range(10):
00060 msg = Int32()
00061 msg.data = i
00062 b.write('/int', msg)
00063
00064 f = open('/tmp/test_opening_stream_works.bag', 'rb')
00065 b = rosbag.Bag(f, 'r')
00066 self.assert_(len(list(b.read_messages())) == 10)
00067 b.close()
00068
00069 def test_invalid_bag_arguments_fails(self):
00070 f = '/tmp/test_invalid_bad_arguments_fails.bag'
00071
00072 def fn1(): rosbag.Bag('')
00073 def fn2(): rosbag.Bag(None)
00074 def fn3(): rosbag.Bag(f, 'z')
00075 def fn4(): rosbag.Bag(f, 'r', compression='foobar')
00076 def fn5(): rosbag.Bag(f, 'r', chunk_threshold=-1000)
00077 for fn in [fn1, fn2, fn3, fn4, fn5]:
00078 self.failUnlessRaises(ValueError, fn)
00079
00080 def test_io_on_close_fails(self):
00081 def fn():
00082 b = rosbag.Bag('/tmp/test_io_close_fails.bag', 'w')
00083 b.close()
00084 size = b.size()
00085 self.failUnlessRaises(ValueError, fn)
00086
00087 def test_write_invalid_message_fails(self):
00088 def fn():
00089 with rosbag.Bag('/tmp/test_write_invalid_message_fails.bag', 'w') as b:
00090 b.write(None, None, None)
00091 self.failUnlessRaises(ValueError, fn)
00092
00093 def test_simple_write_uncompressed_works(self):
00094 with rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag', 'w') as b:
00095 msg_count = 0
00096 for i in range(5, 0, -1):
00097 msg = Int32()
00098 msg.data = i
00099 t = genpy.Time.from_sec(i)
00100 b.write('/ints' + str(i), msg, t)
00101 msg_count += 1
00102
00103 msgs = list(rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag').read_messages())
00104
00105 self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00106
00107 for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00108 self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00109
00110 def test_writing_nonchronological_works(self):
00111 with rosbag.Bag('/tmp/test_writing_nonchronological_works.bag', 'w') as b:
00112 msg_count = 0
00113 for i in range(5, 0, -1):
00114 msg = Int32()
00115 msg.data = i
00116 t = genpy.Time.from_sec(i)
00117 b.write('/ints', msg, t)
00118 msg_count += 1
00119
00120 msgs = list(rosbag.Bag('/tmp/test_writing_nonchronological_works.bag').read_messages())
00121
00122 self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00123
00124 for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00125 self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00126
00127 def test_large_write_works(self):
00128 for compression in [rosbag.Compression.NONE, rosbag.Compression.BZ2]:
00129 with rosbag.Bag('/tmp/test_large_write_works.bag', 'w', compression=compression) as b:
00130 msg_count = 0
00131 for i in range(10000):
00132 msg = Int32()
00133 msg.data = i
00134 t = genpy.Time.from_sec(i)
00135 b.write('/ints', msg, t)
00136 msg_count += 1
00137
00138 msgs = list(rosbag.Bag('/tmp/test_large_write_works.bag').read_messages())
00139
00140 self.assertEquals(len(msgs), msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00141
00142 for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00143 self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00144
00145 def test_get_messages_time_range_works(self):
00146 with rosbag.Bag('/tmp/test_get_messages_time_range_works.bag', 'w') as b:
00147 for i in range(30):
00148 msg = Int32()
00149 msg.data = i
00150 t = genpy.Time.from_sec(i)
00151 b.write('/ints', msg, t)
00152
00153 start_time = genpy.Time.from_sec(3)
00154 end_time = genpy.Time.from_sec(7)
00155 msgs = list(rosbag.Bag('/tmp/test_get_messages_time_range_works.bag').read_messages(topics='/ints', start_time=start_time, end_time=end_time))
00156
00157 self.assertEquals(len(msgs), 5)
00158
00159 def test_get_messages_filter_works(self):
00160 with rosbag.Bag('/tmp/test_get_messages_filter_works.bag', 'w') as b:
00161 for i in range(30):
00162 msg = Int32()
00163 msg.data = i
00164 t = genpy.Time.from_sec(i)
00165 b.write('/ints' + str(i), msg, t)
00166
00167 def filter(topic, datatype, md5sum, msg_def, header):
00168 return '5' in topic and datatype == Int32._type and md5sum == Int32._md5sum and msg_def == Int32._full_text
00169
00170 self.assertEquals(len(list(rosbag.Bag('/tmp/test_get_messages_filter_works.bag').read_messages(connection_filter=filter))), 3)
00171
00172 def test_rosbag_filter(self):
00173 inbag_filename = '/tmp/test_rosbag_filter__1.bag'
00174 outbag_filename = '/tmp/test_rosbag_filter__2.bag'
00175
00176 with rosbag.Bag(inbag_filename, 'w') as b:
00177 for i in range(30):
00178 msg = Int32()
00179 msg.data = i
00180 t = genpy.Time.from_sec(i)
00181 b.write('/ints' + str(i), msg, t)
00182
00183 expression = "(int(t.secs) == m.data) and (topic == '/ints' + str(m.data)) and (m.data >= 15 and m.data < 20)"
00184
00185 os.system('rosbag filter %s %s "%s"' % (inbag_filename, outbag_filename, expression))
00186
00187 msgs = list(rosbag.Bag(outbag_filename).read_messages())
00188
00189 self.assertEquals(len(msgs), 5)
00190
00191 def test_reindex_works(self):
00192 fn = '/tmp/test_reindex_works.bag'
00193
00194 chunk_threshold = 1024
00195
00196 with rosbag.Bag(fn, 'w', chunk_threshold=chunk_threshold) as b:
00197 for i in range(100):
00198 for j in range(5):
00199 msg = Int32()
00200 msg.data = i
00201 b.write('/topic%d' % j, msg)
00202 file_header_pos = b._file_header_pos
00203
00204 start_index = 4117 + chunk_threshold * 2 + int(chunk_threshold / 2)
00205
00206 trunc_filename = '%s.trunc%s' % os.path.splitext(fn)
00207 reindex_filename = '%s.reindex%s' % os.path.splitext(fn)
00208
00209 for trunc_index in range(start_index, start_index + chunk_threshold):
00210 shutil.copy(fn, trunc_filename)
00211
00212 with open(trunc_filename, 'r+b') as f:
00213 f.seek(file_header_pos)
00214 header = {
00215 'op': bag._pack_uint8(bag._OP_FILE_HEADER),
00216 'index_pos': bag._pack_uint64(0),
00217 'conn_count': bag._pack_uint32(0),
00218 'chunk_count': bag._pack_uint32(0)
00219 }
00220 bag._write_record(f, header, padded_size=bag._FILE_HEADER_LENGTH)
00221 f.truncate(trunc_index)
00222
00223 shutil.copy(trunc_filename, reindex_filename)
00224
00225 try:
00226 b = rosbag.Bag(reindex_filename, 'a', allow_unindexed=True)
00227 except Exception as ex:
00228 pass
00229 for done in b.reindex():
00230 pass
00231 b.close()
00232
00233 msgs = list(rosbag.Bag(reindex_filename, 'r'))
00234
00235 def test_future_version_works(self):
00236 fn = '/tmp/test_future_version_2.1.bag'
00237
00238 with rosbag.Bag(fn, 'w', chunk_threshold=256) as b:
00239 for i in range(10):
00240 msg = Int32()
00241 msg.data = i
00242 b.write('/int', msg)
00243
00244 header = { 'op': bag._pack_uint8(max(bag._OP_CODES.keys()) + 1) }
00245 data = 'ABCDEFGHI123456789'
00246 bag._write_record(b._file, header, data)
00247
00248 b._file.seek(0)
00249 data = '#ROSBAG V%d.%d\n' % (int(b._version / 100), (b._version % 100) + 1)
00250 data = data.encode()
00251 b._file.write(data)
00252 b._file.seek(0, os.SEEK_END)
00253
00254 with rosbag.Bag(fn) as b:
00255 for topic, msg, t in b:
00256 pass
00257
00258 def test_get_message_count(self):
00259 fn = '/tmp/test_get_message_count.bag'
00260 with rosbag.Bag(fn, mode='w') as bag:
00261 for i in xrange(100):
00262 bag.write("/test_bag", Int32(data=i))
00263 bag.write("/test_bag", String(data='also'))
00264 bag.write("/test_bag/more", String(data='alone'))
00265
00266 with rosbag.Bag(fn) as bag:
00267 self.assertEquals(bag.get_message_count(), 300)
00268 self.assertEquals(bag.get_message_count(topic_filters='/test_bag'), 200)
00269 self.assertEquals(bag.get_message_count(topic_filters=['/test_bag', '/test_bag/more']), 300)
00270 self.assertEquals(bag.get_message_count(topic_filters=['/none']), 0)
00271
00272 def test_get_compression_info(self):
00273 fn = '/tmp/test_get_compression_info.bag'
00274
00275
00276 with rosbag.Bag(fn, mode='w') as bag:
00277 for i in xrange(100):
00278 bag.write("/test_bag", Int32(data=i))
00279
00280 with rosbag.Bag(fn) as bag:
00281 info = bag.get_compression_info()
00282 self.assertEquals(info.compression, rosbag.Compression.NONE)
00283
00284 self.assertEquals(info.uncompressed, 5166)
00285 self.assertEquals(info.compressed, 5166)
00286
00287 with rosbag.Bag(fn, mode='w', compression=rosbag.Compression.BZ2) as bag:
00288 for i in xrange(100):
00289 bag.write("/test_bag", Int32(data=i))
00290
00291 with rosbag.Bag(fn) as bag:
00292 info = bag.get_compression_info()
00293 self.assertEquals(info.compression, rosbag.Compression.BZ2)
00294 self.assertEquals(info.uncompressed, 5166)
00295
00296
00297
00298 self.assertLess(info.compressed, 1000)
00299 self.assertGreater(info.compressed, 900)
00300
00301 def test_get_time(self):
00302 fn = '/tmp/test_get_time.bag'
00303
00304 with rosbag.Bag(fn, mode='w') as bag:
00305 for i in xrange(100):
00306 bag.write("/test_bag", Int32(data=i), t=genpy.Time.from_sec(i))
00307
00308 with rosbag.Bag(fn) as bag:
00309 start_stamp = bag.get_start_time()
00310 end_stamp = bag.get_end_time()
00311
00312 self.assertEquals(start_stamp, 0.0)
00313 self.assertEquals(end_stamp, 99.0)
00314
00315 def test_get_time_empty_bag(self):
00316 """Test for issue #657"""
00317 fn = '/tmp/test_get_time_emtpy_bag.bag'
00318
00319 with rosbag.Bag(fn, mode='w') as bag:
00320 self.assertRaisesRegexp(rosbag.ROSBagException,
00321 'Bag contains no message',
00322 bag.get_start_time)
00323 self.assertRaisesRegexp(rosbag.ROSBagException,
00324 'Bag contains no message',
00325 bag.get_end_time)
00326
00327 def test_get_type_and_topic_info(self):
00328 fn = '/tmp/test_get_type_and_topic_info.bag'
00329 topic_1 = "/test_bag"
00330 topic_2 = "/test_bag/more"
00331 with rosbag.Bag(fn, mode='w') as bag:
00332 for i in xrange(100):
00333 bag.write(topic_1, Int32(data=i))
00334 bag.write(topic_1, String(data='also'))
00335 bag.write(topic_2, String(data='alone'))
00336
00337 with rosbag.Bag(fn) as bag:
00338 msg_types, topics = bag.get_type_and_topic_info()
00339 self.assertEquals(len(msg_types), 2)
00340 self.assertTrue("std_msgs/Int32" in msg_types)
00341 self.assertTrue("std_msgs/String" in msg_types)
00342 self.assertEquals(len(topics), 2)
00343 self.assertTrue(topic_1 in topics)
00344 self.assertTrue(topic_2 in topics)
00345
00346 self.assertEquals(topics[topic_1].message_count, 200)
00347 self.assertEquals(topics[topic_1].msg_type, "std_msgs/Int32")
00348 self.assertEquals(topics[topic_2].message_count, 100)
00349 self.assertEquals(topics[topic_2].msg_type, "std_msgs/String")
00350
00351
00352 msg_types, topics = bag.get_type_and_topic_info(topic_1)
00353
00354
00355 self.assertEquals(len(msg_types), 2)
00356 self.assertTrue("std_msgs/Int32" in msg_types)
00357 self.assertTrue("std_msgs/String" in msg_types)
00358
00359 self.assertEquals(len(topics), 1)
00360 self.assertTrue(topic_1 in topics)
00361
00362 self.assertEquals(topics[topic_1].message_count, 200)
00363 self.assertEquals(topics[topic_1].msg_type, "std_msgs/Int32")
00364
00365
00366 msg_types, topics = bag.get_type_and_topic_info(topic_2)
00367
00368
00369 self.assertEquals(len(msg_types), 2)
00370 self.assertTrue("std_msgs/Int32" in msg_types)
00371 self.assertTrue("std_msgs/String" in msg_types)
00372
00373 self.assertEquals(len(topics), 1)
00374 self.assertTrue(topic_2 in topics)
00375
00376 self.assertEquals(topics[topic_2].message_count, 100)
00377 self.assertEquals(topics[topic_2].msg_type, "std_msgs/String")
00378
00379
00380 msg_types, topics = bag.get_type_and_topic_info("/none")
00381
00382
00383 self.assertEquals(len(msg_types), 2)
00384 self.assertTrue("std_msgs/Int32" in msg_types)
00385 self.assertTrue("std_msgs/String" in msg_types)
00386
00387
00388 self.assertEquals(len(topics), 0)
00389
00390 def _print_bag_records(self, fn):
00391 with open(fn) as f:
00392 f.seek(0, os.SEEK_END)
00393 size = f.tell()
00394 f.seek(0)
00395
00396 version_line = f.readline().rstrip()
00397 print(version_line)
00398
00399 while f.tell() < size:
00400 header = bag._read_header(f)
00401 op = bag._read_uint8_field(header, 'op')
00402 data = bag._read_record_data(f)
00403
00404 print(bag._OP_CODES.get(op, op))
00405
00406 if __name__ == '__main__':
00407 import rostest
00408 PKG='rosbag'
00409 rostest.run(PKG, 'TestRosbag', TestRosbag, sys.argv)