Go to the documentation of this file.00001
00002
00003 import os.path as p
00004 import cv2
00005 from cv_bridge import CvBridge, CvBridgeError
00006
00007 import unittest
00008
00009 from sensor_msgs.msg import Image
00010
00011 import rospy
00012
00013
00014 class EdgeImagePublisher(object):
00015 def __init__(self, imgpath):
00016 self.pub = rospy.Publisher("/test_edge_image", Image)
00017 self.sub_topic_name = "/sparse/image_decoded"
00018 self.sub = rospy.Subscriber(self.sub_topic_name, Image, self._callback)
00019 self.bridge = CvBridge()
00020 self.edgeimg = cv2.imread(imgpath)
00021 self.edgemsg = self.bridge.cv2_to_imgmsg(self.edgeimg, encoding="8UC3")
00022 self.response = False
00023
00024 def _publish(self):
00025 try:
00026 self.edgemsg.header.stamp = rospy.Time.now()
00027 self.pub.publish(self.edgemsg)
00028 except CvBridgeError, e:
00029 print e
00030
00031 def _check_img_msg_diff(self, msg1, msg2):
00032 cnt = 0
00033 if msg1.encoding == "8UC3":
00034 channel1 = 3
00035 else:
00036 channel1 = 1
00037 channel2 = 1
00038 if msg1.width != msg2.width:
00039 return -1
00040 if msg1.height != msg2.height:
00041 return -1
00042 if msg1.header.stamp != msg2.header.stamp:
00043 return -1
00044 receivedimg = self.bridge.imgmsg_to_cv2(msg2)
00045 for x in range(self.edgeimg.shape[1]):
00046 for y in range(self.edgeimg.shape[0]):
00047 if not (self.edgeimg[y,x] == receivedimg[y,x]).any():
00048 rospy.loginfo(self.edgeimg[y,x])
00049 rospy.loginfo(receivedimg[y,x])
00050 rospy.loginfo("")
00051 cnt += 1
00052 rospy.logdebug(cnt)
00053 return cnt
00054
00055 def _callback(self, msg):
00056 if self._check_img_msg_diff(self.edgemsg, msg) == 0:
00057 self.response = True
00058
00059 def checkSparseImage(self):
00060 cnt = 0
00061 while cnt <= 100:
00062 try:
00063 self._publish()
00064 rospy.wait_for_message(self.sub_topic_name, Image, timeout=1.0)
00065 except:
00066 pass
00067 rospy.loginfo("waiting for subscribe...")
00068 if self.response:
00069 return True
00070 cnt += 1
00071 return False
00072
00073 class TestSparseImage(unittest.TestCase):
00074 def test_encode_decode_data16(self):
00075 imgpath = p.join(p.dirname(__file__), "../sample/edge_image_test_240x180.bmp")
00076 ei = EdgeImagePublisher(imgpath)
00077 self.assertTrue(ei.checkSparseImage())
00078
00079 def test_encode_decode_data32(self):
00080 imgpath = p.join(p.dirname(__file__), "../sample/edge_image_test_640x480.bmp")
00081 ei = EdgeImagePublisher(imgpath)
00082 self.assertTrue(ei.checkSparseImage())
00083
00084
00085 if __name__ == "__main__":
00086 import rostest
00087 rospy.init_node("sparse_image_test_py")
00088 rostest.rosrun("jsk_perception", "test_sparse_image", TestSparseImage)
00089