test_bag.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2008, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of Willow Garage, Inc. nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 # test_bag.py
00035 
00036 import heapq
00037 import os
00038 import shutil
00039 import sys
00040 import time
00041 import unittest
00042 
00043 import genpy
00044 
00045 import rosbag
00046 from rosbag import bag
00047 import rospy
00048 from std_msgs.msg import Int32
00049 from std_msgs.msg import ColorRGBA
00050 from std_msgs.msg import String
00051 
00052 class TestRosbag(unittest.TestCase):
00053     def setUp(self):
00054         pass
00055     
00056     def test_opening_stream_works(self):
00057         f = open('/tmp/test_opening_stream_works.bag', 'wb')
00058         with rosbag.Bag(f, 'w') as b:
00059             for i in range(10):
00060                 msg = Int32()
00061                 msg.data = i
00062                 b.write('/int', msg)
00063         
00064         f = open('/tmp/test_opening_stream_works.bag', 'rb')
00065         b = rosbag.Bag(f, 'r')
00066         self.assert_(len(list(b.read_messages())) == 10)
00067         b.close()
00068 
00069     def test_invalid_bag_arguments_fails(self):
00070         f = '/tmp/test_invalid_bad_arguments_fails.bag'
00071         
00072         def fn1(): rosbag.Bag('')
00073         def fn2(): rosbag.Bag(None)
00074         def fn3(): rosbag.Bag(f, 'z')        
00075         def fn4(): rosbag.Bag(f, 'r', compression='foobar')
00076         def fn5(): rosbag.Bag(f, 'r', chunk_threshold=-1000)
00077         for fn in [fn1, fn2, fn3, fn4, fn5]:
00078             self.failUnlessRaises(ValueError, fn)
00079 
00080     def test_io_on_close_fails(self):
00081         def fn():
00082             b = rosbag.Bag('/tmp/test_io_close_fails.bag', 'w')
00083             b.close()
00084             size = b.size()
00085         self.failUnlessRaises(ValueError, fn)
00086         
00087     def test_write_invalid_message_fails(self):
00088         def fn():
00089             with rosbag.Bag('/tmp/test_write_invalid_message_fails.bag', 'w') as b:
00090                 b.write(None, None, None)
00091         self.failUnlessRaises(ValueError, fn)
00092 
00093     def test_simple_write_uncompressed_works(self):
00094         with rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag', 'w') as b:
00095             msg_count = 0
00096             for i in range(5, 0, -1):
00097                 msg = Int32()
00098                 msg.data = i
00099                 t = genpy.Time.from_sec(i)
00100                 b.write('/ints' + str(i), msg, t)
00101                 msg_count += 1
00102 
00103         msgs = list(rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag').read_messages())
00104         
00105         self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00106 
00107         for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00108             self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00109 
00110     def test_writing_nonchronological_works(self):
00111         with rosbag.Bag('/tmp/test_writing_nonchronological_works.bag', 'w') as b:
00112             msg_count = 0
00113             for i in range(5, 0, -1):
00114                 msg = Int32()
00115                 msg.data = i
00116                 t = genpy.Time.from_sec(i)
00117                 b.write('/ints', msg, t)
00118                 msg_count += 1
00119 
00120         msgs = list(rosbag.Bag('/tmp/test_writing_nonchronological_works.bag').read_messages())
00121         
00122         self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00123 
00124         for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00125             self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00126 
00127     def test_large_write_works(self):
00128         for compression in [rosbag.Compression.NONE, rosbag.Compression.BZ2]:
00129             with rosbag.Bag('/tmp/test_large_write_works.bag', 'w', compression=compression) as b:
00130                 msg_count = 0
00131                 for i in range(10000):
00132                     msg = Int32()
00133                     msg.data = i
00134                     t = genpy.Time.from_sec(i)
00135                     b.write('/ints', msg, t)
00136                     msg_count += 1
00137 
00138             msgs = list(rosbag.Bag('/tmp/test_large_write_works.bag').read_messages())
00139 
00140             self.assertEquals(len(msgs), msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
00141 
00142             for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
00143                 self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
00144 
00145     def test_get_messages_time_range_works(self):
00146         with rosbag.Bag('/tmp/test_get_messages_time_range_works.bag', 'w') as b:
00147             for i in range(30):
00148                 msg = Int32()
00149                 msg.data = i
00150                 t = genpy.Time.from_sec(i)
00151                 b.write('/ints', msg, t)
00152         
00153         start_time = genpy.Time.from_sec(3)
00154         end_time = genpy.Time.from_sec(7)
00155         msgs = list(rosbag.Bag('/tmp/test_get_messages_time_range_works.bag').read_messages(topics='/ints', start_time=start_time, end_time=end_time))
00156 
00157         self.assertEquals(len(msgs), 5)
00158         
00159     def test_get_messages_filter_works(self):
00160         with rosbag.Bag('/tmp/test_get_messages_filter_works.bag', 'w') as b:
00161             for i in range(30):
00162                 msg = Int32()
00163                 msg.data = i
00164                 t = genpy.Time.from_sec(i)
00165                 b.write('/ints' + str(i), msg, t)
00166 
00167         def filter(topic, datatype, md5sum, msg_def, header):
00168             return '5' in topic and datatype == Int32._type and md5sum == Int32._md5sum and msg_def == Int32._full_text
00169 
00170         self.assertEquals(len(list(rosbag.Bag('/tmp/test_get_messages_filter_works.bag').read_messages(connection_filter=filter))), 3)
00171 
00172     def test_rosbag_filter(self):
00173         inbag_filename  = '/tmp/test_rosbag_filter__1.bag'
00174         outbag_filename = '/tmp/test_rosbag_filter__2.bag'
00175         
00176         with rosbag.Bag(inbag_filename, 'w') as b:
00177             for i in range(30):
00178                 msg = Int32()
00179                 msg.data = i
00180                 t = genpy.Time.from_sec(i)
00181                 b.write('/ints' + str(i), msg, t)
00182 
00183         expression = "(int(t.secs) == m.data) and (topic == '/ints' + str(m.data)) and (m.data >= 15 and m.data < 20)"
00184 
00185         os.system('rosbag filter %s %s "%s"' % (inbag_filename, outbag_filename, expression))
00186 
00187         msgs = list(rosbag.Bag(outbag_filename).read_messages())
00188 
00189         self.assertEquals(len(msgs), 5)
00190 
00191     def test_reindex_works(self):
00192         fn = '/tmp/test_reindex_works.bag'
00193         
00194         chunk_threshold = 1024
00195 
00196         with rosbag.Bag(fn, 'w', chunk_threshold=chunk_threshold) as b:
00197             for i in range(100):
00198                 for j in range(5):
00199                     msg = Int32()
00200                     msg.data = i
00201                     b.write('/topic%d' % j, msg)
00202             file_header_pos = b._file_header_pos
00203 
00204         start_index = 4117 + chunk_threshold * 2 + int(chunk_threshold / 2)
00205 
00206         trunc_filename   = '%s.trunc%s'   % os.path.splitext(fn)
00207         reindex_filename = '%s.reindex%s' % os.path.splitext(fn)
00208 
00209         for trunc_index in range(start_index, start_index + chunk_threshold):
00210             shutil.copy(fn, trunc_filename)
00211             
00212             with open(trunc_filename, 'r+b') as f:
00213                 f.seek(file_header_pos)
00214                 header = {
00215                     'op':          bag._pack_uint8(bag._OP_FILE_HEADER),
00216                     'index_pos':   bag._pack_uint64(0),
00217                     'conn_count':  bag._pack_uint32(0),
00218                     'chunk_count': bag._pack_uint32(0)
00219                 }
00220                 bag._write_record(f, header, padded_size=bag._FILE_HEADER_LENGTH)
00221                 f.truncate(trunc_index)
00222 
00223             shutil.copy(trunc_filename, reindex_filename)
00224        
00225             try:
00226                 b = rosbag.Bag(reindex_filename, 'a', allow_unindexed=True)
00227             except Exception as ex:
00228                 pass
00229             for done in b.reindex():
00230                 pass
00231             b.close()
00232 
00233             msgs = list(rosbag.Bag(reindex_filename, 'r'))
00234 
00235     def test_future_version_works(self):
00236         fn = '/tmp/test_future_version_2.1.bag'
00237 
00238         with rosbag.Bag(fn, 'w', chunk_threshold=256) as b:
00239             for i in range(10):
00240                 msg = Int32()
00241                 msg.data = i
00242                 b.write('/int', msg)
00243 
00244                 header = { 'op': bag._pack_uint8(max(bag._OP_CODES.keys()) + 1) }
00245                 data = 'ABCDEFGHI123456789'
00246                 bag._write_record(b._file, header, data)
00247 
00248             b._file.seek(0)
00249             data = '#ROSBAG V%d.%d\n' % (int(b._version / 100), (b._version % 100) + 1)  # increment the minor version
00250             data = data.encode()
00251             b._file.write(data)
00252             b._file.seek(0, os.SEEK_END)
00253 
00254         with rosbag.Bag(fn) as b:
00255             for topic, msg, t in b:
00256                 pass
00257             
00258     def test_get_message_count(self):
00259         fn = '/tmp/test_get_message_count.bag'
00260         with rosbag.Bag(fn, mode='w') as bag:
00261             for i in xrange(100):
00262                 bag.write("/test_bag", Int32(data=i))
00263                 bag.write("/test_bag", String(data='also'))
00264                 bag.write("/test_bag/more", String(data='alone'))
00265                 
00266         with rosbag.Bag(fn) as bag:
00267             self.assertEquals(bag.get_message_count(), 300)
00268             self.assertEquals(bag.get_message_count(topic_filters='/test_bag'), 200)
00269             self.assertEquals(bag.get_message_count(topic_filters=['/test_bag', '/test_bag/more']), 300)
00270             self.assertEquals(bag.get_message_count(topic_filters=['/none']), 0)
00271         
00272     def test_get_compression_info(self):
00273         fn = '/tmp/test_get_compression_info.bag'
00274         
00275         # No Compression
00276         with rosbag.Bag(fn, mode='w') as bag:
00277             for i in xrange(100):
00278                 bag.write("/test_bag", Int32(data=i))
00279                 
00280         with rosbag.Bag(fn) as bag:
00281             info = bag.get_compression_info()
00282             self.assertEquals(info.compression, rosbag.Compression.NONE)
00283             # 167 Bytes of overhead, 50 Bytes per Int32.
00284             self.assertEquals(info.uncompressed, 5166)
00285             self.assertEquals(info.compressed, 5166)
00286         
00287         with rosbag.Bag(fn, mode='w', compression=rosbag.Compression.BZ2) as bag:
00288             for i in xrange(100):
00289                 bag.write("/test_bag", Int32(data=i))
00290                 
00291         with rosbag.Bag(fn) as bag:
00292             info = bag.get_compression_info()
00293             self.assertEquals(info.compression, rosbag.Compression.BZ2)
00294             self.assertEquals(info.uncompressed, 5166)
00295             
00296             # the value varies each run, I suspect based on rand, but seems
00297             # to generally be around 960 to 980 on my comp
00298             self.assertLess(info.compressed, 1000)
00299             self.assertGreater(info.compressed, 900)
00300         
00301     def test_get_time(self):
00302         fn = '/tmp/test_get_time.bag'
00303         
00304         with rosbag.Bag(fn, mode='w') as bag:
00305             for i in xrange(100):
00306                 bag.write("/test_bag", Int32(data=i), t=genpy.Time.from_sec(i))
00307                 
00308         with rosbag.Bag(fn) as bag:
00309             start_stamp = bag.get_start_time()
00310             end_stamp = bag.get_end_time()
00311             
00312             self.assertEquals(start_stamp, 0.0)
00313             self.assertEquals(end_stamp, 99.0)
00314 
00315     def test_get_time_empty_bag(self):
00316         """Test for issue #657"""
00317         fn = '/tmp/test_get_time_emtpy_bag.bag'
00318 
00319         with rosbag.Bag(fn, mode='w') as bag:
00320             self.assertRaisesRegexp(rosbag.ROSBagException,
00321                                     'Bag contains no message',
00322                                     bag.get_start_time)
00323             self.assertRaisesRegexp(rosbag.ROSBagException,
00324                                     'Bag contains no message',
00325                                     bag.get_end_time)
00326 
00327     def test_get_type_and_topic_info(self):
00328         fn = '/tmp/test_get_type_and_topic_info.bag'
00329         topic_1 = "/test_bag"
00330         topic_2 = "/test_bag/more"
00331         with rosbag.Bag(fn, mode='w') as bag:
00332             for i in xrange(100):
00333                 bag.write(topic_1, Int32(data=i))
00334                 bag.write(topic_1, String(data='also'))
00335                 bag.write(topic_2, String(data='alone'))
00336                 
00337         with rosbag.Bag(fn) as bag:
00338             msg_types, topics = bag.get_type_and_topic_info()
00339             self.assertEquals(len(msg_types), 2)
00340             self.assertTrue("std_msgs/Int32" in msg_types)
00341             self.assertTrue("std_msgs/String" in msg_types)
00342             self.assertEquals(len(topics), 2)
00343             self.assertTrue(topic_1 in topics)
00344             self.assertTrue(topic_2 in topics)
00345             
00346             self.assertEquals(topics[topic_1].message_count, 200)
00347             self.assertEquals(topics[topic_1].msg_type, "std_msgs/Int32")
00348             self.assertEquals(topics[topic_2].message_count, 100)
00349             self.assertEquals(topics[topic_2].msg_type, "std_msgs/String")
00350             
00351             #filter on topic 1
00352             msg_types, topics = bag.get_type_and_topic_info(topic_1)
00353             
00354             # msg_types should be unaffected by the filter
00355             self.assertEquals(len(msg_types), 2)
00356             self.assertTrue("std_msgs/Int32" in msg_types)
00357             self.assertTrue("std_msgs/String" in msg_types)            
00358             
00359             self.assertEquals(len(topics), 1)
00360             self.assertTrue(topic_1 in topics)
00361             
00362             self.assertEquals(topics[topic_1].message_count, 200)
00363             self.assertEquals(topics[topic_1].msg_type, "std_msgs/Int32")
00364             
00365             #filter on topic 2
00366             msg_types, topics = bag.get_type_and_topic_info(topic_2)
00367             
00368             # msg_types should be unaffected by the filter
00369             self.assertEquals(len(msg_types), 2)
00370             self.assertTrue("std_msgs/Int32" in msg_types)
00371             self.assertTrue("std_msgs/String" in msg_types)            
00372             
00373             self.assertEquals(len(topics), 1)
00374             self.assertTrue(topic_2 in topics)
00375             
00376             self.assertEquals(topics[topic_2].message_count, 100)
00377             self.assertEquals(topics[topic_2].msg_type, "std_msgs/String")
00378             
00379             #filter on missing topic
00380             msg_types, topics = bag.get_type_and_topic_info("/none")
00381             
00382             # msg_types should be unaffected by the filter
00383             self.assertEquals(len(msg_types), 2)
00384             self.assertTrue("std_msgs/Int32" in msg_types)
00385             self.assertTrue("std_msgs/String" in msg_types)            
00386             
00387             # topics should be empty
00388             self.assertEquals(len(topics), 0)
00389         
00390     def _print_bag_records(self, fn):
00391         with open(fn) as f:
00392             f.seek(0, os.SEEK_END)
00393             size = f.tell()
00394             f.seek(0)
00395 
00396             version_line = f.readline().rstrip()
00397             print(version_line)
00398 
00399             while f.tell() < size:
00400                 header = bag._read_header(f)
00401                 op = bag._read_uint8_field(header, 'op')
00402                 data = bag._read_record_data(f)
00403 
00404                 print(bag._OP_CODES.get(op, op))
00405 
00406 if __name__ == '__main__':
00407     import rostest
00408     PKG='rosbag'
00409     rostest.run(PKG, 'TestRosbag', TestRosbag, sys.argv)


test_rosbag
Author(s): Tim Field, Jeremy Leibs, James Bowman
autogenerated on Tue Mar 7 2017 03:46:01