00001
00002
00003
00004 from __future__ import division
00005
00006 import sys
00007
00008 import cv2
00009 import rospy
00010 import numpy as np
00011 from cv_bridge import CvBridge, CvBridgeError
00012 from sensor_msgs.msg import Image, CompressedImage
00013 from rail_face_detection_msgs.msg import Face, Detections
00014
00015 import face_detector
00016
00017
00018 FAIL_COLOR = '\033[91m'
00019 ENDC_COLOR = '\033[0m'
00020
00021 def eprint(error):
00022 sys.stderr.write(
00023 FAIL_COLOR
00024 + type(error).__name__
00025 + ": "
00026 + error.message
00027 + ENDC_COLOR
00028 )
00029
00030
00031
00032 class FaceDetector(object):
00033 """
00034 This class takes in image data and finds / annotates faces within the image
00035 """
00036
00037 def __init__(self):
00038 rospy.init_node('face_detector_node')
00039 self.faces = []
00040 self.keypoint_arrays = []
00041 self.bridge = CvBridge()
00042 self.debug = rospy.get_param('~debug', default=False)
00043 self.image_sub_topic_name = rospy.get_param('~image_sub_topic_name', default='/kinect/qhd/image_color_rect')
00044 use_gpu = rospy.get_param('~use_gpu', default=True)
00045 self.use_compressed_image = rospy.get_param('~use_compressed_image', default=False)
00046 self.face_detector = face_detector.FaceDetector(use_gpu)
00047
00048 def _draw_bb(self, image, bounding_box, color):
00049 start_x = bounding_box['x']
00050 start_y = bounding_box['y']
00051 end_x = start_x + bounding_box['w']
00052 end_y = start_y + bounding_box['h']
00053 cv2.rectangle(image,
00054 (start_x, start_y),
00055 (end_x, end_y),
00056 color=color,
00057 thickness=3)
00058 return image
00059
00060 def _convert_msg_to_image(self, image_msg):
00061 """
00062 Convert an incoming image message (compressed or otherwise) into a cv2
00063 image
00064 """
00065 if not self.use_compressed_image:
00066 try:
00067 image_cv = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
00068 except CvBridgeError as e:
00069 print e
00070 return None
00071 else:
00072 image_np = np.fromstring(image_msg.data, np.uint8)
00073 image_cv = cv2.imdecode(image_np, cv2.CV_LOAD_IMAGE_COLOR)
00074
00075 return image_cv
00076
00077 def _parse_image(self, image_msg):
00078 """
00079 Take in an image and draw a bounding box within it
00080 :param image_msg: Image data
00081 :return: None
00082 """
00083
00084 header = image_msg.header
00085 image_cv = self._convert_msg_to_image(image_msg)
00086 if image_cv is None:
00087 return
00088 self.faces, self.keypoint_arrays = self.face_detector.find_faces(image_cv)
00089 if self.debug:
00090 c = (0, 255, 0)
00091 for face, points in zip(self.faces, self.keypoint_arrays):
00092 x1 = int(face[0])
00093 y1 = int(face[1])
00094 x2 = int(face[2])
00095 y2 = int(face[3])
00096 nose_x = int(points[2])
00097 nose_y = int(points[7])
00098 left_eye_x = int(points[1])
00099 left_eye_y = int(points[6])
00100 right_eye_x = int(points[0])
00101 right_eye_y = int(points[5])
00102 left_mouth_x = int(points[4])
00103 left_mouth_y = int(points[9])
00104 right_mouth_x = int(points[3])
00105 right_mouth_y = int(points[8])
00106 image_cv = self._draw_bb(image_cv, {'x': x1,
00107 'y': y1,
00108 'w': x2-x1,
00109 'h': y2-y1}, c)
00110 cv2.circle(image_cv, (nose_x, nose_y), 3, c, thickness=1)
00111 cv2.circle(image_cv, (left_eye_x, left_eye_y), 3, c, thickness=1)
00112 cv2.circle(image_cv, (right_eye_x, right_eye_y), 3, c, thickness=1)
00113 cv2.circle(image_cv, (left_mouth_x, left_mouth_y), 3, c, thickness=1)
00114 cv2.circle(image_cv, (right_mouth_x, right_mouth_y), 3, c, thickness=1)
00115 try:
00116 image_msg = self.bridge.cv2_to_imgmsg(image_cv, "bgr8")
00117 except CvBridgeError as e:
00118 print e
00119
00120 image_msg.header = header
00121 self.image_pub.publish(image_msg)
00122
00123
00124 face_arr = Detections()
00125 face_arr.header = header
00126
00127 for face, points in zip(self.faces, self.keypoint_arrays):
00128 msg = Face()
00129 msg.top_left_x = int(face[0])
00130 msg.top_left_y = int(face[1])
00131 msg.bot_right_x = int(face[2])
00132 msg.bot_right_y = int(face[3])
00133 msg.nose_x = int(points[2])
00134 msg.nose_y = int(points[7])
00135 msg.left_eye_x = int(points[1])
00136 msg.left_eye_y = int(points[6])
00137 msg.right_eye_x = int(points[0])
00138 msg.right_eye_y = int(points[5])
00139 msg.left_mouth_x = int(points[4])
00140 msg.left_mouth_y = int(points[9])
00141 msg.right_mouth_x = int(points[3])
00142 msg.right_mouth_y = int(points[8])
00143 face_arr.faces.append(msg)
00144
00145 self.face_pub.publish(face_arr)
00146
00147 def run(self,
00148 pub_image_topic='~debug/face_image',
00149 pub_face_topic='~faces'):
00150 if not self.use_compressed_image:
00151 rospy.Subscriber(self.image_sub_topic_name, Image, self._parse_image)
00152 else:
00153 rospy.Subscriber(self.image_sub_topic_name+'/compressed', CompressedImage, self._parse_image)
00154 if self.debug:
00155 self.image_pub = rospy.Publisher(pub_image_topic, Image, queue_size=2)
00156 self.face_pub = rospy.Publisher(pub_face_topic, Detections, queue_size=2)
00157 rospy.spin()
00158
00159 if __name__ == '__main__':
00160 try:
00161 detector = FaceDetector()
00162 detector.run()
00163 except rospy.ROSInterruptException:
00164 pass