people_mask_publisher.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3 
4 import cv2
5 import numpy as np
6 
7 import cv_bridge
8 import message_filters
9 import rospy
10 from jsk_topic_tools import ConnectionBasedTransport
11 from jsk_recognition_msgs.msg import PeoplePoseArray
12 from sensor_msgs.msg import Image
13 
14 
15 class PeopleMaskPublisher(ConnectionBasedTransport):
16 
17  limb_names = ["Nose",
18  "Neck",
19  "RShoulder",
20  "RElbow",
21  "RWrist",
22  "LShoulder",
23  "LElbow",
24  "LWrist",
25  "RHip",
26  "RKnee",
27  "RAnkle",
28  "LHip",
29  "LKnee",
30  "LAnkle",
31  "REye",
32  "LEye",
33  "REar",
34  "LEar",
35  'RHand',
36  'LHand']
37 
38  def __init__(self):
39  super(self.__class__, self).__init__()
40  self.person_indices = rospy.get_param('~person_indices', -1)
41  self.limb_part = rospy.get_param('~limb_part', 'all')
42  if self.limb_part == 'all':
43  self.limb_part = self.limb_names
44  self.arms_score_threshold = rospy.get_param(
45  '~arms_score_threshold', 0.25)
46  self.hand_ratio = rospy.get_param('~hand_ratio', 0.33)
47  self.hand_width_ratio = rospy.get_param('~hand_width_ratio', 0.8)
48  self.face_ratio = rospy.get_param('~face_ratio', 0.6)
49  self.face_shoulder_ratio = rospy.get_param('~face_shoulder_ratio', 0.5)
50  self.face_width_margin_ratio = rospy.get_param('~face_width_margin_ratio', 1.3)
51 
52  self.pub = self.advertise('~output', Image, queue_size=1)
53  self.debug_pub = self.advertise('~debug/output', Image, queue_size=1)
54 
55  def subscribe(self):
56  queue_size = rospy.get_param('~queue_size', 10)
58  '~input', Image, queue_size=1, buff_size=2**24)
59  sub_pose = message_filters.Subscriber(
60  '~input/pose', PeoplePoseArray, queue_size=1, buff_size=2**24)
61  self.subs = [sub_img, sub_pose]
62  if rospy.get_param('~approximate_sync', False):
63  slop = rospy.get_param('~slop', 0.1)
64  sync = message_filters.ApproximateTimeSynchronizer(
65  fs=self.subs, queue_size=queue_size, slop=slop)
66  else:
68  fs=self.subs, queue_size=queue_size)
69  sync.registerCallback(self._cb)
70 
71  def unsubscribe(self):
72  for sub in self.subs:
73  sub.unregister()
74 
75  def _cb(self, img_msg, people_pose_array_msg):
76  br = cv_bridge.CvBridge()
77  img = br.imgmsg_to_cv2(img_msg, desired_encoding='bgr8')
78 
79  if self.person_indices != -1:
80  indices = np.array(self.person_indices)
81  indices = indices[indices < len(people_pose_array_msg.poses)]
82  people_pose = np.array(people_pose_array_msg.poses)[indices]
83  else:
84  people_pose = people_pose_array_msg.poses
85 
86  mask_img = np.zeros((img.shape[0], img.shape[1]), dtype=np.bool)
87  arm = [limb_prefix for limb_prefix in ['R', 'L']
88  if limb_prefix + 'Hand' in self.limb_part]
89  nose = 'Nose' in self.limb_part
90  if arm:
91  arm_mask_img, debug_img = self._create_hand_mask(people_pose, img, arm)
92  mask_img = np.bitwise_or(mask_img, arm_mask_img)
93  if nose:
94  nose_mask_img, debug_img = self._create_nose_mask(people_pose, img)
95  mask_img = np.bitwise_or(mask_img, nose_mask_img)
96 
97  mask_msg = br.cv2_to_imgmsg(np.uint8(mask_img * 255.0), encoding='mono8')
98  mask_msg.header = img_msg.header
99 
100  debug_msg = br.cv2_to_imgmsg(debug_img, encoding='bgr8')
101  debug_msg.header = img_msg.header
102 
103  self.pub.publish(mask_msg)
104  self.debug_pub.publish(debug_msg)
105 
106  def _create_hand_mask(self, people_pose, img, arm=['R', 'L']):
107  rectangle_thickness = 5
108  rectangle_colors = [(0, 255, 0), (0, 0, 255)]
109 
110  mask_img = np.zeros((img.shape[0], img.shape[1]), dtype=np.bool)
111  for person_pose in people_pose:
112  for limb_prefix, color in zip(arm, rectangle_colors):
113  try:
114  shoulder_index = person_pose.limb_names.index(
115  limb_prefix + 'Shoulder')
116  elbow_index = person_pose.limb_names.index(
117  limb_prefix + 'Elbow')
118  wrist_index = person_pose.limb_names.index(
119  limb_prefix + 'Wrist')
120  except ValueError:
121  continue
122 
123  if not np.all(np.array(person_pose.scores)[[elbow_index, shoulder_index, wrist_index]] > self.arms_score_threshold):
124  continue
125 
126  shoulder = person_pose.poses[shoulder_index]
127  elbow = person_pose.poses[elbow_index]
128  wrist = person_pose.poses[wrist_index]
129 
130  x = wrist.position.x + self.hand_ratio * \
131  (wrist.position.x - elbow.position.x)
132  y = wrist.position.y + self.hand_ratio * \
133  (wrist.position.y - elbow.position.y)
134 
135  wrist_to_elbow_length = ((wrist.position.x - elbow.position.x)
136  ** 2 + (wrist.position.y - elbow.position.y) ** 2) ** 0.5
137  elbow_to_shoulder_length = ((shoulder.position.x - elbow.position.x) ** 2 + (
138  shoulder.position.y - elbow.position.y) ** 2) ** 0.5
139  width = self.hand_width_ratio * \
140  max(wrist_to_elbow_length, 0.9 * elbow_to_shoulder_length)
141  height = width
142 
143  x -= width / 2.0
144  y -= height / 2.0
145 
146  x = int(x)
147  y = int(y)
148  width = int(width)
149  height = int(height)
150 
151  mask_img[max(y, 0):max(min(y + height, img.shape[0]), 0),
152  max(x, 0):max(min(x + width, img.shape[1]), 0)] = True
153  cv2.rectangle(img, (x, y), (x + width, y + height),
154  color, rectangle_thickness)
155  return mask_img, img
156 
157  def _create_nose_mask(self, people_pose, img):
158  rectangle_thickness = 5
159  color = (0, 255, 0)
160 
161  mask_img = np.zeros((img.shape[0], img.shape[1]), dtype=np.bool)
162  for person_pose in people_pose:
163  x_max = -10000
164  x_min = 10000
165 
166  limb_poses = []
167  nose_pose = None
168  neck_pose = None
169  target_limb_names = ['Nose',
170  'Neck',
171  'REye',
172  'LEye',
173  'REar',
174  'LEar']
175  for limb_name in target_limb_names:
176  try:
177  limb_index = person_pose.limb_names.index(limb_name)
178  limb_pose = person_pose.poses[limb_index]
179  limb_poses.append(limb_pose)
180  if limb_name == 'Nose':
181  nose_pose = limb_pose
182  elif limb_name == 'Neck':
183  neck_pose = limb_pose
184  except ValueError:
185  continue
186 
187  shoulder_limb_poses = []
188  shoulder_limb_names = ['RShoulder',
189  'LShoulder']
190  for limb_name in shoulder_limb_names:
191  try:
192  shoulder_limb_index = person_pose.limb_names.index(limb_name)
193  shoulder_limb_pose = person_pose.poses[shoulder_limb_index]
194  shoulder_limb_poses.append(shoulder_limb_pose)
195  except ValueError:
196  continue
197  if len(shoulder_limb_poses) == 2:
198  positions_x = [pose.position.x for pose in shoulder_limb_poses]
199  x_mid = sum(positions_x) / len(positions_x)
200  x_max = max(positions_x) * self.face_shoulder_ratio + \
201  x_mid * (1.0 - self.face_shoulder_ratio)
202  x_min = min(positions_x) * self.face_shoulder_ratio + \
203  x_mid * (1.0 - self.face_shoulder_ratio)
204 
205  if nose_pose == None or neck_pose == None:
206  continue
207 
208  positions_x = [pose.position.x for pose in limb_poses]
209  x_max = max(max(positions_x), x_max)
210  x_min = min(min(positions_x), x_min)
211  width = (x_max - x_min) * self.face_width_margin_ratio
212  x = (x_max + x_min) / 2.0 - width / 2.0
213 
214  positions_y = [pose.position.y for pose in limb_poses]
215  y_min = min(positions_y)
216  height = self.face_ratio * \
217  abs(nose_pose.position.y - neck_pose.position.y) * 2.0
218  y_min = min(y_min, nose_pose.position.y - height / 2.0)
219  y_max = nose_pose.position.y + height / 2.0
220  y = y_min
221  height = y_max - y_min
222 
223  x = int(x)
224  y = int(y)
225  width = int(width)
226  height = int(height)
227 
228  mask_img[max(y, 0):max(min(y + height, img.shape[0]), 0),
229  max(x, 0):max(min(x + width, img.shape[1]), 0)] = True
230  cv2.rectangle(img, (x, y), (x + width, y + height),
231  color, rectangle_thickness)
232  return mask_img, img
233 
234 if __name__ == '__main__':
235  rospy.init_node('people_mask_publisher')
237  rospy.spin()
def _cb(self, img_msg, people_pose_array_msg)
def _create_hand_mask(self, people_pose, img, arm=['R', L)


jsk_perception
Author(s): Manabu Saito, Ryohei Ueda
autogenerated on Mon May 3 2021 03:03:27