test_bag.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 # test_bag.py
00035 
00036 import heapq
00037 import os
00038 import shutil
00039 import sys
00040 import time
00041 import unittest
00042 
00043 import genpy
00044 
00045 import rosbag
00046 from rosbag import bag
00047 import rospy
00048 from std_msgs.msg import Int32
00049 from std_msgs.msg import ColorRGBA
00050 
00051 class TestRosbag(unittest.TestCase):
00052     def setUp(self):
00053         pass
00054     
00055     def test_opening_stream_works(self):
00056         f = open('/tmp/test_opening_stream_works.bag', 'w')
00057         with rosbag.Bag(f, 'w') as b:
00058             for i in range(10):
00059                 msg = Int32()
00060                 msg.data = i
00061                 b.write('/int', msg)
00062         
00063         f = open('/tmp/test_opening_stream_works.bag', 'r')
00064         b = rosbag.Bag(f, 'r')
00065         self.assert_(len(list(b.read_messages())) == 10)
00066         b.close()
00067 
00068     def test_invalid_bag_arguments_fails(self):
00069         f = '/tmp/test_invalid_bad_arguments_fails.bag'
00070         
00071         def fn1(): rosbag.Bag('')
00072         def fn2(): rosbag.Bag(None)
00073         def fn3(): rosbag.Bag(f, 'z')        
00074         def fn4(): rosbag.Bag(f, 'r', compression='foobar')
00075         def fn5(): rosbag.Bag(f, 'r', chunk_threshold=-1000)
00076         for fn in [fn1, fn2, fn3, fn4, fn5]:
00077             self.failUnlessRaises(ValueError, fn)
00078 
00079     def test_io_on_close_fails(self):
00080         def fn():
00081             b = rosbag.Bag('/tmp/test_io_close_fails.bag', 'w')
00082             b.close()
00083             size = b.size()
00084         self.failUnlessRaises(ValueError, fn)
00085         
00086     def test_write_invalid_message_fails(self):
00087         def fn():
00088             with rosbag.Bag('/tmp/test_write_invalid_message_fails.bag', 'w') as b:
00089                 b.write(None, None, None)
00090         self.failUnlessRaises(ValueError, fn)
00091 
00092     def test_simple_write_uncompressed_works(self):
00093         with rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag', 'w') as b:
00094             msg_count = 0
00095             for i in range(5, 0, -1):
00096                 msg = Int32()
00097                 msg.data = i
00098                 t = genpy.Time.from_sec(i)
00099                 b.write('/ints' + str(i), msg, t)
00100                 msg_count += 1
00101 
00102         msgs = list(rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag').read_messages())
00103         
00104         self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00105 
00106         for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00107             self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00108 
00109     def test_writing_nonchronological_works(self):
00110         with rosbag.Bag('/tmp/test_writing_nonchronological_works.bag', 'w') as b:
00111             msg_count = 0
00112             for i in range(5, 0, -1):
00113                 msg = Int32()
00114                 msg.data = i
00115                 t = genpy.Time.from_sec(i)
00116                 b.write('/ints', msg, t)
00117                 msg_count += 1
00118 
00119         msgs = list(rosbag.Bag('/tmp/test_writing_nonchronological_works.bag').read_messages())
00120         
00121         self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00122 
00123         for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00124             self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00125 
00126     def test_large_write_works(self):
00127         for compression in [rosbag.Compression.NONE, rosbag.Compression.BZ2]:
00128             with rosbag.Bag('/tmp/test_large_write_works.bag', 'w', compression=compression) as b:
00129                 msg_count = 0
00130                 for i in range(10000):
00131                     msg = Int32()
00132                     msg.data = i
00133                     t = genpy.Time.from_sec(i)
00134                     b.write('/ints', msg, t)
00135                     msg_count += 1
00136 
00137             msgs = list(rosbag.Bag('/tmp/test_large_write_works.bag').read_messages())
00138 
00139             self.assertEquals(len(msgs), msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00140 
00141             for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00142                 self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00143 
00144     def test_get_messages_time_range_works(self):
00145         with rosbag.Bag('/tmp/test_get_messages_time_range_works.bag', 'w') as b:
00146             for i in range(30):
00147                 msg = Int32()
00148                 msg.data = i
00149                 t = genpy.Time.from_sec(i)
00150                 b.write('/ints', msg, t)
00151         
00152         start_time = genpy.Time.from_sec(3)
00153         end_time = genpy.Time.from_sec(7)
00154         msgs = list(rosbag.Bag('/tmp/test_get_messages_time_range_works.bag').read_messages(topics='/ints', start_time=start_time, end_time=end_time))
00155 
00156         self.assertEquals(len(msgs), 5)
00157         
00158     def test_get_messages_filter_works(self):
00159         with rosbag.Bag('/tmp/test_get_messages_filter_works.bag', 'w') as b:
00160             for i in range(30):
00161                 msg = Int32()
00162                 msg.data = i
00163                 t = genpy.Time.from_sec(i)
00164                 b.write('/ints' + str(i), msg, t)
00165 
00166         def filter(topic, datatype, md5sum, msg_def, header):
00167             return '5' in topic and datatype == Int32._type and md5sum == Int32._md5sum and msg_def == Int32._full_text
00168 
00169         self.assertEquals(len(list(rosbag.Bag('/tmp/test_get_messages_filter_works.bag').read_messages(connection_filter=filter))), 3)
00170 
00171     def test_rosbag_filter(self):
00172         inbag_filename  = '/tmp/test_rosbag_filter__1.bag'
00173         outbag_filename = '/tmp/test_rosbag_filter__2.bag'
00174         
00175         with rosbag.Bag(inbag_filename, 'w') as b:
00176             for i in range(30):
00177                 msg = Int32()
00178                 msg.data = i
00179                 t = genpy.Time.from_sec(i)
00180                 b.write('/ints' + str(i), msg, t)
00181 
00182         expression = "(int(t.secs) == m.data) and (topic == '/ints' + str(m.data)) and (m.data >= 15 and m.data < 20)"
00183 
00184         os.system('rosbag filter %s %s "%s"' % (inbag_filename, outbag_filename, expression))
00185 
00186         msgs = list(rosbag.Bag(outbag_filename).read_messages())
00187 
00188         self.assertEquals(len(msgs), 5)
00189 
00190     def test_reindex_works(self):
00191         fn = '/tmp/test_reindex_works.bag'
00192         
00193         chunk_threshold = 1024
00194 
00195         with rosbag.Bag(fn, 'w', chunk_threshold=chunk_threshold) as b:
00196             for i in range(100):
00197                 for j in range(5):
00198                     msg = Int32()
00199                     msg.data = i
00200                     b.write('/topic%d' % j, msg)
00201             file_header_pos = b._file_header_pos
00202 
00203         start_index = 4117 + chunk_threshold * 2 + chunk_threshold / 2
00204 
00205         trunc_filename   = '%s.trunc%s'   % os.path.splitext(fn)
00206         reindex_filename = '%s.reindex%s' % os.path.splitext(fn)
00207 
00208         for trunc_index in range(start_index, start_index + chunk_threshold):
00209             shutil.copy(fn, trunc_filename)
00210             
00211             with open(trunc_filename, 'r+b') as f:
00212                 f.seek(file_header_pos)
00213                 header = {
00214                     'op':          bag._pack_uint8(bag._OP_FILE_HEADER),
00215                     'index_pos':   bag._pack_uint64(0),
00216                     'conn_count':  bag._pack_uint32(0),
00217                     'chunk_count': bag._pack_uint32(0)
00218                 }
00219                 bag._write_record(f, header, padded_size=bag._FILE_HEADER_LENGTH)
00220                 f.truncate(trunc_index)
00221 
00222             shutil.copy(trunc_filename, reindex_filename)
00223        
00224             try:
00225                 b = rosbag.Bag(reindex_filename, 'a', allow_unindexed=True)
00226             except Exception, ex:
00227                 pass
00228             for done in b.reindex():
00229                 pass
00230             b.close()
00231 
00232             msgs = list(rosbag.Bag(reindex_filename, 'r'))
00233 
00234     def test_future_version_works(self):
00235         fn = '/tmp/test_future_version_2.1.bag'
00236 
00237         with rosbag.Bag(fn, 'w', chunk_threshold=256) as b:
00238             for i in range(10):
00239                 msg = Int32()
00240                 msg.data = i
00241                 b.write('/int', msg)
00242 
00243                 header = { 'op': bag._pack_uint8(max(bag._OP_CODES.iterkeys()) + 1) }
00244                 data = 'ABCDEFGHI123456789'
00245                 bag._write_record(b._file, header, data)
00246 
00247             b._file.seek(0)
00248             b._file.write('#ROSBAG V%d.%d\n' % (b._version / 100, (b._version % 100) + 1))   # increment the minor version
00249             b._file.seek(0, os.SEEK_END)
00250 
00251         with rosbag.Bag(fn) as b:
00252             for topic, msg, t in b:
00253                 pass
00254 
00255     def _print_bag_records(self, fn):
00256         with open(fn) as f:
00257             f.seek(0, os.SEEK_END)
00258             size = f.tell()
00259             f.seek(0)
00260 
00261             version_line = f.readline().rstrip()
00262             print version_line
00263 
00264             while f.tell() < size:
00265                 header = bag._read_header(f)
00266                 op = bag._read_uint8_field(header, 'op')
00267                 data = bag._read_record_data(f)
00268 
00269                 print bag._OP_CODES.get(op, op)
00270 
00271 if __name__ == '__main__':
00272     import rostest
00273     PKG='rosbag'
00274     rostest.run(PKG, 'TestRosbag', TestRosbag, sys.argv)


test_rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Fri Aug 28 2015 12:34:27