test_rosbag_image_compressor.py
Go to the documentation of this file.
00001 
00002 from PIL import Image
00003 
00004 import os
00005 import unittest
00006 import tempfile
00007 import shutil
00008 
00009 from sensor_msgs.msg import Image as sensorImage
00010 
00011 import rosbag
00012 import rospy
00013 
00014 from rosbag_image_compressor import compress_image,\
00015     uncompress_image, compress, uncompress, EncodingCache,\
00016     image_topic_basename
00017 
00018 
00019 def create_image():
00020     img_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0] * 10
00021     img_data = "1234567890" * 10
00022     # not available on precise image = Image.frombytes('L', (10, 10), img_data)
00023     image = Image.fromstring('L', (10, 10), img_data)
00024     return image
00025 
00026 
00027 def create_image_msg():
00028     img = create_image()
00029     im = sensorImage()
00030     im.header.frame_id = 'foobar'
00031     im.width, im.height = img.size
00032     im.encoding = 'foobar'
00033     im.is_bigendian = False  # TODO
00034     im.step = im.width
00035     im.data = img.tostring()
00036     return im
00037 
00038 
00039 class TestBag():
00040     def __init__(self, num_images=1):
00041         self.num_images = num_images
00042 
00043     def __enter__(self):
00044         self.tempdir = tempfile.mkdtemp()
00045         self.filename = os.path.join(self.tempdir, 'test.bag')
00046         with rosbag.Bag(self.filename, 'w') as bagfile:
00047             for n in range(self.num_images):
00048                 bagfile.write('/foo/bar/image_raw', create_image_msg(),
00049                               rospy.Time(n + 100))
00050         return self
00051 
00052     def __exit__(       self, exc_type, exc_value, traceback):
00053         shutil.rmtree(self.tempdir)
00054 
00055 
00056 class TestClass(unittest.TestCase):
00057 
00058     def test_compress_uncompress(self):
00059         i = create_image_msg()
00060         ci, encoding_msg = compress_image(i)
00061         uci = uncompress_image(ci, encoding_msg.data)
00062         self.check_contents(i, uci)
00063 
00064     def check_contents(self, i, uci):
00065         self.assertEqual(i.header.frame_id, uci.header.frame_id)
00066         self.assertEqual(i.header.stamp, uci.header.stamp)
00067         self.assertEqual(i.width, uci.width)
00068         self.assertEqual(i.height, uci.height)
00069         self.assertEqual(i.step, uci.step)
00070         self.assertEqual(i.is_bigendian, uci.is_bigendian)
00071         self.assertEqual(i.encoding, uci.encoding)
00072         self.assertEqual(i.data, uci.data)
00073 
00074     def test_encoding_cache(self):
00075         cache = EncodingCache()
00076         cache.insert_encoding('topic1', 'enc1')
00077         self.assertEqual('enc1', cache.lookup_encoding('topic1'),
00078                          "Coorect encoding")
00079         try:
00080             cache.lookup_encoding('topic/2')
00081             self.assertFalse("Should have excepted")
00082         except:
00083             self.assertTrue(True)
00084 
00085         cache.insert_encoding('topic/2', 'enc2')
00086         self.assertEqual('enc2', cache.lookup_encoding('topic/2'),
00087                          "Coorect encoding")
00088 
00089     def test_image_topic_basename(self):
00090         self.assertEqual(image_topic_basename('foo/bar/image_raw'), 'foo/bar/')
00091         self.assertEqual(image_topic_basename('foo/bar/compressed'), 'foo/bar/')
00092         self.assertEqual(image_topic_basename('foo/bar/encoding'), 'foo/bar/')
00093         try:
00094             image_topic_basename('foo/bar/other')
00095             self.assertFalse("Should have thrown")
00096         except:
00097             self.assertTrue("Has thrown")
00098 
00099     def test_bag_unbag_empty(self):
00100         with TestBag(0) as bag_context:
00101             intermediate = os.path.join(bag_context.tempdir, 'intermediate.bag')
00102             output = os.path.join(bag_context.tempdir, 'output.bag')
00103             compress(bag_context.filename, intermediate)
00104             uncompress(intermediate, output)
00105 
00106     def test_bag_unbag(self):
00107         with TestBag(4) as bag_context:
00108             intermediate = os.path.join(bag_context.tempdir, 'intermediate.bag')
00109             output = os.path.join(bag_context.tempdir, 'output.bag')
00110             compress(bag_context.filename, intermediate)
00111             uncompress(intermediate, output)
00112             with rosbag.Bag(output, 'r') as bag:
00113                 for topic, msg, t in bag:
00114                     self.check_contents(create_image_msg(), msg)


rosbag_image_compressor
Author(s):
autogenerated on Thu Aug 27 2015 14:50:43