00001
00002
00003
00004 import cv2
00005 import numpy as np
00006
00007 import cv_bridge
00008 import message_filters
00009 import rospy
00010 from jsk_topic_tools import ConnectionBasedTransport
00011 from jsk_recognition_msgs.msg import PeoplePoseArray
00012 from sensor_msgs.msg import Image
00013
00014
00015 class PeopleMaskPublisher(ConnectionBasedTransport):
00016
00017 limb_names = ["Nose",
00018 "Neck",
00019 "RShoulder",
00020 "RElbow",
00021 "RWrist",
00022 "LShoulder",
00023 "LElbow",
00024 "LWrist",
00025 "RHip",
00026 "RKnee",
00027 "RAnkle",
00028 "LHip",
00029 "LKnee",
00030 "LAnkle",
00031 "REye",
00032 "LEye",
00033 "REar",
00034 "LEar",
00035 'RHand',
00036 'LHand']
00037
00038 def __init__(self):
00039 super(self.__class__, self).__init__()
00040 self.person_indices = rospy.get_param('~person_indices', -1)
00041 self.limb_part = rospy.get_param('~limb_part', 'all')
00042 if self.limb_part == 'all':
00043 self.limb_part = self.limb_names
00044 self.arms_score_threshold = rospy.get_param(
00045 '~arms_score_threshold', 0.25)
00046 self.hand_ratio = rospy.get_param('~hand_ratio', 0.33)
00047 self.hand_width_ratio = rospy.get_param('~hand_width_ratio', 0.8)
00048
00049 self.pub = self.advertise('~output', Image, queue_size=1)
00050 self.debug_pub = self.advertise('~debug/output', Image, queue_size=1)
00051
00052 def subscribe(self):
00053 queue_size = rospy.get_param('~queue_size', 10)
00054 sub_img = message_filters.Subscriber(
00055 '~input', Image, queue_size=1, buff_size=2**24)
00056 sub_pose = message_filters.Subscriber(
00057 '~input/pose', PeoplePoseArray, queue_size=1, buff_size=2**24)
00058 self.subs = [sub_img, sub_pose]
00059 if rospy.get_param('~approximate_sync', False):
00060 slop = rospy.get_param('~slop', 0.1)
00061 sync = message_filters.ApproximateTimeSynchronizer(
00062 fs=self.subs, queue_size=queue_size, slop=slop)
00063 else:
00064 sync = message_filters.TimeSynchronizer(
00065 fs=self.subs, queue_size=queue_size)
00066 sync.registerCallback(self._cb)
00067
00068 def unsubscribe(self):
00069 for sub in self.subs:
00070 sub.unregister()
00071
00072 def _cb(self, img_msg, people_pose_array_msg):
00073 br = cv_bridge.CvBridge()
00074 img = br.imgmsg_to_cv2(img_msg, desired_encoding='bgr8')
00075
00076 if self.person_indices != -1:
00077 indices = np.array(self.person_indices)
00078 indices = indices[indices < len(people_pose_array_msg.poses)]
00079 people_pose = np.array(people_pose_array_msg.poses)[indices]
00080 else:
00081 people_pose = people_pose_array_msg.poses
00082
00083 mask_img = np.zeros((img.shape[0], img.shape[1]), dtype=np.bool)
00084 arm = [limb_prefix for limb_prefix in ['R', 'L']
00085 if limb_prefix + 'Hand' in self.limb_part]
00086 if arm:
00087 arm_mask_img, debug_img = self._create_hand_mask(people_pose, img, arm)
00088 mask_img = np.bitwise_or(mask_img, arm_mask_img)
00089
00090 mask_msg = br.cv2_to_imgmsg(np.uint8(mask_img * 255.0), encoding='mono8')
00091 mask_msg.header = img_msg.header
00092
00093 debug_msg = br.cv2_to_imgmsg(debug_img, encoding='bgr8')
00094 debug_msg.header = img_msg.header
00095
00096 self.pub.publish(mask_msg)
00097 self.debug_pub.publish(debug_msg)
00098
00099 def _create_hand_mask(self, people_pose, img, arm=['R', 'L']):
00100 rectangle_thickness = 5
00101 rectangle_colors = [(0, 255, 0), (0, 0, 255)]
00102
00103 mask_img = np.zeros((img.shape[0], img.shape[1]), dtype=np.bool)
00104 for person_pose in people_pose:
00105 for limb_prefix, color in zip(arm, rectangle_colors):
00106 try:
00107 shoulder_index = person_pose.limb_names.index(
00108 limb_prefix + 'Shoulder')
00109 elbow_index = person_pose.limb_names.index(
00110 limb_prefix + 'Elbow')
00111 wrist_index = person_pose.limb_names.index(
00112 limb_prefix + 'Wrist')
00113 except ValueError:
00114 continue
00115
00116 if not np.all(np.array(person_pose.scores)[[elbow_index, shoulder_index, wrist_index]] > self.arms_score_threshold):
00117 continue
00118
00119 shoulder = person_pose.poses[shoulder_index]
00120 elbow = person_pose.poses[elbow_index]
00121 wrist = person_pose.poses[wrist_index]
00122
00123 x = wrist.position.x + self.hand_ratio * \
00124 (wrist.position.x - elbow.position.x)
00125 y = wrist.position.y + self.hand_ratio * \
00126 (wrist.position.y - elbow.position.y)
00127
00128 wrist_to_elbow_length = ((wrist.position.x - elbow.position.x)
00129 ** 2 + (wrist.position.y - elbow.position.y) ** 2) ** 0.5
00130 elbow_to_shoulder_length = ((shoulder.position.x - elbow.position.x) ** 2 + (
00131 shoulder.position.y - elbow.position.y) ** 2) ** 0.5
00132 width = self.hand_width_ratio * \
00133 max(wrist_to_elbow_length, 0.9 * elbow_to_shoulder_length)
00134 height = width
00135
00136 x -= width / 2.0
00137 y -= height / 2.0
00138
00139 x = int(x)
00140 y = int(y)
00141 width = int(width)
00142 height = int(height)
00143
00144 mask_img[y:y + height, x:x + width] = True
00145 cv2.rectangle(img, (x, y), (x + width, y + height),
00146 color, rectangle_thickness)
00147 return mask_img, img
00148
00149 if __name__ == '__main__':
00150 rospy.init_node('people_mask_publisher')
00151 PeopleMaskPublisher()
00152 rospy.spin()