00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import roslib
00036 roslib.load_manifest('test_rosbag')
00037
00038 import sys
00039 import struct
00040
00041 import unittest
00042
00043 import rostest
00044 import rosbag
00045
class MigrationTest(unittest.TestCase):
    """Exercise rosbag message migration against bags in ``test_rosbag/test``.

    Two kinds of scenario are covered:

    * "unmigratable" bags, where ``rosbag.migration.checkbag`` is expected to
      report exactly one invalid rule with known old/new md5sums, and
    * "migratable" bags, where ``checkbag`` must report nothing outstanding,
      ``fixbag`` must succeed, and the first message of the rewritten bag
      must carry known field values.

    Every ``do_test_*`` method takes the generation number ``N`` that selects
    the ``*_gen<N>.bag`` input file.
    """

    # Rule files loaded by almost every scenario; individual scenarios append
    # their own rule files to this list.
    _BASE_RULES = [
        'migrated_explicit_rules.bmr',
        'migrated_mixed_rules.bmr',
        'migrated_addsub_rules.bmr',
    ]

    def setUp(self):
        """Resolve the test_rosbag package directory that holds the fixtures."""
        self.pkg_dir = roslib.packages.get_pkg_dir("test_rosbag")

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _float32(value):
        """Return ``value`` rounded through a little-endian 32-bit float.

        The expected values are compared against fields that were presumably
        stored as float32, so the Python double has to be truncated the same
        way before an exact equality check is meaningful.
        """
        return struct.unpack('<f', struct.pack('<f', value))[0]

    @staticmethod
    def _float64(value):
        """Return ``value`` round-tripped through a little-endian 64-bit float."""
        return struct.unpack('<d', struct.pack('<d', value))[0]

    def _test_path(self, name):
        """Return the path of ``name`` inside the package's ``test`` directory."""
        return "%s/test/%s" % (self.pkg_dir, name)

    def _check(self, rule_names, bag_name):
        """Build a migrator from ``rule_names`` and run ``checkbag``.

        Returns a ``(migrator, inbag_path, checkbag_result)`` tuple so callers
        can go on to call ``fixbag`` with the same migrator and input path.
        """
        rule_files = [self._test_path(r) for r in rule_names]
        # The second positional argument is kept as False, as in every
        # original call site.
        # NOTE(review): in rosbag.migration this looks like the `plugins`
        # flag (do not load rules exported by other packages) - confirm.
        migrator = rosbag.migration.MessageMigrator(rule_files, False)
        inbag = self._test_path(bag_name)
        return migrator, inbag, rosbag.migration.checkbag(migrator, inbag)

    def _assert_single_invalid_rule(self, rule_names, bag_name,
                                    old_md5, new_md5):
        """Assert ``checkbag`` reports exactly one invalid rule for the bag.

        The rule is looked up at ``res[0][1][0]`` and must be marked not
        valid, with ``old_class``/``new_class`` md5sums equal to ``old_md5``
        and ``new_md5``.
        """
        _, _, res = self._check(rule_names, bag_name)
        outstanding = res[0][1]
        self.assertEqual(len(outstanding), 1)
        rule = outstanding[0]
        self.assertFalse(rule.valid)
        self.assertEqual(rule.old_class._md5sum, old_md5)
        self.assertEqual(rule.new_class._md5sum, new_md5)

    def _migrate_first_message(self, rule_names, bag_stem):
        """Migrate ``<bag_stem>.bag`` and return the first migrated message.

        Checks that nothing is outstanding according to ``checkbag``, runs
        ``fixbag`` into ``<bag_stem>.fixed.bag`` (next to the input), reads
        every message back, and returns the message object of the first
        entry.  The output bag is always closed, even if reading fails.
        """
        migrator, inbag, res = self._check(rule_names, bag_stem + '.bag')
        outbag = self._test_path(bag_stem + '.fixed.bag')

        self.assertTrue(all(m[1] == [] for m in res),
                        'Bag not ready to be migrated')
        self.assertTrue(rosbag.migration.fixbag(migrator, inbag, outbag),
                        'Bag not converted successfully')

        bag = rosbag.Bag(outbag)
        try:
            # Materialise all messages before closing so the whole bag is
            # still deserialised, as the original list comprehension did.
            msgs = list(bag.read_messages())
        finally:
            bag.close()

        self.assertTrue(len(msgs) > 0)
        # Index 1 of each entry is the message object itself.
        return msgs[0][1]

    def _assert_explicit_fields(self, msg):
        """Assert the field values shared by the explicit/implicit/mixed bags."""
        self.assertEqual(msg.afield2, self._float32(58.2))
        self.assertEqual(msg.combo_field3, "aldfkja 17")
        self.assertEqual(msg.afield4, 82)

    def _assert_implicit_fields(self, msg):
        """Assert the field values shared by the implicit and mixed bags."""
        self._assert_explicit_fields(msg.field4)
        self.assertEqual(msg.field1, 34)
        self.assertEqual(msg.field2, self._float32(16.32))
        self.assertEqual(msg.field3, "kljene")

    def _assert_converged_fields(self, msg):
        """Assert the payload expected of a ``test_rosbag/Converged`` message."""
        self.assertEqual(msg._type, 'test_rosbag/Converged',
                         'Type name is wrong')
        # Index explicitly (rather than zip) so a too-short field still fails.
        for i, expected in enumerate((1.2, 3.4, 5.6, 7.8)):
            self.assertEqual(msg.field1[i], self._float32(expected))
        for i, expected in enumerate((11, 22, 33, 44)):
            self.assertEqual(msg.field2[i].data, expected)

    # ------------------------------------------------------------------
    # Bags that cannot be migrated with the available rules
    # ------------------------------------------------------------------

    def test_unmigrated(self):
        self._assert_single_invalid_rule(
            self._BASE_RULES, 'unmigrated_gen1.bag',
            '4b12e5ff694b0e2a31b2ea9e0bd900f4',
            'b5d640967dccef2a24697ec4b8a571ec')

    def test_subunmigrated(self):
        self._assert_single_invalid_rule(
            self._BASE_RULES, 'subunmigrated_gen1.bag',
            '4b12e5ff694b0e2a31b2ea9e0bd900f4',
            'b5d640967dccef2a24697ec4b8a571ec')

    def do_test_partially_migrated(self, N):
        self._assert_single_invalid_rule(
            self._BASE_RULES + ['partially_migrated_rules.bmr'],
            'partially_migrated_gen%d.bag' % N,
            'aba12af164ecf3f5cd150fb990205c4b',
            'b942bf4a41fb2bebc502889fd8981dfe')

    def test_partially_migrated_gen1(self):
        self.do_test_partially_migrated(1)

    def test_partially_migrated_gen2(self):
        self.do_test_partially_migrated(2)

    def test_partially_migrated_gen3(self):
        self.do_test_partially_migrated(3)

    # ------------------------------------------------------------------
    # Bags that migrate cleanly
    # ------------------------------------------------------------------

    def test_addsub(self):
        msg = self._migrate_first_message(self._BASE_RULES,
                                          'migrated_addsub_gen1')
        self.assertEqual(msg.field1.field1, 42)
        self.assertEqual(msg.field2.field1, 42)

    def do_test_migrated_explicit(self, N):
        msg = self._migrate_first_message(self._BASE_RULES,
                                          'migrated_explicit_gen%d' % N)
        self._assert_explicit_fields(msg)

    def test_migrated_explicit_gen1(self):
        self.do_test_migrated_explicit(1)

    def test_migrated_explicit_gen2(self):
        self.do_test_migrated_explicit(2)

    def test_migrated_explicit_gen3(self):
        self.do_test_migrated_explicit(3)

    def do_test_migrated_implicit(self, N):
        msg = self._migrate_first_message(self._BASE_RULES,
                                          'migrated_implicit_gen%d' % N)
        self._assert_implicit_fields(msg)

    def test_migrated_implicit_gen1(self):
        self.do_test_migrated_implicit(1)

    def test_migrated_implicit_gen2(self):
        self.do_test_migrated_implicit(2)

    def test_migrated_implicit_gen3(self):
        self.do_test_migrated_implicit(3)

    def do_test_migrated_mixed(self, N):
        msg = self._migrate_first_message(self._BASE_RULES,
                                          'migrated_mixed_gen%d' % N)
        self._assert_implicit_fields(msg.field1)
        self.assertEqual(msg.field2, 59)

    def test_migrated_mixed_gen1(self):
        self.do_test_migrated_mixed(1)

    def test_migrated_mixed_gen2(self):
        self.do_test_migrated_mixed(2)

    def test_migrated_mixed_gen3(self):
        self.do_test_migrated_mixed(3)

    def do_test_renamed(self, N):
        msg = self._migrate_first_message(
            self._BASE_RULES + ['renamed_rules.bmr'],
            'renamed_gen%d' % N)
        self.assertEqual(msg._type, 'test_rosbag/Renamed4',
                         'Type name is wrong')
        self.assertEqual(msg.foo, self._float64(2.17))
        self.assertEqual(msg.bar, (8, 2, 5, 1))

    def test_renamed_gen1(self):
        self.do_test_renamed(1)

    def test_renamed_gen2(self):
        self.do_test_renamed(2)

    def test_renamed_gen3(self):
        self.do_test_renamed(3)

    # Rule set used by both the converged and convergent scenarios.
    _CONVERGED_RULES = _BASE_RULES + [
        'renamed_rules.bmr',
        'simple_migrated_rules.bmr',
        'converged_rules.bmr',
    ]

    def do_test_converged(self, N):
        msg = self._migrate_first_message(self._CONVERGED_RULES,
                                          'converged_gen%d' % N)
        self._assert_converged_fields(msg)

    def test_converged_gen1(self):
        self.do_test_converged(1)

    def test_converged_gen2(self):
        self.do_test_converged(2)

    def test_converged_gen3(self):
        self.do_test_converged(3)

    def do_test_convergent(self, N):
        msg = self._migrate_first_message(self._CONVERGED_RULES,
                                          'convergent_gen%d' % N)
        self._assert_converged_fields(msg)

    def test_convergent_gen1(self):
        self.do_test_convergent(1)

    def test_convergent_gen2(self):
        self.do_test_convergent(2)

    # NOTE(review): unlike the other families there is no
    # test_convergent_gen3 - confirm whether a convergent_gen3.bag fixture
    # exists before adding one.

    # ------------------------------------------------------------------
    # Messages containing constants
    # ------------------------------------------------------------------

    def do_test_constants_no_rules(self, N):
        # With no rule files at all, the constants bag must be reported as
        # needing exactly one (invalid) rule.
        self._assert_single_invalid_rule(
            [], 'constants_gen%d.bag' % N,
            '06a34bda7d4ea2950ab952e89ca35d7a',
            'b45401c4d442c4da7b0a2a105075fa4a')

    def do_test_constants_rules(self, N):
        msg = self._migrate_first_message(['constants.bmr'],
                                          'constants_gen%d' % N)
        self.assertEqual(msg._type, 'test_rosbag/Constants',
                         'Type name is wrong')
        self.assertEqual(msg.value, msg.CONSTANT)

    def test_constants_no_rules_gen1(self):
        self.do_test_constants_no_rules(1)

    def test_constants_gen1(self):
        self.do_test_constants_rules(1)

    def test_constants_gen2(self):
        self.do_test_constants_rules(2)
00397
if __name__ == '__main__':
    # Run MigrationTest through rostest, reported under the 'test_rosbag'
    # package with the test name 'migration_test'; command-line arguments are
    # forwarded unchanged.
    rostest.unitrun(
        'test_rosbag',
        'migration_test',
        MigrationTest,
        sys.argv,
    )