00001
00002 from PIL import Image
00003
00004 import os
00005 import unittest
00006 import tempfile
00007 import shutil
00008
00009 from sensor_msgs.msg import Image as sensorImage
00010
00011 import rosbag
00012 import rospy
00013
00014 from rosbag_image_compressor import compress_image,\
00015 uncompress_image, compress, uncompress, EncodingCache,\
00016 image_topic_basename
00017
00018
00019 def create_image():
00020 img_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0] * 10
00021 img_data = "1234567890" * 10
00022
00023 image = Image.fromstring('L', (10, 10), img_data)
00024 return image
00025
00026
00027 def create_image_msg():
00028 img = create_image()
00029 im = sensorImage()
00030 im.header.frame_id = 'foobar'
00031 im.width, im.height = img.size
00032 im.encoding = 'foobar'
00033 im.is_bigendian = False
00034 im.step = im.width
00035 im.data = img.tostring()
00036 return im
00037
00038
00039 class TestBag():
00040 def __init__(self, num_images=1):
00041 self.num_images = num_images
00042
00043 def __enter__(self):
00044 self.tempdir = tempfile.mkdtemp()
00045 self.filename = os.path.join(self.tempdir, 'test.bag')
00046 with rosbag.Bag(self.filename, 'w') as bagfile:
00047 for n in range(self.num_images):
00048 bagfile.write('/foo/bar/image_raw', create_image_msg(),
00049 rospy.Time(n + 100))
00050 return self
00051
00052 def __exit__( self, exc_type, exc_value, traceback):
00053 shutil.rmtree(self.tempdir)
00054
00055
00056 class TestClass(unittest.TestCase):
00057
00058 def test_compress_uncompress(self):
00059 i = create_image_msg()
00060 ci, encoding_msg = compress_image(i)
00061 uci = uncompress_image(ci, encoding_msg.data)
00062 self.check_contents(i, uci)
00063
00064 def check_contents(self, i, uci):
00065 self.assertEqual(i.header.frame_id, uci.header.frame_id)
00066 self.assertEqual(i.header.stamp, uci.header.stamp)
00067 self.assertEqual(i.width, uci.width)
00068 self.assertEqual(i.height, uci.height)
00069 self.assertEqual(i.step, uci.step)
00070 self.assertEqual(i.is_bigendian, uci.is_bigendian)
00071 self.assertEqual(i.encoding, uci.encoding)
00072 self.assertEqual(i.data, uci.data)
00073
00074 def test_encoding_cache(self):
00075 cache = EncodingCache()
00076 cache.insert_encoding('topic1', 'enc1')
00077 self.assertEqual('enc1', cache.lookup_encoding('topic1'),
00078 "Coorect encoding")
00079 try:
00080 cache.lookup_encoding('topic/2')
00081 self.assertFalse("Should have excepted")
00082 except:
00083 self.assertTrue(True)
00084
00085 cache.insert_encoding('topic/2', 'enc2')
00086 self.assertEqual('enc2', cache.lookup_encoding('topic/2'),
00087 "Coorect encoding")
00088
00089 def test_image_topic_basename(self):
00090 self.assertEqual(image_topic_basename('foo/bar/image_raw'), 'foo/bar/')
00091 self.assertEqual(image_topic_basename('foo/bar/compressed'), 'foo/bar/')
00092 self.assertEqual(image_topic_basename('foo/bar/encoding'), 'foo/bar/')
00093 try:
00094 image_topic_basename('foo/bar/other')
00095 self.assertFalse("Should have thrown")
00096 except:
00097 self.assertTrue("Has thrown")
00098
00099 def test_bag_unbag_empty(self):
00100 with TestBag(0) as bag_context:
00101 intermediate = os.path.join(bag_context.tempdir, 'intermediate.bag')
00102 output = os.path.join(bag_context.tempdir, 'output.bag')
00103 compress(bag_context.filename, intermediate)
00104 uncompress(intermediate, output)
00105
00106 def test_bag_unbag(self):
00107 with TestBag(4) as bag_context:
00108 intermediate = os.path.join(bag_context.tempdir, 'intermediate.bag')
00109 output = os.path.join(bag_context.tempdir, 'output.bag')
00110 compress(bag_context.filename, intermediate)
00111 uncompress(intermediate, output)
00112 with rosbag.Bag(output, 'r') as bag:
00113 for topic, msg, t in bag:
00114 self.check_contents(create_image_msg(), msg)