eus_teleop_status_visualizer.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 import numpy as np
4 
5 import cv_bridge
6 from jsk_topic_tools import ConnectionBasedTransport
7 import rospy
8 
9 from eus_teleop.msg import EusTeleopStatusArray
10 from sensor_msgs.msg import Image
11 
12 
13 class EusTeleopStatusVisualizer(ConnectionBasedTransport):
14 
15  def __init__(self):
16  super(EusTeleopStatusVisualizer, self).__init__()
17  self.pub = self.advertise('~output/image', Image, queue_size=1)
18  self.enable = {'larm': False, 'rarm': False}
19  self.collision = {'larm': False, 'rarm': False}
20  self.track_error = {'larm': False, 'rarm': False}
21  self.hand_close = {'larm': False, 'rarm': False}
22  self.bridge = cv_bridge.CvBridge()
23 
24  def subscribe(self):
25  self.sub = rospy.Subscriber('~input/image', Image, self._cb)
26  self.status_sub = rospy.Subscriber(
27  '~input/status', EusTeleopStatusArray, self._status_cb)
28 
29  def unsubscribe(self):
30  self.sub.unregister()
31  self.status_sub.unregister()
32 
33  def _cb(self, msg):
34  img = self.bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
35  H, W, C = img.shape
36  img_W = int((H * 16.0 / 9.0 - W) / 2.0)
37  imgs = {}
38  imgs['larm'] = np.zeros((H, img_W, C), dtype=np.uint8)
39  imgs['rarm'] = np.zeros((H, img_W, C), dtype=np.uint8)
40 
41  for index, arm in zip([1, 0], ['larm', 'rarm']):
42  if self.enable[arm]:
43  if self.collision[arm] or self.track_error[arm]:
44  # red
45  imgs[arm][:, :, 2] = 255
46  else:
47  # larm: green
48  # rarm: blue
49  imgs[arm][:, :, index] = 255
50  if self.hand_close[arm]:
51  # orange
52  imgs[arm][int(2.0 * H / 3.0):, :, 0] = 0
53  imgs[arm][int(2.0 * H / 3.0):, :, 1] = 165
54  imgs[arm][int(2.0 * H / 3.0):, :, 2] = 255
55  else:
56  # gray
57  imgs[arm][:] = 127
58 
59  img = np.concatenate((imgs['rarm'], img, imgs['larm']), axis=1)
60  img_msg = self.bridge.cv2_to_imgmsg(img, encoding="bgr8")
61  img_msg.header = msg.header
62  self.pub.publish(img_msg)
63 
64  def _status_cb(self, msg):
65  for status in msg.status:
66  if status.part_name in self.enable:
67  self.enable[status.part_name] = status.enable
68  if status.part_name in self.collision:
69  self.collision[status.part_name] = status.collision
70  if status.part_name in self.track_error:
71  self.track_error[status.part_name] = status.track_error
72  if status.part_name in self.hand_close:
73  self.hand_close[status.part_name] = status.hand_close
74 
75 
76 if __name__ == '__main__':
77  rospy.init_node('eus_teleop_status_visualizer')
79  rospy.spin()
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.sub
sub
Definition: eus_teleop_status_visualizer.py:25
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.subscribe
def subscribe(self)
Definition: eus_teleop_status_visualizer.py:24
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.track_error
track_error
Definition: eus_teleop_status_visualizer.py:20
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.pub
pub
Definition: eus_teleop_status_visualizer.py:17
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.status_sub
status_sub
Definition: eus_teleop_status_visualizer.py:26
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.hand_close
hand_close
Definition: eus_teleop_status_visualizer.py:21
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.enable
enable
Definition: eus_teleop_status_visualizer.py:18
eus_teleop_status_visualizer.EusTeleopStatusVisualizer
Definition: eus_teleop_status_visualizer.py:13
eus_teleop_status_visualizer.EusTeleopStatusVisualizer._status_cb
def _status_cb(self, msg)
Definition: eus_teleop_status_visualizer.py:64
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.collision
collision
Definition: eus_teleop_status_visualizer.py:19
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.bridge
bridge
Definition: eus_teleop_status_visualizer.py:22
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.__init__
def __init__(self)
Definition: eus_teleop_status_visualizer.py:15
eus_teleop_status_visualizer.EusTeleopStatusVisualizer.unsubscribe
def unsubscribe(self)
Definition: eus_teleop_status_visualizer.py:29
eus_teleop_status_visualizer.EusTeleopStatusVisualizer._cb
def _cb(self, msg)
Definition: eus_teleop_status_visualizer.py:33


eus_teleop
Author(s): Shingo Kitagawa
autogenerated on Mon Dec 9 2024 04:10:50