48 from rosbag
import bag
59 f = open(
'/tmp/test_opening_stream_works.bag',
'wb')
66 f = open(
'/tmp/test_opening_stream_works.bag',
'rb')
68 self.assert_(len(list(b.read_messages())) == 10) 72 f =
'/tmp/test_invalid_bad_arguments_fails.bag' 74 def fn1(): rosbag.Bag(
'')
75 def fn2(): rosbag.Bag(
None)
76 def fn3(): rosbag.Bag(f,
'z')
77 def fn4(): rosbag.Bag(f,
'r', compression='foobar')
78 def fn5(): rosbag.Bag(f,
'r', chunk_threshold=-1000) 79 for fn
in [fn1, fn2, fn3, fn4, fn5]:
80 self.failUnlessRaises(ValueError, fn)
84 b =
rosbag.Bag(
'/tmp/test_io_close_fails.bag',
'w')
87 self.failUnlessRaises(ValueError, fn)
91 with
rosbag.Bag(
'/tmp/test_write_invalid_message_fails.bag',
'w')
as b:
92 b.write(
None,
None,
None)
93 self.failUnlessRaises(ValueError, fn)
96 with
rosbag.Bag(
'/tmp/test_simple_write_uncompressed_works.bag',
'w')
as b:
98 for i
in range(5, 0, -1):
101 t = genpy.Time.from_sec(i)
102 b.write(
'/ints' + str(i), msg, t)
105 msgs = list(
rosbag.Bag(
'/tmp/test_simple_write_uncompressed_works.bag').read_messages())
107 self.assert_(len(msgs) == msg_count,
'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
109 for (_, _, t1), (_, _, t2)
in zip(msgs, msgs[1:]):
110 self.assert_(t1 < t2,
'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
113 with
rosbag.Bag(
'/tmp/test_writing_nonchronological_works.bag',
'w')
as b:
115 for i
in range(5, 0, -1):
118 t = genpy.Time.from_sec(i)
119 b.write(
'/ints', msg, t)
122 msgs = list(
rosbag.Bag(
'/tmp/test_writing_nonchronological_works.bag').read_messages())
124 self.assert_(len(msgs) == msg_count,
'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
126 for (_, _, t1), (_, _, t2)
in zip(msgs, msgs[1:]):
127 self.assert_(t1 < t2,
'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
130 for compression
in [rosbag.Compression.NONE, rosbag.Compression.BZ2]:
131 with
rosbag.Bag(
'/tmp/test_large_write_works.bag',
'w', compression=compression)
as b:
133 for i
in range(10000):
136 t = genpy.Time.from_sec(i)
137 b.write(
'/ints', msg, t)
140 msgs = list(
rosbag.Bag(
'/tmp/test_large_write_works.bag').read_messages())
142 self.assertEquals(len(msgs), msg_count,
'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
144 for (_, _, t1), (_, _, t2)
in zip(msgs, msgs[1:]):
145 self.assert_(t1 < t2,
'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
148 with
rosbag.Bag(
'/tmp/test_get_messages_time_range_works.bag',
'w')
as b:
152 t = genpy.Time.from_sec(i)
153 b.write(
'/ints', msg, t)
155 start_time = genpy.Time.from_sec(3)
156 end_time = genpy.Time.from_sec(7)
157 msgs = list(
rosbag.Bag(
'/tmp/test_get_messages_time_range_works.bag').read_messages(topics=
'/ints', start_time=start_time, end_time=end_time))
159 self.assertEquals(len(msgs), 5)
162 with
rosbag.Bag(
'/tmp/test_get_messages_filter_works.bag',
'w')
as b:
166 t = genpy.Time.from_sec(i)
167 b.write(
'/ints' + str(i), msg, t)
169 def filter(topic, datatype, md5sum, msg_def, header):
170 return '5' in topic
and datatype == Int32._type
and md5sum == Int32._md5sum
and msg_def == Int32._full_text
172 self.assertEquals(len(list(
rosbag.Bag(
'/tmp/test_get_messages_filter_works.bag').read_messages(connection_filter=filter))), 3)
175 inbag_filename =
'/tmp/test_rosbag_filter__1.bag' 176 outbag_filename =
'/tmp/test_rosbag_filter__2.bag' 182 t = genpy.Time.from_sec(i)
183 b.write(
'/ints' + str(i), msg, t)
185 expression =
"(int(t.secs) == m.data) and (topic == '/ints' + str(m.data)) and (m.data >= 15 and m.data < 20)" 187 os.system(
'rosbag filter %s %s "%s"' % (inbag_filename, outbag_filename, expression))
189 msgs = list(
rosbag.Bag(outbag_filename).read_messages())
191 self.assertEquals(len(msgs), 5)
195 tempdir = tempfile.mkdtemp()
197 inbag_filename = os.path.join(tempdir,
'test_rosbag_filter__1.bag')
198 outbag1_filename = os.path.join(tempdir,
'test_rosbag_filter__2.bag')
199 outbag2_filename = os.path.join(tempdir,
'test_rosbag_filter__3.bag')
204 t = genpy.Time.from_sec(i)
205 b.write(
'/ints' + str(i), msg, t)
208 cmd =
'rosbag filter %s %s "True"' 209 os.system(cmd % (inbag_filename, outbag1_filename))
210 os.system(cmd % (outbag1_filename, outbag2_filename))
212 with open(outbag1_filename,
'r') as h: 213 outbag1_md5 = hashlib.md5(h.read()).hexdigest() 214 with open(outbag2_filename, 'r') as h: 215 outbag2_md5 = hashlib.md5(h.read()).hexdigest() 216 self.assertEquals(outbag1_md5, outbag2_md5) 218 shutil.rmtree(tempdir)
221 fn =
'/tmp/test_reindex_works.bag' 223 chunk_threshold = 1024
225 with
rosbag.Bag(fn,
'w', chunk_threshold=chunk_threshold)
as b:
230 b.write(
'/topic%d' % j, msg)
231 file_header_pos = b._file_header_pos
233 start_index = 4117 + chunk_threshold * 2 + int(chunk_threshold / 2)
235 trunc_filename =
'%s.trunc%s' % os.path.splitext(fn)
236 reindex_filename =
'%s.reindex%s' % os.path.splitext(fn)
238 for trunc_index
in range(start_index, start_index + chunk_threshold):
239 shutil.copy(fn, trunc_filename)
241 with open(trunc_filename,
'r+b')
as f:
242 f.seek(file_header_pos)
244 'op': bag._pack_uint8(bag._OP_FILE_HEADER),
245 'index_pos': bag._pack_uint64(0),
246 'conn_count': bag._pack_uint32(0),
247 'chunk_count': bag._pack_uint32(0)
249 bag._write_record(f, header, padded_size=bag._FILE_HEADER_LENGTH)
250 f.truncate(trunc_index)
252 shutil.copy(trunc_filename, reindex_filename)
255 b =
rosbag.Bag(reindex_filename,
'a', allow_unindexed=
True)
256 except Exception
as ex:
258 for done
in b.reindex():
262 msgs = list(
rosbag.Bag(reindex_filename,
'r')) 265 fn =
'/tmp/test_future_version_2.1.bag' 267 with
rosbag.Bag(fn,
'w', chunk_threshold=256)
as b:
273 header = {
'op': bag._pack_uint8(max(bag._OP_CODES.keys()) + 1) }
274 data =
'ABCDEFGHI123456789' 275 bag._write_record(b._file, header, data)
278 data =
'#ROSBAG V%d.%d\n' % (int(b._version / 100), (b._version % 100) + 1)
281 b._file.seek(0, os.SEEK_END)
284 for topic, msg, t
in b:
288 fn =
'/tmp/test_get_message_count.bag' 291 bag.write(
"/test_bag", Int32(data=i))
292 bag.write(
"/test_bag", String(data=
'also'))
293 bag.write(
"/test_bag/more", String(data=
'alone'))
296 self.assertEquals(bag.get_message_count(), 300)
297 self.assertEquals(bag.get_message_count(topic_filters=
'/test_bag'), 200)
298 self.assertEquals(bag.get_message_count(topic_filters=[
'/test_bag',
'/test_bag/more']), 300)
299 self.assertEquals(bag.get_message_count(topic_filters=[
'/none']), 0)
302 fn =
'/tmp/test_get_compression_info.bag' 307 bag.write(
"/test_bag", Int32(data=i))
310 info = bag.get_compression_info()
311 self.assertEquals(info.compression, rosbag.Compression.NONE)
313 self.assertEquals(info.uncompressed, 5166)
314 self.assertEquals(info.compressed, 5166)
316 with
rosbag.Bag(fn, mode=
'w', compression=rosbag.Compression.BZ2)
as bag:
318 bag.write(
"/test_bag", Int32(data=i))
321 info = bag.get_compression_info()
322 self.assertEquals(info.compression, rosbag.Compression.BZ2)
323 self.assertEquals(info.uncompressed, 5166)
327 self.assertLess(info.compressed, 1050)
328 self.assertGreater(info.compressed, 850)
331 fn =
'/tmp/test_get_time.bag' 335 bag.write(
"/test_bag", Int32(data=i), t=genpy.Time.from_sec(i))
338 start_stamp = bag.get_start_time()
339 end_stamp = bag.get_end_time()
341 self.assertEquals(start_stamp, 0.0)
342 self.assertEquals(end_stamp, 99.0)
345 """Test for issue #657""" 346 fn =
'/tmp/test_get_time_emtpy_bag.bag' 349 self.assertRaisesRegexp(rosbag.ROSBagException,
350 'Bag contains no message',
352 self.assertRaisesRegexp(rosbag.ROSBagException,
353 'Bag contains no message',
357 fn =
'/tmp/test_get_type_and_topic_info.bag' 358 topic_1 =
"/test_bag" 359 topic_2 =
"/test_bag/more" 362 bag.write(topic_1, Int32(data=i))
363 bag.write(topic_1, String(data=
'also'))
364 bag.write(topic_2, String(data=
'alone'))
367 msg_types, topics = bag.get_type_and_topic_info()
368 self.assertEquals(len(msg_types), 2)
369 self.assertTrue(
"std_msgs/Int32" in msg_types)
370 self.assertTrue(
"std_msgs/String" in msg_types)
371 self.assertEquals(len(topics), 2)
372 self.assertTrue(topic_1
in topics)
373 self.assertTrue(topic_2
in topics)
375 self.assertEquals(topics[topic_1].message_count, 200)
376 self.assertEquals(topics[topic_1].msg_type,
"std_msgs/Int32")
377 self.assertEquals(topics[topic_2].message_count, 100)
378 self.assertEquals(topics[topic_2].msg_type,
"std_msgs/String")
381 msg_types, topics = bag.get_type_and_topic_info(topic_1)
384 self.assertEquals(len(msg_types), 2)
385 self.assertTrue(
"std_msgs/Int32" in msg_types)
386 self.assertTrue(
"std_msgs/String" in msg_types)
388 self.assertEquals(len(topics), 1)
389 self.assertTrue(topic_1
in topics)
391 self.assertEquals(topics[topic_1].message_count, 200)
392 self.assertEquals(topics[topic_1].msg_type,
"std_msgs/Int32")
395 msg_types, topics = bag.get_type_and_topic_info(topic_2)
398 self.assertEquals(len(msg_types), 2)
399 self.assertTrue(
"std_msgs/Int32" in msg_types)
400 self.assertTrue(
"std_msgs/String" in msg_types)
402 self.assertEquals(len(topics), 1)
403 self.assertTrue(topic_2
in topics)
405 self.assertEquals(topics[topic_2].message_count, 100)
406 self.assertEquals(topics[topic_2].msg_type,
"std_msgs/String")
409 msg_types, topics = bag.get_type_and_topic_info(
"/none")
412 self.assertEquals(len(msg_types), 2)
413 self.assertTrue(
"std_msgs/Int32" in msg_types)
414 self.assertTrue(
"std_msgs/String" in msg_types)
417 self.assertEquals(len(topics), 0)
421 f.seek(0, os.SEEK_END)
425 version_line = f.readline().rstrip()
428 while f.tell() < size:
429 header = bag._read_header(f)
430 op = bag._read_uint8_field(header,
'op')
431 data = bag._read_record_data(f)
433 print(bag._OP_CODES.get(op, op))
435 if __name__ ==
'__main__':
438 rostest.run(PKG,
'TestRosbag', TestRosbag, sys.argv)
def test_writing_nonchronological_works(self)
def test_opening_stream_works(self)
def test_large_write_works(self)
def test_get_time_empty_bag(self)
def test_invalid_bag_arguments_fails(self)
def test_reindex_works(self)
def test_get_compression_info(self)
def test_get_message_count(self)
def test_io_on_close_fails(self)
def test_get_messages_filter_works(self)
def test_rosbag_filter(self)
def test_write_invalid_message_fails(self)
def _print_bag_records(self, fn)
def test_future_version_works(self)
def test_simple_write_uncompressed_works(self)
def test_get_type_and_topic_info(self)
def test_get_messages_time_range_works(self)
def test_trivial_rosbag_filter(self)