label_image_decomposer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import sys
4 
5 import cv2
6 import matplotlib
7 matplotlib.use('Agg') # NOQA
8 import matplotlib.cm
9 import numpy as np
10 import scipy.ndimage
11 
12 import cv_bridge
13 import dynamic_reconfigure.server
14 from jsk_recognition_utils import bounding_rect_of_mask
15 from jsk_recognition_utils import get_tile_image
16 from jsk_recognition_utils.color import labelcolormap
17 from jsk_topic_tools import ConnectionBasedTransport
18 from jsk_topic_tools import warn_no_remap
19 import message_filters
20 import rospy
21 from sensor_msgs.msg import Image
22 
23 from jsk_perception.cfg import LabelImageDecomposerConfig
24 
25 
26 def get_text_color(color):
27  if color[0] * 0.299 + color[1] * 0.587 + color[2] * 0.114 > 170:
28  return (0, 0, 0)
29  return (255, 255, 255)
30 
31 
32 def label2rgb(lbl, img=None, label_names=None, alpha=0.3, bg_label=0):
33  if label_names is None:
34  n_labels = lbl.max() + 1 # +1 for bg_label 0
35  else:
36  n_labels = len(label_names)
37  cmap = labelcolormap(256)
38  cmap = (cmap * 255).astype(np.uint8)
39  bg_color, cmap = cmap[0], cmap[1:] # bg_color is 0
40 
41  lbl_viz = np.zeros((lbl.shape[0], lbl.shape[1], 3), dtype=np.uint8)
42  fg_mask = lbl != bg_label
43  lbl_viz[fg_mask] = cmap[lbl[fg_mask] % 255]
44  lbl_viz[~fg_mask] = bg_color
45 
46  if img is not None:
47  if img.ndim == 3:
48  img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
49  else:
50  assert img.ndim == 2
51  img_gray = img
52  img_gray = cv2.cvtColor(img_gray, cv2.COLOR_GRAY2RGB)
53  lbl_viz = alpha * lbl_viz + (1 - alpha) * img_gray
54  lbl_viz = lbl_viz.astype(np.uint8)
55 
56  if label_names is None:
57  return lbl_viz
58 
59  np.random.seed(1234)
60  labels = np.unique(lbl)
61  labels = labels[labels != 0]
62  for label in labels:
63  mask = lbl == label
64  mask = (mask * 255).astype(np.uint8)
65  y, x = scipy.ndimage.center_of_mass(mask)
66  y, x = map(int, [y, x])
67 
68  if lbl[y, x] != label:
69  Y, X = np.where(mask)
70  point_index = np.random.randint(0, len(Y))
71  y, x = Y[point_index], X[point_index]
72 
73  text = label_names[label]
74  font_face = cv2.FONT_HERSHEY_SIMPLEX
75  font_scale = 0.7
76  thickness = 2
77  text_size, baseline = cv2.getTextSize(
78  text, font_face, font_scale, thickness)
79 
80  color = get_text_color(lbl_viz[y, x])
81  cv2.putText(lbl_viz, text,
82  (x - text_size[0] // 2, y),
83  font_face, font_scale, color, thickness)
84  return lbl_viz
85 
86 
87 class LabelImageDecomposer(ConnectionBasedTransport):
88 
89  def __init__(self):
90  super(LabelImageDecomposer, self).__init__()
91 
92  self._srv_dynparam = dynamic_reconfigure.server.Server(
93  LabelImageDecomposerConfig, self._config_callback)
94 
95  self.pub_img = self.advertise('~output', Image, queue_size=5)
96  self.pub_label_viz = self.advertise('~output/label_viz', Image,
97  queue_size=5)
98  self._bg_label = rospy.get_param('~bg_label', 0) # ignored label
99  self._only_label = rospy.get_param('~only_label', False)
100  self._label_names = rospy.get_param('~label_names', None)
101  # publish masks of fg/bg by decomposing each label
102  self._publish_mask = rospy.get_param('~publish_mask', False)
103  if self._publish_mask:
104  self.pub_fg_mask = self.advertise('~output/fg_mask', Image,
105  queue_size=5)
106  self.pub_bg_mask = self.advertise('~output/bg_mask', Image,
107  queue_size=5)
108  # publish each region image. this can take time so optional.
109  self._publish_tile = rospy.get_param('~publish_tile', False)
110  rospy.loginfo('~publish_tile: {}'.format(self._publish_tile))
111  if self._only_label and self._publish_tile:
112  rospy.logerr('Can not publish tile image when ~only_label is true,'
113  ' so forcely set ~publish_tile to false.')
114  self._publish_tile = False
115  if self._publish_tile:
116  self.pub_tile = self.advertise('~output/tile', Image, queue_size=5)
117 
118  def _config_callback(self, config, level):
119  self._alpha = config.alpha
120  return config
121 
122  def subscribe(self):
123  self.sub_label = message_filters.Subscriber('~input/label', Image)
124  if self._only_label:
125  self.sub_label.registerCallback(self._apply)
126  return
127  self.sub_img = message_filters.Subscriber('~input', Image)
128  warn_no_remap('~input', '~input/label')
129  use_async = rospy.get_param('~approximate_sync', False)
130  queue_size = rospy.get_param('~queue_size', 10)
131  rospy.loginfo('~approximate_sync: {}, queue_size: {}'
132  .format(use_async, queue_size))
133  if use_async:
134  slop = rospy.get_param('~slop', 0.1)
135  rospy.loginfo('~slop: {}'.format(slop))
136  sync = message_filters.ApproximateTimeSynchronizer(
137  [self.sub_label, self.sub_img],
138  queue_size=queue_size, slop=slop)
139  sync.registerCallback(self._apply)
140  if self._publish_tile:
141  sync.registerCallback(self._apply_tile)
142  else:
144  [self.sub_label, self.sub_img], queue_size=queue_size)
145  sync.registerCallback(self._apply)
146  if self._publish_tile:
147  sync.registerCallback(self._apply_tile)
148 
149  def unsubscribe(self):
150  self.sub_img.sub.unregister()
151  self.sub_label.sub.unregister()
152 
153  def _apply(self, label_msg, img_msg=None):
154  bridge = cv_bridge.CvBridge()
155  label_img = bridge.imgmsg_to_cv2(label_msg)
156  if img_msg:
157  img = bridge.imgmsg_to_cv2(img_msg)
158  # publish only valid label region
159  applied = img.copy()
160  applied[label_img == self._bg_label] = 0
161  applied_msg = bridge.cv2_to_imgmsg(applied, encoding=img_msg.encoding)
162  applied_msg.header = img_msg.header
163  self.pub_img.publish(applied_msg)
164  # publish visualized label
165  if img_msg.encoding in {'16UC1', '32SC1'}:
166  # do dynamic scaling to make it look nicely
167  min_value, max_value = img.min(), img.max()
168  img = (img - min_value) / (max_value - min_value) * 255
169  img = img.astype(np.uint8)
170  img = cv2.cvtColor(img, cv2.COLOR_GRAY2RGB)
171  else:
172  img = None
173 
174  label_viz = label2rgb(label_img, img, label_names=self._label_names,
175  alpha=self._alpha, bg_label=self._bg_label)
176  label_viz_msg = bridge.cv2_to_imgmsg(label_viz, encoding='rgb8')
177  label_viz_msg.header = label_msg.header
178  self.pub_label_viz.publish(label_viz_msg)
179 
180  # publish mask
181  if self._publish_mask:
182  bg_mask = (label_img == 0)
183  fg_mask = ~bg_mask
184  bg_mask = (bg_mask * 255).astype(np.uint8)
185  fg_mask = (fg_mask * 255).astype(np.uint8)
186  fg_mask_msg = bridge.cv2_to_imgmsg(fg_mask, encoding='mono8')
187  fg_mask_msg.header = label_msg.header
188  bg_mask_msg = bridge.cv2_to_imgmsg(bg_mask, encoding='mono8')
189  bg_mask_msg.header = label_msg.header
190  self.pub_fg_mask.publish(fg_mask_msg)
191  self.pub_bg_mask.publish(bg_mask_msg)
192 
193  def _apply_tile(self, label_msg, img_msg):
194  bridge = cv_bridge.CvBridge()
195  img = bridge.imgmsg_to_cv2(img_msg)
196  label_img = bridge.imgmsg_to_cv2(label_msg)
197 
198  imgs = []
199  labels = np.unique(label_img)
200  for label in labels:
201  if label == 0:
202  # should be skipped 0, because
203  # 0 is to label image as black region to mask image
204  continue
205  img_tmp = img.copy()
206  mask = label_img == label
207  img_tmp[~mask] = 0
208  img_tmp = bounding_rect_of_mask(img_tmp, mask)
209  imgs.append(img_tmp)
210  tile_img = get_tile_image(imgs)
211  tile_msg = bridge.cv2_to_imgmsg(tile_img, encoding='bgr8')
212  tile_msg.header = img_msg.header
213  self.pub_tile.publish(tile_msg)
214 
215 
216 if __name__ == '__main__':
217  rospy.init_node('label_image_decomposer')
218  label_image_decomposer = LabelImageDecomposer()
219  rospy.spin()
def label2rgb(lbl, img=None, label_names=None, alpha=0.3, bg_label=0)


jsk_perception
Author(s): Manabu Saito, Ryohei Ueda
autogenerated on Mon May 3 2021 03:03:27