face_detector_node.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # This file is responsible for bridging ROS to the FaceDetector class (built with Caffe)
00003 
00004 from __future__ import division
00005 
00006 import sys
00007 
00008 import cv2
00009 import rospy
00010 import numpy as np
00011 from cv_bridge import CvBridge, CvBridgeError
00012 from sensor_msgs.msg import Image, CompressedImage
00013 from rail_face_detection_msgs.msg import Face, Detections
00014 
00015 import face_detector
00016 
00017 # Debug Helpers
00018 FAIL_COLOR = '\033[91m'
00019 ENDC_COLOR = '\033[0m'
00020 
00021 def eprint(error):
00022     sys.stderr.write(
00023         FAIL_COLOR
00024         + type(error).__name__
00025         + ": "
00026         + error.message
00027         + ENDC_COLOR
00028     )
00029 # End Debug Helpers
00030 
00031 
00032 class FaceDetector(object):
00033     """
00034     This class takes in image data and finds / annotates faces within the image
00035     """
00036 
00037     def __init__(self):
00038         rospy.init_node('face_detector_node')
00039         self.faces = []
00040         self.keypoint_arrays = []
00041         self.bridge = CvBridge()
00042         self.debug = rospy.get_param('~debug', default=False)
00043         self.image_sub_topic_name = rospy.get_param('~image_sub_topic_name', default='/kinect/qhd/image_color_rect')
00044         use_gpu = rospy.get_param('~use_gpu', default=True)
00045         self.use_compressed_image = rospy.get_param('~use_compressed_image', default=False)
00046         self.face_detector = face_detector.FaceDetector(use_gpu)
00047 
00048     def _draw_bb(self, image, bounding_box, color):
00049         start_x = bounding_box['x']
00050         start_y = bounding_box['y']
00051         end_x = start_x + bounding_box['w']
00052         end_y = start_y + bounding_box['h']
00053         cv2.rectangle(image,
00054                       (start_x, start_y),
00055                       (end_x, end_y),
00056                       color=color,
00057                       thickness=3)
00058         return image
00059 
00060     def _convert_msg_to_image(self, image_msg):
00061         """
00062         Convert an incoming image message (compressed or otherwise) into a cv2
00063         image
00064         """
00065         if not self.use_compressed_image:
00066             try:
00067                 image_cv = self.bridge.imgmsg_to_cv2(image_msg, "bgr8")
00068             except CvBridgeError as e:
00069                 print e
00070                 return None
00071         else:
00072             image_np = np.fromstring(image_msg.data, np.uint8)
00073             image_cv = cv2.imdecode(image_np, cv2.CV_LOAD_IMAGE_COLOR)
00074 
00075         return image_cv
00076 
00077     def _parse_image(self, image_msg):
00078         """
00079         Take in an image and draw a bounding box within it
00080         :param image_msg: Image data
00081         :return: None
00082         """
00083 
00084         header = image_msg.header
00085         image_cv = self._convert_msg_to_image(image_msg)
00086         if image_cv is None:
00087             return
00088         self.faces, self.keypoint_arrays = self.face_detector.find_faces(image_cv)
00089         if self.debug:
00090             c = (0, 255, 0)
00091             for face, points in zip(self.faces, self.keypoint_arrays):
00092                 x1 = int(face[0])
00093                 y1 = int(face[1])
00094                 x2 = int(face[2])
00095                 y2 = int(face[3])
00096                 nose_x = int(points[2])
00097                 nose_y = int(points[7])
00098                 left_eye_x = int(points[1])
00099                 left_eye_y = int(points[6])
00100                 right_eye_x = int(points[0])
00101                 right_eye_y = int(points[5])
00102                 left_mouth_x = int(points[4])
00103                 left_mouth_y = int(points[9])
00104                 right_mouth_x = int(points[3])
00105                 right_mouth_y = int(points[8])
00106                 image_cv = self._draw_bb(image_cv, {'x': x1,
00107                                                     'y': y1,
00108                                                     'w': x2-x1,
00109                                                     'h': y2-y1}, c)
00110                 cv2.circle(image_cv, (nose_x, nose_y), 3, c, thickness=1)
00111                 cv2.circle(image_cv, (left_eye_x, left_eye_y), 3, c, thickness=1)
00112                 cv2.circle(image_cv, (right_eye_x, right_eye_y), 3, c, thickness=1)
00113                 cv2.circle(image_cv, (left_mouth_x, left_mouth_y), 3, c, thickness=1)
00114                 cv2.circle(image_cv, (right_mouth_x, right_mouth_y), 3, c, thickness=1)
00115             try:
00116                 image_msg = self.bridge.cv2_to_imgmsg(image_cv, "bgr8")
00117             except CvBridgeError as e:
00118                 print e
00119 
00120             image_msg.header = header
00121             self.image_pub.publish(image_msg)
00122 
00123         # Instantiate detections object
00124         face_arr = Detections()
00125         face_arr.header = header
00126         # For each face / keypoint set found in the image:
00127         for face, points in zip(self.faces, self.keypoint_arrays):
00128             msg = Face()
00129             msg.top_left_x = int(face[0])
00130             msg.top_left_y = int(face[1])
00131             msg.bot_right_x = int(face[2])
00132             msg.bot_right_y = int(face[3])
00133             msg.nose_x = int(points[2])
00134             msg.nose_y = int(points[7])
00135             msg.left_eye_x = int(points[1])
00136             msg.left_eye_y = int(points[6])
00137             msg.right_eye_x = int(points[0])
00138             msg.right_eye_y = int(points[5])
00139             msg.left_mouth_x = int(points[4])
00140             msg.left_mouth_y = int(points[9])
00141             msg.right_mouth_x = int(points[3])
00142             msg.right_mouth_y = int(points[8])
00143             face_arr.faces.append(msg)
00144 
00145         self.face_pub.publish(face_arr)
00146 
00147     def run(self,
00148             pub_image_topic='~debug/face_image',
00149             pub_face_topic='~faces'):
00150         if not self.use_compressed_image:
00151             rospy.Subscriber(self.image_sub_topic_name, Image, self._parse_image) # subscribe to sub_image_topic and callback parse
00152         else:
00153             rospy.Subscriber(self.image_sub_topic_name+'/compressed', CompressedImage, self._parse_image)
00154         if self.debug:
00155             self.image_pub = rospy.Publisher(pub_image_topic, Image, queue_size=2) # image publisher
00156         self.face_pub = rospy.Publisher(pub_face_topic, Detections, queue_size=2) # faces publisher
00157         rospy.spin()
00158 
00159 if __name__ == '__main__':
00160     try:
00161         detector = FaceDetector()
00162         detector.run()
00163     except rospy.ROSInterruptException:
00164         pass


rail_face_detector
Author(s): Siddhartha Banerjee , Andrew Silva
autogenerated on Thu Jun 6 2019 21:23:56