test_sparse_image.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 
00003 import os.path as p
00004 import cv2
00005 from cv_bridge import CvBridge, CvBridgeError
00006 
00007 import unittest
00008 
00009 from sensor_msgs.msg import Image
00010 
00011 import rospy
00012 
00013 
00014 class EdgeImagePublisher(object):
00015     def __init__(self, imgpath):
00016         self.pub = rospy.Publisher("/test_edge_image", Image)
00017         self.sub_topic_name = "/sparse/image_decoded"
00018         self.sub = rospy.Subscriber(self.sub_topic_name, Image, self._callback)
00019         self.bridge = CvBridge()
00020         self.edgeimg = cv2.imread(imgpath)
00021         self.edgemsg = self.bridge.cv2_to_imgmsg(self.edgeimg, encoding="8UC3")
00022         self.response = False
00023 
00024     def _publish(self):
00025         try:
00026             self.edgemsg.header.stamp = rospy.Time.now()
00027             self.pub.publish(self.edgemsg)
00028         except CvBridgeError, e:
00029             print e
00030 
00031     def _check_img_msg_diff(self, msg1, msg2):
00032         cnt = 0
00033         if msg1.encoding == "8UC3":
00034             channel1 = 3
00035         else:
00036             channel1 = 1
00037         channel2 = 1
00038         if msg1.width != msg2.width:
00039             return -1
00040         if msg1.height != msg2.height:
00041             return -1
00042         if msg1.header.stamp != msg2.header.stamp:
00043             return -1
00044         receivedimg = self.bridge.imgmsg_to_cv2(msg2)
00045         for x in range(self.edgeimg.shape[1]):
00046             for y in range(self.edgeimg.shape[0]):
00047                 if not (self.edgeimg[y,x] == receivedimg[y,x]).any():
00048                     rospy.loginfo(self.edgeimg[y,x])
00049                     rospy.loginfo(receivedimg[y,x])
00050                     rospy.loginfo("")
00051                     cnt += 1
00052         rospy.logdebug(cnt)
00053         return cnt
00054 
00055     def _callback(self, msg):
00056         if self._check_img_msg_diff(self.edgemsg, msg) == 0:
00057             self.response = True
00058 
00059     def checkSparseImage(self):
00060         cnt = 0
00061         while cnt <= 100:
00062             try:
00063                 self._publish() 
00064                 rospy.wait_for_message(self.sub_topic_name, Image, timeout=1.0)
00065             except:
00066                 pass
00067             rospy.loginfo("waiting for subscribe...")
00068             if self.response:
00069                 return True
00070             cnt += 1
00071         return False
00072 
00073 class TestSparseImage(unittest.TestCase):
00074     def test_encode_decode_data16(self):
00075         imgpath = p.join(p.dirname(__file__), "../sample/edge_image_test_240x180.bmp")
00076         ei = EdgeImagePublisher(imgpath)
00077         self.assertTrue(ei.checkSparseImage())
00078 
00079     def test_encode_decode_data32(self):
00080         imgpath = p.join(p.dirname(__file__), "../sample/edge_image_test_640x480.bmp")
00081         ei = EdgeImagePublisher(imgpath)
00082         self.assertTrue(ei.checkSparseImage())
00083 
00084 
00085 if __name__ == "__main__":
00086     import rostest
00087     rospy.init_node("sparse_image_test_py")
00088     rostest.rosrun("jsk_perception", "test_sparse_image", TestSparseImage)
00089 


jsk_perception
Author(s): Manabu Saito, Ryohei Ueda
autogenerated on Sun Oct 8 2017 02:43:23