00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 import roslib
00036 roslib.load_manifest('test_common_msgs')
00037
00038 import sys
00039 import struct
00040
00041 import unittest
00042
00043 import rostest
00044 import rosbag
00045 import rosbagmigration
00046
00047 import re
00048 from cStringIO import StringIO
00049 import os
00050
00051 import rospy
00052
00053 import math
00054
00055 def quaternion_from_euler(x, y, z):
00056 x /= 2.0
00057 y /= 2.0
00058 z /= 2.0
00059 ci = math.cos(x)
00060 si = math.sin(x)
00061 cj = math.cos(y)
00062 sj = math.sin(y)
00063 ck = math.cos(z)
00064 sk = math.sin(z)
00065 cc = ci*ck
00066 cs = ci*sk
00067 sc = si*ck
00068 ss = si*sk
00069
00070 quaternion = [0.0,0.0,0.0,0.0]
00071
00072 quaternion[0] = cj*sc - sj*cs
00073 quaternion[1] = cj*ss + sj*cc
00074 quaternion[2] = cj*cs - sj*sc
00075 quaternion[3] = cj*cc + sj*ss
00076
00077 return quaternion
00078
00079 migrator = rosbagmigration.MessageMigrator()
00080
00081 identity6x6 = [1.0] + 6*[0] + [1.0] + 6*[0] + [1.0] + 6*[0] + [1.0] + 6*[0] + [1.0] + 6*[0] + [1.0]
00082
00083
00084 def repack(x):
00085 return struct.unpack('<f',struct.pack('<f',x))[0]
00086
00087 class TestCommonMsgsMigration(unittest.TestCase):
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126 def get_old_cam_info(self):
00127 cam_info_classes = self.load_saved_classes('CamInfo.saved')
00128
00129 cam_info = cam_info_classes['image_msgs/CamInfo']
00130
00131 return cam_info(None,
00132 480, 640,
00133 (-0.45795000000000002, 0.29532999999999998, 0.0, 0.0, 0.0),
00134 (734.37707999999998, 0.0, 343.25992000000002,
00135 0.0, 734.37707999999998, 229.65119999999999,
00136 0.0, 0.0, 1.0),
00137 (0.99997999999999998, 0.0012800000000000001, -0.0057400000000000003,
00138 -0.0012700000000000001, 1.0, 0.00148,
00139 0.0057400000000000003, -0.00147, 0.99997999999999998),
00140 (722.28197999999998, 0.0, 309.70123000000001, 0.0,
00141 0.0, 722.28197999999998, 240.53899000000001, 0.0,
00142 0.0, 0.0, 1.0, 0.0))
00143
00144 def get_new_cam_info(self):
00145 from sensor_msgs.msg import CameraInfo
00146 from sensor_msgs.msg import RegionOfInterest
00147
00148 return CameraInfo(None,
00149 480, 640,
00150 "plumb_bob",
00151 (-0.45795000000000002, 0.29532999999999998, 0.0, 0.0, 0.0),
00152 (734.37707999999998, 0.0, 343.25992000000002,
00153 0.0, 734.37707999999998, 229.65119999999999,
00154 0.0, 0.0, 1.0),
00155 (0.99997999999999998, 0.0012800000000000001, -0.0057400000000000003,
00156 -0.0012700000000000001, 1.0, 0.00148,
00157 0.0057400000000000003, -0.00147, 0.99997999999999998),
00158 (722.28197999999998, 0.0, 309.70123000000001, 0.0,
00159 0.0, 722.28197999999998, 240.53899000000001, 0.0,
00160 0.0, 0.0, 1.0, 0.0),
00161 1, 1,
00162 RegionOfInterest(0,0,480,640,False))
00163
00164 def test_cam_info(self):
00165 self.do_test('cam_info', self.get_old_cam_info, self.get_new_cam_info)
00166
00167
00168
00169
00170
00171
00172 def get_old_camera_info(self):
00173 camera_info_classes = self.load_saved_classes('CameraInfo.saved')
00174
00175 camera_info = camera_info_classes['sensor_msgs/CameraInfo']
00176 region_of_interest = camera_info_classes['sensor_msgs/RegionOfInterest']
00177
00178 return camera_info(None,
00179 480, 640,
00180 region_of_interest(0,0,480,640),
00181 (-0.45795000000000002, 0.29532999999999998, 0.0, 0.0, 0.0),
00182 (734.37707999999998, 0.0, 343.25992000000002,
00183 0.0, 734.37707999999998, 229.65119999999999,
00184 0.0, 0.0, 1.0),
00185 (0.99997999999999998, 0.0012800000000000001, -0.0057400000000000003,
00186 -0.0012700000000000001, 1.0, 0.00148,
00187 0.0057400000000000003, -0.00147, 0.99997999999999998),
00188 (722.28197999999998, 0.0, 309.70123000000001, 0.0,
00189 0.0, 722.28197999999998, 240.53899000000001, 0.0,
00190 0.0, 0.0, 1.0, 0.0))
00191
00192 def get_new_camera_info(self):
00193 from sensor_msgs.msg import CameraInfo
00194 from sensor_msgs.msg import RegionOfInterest
00195
00196 return CameraInfo(None,
00197 480, 640,
00198 "plumb_bob",
00199 (-0.45795000000000002, 0.29532999999999998, 0.0, 0.0, 0.0),
00200 (734.37707999999998, 0.0, 343.25992000000002,
00201 0.0, 734.37707999999998, 229.65119999999999,
00202 0.0, 0.0, 1.0),
00203 (0.99997999999999998, 0.0012800000000000001, -0.0057400000000000003,
00204 -0.0012700000000000001, 1.0, 0.00148,
00205 0.0057400000000000003, -0.00147, 0.99997999999999998),
00206 (722.28197999999998, 0.0, 309.70123000000001, 0.0,
00207 0.0, 722.28197999999998, 240.53899000000001, 0.0,
00208 0.0, 0.0, 1.0, 0.0),
00209 1, 1,
00210 RegionOfInterest(0,0,480,640,False))
00211
00212 def test_camera_info(self):
00213 self.do_test('camera_info', self.get_old_camera_info, self.get_new_camera_info)
00214
00215
00216
00217
00218
00219
00220 def get_old_compressed_image(self):
00221 compressed_image_classes = self.load_saved_classes('CompressedImage.saved')
00222
00223 compressed_image = compressed_image_classes['sensor_msgs/CompressedImage']
00224
00225 multi_array_layout = compressed_image_classes['std_msgs/MultiArrayLayout']
00226 multi_array_dimension = compressed_image_classes['std_msgs/MultiArrayDimension']
00227 uint8_multi_array = compressed_image_classes['std_msgs/UInt8MultiArray']
00228
00229 import random
00230 r = random.Random(1234)
00231
00232
00233 return compressed_image(None,
00234 'image',
00235 'mono',
00236 'jpeg',
00237 uint8_multi_array(multi_array_layout([multi_array_dimension('data', 1000, 1000)], 0),
00238 [r.randint(0,255) for x in xrange(0,1000)]))
00239
00240 def get_new_compressed_image(self):
00241 from sensor_msgs.msg import CompressedImage
00242
00243 import random
00244 r = random.Random(1234)
00245
00246 return CompressedImage(None,
00247 "jpeg",
00248 [r.randint(0,255) for x in xrange(0,1000)])
00249
00250 def test_compressed_image(self):
00251 self.do_test('compressed_image', self.get_old_compressed_image, self.get_new_compressed_image)
00252
00253
00254
00255
00256
00257
00258 def get_old_mono_image(self):
00259 image_classes = self.load_saved_classes('Image.saved')
00260
00261 image = image_classes['image_msgs/Image']
00262
00263 multi_array_layout = image_classes['std_msgs/MultiArrayLayout']
00264 multi_array_dimension = image_classes['std_msgs/MultiArrayDimension']
00265
00266 uint8_multi_array = image_classes['std_msgs/UInt8MultiArray']
00267 int8_multi_array = image_classes['std_msgs/Int8MultiArray']
00268 uint16_multi_array = image_classes['std_msgs/UInt16MultiArray']
00269 int16_multi_array = image_classes['std_msgs/Int16MultiArray']
00270 uint32_multi_array = image_classes['std_msgs/UInt32MultiArray']
00271 int32_multi_array = image_classes['std_msgs/Int32MultiArray']
00272 uint64_multi_array = image_classes['std_msgs/UInt64MultiArray']
00273 int64_multi_array = image_classes['std_msgs/Int64MultiArray']
00274
00275 float32_multi_array = image_classes['std_msgs/Float32MultiArray']
00276 float64_multi_array = image_classes['std_msgs/Float64MultiArray']
00277
00278 import random
00279 r = random.Random(1234)
00280
00281 return image(None,
00282 'image',
00283 'mono',
00284 'uint8',
00285 uint8_multi_array(multi_array_layout([multi_array_dimension('height', 480, 307200),
00286 multi_array_dimension('width', 640, 640),
00287 multi_array_dimension('channel', 1, 1)
00288 ], 0),
00289 [r.randint(0,255) for x in xrange(0,307200)]),
00290 int8_multi_array(),
00291 uint16_multi_array(),
00292 int16_multi_array(),
00293 uint32_multi_array(),
00294 int32_multi_array(),
00295 uint64_multi_array(),
00296 int64_multi_array(),
00297 float32_multi_array(),
00298 float64_multi_array())
00299
00300 def get_new_mono_image(self):
00301 from sensor_msgs.msg import Image
00302
00303 import random
00304 r = random.Random(1234)
00305
00306 return Image(None,
00307 480,
00308 640,
00309 'mono8',
00310 0,
00311 640,
00312 [r.randint(0,255) for x in xrange(0,307200)])
00313
00314 def test_mono_image(self):
00315 self.do_test('mono_image', self.get_old_mono_image, self.get_new_mono_image)
00316
00317
00318 def get_old_rgb_image(self):
00319 image_classes = self.load_saved_classes('Image.saved')
00320
00321 image = image_classes['image_msgs/Image']
00322
00323 multi_array_layout = image_classes['std_msgs/MultiArrayLayout']
00324 multi_array_dimension = image_classes['std_msgs/MultiArrayDimension']
00325
00326 uint8_multi_array = image_classes['std_msgs/UInt8MultiArray']
00327 int8_multi_array = image_classes['std_msgs/Int8MultiArray']
00328 uint16_multi_array = image_classes['std_msgs/UInt16MultiArray']
00329 int16_multi_array = image_classes['std_msgs/Int16MultiArray']
00330 uint32_multi_array = image_classes['std_msgs/UInt32MultiArray']
00331 int32_multi_array = image_classes['std_msgs/Int32MultiArray']
00332 uint64_multi_array = image_classes['std_msgs/UInt64MultiArray']
00333 int64_multi_array = image_classes['std_msgs/Int64MultiArray']
00334
00335 float32_multi_array = image_classes['std_msgs/Float32MultiArray']
00336 float64_multi_array = image_classes['std_msgs/Float64MultiArray']
00337
00338 import random
00339 r = random.Random(1234)
00340
00341 return image(None,
00342 'image',
00343 'rgb',
00344 'uint8',
00345 uint8_multi_array(multi_array_layout([multi_array_dimension('height', 480, 921600),
00346 multi_array_dimension('width', 640, 1920),
00347 multi_array_dimension('channel', 3, 3)
00348 ], 0),
00349 [r.randint(0,255) for x in xrange(0,921600)]),
00350 int8_multi_array(),
00351 uint16_multi_array(),
00352 int16_multi_array(),
00353 uint32_multi_array(),
00354 int32_multi_array(),
00355 uint64_multi_array(),
00356 int64_multi_array(),
00357 float32_multi_array(),
00358 float64_multi_array())
00359
00360 def get_new_rgb_image(self):
00361 from sensor_msgs.msg import Image
00362
00363 import random
00364 r = random.Random(1234)
00365
00366 return Image(None,
00367 480,
00368 640,
00369 'rgb8',
00370 0,
00371 1920,
00372 [r.randint(0,255) for x in xrange(0,921600)])
00373
00374 def test_rgb_image(self):
00375 self.do_test('rgb_image', self.get_old_rgb_image, self.get_new_rgb_image)
00376
00377
00378
00379
00380
00381 def get_old_robot_base_2d_odom(self):
00382 robot_base_2d_odom_classes = self.load_saved_classes('RobotBase2DOdom.saved')
00383
00384 robot_base_2d_odom = robot_base_2d_odom_classes['deprecated_msgs/RobotBase2DOdom']
00385 pose_2d_float32 = robot_base_2d_odom_classes['deprecated_msgs/Pose2DFloat32']
00386
00387 return robot_base_2d_odom(None, pose_2d_float32(3.33, 2.22, 1.11), pose_2d_float32(.1,.2,.3), 0, 1)
00388
00389 def get_new_robot_base_2d_odom(self):
00390 from nav_msgs.msg import Odometry
00391 from geometry_msgs.msg import PoseWithCovariance
00392 from geometry_msgs.msg import Pose
00393 from geometry_msgs.msg import Point
00394 from geometry_msgs.msg import Quaternion
00395 from geometry_msgs.msg import TwistWithCovariance
00396 from geometry_msgs.msg import Twist
00397 from geometry_msgs.msg import Vector3
00398
00399
00400 p = PoseWithCovariance(Pose(Point(repack(3.33), repack(2.22), 0), apply(Quaternion,quaternion_from_euler(0,0,repack(1.11)))), 36*[0.])
00401 t = TwistWithCovariance(Twist(Vector3(repack(.1), repack(.2), 0), Vector3(0, 0, repack(.3))), 36*[0.])
00402
00403 return Odometry(None, 'base_footprint', p, t)
00404
00405 def test_robot_base_2d_odom(self):
00406 self.do_test('robot_base_2d_odom', self.get_old_robot_base_2d_odom, self.get_new_robot_base_2d_odom)
00407
00408
00409
00410
00411
00412
00413 def get_old_occupancy_grid(self):
00414 occupancy_grid_classes = self.load_saved_classes('OccGrid.saved')
00415
00416 occupancy_grid = occupancy_grid_classes['robot_msgs/OccGrid']
00417 map_meta_data = occupancy_grid_classes['robot_msgs/MapMetaData']
00418 pose = occupancy_grid_classes['robot_msgs/Pose']
00419 point = occupancy_grid_classes['robot_msgs/Point']
00420 quaternion = occupancy_grid_classes['robot_msgs/Quaternion']
00421
00422 import random
00423 r = random.Random(1234)
00424
00425 return occupancy_grid(map_meta_data(rospy.Time(123,456), 0.1, 100, 100, pose(point(1.23, 4.56, 7.89), quaternion(0,0,0,1))), [r.randint(-1,100) for x in xrange(0,1000)])
00426
00427 def get_new_occupancy_grid(self):
00428 from nav_msgs.msg import OccupancyGrid
00429 from nav_msgs.msg import MapMetaData
00430 from roslib.msg import Header
00431 from geometry_msgs.msg import Pose
00432 from geometry_msgs.msg import Point
00433 from geometry_msgs.msg import Quaternion
00434
00435 import random
00436 r = random.Random(1234)
00437
00438 return OccupancyGrid(Header(0, rospy.Time(0,0), "/map"), MapMetaData(rospy.Time(123,456), 0.1, 100, 100, Pose(Point(1.23, 4.56, 7.89), Quaternion(0,0,0,1))), [r.randint(-1,100) for x in xrange(0,1000)])
00439
00440 def test_occupancy_grid(self):
00441 self.do_test('occupancy_grid', self.get_old_occupancy_grid, self.get_new_occupancy_grid)
00442
00443
00444
00445
00446 def get_old_polygon3d(self):
00447 polygon3d_classes = self.load_saved_classes('Polygon3D.saved')
00448
00449 polygon3d = polygon3d_classes['robot_msgs/Polygon3D']
00450 point32 = polygon3d_classes['robot_msgs/Point32']
00451 color_rgba = polygon3d_classes['std_msgs/ColorRGBA']
00452
00453 return polygon3d([point32(1.23, 4.56, 7.89), point32(82.861, 31.028, 93.317), point32(87.569, 29.085, 33.415)], color_rgba(255, 100, 100, 0))
00454
00455 def get_new_polygon3d(self):
00456 from geometry_msgs.msg import Polygon
00457 from geometry_msgs.msg import Point32
00458
00459 return Polygon([Point32(1.23, 4.56, 7.89), Point32(82.861, 31.028, 93.317), Point32(87.569, 29.085, 33.415)])
00460
00461 def test_polygon3d(self):
00462 self.do_test('polygon3d', self.get_old_polygon3d, self.get_new_polygon3d)
00463
00464
00465
00466
00467 def get_old_acceleration(self):
00468 acceleration_classes = self.load_saved_classes('Acceleration.saved')
00469
00470 acceleration = acceleration_classes['robot_msgs/Acceleration']
00471
00472 return acceleration(1.23, 4.56, 7.89)
00473
00474 def get_new_acceleration(self):
00475 from geometry_msgs.msg import Vector3
00476
00477 return Vector3(1.23, 4.56, 7.89)
00478
00479 def test_acceleration(self):
00480 self.do_test('acceleration', self.get_old_acceleration, self.get_new_acceleration)
00481
00482
00483
00484
00485
00486 def get_old_angular_acceleration(self):
00487 angular_acceleration_classes = self.load_saved_classes('AngularAcceleration.saved')
00488
00489 angular_acceleration = angular_acceleration_classes['robot_msgs/AngularAcceleration']
00490
00491 return angular_acceleration(1.23, 4.56, 7.89)
00492
00493 def get_new_angular_acceleration(self):
00494 from geometry_msgs.msg import Vector3
00495
00496 return Vector3(1.23, 4.56, 7.89)
00497
00498 def test_angular_acceleration(self):
00499 self.do_test('angular_acceleration', self.get_old_angular_acceleration, self.get_new_angular_acceleration)
00500
00501
00502
00503
00504 def get_old_diagnostic_value(self):
00505 diagnostic_value_classes = self.load_saved_classes('DiagnosticValue.saved')
00506
00507 diagnostic_value = diagnostic_value_classes['diagnostic_msgs/DiagnosticValue']
00508
00509 return diagnostic_value(42.42, 'foo')
00510
00511 def get_new_diagnostic_value(self):
00512 from diagnostic_msgs.msg import KeyValue
00513
00514 return KeyValue('foo', str(repack(42.42)))
00515
00516
00517 def test_diagnostic_value(self):
00518 self.do_test('diagnostic_value', self.get_old_diagnostic_value, self.get_new_diagnostic_value)
00519
00520
00521
00522
00523 def get_old_diagnostic_string(self):
00524 diagnostic_string_classes = self.load_saved_classes('DiagnosticString.saved')
00525
00526 diagnostic_string = diagnostic_string_classes['diagnostic_msgs/DiagnosticString']
00527
00528 return diagnostic_string('xxxxx', 'bar')
00529
00530 def get_new_diagnostic_string(self):
00531 from diagnostic_msgs.msg import KeyValue
00532
00533 return KeyValue('bar', 'xxxxx')
00534
00535
00536 def test_diagnostic_string(self):
00537 self.do_test('diagnostic_string', self.get_old_diagnostic_string, self.get_new_diagnostic_string)
00538
00539
00540
00541
00542 def get_old_diagnostic_status(self):
00543 diagnostic_status_classes = self.load_saved_classes('DiagnosticStatus.saved')
00544
00545 diagnostic_status = diagnostic_status_classes['diagnostic_msgs/DiagnosticStatus']
00546 diagnostic_value = diagnostic_status_classes['diagnostic_msgs/DiagnosticValue']
00547 diagnostic_string = diagnostic_status_classes['diagnostic_msgs/DiagnosticString']
00548
00549 return diagnostic_status(0, "abcdef", "ghijkl", [diagnostic_value(42.42, 'foo')], [diagnostic_string('xxxxx', 'bar')])
00550
00551 def get_new_diagnostic_status(self):
00552 from diagnostic_msgs.msg import DiagnosticStatus
00553 from diagnostic_msgs.msg import KeyValue
00554
00555 return DiagnosticStatus(0, "abcdef", "ghijkl", "NONE", [KeyValue('foo', str(repack(42.42))), KeyValue('bar', 'xxxxx')])
00556
00557
00558 def test_diagnostic_status(self):
00559 self.do_test('diagnostic_status', self.get_old_diagnostic_status, self.get_new_diagnostic_status)
00560
00561
00562
00563
00564 def get_old_diagnostic_message(self):
00565 diagnostic_message_classes = self.load_saved_classes('DiagnosticMessage.saved')
00566
00567 diagnostic_message = diagnostic_message_classes['diagnostic_msgs/DiagnosticMessage']
00568 diagnostic_status = diagnostic_message_classes['diagnostic_msgs/DiagnosticStatus']
00569 diagnostic_value = diagnostic_message_classes['diagnostic_msgs/DiagnosticValue']
00570 diagnostic_string = diagnostic_message_classes['diagnostic_msgs/DiagnosticString']
00571
00572 msg = diagnostic_message(None, [])
00573 msg.status.append(diagnostic_status(0, "abcdef", "ghijkl", [diagnostic_value(12.34, 'abc')], [diagnostic_string('ghbvf', 'jkl')]))
00574 msg.status.append(diagnostic_status(0, "mnop", "qrst", [diagnostic_value(56.78, 'def')], [diagnostic_string('klmnh', 'mno')]))
00575 msg.status.append(diagnostic_status(0, "uvw", "xyz", [diagnostic_value(90.12, 'ghi')], [diagnostic_string('erfcd', 'pqr')]))
00576
00577 return msg
00578
00579 def get_new_diagnostic_message(self):
00580 from diagnostic_msgs.msg import DiagnosticArray
00581 from diagnostic_msgs.msg import DiagnosticStatus
00582 from diagnostic_msgs.msg import KeyValue
00583
00584 msg = DiagnosticArray(None, [])
00585 msg.status.append(DiagnosticStatus(0, "abcdef", "ghijkl", "NONE", [KeyValue('abc', str(repack(12.34))), KeyValue('jkl', 'ghbvf')]))
00586 msg.status.append(DiagnosticStatus(0, "mnop", "qrst", "NONE", [KeyValue('def', str(repack(56.78))), KeyValue('mno', 'klmnh')]))
00587 msg.status.append(DiagnosticStatus(0, "uvw", "xyz", "NONE", [KeyValue('ghi', str(repack(90.12))), KeyValue('pqr', 'erfcd')]))
00588
00589 return msg
00590
00591
00592 def test_diagnostic_message(self):
00593 self.do_test('diagnostic_message', self.get_old_diagnostic_message, self.get_new_diagnostic_message)
00594
00595
00596
00597
00598
00599
00600 def get_old_vector3(self):
00601 vector3_classes = self.load_saved_classes('Vector3.saved')
00602
00603 vector3 = vector3_classes['robot_msgs/Vector3']
00604
00605 return vector3(1.23, 4.56, 7.89)
00606
00607 def get_new_vector3(self):
00608 from geometry_msgs.msg import Vector3
00609
00610 return Vector3(1.23, 4.56, 7.89)
00611
00612
00613 def test_vector3(self):
00614 self.do_test('vector3', self.get_old_vector3, self.get_new_vector3)
00615
00616
00617
00618
00619
00620 def get_old_vector3_stamped(self):
00621 vector3_classes = self.load_saved_classes('Vector3.saved')
00622 vector3_stamped_classes = self.load_saved_classes('Vector3Stamped.saved')
00623
00624 vector3 = vector3_classes['robot_msgs/Vector3']
00625 vector3_stamped = vector3_stamped_classes['robot_msgs/Vector3Stamped']
00626
00627 return vector3_stamped(None, vector3(1.23, 4.56, 7.89))
00628
00629 def get_new_vector3_stamped(self):
00630 from geometry_msgs.msg import Vector3
00631 from geometry_msgs.msg import Vector3Stamped
00632
00633 return Vector3Stamped(None, Vector3(1.23, 4.56, 7.89))
00634
00635
00636 def test_vector3_stamped(self):
00637 self.do_test('vector3_stamped', self.get_old_vector3_stamped, self.get_new_vector3_stamped)
00638
00639
00640
00641
00642
00643 def get_old_quaternion(self):
00644 quaternion_classes = self.load_saved_classes('Quaternion.saved')
00645
00646 quaternion = quaternion_classes['robot_msgs/Quaternion']
00647
00648 return quaternion(1.23, 4.56, 7.89, 1.23)
00649
00650 def get_new_quaternion(self):
00651 from geometry_msgs.msg import Quaternion
00652
00653 return Quaternion(1.23, 4.56, 7.89, 1.23)
00654
00655
00656 def test_quaternion(self):
00657 self.do_test('quaternion', self.get_old_quaternion, self.get_new_quaternion)
00658
00659
00660
00661
00662
00663 def get_old_quaternion_stamped(self):
00664 quaternion_classes = self.load_saved_classes('QuaternionStamped.saved')
00665
00666 quaternion_stamped = quaternion_classes['robot_msgs/QuaternionStamped']
00667 quaternion = quaternion_classes['robot_msgs/Quaternion']
00668
00669 return quaternion_stamped(None, quaternion(1.23, 4.56, 7.89, 1.23))
00670
00671 def get_new_quaternion_stamped(self):
00672 from geometry_msgs.msg import QuaternionStamped
00673 from geometry_msgs.msg import Quaternion
00674
00675 return QuaternionStamped(None, Quaternion(1.23, 4.56, 7.89, 1.23))
00676
00677
00678 def test_quaternion_stamped(self):
00679 self.do_test('quaternion_stamped', self.get_old_quaternion_stamped, self.get_new_quaternion_stamped)
00680
00681
00682
00683
00684
00685 def get_old_point(self):
00686 point_classes = self.load_saved_classes('Point.saved')
00687
00688 point = point_classes['robot_msgs/Point']
00689
00690 return point(1.23, 4.56, 7.89)
00691
00692 def get_new_point(self):
00693 from geometry_msgs.msg import Point
00694
00695 return Point(1.23, 4.56, 7.89)
00696
00697
00698 def test_point(self):
00699 self.do_test('point', self.get_old_point, self.get_new_point)
00700
00701
00702
00703
00704
00705 def get_old_point_stamped(self):
00706 point_classes = self.load_saved_classes('PointStamped.saved')
00707
00708 point_stamped = point_classes['robot_msgs/PointStamped']
00709 point = point_classes['robot_msgs/Point']
00710
00711 return point_stamped(None, point(1.23, 4.56, 7.89))
00712
00713 def get_new_point_stamped(self):
00714 from geometry_msgs.msg import PointStamped
00715 from geometry_msgs.msg import Point
00716
00717 return PointStamped(None, Point(1.23, 4.56, 7.89))
00718
00719
00720 def test_point_stamped(self):
00721 self.do_test('point_stamped', self.get_old_point_stamped, self.get_new_point_stamped)
00722
00723
00724
00725
00726
00727 def get_old_point32(self):
00728 point32_classes = self.load_saved_classes('Point32.saved')
00729
00730 point32 = point32_classes['robot_msgs/Point32']
00731
00732 return point32(1.23, 4.56, 7.89)
00733
00734 def get_new_point32(self):
00735 from geometry_msgs.msg import Point32
00736
00737 return Point32(1.23, 4.56, 7.89)
00738
00739
00740 def test_point32(self):
00741 self.do_test('point32', self.get_old_point32, self.get_new_point32)
00742
00743
00744
00745
00746
00747 def get_old_transform(self):
00748 transform_classes = self.load_saved_classes('Transform.saved')
00749
00750 transform = transform_classes['robot_msgs/Transform']
00751 vector3 = transform_classes['robot_msgs/Vector3']
00752 quaternion = transform_classes['robot_msgs/Quaternion']
00753
00754 return transform(vector3(1.23, 4.56, 7.89), quaternion(0,0,0,1))
00755
00756 def get_new_transform(self):
00757 from geometry_msgs.msg import Transform
00758 from geometry_msgs.msg import Vector3
00759 from geometry_msgs.msg import Quaternion
00760
00761 return Transform(Vector3(1.23, 4.56, 7.89), Quaternion(0,0,0,1))
00762
00763
00764 def test_transform(self):
00765 self.do_test('transform', self.get_old_transform, self.get_new_transform)
00766
00767
00768
00769
00770
00771 def get_old_transform_stamped(self):
00772 transform_classes = self.load_saved_classes('TransformStamped.saved')
00773
00774 transform_stamped = transform_classes['robot_msgs/TransformStamped']
00775 transform = transform_classes['robot_msgs/Transform']
00776 vector3 = transform_classes['robot_msgs/Vector3']
00777 quaternion = transform_classes['robot_msgs/Quaternion']
00778
00779 ts = transform_stamped(None, "parent_frame", transform(vector3(1.23, 4.56, 7.89), quaternion(0,0,0,1)))
00780 ts.header.frame_id = "child_frame"
00781 return ts
00782
00783 def get_old_transform_stamped_parent_id(self):
00784 transform_classes = self.load_saved_classes('TransformStamped_parent_id.saved')
00785
00786 transform_stamped = transform_classes['geometry_msgs/TransformStamped']
00787 transform = transform_classes['geometry_msgs/Transform']
00788 vector3 = transform_classes['geometry_msgs/Vector3']
00789 quaternion = transform_classes['geometry_msgs/Quaternion']
00790 header = transform_classes['std_msgs/Header']
00791
00792 ts = transform_stamped(None, "parent_frame", transform(vector3(1.23, 4.56, 7.89), quaternion(0,0,0,1)))
00793 ts.header.frame_id = "child_frame"
00794 return ts
00795
00796 def get_new_transform_stamped(self):
00797 from geometry_msgs.msg import TransformStamped
00798 from geometry_msgs.msg import Transform
00799 from geometry_msgs.msg import Vector3
00800 from geometry_msgs.msg import Quaternion
00801 ts = TransformStamped(None, "child_frame", Transform(Vector3(1.23, 4.56, 7.89), Quaternion(0,0,0,1)))
00802 ts.header.frame_id = "parent_frame"
00803 return ts
00804
00805 def test_transform_stamped(self):
00806 self.do_test('transform_stamped', self.get_old_transform_stamped, self.get_new_transform_stamped)
00807
00808 def test_transform_stamped_parent_id(self):
00809 self.do_test('transform_stamped_parent_id', self.get_old_transform_stamped_parent_id, self.get_new_transform_stamped)
00810
00811
00812
00813
00814
00815 def get_old_pose(self):
00816 pose_classes = self.load_saved_classes('Pose.saved')
00817
00818 pose = pose_classes['robot_msgs/Pose']
00819 point = pose_classes['robot_msgs/Point']
00820 quaternion = pose_classes['robot_msgs/Quaternion']
00821
00822 return pose(point(1.23, 4.56, 7.89), quaternion(0,0,0,1))
00823
00824 def get_new_pose(self):
00825 from geometry_msgs.msg import Pose
00826 from geometry_msgs.msg import Point
00827 from geometry_msgs.msg import Quaternion
00828
00829 return Pose(Point(1.23, 4.56, 7.89), Quaternion(0,0,0,1))
00830
00831
00832 def test_pose(self):
00833 self.do_test('pose', self.get_old_pose, self.get_new_pose)
00834
00835
00836
00837
00838
00839
00840 def get_old_pose_stamped(self):
00841 pose_classes = self.load_saved_classes('PoseStamped.saved')
00842
00843 pose_stamped = pose_classes['robot_msgs/PoseStamped']
00844 pose = pose_classes['robot_msgs/Pose']
00845 point = pose_classes['robot_msgs/Point']
00846 quaternion = pose_classes['robot_msgs/Quaternion']
00847
00848 return pose_stamped(None, pose(point(1.23, 4.56, 7.89), quaternion(0,0,0,1)))
00849
00850 def get_new_pose_stamped(self):
00851 from geometry_msgs.msg import PoseStamped
00852 from geometry_msgs.msg import Pose
00853 from geometry_msgs.msg import Point
00854 from geometry_msgs.msg import Quaternion
00855
00856 return PoseStamped(None, Pose(Point(1.23, 4.56, 7.89), Quaternion(0,0,0,1)))
00857
00858
00859 def test_pose_stamped(self):
00860 self.do_test('pose_stamped', self.get_old_pose_stamped, self.get_new_pose_stamped)
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889
00890
00891
00892
00893
00894 def get_old_pose_with_covariance(self):
00895 pose_with_covariance_classes = self.load_saved_classes('PoseWithCovariance.saved')
00896
00897 pose_with_covariance = pose_with_covariance_classes['robot_msgs/PoseWithCovariance']
00898 pose = pose_with_covariance_classes['robot_msgs/Pose']
00899 point = pose_with_covariance_classes['robot_msgs/Point']
00900 quaternion = pose_with_covariance_classes['robot_msgs/Quaternion']
00901
00902 return pose_with_covariance(None, pose(point(1.23, 4.56, 7.89), quaternion(0,0,0,1)), identity6x6)
00903
00904 def get_new_pose_with_covariance(self):
00905 from geometry_msgs.msg import PoseWithCovarianceStamped
00906 from geometry_msgs.msg import PoseWithCovariance
00907 from geometry_msgs.msg import Pose
00908 from geometry_msgs.msg import Point
00909 from geometry_msgs.msg import Quaternion
00910
00911 return PoseWithCovarianceStamped(None, PoseWithCovariance(Pose(Point(1.23, 4.56, 7.89), Quaternion(0,0,0,1)), identity6x6))
00912
00913
00914 def test_pose_with_covariance(self):
00915 self.do_test('pose_with_covariance', self.get_old_pose_with_covariance, self.get_new_pose_with_covariance)
00916
00917
00918
00919
00920
00921
00922 def get_old_pose_dot(self):
00923 pose_dot_classes = self.load_saved_classes('PoseDot.saved')
00924
00925 pose_dot = pose_dot_classes['robot_msgs/PoseDot']
00926 velocity = pose_dot_classes['robot_msgs/Velocity']
00927 angular_velocity = pose_dot_classes['robot_msgs/AngularVelocity']
00928
00929 return pose_dot(velocity(1.23, 4.56, 7.89), angular_velocity(9.87, 6.54, 3.21))
00930
00931 def get_new_pose_dot(self):
00932 from geometry_msgs.msg import Twist
00933 from geometry_msgs.msg import Vector3
00934
00935 return Twist(Vector3(1.23, 4.56, 7.89), Vector3(9.87, 6.54, 3.21))
00936
00937 def test_pose_dot(self):
00938 self.do_test('pose_dot', self.get_old_pose_dot, self.get_new_pose_dot)
00939
00940
00941
00942
00943
00944
00945
00946 def get_old_particle_cloud(self):
00947 particle_cloud_classes = self.load_saved_classes('ParticleCloud.saved')
00948
00949 particle_cloud = particle_cloud_classes['robot_msgs/ParticleCloud']
00950 pose = particle_cloud_classes['robot_msgs/Pose']
00951 point = particle_cloud_classes['robot_msgs/Point']
00952 quaternion = particle_cloud_classes['robot_msgs/Quaternion']
00953
00954 return particle_cloud([pose(point(1.23, 4.56, 7.89), quaternion(1,0,0,1)),
00955 pose(point(1.25, 4.58, 7.91), quaternion(0,1,0,1)),
00956 pose(point(1.27, 4.60, 7.93), quaternion(0,0,1,1))])
00957
00958 def get_new_particle_cloud(self):
00959 from geometry_msgs.msg import PoseArray
00960 from geometry_msgs.msg import Pose
00961 from geometry_msgs.msg import Point
00962 from geometry_msgs.msg import Quaternion
00963
00964 return PoseArray(None, [Pose(Point(1.23, 4.56, 7.89), Quaternion(1,0,0,1)),
00965 Pose(Point(1.25, 4.58, 7.91), Quaternion(0,1,0,1)),
00966 Pose(Point(1.27, 4.60, 7.93), Quaternion(0,0,1,1))])
00967
00968
00969 def test_particle_cloud(self):
00970 self.do_test('particle_cloud', self.get_old_particle_cloud, self.get_new_particle_cloud)
00971
00972
00973
00974
00975
00976
00977 def get_old_path(self):
00978 path_classes = self.load_saved_classes('Path.saved')
00979
00980
00981
00982 path = path_classes['robot_msgs/Path']
00983 header = path_classes['std_msgs/Header']
00984 pose_stamped = path_classes['robot_msgs/PoseStamped']
00985 pose = path_classes['robot_msgs/Pose']
00986 point = path_classes['robot_msgs/Point']
00987 quaternion = path_classes['robot_msgs/Quaternion']
00988
00989 return path([pose_stamped(header(0,rospy.Time(0,0),'foo'), pose(point(1.23, 4.56, 7.89), quaternion(1,0,0,1))),
00990 pose_stamped(header(0,rospy.Time(0,1000),'foo'), pose(point(1.25, 4.58, 7.91), quaternion(0,1,0,1))),
00991 pose_stamped(header(0,rospy.Time(0,2000),'foo'), pose(point(1.27, 4.60, 7.93), quaternion(0,0,1,1)))])
00992
00993 def get_new_path(self):
00994 from nav_msgs.msg import Path
00995 from roslib.msg import Header
00996 from geometry_msgs.msg import PoseStamped
00997 from geometry_msgs.msg import Pose
00998 from geometry_msgs.msg import Point
00999 from geometry_msgs.msg import Quaternion
01000
01001 return Path(None, [PoseStamped(Header(0,rospy.Time(0,0),'foo'), Pose(Point(1.23, 4.56, 7.89), Quaternion(1,0,0,1))),
01002 PoseStamped(Header(0,rospy.Time(0,1000),'foo'), Pose(Point(1.25, 4.58, 7.91), Quaternion(0,1,0,1))),
01003 PoseStamped(Header(0,rospy.Time(0,2000),'foo'), Pose(Point(1.27, 4.60, 7.93), Quaternion(0,0,1,1)))])
01004
01005
01006 def test_path(self):
01007 self.do_test('path', self.get_old_path, self.get_new_path)
01008
01009
01010
01011
01012
01013
01014
01015 def get_old_velocity(self):
01016 velocity_classes = self.load_saved_classes('Velocity.saved')
01017
01018 velocity = velocity_classes['robot_msgs/Velocity']
01019
01020 return velocity(1.23, 4.56, 7.89)
01021
01022 def get_new_velocity(self):
01023 from geometry_msgs.msg import Vector3
01024
01025 return Vector3(1.23, 4.56, 7.89)
01026
01027
01028 def test_velocity(self):
01029 self.do_test('velocity', self.get_old_velocity, self.get_new_velocity)
01030
01031
01032
01033
01034
01035 def get_old_angular_velocity(self):
01036 angular_velocity_classes = self.load_saved_classes('AngularVelocity.saved')
01037
01038 angular_velocity = angular_velocity_classes['robot_msgs/AngularVelocity']
01039
01040 return angular_velocity(1.23, 4.56, 7.89)
01041
01042 def get_new_angular_velocity(self):
01043 from geometry_msgs.msg import Vector3
01044
01045 return Vector3(1.23, 4.56, 7.89)
01046
01047
01048 def test_angular_velocity(self):
01049 self.do_test('angular_velocity', self.get_old_angular_velocity, self.get_new_angular_velocity)
01050
01051
01052
01053
01054
01055 def get_old_twist(self):
01056 twist_classes = self.load_saved_classes('Twist.saved')
01057
01058 twist = twist_classes['robot_msgs/Twist']
01059 vector3 = twist_classes['robot_msgs/Vector3']
01060
01061 return twist(None, vector3(1,2,3), vector3(4, 5, 6))
01062
01063 def get_new_twist(self):
01064 from geometry_msgs.msg import TwistStamped
01065 from geometry_msgs.msg import Twist
01066 from geometry_msgs.msg import Vector3
01067
01068 return TwistStamped(None, Twist(Vector3(1,2,3), Vector3(4, 5, 6)))
01069
01070
01071 def test_twist(self):
01072 self.do_test('twist', self.get_old_twist, self.get_new_twist)
01073
01074
01075
01076
01077
01078
01079
01080 def get_old_wrench(self):
01081 wrench_classes = self.load_saved_classes('Wrench.saved')
01082
01083 wrench = wrench_classes['robot_msgs/Wrench']
01084 vector3 = wrench_classes['robot_msgs/Vector3']
01085
01086 return wrench(None, vector3(1,2,3), vector3(4, 5, 6))
01087
01088 def get_new_wrench(self):
01089 from geometry_msgs.msg import WrenchStamped
01090 from geometry_msgs.msg import Wrench
01091 from geometry_msgs.msg import Vector3
01092
01093 return WrenchStamped(None, Wrench(Vector3(1,2,3), Vector3(4, 5, 6)))
01094
01095
01096 def test_wrench(self):
01097 self.do_test('wrench', self.get_old_wrench, self.get_new_wrench)
01098
01099
01100
01101
01102
01103
01104
01105
01106
01107 def get_old_channelfloat32(self):
01108 channelfloat32_classes = self.load_saved_classes('ChannelFloat32.saved')
01109
01110 channelfloat32 = channelfloat32_classes['robot_msgs/ChannelFloat32']
01111
01112 return channelfloat32("myname", [1.23, 4.56, 7.89])
01113
01114 def get_new_channelfloat32(self):
01115 from sensor_msgs.msg import ChannelFloat32
01116
01117 return ChannelFloat32("myname", [1.23, 4.56, 7.89])
01118
01119
01120 def test_channelfloat32(self):
01121 self.do_test('channelfloat32', self.get_old_channelfloat32, self.get_new_channelfloat32)
01122
01123
01124
01125
01126
01127 def get_old_pointcloud(self):
01128 pointcloud_classes = self.load_saved_classes('PointCloud.saved')
01129
01130 pointcloud = pointcloud_classes['robot_msgs/PointCloud']
01131 point32 = pointcloud_classes['robot_msgs/Point32']
01132 channelfloat32 = pointcloud_classes['robot_msgs/ChannelFloat32']
01133
01134 points = [point32(1,2,3), point32(4,5,6)]
01135 channels = [channelfloat32("myname1", [1.23, 4.56]),
01136 channelfloat32("myname2", [1.231, 4.561])]
01137 return pointcloud(None, points, channels)
01138
01139 def get_new_pointcloud(self):
01140 from sensor_msgs.msg import PointCloud
01141 from geometry_msgs.msg import Point32
01142 from sensor_msgs.msg import ChannelFloat32
01143
01144 Points = [Point32(1,2,3), Point32(4,5,6)]
01145 Channels = [ChannelFloat32("myname1", [1.23, 4.56]),
01146 ChannelFloat32("myname2", [1.231, 4.561])]
01147
01148
01149 return PointCloud(None, Points, Channels)
01150
01151
01152 def test_pointcloud(self):
01153 self.do_test('pointcloud', self.get_old_pointcloud, self.get_new_pointcloud)
01154
01155
01156
01157
01158 def setUp(self):
01159 self.pkg_dir = roslib.packages.get_pkg_dir("test_common_msgs")
01160
01161
01162 def load_saved_classes(self,saved_msg):
01163 f = open("%s/test/saved/%s"%(self.pkg_dir,saved_msg), 'r')
01164
01165 type_line = f.readline()
01166 pat = re.compile(r"\[(.*)]:")
01167 type_match = pat.match(type_line)
01168
01169 self.assertTrue(type_match is not None, "Full definition file malformed. First line should be: '[my_package/my_msg]:'")
01170
01171 saved_type = type_match.groups()[0]
01172 saved_full_text = f.read()
01173
01174 saved_classes = roslib.genpy.generate_dynamic(saved_type,saved_full_text)
01175
01176 self.assertTrue(saved_classes is not None, "Could not generate class from full definition file.")
01177 self.assertTrue(saved_classes.has_key(saved_type), "Could not generate class from full definition file.")
01178
01179 return saved_classes
01180
01181 def do_test(self, name, old_msg, new_msg):
01182
01183 oldbag = "%s/test/%s_old.bag"%(self.pkg_dir,name)
01184 newbag = "%s/test/%s_new.bag"%(self.pkg_dir,name)
01185
01186
01187 bag = rosbag.Bag(oldbag, 'w')
01188 bag.write("topic", old_msg(), roslib.rostime.Time())
01189 bag.close()
01190
01191
01192 res = rosbagmigration.checkbag(migrator, oldbag)
01193 self.assertTrue(not False in [m[1] == [] for m in res], 'Bag not ready to be migrated')
01194 res = rosbagmigration.fixbag(migrator, oldbag, newbag)
01195 self.assertTrue(res, 'Bag not converted successfully')
01196
01197
01198 topic, msg, t = rosbag.Bag(newbag).read_messages().next()
01199
01200
01201 m = new_msg()
01202 buff = StringIO()
01203 m.serialize(buff)
01204 m.deserialize(buff.getvalue())
01205
01206
01207 self.assertTrue(roslib.message.strify_message(msg) == roslib.message.strify_message(m))
01208
01209
01210
01211 os.remove(oldbag)
01212 os.remove(newbag)
01213
01214
01215
01216
01217
01218 if __name__ == '__main__':
01219 rostest.unitrun('test_common_msgs', 'test_common_msgs_migration', TestCommonMsgsMigration, sys.argv)