test_bag.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # Software License Agreement (BSD License)
3 #
4 # Copyright (c) 2008, Willow Garage, Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 #
11 # * Redistributions of source code must retain the above copyright
12 # notice, this list of conditions and the following disclaimer.
13 # * Redistributions in binary form must reproduce the above
14 # copyright notice, this list of conditions and the following
15 # disclaimer in the documentation and/or other materials provided
16 # with the distribution.
17 # * Neither the name of Willow Garage, Inc. nor the names of its
18 # contributors may be used to endorse or promote products derived
19 # from this software without specific prior written permission.
20 #
21 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
29 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
31 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
32 # POSSIBILITY OF SUCH DAMAGE.
33 #
34 # test_bag.py
35 
36 import hashlib
37 import heapq
38 import os
39 import shutil
40 import sys
41 import tempfile
42 import time
43 import unittest
44 
45 import genpy
46 
47 import rosbag
48 from rosbag import bag
49 import rospy
50 from std_msgs.msg import Int32
51 from std_msgs.msg import ColorRGBA
52 from std_msgs.msg import String
53 
54 class TestRosbag(unittest.TestCase):
55  def setUp(self):
56  pass
57 
59  f = open('/tmp/test_opening_stream_works.bag', 'wb')
60  with rosbag.Bag(f, 'w') as b:
61  for i in range(10):
62  msg = Int32()
63  msg.data = i
64  b.write('/int', msg)
65 
66  f = open('/tmp/test_opening_stream_works.bag', 'rb')
67  b = rosbag.Bag(f, 'r')
68  self.assert_(len(list(b.read_messages())) == 10)
69  b.close()
70 
72  f = '/tmp/test_invalid_bad_arguments_fails.bag'
73 
74  def fn1(): rosbag.Bag('')
75  def fn2(): rosbag.Bag(None)
76  def fn3(): rosbag.Bag(f, 'z')
77  def fn4(): rosbag.Bag(f, 'r', compression='foobar')
78  def fn5(): rosbag.Bag(f, 'r', chunk_threshold=-1000)
79  for fn in [fn1, fn2, fn3, fn4, fn5]:
80  self.failUnlessRaises(ValueError, fn)
81 
83  def fn():
84  b = rosbag.Bag('/tmp/test_io_close_fails.bag', 'w')
85  b.close()
86  size = b.size()
87  self.failUnlessRaises(ValueError, fn)
88 
90  def fn():
91  with rosbag.Bag('/tmp/test_write_invalid_message_fails.bag', 'w') as b:
92  b.write(None, None, None)
93  self.failUnlessRaises(ValueError, fn)
94 
96  with rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag', 'w') as b:
97  msg_count = 0
98  for i in range(5, 0, -1):
99  msg = Int32()
100  msg.data = i
101  t = genpy.Time.from_sec(i)
102  b.write('/ints' + str(i), msg, t)
103  msg_count += 1
104 
105  msgs = list(rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag').read_messages())
106 
107  self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
108 
109  for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
110  self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
111 
113  with rosbag.Bag('/tmp/test_writing_nonchronological_works.bag', 'w') as b:
114  msg_count = 0
115  for i in range(5, 0, -1):
116  msg = Int32()
117  msg.data = i
118  t = genpy.Time.from_sec(i)
119  b.write('/ints', msg, t)
120  msg_count += 1
121 
122  msgs = list(rosbag.Bag('/tmp/test_writing_nonchronological_works.bag').read_messages())
123 
124  self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
125 
126  for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
127  self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
128 
130  for compression in [rosbag.Compression.NONE, rosbag.Compression.BZ2]:
131  with rosbag.Bag('/tmp/test_large_write_works.bag', 'w', compression=compression) as b:
132  msg_count = 0
133  for i in range(10000):
134  msg = Int32()
135  msg.data = i
136  t = genpy.Time.from_sec(i)
137  b.write('/ints', msg, t)
138  msg_count += 1
139 
140  msgs = list(rosbag.Bag('/tmp/test_large_write_works.bag').read_messages())
141 
142  self.assertEquals(len(msgs), msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
143 
144  for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
145  self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
146 
148  with rosbag.Bag('/tmp/test_get_messages_time_range_works.bag', 'w') as b:
149  for i in range(30):
150  msg = Int32()
151  msg.data = i
152  t = genpy.Time.from_sec(i)
153  b.write('/ints', msg, t)
154 
155  start_time = genpy.Time.from_sec(3)
156  end_time = genpy.Time.from_sec(7)
157  msgs = list(rosbag.Bag('/tmp/test_get_messages_time_range_works.bag').read_messages(topics='/ints', start_time=start_time, end_time=end_time))
158 
159  self.assertEquals(len(msgs), 5)
160 
162  with rosbag.Bag('/tmp/test_get_messages_filter_works.bag', 'w') as b:
163  for i in range(30):
164  msg = Int32()
165  msg.data = i
166  t = genpy.Time.from_sec(i)
167  b.write('/ints' + str(i), msg, t)
168 
169  def filter(topic, datatype, md5sum, msg_def, header):
170  return '5' in topic and datatype == Int32._type and md5sum == Int32._md5sum and msg_def == Int32._full_text
171 
172  self.assertEquals(len(list(rosbag.Bag('/tmp/test_get_messages_filter_works.bag').read_messages(connection_filter=filter))), 3)
173 
175  inbag_filename = '/tmp/test_rosbag_filter__1.bag'
176  outbag_filename = '/tmp/test_rosbag_filter__2.bag'
177 
178  with rosbag.Bag(inbag_filename, 'w') as b:
179  for i in range(30):
180  msg = Int32()
181  msg.data = i
182  t = genpy.Time.from_sec(i)
183  b.write('/ints' + str(i), msg, t)
184 
185  expression = "(int(t.secs) == m.data) and (topic == '/ints' + str(m.data)) and (m.data >= 15 and m.data < 20)"
186 
187  os.system('rosbag filter %s %s "%s"' % (inbag_filename, outbag_filename, expression))
188 
189  msgs = list(rosbag.Bag(outbag_filename).read_messages())
190 
191  self.assertEquals(len(msgs), 5)
192 
193  # Regression test for issue #736
195  tempdir = tempfile.mkdtemp()
196  try:
197  inbag_filename = os.path.join(tempdir, 'test_rosbag_filter__1.bag')
198  outbag1_filename = os.path.join(tempdir, 'test_rosbag_filter__2.bag')
199  outbag2_filename = os.path.join(tempdir, 'test_rosbag_filter__3.bag')
200 
201  with rosbag.Bag(inbag_filename, 'w') as b:
202  for i in range(30):
203  msg = ColorRGBA()
204  t = genpy.Time.from_sec(i)
205  b.write('/ints' + str(i), msg, t)
206 
207  # filtering multiple times should not affect the filtered rosbag
208  cmd = 'rosbag filter %s %s "True"'
209  os.system(cmd % (inbag_filename, outbag1_filename))
210  os.system(cmd % (outbag1_filename, outbag2_filename))
211 
212  with open(outbag1_filename, 'r') as h:
213  outbag1_md5 = hashlib.md5(h.read()).hexdigest()
214  with open(outbag2_filename, 'r') as h:
215  outbag2_md5 = hashlib.md5(h.read()).hexdigest()
216  self.assertEquals(outbag1_md5, outbag2_md5)
217  finally:
218  shutil.rmtree(tempdir)
219 
221  fn = '/tmp/test_reindex_works.bag'
222 
223  chunk_threshold = 1024
224 
225  with rosbag.Bag(fn, 'w', chunk_threshold=chunk_threshold) as b:
226  for i in range(100):
227  for j in range(5):
228  msg = Int32()
229  msg.data = i
230  b.write('/topic%d' % j, msg)
231  file_header_pos = b._file_header_pos
232 
233  start_index = 4117 + chunk_threshold * 2 + int(chunk_threshold / 2)
234 
235  trunc_filename = '%s.trunc%s' % os.path.splitext(fn)
236  reindex_filename = '%s.reindex%s' % os.path.splitext(fn)
237 
238  for trunc_index in range(start_index, start_index + chunk_threshold):
239  shutil.copy(fn, trunc_filename)
240 
241  with open(trunc_filename, 'r+b') as f:
242  f.seek(file_header_pos)
243  header = {
244  'op': bag._pack_uint8(bag._OP_FILE_HEADER),
245  'index_pos': bag._pack_uint64(0),
246  'conn_count': bag._pack_uint32(0),
247  'chunk_count': bag._pack_uint32(0)
248  }
249  bag._write_record(f, header, padded_size=bag._FILE_HEADER_LENGTH)
250  f.truncate(trunc_index)
251 
252  shutil.copy(trunc_filename, reindex_filename)
253 
254  try:
255  b = rosbag.Bag(reindex_filename, 'a', allow_unindexed=True)
256  except Exception as ex:
257  pass
258  for done in b.reindex():
259  pass
260  b.close()
261 
262  msgs = list(rosbag.Bag(reindex_filename, 'r'))
263 
265  fn = '/tmp/test_future_version_2.1.bag'
266 
267  with rosbag.Bag(fn, 'w', chunk_threshold=256) as b:
268  for i in range(10):
269  msg = Int32()
270  msg.data = i
271  b.write('/int', msg)
272 
273  header = { 'op': bag._pack_uint8(max(bag._OP_CODES.keys()) + 1) }
274  data = 'ABCDEFGHI123456789'
275  bag._write_record(b._file, header, data)
276 
277  b._file.seek(0)
278  data = '#ROSBAG V%d.%d\n' % (int(b._version / 100), (b._version % 100) + 1) # increment the minor version
279  data = data.encode()
280  b._file.write(data)
281  b._file.seek(0, os.SEEK_END)
282 
283  with rosbag.Bag(fn) as b:
284  for topic, msg, t in b:
285  pass
286 
288  fn = '/tmp/test_get_message_count.bag'
289  with rosbag.Bag(fn, mode='w') as bag:
290  for i in range(100):
291  bag.write("/test_bag", Int32(data=i))
292  bag.write("/test_bag", String(data='also'))
293  bag.write("/test_bag/more", String(data='alone'))
294 
295  with rosbag.Bag(fn) as bag:
296  self.assertEquals(bag.get_message_count(), 300)
297  self.assertEquals(bag.get_message_count(topic_filters='/test_bag'), 200)
298  self.assertEquals(bag.get_message_count(topic_filters=['/test_bag', '/test_bag/more']), 300)
299  self.assertEquals(bag.get_message_count(topic_filters=['/none']), 0)
300 
302  fn = '/tmp/test_get_compression_info.bag'
303 
304  # No Compression
305  with rosbag.Bag(fn, mode='w') as bag:
306  for i in range(100):
307  bag.write("/test_bag", Int32(data=i))
308 
309  with rosbag.Bag(fn) as bag:
310  info = bag.get_compression_info()
311  self.assertEquals(info.compression, rosbag.Compression.NONE)
312  # 167 Bytes of overhead, 50 Bytes per Int32.
313  self.assertEquals(info.uncompressed, 5166)
314  self.assertEquals(info.compressed, 5166)
315 
316  with rosbag.Bag(fn, mode='w', compression=rosbag.Compression.BZ2) as bag:
317  for i in range(100):
318  bag.write("/test_bag", Int32(data=i))
319 
320  with rosbag.Bag(fn) as bag:
321  info = bag.get_compression_info()
322  self.assertEquals(info.compression, rosbag.Compression.BZ2)
323  self.assertEquals(info.uncompressed, 5166)
324 
325  # the value varies each run, I suspect based on rand, but seems
326  # to generally be around 960 to 980 on my comp
327  self.assertLess(info.compressed, 1050)
328  self.assertGreater(info.compressed, 850)
329 
330  def test_get_time(self):
331  fn = '/tmp/test_get_time.bag'
332 
333  with rosbag.Bag(fn, mode='w') as bag:
334  for i in range(100):
335  bag.write("/test_bag", Int32(data=i), t=genpy.Time.from_sec(i))
336 
337  with rosbag.Bag(fn) as bag:
338  start_stamp = bag.get_start_time()
339  end_stamp = bag.get_end_time()
340 
341  self.assertEquals(start_stamp, 0.0)
342  self.assertEquals(end_stamp, 99.0)
343 
345  """Test for issue #657"""
346  fn = '/tmp/test_get_time_emtpy_bag.bag'
347 
348  with rosbag.Bag(fn, mode='w') as bag:
349  self.assertRaisesRegexp(rosbag.ROSBagException,
350  'Bag contains no message',
351  bag.get_start_time)
352  self.assertRaisesRegexp(rosbag.ROSBagException,
353  'Bag contains no message',
354  bag.get_end_time)
355 
357  fn = '/tmp/test_get_type_and_topic_info.bag'
358  topic_1 = "/test_bag"
359  topic_2 = "/test_bag/more"
360  with rosbag.Bag(fn, mode='w') as bag:
361  for i in range(100):
362  bag.write(topic_1, Int32(data=i))
363  bag.write(topic_1, String(data='also'))
364  bag.write(topic_2, String(data='alone'))
365 
366  with rosbag.Bag(fn) as bag:
367  msg_types, topics = bag.get_type_and_topic_info()
368  self.assertEquals(len(msg_types), 2)
369  self.assertTrue("std_msgs/Int32" in msg_types)
370  self.assertTrue("std_msgs/String" in msg_types)
371  self.assertEquals(len(topics), 2)
372  self.assertTrue(topic_1 in topics)
373  self.assertTrue(topic_2 in topics)
374 
375  self.assertEquals(topics[topic_1].message_count, 200)
376  self.assertEquals(topics[topic_1].msg_type, "std_msgs/Int32")
377  self.assertEquals(topics[topic_2].message_count, 100)
378  self.assertEquals(topics[topic_2].msg_type, "std_msgs/String")
379 
380  #filter on topic 1
381  msg_types, topics = bag.get_type_and_topic_info(topic_1)
382 
383  # msg_types should be unaffected by the filter
384  self.assertEquals(len(msg_types), 2)
385  self.assertTrue("std_msgs/Int32" in msg_types)
386  self.assertTrue("std_msgs/String" in msg_types)
387 
388  self.assertEquals(len(topics), 1)
389  self.assertTrue(topic_1 in topics)
390 
391  self.assertEquals(topics[topic_1].message_count, 200)
392  self.assertEquals(topics[topic_1].msg_type, "std_msgs/Int32")
393 
394  #filter on topic 2
395  msg_types, topics = bag.get_type_and_topic_info(topic_2)
396 
397  # msg_types should be unaffected by the filter
398  self.assertEquals(len(msg_types), 2)
399  self.assertTrue("std_msgs/Int32" in msg_types)
400  self.assertTrue("std_msgs/String" in msg_types)
401 
402  self.assertEquals(len(topics), 1)
403  self.assertTrue(topic_2 in topics)
404 
405  self.assertEquals(topics[topic_2].message_count, 100)
406  self.assertEquals(topics[topic_2].msg_type, "std_msgs/String")
407 
408  #filter on missing topic
409  msg_types, topics = bag.get_type_and_topic_info("/none")
410 
411  # msg_types should be unaffected by the filter
412  self.assertEquals(len(msg_types), 2)
413  self.assertTrue("std_msgs/Int32" in msg_types)
414  self.assertTrue("std_msgs/String" in msg_types)
415 
416  # topics should be empty
417  self.assertEquals(len(topics), 0)
418 
419  def _print_bag_records(self, fn):
420  with open(fn) as f:
421  f.seek(0, os.SEEK_END)
422  size = f.tell()
423  f.seek(0)
424 
425  version_line = f.readline().rstrip()
426  print(version_line)
427 
428  while f.tell() < size:
429  header = bag._read_header(f)
430  op = bag._read_uint8_field(header, 'op')
431  data = bag._read_record_data(f)
432 
433  print(bag._OP_CODES.get(op, op))
434 
435 if __name__ == '__main__':
436  import rostest
437  PKG='rosbag'
438  rostest.run(PKG, 'TestRosbag', TestRosbag, sys.argv)
def test_writing_nonchronological_works(self)
Definition: test_bag.py:112
def setUp(self)
Definition: test_bag.py:55
def test_opening_stream_works(self)
Definition: test_bag.py:58
def test_large_write_works(self)
Definition: test_bag.py:129
def test_get_time_empty_bag(self)
Definition: test_bag.py:344
def test_invalid_bag_arguments_fails(self)
Definition: test_bag.py:71
def test_reindex_works(self)
Definition: test_bag.py:220
def test_get_compression_info(self)
Definition: test_bag.py:301
def test_get_message_count(self)
Definition: test_bag.py:287
def test_io_on_close_fails(self)
Definition: test_bag.py:82
def test_get_messages_filter_works(self)
Definition: test_bag.py:161
def test_rosbag_filter(self)
Definition: test_bag.py:174
def test_write_invalid_message_fails(self)
Definition: test_bag.py:89
def _print_bag_records(self, fn)
Definition: test_bag.py:419
def test_get_time(self)
Definition: test_bag.py:330
def test_future_version_works(self)
Definition: test_bag.py:264
def test_simple_write_uncompressed_works(self)
Definition: test_bag.py:95
def test_get_type_and_topic_info(self)
Definition: test_bag.py:356
def test_get_messages_time_range_works(self)
Definition: test_bag.py:147
def test_trivial_rosbag_filter(self)
Definition: test_bag.py:194


test_rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Sun Feb 3 2019 03:30:35