$search
00001 #!/usr/bin/env python 00002 # Software License Agreement (BSD License) 00003 # 00004 # Copyright (c) 2008, Willow Garage, Inc. 00005 # All rights reserved. 00006 # 00007 # Redistribution and use in source and binary forms, with or without 00008 # modification, are permitted provided that the following conditions 00009 # are met: 00010 # 00011 # * Redistributions of source code must retain the above copyright 00012 # notice, this list of conditions and the following disclaimer. 00013 # * Redistributions in binary form must reproduce the above 00014 # copyright notice, this list of conditions and the following 00015 # disclaimer in the documentation and/or other materials provided 00016 # with the distribution. 00017 # * Neither the name of Willow Garage, Inc. nor the names of its 00018 # contributors may be used to endorse or promote products derived 00019 # from this software without specific prior written permission. 00020 # 00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 00032 # POSSIBILITY OF SUCH DAMAGE. 00033 # 00034 00035 import roslib 00036 roslib.load_manifest('test_rosbag') 00037 00038 import sys 00039 import struct 00040 00041 import unittest 00042 00043 import rostest 00044 import rosbag 00045 00046 class MigrationTest(unittest.TestCase): 00047 00048 def setUp(self): 00049 self.pkg_dir = roslib.packages.get_pkg_dir("test_rosbag") 00050 00051 def test_unmigrated(self): 00052 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr'] 00053 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00054 00055 inbag = "%s/test/unmigrated_gen1.bag"%(self.pkg_dir,) 00056 00057 mm = rosbag.migration.MessageMigrator(rule_files,False) 00058 res = rosbag.migration.checkbag(mm, inbag) 00059 00060 self.assertTrue(len(res[0][1]) == 1) 00061 self.assertTrue(not res[0][1][0].valid) 00062 self.assertEqual(res[0][1][0].old_class._md5sum, '4b12e5ff694b0e2a31b2ea9e0bd900f4') 00063 self.assertEqual(res[0][1][0].new_class._md5sum, 'b5d640967dccef2a24697ec4b8a571ec') 00064 00065 00066 def test_subunmigrated(self): 00067 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr'] 00068 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00069 00070 inbag = "%s/test/subunmigrated_gen1.bag"%(self.pkg_dir,) 00071 00072 mm = rosbag.migration.MessageMigrator(rule_files, False) 00073 res = rosbag.migration.checkbag(mm, inbag) 00074 00075 self.assertTrue(len(res[0][1]) == 1) 00076 self.assertTrue(not res[0][1][0].valid) 00077 self.assertEqual(res[0][1][0].old_class._md5sum, '4b12e5ff694b0e2a31b2ea9e0bd900f4') 00078 self.assertEqual(res[0][1][0].new_class._md5sum, 'b5d640967dccef2a24697ec4b8a571ec') 00079 00080 def do_test_partially_migrated(self, N): 00081 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr', 'partially_migrated_rules.bmr'] 00082 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00083 00084 inbag = "%s/test/partially_migrated_gen%d.bag"%(self.pkg_dir,N) 00085 00086 mm = rosbag.migration.MessageMigrator(rule_files, False) 00087 res = rosbag.migration.checkbag(mm, inbag) 00088 00089 self.assertTrue(len(res[0][1]) == 1) 00090 self.assertTrue(not res[0][1][0].valid) 00091 self.assertEqual(res[0][1][0].old_class._md5sum, 'aba12af164ecf3f5cd150fb990205c4b') 00092 self.assertEqual(res[0][1][0].new_class._md5sum, 'b942bf4a41fb2bebc502889fd8981dfe') 00093 00094 00095 def test_partially_migrated_gen1(self): 00096 self.do_test_partially_migrated(1) 00097 00098 def test_partially_migrated_gen2(self): 00099 self.do_test_partially_migrated(2) 00100 00101 def test_partially_migrated_gen3(self): 00102 self.do_test_partially_migrated(3) 00103 00104 00105 def test_addsub(self): 00106 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr'] 00107 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00108 00109 inbag = "%s/test/migrated_addsub_gen1.bag"%(self.pkg_dir,) 00110 outbag = "%s/test/migrated_addsub_gen1.fixed.bag"%(self.pkg_dir,) 00111 00112 mm = rosbag.migration.MessageMigrator(rule_files, False) 00113 res = rosbag.migration.checkbag(mm, inbag) 00114 00115 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated') 00116 res = rosbag.migration.fixbag(mm, inbag, outbag) 00117 self.assertTrue(res, 'Bag not converted successfully') 00118 00119 msgs = [msg for msg in rosbag.Bag(outbag).read_messages()] 00120 00121 self.assertTrue(len(msgs) > 0) 00122 00123 self.assertEqual(msgs[0][1].field1.field1, 42) 00124 self.assertEqual(msgs[0][1].field2.field1, 42) 00125 00126 00127 def do_test_migrated_explicit(self, N): 00128 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr'] 00129 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00130 00131 inbag = "%s/test/migrated_explicit_gen%d.bag"%(self.pkg_dir,N) 00132 outbag = "%s/test/migrated_explicit_gen%d.fixed.bag"%(self.pkg_dir,N) 00133 00134 mm = rosbag.migration.MessageMigrator(rule_files, False) 00135 res = rosbag.migration.checkbag(mm, inbag) 00136 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated') 00137 00138 res = rosbag.migration.fixbag(mm, inbag, outbag) 00139 self.assertTrue(res, 'Bag not converted successfully') 00140 00141 msgs = [msg for msg in rosbag.Bag(outbag).read_messages()] 00142 00143 self.assertTrue(len(msgs) > 0) 00144 00145 self.assertEqual(msgs[0][1].afield2, struct.unpack('<f',struct.pack('<f',58.2))[0]) 00146 self.assertEqual(msgs[0][1].combo_field3, "aldfkja 17") 00147 self.assertEqual(msgs[0][1].afield4, 82) 00148 00149 def test_migrated_explicit_gen1(self): 00150 self.do_test_migrated_explicit(1) 00151 00152 def test_migrated_explicit_gen2(self): 00153 self.do_test_migrated_explicit(2) 00154 00155 def test_migrated_explicit_gen3(self): 00156 self.do_test_migrated_explicit(3) 00157 00158 00159 00160 def do_test_migrated_implicit(self, N): 00161 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr'] 00162 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00163 00164 inbag = "%s/test/migrated_implicit_gen%d.bag"%(self.pkg_dir,N) 00165 outbag = "%s/test/migrated_implicit_gen%d.fixed.bag"%(self.pkg_dir,N) 00166 00167 mm = rosbag.migration.MessageMigrator(rule_files, False) 00168 res = rosbag.migration.checkbag(mm, inbag 00169 ) 00170 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated') 00171 res = rosbag.migration.fixbag(mm, inbag, outbag) 00172 self.assertTrue(res, 'Bag not converted successfully') 00173 00174 msgs = [msg for msg in rosbag.Bag(outbag).read_messages()] 00175 00176 self.assertTrue(len(msgs) > 0) 00177 00178 self.assertEqual(msgs[0][1].field4.afield2, struct.unpack('<f',struct.pack('<f',58.2))[0]) 00179 self.assertEqual(msgs[0][1].field4.combo_field3, "aldfkja 17") 00180 self.assertEqual(msgs[0][1].field4.afield4, 82) 00181 00182 self.assertEqual(msgs[0][1].field1, 34) 00183 self.assertEqual(msgs[0][1].field2, struct.unpack('<f',struct.pack('<f',16.32))[0]) 00184 self.assertEqual(msgs[0][1].field3, "kljene") 00185 00186 00187 def test_migrated_implicit_gen1(self): 00188 self.do_test_migrated_implicit(1) 00189 00190 def test_migrated_implicit_gen2(self): 00191 self.do_test_migrated_implicit(2) 00192 00193 def test_migrated_implicit_gen3(self): 00194 self.do_test_migrated_implicit(3) 00195 00196 00197 00198 def do_test_migrated_mixed(self, N): 00199 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr'] 00200 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00201 00202 inbag = "%s/test/migrated_mixed_gen%d.bag"%(self.pkg_dir,N) 00203 outbag = "%s/test/migrated_mixed_gen%d.fixed.bag"%(self.pkg_dir,N) 00204 00205 mm = rosbag.migration.MessageMigrator(rule_files, False) 00206 res = rosbag.migration.checkbag(mm, inbag) 00207 00208 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated') 00209 res = rosbag.migration.fixbag(mm, inbag, outbag) 00210 self.assertTrue(res, 'Bag not converted successfully') 00211 00212 msgs = [msg for msg in rosbag.Bag(outbag).read_messages()] 00213 00214 self.assertTrue(len(msgs) > 0) 00215 00216 self.assertEqual(msgs[0][1].field1.field4.afield2, struct.unpack('<f',struct.pack('<f',58.2))[0]) 00217 self.assertEqual(msgs[0][1].field1.field4.combo_field3, "aldfkja 17") 00218 self.assertEqual(msgs[0][1].field1.field4.afield4, 82) 00219 00220 self.assertEqual(msgs[0][1].field1.field1, 34) 00221 self.assertEqual(msgs[0][1].field1.field2, struct.unpack('<f',struct.pack('<f',16.32))[0]) 00222 self.assertEqual(msgs[0][1].field1.field3, "kljene") 00223 00224 self.assertEqual(msgs[0][1].field2, 59) 00225 00226 def test_migrated_mixed_gen1(self): 00227 self.do_test_migrated_mixed(1) 00228 00229 def test_migrated_mixed_gen2(self): 00230 self.do_test_migrated_mixed(2) 00231 00232 def test_migrated_mixed_gen3(self): 00233 self.do_test_migrated_mixed(3) 00234 00235 00236 00237 def do_test_renamed(self, N): 00238 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr', 'renamed_rules.bmr'] 00239 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00240 00241 inbag = "%s/test/renamed_gen%d.bag"%(self.pkg_dir,N) 00242 outbag = "%s/test/renamed_gen%d.fixed.bag"%(self.pkg_dir,N) 00243 00244 mm = rosbag.migration.MessageMigrator(rule_files, False) 00245 res = rosbag.migration.checkbag(mm, inbag 00246 ) 00247 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated') 00248 res = rosbag.migration.fixbag(mm, inbag, outbag) 00249 self.assertTrue(res, 'Bag not converted successfully') 00250 00251 msgs = [msg for msg in rosbag.Bag(outbag).read_messages()] 00252 00253 self.assertTrue(len(msgs) > 0) 00254 00255 00256 self.assertEqual(msgs[0][1]._type, 'test_rosbag/Renamed4', 'Type name is wrong') 00257 self.assertEqual(msgs[0][1].foo, struct.unpack('<d',struct.pack('<d',2.17))[0]) 00258 self.assertEqual(msgs[0][1].bar, (8, 2, 5, 1)) 00259 00260 00261 def test_renamed_gen1(self): 00262 self.do_test_renamed(1) 00263 00264 def test_renamed_gen2(self): 00265 self.do_test_renamed(2) 00266 00267 def test_renamed_gen3(self): 00268 self.do_test_renamed(3) 00269 00270 00271 def do_test_converged(self, N): 00272 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr', 'renamed_rules.bmr', 'simple_migrated_rules.bmr', 'converged_rules.bmr'] 00273 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00274 00275 inbag = "%s/test/converged_gen%d.bag"%(self.pkg_dir,N) 00276 outbag = "%s/test/converged_gen%d.fixed.bag"%(self.pkg_dir,N) 00277 00278 mm = rosbag.migration.MessageMigrator(rule_files, False) 00279 res = rosbag.migration.checkbag(mm, inbag) 00280 00281 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated') 00282 res = rosbag.migration.fixbag(mm, inbag, outbag) 00283 self.assertTrue(res, 'Bag not converted successfully') 00284 00285 msgs = [msg for msg in rosbag.Bag(outbag).read_messages()] 00286 00287 self.assertTrue(len(msgs) > 0) 00288 00289 self.assertEqual(msgs[0][1]._type, 'test_rosbag/Converged', 'Type name is wrong') 00290 self.assertEqual(msgs[0][1].field1[0], struct.unpack('<f',struct.pack('<f',1.2))[0]) 00291 self.assertEqual(msgs[0][1].field1[1], struct.unpack('<f',struct.pack('<f',3.4))[0]) 00292 self.assertEqual(msgs[0][1].field1[2], struct.unpack('<f',struct.pack('<f',5.6))[0]) 00293 self.assertEqual(msgs[0][1].field1[3], struct.unpack('<f',struct.pack('<f',7.8))[0]) 00294 00295 self.assertEqual(msgs[0][1].field2[0].data, 11) 00296 self.assertEqual(msgs[0][1].field2[1].data, 22) 00297 self.assertEqual(msgs[0][1].field2[2].data, 33) 00298 self.assertEqual(msgs[0][1].field2[3].data, 44) 00299 00300 00301 def test_converged_gen1(self): 00302 self.do_test_converged(1) 00303 00304 def test_converged_gen2(self): 00305 self.do_test_converged(2) 00306 00307 def test_converged_gen3(self): 00308 self.do_test_converged(3) 00309 00310 00311 def do_test_convergent(self, N): 00312 tmp_rule_files = ['migrated_explicit_rules.bmr', 'migrated_mixed_rules.bmr', 'migrated_addsub_rules.bmr', 'renamed_rules.bmr', 'simple_migrated_rules.bmr', 'converged_rules.bmr'] 00313 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00314 00315 inbag = "%s/test/convergent_gen%d.bag"%(self.pkg_dir,N) 00316 outbag = "%s/test/convergent_gen%d.fixed.bag"%(self.pkg_dir,N) 00317 00318 mm = rosbag.migration.MessageMigrator(rule_files, False) 00319 res = rosbag.migration.checkbag(mm, inbag) 00320 00321 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated') 00322 res = rosbag.migration.fixbag(mm, inbag, outbag) 00323 self.assertTrue(res, 'Bag not converted successfully') 00324 00325 msgs = [msg for msg in rosbag.Bag(outbag).read_messages()] 00326 00327 self.assertTrue(len(msgs) > 0) 00328 00329 self.assertEqual(msgs[0][1]._type, 'test_rosbag/Converged', 'Type name is wrong') 00330 self.assertEqual(msgs[0][1].field1[0], struct.unpack('<f',struct.pack('<f',1.2))[0]) 00331 self.assertEqual(msgs[0][1].field1[1], struct.unpack('<f',struct.pack('<f',3.4))[0]) 00332 self.assertEqual(msgs[0][1].field1[2], struct.unpack('<f',struct.pack('<f',5.6))[0]) 00333 self.assertEqual(msgs[0][1].field1[3], struct.unpack('<f',struct.pack('<f',7.8))[0]) 00334 00335 self.assertEqual(msgs[0][1].field2[0].data, 11) 00336 self.assertEqual(msgs[0][1].field2[1].data, 22) 00337 self.assertEqual(msgs[0][1].field2[2].data, 33) 00338 self.assertEqual(msgs[0][1].field2[3].data, 44) 00339 00340 00341 def test_convergent_gen1(self): 00342 self.do_test_convergent(1) 00343 00344 def test_convergent_gen2(self): 00345 self.do_test_convergent(2) 00346 00347 00348 00349 00350 def do_test_constants_no_rules(self, N): 00351 tmp_rule_files = [] 00352 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00353 00354 inbag = "%s/test/constants_gen%d.bag"%(self.pkg_dir,N) 00355 outbag = "%s/test/constants_gen%d.fixed.bag"%(self.pkg_dir,N) 00356 00357 mm = rosbag.migration.MessageMigrator(rule_files, False) 00358 res = rosbag.migration.checkbag(mm, inbag) 00359 00360 self.assertTrue(len(res[0][1]) == 1) 00361 self.assertTrue(not res[0][1][0].valid) 00362 self.assertEqual(res[0][1][0].old_class._md5sum, '06a34bda7d4ea2950ab952e89ca35d7a') 00363 self.assertEqual(res[0][1][0].new_class._md5sum, 'b45401c4d442c4da7b0a2a105075fa4a') 00364 00365 00366 def do_test_constants_rules(self, N): 00367 tmp_rule_files = ['constants.bmr'] 00368 rule_files = ["%s/test/%s"%(self.pkg_dir,r) for r in tmp_rule_files] 00369 00370 inbag = "%s/test/constants_gen%d.bag"%(self.pkg_dir,N) 00371 outbag = "%s/test/constants_gen%d.fixed.bag"%(self.pkg_dir,N) 00372 00373 mm = rosbag.migration.MessageMigrator(rule_files, False) 00374 res = rosbag.migration.checkbag(mm, inbag) 00375 00376 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated') 00377 res = rosbag.migration.fixbag(mm, inbag, outbag) 00378 self.assertTrue(res, 'Bag not converted successfully') 00379 00380 msgs = [msg for msg in rosbag.Bag(outbag).read_messages()] 00381 00382 self.assertTrue(len(msgs) > 0) 00383 00384 self.assertEqual(msgs[0][1]._type, 'test_rosbag/Constants', 'Type name is wrong') 00385 self.assertEqual(msgs[0][1].value, msgs[0][1].CONSTANT) 00386 00387 00388 def test_constants_no_rules_gen1(self): 00389 self.do_test_constants_no_rules(1) 00390 00391 def test_constants_gen1(self): 00392 self.do_test_constants_rules(1) 00393 00394 def test_constants_gen2(self): 00395 self.do_test_constants_rules(2) 00396 00397 00398 if __name__ == '__main__': 00399 rostest.unitrun('test_rosbag', 'migration_test', MigrationTest, sys.argv)